Browse Source

this is a huge one...removed cbuf for writing again.... This is not needed any more because I use memory mapped io for these. Additionally sanitised write error handling somewhat...anyway, under huge load I still observer unexpected connection closes. But maybe this is related to interrupt while reading...(reading is not handled very well right now)

release0.1.5
Georg Hopp 12 years ago
parent
commit
0e1ec66463
  1. 30
      include/http/writer.h
  2. 10
      include/utils/memory.h
  3. 2
      src/cbuf/Makefile.am
  4. 56
      src/cbuf/write.c
  5. 26
      src/http/worker.c
  6. 4
      src/http/worker/get_asset.c
  7. 3
      src/http/worker/process.c
  8. 13
      src/http/writer.c
  9. 129
      src/http/writer/write.c
  10. 33
      src/server/run.c
  11. 57
      src/server/write.c
  12. 24
      src/stream/write.c
  13. 18
      src/utils/memory.c

30
include/http/writer.h

@ -29,7 +29,6 @@
#include "class.h"
#include "http/message.h"
#include "queue.h"
#include "cbuf.h"
#include "stream.h"
#include "commons.h"
@ -58,8 +57,21 @@
* And as I will also implement a cbuf pool, this memory will not be
* freed before application end.
*/
#define WRITER_MAX_BUF 131072
#define WRITER_MAX_BUF 131072
/*
* This is the multiplier for the size of the initial write buffer.
* It is used to store the
* string representation of the message, as well as the first part of
* the body if the headers exceed the size a multiple of this will
* be reserved...very unlikely, but not impossible.
* If no the whole body fits within this buffer only part of it will
* be copied in there. The rest will be send in following send calls.
*/
#define WRITER_BUF_CHUNK 1024 * 10 // our default buffer chunk for
// headers is 10k. This will result
// in at least 20m for 2000 concurrent
// connections.
typedef enum e_HttpWriterState {
HTTP_WRITER_GET=0,
@ -68,15 +80,15 @@ typedef enum e_HttpWriterState {
} HttpWriterState;
CLASS(HttpWriter) {
Cbuf buffer;
Bool ourLock;
char * buffer;
Queue queue;
HttpMessage current;
Queue queue;
HttpMessage current;
size_t nheader;
size_t nbody;
size_t written;
size_t nbuffer; // size of buffer
size_t nheader; // size headers in buf
size_t nbody; // sizeof body in buffer
size_t written; // already written bytes
HttpWriterState state;
};

10
include/utils/memory.h

@ -28,11 +28,13 @@
#define FREE(val) (ffree((void**)&(val)))
#define MEM_FREE(seg) (memFree((void **)&(seg)))
#include <sys/types.h>
void * memMalloc(size_t);
void * memCalloc(size_t, size_t);
void memFree(void **);
void memCleanup();
void * memMalloc(size_t);
void * memCalloc(size_t, size_t);
void memFree(void **);
size_t memGetSize(void *);
void memCleanup();
void ffree(void **);

2
src/cbuf/Makefile.am

@ -1,6 +1,6 @@
ACLOCAL_AMFLAGS = -I m4
CB = cbuf.c read.c write.c \
CB = cbuf.c read.c \
get_line.c set_data.c get_data.c \
addr_index.c get_free.c get_read.c get_write.c \
inc_read.c inc_write.c is_empty.c memchr.c \

56
src/cbuf/write.c

@ -1,56 +0,0 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2012 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <unistd.h>
#include "cbuf.h"
#include "stream.h"
ssize_t
cbufWrite(Cbuf this, Stream st)
{
ssize_t wwsize = 0;
size_t wsize = this->bused;
if (0 == wsize) return 0;
wwsize = streamWrite(st, cbufGetRead(this), wsize);
switch (wwsize) {
case 0:
wwsize = -2;
// DROP THROUGH
case -1:
break;
default:
cbufIncRead(this, wwsize);
break;
}
return wwsize;
}
// vim: set ts=4 sw=4:

26
src/http/worker.c

@ -57,11 +57,8 @@ httpWorkerCtor(void * _this, va_list * params)
sprintf(cbuf_id, "%s_%s", "parser", id);
this->pbuf = new(Cbuf, cbuf_id, PARSER_MAX_BUF);
sprintf(cbuf_id, "%s_%s", "writer", id);
this->wbuf = new(Cbuf, cbuf_id, WRITER_MAX_BUF);
this->parser = new(HttpParser, this->pbuf);
this->writer = new(HttpWriter, this->wbuf);
this->writer = new(HttpWriter);
this->sroot = &(this->session);
this->auth = va_arg(* params, void *);
@ -108,7 +105,26 @@ httpWorkerClone(void * _this, void * _base)
this->asset_pool = base->asset_pool;
this->parser = new(HttpParser, base->pbuf);
this->writer = new(HttpWriter, base->wbuf);
/*
* I am pretty sure that it is not neccessary to have a
* separeate writer for each connection...
* Right now I leave it that way.
* TODO check this.
* OK some facts:
* - the stream as well as the worker are associated
* to the filehandle within the server.
* - the response queue is located within the writer.
* (this might be wrong...the response queue should
* be part of the worker. That way I could give it
* into the writer when writing. That way only one
* instance of the writer might be possible...)
* NO, the previous statement is wrong...this would
* involve much more organization overhead within
* the writer...queue change and such...
* At the end I think it might be best to leave it as
* it is.
*/
this->writer = new(HttpWriter);
this->sroot = &(base->session);
this->auth = base->auth;

4
src/http/worker/get_asset.c

@ -60,6 +60,10 @@ httpWorkerGetAsset(
asset = assetPoolGet(fname, nfname);
if (NULL == asset) {
return (HttpMessage)httpResponse404();
}
if (asset->netag == nmatch
&& 0 == memcmp(asset->etag, match, asset->netag)) {
assetPoolRelease(asset);

3
src/http/worker/process.c

@ -216,6 +216,9 @@ httpWorkerProcess(HttpWorker this, Stream st)
delete(request);
queuePut(this->writer->queue, response);
size = this->writer->queue->nmsg;
response = NULL;
}
}

13
src/http/writer.c

@ -28,14 +28,15 @@
#include "queue.h"
#include "http/writer.h"
#include "utils/memory.h"
static
int
httpWriterCtor(void * _this, va_list * params)
{
HttpWriter this = _this;
this->buffer = va_arg(*params, Cbuf);
this->queue = new(Queue);
this->queue = new(Queue);
return 0;
}
@ -48,11 +49,13 @@ httpWriterDtor(void * _this)
delete(this->queue);
if (TRUE == this->ourLock)
cbufRelease(this->buffer);
if (NULL != this->buffer) {
MEM_FREE(this->buffer);
}
if (NULL != this->current)
if (NULL != this->current) {
delete(this->current);
}
}
INIT_IFACE(Class, httpWriterCtor, httpWriterDtor, NULL);

129
src/http/writer/write.c

@ -27,10 +27,10 @@
#include "http/message.h"
#include "queue.h"
#include "http/writer.h"
#include "cbuf.h"
#include "stream.h"
#include "commons.h"
#include "utils/memory.h"
ssize_t
@ -39,92 +39,111 @@ httpWriterWrite(void * _this, Stream st)
HttpWriter this = _this;
int cont = 1;
if (cbufIsLocked(this->buffer)) {
if (FALSE == this->ourLock)
return 0;
}
else {
cbufLock(this->buffer);
this->ourLock = TRUE;
}
while (cont) {
switch (this->state) {
char * start;
ssize_t to_write;
ssize_t written;
case HTTP_WRITER_GET:
if (NULL == this->current && ! queueEmpty(this->queue)) {
if (! queueEmpty(this->queue)) {
this->current = queueGet(this->queue);
this->written = 0;
this->nbody = 0;
this->nheader = httpMessageHeaderSizeGet(this->current);
httpMessageHeaderToString(
this->current,
cbufGetWrite(this->buffer));
cbufIncWrite(this->buffer, this->nheader);
if (this->nheader > memGetSize(this->buffer)) {
ssize_t size = this->nheader;
size = (0 != size%WRITER_BUF_CHUNK)?
(size/WRITER_BUF_CHUNK)+1 :
size/WRITER_BUF_CHUNK;
size *= WRITER_BUF_CHUNK;
if (NULL != this->buffer) {
MEM_FREE(this->buffer);
}
this->buffer = memMalloc(size);
this->nbuffer = size;
}
httpMessageHeaderToString(this->current, this->buffer);
this->nbody = MIN(
this->current->nbody,
this->nbuffer - this->nheader);
memcpy(
this->buffer + this->nheader,
this->current->body,
this->nbody);
this->state = HTTP_WRITER_WRITE;
}
else {
cbufRelease(this->buffer);
this->ourLock = FALSE;
cont = 0;
cont = 0;
break;
}
break;
case HTTP_WRITER_WRITE:
/**
* read
*/
if (this->nbody < this->current->nbody) {
size_t size = MIN(
this->current->nbody - this->nbody,
cbufGetFree(this->buffer));
cbufSetData(this->buffer,
this->current->body + this->nbody,
size);
this->nbody += size;
if (this->written >= this->nbuffer) {
size_t body_done = this->written - this->nheader;
start = this->current->body + body_done;
to_write = this->current->nbody - body_done;
} else {
start = this->buffer + this->written;
to_write = (this->nheader + this->nbody) - this->written;
}
/**
* write
*/
{
ssize_t written = cbufWrite(this->buffer, st);
written = streamWrite(st, start, to_write);
if (0 > written) {
return written;
}
if (written < 0) {
return written;
}
this->written += written;
this->written += written;
if (written != to_write) {
/*
* for some reason not all data could be
* written...most likely its a slow connection
* so, not to slow down the server we stop
* writing to this one now and come back to
* it in the next run....maybe it would be
* feasable to also implement some kind of
* timeout mechanism for writes...
* By the way, the same is true for reading,
* so to say, the parser.
*/
// cont = 0;
// break;
/* to go a step further...we send it to the
* poll cicle again... */
return -1;
}
if (this->written == this->current->nbody + this->nheader) {
if (this->written >= this->nheader + this->current->nbody) {
// we are done with this message.
this->state = HTTP_WRITER_DONE;
} else {
break;
}
else {
cont = 0;
}
break;
case HTTP_WRITER_DONE:
this->state = HTTP_WRITER_GET;
cbufRelease(this->buffer);
this->ourLock = FALSE;
if (! httpMessageHasKeepAlive(this->current)) {
/**
* if the message did not have the keep-alive feature
* we don't care about further pipelined messages and
* return to the caller with a -1 indicating that the
* return to the caller with a -2 indicating that the
* underlying connection should be closed at their side.
* Then we close to connection.
*/
delete(this->current);
return -1;
return -2;
}
delete(this->current);
@ -132,7 +151,9 @@ httpWriterWrite(void * _this, Stream st)
}
}
return this->queue->nmsg;
return NULL == this->current ?
this->queue->nmsg :
this->queue->nmsg + 1;
}
// vim: set ts=4 sw=4:

33
src/server/run.c

@ -49,7 +49,7 @@ serverRun(Server this)
*/
if (0 != ((this->fds)[0].revents & POLLIN)) {
if (-1 == serverHandleAccept(this, 0)) {
(this->fds)[0].revents |= ~POLLIN;
(this->fds)[0].revents &= ~POLLIN;
events--;
}
}
@ -59,7 +59,7 @@ serverRun(Server this)
*/
if (0 != ((this->fds)[1].revents & POLLIN)) {
if (-1 == serverHandleAccept(this, 1)) {
(this->fds)[1].revents |= ~POLLIN;
(this->fds)[1].revents &= ~POLLIN;
events--;
}
}
@ -69,19 +69,40 @@ serverRun(Server this)
* handle reads
*/
if (0 != ((this->fds)[i].revents & POLLIN)) {
if (0 < serverRead(this, i)) {
(this->fds)[i].revents |= ~POLLIN;
ssize_t processed = serverRead(this, i);
if (0 < processed) {
(this->fds)[i].revents &= ~POLLIN;
events--;
}
if (processed > 0) {
(this->fds)[i].events |= POLLOUT;
}
}
/**
* handle writes
*/
if (0 != ((this->fds)[i].revents & POLLOUT)) {
if (0 < serverWrite(this, i)) {
(this->fds)[i].revents |= ~POLLOUT;
ssize_t remaining = serverWrite(this, i);
if (0 > remaining) {
events--;
switch (remaining) {
case -1: // poll me again
(this->fds)[i].revents &= ~POLLOUT;
break;
case -2: // close me...
serverCloseConn(this, i);
break;
}
}
if (0 == remaining) {
(this->fds)[i].events &= ~POLLOUT;
}
}

57
src/server/write.c

@ -32,71 +32,18 @@ ssize_t
serverWrite(Server this, unsigned int i)
{
int fd = (this->fds)[i].fd;
ssize_t remaining;
if (NULL == (this->conns)[fd].worker) {
loggerLog(
this->logger,
LOGGER_INFO,
"initialization error: NULL worker");
return -1;
return -2;
}
remaining = streamWriterWrite(
return streamWriterWrite(
(this->conns)[fd].worker,
(this->conns)[fd].stream);
switch(remaining) {
case -1:
/*
* read failure
*/
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* on EGAIN just try again later. */
break;
}
// DROP-THROUGH
case -2:
/**
* normal close: this must be mapped to -2 within the
* underlying read call.
*
* \todo make sure all pending writes will be done before
* close.
*/
/*
* close connection if not EAGAIN, this would also
* remove the filedescriptor from the poll list.
* Else just return indicate
*/
loggerLog(this->logger, LOGGER_INFO,
"connection[%d] closed...%s",
fd,
inet_ntoa((((this->conns)[fd].sock)->addr).sin_addr));
serverCloseConn(this, i);
//case 0:
// break;
default:
// (this->fds)[i].events |= POLLOUT;
break;
// case -1:
// serverCloseConn(this, i);
// break;
//
// case 0:
// (this->fds)[i].events &= ~POLLOUT;
// break;
//
// default:
// break;
}
return remaining;
}
// vim: set ts=4 sw=4:

24
src/stream/write.c

@ -22,6 +22,7 @@
#include <openssl/ssl.h>
#include <unistd.h>
#include <errno.h>
#include "stream.h"
@ -31,8 +32,29 @@ streamWrite(Stream this, void * buf, size_t count)
ssize_t done;
switch(this->type) {
ssize_t written;
case STREAM_FD:
done = write((this->handle).fd, buf, count);
written = write((this->handle).fd, buf, count);
if (written < 0) {
switch (errno) {
case EINTR:
case ENOBUFS:
case ENOMEM:
done = 0;
break;
case (EAGAIN|EWOULDBLOCK):
done = -1;
break;
default:
done = -2;
break;
}
} else {
done = written;
}
break;
case STREAM_SSL:

18
src/utils/memory.c

@ -869,15 +869,23 @@ memFree(void ** mem)
if (NULL != *mem) {
insertElement(&segments, (struct memSegment *)(*mem - sizeof(struct memSegment)));
//printf("FREE of Segment: 0x%p of size: %zu\n",
// *mem - sizeof(struct memSegment),
// ((struct memSegment *)(*mem - sizeof(struct memSegment)))->size);
//traverse(segments, printElement);
*mem = NULL;
}
}
size_t
memGetSize(void * mem)
{
struct memSegment * segment;
if (NULL == mem) {
return 0;
}
segment = (struct memSegment *)(mem - sizeof(struct memSegment));
return segment->size;
}
void
memCleanup()
{

Loading…
Cancel
Save