From d87cd09ba183f1191109735e871566f4e14080a8 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Sun, 12 Feb 2012 20:39:12 +0100 Subject: [PATCH] more generalizing of response writing (implemented a response writer...now it should be possible to implement a stream writer for images --- include/http/message/queue.h | 17 ++++++ include/http/request/parser.h | 4 +- include/http/request/queue.h | 17 ------ include/http/response/writer.h | 34 ++++++++++++ include/interface/stream_writer.h | 19 +++++++ include/server.h | 19 ++----- src/Makefile.am | 10 ++-- src/http/{request => message}/queue.c | 10 ++-- src/http/request/parser.c | 10 ++-- src/http/request/parser/parse.c | 11 ++-- src/http/response.c | 1 - src/http/response/writer.c | 47 ++++++++++++++++ src/http/response/writer/write.c | 79 +++++++++++++++++++++++++++ src/interface/stream_writer.c | 19 +++++++ src/server.c | 5 +- src/server/close_conn.c | 5 +- src/server/handle_accept.c | 3 +- src/server/run.c | 60 +++++++------------- src/testserver.c | 6 +- 19 files changed, 270 insertions(+), 106 deletions(-) create mode 100644 include/http/message/queue.h delete mode 100644 include/http/request/queue.h create mode 100644 include/http/response/writer.h create mode 100644 include/interface/stream_writer.h rename src/http/{request => message}/queue.c (57%) create mode 100644 src/http/response/writer.c create mode 100644 src/http/response/writer/write.c create mode 100644 src/interface/stream_writer.c diff --git a/include/http/message/queue.h b/include/http/message/queue.h new file mode 100644 index 0000000..4b00d65 --- /dev/null +++ b/include/http/message/queue.h @@ -0,0 +1,17 @@ +#ifndef __HTTP_REQUEST_QUEUE_H__ +#define __HTTP_REQUEST_QUEUE_H__ + +#include "class.h" +#include "http/message.h" + +#define HTTP_MESSAGE_QUEUE_MAX 1024 + + +CLASS(HttpMessageQueue) { + HttpMessage msgs[HTTP_MESSAGE_QUEUE_MAX]; + size_t nmsgs; +}; + +#endif /* __HTTP_REQUEST_QUEUE_H__ */ + +// vim: set ts=4 sw=4: diff --git a/include/http/request/parser.h b/include/http/request/parser.h index d0c61e6..f37ae0c 100644 --- a/include/http/request/parser.h +++ b/include/http/request/parser.h @@ -3,7 +3,7 @@ #include "class.h" #include "http/request.h" -#include "http/request/queue.h" +#include "http/message/queue.h" #define HTTP_REQUEST_PARSER_READ_CHUNK 1024 @@ -27,7 +27,7 @@ CLASS(HttpRequestParser) { size_t buffer_used; size_t buffer_size; - HttpRequestQueue request_queue; + HttpMessageQueue request_queue; HttpRequest cur_request; HttpRequestState state; diff --git a/include/http/request/queue.h b/include/http/request/queue.h deleted file mode 100644 index 8df553f..0000000 --- a/include/http/request/queue.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef __HTTP_REQUEST_QUEUE_H__ -#define __HTTP_REQUEST_QUEUE_H__ - -#include "class.h" -#include "http/request.h" - -#define HTTP_REQUEST_QUEUE_MAX 1024 - - -CLASS(HttpRequestQueue) { - HttpRequest requests[HTTP_REQUEST_QUEUE_MAX]; - size_t nrequests; -}; - -#endif /* __HTTP_REQUEST_QUEUE_H__ */ - -// vim: set ts=4 sw=4: diff --git a/include/http/response/writer.h b/include/http/response/writer.h new file mode 100644 index 0000000..31aa3d5 --- /dev/null +++ b/include/http/response/writer.h @@ -0,0 +1,34 @@ +#ifndef __HTTP_RESPONSE_WRITER_H__ +#define __HTTP_RESPONSE_WRITER_H__ + +#include "class.h" +#include "http/response.h" +#include "http/message/queue.h" + +typedef enum e_HttpResponseState { + HTTP_RESPONSE_NO=0, + HTTP_RESPONSE_START, + HTTP_RESPONSE_PIPE, + HTTP_RESPONSE_DONE +} HttpResponseState; + +CLASS(HttpResponseWriter) { + char * buffer; + char pipe[1024]; + + size_t nbuffer; + size_t rpipe; + size_t wpipe; + char pipe_flip; + + HttpMessageQueue response_queue; + HttpResponse cur_response; + + HttpResponseState state; +}; + +size_t httpResponseWriterWrite(HttpResponseWriter, int); + +#endif // __HTTP_RESPONSE_WRITER_H__ + +// vim: set ts=4 sw=4: diff --git a/include/interface/stream_writer.h b/include/interface/stream_writer.h new file mode 100644 index 0000000..4018707 --- /dev/null +++ b/include/interface/stream_writer.h @@ -0,0 +1,19 @@ +#ifndef __STREAM_WRITER_H__ +#define __STREAM_WRITER_H__ + +#include + +typedef size_t (* fptr_streamWriterWrite)(void *, int fd); + +extern const struct interface i_StreamWriter; + +struct i_StreamWriter { + const struct interface * const _; + fptr_streamWriterWrite write; +}; + +extern size_t streamWriterWrite(void *, int fd); + +#endif // __STREAM_WRITER_H__ + +// vim: set ts=4 sw=4: diff --git a/include/server.h b/include/server.h index e732518..4cb9878 100644 --- a/include/server.h +++ b/include/server.h @@ -7,6 +7,8 @@ #include "class.h" #include "socket.h" #include "logger.h" +#include "http/response.h" + #define POLL_FD_NSIZE 1024 #define POLL_FD_SIZE (sizeof(struct pollfd) * POLL_FD_NSIZE) @@ -24,29 +26,16 @@ CLASS(Server) { Logger logger; Sock sock; - void * reader; - - /** - * loeschen: fds[i].event auf 0 - * dann nfds um die anzahl der geloeschten elemente verkleinern. - * die in close pending stehenden socket schliessen. - * vor jedem poll qsort auf fds ueber event. - * nach dem poll qsort auf fds ueber revent und reuckgebewert - * von poll beruecksichtigen. - */ + nfds_t nfds; struct pollfd fds[POLL_FD_NSIZE]; struct { Sock sock; void * reader; + void * writer; char keep_alive; - - char * wbuf; - char * rbuf; - unsigned int rpos; - unsigned int wpos; } conns[POLL_FD_NSIZE]; }; diff --git a/src/Makefile.am b/src/Makefile.am index 5ee8780..17b90e7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,15 +1,17 @@ ACLOCAL_AMFLAGS = -I m4 AUTOMAKE_OPTIONS = subdir-objects -IFACE = interface/class.c interface/stream_reader.c interface/logger.c +IFACE = interface/class.c interface/stream_reader.c interface/logger.c \ + interface/stream_writer.c CLASS = class.c interface.c SOCKET = socket.c socket/accept.c socket/connect.c socket/listen.c SERVER = server.c server/run.c server/close_conn.c LOGGER = logger.c logger/stderr.c logger/syslog.c -MSG = http/message.c -REQ = http/request.c http/request/queue.c http/request/has_keep_alive.c +MSG = http/message.c http/message/queue.c +REQ = http/request.c http/request/has_keep_alive.c RESP = http/response.c http/response/404.c http/response/size_get.c \ http/response/to_string.c +WRITER = http/response/writer.c http/response/writer/write.c HEADER = http/header.c http/header/get.c http/header/add.c \ http/header/size_get.c http/header/to_string.c PARSER = http/request/parser.c http/request/parser/get_header.c \ @@ -23,5 +25,5 @@ bin_PROGRAMS = testserver testserver_SOURCES = testserver.c \ $(IFACE) $(CLASS) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \ - $(RESP) $(HEADER) $(PARSER) signalHandling.c daemonize.c + $(WRITER) $(RESP) $(HEADER) $(PARSER) signalHandling.c daemonize.c testserver_CFLAGS = -Wall -I ../include/ diff --git a/src/http/request/queue.c b/src/http/message/queue.c similarity index 57% rename from src/http/request/queue.c rename to src/http/message/queue.c index 02c3ca0..734ca1d 100644 --- a/src/http/request/queue.c +++ b/src/http/message/queue.c @@ -3,7 +3,7 @@ #include "class.h" #include "interface/class.h" -#include "http/request/queue.h" +#include "http/message/queue.h" static void @@ -13,15 +13,15 @@ static void dtor(void * _this) { - HttpRequestQueue this = _this; + HttpMessageQueue this = _this; int i; - for (i=0; inrequests; i++) { - delete(&(this->requests)[i]); + for (i=0; inmsgs; i++) { + delete(&(this->msgs)[i]); } } INIT_IFACE(Class, ctor, dtor, NULL); -CREATE_CLASS(HttpRequestQueue, NULL, IFACE(Class)); +CREATE_CLASS(HttpMessageQueue, NULL, IFACE(Class)); // vim: set ts=4 sw=4: diff --git a/src/http/request/parser.c b/src/http/request/parser.c index 2883d99..d196ab1 100644 --- a/src/http/request/parser.c +++ b/src/http/request/parser.c @@ -8,10 +8,9 @@ #include "interface/stream_reader.h" #include "http/request/parser.h" -#include "http/request/queue.h" +#include "http/message/queue.h" #include "http/request.h" -void httpRequestParserParse(HttpRequestParser); static void @@ -19,7 +18,7 @@ ctor(void * _this, va_list * params) { HttpRequestParser this = _this; - this->request_queue = new(HttpRequestQueue); + this->request_queue = new(HttpMessageQueue); this->buffer = malloc(HTTP_REQUEST_PARSER_READ_CHUNK); this->buffer[0] = 0; @@ -33,6 +32,9 @@ dtor(void * _this) free(this->buffer); delete(&(this->request_queue)); + + if (NULL != this->cur_request) + delete(&(this->cur_request)); } static @@ -46,7 +48,7 @@ _clone(void * _this, void * _base) /** * every parser has its own queue... */ - this->request_queue = new(HttpRequestQueue); + this->request_queue = new(HttpMessageQueue); this->buffer_used = base->buffer_used; chunks = this->buffer_used / HTTP_REQUEST_PARSER_READ_CHUNK; diff --git a/src/http/request/parser/parse.c b/src/http/request/parser/parse.c index e7b3bfb..84e2120 100644 --- a/src/http/request/parser/parse.c +++ b/src/http/request/parser/parse.c @@ -36,10 +36,10 @@ httpRequestSkip(char ** data) void httpRequestParserParse(HttpRequestParser this) { - char * line; - int cont = 1; + char * line; + int cont = 1; - while(cont) { + while (cont) { switch(this->state) { case HTTP_REQUEST_GARBAGE: this->cur_data = this->buffer; // initialize static pointer @@ -82,8 +82,9 @@ httpRequestParserParse(HttpRequestParser this) /** * enqueue current request */ - this->request_queue->requests[(this->request_queue->nrequests)++] = - this->cur_request; + this->request_queue->msgs[(this->request_queue->nmsgs)++] = + (HttpMessage)this->cur_request; + this->cur_request = NULL; /** * remove processed stuff from input buffer. diff --git a/src/http/response.c b/src/http/response.c index bc58146..a972dcd 100644 --- a/src/http/response.c +++ b/src/http/response.c @@ -13,7 +13,6 @@ void ctor(void * _this, va_list * params) { HttpResponse this = _this; - char * status; char * reason; PARENTCALL(_this, Class, ctor, params); diff --git a/src/http/response/writer.c b/src/http/response/writer.c new file mode 100644 index 0000000..62409cf --- /dev/null +++ b/src/http/response/writer.c @@ -0,0 +1,47 @@ +#include + +#include "class.h" +#include "interface/class.h" +#include "interface/stream_writer.h" + +#include "http/message/queue.h" +#include "http/response/writer.h" + +static +void +ctor(void * _this, va_list * params) +{ + HttpResponseWriter this = _this; + + this->response_queue = new(HttpMessageQueue); +} + +static +void +dtor(void * _this) +{ + HttpResponseWriter this = _this; + + if (NULL != this->buffer) free(this->buffer); + delete(&(this->response_queue)); + + if (NULL != this->cur_response) + delete(&(this->cur_response)); +} + +static +void +_clone(void * _this, void * _base) +{ + HttpResponseWriter this = _this; + //HttpResponseWriter base = _base; + + this->response_queue = new(HttpMessageQueue); +} + +INIT_IFACE(Class, ctor, dtor, _clone); +INIT_IFACE(StreamWriter, + (fptr_streamWriterWrite)httpResponseWriterWrite); +CREATE_CLASS(HttpResponseWriter, NULL, IFACE(Class), IFACE(StreamWriter)); + +// vim: set ts=4 sw=4: diff --git a/src/http/response/writer/write.c b/src/http/response/writer/write.c new file mode 100644 index 0000000..a3dda7e --- /dev/null +++ b/src/http/response/writer/write.c @@ -0,0 +1,79 @@ +#include +#include +#include +#include + +#include "class.h" +#include "interface/class.h" +#include "http/response.h" +#include "http/response/writer.h" + +size_t +httpResponseWriterWrite(HttpResponseWriter this, int fd) +{ + HttpMessageQueue respq = this->response_queue; + int cont = 1; + size_t written = 0; + + while (cont) { + switch (this->state) { + case HTTP_RESPONSE_NO: + if (NULL == this->cur_response && 0 < respq->nmsgs) { + this->cur_response = (HttpResponse)respq->msgs[0]; + memmove(respq->msgs, &(respq->msgs[1]), --respq->nmsgs); + this->state = HTTP_RESPONSE_START; + } + else { + cont = 0; + } + break; + + case HTTP_RESPONSE_START: + if (NULL == this->buffer) { + this->nbuffer = httpResponseSizeGet(this->cur_response); + this->buffer = calloc(1, this->nbuffer); + httpResponseToString(this->cur_response, this->buffer); + } + { + written = write(fd, this->buffer, this->nbuffer); + + if (-1 == written) { + free (this->buffer); + this->buffer = NULL; + return written; + } + + if (written == this->nbuffer) { + if (HTTP_MESSAGE_BUFFERED == + ((HttpMessage)(this->cur_response))->type) { + this->state = HTTP_RESPONSE_DONE; + } + else { + this->state = HTTP_RESPONSE_PIPE; + } + } + else { + this->nbuffer -= written; + memmove(this->buffer, this->buffer + written, this->nbuffer); + cont = 0; + } + } + break; + + case HTTP_RESPONSE_PIPE: + break; + + case HTTP_RESPONSE_DONE: + free (this->buffer); + this->buffer = NULL; + delete(&(this->cur_response)); + this->state = HTTP_RESPONSE_NO; + + break; + } + } + + return written; +} + +// vim: set ts=4 sw=4: diff --git a/src/interface/stream_writer.c b/src/interface/stream_writer.c new file mode 100644 index 0000000..501afb2 --- /dev/null +++ b/src/interface/stream_writer.c @@ -0,0 +1,19 @@ +#include "class.h" +#include "interface/stream_writer.h" + +const struct interface i_StreamWriter = { + "streamWriter", + 1 +}; + +size_t +streamWriterWrite(void * object, int fd) +{ + size_t ret; + + RETCALL(object, StreamWriter, write, ret, fd); + + return ret; +} + +// vim: set ts=4 sw=4: diff --git a/src/server.c b/src/server.c index fd35486..299f15b 100644 --- a/src/server.c +++ b/src/server.c @@ -20,7 +20,6 @@ ctor(void * _this, va_list * params) int flags; this->logger = va_arg(* params, Logger); - this->reader = va_arg(* params, void*); port = va_arg(* params, int); backlog = va_arg(* params, unsigned int); @@ -47,9 +46,7 @@ dtor(void * _this) if (this->sock->handle != (this->fds)[i].fd) { delete(&(this->conns[(this->fds)[i].fd]).sock); delete(&(this->conns[(this->fds)[i].fd]).reader); - - if (this->conns[(this->fds)[i].fd].wbuf) - free(this->conns[(this->fds)[i].fd].wbuf); + delete(&(this->conns[(this->fds)[i].fd]).writer); } } diff --git a/src/server/close_conn.c b/src/server/close_conn.c index 7369c64..264aef3 100644 --- a/src/server/close_conn.c +++ b/src/server/close_conn.c @@ -11,11 +11,8 @@ serverCloseConn(Server this, unsigned int i) delete(&((this->conns)[fd].sock)); delete(&((this->conns)[fd].reader)); + delete(&((this->conns)[fd].writer)); - if ((this->conns)[fd].wbuf != NULL) { - free((this->conns)[fd].wbuf); - (this->conns)[fd].wbuf = NULL; - } (this->conns)[fd].keep_alive = 0; memset(&(this->fds[i]), 0, sizeof(struct pollfd)); diff --git a/src/server/handle_accept.c b/src/server/handle_accept.c index 97e2c76..76fb13e 100644 --- a/src/server/handle_accept.c +++ b/src/server/handle_accept.c @@ -12,7 +12,8 @@ serverHandleAccept(Server this) (this->conns)[acc->handle].sock = acc; //* clone reader - (this->conns)[acc->handle].reader = clone(this->reader); + (this->conns)[acc->handle].reader = new(HttpRequestParser); + (this->conns)[acc->handle].writer = new(HttpResponseWriter); (this->fds)[this->nfds].fd = acc->handle; (this->fds)[this->nfds].events = POLLIN; diff --git a/src/server/run.c b/src/server/run.c index 3f13b1e..c9069c0 100644 --- a/src/server/run.c +++ b/src/server/run.c @@ -13,13 +13,15 @@ #include "signalHandling.h" #include "interface/class.h" #include "interface/stream_reader.h" +#include "interface/stream_writer.h" #include "interface/logger.h" //* @TODO: to be removed #include "http/request.h" #include "http/request/parser.h" -#include "http/request/queue.h" +#include "http/message/queue.h" #include "http/response.h" +#include "http/response/writer.h" //* until here #undef MAX @@ -102,43 +104,39 @@ serverRun(Server this) } else { int j; - HttpRequestQueue queue = - ((HttpRequestParser)(this->conns)[fd].reader)->request_queue; - - for (j=0; jnrequests; j++) { - HttpResponse response; + HttpMessageQueue reqq = ((HttpRequestParser) \ + (this->conns)[fd].reader)->request_queue; + HttpMessageQueue respq = ((HttpResponseWriter) \ + (this->conns)[fd].writer)->response_queue; + for (j=0; jnmsgs; j++) { /** * @TODO: for now simply remove request and send not found. * Make this sane. */ - response = httpResponse404(); + HttpMessage response = (HttpMessage)httpResponse404(); - if (httpRequestHasKeepAlive(queue->requests[j])) { + if (httpRequestHasKeepAlive((HttpRequest)reqq->msgs[j])) { (this->conns)[fd].keep_alive = 1; httpHeaderAdd( - &(((HttpMessage)response)->header), + &(response->header), new(HttpHeader, "Connection", "Keep-Alive")); } else { (this->conns)[fd].keep_alive = 0; httpHeaderAdd( - &(((HttpMessage)response)->header), + &(response->header), new(HttpHeader, "Connection", "Close")); } - delete(&(queue->requests[j])); - - (this->conns)[fd].wbuf = calloc( - 1, httpResponseSizeGet(response) + 1); - httpResponseToString(response, (this->conns)[fd].wbuf); - - delete(&response); + respq->msgs[(respq->nmsgs)++] = response; + response = NULL; + delete(&(reqq->msgs[j])); (this->fds)[i].events |= POLLOUT; } - queue->nrequests = 0; + reqq->nmsgs = 0; } } } @@ -152,31 +150,13 @@ serverRun(Server this) events--; nwrites--; - size = write( - (this->fds)[i].fd, - (this->conns)[fd].wbuf, - strlen((this->conns)[fd].wbuf)); - - if (size == strlen((this->conns)[fd].wbuf) || - -1 == size) { - if (-1 == size) { - loggerLog(this->logger, LOGGER_ERR, - "write error, closing connection"); - } + size = streamWriterWrite((this->conns)[fd].writer, fd); - if ((this->conns)[fd].keep_alive) { - (this->fds)[i].events &= ~POLLOUT; - } - else { - serverCloseConn(this, i); - } - free((this->conns)[fd].wbuf); - (this->conns)[fd].wbuf = NULL; + if ((this->conns)[fd].keep_alive) { + (this->fds)[i].events &= ~POLLOUT; } else { - memmove((this->conns)[fd].wbuf, - (this->conns)[fd].wbuf + size, - strlen((this->conns)[fd].wbuf) - size + 1); + serverCloseConn(this, i); } } } diff --git a/src/testserver.c b/src/testserver.c index 212c8fa..6e3a262 100644 --- a/src/testserver.c +++ b/src/testserver.c @@ -18,9 +18,8 @@ void daemonize(void); int main() { - Logger logger = new(LoggerSyslog, LOGGER_ERR); - HttpRequestParser parser = new(HttpRequestParser); - Server server = new(Server, logger, parser, 11212, SOMAXCONN); + Logger logger = new(LoggerSyslog, LOGGER_ERR); + Server server = new(Server, logger, 11212, SOMAXCONN); struct rlimit limit = {RLIM_INFINITY, RLIM_INFINITY}; setrlimit(RLIMIT_CPU, &limit); @@ -31,7 +30,6 @@ main() delete(&server); delete(&logger); - delete(&parser); return 0; }