Browse Source

separated the server completely from the http processing

master
Georg Hopp 14 years ago
parent
commit
20af2baa6f
  1. 8
      ChangeLog
  2. 4
      TODO
  3. 2
      include/http/request/parser.h
  4. 2
      include/http/response/writer.h
  5. 9
      include/http/worker.h
  6. 6
      include/server.h
  7. 4
      src/Makefile.am
  8. 3
      src/http/request/parser.c
  9. 4
      src/http/request/parser/parse.c
  10. 8
      src/http/request/parser/read.c
  11. 3
      src/http/response/writer.c
  12. 30
      src/http/response/writer/write.c
  13. 55
      src/http/worker.c
  14. 59
      src/http/worker/process.c
  15. 12
      src/http/worker/write.c
  16. 4
      src/server.c
  17. 3
      src/server/close_conn.c
  18. 3
      src/server/handle_accept.c
  19. 4
      src/server/read.c
  20. 86
      src/server/run.c
  21. 8
      src/testserver.c

8
ChangeLog

@ -1,10 +1,14 @@
2012-02-15 09:38:32 +0100 Georg Hopp
* separated the server completely from the http processing (HEAD, master)
2012-02-15 06:19:52 +0100 Georg Hopp 2012-02-15 06:19:52 +0100 Georg Hopp
* add subject/observer interface (HEAD, master)
* add subject/observer interface
2012-02-15 04:55:46 +0100 Georg Hopp 2012-02-15 04:55:46 +0100 Georg Hopp
* fix infinite busy loop in run (origin/master, origin/HEAD)
* fix infinite busy loop in run
2012-02-15 04:44:38 +0100 Georg Hopp 2012-02-15 04:44:38 +0100 Georg Hopp

4
TODO

@ -1,2 +1,6 @@
VERY BIG TODO: VERY BIG TODO:
- give a contructor a way to fail, so that no object will be created at all - give a contructor a way to fail, so that no object will be created at all
- right now i will use long polling ajax calls when feedback from to the client
is needed. In the long term this should be changed to websockets (ws). But
right now ws specification is not final anyway. :)

2
include/http/request/parser.h

@ -34,7 +34,7 @@ CLASS(HttpRequestParser) {
}; };
size_t httpRequestParserRead(HttpRequestParser, int); size_t httpRequestParserRead(HttpRequestParser, int);
void httpRequestParserParse(HttpRequestParser);
size_t httpRequestParserParse(HttpRequestParser);
void httpRequestParserGetBody(HttpRequestParser); void httpRequestParserGetBody(HttpRequestParser);
void httpRequestParserGetRequestLine(HttpRequest, char *); void httpRequestParserGetRequestLine(HttpRequest, char *);

2
include/http/response/writer.h

@ -29,7 +29,7 @@ CLASS(HttpResponseWriter) {
HttpResponseState state; HttpResponseState state;
}; };
HttpResponse httpResponseWriterWrite(HttpResponseWriter, int);
size_t httpResponseWriterWrite(HttpResponseWriter, int);
#endif // __HTTP_RESPONSE_WRITER_H__ #endif // __HTTP_RESPONSE_WRITER_H__

9
include/http/worker.h

@ -1,13 +1,20 @@
#ifndef __HTTP_WORKER_H__ #ifndef __HTTP_WORKER_H__
#define __HTTP_WORKER_H__ #define __HTTP_WORKER_H__
#include <sys/types.h>
#include "class.h" #include "class.h"
#include "http/request/parser.h" #include "http/request/parser.h"
#include "http/response/writer.h"
CLASS(HttpWorker) { CLASS(HttpWorker) {
HttpRequestParser parser;
HttpRequestParser parser;
HttpResponseWriter writer;
}; };
size_t httpWorkerProcess(HttpWorker, int);
size_t httpWorkerWrite(HttpWorker, int);
#endif // __HTTP_WORKER_H__ #endif // __HTTP_WORKER_H__
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

6
include/server.h

@ -27,13 +27,15 @@ CLASS(Server) {
Logger logger; Logger logger;
Sock sock; Sock sock;
void * worker;
nfds_t nfds; nfds_t nfds;
struct pollfd fds[POLL_FD_NSIZE]; struct pollfd fds[POLL_FD_NSIZE];
struct { struct {
Sock sock; Sock sock;
void * reader;
void * writer;
void * worker;
char keep_alive; char keep_alive;
} conns[POLL_FD_NSIZE]; } conns[POLL_FD_NSIZE];

4
src/Makefile.am

@ -15,6 +15,7 @@ RESP = http/response.c \
http/response/404.c \ http/response/404.c \
http/response/image.c \ http/response/image.c \
http/response/me.c http/response/me.c
WORKER = http/worker.c http/worker/process.c http/worker/write.c
WRITER = http/response/writer.c http/response/writer/write.c WRITER = http/response/writer.c http/response/writer/write.c
HEADER = http/header.c http/header/get.c http/header/add.c \ HEADER = http/header.c http/header/get.c http/header/add.c \
http/header/size_get.c http/header/to_string.c http/header/size_get.c http/header/to_string.c
@ -29,5 +30,6 @@ bin_PROGRAMS = testserver
testserver_SOURCES = testserver.c \ testserver_SOURCES = testserver.c \
$(IFACE) $(CLASS) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \ $(IFACE) $(CLASS) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \
$(WRITER) $(RESP) $(HEADER) $(PARSER) signalHandling.c daemonize.c
$(WRITER) $(RESP) $(HEADER) $(PARSER) $(WORKER) \
signalHandling.c daemonize.c
testserver_CFLAGS = -Wall -I ../include/ testserver_CFLAGS = -Wall -I ../include/

3
src/http/request/parser.c

@ -59,8 +59,7 @@ _clone(void * _this, void * _base)
} }
INIT_IFACE(Class, ctor, dtor, _clone); INIT_IFACE(Class, ctor, dtor, _clone);
INIT_IFACE(StreamReader,
(fptr_streamReaderRead)httpRequestParserRead);
INIT_IFACE(StreamReader, (fptr_streamReaderRead)httpRequestParserRead);
CREATE_CLASS(HttpRequestParser, NULL, IFACE(Class), IFACE(StreamReader)); CREATE_CLASS(HttpRequestParser, NULL, IFACE(Class), IFACE(StreamReader));
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

4
src/http/request/parser/parse.c

@ -33,7 +33,7 @@ httpRequestSkip(char ** data)
for (; 0 != **data && ! isalpha(**data); (*data)++); for (; 0 != **data && ! isalpha(**data); (*data)++);
} }
void
size_t
httpRequestParserParse(HttpRequestParser this) httpRequestParserParse(HttpRequestParser this)
{ {
char * line; char * line;
@ -111,6 +111,8 @@ httpRequestParserParse(HttpRequestParser this)
break; break;
} }
} }
return this->request_queue->nmsgs;
} }
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

8
src/http/request/parser/read.c

@ -7,10 +7,10 @@
size_t size_t
httpRequestParserRead(HttpRequestParser this, int fd) httpRequestParserRead(HttpRequestParser this, int fd)
{ {
size_t remaining, chunks;
char buffer[1024];
size_t remaining, chunks;
char buffer[1024];
ssize_t size = read(fd, buffer, 1024);
size_t size = read(fd, buffer, 1024);
if (0 < size) { if (0 < size) {
remaining = this->buffer_used % HTTP_REQUEST_PARSER_READ_CHUNK; remaining = this->buffer_used % HTTP_REQUEST_PARSER_READ_CHUNK;
@ -35,7 +35,7 @@ httpRequestParserRead(HttpRequestParser this, int fd)
this->buffer_used += size; this->buffer_used += size;
this->buffer[this->buffer_used] = 0; this->buffer[this->buffer_used] = 0;
httpRequestParserParse(this);
size = httpRequestParserParse(this);
} }
return size; return size;

3
src/http/response/writer.c

@ -38,8 +38,7 @@ _clone(void * _this, void * _base)
} }
INIT_IFACE(Class, ctor, dtor, _clone); INIT_IFACE(Class, ctor, dtor, _clone);
INIT_IFACE(StreamWriter,
(fptr_streamWriterWrite)httpResponseWriterWrite);
INIT_IFACE(StreamWriter, (fptr_streamWriterWrite)httpResponseWriterWrite);
CREATE_CLASS(HttpResponseWriter, NULL, IFACE(Class), IFACE(StreamWriter)); CREATE_CLASS(HttpResponseWriter, NULL, IFACE(Class), IFACE(StreamWriter));
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

30
src/http/response/writer/write.c

@ -16,19 +16,21 @@
#define _PSIZE(x) (MAX((x),RESPONSE_WRITER_MAX_BUF)) #define _PSIZE(x) (MAX((x),RESPONSE_WRITER_MAX_BUF))
#define PSIZE _PSIZE(this->nheader+message->nbody) #define PSIZE _PSIZE(this->nheader+message->nbody)
HttpResponse
size_t
httpResponseWriterWrite(HttpResponseWriter this, int fd) httpResponseWriterWrite(HttpResponseWriter this, int fd)
{ {
HttpMessageQueue respq = this->response_queue;
HttpMessage message = (HttpMessage)this->cur_response;
int cont = 1;
HttpMessageQueue respq = this->response_queue;
HttpMessage message = (HttpMessage)this->cur_response;
size_t processed = (message)? 1 : 0;
int cont = 1;
while (cont) { while (cont) {
switch (this->state) { switch (this->state) {
case HTTP_RESPONSE_GET: case HTTP_RESPONSE_GET:
if (NULL == this->cur_response && 0 < respq->nmsgs) { if (NULL == this->cur_response && 0 < respq->nmsgs) {
message = respq->msgs[0];
message = respq->msgs[0];
this->cur_response = (HttpResponse)message; this->cur_response = (HttpResponse)message;
processed++;
memmove(respq->msgs, memmove(respq->msgs,
&(respq->msgs[1]), &(respq->msgs[1]),
@ -128,19 +130,25 @@ httpResponseWriterWrite(HttpResponseWriter this, int fd)
this->pstart = 0; this->pstart = 0;
this->pend = 0; this->pend = 0;
if (httpMessageHasKeepAlive(message)) {
delete(&this->cur_response);
}
else {
cont = 0;
if (! httpMessageHasKeepAlive(message)) {
/**
* if the message did not have the keep-alive feature
* we don't care about further pipelined messages and
* return the to caller with a 0 indicating that the
* underlying connection should be closed.
*/
processed = 0;
cont = 0;
} }
delete(&this->cur_response);
this->state = HTTP_RESPONSE_GET; this->state = HTTP_RESPONSE_GET;
break; break;
} }
} }
return this->cur_response;
return processed;
} }
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

55
src/http/worker.c

@ -0,0 +1,55 @@
#include <stdarg.h>
#include "class.h"
#include "http/worker.h"
#include "http/request/parser.h"
#include "http/response/writer.h"
#include "interface/class.h"
#include "interface/stream_reader.h"
#include "interface/stream_writer.h"
static
void
ctor(void * _this, va_list * params)
{
HttpWorker this = _this;
this->parser = new(HttpRequestParser);
this->writer = new(HttpResponseWriter);
}
static
void
dtor(void * _this)
{
HttpWorker this = _this;
delete(&this->parser);
delete(&this->writer);
}
static
void
_clone(void * _this, void * _base)
{
/**
* TODO: this actually simply creates a new worker
* and ignores the base. Think about this.
*/
va_list foo;
ctor(_this, &foo);
}
INIT_IFACE(Class, ctor, dtor, _clone);
INIT_IFACE(StreamReader, (fptr_streamReaderRead)httpWorkerProcess);
INIT_IFACE(StreamWriter, (fptr_streamWriterWrite)httpWorkerWrite);
CREATE_CLASS(
HttpWorker,
NULL,
IFACE(Class),
IFACE(StreamReader),
IFACE(StreamWriter));
// vim: set ts=4 sw=4:

59
src/http/worker/process.c

@ -0,0 +1,59 @@
#include "class.h"
#include "interface/class.h"
#include "http/worker.h"
#include "http/request/parser.h"
size_t
httpWorkerProcess(HttpWorker this, int fd)
{
size_t size;
if (0 < (size = httpRequestParserRead(this->parser, fd))) {
int i;
HttpMessageQueue reqq = this->parser->request_queue;
HttpMessageQueue respq = this->writer->response_queue;
for (i=0; i<reqq->nmsgs; i++) {
/**
* @TODO: for now simply remove request and send not found.
* Make this sane.
*/
HttpRequest request = (HttpRequest)(reqq->msgs[i]);
HttpMessage response = NULL;
if (0 == strcmp("GET", request->method) &&
0 == strcmp("/me/", request->uri)) {
response = (HttpMessage)httpResponseMe();
}
else if (0 == strcmp("GET", request->method) &&
0 == strcmp("/image/", request->uri)) {
response = (HttpMessage)httpResponseImage();
}
else {
response = (HttpMessage)httpResponse404();
}
if (httpMessageHasKeepAlive(reqq->msgs[i])) {
httpHeaderAdd(
&(response->header),
new(HttpHeader, "Connection", "Keep-Alive"));
}
else {
httpHeaderAdd(
&(response->header),
new(HttpHeader, "Connection", "Close"));
}
respq->msgs[(respq->nmsgs)++] = response;
response = NULL;
delete(&(reqq->msgs[i]));
}
reqq->nmsgs = 0;
}
return size;
}
// vim: set ts=4 sw=4:

12
src/http/worker/write.c

@ -0,0 +1,12 @@
#include <sys/types.h>
#include "http/worker.h"
#include "http/response/writer.h"
size_t
httpWorkerWrite(HttpWorker this, int fd)
{
return httpResponseWriterWrite(this->writer, fd);
}
// vim: set ts=4 sw=4:

4
src/server.c

@ -20,6 +20,7 @@ ctor(void * _this, va_list * params)
int flags; int flags;
this->logger = va_arg(* params, Logger); this->logger = va_arg(* params, Logger);
this->worker = va_arg(* params, void *);
port = va_arg(* params, int); port = va_arg(* params, int);
backlog = va_arg(* params, unsigned int); backlog = va_arg(* params, unsigned int);
@ -45,8 +46,7 @@ dtor(void * _this)
for (i=0; i<this->nfds; i++) { for (i=0; i<this->nfds; i++) {
if (this->sock->handle != (this->fds)[i].fd) { if (this->sock->handle != (this->fds)[i].fd) {
delete(&(this->conns[(this->fds)[i].fd]).sock); delete(&(this->conns[(this->fds)[i].fd]).sock);
delete(&(this->conns[(this->fds)[i].fd]).reader);
delete(&(this->conns[(this->fds)[i].fd]).writer);
delete(&(this->conns[(this->fds)[i].fd]).worker);
} }
} }

3
src/server/close_conn.c

@ -10,8 +10,7 @@ serverCloseConn(Server this, unsigned int i)
int fd = (this->fds)[i].fd; int fd = (this->fds)[i].fd;
delete(&((this->conns)[fd].sock)); delete(&((this->conns)[fd].sock));
delete(&((this->conns)[fd].reader));
delete(&((this->conns)[fd].writer));
delete(&((this->conns)[fd].worker));
(this->conns)[fd].keep_alive = 0; (this->conns)[fd].keep_alive = 0;

3
src/server/handle_accept.c

@ -12,8 +12,7 @@ serverHandleAccept(Server this)
(this->conns)[acc->handle].sock = acc; (this->conns)[acc->handle].sock = acc;
//* clone reader //* clone reader
(this->conns)[acc->handle].reader = new(HttpRequestParser);
(this->conns)[acc->handle].writer = new(HttpResponseWriter);
(this->conns)[acc->handle].worker = clone(this->worker);
(this->fds)[this->nfds].fd = acc->handle; (this->fds)[this->nfds].fd = acc->handle;
(this->fds)[this->nfds].events = POLLIN; (this->fds)[this->nfds].events = POLLIN;

4
src/server/read.c

@ -5,7 +5,7 @@ serverRead(Server this, unsigned int i)
int fd = (this->fds)[i].fd; int fd = (this->fds)[i].fd;
int size; int size;
if (NULL == (this->conns)[fd].reader) {
if (NULL == (this->conns)[fd].worker) {
loggerLog( loggerLog(
this->logger, this->logger,
LOGGER_INFO, LOGGER_INFO,
@ -13,7 +13,7 @@ serverRead(Server this, unsigned int i)
return -1; return -1;
} }
switch ((size = streamReaderRead((this->conns)[fd].reader, fd))) {
switch ((size = streamReaderRead((this->conns)[fd].worker, fd))) {
case 0: case 0:
/* /*
* normal close: write remaining data * normal close: write remaining data

86
src/server/run.c

@ -19,14 +19,6 @@
#include "interface/stream_writer.h" #include "interface/stream_writer.h"
#include "interface/logger.h" #include "interface/logger.h"
//* @TODO: to be removed
#include "http/request.h"
#include "http/request/parser.h"
#include "http/message/queue.h"
#include "http/response.h"
#include "http/response/writer.h"
//* until here
#undef MAX #undef MAX
#define MAX(x,y) ((x) > (y) ? (x) : (y)) #define MAX(x,y) ((x) > (y) ? (x) : (y))
@ -94,63 +86,17 @@ serverRun(Server this)
*/ */
else { else {
nreads--; nreads--;
/**
* do some other processing
* @TODO: actually this will hard assume that our stream reader
* is a http parser and it has its queue...think about more
* generalizing here.
*/
int size;
if (0 >= (size=serverRead(this, i))) {
serverCloseConn(this, i);
}
else {
int j;
HttpMessageQueue reqq = ((HttpRequestParser) \
(this->conns)[fd].reader)->request_queue;
HttpMessageQueue respq = ((HttpResponseWriter) \
(this->conns)[fd].writer)->response_queue;
for (j=0; j<reqq->nmsgs; j++) {
/**
* @TODO: for now simply remove request and send not found.
* Make this sane.
*/
HttpRequest request = (HttpRequest)(reqq->msgs[j]);
HttpMessage response = NULL;
if (0 == strcmp("GET", request->method) &&
0 == strcmp("/me/", request->uri)) {
response = (HttpMessage)httpResponseMe();
}
else if (0 == strcmp("GET", request->method) &&
0 == strcmp("/image/", request->uri)) {
response = (HttpMessage)httpResponseImage();
}
else {
response = (HttpMessage)httpResponse404();
}
if (httpMessageHasKeepAlive(reqq->msgs[j])) {
httpHeaderAdd(
&(response->header),
new(HttpHeader, "Connection", "Keep-Alive"));
}
else {
httpHeaderAdd(
&(response->header),
new(HttpHeader, "Connection", "Close"));
}
respq->msgs[(respq->nmsgs)++] = response;
response = NULL;
delete(&(reqq->msgs[j]));
(this->fds)[i].events |= POLLOUT;
}
switch (serverRead(this, i)) {
case -1:
serverCloseConn(this, i);
break;
case 0:
break;
reqq->nmsgs = 0;
default:
(this->fds)[i].events |= POLLOUT;
} }
} }
} }
@ -159,22 +105,14 @@ serverRun(Server this)
* handle writes * handle writes
*/ */
if (0 != ((this->fds)[i].revents & POLLOUT) && 0 < nwrites) { if (0 != ((this->fds)[i].revents & POLLOUT) && 0 < nwrites) {
HttpResponseWriter writer =
(HttpResponseWriter)(this->conns)[fd].writer;
HttpMessage message;
events--; events--;
nwrites--; nwrites--;
message = (HttpMessage)streamWriterWrite(writer, fd);
if (NULL == message) {
(this->fds)[i].events &= ~POLLOUT;
}
else {
delete(&message);
if (0 >= streamWriterWrite((this->conns)[fd].worker, fd)) {
serverCloseConn(this, i); serverCloseConn(this, i);
} }
(this->fds)[i].events &= ~POLLOUT;
} }
} }
} }

8
src/testserver.c

@ -7,7 +7,7 @@
#include "server.h" #include "server.h"
#include "logger.h" #include "logger.h"
#include "http/request/parser.h"
#include "http/worker.h"
#include "signalHandling.h" #include "signalHandling.h"
@ -18,8 +18,9 @@ void daemonize(void);
int int
main() main()
{ {
Logger logger = new(LoggerSyslog, LOGGER_ERR);
Server server = new(Server, logger, 11212, SOMAXCONN);
Logger logger = new(LoggerSyslog, LOGGER_ERR);
HttpWorker worker = new(HttpWorker);
Server server = new(Server, logger, worker, 11212, SOMAXCONN);
struct rlimit limit = {RLIM_INFINITY, RLIM_INFINITY}; struct rlimit limit = {RLIM_INFINITY, RLIM_INFINITY};
setrlimit(RLIMIT_CPU, &limit); setrlimit(RLIMIT_CPU, &limit);
@ -29,6 +30,7 @@ main()
serverRun(server); serverRun(server);
delete(&server); delete(&server);
delete(&worker);
delete(&logger); delete(&logger);
return 0; return 0;

Loading…
Cancel
Save