diff --git a/include/http/parser.h b/include/http/parser.h index 43a4bb1..be9d810 100644 --- a/include/http/parser.h +++ b/include/http/parser.h @@ -37,6 +37,8 @@ #define FALSE ((void *)0) #endif +#define PARSER_MAX_BUF 131072 + typedef enum e_HttpMessageState { HTTP_MESSAGE_GARBAGE=0, diff --git a/include/http/worker.h b/include/http/worker.h index 73dc9a4..19bf1dd 100644 --- a/include/http/worker.h +++ b/include/http/worker.h @@ -28,11 +28,9 @@ #include "class.h" #include "http/parser.h" -#include "http/response/writer.h" +#include "http/writer.h" #include "cbuf.h" -#define RESPONSE_WRITER_MAX_BUF 131072 -#define REQUEST_PARSER_BUFFER_MAX 8192 #ifndef TRUE #define TRUE ((void *)1) @@ -42,15 +40,16 @@ #define FALSE ((void *)0) #endif + CLASS(HttpWorker) { - char * id; - int * val; + char * id; + int * val; - Cbuf pbuf; - Cbuf wbuf; + Cbuf pbuf; + Cbuf wbuf; - HttpParser parser; - HttpResponseWriter writer; + HttpParser parser; + HttpWriter writer; }; #endif // __HTTP_WORKER_H__ diff --git a/include/http/writer.h b/include/http/writer.h index 74efd3c..b46e198 100644 --- a/include/http/writer.h +++ b/include/http/writer.h @@ -21,13 +21,13 @@ * - along with this program. If not, see . */ -#ifndef __HTTP_RESPONSE_WRITER_H__ -#define __HTTP_RESPONSE_WRITER_H__ +#ifndef __HTTP_WRITER_H__ +#define __HTTP_WRITER_H__ #include #include "class.h" -#include "http/response.h" +#include "http/message.h" #include "http/message/queue.h" #include "cbuf.h" @@ -39,29 +39,31 @@ #define FALSE ((void *)0) #endif +#define WRITER_MAX_BUF 131072 -typedef enum e_HttpResponseState { - HTTP_RESPONSE_GET=0, - HTTP_RESPONSE_WRITE, - HTTP_RESPONSE_DONE -} HttpResponseState; -CLASS(HttpResponseWriter) { - Cbuf buffer; - void * ourLock; +typedef enum e_HttpWriterState { + HTTP_WRITER_GET=0, + HTTP_WRITER_WRITE, + HTTP_WRITER_DONE +} HttpWriterState; - HttpMessageQueue response_queue; - HttpResponse cur_response; +CLASS(HttpWriter) { + Cbuf buffer; + void * ourLock; - size_t nheader; - size_t nbody; - size_t written; + HttpMessageQueue queue; + HttpMessage current; - HttpResponseState state; + size_t nheader; + size_t nbody; + size_t written; + + HttpWriterState state; }; -ssize_t httpResponseWriterWrite(HttpResponseWriter, int); +ssize_t httpWriterWrite(void *, int); -#endif // __HTTP_RESPONSE_WRITER_H__ +#endif // __HTTP_WRITER_H__ // vim: set ts=4 sw=4: diff --git a/src/Makefile.am b/src/Makefile.am index 49cbb74..0f3fd26 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,7 +6,7 @@ IFACE = interface/class.c interface/stream_reader.c interface/logger.c \ interface/subject.c interface/observer.c interface.c SOCKET = socket.c socket/accept.c socket/connect.c socket/listen.c SERVER = server.c server/run.c server/close_conn.c server/poll.c \ - server/handle_accept.c server/read.c + server/handle_accept.c server/read.c server/write.c LOGGER = logger.c logger/stderr.c logger/syslog.c CB = cbuf.c cbuf/read.c cbuf/write.c \ cbuf/get_line.c cbuf/set_data.c cbuf/get_data.c \ @@ -33,12 +33,13 @@ PARSER = http/parser.c \ http/parser/new_message.c \ http/parser/header.c \ http/parser/body.c +WRITER = http/writer.c \ + http/writer/write.c WORKER = http/worker.c \ http/worker/process.c \ http/worker/write.c \ http/worker/get_asset.c \ http/worker/add_common_header.c -WRITER = http/response/writer.c http/response/writer/write.c HEADER = http/header.c http/header/get.c http/header/add.c \ http/header/to_string.c UTILS = utils/hash.c \ diff --git a/src/http/worker.c b/src/http/worker.c index 5e84d55..9f3fb2d 100644 --- a/src/http/worker.c +++ b/src/http/worker.c @@ -7,7 +7,7 @@ #include "class.h" #include "http/worker.h" #include "http/parser.h" -#include "http/response/writer.h" +#include "http/writer.h" #include "interface/class.h" #include "interface/stream_reader.h" @@ -29,13 +29,13 @@ httpWorkerCtor(void * _this, va_list * params) this->val = val; sprintf(cbuf_id, "%s_%s", "parser", id); - this->pbuf = new(Cbuf, cbuf_id, REQUEST_PARSER_BUFFER_MAX); + this->pbuf = new(Cbuf, cbuf_id, PARSER_MAX_BUF); sprintf(cbuf_id, "%s_%s", "writer", id); - this->wbuf = new(Cbuf, cbuf_id, RESPONSE_WRITER_MAX_BUF); + this->wbuf = new(Cbuf, cbuf_id, WRITER_MAX_BUF); this->parser = new(HttpParser, this->pbuf); - this->writer = new(HttpResponseWriter, this->wbuf); + this->writer = new(HttpWriter, this->wbuf); return 0; } @@ -51,30 +51,27 @@ httpWorkerDtor(void * _this) delete(this->parser); delete(this->writer); - if (NULL != this->pbuf) delete(this->pbuf); - if (NULL != this->wbuf) delete(this->wbuf); + delete(this->pbuf); //!< cloned workers have NULL, so delete won't do anything + delete(this->wbuf); //!< cloned workers have NULL, so delete won't do anything } static void -_clone(void * _this, void * _base) +httpWorkerClone(void * _this, void * _base) { HttpWorker this = _this; HttpWorker base = _base; - this->id = NULL; this->val = base->val; - this->pbuf = NULL; - this->wbuf = NULL; this->parser = new(HttpParser, base->pbuf); - this->writer = new(HttpResponseWriter, base->wbuf); + this->writer = new(HttpWriter, base->wbuf); } ssize_t httpWorkerProcess(void *, int); ssize_t httpWorkerWrite(void *, int); -INIT_IFACE(Class, httpWorkerCtor, httpWorkerDtor, _clone); +INIT_IFACE(Class, httpWorkerCtor, httpWorkerDtor, httpWorkerClone); INIT_IFACE(StreamReader, httpWorkerProcess); INIT_IFACE(StreamWriter, httpWorkerWrite); CREATE_CLASS( diff --git a/src/http/worker/process.c b/src/http/worker/process.c index b244e2a..1362b79 100644 --- a/src/http/worker/process.c +++ b/src/http/worker/process.c @@ -28,6 +28,7 @@ #include "http/worker.h" #include "http/message.h" #include "http/request.h" +#include "http/response.h" #include "http/message/queue.h" #include "http/parser.h" @@ -45,7 +46,7 @@ httpWorkerProcess(HttpWorker this, int fd) if (0 < (size = httpParserParse(this->parser, fd))) { int i; HttpMessageQueue reqq = this->parser->queue; - HttpMessageQueue respq = this->writer->response_queue; + HttpMessageQueue respq = this->writer->queue; for (i=0; inmsgs; i++) { HttpRequest request = (HttpRequest)(reqq->msgs[i]); diff --git a/src/http/worker/write.c b/src/http/worker/write.c index 16f8d9e..ed3da6e 100644 --- a/src/http/worker/write.c +++ b/src/http/worker/write.c @@ -23,12 +23,12 @@ #include #include "http/worker.h" -#include "http/response/writer.h" +#include "http/writer.h" ssize_t httpWorkerWrite(HttpWorker this, int fd) { - return httpResponseWriterWrite(this->writer, fd); + return httpWriterWrite(this->writer, fd); } // vim: set ts=4 sw=4: diff --git a/src/http/writer.c b/src/http/writer.c index a0afc7a..00104e9 100644 --- a/src/http/writer.c +++ b/src/http/writer.c @@ -27,37 +27,37 @@ #include "interface/stream_writer.h" #include "http/message/queue.h" -#include "http/response/writer.h" +#include "http/writer.h" static int -responseWriterCtor(void * _this, va_list * params) +httpWriterCtor(void * _this, va_list * params) { - HttpResponseWriter this = _this; + HttpWriter this = _this; - this->buffer = va_arg(*params, Cbuf); - this->response_queue = new(HttpMessageQueue); + this->buffer = va_arg(*params, Cbuf); + this->queue = new(HttpMessageQueue); return 0; } static void -responseWriterDtor(void * _this) +httpWriterDtor(void * _this) { - HttpResponseWriter this = _this; + HttpWriter this = _this; - delete(this->response_queue); + delete(this->queue); if (TRUE == this->ourLock) cbufRelease(this->buffer); - if (NULL != this->cur_response) - delete(this->cur_response); + if (NULL != this->current) + delete(this->current); } -INIT_IFACE(Class, responseWriterCtor, responseWriterDtor, NULL); -INIT_IFACE(StreamWriter, (fptr_streamWriterWrite)httpResponseWriterWrite); -CREATE_CLASS(HttpResponseWriter, NULL, IFACE(Class), IFACE(StreamWriter)); +INIT_IFACE(Class, httpWriterCtor, httpWriterDtor, NULL); +INIT_IFACE(StreamWriter, httpWriterWrite); +CREATE_CLASS(HttpWriter, NULL, IFACE(Class), IFACE(StreamWriter)); // vim: set ts=4 sw=4: diff --git a/src/http/writer/write.c b/src/http/writer/write.c index 3b51eaa..5f3de61 100644 --- a/src/http/writer/write.c +++ b/src/http/writer/write.c @@ -26,21 +26,18 @@ #include "class.h" #include "interface/class.h" #include "http/message.h" -#include "http/response.h" -#include "http/response/writer.h" +#include "http/writer.h" #include "cbuf.h" #define MIN(x,y) ((x) < (y) ? (x) : (y)) #define MAX(x,y) ((x) > (y) ? (x) : (y)) -#define _PSIZE(x) (MAX((x),RESPONSE_WRITER_MAX_BUF)) -#define PSIZE _PSIZE(this->nheader+message->nbody) ssize_t -httpResponseWriterWrite(HttpResponseWriter this, int fd) +httpWriterWrite(void * _this, int fd) { - HttpMessageQueue respq = this->response_queue; - HttpMessage message = (HttpMessage)this->cur_response; - int cont = 1; + HttpWriter this = _this; + HttpMessageQueue respq = this->queue; + int cont = 1; if (cbufIsLocked(this->buffer)) { if (FALSE == this->ourLock) @@ -53,19 +50,20 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) while (cont) { switch (this->state) { - case HTTP_RESPONSE_GET: - if (NULL == this->cur_response && 0 < respq->nmsgs) { - message = respq->msgs[0]; - this->cur_response = (HttpResponse)message; + case HTTP_WRITER_GET: + if (NULL == this->current && 0 < respq->nmsgs) { + this->current = respq->msgs[0]; this->written = 0; this->nbody = 0; - this->nheader = httpMessageHeaderSizeGet(message); + this->nheader = httpMessageHeaderSizeGet(this->current); - httpMessageHeaderToString(message, cbufGetWrite(this->buffer)); + httpMessageHeaderToString( + this->current, + cbufGetWrite(this->buffer)); cbufIncWrite(this->buffer, this->nheader); - this->state = HTTP_RESPONSE_WRITE; + this->state = HTTP_WRITER_WRITE; } else { cbufRelease(this->buffer); @@ -74,24 +72,24 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) } break; - case HTTP_RESPONSE_WRITE: + case HTTP_WRITER_WRITE: /** * read */ - if (this->nbody < message->nbody) { + if (this->nbody < this->current->nbody) { size_t size = MIN( - message->nbody - this->nbody, + this->current->nbody - this->nbody, cbufGetFree(this->buffer)); - switch (message->type) { + switch (this->current->type) { case HTTP_MESSAGE_BUFFERED: cbufSetData(this->buffer, - message->body + this->nbody, + this->current->body + this->nbody, size); break; case HTTP_MESSAGE_PIPED: - size = cbufRead(this->buffer, message->handle); + size = cbufRead(this->buffer, this->current->handle); break; default: @@ -115,42 +113,41 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd) } } - if (this->written == message->nbody + this->nheader) { - this->state = HTTP_RESPONSE_DONE; + if (this->written == this->current->nbody + this->nheader) { + this->state = HTTP_WRITER_DONE; } else { cont = 0; } break; - case HTTP_RESPONSE_DONE: - if (HTTP_MESSAGE_PIPED == message->type) { - close(message->handle); + case HTTP_WRITER_DONE: + if (HTTP_MESSAGE_PIPED == this->current->type) { + close(this->current->handle); } - this->state = HTTP_RESPONSE_GET; + this->state = HTTP_WRITER_GET; memmove(respq->msgs, &(respq->msgs[1]), sizeof(void*) * (--respq->nmsgs + 1)); - if (! httpMessageHasKeepAlive(message)) { + cbufRelease(this->buffer); + this->ourLock = FALSE; + + if (! httpMessageHasKeepAlive(this->current)) { /** * if the message did not have the keep-alive feature * we don't care about further pipelined messages and * return to the caller with a -1 indicating that the - * underlying connection should be closed. + * underlying connection should be closed at their side. + * Then we close to connection. */ - cbufRelease(this->buffer); - this->ourLock = FALSE; - delete(this->cur_response); + delete(this->current); return -1; } - cbufRelease(this->buffer); - this->ourLock = FALSE; - delete(this->cur_response); - + delete(this->current); break; } } diff --git a/src/server/poll.c b/src/server/poll.c index 2944710..4ed8a98 100644 --- a/src/server/poll.c +++ b/src/server/poll.c @@ -51,7 +51,7 @@ serverPoll(Server this) { if (fda < fdb) { memcpy(fda, fdb, sizeof(struct pollfd)); - memset(fdb, 0, sizeof(struct pollfd)); + //memset(fdb, 0, sizeof(struct pollfd)); // this might be unneccessary fdb--; this->nfds--; } diff --git a/src/server/read.c b/src/server/read.c index f08db59..f8b1b22 100644 --- a/src/server/read.c +++ b/src/server/read.c @@ -24,6 +24,8 @@ #include "interface/logger.h" #include "interface/stream_reader.h" +void serverCloseConn(Server, unsigned int); + ssize_t serverRead(Server this, unsigned int i) { @@ -57,9 +59,14 @@ serverRead(Server this, unsigned int i) "connection[%d] closed...%s", fd, inet_ntoa((((this->conns)[fd].sock)->addr).sin_addr)); + serverCloseConn(this, i); + break; + + case 0: break; default: + (this->fds)[i].events |= POLLOUT; break; } diff --git a/src/server/run.c b/src/server/run.c index 97aff94..2f84fba 100644 --- a/src/server/run.c +++ b/src/server/run.c @@ -21,112 +21,66 @@ */ #include "server.h" -#include "interface/stream_writer.h" #include "interface/logger.h" #include "utils/signalHandling.h" -#undef MAX -#define MAX(x,y) ((x) > (y) ? (x) : (y)) - int serverPoll(Server); int serverHandleAccept(Server); void serverCloseConn(Server, unsigned int); ssize_t serverRead(Server, unsigned int); +ssize_t serverWrite(Server, unsigned int); void serverRun(Server this) { loggerLog(this->logger, LOGGER_INFO, "service started"); - /** - * \todo actually this is the main loop of my server. When - * stuff becomes more complicated it might be feasabible to - * split stuff into separate processes. This will definetly - * involve some IPC and syncing. Right now as this is actually - * only a simple HTTP server implementation we go on with - * this single process. - * What we can first do to get some processing between read/write - * cicles is to use the poll timeout. - * A first candidate for a separate process would be the - * generation of the responses piped responses then still need - * to open the filehandle in this process and reading and - * writing would be done here. So the benefit might not be - * very big. Otherwise we could share the read and write - * ringbuffer as well as the message queues. Then the child - * process can do the file readings, but this would involve - * some more IPC. - */ while (!doShutdown) //! until error or signal { int events; unsigned int i; + int naccs = 10; events = serverPoll(this); - if (doShutdown) break; + if (doShutdown || 0 >= events) break; + + /** + * handle accept + */ + if (0 != ((this->fds)[0].revents & POLLIN)) { + events--; + while(-1 != serverHandleAccept(this) && 0 < naccs) { + naccs--; + } + } - for (i=0; i < this->nfds; i++) { + for (i=1; i < this->nfds; i++) { int fd = (this->fds)[i].fd; - int naccs = 10, nreads = 10, nwrites = 10; - - if (0 >= events) break; + int nreads = 10, nwrites = 10; + /** + * handle reads + */ if (0 != ((this->fds)[i].revents & POLLIN) && 0 < nreads) { events--; + nreads--; - /** - * handle accept - */ - if (this->sock->handle == (this->fds)[i].fd) { - while(-1 != serverHandleAccept(this) && 0 < naccs) { - naccs--; - } - } - - /** - * handle reads - */ - else { - nreads--; - - switch (serverRead(this, i)) { - case 0: - break; - - case -1: - case -2: - serverCloseConn(this, i); - break; - - default: - (this->fds)[i].events |= POLLOUT; - } - } + serverRead(this, i); } /** * handle writes */ if (0 != ((this->fds)[i].revents & POLLOUT) && 0 < nwrites) { - size_t remaining; - events--; nwrites--; - remaining = streamWriterWrite((this->conns)[fd].worker, fd); - switch(remaining) { - case -1: - serverCloseConn(this, i); - break; - - case 0: - (this->fds)[i].events &= ~POLLOUT; - break; - - default: - break; - } + serverWrite(this, i); } + + if (0 > events) + break; // no more events to handle } } } diff --git a/src/server/write.c b/src/server/write.c new file mode 100644 index 0000000..7c9124f --- /dev/null +++ b/src/server/write.c @@ -0,0 +1,60 @@ +/** + * \file + * + * \author Georg Hopp + * + * \copyright + * Copyright (C) 2012 Georg Hopp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "server.h" +#include "interface/logger.h" +#include "interface/stream_writer.h" + +void serverCloseConn(Server, unsigned int); + +ssize_t +serverWrite(Server this, unsigned int i) +{ + int fd = (this->fds)[i].fd; + ssize_t remaining; + + if (NULL == (this->conns)[fd].worker) { + loggerLog( + this->logger, + LOGGER_INFO, + "initialization error: NULL reader"); + return -1; + } + + remaining = streamWriterWrite((this->conns)[fd].worker, fd); + switch(remaining) { + case -1: + serverCloseConn(this, i); + break; + + case 0: + (this->fds)[i].events &= ~POLLOUT; + break; + + default: + break; + } + + return remaining; +} + +// vim: set ts=4 sw=4: diff --git a/src/socket.c b/src/socket.c index 3d85108..2bd9e7c 100644 --- a/src/socket.c +++ b/src/socket.c @@ -40,7 +40,7 @@ socketCtor(void * _this, va_list * params) this->port = va_arg(* params, int); //! Create socket for incoming connections - if (-1 == (this->handle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP))) { + if (-1 == (this->handle = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP))) { loggerLog(this->log, LOGGER_CRIT, "error opening socket: %s - service terminated", strerror(errno)); diff --git a/src/socket/connect.c b/src/socket/connect.c index fe5870d..c0d8398 100644 --- a/src/socket/connect.c +++ b/src/socket/connect.c @@ -35,7 +35,11 @@ socketConnect(Sock this, const char * addr) (this->addr).sin_family = AF_INET; // Internet address family (this->addr).sin_port = htons(this->port); // Local port - if (-1 == connect(this->handle, (struct sockaddr*) &(this->addr), sizeof(this->addr))) { + if (-1 == connect( + this->handle, + (struct sockaddr*) &(this->addr), + sizeof(this->addr))) + { loggerLog(this->log, LOGGER_CRIT, "error connection socket: %s - service terminated", strerror(errno)); diff --git a/src/testserver.c b/src/testserver.c index 3f38f9a..f1dd1f9 100644 --- a/src/testserver.c +++ b/src/testserver.c @@ -132,7 +132,7 @@ main() close(shm); logger = new(LoggerSyslog, LOGGER_ERR); - worker = new(HttpWorker, "my", value); + worker = new(HttpWorker, "testserver", value); server = new(Server, logger, worker, 11212, SOMAXCONN); //daemonize();