From 0e1ec664630cd489b555f117904f8dc51bdfbfdf Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Tue, 3 Sep 2013 15:48:12 +0100 Subject: [PATCH] this is a huge one...removed cbuf for writing again.... This is not needed any more because I use memory mapped io for these. Additionally sanitised write error handling somewhat...anyway, under huge load I still observer unexpected connection closes. But maybe this is related to interrupt while reading...(reading is not handled very well right now) --- include/http/writer.h | 30 ++++++--- include/utils/memory.h | 10 +-- src/cbuf/Makefile.am | 2 +- src/cbuf/write.c | 56 ---------------- src/http/worker.c | 26 ++++++-- src/http/worker/get_asset.c | 4 ++ src/http/worker/process.c | 3 + src/http/writer.c | 13 ++-- src/http/writer/write.c | 129 +++++++++++++++++++++--------------- src/server/run.c | 33 +++++++-- src/server/write.c | 57 +--------------- src/stream/write.c | 24 ++++++- src/utils/memory.c | 18 +++-- 13 files changed, 204 insertions(+), 201 deletions(-) delete mode 100644 src/cbuf/write.c diff --git a/include/http/writer.h b/include/http/writer.h index 25f641f..4066dbc 100644 --- a/include/http/writer.h +++ b/include/http/writer.h @@ -29,7 +29,6 @@ #include "class.h" #include "http/message.h" #include "queue.h" -#include "cbuf.h" #include "stream.h" #include "commons.h" @@ -58,8 +57,21 @@ * And as I will also implement a cbuf pool, this memory will not be * freed before application end. */ -#define WRITER_MAX_BUF 131072 +#define WRITER_MAX_BUF 131072 +/* + * This is the multiplier for the size of the initial write buffer. + * It is used to store the + * string representation of the message, as well as the first part of + * the body if the headers exceed the size a multiple of this will + * be reserved...very unlikely, but not impossible. + * If no the whole body fits within this buffer only part of it will + * be copied in there. The rest will be send in following send calls. + */ +#define WRITER_BUF_CHUNK 1024 * 10 // our default buffer chunk for + // headers is 10k. This will result + // in at least 20m for 2000 concurrent + // connections. typedef enum e_HttpWriterState { HTTP_WRITER_GET=0, @@ -68,15 +80,15 @@ typedef enum e_HttpWriterState { } HttpWriterState; CLASS(HttpWriter) { - Cbuf buffer; - Bool ourLock; + char * buffer; - Queue queue; - HttpMessage current; + Queue queue; + HttpMessage current; - size_t nheader; - size_t nbody; - size_t written; + size_t nbuffer; // size of buffer + size_t nheader; // size headers in buf + size_t nbody; // sizeof body in buffer + size_t written; // already written bytes HttpWriterState state; }; diff --git a/include/utils/memory.h b/include/utils/memory.h index 6d886e5..ed69685 100644 --- a/include/utils/memory.h +++ b/include/utils/memory.h @@ -28,11 +28,13 @@ #define FREE(val) (ffree((void**)&(val))) #define MEM_FREE(seg) (memFree((void **)&(seg))) +#include -void * memMalloc(size_t); -void * memCalloc(size_t, size_t); -void memFree(void **); -void memCleanup(); +void * memMalloc(size_t); +void * memCalloc(size_t, size_t); +void memFree(void **); +size_t memGetSize(void *); +void memCleanup(); void ffree(void **); diff --git a/src/cbuf/Makefile.am b/src/cbuf/Makefile.am index 8254eeb..9a8ff86 100644 --- a/src/cbuf/Makefile.am +++ b/src/cbuf/Makefile.am @@ -1,6 +1,6 @@ ACLOCAL_AMFLAGS = -I m4 -CB = cbuf.c read.c write.c \ +CB = cbuf.c read.c \ get_line.c set_data.c get_data.c \ addr_index.c get_free.c get_read.c get_write.c \ inc_read.c inc_write.c is_empty.c memchr.c \ diff --git a/src/cbuf/write.c b/src/cbuf/write.c deleted file mode 100644 index 5869f93..0000000 --- a/src/cbuf/write.c +++ /dev/null @@ -1,56 +0,0 @@ -/** - * \file - * - * \author Georg Hopp - * - * \copyright - * Copyright © 2012 Georg Hopp - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -#include -#include - -#include "cbuf.h" -#include "stream.h" - - -ssize_t -cbufWrite(Cbuf this, Stream st) -{ - ssize_t wwsize = 0; - size_t wsize = this->bused; - - if (0 == wsize) return 0; - - wwsize = streamWrite(st, cbufGetRead(this), wsize); - - switch (wwsize) { - case 0: - wwsize = -2; - // DROP THROUGH - - case -1: - break; - - default: - cbufIncRead(this, wwsize); - break; - } - - return wwsize; -} - -// vim: set ts=4 sw=4: diff --git a/src/http/worker.c b/src/http/worker.c index bde7481..d5845be 100644 --- a/src/http/worker.c +++ b/src/http/worker.c @@ -57,11 +57,8 @@ httpWorkerCtor(void * _this, va_list * params) sprintf(cbuf_id, "%s_%s", "parser", id); this->pbuf = new(Cbuf, cbuf_id, PARSER_MAX_BUF); - sprintf(cbuf_id, "%s_%s", "writer", id); - this->wbuf = new(Cbuf, cbuf_id, WRITER_MAX_BUF); - this->parser = new(HttpParser, this->pbuf); - this->writer = new(HttpWriter, this->wbuf); + this->writer = new(HttpWriter); this->sroot = &(this->session); this->auth = va_arg(* params, void *); @@ -108,7 +105,26 @@ httpWorkerClone(void * _this, void * _base) this->asset_pool = base->asset_pool; this->parser = new(HttpParser, base->pbuf); - this->writer = new(HttpWriter, base->wbuf); + /* + * I am pretty sure that it is not neccessary to have a + * separeate writer for each connection... + * Right now I leave it that way. + * TODO check this. + * OK some facts: + * - the stream as well as the worker are associated + * to the filehandle within the server. + * - the response queue is located within the writer. + * (this might be wrong...the response queue should + * be part of the worker. That way I could give it + * into the writer when writing. That way only one + * instance of the writer might be possible...) + * NO, the previous statement is wrong...this would + * involve much more organization overhead within + * the writer...queue change and such... + * At the end I think it might be best to leave it as + * it is. + */ + this->writer = new(HttpWriter); this->sroot = &(base->session); this->auth = base->auth; diff --git a/src/http/worker/get_asset.c b/src/http/worker/get_asset.c index 8c31f08..d833e0d 100644 --- a/src/http/worker/get_asset.c +++ b/src/http/worker/get_asset.c @@ -60,6 +60,10 @@ httpWorkerGetAsset( asset = assetPoolGet(fname, nfname); + if (NULL == asset) { + return (HttpMessage)httpResponse404(); + } + if (asset->netag == nmatch && 0 == memcmp(asset->etag, match, asset->netag)) { assetPoolRelease(asset); diff --git a/src/http/worker/process.c b/src/http/worker/process.c index 2f603ea..addcfe4 100644 --- a/src/http/worker/process.c +++ b/src/http/worker/process.c @@ -216,6 +216,9 @@ httpWorkerProcess(HttpWorker this, Stream st) delete(request); queuePut(this->writer->queue, response); + + size = this->writer->queue->nmsg; + response = NULL; } } diff --git a/src/http/writer.c b/src/http/writer.c index e905ba6..5d390cd 100644 --- a/src/http/writer.c +++ b/src/http/writer.c @@ -28,14 +28,15 @@ #include "queue.h" #include "http/writer.h" +#include "utils/memory.h" + static int httpWriterCtor(void * _this, va_list * params) { HttpWriter this = _this; - this->buffer = va_arg(*params, Cbuf); - this->queue = new(Queue); + this->queue = new(Queue); return 0; } @@ -48,11 +49,13 @@ httpWriterDtor(void * _this) delete(this->queue); - if (TRUE == this->ourLock) - cbufRelease(this->buffer); + if (NULL != this->buffer) { + MEM_FREE(this->buffer); + } - if (NULL != this->current) + if (NULL != this->current) { delete(this->current); + } } INIT_IFACE(Class, httpWriterCtor, httpWriterDtor, NULL); diff --git a/src/http/writer/write.c b/src/http/writer/write.c index de4f95e..0ca12d3 100644 --- a/src/http/writer/write.c +++ b/src/http/writer/write.c @@ -27,10 +27,10 @@ #include "http/message.h" #include "queue.h" #include "http/writer.h" -#include "cbuf.h" #include "stream.h" #include "commons.h" +#include "utils/memory.h" ssize_t @@ -39,92 +39,111 @@ httpWriterWrite(void * _this, Stream st) HttpWriter this = _this; int cont = 1; - if (cbufIsLocked(this->buffer)) { - if (FALSE == this->ourLock) - return 0; - } - else { - cbufLock(this->buffer); - this->ourLock = TRUE; - } - while (cont) { switch (this->state) { + char * start; + ssize_t to_write; + ssize_t written; + case HTTP_WRITER_GET: - if (NULL == this->current && ! queueEmpty(this->queue)) { + if (! queueEmpty(this->queue)) { this->current = queueGet(this->queue); this->written = 0; - this->nbody = 0; this->nheader = httpMessageHeaderSizeGet(this->current); - httpMessageHeaderToString( - this->current, - cbufGetWrite(this->buffer)); - cbufIncWrite(this->buffer, this->nheader); + if (this->nheader > memGetSize(this->buffer)) { + ssize_t size = this->nheader; + + size = (0 != size%WRITER_BUF_CHUNK)? + (size/WRITER_BUF_CHUNK)+1 : + size/WRITER_BUF_CHUNK; + size *= WRITER_BUF_CHUNK; + + if (NULL != this->buffer) { + MEM_FREE(this->buffer); + } + + this->buffer = memMalloc(size); + this->nbuffer = size; + } + + httpMessageHeaderToString(this->current, this->buffer); + + this->nbody = MIN( + this->current->nbody, + this->nbuffer - this->nheader); + + memcpy( + this->buffer + this->nheader, + this->current->body, + this->nbody); this->state = HTTP_WRITER_WRITE; } else { - cbufRelease(this->buffer); - this->ourLock = FALSE; - cont = 0; + cont = 0; + break; } - break; case HTTP_WRITER_WRITE: - /** - * read - */ - if (this->nbody < this->current->nbody) { - size_t size = MIN( - this->current->nbody - this->nbody, - cbufGetFree(this->buffer)); - - cbufSetData(this->buffer, - this->current->body + this->nbody, - size); - - this->nbody += size; + if (this->written >= this->nbuffer) { + size_t body_done = this->written - this->nheader; + + start = this->current->body + body_done; + to_write = this->current->nbody - body_done; + } else { + start = this->buffer + this->written; + to_write = (this->nheader + this->nbody) - this->written; } - /** - * write - */ - { - ssize_t written = cbufWrite(this->buffer, st); + written = streamWrite(st, start, to_write); - if (0 > written) { - return written; - } + if (written < 0) { + return written; + } - this->written += written; + this->written += written; + + if (written != to_write) { + /* + * for some reason not all data could be + * written...most likely its a slow connection + * so, not to slow down the server we stop + * writing to this one now and come back to + * it in the next run....maybe it would be + * feasable to also implement some kind of + * timeout mechanism for writes... + * By the way, the same is true for reading, + * so to say, the parser. + */ + // cont = 0; + // break; + + /* to go a step further...we send it to the + * poll cicle again... */ + return -1; } - if (this->written == this->current->nbody + this->nheader) { + if (this->written >= this->nheader + this->current->nbody) { + // we are done with this message. this->state = HTTP_WRITER_DONE; + } else { + break; } - else { - cont = 0; - } - break; case HTTP_WRITER_DONE: this->state = HTTP_WRITER_GET; - cbufRelease(this->buffer); - this->ourLock = FALSE; - if (! httpMessageHasKeepAlive(this->current)) { /** * if the message did not have the keep-alive feature * we don't care about further pipelined messages and - * return to the caller with a -1 indicating that the + * return to the caller with a -2 indicating that the * underlying connection should be closed at their side. * Then we close to connection. */ - delete(this->current); - return -1; + return -2; } delete(this->current); @@ -132,7 +151,9 @@ httpWriterWrite(void * _this, Stream st) } } - return this->queue->nmsg; + return NULL == this->current ? + this->queue->nmsg : + this->queue->nmsg + 1; } // vim: set ts=4 sw=4: diff --git a/src/server/run.c b/src/server/run.c index 67b8497..e0d243d 100644 --- a/src/server/run.c +++ b/src/server/run.c @@ -49,7 +49,7 @@ serverRun(Server this) */ if (0 != ((this->fds)[0].revents & POLLIN)) { if (-1 == serverHandleAccept(this, 0)) { - (this->fds)[0].revents |= ~POLLIN; + (this->fds)[0].revents &= ~POLLIN; events--; } } @@ -59,7 +59,7 @@ serverRun(Server this) */ if (0 != ((this->fds)[1].revents & POLLIN)) { if (-1 == serverHandleAccept(this, 1)) { - (this->fds)[1].revents |= ~POLLIN; + (this->fds)[1].revents &= ~POLLIN; events--; } } @@ -69,19 +69,40 @@ serverRun(Server this) * handle reads */ if (0 != ((this->fds)[i].revents & POLLIN)) { - if (0 < serverRead(this, i)) { - (this->fds)[i].revents |= ~POLLIN; + ssize_t processed = serverRead(this, i); + + if (0 < processed) { + (this->fds)[i].revents &= ~POLLIN; events--; } + + if (processed > 0) { + (this->fds)[i].events |= POLLOUT; + } } /** * handle writes */ if (0 != ((this->fds)[i].revents & POLLOUT)) { - if (0 < serverWrite(this, i)) { - (this->fds)[i].revents |= ~POLLOUT; + ssize_t remaining = serverWrite(this, i); + + if (0 > remaining) { events--; + + switch (remaining) { + case -1: // poll me again + (this->fds)[i].revents &= ~POLLOUT; + break; + + case -2: // close me... + serverCloseConn(this, i); + break; + } + } + + if (0 == remaining) { + (this->fds)[i].events &= ~POLLOUT; } } diff --git a/src/server/write.c b/src/server/write.c index 30bf1bf..11896c8 100644 --- a/src/server/write.c +++ b/src/server/write.c @@ -32,71 +32,18 @@ ssize_t serverWrite(Server this, unsigned int i) { int fd = (this->fds)[i].fd; - ssize_t remaining; if (NULL == (this->conns)[fd].worker) { loggerLog( this->logger, LOGGER_INFO, "initialization error: NULL worker"); - return -1; + return -2; } - remaining = streamWriterWrite( + return streamWriterWrite( (this->conns)[fd].worker, (this->conns)[fd].stream); - - switch(remaining) { - case -1: - /* - * read failure - */ - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* on EGAIN just try again later. */ - break; - } - // DROP-THROUGH - - case -2: - /** - * normal close: this must be mapped to -2 within the - * underlying read call. - * - * \todo make sure all pending writes will be done before - * close. - */ - - /* - * close connection if not EAGAIN, this would also - * remove the filedescriptor from the poll list. - * Else just return indicate - */ - loggerLog(this->logger, LOGGER_INFO, - "connection[%d] closed...%s", - fd, - inet_ntoa((((this->conns)[fd].sock)->addr).sin_addr)); - serverCloseConn(this, i); - - //case 0: - // break; - - default: - // (this->fds)[i].events |= POLLOUT; - break; - -// case -1: -// serverCloseConn(this, i); -// break; -// -// case 0: -// (this->fds)[i].events &= ~POLLOUT; -// break; -// -// default: -// break; - } - - return remaining; } // vim: set ts=4 sw=4: diff --git a/src/stream/write.c b/src/stream/write.c index 720f5e4..5e9b346 100644 --- a/src/stream/write.c +++ b/src/stream/write.c @@ -22,6 +22,7 @@ #include #include +#include #include "stream.h" @@ -31,8 +32,29 @@ streamWrite(Stream this, void * buf, size_t count) ssize_t done; switch(this->type) { + ssize_t written; + case STREAM_FD: - done = write((this->handle).fd, buf, count); + written = write((this->handle).fd, buf, count); + + if (written < 0) { + switch (errno) { + case EINTR: + case ENOBUFS: + case ENOMEM: + done = 0; + break; + case (EAGAIN|EWOULDBLOCK): + done = -1; + break; + default: + done = -2; + break; + } + } else { + done = written; + } + break; case STREAM_SSL: diff --git a/src/utils/memory.c b/src/utils/memory.c index 59db11c..bd71c05 100644 --- a/src/utils/memory.c +++ b/src/utils/memory.c @@ -869,15 +869,23 @@ memFree(void ** mem) if (NULL != *mem) { insertElement(&segments, (struct memSegment *)(*mem - sizeof(struct memSegment))); - //printf("FREE of Segment: 0x%p of size: %zu\n", - // *mem - sizeof(struct memSegment), - // ((struct memSegment *)(*mem - sizeof(struct memSegment)))->size); - //traverse(segments, printElement); - *mem = NULL; } } +size_t +memGetSize(void * mem) +{ + struct memSegment * segment; + + if (NULL == mem) { + return 0; + } + + segment = (struct memSegment *)(mem - sizeof(struct memSegment)); + return segment->size; +} + void memCleanup() {