Browse Source

another try with a shared memory based ringbuffer...this performs well for keep-alive sessions but is much slower without. actually i am not sure why but most likely the shared memory setup is quite expensive. @TODO: make a profiling.

master
Georg Hopp 14 years ago
parent
commit
f2dbad19c6
  1. 6
      ChangeLog
  2. 1
      include/http/message.h
  3. 8
      include/http/request/parser.h
  4. 15
      include/http/response/writer.h
  5. 3
      include/http/worker.h
  6. 3
      include/server.h
  7. 2
      include/socket.h
  8. 7
      src/Makefile.am
  9. 113
      src/cbuf.c
  10. 11
      src/cbuf/addr_index.c
  11. 19
      src/cbuf/get_data.c
  12. 11
      src/cbuf/get_free.c
  13. 26
      src/cbuf/get_line.c
  14. 9
      src/cbuf/get_read.c
  15. 9
      src/cbuf/get_write.c
  16. 14
      src/cbuf/inc_read.c
  17. 14
      src/cbuf/inc_write.c
  18. 9
      src/cbuf/is_empty.c
  19. 11
      src/cbuf/memchr.c
  20. 37
      src/cbuf/read.c
  21. 23
      src/cbuf/set_data.c
  22. 12
      src/cbuf/skip_non_alpha.c
  23. 29
      src/cbuf/write.c
  24. 16
      src/http/request/parser.c
  25. 55
      src/http/request/parser/get_body.c
  26. 94
      src/http/request/parser/parse.c
  27. 63
      src/http/request/parser/read.c
  28. 7
      src/http/response/writer.c
  29. 90
      src/http/response/writer/write.c
  30. 14
      src/http/worker.c
  31. 2
      src/server/close_conn.c
  32. 10
      src/server/handle_accept.c
  33. 26
      src/server/run.c
  34. 5
      src/socket/accept.c
  35. 6
      src/testserver.c

6
ChangeLog

@ -1,10 +1,10 @@
2012-02-18 20:12:27 +0100 Georg Hopp
2012-02-19 11:35:15 +0100 Georg Hopp
* lots of changes but primarily change the request parser to use a ringbuffer. The ringbuffer is implemented using the shared memory trick. (HEAD, master)
* another try with a shared memory based ringbuffer...this performs well for keep-alive sessions but is much slower without. actually i am not sure why but most likely the shared memory setup is quite expensive. @TODO: make a profiling. (HEAD, ringbuffer)
2012-02-15 12:30:33 +0100 Georg Hopp
* some more cleanups in the server code. Removing not needed header includes (origin/master, origin/HEAD)
* some more cleanups in the server code. Removing not needed header includes
2012-02-15 12:17:39 +0100 Georg Hopp

1
include/http/message.h

@ -19,6 +19,7 @@ CLASS(HttpMessage) {
int handle;
char * body;
int nbody;
int dbody;
};
char httpMessageHasKeepAlive(HttpMessage);

8
include/http/request/parser.h

@ -4,9 +4,10 @@
#include "class.h"
#include "http/request.h"
#include "http/message/queue.h"
#include "ringbuffer.h"
#include "cbuf.h"
#define HTTP_REQUEST_PARSER_BUFFER_MAX 8192
#define HTTP_REQUEST_PARSER_MAX_BUF 131072
/**
* limits to stop invalid requests from killing
@ -31,7 +32,7 @@ typedef enum e_HttpRequestState {
CLASS(HttpRequestParser) {
Ringbuffer buffer;
Cbuf buffer;
HttpMessageQueue request_queue;
HttpRequest cur_request;
@ -41,6 +42,7 @@ CLASS(HttpRequestParser) {
ssize_t httpRequestParserRead(HttpRequestParser, int);
ssize_t httpRequestParserParse(HttpRequestParser, int);
void httpRequestParserGetBody(HttpRequestParser);
ssize_t httpRequestParserGetRequestLine(HttpRequestParser, char *);
ssize_t httpRequestParserGetHeader(HttpRequestParser, char *);

15
include/http/response/writer.h

@ -1,9 +1,12 @@
#ifndef __HTTP_RESPONSE_WRITER_H__
#define __HTTP_RESPONSE_WRITER_H__
#include <sys/types.h>
#include "class.h"
#include "http/response.h"
#include "http/message/queue.h"
#include "cbuf.h"
#define RESPONSE_WRITER_MAX_BUF 131072
@ -15,17 +18,15 @@ typedef enum e_HttpResponseState {
} HttpResponseState;
CLASS(HttpResponseWriter) {
char * pipe;
size_t nheader;
size_t nbuffer;
size_t written;
size_t pstart;
size_t pend;
Cbuf buffer;
HttpMessageQueue response_queue;
HttpResponse cur_response;
size_t nheader;
size_t nbody;
size_t written;
HttpResponseState state;
};

3
include/http/worker.h

@ -8,8 +8,7 @@
#include "http/response/writer.h"
CLASS(HttpWorker) {
char * remoteAddr;
int handle;
char * id;
HttpRequestParser parser;
HttpResponseWriter writer;

3
include/server.h

@ -34,10 +34,7 @@ CLASS(Server) {
struct {
Sock sock;
void * worker;
char keep_alive;
} conns[POLL_FD_NSIZE];
};

2
include/socket.h

@ -15,7 +15,7 @@ CLASS(Sock) {
void socketConnect(Sock this, const char * addr);
void socketListen(Sock this, int backlog);
Sock socketAccept(Sock this, char (*remoteAddr)[]);
Sock socketAccept(Sock this, char (*remoteAddr)[16]);
#endif // __SOCKET_H__

7
src/Makefile.am

@ -9,6 +9,11 @@ RB = ringbuffer.c ringbuffer/rb_read.c
SOCKET = socket.c socket/accept.c socket/connect.c socket/listen.c
SERVER = server.c server/run.c server/close_conn.c
LOGGER = logger.c logger/stderr.c logger/syslog.c
CB = cbuf.c cbuf/read.c cbuf/write.c \
cbuf/get_line.c cbuf/set_data.c cbuf/get_data.c \
cbuf/addr_index.c cbuf/get_free.c cbuf/get_read.c cbuf/get_write.c \
cbuf/inc_read.c cbuf/inc_write.c cbuf/is_empty.c cbuf/memchr.c \
cbuf/skip_non_alpha.c
MSG = http/message.c http/message/queue.c http/message/has_keep_alive.c \
http/message/header_size_get.c http/message/header_to_string.c
REQ = http/request.c
@ -31,7 +36,7 @@ bin_PROGRAMS = testserver
testserver_SOURCES = testserver.c \
$(IFACE) $(CLASS) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \
$(WRITER) $(RESP) $(HEADER) $(PARSER) $(WORKER) $(RB) \
$(WRITER) $(RESP) $(HEADER) $(PARSER) $(WORKER) $(CB) \
signalHandling.c daemonize.c
testserver_CFLAGS = -Wall -I ../include/
testserver_LDFLAGS = -lrt

113
src/cbuf.c

@ -0,0 +1,113 @@
#define _POSIX_SOURCE
#define _POSIX_C_SOURCE 200112L
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include "class.h"
#include "interface/class.h"
#include "cbuf.h"
static void dtor(void*);
static
void
ctor(void * _this, va_list * params)
{
Cbuf this = _this;
char state = 0;
char * shm_name = va_arg(*params, char*);
long psize = sysconf(_SC_PAGESIZE);
size_t size;
int shm;
this->shm_name = malloc(strlen(shm_name) + 7 + 2);
sprintf(this->shm_name, "/%06d_%s", getpid(), shm_name);
/**
* align size at page boundary.
* increase as neccessary
*/
size = va_arg(*params, size_t);
size = (0 >= size)? 1 : (0 != size%psize)? (size/psize)+1 : size/psize;
this->bsize = psize * size;
while (0 == state) {
shm = shm_open(this->shm_name, O_RDWR|O_CREAT|O_EXCL, S_IRWXU);
if (-1 == shm) {
break;
}
if (-1 == ftruncate(shm, this->bsize)) {
break;
}
this->data = mmap (0, this->bsize << 1,
PROT_READ|PROT_WRITE, MAP_SHARED, shm, 0);
if (this->data == MAP_FAILED) {
this->data = NULL;
break;
}
munmap(this->data + this->bsize, this->bsize);
this->mirror = mmap (this->data + this->bsize, this->bsize,
PROT_READ|PROT_WRITE, MAP_SHARED, shm, 0);
if (this->mirror != this->data + this->bsize) {
if (this->mirror == this->data - this->bsize) {
this->data = this->mirror;
this->mirror += this->bsize;
}
else {
this->mirror = NULL;
break;
}
}
state = 1;
}
if (-1 != shm) {
shm_unlink(this->shm_name);
close(shm);
}
if (1 != state) {
dtor(this);
}
}
static
void
dtor(void * _this)
{
Cbuf this = _this;
if (NULL != this->shm_name) {
free(this->shm_name);
this->shm_name = NULL;
}
if (NULL != this->data) {
munmap(this->data, this->bsize);
this->data = NULL;
}
if (NULL != this->mirror) {
munmap(this->mirror, this->bsize);
this->mirror = NULL;
}
}
INIT_IFACE(Class, ctor, dtor, NULL);
CREATE_CLASS(Cbuf, NULL, IFACE(Class));
// vim: set ts=4 sw=4:

11
src/cbuf/addr_index.c

@ -0,0 +1,11 @@
#include <sys/types.h>
#include "cbuf.h"
size_t
cbufAddrIndex(Cbuf this, const void * c)
{
return c - (const void *)cbufGetRead(this);
}
// vim: set ts=4 sw=4:

19
src/cbuf/get_data.c

@ -0,0 +1,19 @@
#include <sys/types.h>
#include <string.h>
#include "cbuf.h"
char *
cbufGetData(Cbuf this, size_t n)
{
char * ret = cbufGetRead(this);
if (n > this->bused) {
return -1;
}
cbufIncRead(this, n);
return ret;
}
// vim: set ts=4 sw=4:

11
src/cbuf/get_free.c

@ -0,0 +1,11 @@
#include <sys/types.h>
#include "cbuf.h"
size_t
cbufGetFree(Cbuf this)
{
return this->bsize - this->bused;
}
// vim: set ts=4 sw=4:

26
src/cbuf/get_line.c

@ -0,0 +1,26 @@
#include <sys/types.h>
#include <string.h>
#include "cbuf.h"
char *
cbufGetLine(Cbuf this)
{
char * nl = cbufMemchr(this, '\n');
char * ret = NULL;
if (NULL != nl) {
size_t len = cbufAddrIndex(this, nl) + 1;
*nl = 0;
*(nl-1) = ('\r' == *(nl-1))? 0 : *(nl-1);
ret = cbufGetRead(this);
cbufIncRead(this, len);
}
return ret;
}
// vim: set ts=4 sw=4:

9
src/cbuf/get_read.c

@ -0,0 +1,9 @@
#include "cbuf.h"
char *
cbufGetRead(Cbuf this)
{
return this->data + this->read;
}
// vim: set ts=4 sw=4:

9
src/cbuf/get_write.c

@ -0,0 +1,9 @@
#include "cbuf.h"
char *
cbufGetWrite(Cbuf this)
{
return this->data + this->write;
}
// vim: set ts=4 sw=4:

14
src/cbuf/inc_read.c

@ -0,0 +1,14 @@
#include <sys/types.h>
#include "cbuf.h"
void
cbufIncRead(Cbuf this, size_t n)
{
this->read += n;
this->read = (this->read >= this->bsize)?
this->read - this->bsize : this->read;
this->bused -= n;
}
// vim: set ts=4 sw=4:

14
src/cbuf/inc_write.c

@ -0,0 +1,14 @@
#include <sys/types.h>
#include "cbuf.h"
void
cbufIncWrite(Cbuf this, size_t n)
{
this->write += n;
this->write = (this->write >= this->bsize)?
this->write - this->bsize : this->write;
this->bused += n;
}
// vim: set ts=4 sw=4:

9
src/cbuf/is_empty.c

@ -0,0 +1,9 @@
#include "cbuf.h"
char
cbufIsEmpty(Cbuf this)
{
return (0 == this->bused);
}
// vim: set ts=4 sw=4:

11
src/cbuf/memchr.c

@ -0,0 +1,11 @@
#include <string.h>
#include "cbuf.h"
char *
cbufMemchr(Cbuf this, int c)
{
return memchr(cbufGetRead(this), c, this->bused);
}
// vim: set ts=4 sw=4:

37
src/cbuf/read.c

@ -0,0 +1,37 @@
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include "cbuf.h"
ssize_t
cbufRead(Cbuf this, int fd)
{
ssize_t rrsize = 0;
size_t rsize = cbufGetFree(this);
if (0 == rsize) {
errno = ECBUFOVFL;
return -1;
}
rrsize = read(fd, cbufGetWrite(this), rsize);
switch (rrsize) {
case 0:
rrsize = -2;
// DROP THROUGH
case -1:
break;
default:
cbufIncWrite(this, rrsize);
break;
}
return rrsize;
}
// vim: set ts=4 sw=4:

23
src/cbuf/set_data.c

@ -0,0 +1,23 @@
#include <sys/types.h>
#include <string.h>
#include <errno.h>
#include "cbuf.h"
char *
cbufSetData(Cbuf this, const void * src, size_t n)
{
char * addr;
if (n > cbufGetFree(this)) {
errno = ECBUFOVFL;
return -1;
}
addr = memcpy(cbufGetWrite(this), src, n);
cbufIncWrite(this, n);
return addr;
}
// vim: set ts=4 sw=4:

12
src/cbuf/skip_non_alpha.c

@ -0,0 +1,12 @@
#include <ctype.h>
#include "cbuf.h"
void
cbufSkipNonAlpha(Cbuf this)
{
while(0 > this->bused && isalpha(*cbufGetRead(this)))
cbufIncRead(this, 1);
}
// vim: set ts=4 sw=4:

29
src/cbuf/write.c

@ -0,0 +1,29 @@
#include <sys/types.h>
#include <unistd.h>
#include "cbuf.h"
ssize_t
cbufWrite(Cbuf this, int fd)
{
ssize_t wwsize = 0;
size_t wsize = this->bused;
if (0 == wsize) return 0;
wwsize = write(fd, cbufGetRead(this), wsize);
switch (wwsize) {
case -1:
break;
default:
cbufIncRead(this, wwsize);
break;
}
return wwsize;
}
// vim: set ts=4 sw=4:

16
src/http/request/parser.c

@ -1,5 +1,5 @@
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
@ -10,17 +10,20 @@
#include "http/request/parser.h"
#include "http/message/queue.h"
#include "http/request.h"
#include "ringbuffer.h"
#include "cbuf.h"
static
void
ctor(void * _this, va_list * params)
{
HttpRequestParser this = _this;
char * shm_name = va_arg(*params, char*);
HttpRequestParser this = _this;
char * id = va_arg(*params, char*);
char cbuf_id[100];
sprintf(cbuf_id, "%s_%s", "parser", id);
this->buffer = new(Ringbuffer, shm_name, HTTP_REQUEST_LINE_MAX);
this->buffer = new(Cbuf, cbuf_id, HTTP_REQUEST_PARSER_BUFFER_MAX);
this->request_queue = new(HttpMessageQueue);
}
@ -37,9 +40,8 @@ dtor(void * _this)
delete(&(this->cur_request));
}
INIT_IFACE(Class, ctor, dtor, NULL);
INIT_IFACE(StreamReader, (fptr_streamReaderRead)httpRequestParserParse);
INIT_IFACE(StreamReader, (fptr_streamReaderRead)httpRequestParserRead);
CREATE_CLASS(HttpRequestParser, NULL, IFACE(Class), IFACE(StreamReader));
// vim: set ts=4 sw=4:

55
src/http/request/parser/get_body.c

@ -3,6 +3,10 @@
#include "http/header.h"
#include "http/message.h"
#include "http/request/parser.h"
#include "cbuf.h"
#define MAX(a,b) (((a) > (b))? (a) : (b))
#define MAX(x,y) ((x) > (y) ? (x) : (y))
@ -14,41 +18,34 @@ void
httpRequestParserGetBody(HttpRequestParser this)
{
HttpMessage message = (HttpMessage)(this->cur_request);
char * str_nbody;
int nbody;
int len;
str_nbody = httpHeaderGet(
&(message->header),
"Content-Length");
if (NULL == str_nbody) {
this->state = HTTP_REQUEST_DONE;
return -1;
}
nbody = atoi(str_nbody);
size_t len;
if (0 == message->nbody) {
message->type = HTTP_MESSAGE_BUFFERED;
if (0 < nbody)
message->body = malloc(nbody);
char * nbody = httpHeaderGet(
&(message->header),
"Content-Length");
if (NULL == nbody) {
this->state = HTTP_REQUEST_DONE;
return;
}
else {
message->type = HTTP_MESSAGE_BUFFERED;
message->nbody = atoi(nbody);
message->body = calloc(1, message->nbody);
message->dbody = 0;
}
}
this->buffer->bused -= len;
len = MAX(nbody - message->nbody, this->buffer->bused);
memcpy(message->body + message->nbody,
this->buffer->buffer + this->buffer->bstart,
len);
if (message->dbody < message->nbody) {
len = MAX(
message->nbody - message->dbody,
this->buffer->bused);
message->nbody += len;
this->buffer->bstart += len;
if (this->buffer->bstart >= this->buffer->bsize) {
this->buffer->bstart -= this->buffer->bsize;
}
this->buffer->bused -= len;
memcpy(message->body, cbufGetData(this->buffer, len), len);
if (message->nbody == nbody) {
this->state = HTTP_REQUEST_DONE;
message->dbody += len;
}
}

94
src/http/request/parser/parse.c

@ -7,81 +7,35 @@
#include "http/message.h"
#include "http/request/parser.h"
#include "interface/class.h"
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
static
inline
char *
getLine(HttpRequestParser this)
{
char * cr = memchr(
this->buffer->buffer + this->buffer->bstart,
'\r',
this->buffer->bused);
char * nl = (NULL == cr)? NULL : cr + 1;
if (NULL != cr && NULL != nl && '\n' == *nl) {
*cr = 0;
return cr;
}
return NULL;
}
static
inline
char
httpRequestSkip(HttpRequestParser this)
{
while (this->buffer->bused > 0 &&
! isalpha(this->buffer->buffer[this->buffer->bstart])) {
this->buffer->bstart = (this->buffer->bstart >= this->buffer->bsize)?
0 : this->buffer->bstart + 1;
this->buffer->bused--;
}
return (isalpha(this->buffer->buffer[this->buffer->bstart]))? TRUE : FALSE;
}
#include "cbuf.h"
ssize_t
httpRequestParserParse(HttpRequestParser this, int fd)
{
int cont = 1;
ssize_t ret;
ssize_t read;
char * line;
if (0 > (ret = rbRead(this->buffer, fd))) {
cont = 0;
if(0 > (read = cbufRead(this->buffer, fd))) {
return read;
}
while (cont) {
switch(this->state) {
char * line_end;
size_t len;
case HTTP_REQUEST_GARBAGE:
if (httpRequestSkip(this)) {
cbufSkipNonAlpha(this->buffer);
if (! cbufIsEmpty(this->buffer)) {
this->cur_request = new(HttpRequest);
this->state = HTTP_REQUEST_START;
}
else {
cont = 0;
}
break;
case HTTP_REQUEST_START:
if (NULL == (line_end = getLine(this))) {
if (NULL == (line = cbufGetLine(this->buffer))) {
cont = 0;
break;
}
if (0 > httpRequestParserGetRequestLine(this, line_end)) {
ret = -1;
cont = 0;
@ -99,7 +53,7 @@ httpRequestParserParse(HttpRequestParser this, int fd)
break;
case HTTP_REQUEST_REQUEST_LINE_DONE:
if (NULL == (line_end = getLine(this))) {
if (NULL == (line = cbufGetLine(this->buffer))) {
cont = 0;
break;
}
@ -127,23 +81,33 @@ httpRequestParserParse(HttpRequestParser this, int fd)
break;
case HTTP_REQUEST_HEADERS_DONE:
/**
* allocate memory according to content-length.
* If content length is to large reject request.
*
* @FUTURE check for multipart mime and handle it
* with temporary file.
*/
httpRequestParserGetBody(this);
{
HttpMessage message = (HttpMessage)this->cur_request;
httpRequestParserGetBody(this);
if (message->dbody == message->nbody) {
this->state = HTTP_REQUEST_DONE;
}
}
break;
case HTTP_REQUEST_DONE:
this->request_queue->msgs[(this->request_queue->nmsgs)++] =
(HttpMessage)this->cur_request;
ret = this->request_queue->nmsgs;
this->cur_request = NULL;
/**
* dont continue loop if input buffer is empty
*/
if (cbufIsEmpty(this->buffer)) {
cont = 0;
}
/**
* prepare for next request
*/
this->state = HTTP_REQUEST_GARBAGE;
break;

63
src/http/request/parser/read.c

@ -14,29 +14,46 @@
ssize_t
httpRequestParserRead(HttpRequestParser this, int fd)
{
size_t rsize;
ssize_t temp;
this->bend = (this->bsize == this->bend)?
0 : this->bend;
rsize = (this->bstart <= this->bend)?
this->bsize - this->bend :
this->bstart - 1;
if (0 >= (temp = read(fd, &(this->buffer[this->bend]), rsize))) {
/**
* this means either we had an rsize of 0 what indicates that
* the buffer ran full without any processing took place or
* the connection was terminated in some way. In both cases
* we want to terminate the connection.
*/
return (0 == temp)? -2 : -1;
}
this->bend += temp;
return temp;
/**
* @obsolete
*/
return -1;
// size_t remaining, chunks;
// char buffer[1024];
//
// ssize_t size = read(fd, buffer, 1024);
//
// if (0 < size) {
// remaining = this->buffer_used % HTTP_REQUEST_PARSER_READ_CHUNK;
// remaining = HTTP_REQUEST_PARSER_READ_CHUNK - remaining;
// chunks = this->buffer_used / HTTP_REQUEST_PARSER_READ_CHUNK;
//
// /**
// * because a division always rounds down
// * chunks holds exactly the currently allocated chunks if
// * remaining equals 0 but there is no space left.
// * Else chunks holds the actually allocated amount of chunks
// * minus 1.
// * For this reason chunks always has to be increased by 1.
// */
// chunks++;
//
// if (size >= remaining) {
// this->buffer =
// realloc(this->buffer, chunks * HTTP_REQUEST_PARSER_READ_CHUNK);
// }
//
// memcpy(this->buffer + this->buffer_used, buffer, size);
// this->buffer_used += size;
// this->buffer[this->buffer_used] = 0;
//
// size = httpRequestParserParse(this);
// }
// else {
// size = (0 == size)? -2 : size;
// }
//
// return size;
}
// vim: set ts=4 sw=4:

7
src/http/response/writer.c

@ -1,4 +1,5 @@
#include <stdlib.h>
#include <stdio.h>
#include "class.h"
#include "interface/class.h"
@ -12,7 +13,12 @@ void
ctor(void * _this, va_list * params)
{
HttpResponseWriter this = _this;
char * id = va_arg(*params, char*);
char cbuf_id[100];
sprintf(cbuf_id, "%s_%s", "writer", id);
this->buffer = new(Cbuf, cbuf_id, RESPONSE_WRITER_MAX_BUF);
this->response_queue = new(HttpMessageQueue);
}
@ -23,6 +29,7 @@ dtor(void * _this)
HttpResponseWriter this = _this;
delete(&(this->response_queue));
delete(&(this->buffer));
if (NULL != this->cur_response)
delete(&(this->cur_response));

90
src/http/response/writer/write.c

@ -11,7 +11,9 @@
#include "http/message.h"
#include "http/response.h"
#include "http/response/writer.h"
#include "cbuf.h"
#define MIN(x,y) ((x) < (y) ? (x) : (y))
#define MAX(x,y) ((x) > (y) ? (x) : (y))
#define _PSIZE(x) (MAX((x),RESPONSE_WRITER_MAX_BUF))
#define PSIZE _PSIZE(this->nheader+message->nbody)
@ -21,7 +23,6 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd)
{
HttpMessageQueue respq = this->response_queue;
HttpMessage message = (HttpMessage)this->cur_response;
ssize_t processed = (message)? 1 : 0;
int cont = 1;
while (cont) {
@ -30,19 +31,13 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd)
if (NULL == this->cur_response && 0 < respq->nmsgs) {
message = respq->msgs[0];
this->cur_response = (HttpResponse)message;
processed++;
memmove(respq->msgs,
&(respq->msgs[1]),
sizeof(void*) * (--respq->nmsgs + 1));
this->nbuffer = 0;
this->written = 0;
this->pstart = 0;
this->nbody = 0;
this->nheader = httpMessageHeaderSizeGet(message);
this->pend = this->nheader;
this->pipe = malloc(PSIZE);
httpMessageHeaderToString(message, this->pipe);
httpMessageHeaderToString(message, cbufGetWrite(this->buffer));
cbufIncWrite(this->buffer, this->nheader);
this->state = HTTP_RESPONSE_WRITE;
}
@ -55,57 +50,41 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd)
/**
* read
*/
if (this->nbuffer < message->nbody) {
size_t temp = 0;
size_t rsize;
this->pend = (PSIZE == this->pend)?
0 : this->pend;
rsize = (this->pstart <= this->pend)?
PSIZE - this->pend :
this->pstart - 1;
if (this->nbody < message->nbody) {
size_t size = MIN(
message->nbody - this->nbody,
cbufGetFree(this->buffer));
switch (message->type) {
case HTTP_MESSAGE_BUFFERED:
temp = message->nbody - this->nbuffer;
temp = (rsize<temp)? rsize : temp;
memcpy(
&(this->pipe[this->pend]),
&(message->body[this->nbuffer]),
temp);
cbufSetData(this->buffer,
message->body + this->nbody,
size);
break;
case HTTP_MESSAGE_PIPED:
temp = read(
message->handle,
&(this->pipe[this->pend]),
rsize);
size = cbufRead(this->buffer, message->handle);
break;
default:
return -1;
}
this->nbuffer += temp;
this->pend += temp;
this->nbody += size;
}
/**
* write
*/
{
size_t wsize;
size_t temp;
wsize = (this->pstart <= this->pend)?
this->pend - this->pstart :
PSIZE - this->pstart;
ssize_t written = cbufWrite(this->buffer, fd);
temp = write(fd, &(this->pipe[this->pstart]), wsize);
this->written += temp;
this->pstart += temp;
this->pstart = (PSIZE == this->pstart)?
0 : this->pstart;
if (0 <= written) {
this->written += written;
}
else {
return -1;
}
}
if (this->written == message->nbody + this->nheader) {
@ -121,33 +100,30 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd)
close(message->handle);
}
free(this->pipe);
this->state = HTTP_RESPONSE_GET;
this->nheader = 0;
this->nbuffer = 0;
this->written = 0;
this->pstart = 0;
this->pend = 0;
memmove(respq->msgs,
&(respq->msgs[1]),
sizeof(void*) * (--respq->nmsgs + 1));
if (! httpMessageHasKeepAlive(message)) {
/**
* if the message did not have the keep-alive feature
* we don't care about further pipelined messages and
* return the to caller with a 0 indicating that the
* return to the caller with a 0 indicating that the
* underlying connection should be closed.
*/
processed = 0;
cont = 0;
delete(&this->cur_response);
return -1;
}
delete(&this->cur_response);
this->state = HTTP_RESPONSE_GET;
break;
}
}
return processed;
return respq->nmsgs;
}
// vim: set ts=4 sw=4:

14
src/http/worker.c

@ -1,5 +1,6 @@
#include <stdlib.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include "class.h"
@ -17,14 +18,13 @@ void
ctor(void * _this, va_list * params)
{
HttpWorker this = _this;
char id[sizeof(SHMN) + 15 + 5];
char * id = va_arg(*params, char *);
this->remoteAddr = va_arg(*params, char *);
this->handle = va_arg(*params, int);
sprintf(id, SHMN "%s_%05d", this->remoteAddr, this->handle);
this->id = malloc(strlen(id) + 1);
strcpy(this->id, id);
this->parser = new(HttpRequestParser, id);
this->writer = new(HttpResponseWriter);
this->parser = new(HttpRequestParser, this->id);
this->writer = new(HttpResponseWriter, this->id);
}
static
@ -33,6 +33,8 @@ dtor(void * _this)
{
HttpWorker this = _this;
free(this->id);
delete(&this->parser);
delete(&this->writer);
}

2
src/server/close_conn.c

@ -12,8 +12,6 @@ serverCloseConn(Server this, unsigned int i)
delete(&((this->conns)[fd].sock));
delete(&((this->conns)[fd].worker));
(this->conns)[fd].keep_alive = 0;
memset(&(this->fds[i]), 0, sizeof(struct pollfd));
}

10
src/server/handle_accept.c

@ -1,4 +1,8 @@
#include <errno.h>
#include <stdio.h>
#include "http/worker.h"
#include "http/worker.h"
@ -12,11 +16,15 @@ serverHandleAccept(Server this)
acc = socketAccept(this->sock, &remoteAddr);
if (-1 != acc->handle) {
char id[21];
sprintf(id, "my_%s_%05d", remoteAddr, acc->handle);
//* save the socket handle
(this->conns)[acc->handle].sock = acc;
//* clone reader
(this->conns)[acc->handle].worker = new(HttpWorker, remoteAddr, acc->handle);
(this->conns)[acc->handle].worker = new(HttpWorker, id);
(this->fds)[this->nfds].fd = acc->handle;
(this->fds)[this->nfds].events = POLLIN;

26
src/server/run.c

@ -62,12 +62,12 @@ serverRun(Server this)
nreads--;
switch (serverRead(this, i)) {
case -2:
case -1:
serverCloseConn(this, i);
case 0:
break;
case 0:
case -1:
case -2:
serverCloseConn(this, i);
break;
default:
@ -80,14 +80,24 @@ serverRun(Server this)
* handle writes
*/
if (0 != ((this->fds)[i].revents & POLLOUT) && 0 < nwrites) {
size_t remaining;
events--;
nwrites--;
if (0 >= streamWriterWrite((this->conns)[fd].worker, fd)) {
serverCloseConn(this, i);
}
remaining = streamWriterWrite((this->conns)[fd].worker, fd);
switch(remaining) {
case -1:
serverCloseConn(this, i);
break;
case 0:
(this->fds)[i].events &= ~POLLOUT;
break;
(this->fds)[i].events &= ~POLLOUT;
default:
break;
}
}
}
}

5
src/socket/accept.c

@ -6,7 +6,7 @@
#include "interface/logger.h"
Sock
socketAccept(Sock this, char (*remoteAddr)[])
socketAccept(Sock this, char (*remoteAddr)[16])
{
Sock sock; /* Socket for client */
unsigned int len; /* Length of client address data structure */
@ -33,7 +33,8 @@ socketAccept(Sock this, char (*remoteAddr)[])
loggerLog(this->log, LOGGER_WARNING,
"error accepting connection: %s", strerror(errno));
} else {
memcpy(*remoteAddr, inet_ntoa((sock->addr).sin_addr), 15);
strcpy(*remoteAddr, inet_ntoa((sock->addr).sin_addr));
loggerLog(this->log, LOGGER_INFO,
"handling client %s\n", *remoteAddr);
}

6
src/testserver.c

@ -19,8 +19,8 @@ int
main()
{
Logger logger = new(LoggerSyslog, LOGGER_ERR);
HttpWorker worker = new(HttpWorker, "", 0);
Server server = new(Server, logger, worker, 11212, SOMAXCONN);
//HttpWorker worker = new(HttpWorker);
Server server = new(Server, logger, NULL /*worker*/, 11212, SOMAXCONN);
struct rlimit limit = {RLIM_INFINITY, RLIM_INFINITY};
setrlimit(RLIMIT_CPU, &limit);
@ -30,7 +30,7 @@ main()
serverRun(server);
delete(&server);
delete(&worker);
// delete(&worker);
delete(&logger);
return 0;

Loading…
Cancel
Save