From f2dbad19c684012a6ac235a253349ffe049668db Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Sun, 19 Feb 2012 11:35:15 +0100 Subject: [PATCH] another try with a shared memory based ringbuffer...this performs well for keep-alive sessions but is much slower without. actually i am not sure why but most likely the shared memory setup is quite expensive. @TODO: make a profiling. --- ChangeLog | 6 +- include/http/message.h | 1 + include/http/request/parser.h | 8 +- include/http/response/writer.h | 15 ++-- include/http/worker.h | 3 +- include/server.h | 3 - include/socket.h | 2 +- src/Makefile.am | 7 +- src/cbuf.c | 113 +++++++++++++++++++++++++++++ src/cbuf/addr_index.c | 11 +++ src/cbuf/get_data.c | 19 +++++ src/cbuf/get_free.c | 11 +++ src/cbuf/get_line.c | 26 +++++++ src/cbuf/get_read.c | 9 +++ src/cbuf/get_write.c | 9 +++ src/cbuf/inc_read.c | 14 ++++ src/cbuf/inc_write.c | 14 ++++ src/cbuf/is_empty.c | 9 +++ src/cbuf/memchr.c | 11 +++ src/cbuf/read.c | 37 ++++++++++ src/cbuf/set_data.c | 23 ++++++ src/cbuf/skip_non_alpha.c | 12 +++ src/cbuf/write.c | 29 ++++++++ src/http/request/parser.c | 16 ++-- src/http/request/parser/get_body.c | 55 +++++++------- src/http/request/parser/parse.c | 94 ++++++++---------------- src/http/request/parser/read.c | 63 ++++++++++------ src/http/response/writer.c | 7 ++ src/http/response/writer/write.c | 90 +++++++++-------------- src/http/worker.c | 14 ++-- src/server/close_conn.c | 2 - src/server/handle_accept.c | 10 ++- src/server/run.c | 26 +++++-- src/socket/accept.c | 5 +- src/testserver.c | 6 +- 35 files changed, 557 insertions(+), 223 deletions(-) create mode 100644 src/cbuf.c create mode 100644 src/cbuf/addr_index.c create mode 100644 src/cbuf/get_data.c create mode 100644 src/cbuf/get_free.c create mode 100644 src/cbuf/get_line.c create mode 100644 src/cbuf/get_read.c create mode 100644 src/cbuf/get_write.c create mode 100644 src/cbuf/inc_read.c create mode 100644 src/cbuf/inc_write.c create mode 100644 src/cbuf/is_empty.c create mode 100644 src/cbuf/memchr.c create mode 100644 src/cbuf/read.c create mode 100644 src/cbuf/set_data.c create mode 100644 src/cbuf/skip_non_alpha.c create mode 100644 src/cbuf/write.c diff --git a/ChangeLog b/ChangeLog index e383ade..c9a041f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,10 +1,10 @@ -2012-02-18 20:12:27 +0100 Georg Hopp +2012-02-19 11:35:15 +0100 Georg Hopp - * lots of changes but primarily change the request parser to use a ringbuffer. The ringbuffer is implemented using the shared memory trick. (HEAD, master) + * another try with a shared memory based ringbuffer...this performs well for keep-alive sessions but is much slower without. actually i am not sure why but most likely the shared memory setup is quite expensive. @TODO: make a profiling. (HEAD, ringbuffer) 2012-02-15 12:30:33 +0100 Georg Hopp - * some more cleanups in the server code. Removing not needed header includes (origin/master, origin/HEAD) + * some more cleanups in the server code. Removing not needed header includes 2012-02-15 12:17:39 +0100 Georg Hopp diff --git a/include/http/message.h b/include/http/message.h index a70abb7..876360d 100644 --- a/include/http/message.h +++ b/include/http/message.h @@ -19,6 +19,7 @@ CLASS(HttpMessage) { int handle; char * body; int nbody; + int dbody; }; char httpMessageHasKeepAlive(HttpMessage); diff --git a/include/http/request/parser.h b/include/http/request/parser.h index 110390a..412df5b 100644 --- a/include/http/request/parser.h +++ b/include/http/request/parser.h @@ -4,9 +4,10 @@ #include "class.h" #include "http/request.h" #include "http/message/queue.h" -#include "ringbuffer.h" +#include "cbuf.h" + +#define HTTP_REQUEST_PARSER_BUFFER_MAX 8192 -#define HTTP_REQUEST_PARSER_MAX_BUF 131072 /** * limits to stop invalid requests from killing @@ -31,7 +32,7 @@ typedef enum e_HttpRequestState { CLASS(HttpRequestParser) { - Ringbuffer buffer; + Cbuf buffer; HttpMessageQueue request_queue; HttpRequest cur_request; @@ -41,6 +42,7 @@ CLASS(HttpRequestParser) { ssize_t httpRequestParserRead(HttpRequestParser, int); ssize_t httpRequestParserParse(HttpRequestParser, int); +void httpRequestParserGetBody(HttpRequestParser); ssize_t httpRequestParserGetRequestLine(HttpRequestParser, char *); ssize_t httpRequestParserGetHeader(HttpRequestParser, char *); diff --git a/include/http/response/writer.h b/include/http/response/writer.h index 452745b..c3e9b81 100644 --- a/include/http/response/writer.h +++ b/include/http/response/writer.h @@ -1,9 +1,12 @@ #ifndef __HTTP_RESPONSE_WRITER_H__ #define __HTTP_RESPONSE_WRITER_H__ +#include + #include "class.h" #include "http/response.h" #include "http/message/queue.h" +#include "cbuf.h" #define RESPONSE_WRITER_MAX_BUF 131072 @@ -15,17 +18,15 @@ typedef enum e_HttpResponseState { } HttpResponseState; CLASS(HttpResponseWriter) { - char * pipe; - - size_t nheader; - size_t nbuffer; - size_t written; - size_t pstart; - size_t pend; + Cbuf buffer; HttpMessageQueue response_queue; HttpResponse cur_response; + size_t nheader; + size_t nbody; + size_t written; + HttpResponseState state; }; diff --git a/include/http/worker.h b/include/http/worker.h index cc44256..9040b0c 100644 --- a/include/http/worker.h +++ b/include/http/worker.h @@ -8,8 +8,7 @@ #include "http/response/writer.h" CLASS(HttpWorker) { - char * remoteAddr; - int handle; + char * id; HttpRequestParser parser; HttpResponseWriter writer; diff --git a/include/server.h b/include/server.h index dbc0737..2d7b388 100644 --- a/include/server.h +++ b/include/server.h @@ -34,10 +34,7 @@ CLASS(Server) { struct { Sock sock; - void * worker; - - char keep_alive; } conns[POLL_FD_NSIZE]; }; diff --git a/include/socket.h b/include/socket.h index ee6a698..50b750b 100644 --- a/include/socket.h +++ b/include/socket.h @@ -15,7 +15,7 @@ CLASS(Sock) { void socketConnect(Sock this, const char * addr); void socketListen(Sock this, int backlog); -Sock socketAccept(Sock this, char (*remoteAddr)[]); +Sock socketAccept(Sock this, char (*remoteAddr)[16]); #endif // __SOCKET_H__ diff --git a/src/Makefile.am b/src/Makefile.am index f2905a1..041880c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -9,6 +9,11 @@ RB = ringbuffer.c ringbuffer/rb_read.c SOCKET = socket.c socket/accept.c socket/connect.c socket/listen.c SERVER = server.c server/run.c server/close_conn.c LOGGER = logger.c logger/stderr.c logger/syslog.c +CB = cbuf.c cbuf/read.c cbuf/write.c \ + cbuf/get_line.c cbuf/set_data.c cbuf/get_data.c \ + cbuf/addr_index.c cbuf/get_free.c cbuf/get_read.c cbuf/get_write.c \ + cbuf/inc_read.c cbuf/inc_write.c cbuf/is_empty.c cbuf/memchr.c \ + cbuf/skip_non_alpha.c MSG = http/message.c http/message/queue.c http/message/has_keep_alive.c \ http/message/header_size_get.c http/message/header_to_string.c REQ = http/request.c @@ -31,7 +36,7 @@ bin_PROGRAMS = testserver testserver_SOURCES = testserver.c \ $(IFACE) $(CLASS) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \ - $(WRITER) $(RESP) $(HEADER) $(PARSER) $(WORKER) $(RB) \ + $(WRITER) $(RESP) $(HEADER) $(PARSER) $(WORKER) $(CB) \ signalHandling.c daemonize.c testserver_CFLAGS = -Wall -I ../include/ testserver_LDFLAGS = -lrt diff --git a/src/cbuf.c b/src/cbuf.c new file mode 100644 index 0000000..6c86e37 --- /dev/null +++ b/src/cbuf.c @@ -0,0 +1,113 @@ +#define _POSIX_SOURCE +#define _POSIX_C_SOURCE 200112L + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "class.h" +#include "interface/class.h" + +#include "cbuf.h" + + +static void dtor(void*); + +static +void +ctor(void * _this, va_list * params) +{ + Cbuf this = _this; + char state = 0; + char * shm_name = va_arg(*params, char*); + long psize = sysconf(_SC_PAGESIZE); + size_t size; + int shm; + + this->shm_name = malloc(strlen(shm_name) + 7 + 2); + sprintf(this->shm_name, "/%06d_%s", getpid(), shm_name); + + /** + * align size at page boundary. + * increase as neccessary + */ + size = va_arg(*params, size_t); + size = (0 >= size)? 1 : (0 != size%psize)? (size/psize)+1 : size/psize; + this->bsize = psize * size; + + while (0 == state) { + shm = shm_open(this->shm_name, O_RDWR|O_CREAT|O_EXCL, S_IRWXU); + if (-1 == shm) { + break; + } + + if (-1 == ftruncate(shm, this->bsize)) { + break; + } + + this->data = mmap (0, this->bsize << 1, + PROT_READ|PROT_WRITE, MAP_SHARED, shm, 0); + if (this->data == MAP_FAILED) { + this->data = NULL; + break; + } + + munmap(this->data + this->bsize, this->bsize); + + this->mirror = mmap (this->data + this->bsize, this->bsize, + PROT_READ|PROT_WRITE, MAP_SHARED, shm, 0); + if (this->mirror != this->data + this->bsize) { + if (this->mirror == this->data - this->bsize) { + this->data = this->mirror; + this->mirror += this->bsize; + } + else { + this->mirror = NULL; + break; + } + } + + state = 1; + } + + if (-1 != shm) { + shm_unlink(this->shm_name); + close(shm); + } + + if (1 != state) { + dtor(this); + } +} + +static +void +dtor(void * _this) +{ + Cbuf this = _this; + + if (NULL != this->shm_name) { + free(this->shm_name); + this->shm_name = NULL; + } + + if (NULL != this->data) { + munmap(this->data, this->bsize); + this->data = NULL; + } + + if (NULL != this->mirror) { + munmap(this->mirror, this->bsize); + this->mirror = NULL; + } +} + +INIT_IFACE(Class, ctor, dtor, NULL); +CREATE_CLASS(Cbuf, NULL, IFACE(Class)); + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/addr_index.c b/src/cbuf/addr_index.c new file mode 100644 index 0000000..8639b73 --- /dev/null +++ b/src/cbuf/addr_index.c @@ -0,0 +1,11 @@ +#include + +#include "cbuf.h" + +size_t +cbufAddrIndex(Cbuf this, const void * c) +{ + return c - (const void *)cbufGetRead(this); +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/get_data.c b/src/cbuf/get_data.c new file mode 100644 index 0000000..0ebdb9f --- /dev/null +++ b/src/cbuf/get_data.c @@ -0,0 +1,19 @@ +#include +#include + +#include "cbuf.h" + +char * +cbufGetData(Cbuf this, size_t n) +{ + char * ret = cbufGetRead(this); + + if (n > this->bused) { + return -1; + } + + cbufIncRead(this, n); + return ret; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/get_free.c b/src/cbuf/get_free.c new file mode 100644 index 0000000..a42ca22 --- /dev/null +++ b/src/cbuf/get_free.c @@ -0,0 +1,11 @@ +#include + +#include "cbuf.h" + +size_t +cbufGetFree(Cbuf this) +{ + return this->bsize - this->bused; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/get_line.c b/src/cbuf/get_line.c new file mode 100644 index 0000000..1bf7c0d --- /dev/null +++ b/src/cbuf/get_line.c @@ -0,0 +1,26 @@ +#include + +#include + +#include "cbuf.h" + +char * +cbufGetLine(Cbuf this) +{ + char * nl = cbufMemchr(this, '\n'); + char * ret = NULL; + + if (NULL != nl) { + size_t len = cbufAddrIndex(this, nl) + 1; + + *nl = 0; + *(nl-1) = ('\r' == *(nl-1))? 0 : *(nl-1); + + ret = cbufGetRead(this); + cbufIncRead(this, len); + } + + return ret; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/get_read.c b/src/cbuf/get_read.c new file mode 100644 index 0000000..62097df --- /dev/null +++ b/src/cbuf/get_read.c @@ -0,0 +1,9 @@ +#include "cbuf.h" + +char * +cbufGetRead(Cbuf this) +{ + return this->data + this->read; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/get_write.c b/src/cbuf/get_write.c new file mode 100644 index 0000000..91d74b1 --- /dev/null +++ b/src/cbuf/get_write.c @@ -0,0 +1,9 @@ +#include "cbuf.h" + +char * +cbufGetWrite(Cbuf this) +{ + return this->data + this->write; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/inc_read.c b/src/cbuf/inc_read.c new file mode 100644 index 0000000..431f754 --- /dev/null +++ b/src/cbuf/inc_read.c @@ -0,0 +1,14 @@ +#include + +#include "cbuf.h" + +void +cbufIncRead(Cbuf this, size_t n) +{ + this->read += n; + this->read = (this->read >= this->bsize)? + this->read - this->bsize : this->read; + this->bused -= n; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/inc_write.c b/src/cbuf/inc_write.c new file mode 100644 index 0000000..68aee12 --- /dev/null +++ b/src/cbuf/inc_write.c @@ -0,0 +1,14 @@ +#include + +#include "cbuf.h" + +void +cbufIncWrite(Cbuf this, size_t n) +{ + this->write += n; + this->write = (this->write >= this->bsize)? + this->write - this->bsize : this->write; + this->bused += n; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/is_empty.c b/src/cbuf/is_empty.c new file mode 100644 index 0000000..e4ca0aa --- /dev/null +++ b/src/cbuf/is_empty.c @@ -0,0 +1,9 @@ +#include "cbuf.h" + +char +cbufIsEmpty(Cbuf this) +{ + return (0 == this->bused); +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/memchr.c b/src/cbuf/memchr.c new file mode 100644 index 0000000..fcbb182 --- /dev/null +++ b/src/cbuf/memchr.c @@ -0,0 +1,11 @@ +#include + +#include "cbuf.h" + +char * +cbufMemchr(Cbuf this, int c) +{ + return memchr(cbufGetRead(this), c, this->bused); +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/read.c b/src/cbuf/read.c new file mode 100644 index 0000000..5f71511 --- /dev/null +++ b/src/cbuf/read.c @@ -0,0 +1,37 @@ +#include +#include +#include + +#include "cbuf.h" + + +ssize_t +cbufRead(Cbuf this, int fd) +{ + ssize_t rrsize = 0; + size_t rsize = cbufGetFree(this); + + if (0 == rsize) { + errno = ECBUFOVFL; + return -1; + } + + rrsize = read(fd, cbufGetWrite(this), rsize); + + switch (rrsize) { + case 0: + rrsize = -2; + // DROP THROUGH + + case -1: + break; + + default: + cbufIncWrite(this, rrsize); + break; + } + + return rrsize; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/set_data.c b/src/cbuf/set_data.c new file mode 100644 index 0000000..1748c66 --- /dev/null +++ b/src/cbuf/set_data.c @@ -0,0 +1,23 @@ +#include +#include +#include + +#include "cbuf.h" + +char * +cbufSetData(Cbuf this, const void * src, size_t n) +{ + char * addr; + + if (n > cbufGetFree(this)) { + errno = ECBUFOVFL; + return -1; + } + + addr = memcpy(cbufGetWrite(this), src, n); + cbufIncWrite(this, n); + + return addr; +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/skip_non_alpha.c b/src/cbuf/skip_non_alpha.c new file mode 100644 index 0000000..25d96b9 --- /dev/null +++ b/src/cbuf/skip_non_alpha.c @@ -0,0 +1,12 @@ +#include + +#include "cbuf.h" + +void +cbufSkipNonAlpha(Cbuf this) +{ + while(0 > this->bused && isalpha(*cbufGetRead(this))) + cbufIncRead(this, 1); +} + +// vim: set ts=4 sw=4: diff --git a/src/cbuf/write.c b/src/cbuf/write.c new file mode 100644 index 0000000..37ec154 --- /dev/null +++ b/src/cbuf/write.c @@ -0,0 +1,29 @@ +#include +#include + +#include "cbuf.h" + + +ssize_t +cbufWrite(Cbuf this, int fd) +{ + ssize_t wwsize = 0; + size_t wsize = this->bused; + + if (0 == wsize) return 0; + + wwsize = write(fd, cbufGetRead(this), wsize); + + switch (wwsize) { + case -1: + break; + + default: + cbufIncRead(this, wwsize); + break; + } + + return wwsize; +} + +// vim: set ts=4 sw=4: diff --git a/src/http/request/parser.c b/src/http/request/parser.c index d33e864..bf0d2f0 100644 --- a/src/http/request/parser.c +++ b/src/http/request/parser.c @@ -1,5 +1,5 @@ #include -#include +#include #include #include @@ -10,17 +10,20 @@ #include "http/request/parser.h" #include "http/message/queue.h" #include "http/request.h" -#include "ringbuffer.h" +#include "cbuf.h" static void ctor(void * _this, va_list * params) { - HttpRequestParser this = _this; - char * shm_name = va_arg(*params, char*); + HttpRequestParser this = _this; + char * id = va_arg(*params, char*); + char cbuf_id[100]; + + sprintf(cbuf_id, "%s_%s", "parser", id); - this->buffer = new(Ringbuffer, shm_name, HTTP_REQUEST_LINE_MAX); + this->buffer = new(Cbuf, cbuf_id, HTTP_REQUEST_PARSER_BUFFER_MAX); this->request_queue = new(HttpMessageQueue); } @@ -37,9 +40,8 @@ dtor(void * _this) delete(&(this->cur_request)); } - INIT_IFACE(Class, ctor, dtor, NULL); -INIT_IFACE(StreamReader, (fptr_streamReaderRead)httpRequestParserParse); +INIT_IFACE(StreamReader, (fptr_streamReaderRead)httpRequestParserRead); CREATE_CLASS(HttpRequestParser, NULL, IFACE(Class), IFACE(StreamReader)); // vim: set ts=4 sw=4: diff --git a/src/http/request/parser/get_body.c b/src/http/request/parser/get_body.c index 4c00f8d..372ec9a 100644 --- a/src/http/request/parser/get_body.c +++ b/src/http/request/parser/get_body.c @@ -3,6 +3,10 @@ #include "http/header.h" #include "http/message.h" #include "http/request/parser.h" +#include "cbuf.h" + +#define MAX(a,b) (((a) > (b))? (a) : (b)) + #define MAX(x,y) ((x) > (y) ? (x) : (y)) @@ -14,41 +18,34 @@ void httpRequestParserGetBody(HttpRequestParser this) { HttpMessage message = (HttpMessage)(this->cur_request); - char * str_nbody; - int nbody; - int len; - - str_nbody = httpHeaderGet( - &(message->header), - "Content-Length"); - - if (NULL == str_nbody) { - this->state = HTTP_REQUEST_DONE; - return -1; - } - - nbody = atoi(str_nbody); + size_t len; if (0 == message->nbody) { - message->type = HTTP_MESSAGE_BUFFERED; - if (0 < nbody) - message->body = malloc(nbody); + char * nbody = httpHeaderGet( + &(message->header), + "Content-Length"); + + if (NULL == nbody) { + this->state = HTTP_REQUEST_DONE; + return; + } + else { + message->type = HTTP_MESSAGE_BUFFERED; + message->nbody = atoi(nbody); + message->body = calloc(1, message->nbody); + message->dbody = 0; + } } + this->buffer->bused -= len; - len = MAX(nbody - message->nbody, this->buffer->bused); - memcpy(message->body + message->nbody, - this->buffer->buffer + this->buffer->bstart, - len); + if (message->dbody < message->nbody) { + len = MAX( + message->nbody - message->dbody, + this->buffer->bused); - message->nbody += len; - this->buffer->bstart += len; - if (this->buffer->bstart >= this->buffer->bsize) { - this->buffer->bstart -= this->buffer->bsize; - } - this->buffer->bused -= len; + memcpy(message->body, cbufGetData(this->buffer, len), len); - if (message->nbody == nbody) { - this->state = HTTP_REQUEST_DONE; + message->dbody += len; } } diff --git a/src/http/request/parser/parse.c b/src/http/request/parser/parse.c index c507cd9..d04990f 100644 --- a/src/http/request/parser/parse.c +++ b/src/http/request/parser/parse.c @@ -7,81 +7,35 @@ #include "http/message.h" #include "http/request/parser.h" #include "interface/class.h" - -#ifndef TRUE -#define TRUE 1 -#endif - -#ifndef FALSE -#define FALSE 0 -#endif - -static -inline -char * -getLine(HttpRequestParser this) -{ - char * cr = memchr( - this->buffer->buffer + this->buffer->bstart, - '\r', - this->buffer->bused); - - char * nl = (NULL == cr)? NULL : cr + 1; - - if (NULL != cr && NULL != nl && '\n' == *nl) { - *cr = 0; - return cr; - } - - return NULL; -} - -static -inline -char -httpRequestSkip(HttpRequestParser this) -{ - while (this->buffer->bused > 0 && - ! isalpha(this->buffer->buffer[this->buffer->bstart])) { - this->buffer->bstart = (this->buffer->bstart >= this->buffer->bsize)? - 0 : this->buffer->bstart + 1; - this->buffer->bused--; - } - - return (isalpha(this->buffer->buffer[this->buffer->bstart]))? TRUE : FALSE; -} +#include "cbuf.h" ssize_t httpRequestParserParse(HttpRequestParser this, int fd) { int cont = 1; - ssize_t ret; + ssize_t read; + char * line; - if (0 > (ret = rbRead(this->buffer, fd))) { - cont = 0; + if(0 > (read = cbufRead(this->buffer, fd))) { + return read; } while (cont) { switch(this->state) { - char * line_end; - size_t len; - case HTTP_REQUEST_GARBAGE: - if (httpRequestSkip(this)) { + cbufSkipNonAlpha(this->buffer); + if (! cbufIsEmpty(this->buffer)) { this->cur_request = new(HttpRequest); this->state = HTTP_REQUEST_START; } - else { - cont = 0; - } break; case HTTP_REQUEST_START: - if (NULL == (line_end = getLine(this))) { + if (NULL == (line = cbufGetLine(this->buffer))) { cont = 0; break; } - + if (0 > httpRequestParserGetRequestLine(this, line_end)) { ret = -1; cont = 0; @@ -99,7 +53,7 @@ httpRequestParserParse(HttpRequestParser this, int fd) break; case HTTP_REQUEST_REQUEST_LINE_DONE: - if (NULL == (line_end = getLine(this))) { + if (NULL == (line = cbufGetLine(this->buffer))) { cont = 0; break; } @@ -127,23 +81,33 @@ httpRequestParserParse(HttpRequestParser this, int fd) break; case HTTP_REQUEST_HEADERS_DONE: - /** - * allocate memory according to content-length. - * If content length is to large reject request. - * - * @FUTURE check for multipart mime and handle it - * with temporary file. - */ - httpRequestParserGetBody(this); + { + HttpMessage message = (HttpMessage)this->cur_request; + + httpRequestParserGetBody(this); + + if (message->dbody == message->nbody) { + this->state = HTTP_REQUEST_DONE; + } + } break; case HTTP_REQUEST_DONE: this->request_queue->msgs[(this->request_queue->nmsgs)++] = (HttpMessage)this->cur_request; - ret = this->request_queue->nmsgs; this->cur_request = NULL; + /** + * dont continue loop if input buffer is empty + */ + if (cbufIsEmpty(this->buffer)) { + cont = 0; + } + + /** + * prepare for next request + */ this->state = HTTP_REQUEST_GARBAGE; break; diff --git a/src/http/request/parser/read.c b/src/http/request/parser/read.c index 57987df..972a6b5 100644 --- a/src/http/request/parser/read.c +++ b/src/http/request/parser/read.c @@ -14,29 +14,46 @@ ssize_t httpRequestParserRead(HttpRequestParser this, int fd) { - size_t rsize; - ssize_t temp; - - this->bend = (this->bsize == this->bend)? - 0 : this->bend; - - rsize = (this->bstart <= this->bend)? - this->bsize - this->bend : - this->bstart - 1; - - if (0 >= (temp = read(fd, &(this->buffer[this->bend]), rsize))) { - /** - * this means either we had an rsize of 0 what indicates that - * the buffer ran full without any processing took place or - * the connection was terminated in some way. In both cases - * we want to terminate the connection. - */ - return (0 == temp)? -2 : -1; - } - - this->bend += temp; - - return temp; + /** + * @obsolete + */ + return -1; +// size_t remaining, chunks; +// char buffer[1024]; +// +// ssize_t size = read(fd, buffer, 1024); +// +// if (0 < size) { +// remaining = this->buffer_used % HTTP_REQUEST_PARSER_READ_CHUNK; +// remaining = HTTP_REQUEST_PARSER_READ_CHUNK - remaining; +// chunks = this->buffer_used / HTTP_REQUEST_PARSER_READ_CHUNK; +// +// /** +// * because a division always rounds down +// * chunks holds exactly the currently allocated chunks if +// * remaining equals 0 but there is no space left. +// * Else chunks holds the actually allocated amount of chunks +// * minus 1. +// * For this reason chunks always has to be increased by 1. +// */ +// chunks++; +// +// if (size >= remaining) { +// this->buffer = +// realloc(this->buffer, chunks * HTTP_REQUEST_PARSER_READ_CHUNK); +// } +// +// memcpy(this->buffer + this->buffer_used, buffer, size); +// this->buffer_used += size; +// this->buffer[this->buffer_used] = 0; +// +// size = httpRequestParserParse(this); +// } +// else { +// size = (0 == size)? -2 : size; +// } +// +// return size; } // vim: set ts=4 sw=4: diff --git a/src/http/response/writer.c b/src/http/response/writer.c index c05080d..564d9f9 100644 --- a/src/http/response/writer.c +++ b/src/http/response/writer.c @@ -1,4 +1,5 @@ #include +#include #include "class.h" #include "interface/class.h" @@ -12,7 +13,12 @@ void ctor(void * _this, va_list * params) { HttpResponseWriter this = _this; + char * id = va_arg(*params, char*); + char cbuf_id[100]; + sprintf(cbuf_id, "%s_%s", "writer", id); + + this->buffer = new(Cbuf, cbuf_id, RESPONSE_WRITER_MAX_BUF); this->response_queue = new(HttpMessageQueue); } @@ -23,6 +29,7 @@ dtor(void * _this) HttpResponseWriter this = _this; delete(&(this->response_queue)); + delete(&(this->buffer)); if (NULL != this->cur_response) delete(&(this->cur_response)); diff --git a/src/http/response/writer/write.c b/src/http/response/writer/write.c index b185c3d..27c9972 100644 --- a/src/http/response/writer/write.c +++ b/src/http/response/writer/write.c @@ -11,7 +11,9 @@ #include "http/message.h" #include "http/response.h" #include "http/response/writer.h" +#include "cbuf.h" +#define MIN(x,y) ((x) < (y) ? (x) : (y)) #define MAX(x,y) ((x) > (y) ? (x) : (y)) #define _PSIZE(x) (MAX((x),RESPONSE_WRITER_MAX_BUF)) #define PSIZE _PSIZE(this->nheader+message->nbody) @@ -21,7 +23,6 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) { HttpMessageQueue respq = this->response_queue; HttpMessage message = (HttpMessage)this->cur_response; - ssize_t processed = (message)? 1 : 0; int cont = 1; while (cont) { @@ -30,19 +31,13 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) if (NULL == this->cur_response && 0 < respq->nmsgs) { message = respq->msgs[0]; this->cur_response = (HttpResponse)message; - processed++; - memmove(respq->msgs, - &(respq->msgs[1]), - sizeof(void*) * (--respq->nmsgs + 1)); - - this->nbuffer = 0; this->written = 0; - this->pstart = 0; + this->nbody = 0; this->nheader = httpMessageHeaderSizeGet(message); - this->pend = this->nheader; - this->pipe = malloc(PSIZE); - httpMessageHeaderToString(message, this->pipe); + + httpMessageHeaderToString(message, cbufGetWrite(this->buffer)); + cbufIncWrite(this->buffer, this->nheader); this->state = HTTP_RESPONSE_WRITE; } @@ -55,57 +50,41 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) /** * read */ - if (this->nbuffer < message->nbody) { - size_t temp = 0; - size_t rsize; - - this->pend = (PSIZE == this->pend)? - 0 : this->pend; - - rsize = (this->pstart <= this->pend)? - PSIZE - this->pend : - this->pstart - 1; + if (this->nbody < message->nbody) { + size_t size = MIN( + message->nbody - this->nbody, + cbufGetFree(this->buffer)); switch (message->type) { case HTTP_MESSAGE_BUFFERED: - temp = message->nbody - this->nbuffer; - temp = (rsizepipe[this->pend]), - &(message->body[this->nbuffer]), - temp); + cbufSetData(this->buffer, + message->body + this->nbody, + size); break; case HTTP_MESSAGE_PIPED: - temp = read( - message->handle, - &(this->pipe[this->pend]), - rsize); + size = cbufRead(this->buffer, message->handle); break; + + default: + return -1; } - this->nbuffer += temp; - this->pend += temp; + this->nbody += size; } /** * write */ { - size_t wsize; - size_t temp; - - wsize = (this->pstart <= this->pend)? - this->pend - this->pstart : - PSIZE - this->pstart; + ssize_t written = cbufWrite(this->buffer, fd); - temp = write(fd, &(this->pipe[this->pstart]), wsize); - - this->written += temp; - this->pstart += temp; - - this->pstart = (PSIZE == this->pstart)? - 0 : this->pstart; + if (0 <= written) { + this->written += written; + } + else { + return -1; + } } if (this->written == message->nbody + this->nheader) { @@ -121,33 +100,30 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) close(message->handle); } - free(this->pipe); + this->state = HTTP_RESPONSE_GET; - this->nheader = 0; - this->nbuffer = 0; - this->written = 0; - this->pstart = 0; - this->pend = 0; + memmove(respq->msgs, + &(respq->msgs[1]), + sizeof(void*) * (--respq->nmsgs + 1)); if (! httpMessageHasKeepAlive(message)) { /** * if the message did not have the keep-alive feature * we don't care about further pipelined messages and - * return the to caller with a 0 indicating that the + * return to the caller with a 0 indicating that the * underlying connection should be closed. */ - processed = 0; - cont = 0; + delete(&this->cur_response); + return -1; } delete(&this->cur_response); - this->state = HTTP_RESPONSE_GET; break; } } - return processed; + return respq->nmsgs; } // vim: set ts=4 sw=4: diff --git a/src/http/worker.c b/src/http/worker.c index 4e7a347..5a3f0ce 100644 --- a/src/http/worker.c +++ b/src/http/worker.c @@ -1,5 +1,6 @@ #include #include +#include #include #include "class.h" @@ -17,14 +18,13 @@ void ctor(void * _this, va_list * params) { HttpWorker this = _this; - char id[sizeof(SHMN) + 15 + 5]; + char * id = va_arg(*params, char *); - this->remoteAddr = va_arg(*params, char *); - this->handle = va_arg(*params, int); - sprintf(id, SHMN "%s_%05d", this->remoteAddr, this->handle); + this->id = malloc(strlen(id) + 1); + strcpy(this->id, id); - this->parser = new(HttpRequestParser, id); - this->writer = new(HttpResponseWriter); + this->parser = new(HttpRequestParser, this->id); + this->writer = new(HttpResponseWriter, this->id); } static @@ -33,6 +33,8 @@ dtor(void * _this) { HttpWorker this = _this; + free(this->id); + delete(&this->parser); delete(&this->writer); } diff --git a/src/server/close_conn.c b/src/server/close_conn.c index 9d7d756..1531fb6 100644 --- a/src/server/close_conn.c +++ b/src/server/close_conn.c @@ -12,8 +12,6 @@ serverCloseConn(Server this, unsigned int i) delete(&((this->conns)[fd].sock)); delete(&((this->conns)[fd].worker)); - (this->conns)[fd].keep_alive = 0; - memset(&(this->fds[i]), 0, sizeof(struct pollfd)); } diff --git a/src/server/handle_accept.c b/src/server/handle_accept.c index 12c28b1..ebd3063 100644 --- a/src/server/handle_accept.c +++ b/src/server/handle_accept.c @@ -1,4 +1,8 @@ #include +#include + +#include "http/worker.h" + #include "http/worker.h" @@ -12,11 +16,15 @@ serverHandleAccept(Server this) acc = socketAccept(this->sock, &remoteAddr); if (-1 != acc->handle) { + char id[21]; + + sprintf(id, "my_%s_%05d", remoteAddr, acc->handle); + //* save the socket handle (this->conns)[acc->handle].sock = acc; //* clone reader - (this->conns)[acc->handle].worker = new(HttpWorker, remoteAddr, acc->handle); + (this->conns)[acc->handle].worker = new(HttpWorker, id); (this->fds)[this->nfds].fd = acc->handle; (this->fds)[this->nfds].events = POLLIN; diff --git a/src/server/run.c b/src/server/run.c index d54cfe2..64dc10b 100644 --- a/src/server/run.c +++ b/src/server/run.c @@ -62,12 +62,12 @@ serverRun(Server this) nreads--; switch (serverRead(this, i)) { - case -2: - case -1: - serverCloseConn(this, i); + case 0: break; - case 0: + case -1: + case -2: + serverCloseConn(this, i); break; default: @@ -80,14 +80,24 @@ serverRun(Server this) * handle writes */ if (0 != ((this->fds)[i].revents & POLLOUT) && 0 < nwrites) { + size_t remaining; + events--; nwrites--; - if (0 >= streamWriterWrite((this->conns)[fd].worker, fd)) { - serverCloseConn(this, i); - } + remaining = streamWriterWrite((this->conns)[fd].worker, fd); + switch(remaining) { + case -1: + serverCloseConn(this, i); + break; + + case 0: + (this->fds)[i].events &= ~POLLOUT; + break; - (this->fds)[i].events &= ~POLLOUT; + default: + break; + } } } } diff --git a/src/socket/accept.c b/src/socket/accept.c index ec428d5..a7a8ac8 100644 --- a/src/socket/accept.c +++ b/src/socket/accept.c @@ -6,7 +6,7 @@ #include "interface/logger.h" Sock -socketAccept(Sock this, char (*remoteAddr)[]) +socketAccept(Sock this, char (*remoteAddr)[16]) { Sock sock; /* Socket for client */ unsigned int len; /* Length of client address data structure */ @@ -33,7 +33,8 @@ socketAccept(Sock this, char (*remoteAddr)[]) loggerLog(this->log, LOGGER_WARNING, "error accepting connection: %s", strerror(errno)); } else { - memcpy(*remoteAddr, inet_ntoa((sock->addr).sin_addr), 15); + strcpy(*remoteAddr, inet_ntoa((sock->addr).sin_addr)); + loggerLog(this->log, LOGGER_INFO, "handling client %s\n", *remoteAddr); } diff --git a/src/testserver.c b/src/testserver.c index 2bc6c8b..7289993 100644 --- a/src/testserver.c +++ b/src/testserver.c @@ -19,8 +19,8 @@ int main() { Logger logger = new(LoggerSyslog, LOGGER_ERR); - HttpWorker worker = new(HttpWorker, "", 0); - Server server = new(Server, logger, worker, 11212, SOMAXCONN); + //HttpWorker worker = new(HttpWorker); + Server server = new(Server, logger, NULL /*worker*/, 11212, SOMAXCONN); struct rlimit limit = {RLIM_INFINITY, RLIM_INFINITY}; setrlimit(RLIMIT_CPU, &limit); @@ -30,7 +30,7 @@ main() serverRun(server); delete(&server); - delete(&worker); +// delete(&worker); delete(&logger); return 0;