From 56fdd4bd00d7de8cb9553630c0d69f7f94cb6db0 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Tue, 3 Sep 2013 21:57:06 +0100 Subject: [PATCH] changed socket handling according to my definition...and make sockets nonblocking as the answer of poll is just a guess... --- TODO | 3 +++ include/socket.h | 1 + src/cbuf/read.c | 17 ++++---------- src/http/parser/parse.c | 45 +++++++++++++++++++------------------- src/http/worker/process.c | 14 +++++------- src/server/handle_accept.c | 1 + src/server/run.c | 37 +++++++++++++++++++++++-------- src/server/server.c | 1 + src/socket/Makefile.am | 2 +- src/socket/accept.c | 8 +------ src/socket/socket.c | 1 + src/stream/read.c | 25 ++++++++++++++++++++- src/taskrambler.c | 1 + 13 files changed, 94 insertions(+), 62 deletions(-) diff --git a/TODO b/TODO index 36e3e11..e73dfe4 100644 --- a/TODO +++ b/TODO @@ -17,3 +17,6 @@ VERY BIG TODO: GET /images/waldschrat.jpg HTTP/1.1^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D ^[[D = backspace... + +- the unexpected connection close seems to occur just on concurrency over + 1000 paralell connections...maybe this gives a hint. diff --git a/include/socket.h b/include/socket.h index 7e1442f..797aec1 100644 --- a/include/socket.h +++ b/include/socket.h @@ -40,6 +40,7 @@ CLASS(Sock) { void socketConnect(Sock this, const char * addr, char (*)[16]); void socketListen(Sock this, int backlog); Sock socketAccept(Sock this, char (*remoteAddr)[16]); +void socketNonblock(Sock this); #endif // __SOCKET_H__ diff --git a/src/cbuf/read.c b/src/cbuf/read.c index 1e49fa0..37c6ccf 100644 --- a/src/cbuf/read.c +++ b/src/cbuf/read.c @@ -31,8 +31,8 @@ ssize_t cbufRead(Cbuf this, Stream st) { - ssize_t rrsize = 0; - size_t rsize = cbufGetFree(this); + size_t rsize = cbufGetFree(this); + ssize_t rrsize; if (0 == rsize) { errno = ECBUFOVFL; @@ -41,17 +41,8 @@ cbufRead(Cbuf this, Stream st) rrsize = streamRead(st, cbufGetWrite(this), rsize); - switch (rrsize) { - case 0: - rrsize = -2; - // DROP THROUGH - - case -1: - break; - - default: - cbufIncWrite(this, rrsize); - break; + if (0 < rrsize) { + cbufIncWrite(this, rrsize); } return rrsize; diff --git a/src/http/parser/parse.c b/src/http/parser/parse.c index b9e7082..1a7abe4 100644 --- a/src/http/parser/parse.c +++ b/src/http/parser/parse.c @@ -61,6 +61,8 @@ httpParserParse(void * _this, Stream st) } if (0 > (read = cbufRead(this->buffer, st))) { + cbufRelease(this->buffer); + this->ourLock = FALSE; return read; } @@ -75,10 +77,9 @@ httpParserParse(void * _this, Stream st) cbufRelease(this->buffer); this->ourLock = FALSE; cont = 0; + break; } - break; - case HTTP_MESSAGE_START: if (NULL == (line = cbufGetLine(this->buffer, &line_end))) { if (! cbufIsEmpty(this->buffer)) { @@ -98,17 +99,16 @@ httpParserParse(void * _this, Stream st) if (NULL == this->current) { cbufRelease(this->buffer); this->ourLock = FALSE; - return -1; + return -2; // a server error occured can't process... } httpParserRequestVars(this); this->state = HTTP_MESSAGE_INTRO_DONE; - break; case HTTP_MESSAGE_INTRO_DONE: if (NULL == (line = cbufGetLine(this->buffer, &line_end))) { if (! cbufIsEmpty(this->buffer)) { - this->isize = this->buffer->bused; + this->isize = this->buffer->bused; this->incomplete = memMalloc(this->isize); memcpy(this->incomplete, cbufGetData(this->buffer, this->isize), @@ -120,35 +120,34 @@ httpParserParse(void * _this, Stream st) break; } - if (0 == strlen(line)) { - this->state = HTTP_MESSAGE_HEADERS_DONE; + if (0 != strlen(line)) { + httpParserHeader(this, line, line_end); break; } - httpParserHeader(this, line, line_end); - break; + this->state = HTTP_MESSAGE_HEADERS_DONE; case HTTP_MESSAGE_HEADERS_DONE: if (this->current->dbody == this->current->nbody) { this->state = HTTP_MESSAGE_DONE; - break; - } + } else { + if (cbufIsEmpty(this->buffer)) { + cbufRelease(this->buffer); + this->ourLock = FALSE; + cont = 0; + break; + } + + cbufIncRead( + this->buffer, + httpParserBody( + this, + cbufGetRead(this->buffer), + this->buffer->bused)); - if (cbufIsEmpty(this->buffer)) { - cbufRelease(this->buffer); - this->ourLock = FALSE; - cont = 0; break; } - cbufIncRead( - this->buffer, - httpParserBody( - this, - cbufGetRead(this->buffer), - this->buffer->bused)); - break; - case HTTP_MESSAGE_DONE: { HttpHeader enc = hashGet( diff --git a/src/http/worker/process.c b/src/http/worker/process.c index addcfe4..ee38054 100644 --- a/src/http/worker/process.c +++ b/src/http/worker/process.c @@ -54,10 +54,13 @@ char * httpWorkerGetMimeType(HttpWorker, const char * extension); ssize_t httpWorkerProcess(HttpWorker this, Stream st) { - ssize_t size; + ssize_t requests = httpParserParse(this->parser, st); - if (0 < (size = httpParserParse(this->parser, st))) { + if (0 > requests) { + return requests; + } + if (0 < requests) { while (! queueEmpty(this->parser->queue)) { HttpRequest request = queueGet(this->parser->queue); HttpMessage response = NULL; @@ -212,18 +215,13 @@ httpWorkerProcess(HttpWorker this, Stream st) } httpWorkerAddCommonHeader((HttpMessage)request, response); - delete(request); - queuePut(this->writer->queue, response); - - size = this->writer->queue->nmsg; - response = NULL; } } - return size; + return this->writer->queue->nmsg; } // vim: set ts=4 sw=4: diff --git a/src/server/handle_accept.c b/src/server/handle_accept.c index f7c912b..ab028b5 100644 --- a/src/server/handle_accept.c +++ b/src/server/handle_accept.c @@ -44,6 +44,7 @@ serverHandleAccept(Server this, unsigned int i) } acc = socketAccept((0 == i)? this->sock : this->sockSSL, &remoteAddr); + socketNonblock(acc); if (-1 != acc->handle) { switch(i) { diff --git a/src/server/run.c b/src/server/run.c index e0d243d..7bab40e 100644 --- a/src/server/run.c +++ b/src/server/run.c @@ -33,11 +33,12 @@ ssize_t serverWrite(Server, unsigned int); void serverRun(Server this) { + int events = 0; + loggerLog(this->logger, LOGGER_INFO, "service started"); while (!doShutdown) //! until error or signal { - int events = 0; unsigned int i; if (0 == events) { @@ -71,12 +72,25 @@ serverRun(Server this) if (0 != ((this->fds)[i].revents & POLLIN)) { ssize_t processed = serverRead(this, i); - if (0 < processed) { - (this->fds)[i].revents &= ~POLLIN; + // don't poll this one until I say. + (this->fds)[i].events &= ~POLLIN; + + if (0 > processed) { events--; + + switch (processed) { + case -1: // poll me again + (this->fds)[i].events |= POLLIN; + (this->fds)[i].revents &= ~POLLIN; + break; + + case -2: // close me... + serverCloseConn(this, i); + break; + } } - if (processed > 0) { + if (0 < processed) { (this->fds)[i].events |= POLLOUT; } } @@ -87,11 +101,20 @@ serverRun(Server this) if (0 != ((this->fds)[i].revents & POLLOUT)) { ssize_t remaining = serverWrite(this, i); - if (0 > remaining) { + (this->fds)[i].events &= ~POLLOUT; + + if (0 >= remaining) { + /* + * 0 means queue was empty...try again next + * time...no need to poll again. + * Anyway, most likely we need to read again + * so lets finish this event for now. + */ events--; switch (remaining) { case -1: // poll me again + (this->fds)[i].events |= POLLOUT; (this->fds)[i].revents &= ~POLLOUT; break; @@ -100,10 +123,6 @@ serverRun(Server this) break; } } - - if (0 == remaining) { - (this->fds)[i].events &= ~POLLOUT; - } } if (0 > events) diff --git a/src/server/server.c b/src/server/server.c index 11ce2d8..be73f61 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -70,6 +70,7 @@ serverCtor(void * _this, va_list * params) this->conns = memCalloc(sizeof(struct conns), this->max_fds); this->sock = new(Sock, this->logger, port); + socketNonblock(this->sock); flags = fcntl(this->sock->handle, F_GETFL, 0); fcntl(this->sock->handle, F_SETFL, flags | O_NONBLOCK); diff --git a/src/socket/Makefile.am b/src/socket/Makefile.am index b59bbc7..03b233a 100644 --- a/src/socket/Makefile.am +++ b/src/socket/Makefile.am @@ -2,5 +2,5 @@ ACLOCAL_AMFLAGS = -I m4 noinst_LIBRARIES = libsocket.a -libsocket_a_SOURCES = socket.c accept.c connect.c listen.c +libsocket_a_SOURCES = socket.c accept.c connect.c listen.c nonblock.c libsocket_a_CFLAGS = $(CFLAGS) -Wall -I ../../include/ diff --git a/src/socket/accept.c b/src/socket/accept.c index 3c7de3b..56a8790 100644 --- a/src/socket/accept.c +++ b/src/socket/accept.c @@ -22,6 +22,7 @@ #include // for errno #include +#include #include "socket.h" #include "class.h" @@ -36,13 +37,6 @@ socketAccept(Sock this, char (*remoteAddr)[16]) // Set the size of the in-out parameter len = sizeof(this->addr); - /** - * \todo Uhh, this is bad. we open a new socket additionally to - * the one we get from the accept call. - * i have to change the socket constructor to be able to create - * the data structure without creation of a socket at all. - * For now i simply close the socket here.... :D - */ sock = new(Sock, this->log, -1); // Wait for a client to connect diff --git a/src/socket/socket.c b/src/socket/socket.c index ead4f00..1429c6e 100644 --- a/src/socket/socket.c +++ b/src/socket/socket.c @@ -35,6 +35,7 @@ socketCtor(void * _this, va_list * params) Sock this = _this; int reUse = 1; //! \todo make this configurable int port; + int nonblock; this->log = va_arg(* params, Logger); port = va_arg(* params, int); diff --git a/src/stream/read.c b/src/stream/read.c index f27058d..4e49bc9 100644 --- a/src/stream/read.c +++ b/src/stream/read.c @@ -22,6 +22,7 @@ #include #include +#include #include "stream.h" @@ -31,8 +32,30 @@ streamRead(Stream this, void * buf, size_t count) ssize_t done; switch(this->type) { + ssize_t _read; + case STREAM_FD: - done = read((this->handle).fd, buf, count); + _read = read((this->handle).fd, buf, count); + + if (_read < 0) { + switch (errno) { + case EINTR: + case ENOMEM: + done = 0; + break; + case (EAGAIN|EWOULDBLOCK): + done = -1; + break; + default: + done = -2; + break; + } + } else if (_read == 0) { + done = -2; + } else { + done = _read; + } + break; case STREAM_SSL: diff --git a/src/taskrambler.c b/src/taskrambler.c index d18353a..79330e7 100644 --- a/src/taskrambler.c +++ b/src/taskrambler.c @@ -45,6 +45,7 @@ #include "utils/signalHandling.h" #include "utils/memory.h" +#include "utils/mime_type.h" #define DEFAULT_SECS 10 //#define DEFAULT_USECS (1000000 / HZ * 2)