Browse Source

more generalizing of response writing (implemented a response writer...now it should be possible to implement a stream writer for images

master
Georg Hopp 14 years ago
parent
commit
d87cd09ba1
  1. 17
      include/http/message/queue.h
  2. 4
      include/http/request/parser.h
  3. 17
      include/http/request/queue.h
  4. 34
      include/http/response/writer.h
  5. 19
      include/interface/stream_writer.h
  6. 19
      include/server.h
  7. 10
      src/Makefile.am
  8. 10
      src/http/message/queue.c
  9. 10
      src/http/request/parser.c
  10. 11
      src/http/request/parser/parse.c
  11. 1
      src/http/response.c
  12. 47
      src/http/response/writer.c
  13. 79
      src/http/response/writer/write.c
  14. 19
      src/interface/stream_writer.c
  15. 5
      src/server.c
  16. 5
      src/server/close_conn.c
  17. 3
      src/server/handle_accept.c
  18. 60
      src/server/run.c
  19. 6
      src/testserver.c

17
include/http/message/queue.h

@ -0,0 +1,17 @@
#ifndef __HTTP_REQUEST_QUEUE_H__
#define __HTTP_REQUEST_QUEUE_H__
#include "class.h"
#include "http/message.h"
#define HTTP_MESSAGE_QUEUE_MAX 1024
CLASS(HttpMessageQueue) {
HttpMessage msgs[HTTP_MESSAGE_QUEUE_MAX];
size_t nmsgs;
};
#endif /* __HTTP_REQUEST_QUEUE_H__ */
// vim: set ts=4 sw=4:

4
include/http/request/parser.h

@ -3,7 +3,7 @@
#include "class.h"
#include "http/request.h"
#include "http/request/queue.h"
#include "http/message/queue.h"
#define HTTP_REQUEST_PARSER_READ_CHUNK 1024
@ -27,7 +27,7 @@ CLASS(HttpRequestParser) {
size_t buffer_used;
size_t buffer_size;
HttpRequestQueue request_queue;
HttpMessageQueue request_queue;
HttpRequest cur_request;
HttpRequestState state;

17
include/http/request/queue.h

@ -1,17 +0,0 @@
#ifndef __HTTP_REQUEST_QUEUE_H__
#define __HTTP_REQUEST_QUEUE_H__
#include "class.h"
#include "http/request.h"
#define HTTP_REQUEST_QUEUE_MAX 1024
CLASS(HttpRequestQueue) {
HttpRequest requests[HTTP_REQUEST_QUEUE_MAX];
size_t nrequests;
};
#endif /* __HTTP_REQUEST_QUEUE_H__ */
// vim: set ts=4 sw=4:

34
include/http/response/writer.h

@ -0,0 +1,34 @@
#ifndef __HTTP_RESPONSE_WRITER_H__
#define __HTTP_RESPONSE_WRITER_H__
#include "class.h"
#include "http/response.h"
#include "http/message/queue.h"
typedef enum e_HttpResponseState {
HTTP_RESPONSE_NO=0,
HTTP_RESPONSE_START,
HTTP_RESPONSE_PIPE,
HTTP_RESPONSE_DONE
} HttpResponseState;
CLASS(HttpResponseWriter) {
char * buffer;
char pipe[1024];
size_t nbuffer;
size_t rpipe;
size_t wpipe;
char pipe_flip;
HttpMessageQueue response_queue;
HttpResponse cur_response;
HttpResponseState state;
};
size_t httpResponseWriterWrite(HttpResponseWriter, int);
#endif // __HTTP_RESPONSE_WRITER_H__
// vim: set ts=4 sw=4:

19
include/interface/stream_writer.h

@ -0,0 +1,19 @@
#ifndef __STREAM_WRITER_H__
#define __STREAM_WRITER_H__
#include <sys/types.h>
typedef size_t (* fptr_streamWriterWrite)(void *, int fd);
extern const struct interface i_StreamWriter;
struct i_StreamWriter {
const struct interface * const _;
fptr_streamWriterWrite write;
};
extern size_t streamWriterWrite(void *, int fd);
#endif // __STREAM_WRITER_H__
// vim: set ts=4 sw=4:

19
include/server.h

@ -7,6 +7,8 @@
#include "class.h"
#include "socket.h"
#include "logger.h"
#include "http/response.h"
#define POLL_FD_NSIZE 1024
#define POLL_FD_SIZE (sizeof(struct pollfd) * POLL_FD_NSIZE)
@ -24,29 +26,16 @@
CLASS(Server) {
Logger logger;
Sock sock;
void * reader;
/**
* loeschen: fds[i].event auf 0
* dann nfds um die anzahl der geloeschten elemente verkleinern.
* die in close pending stehenden socket schliessen.
* vor jedem poll qsort auf fds ueber event.
* nach dem poll qsort auf fds ueber revent und reuckgebewert
* von poll beruecksichtigen.
*/
nfds_t nfds;
struct pollfd fds[POLL_FD_NSIZE];
struct {
Sock sock;
void * reader;
void * writer;
char keep_alive;
char * wbuf;
char * rbuf;
unsigned int rpos;
unsigned int wpos;
} conns[POLL_FD_NSIZE];
};

10
src/Makefile.am

@ -1,15 +1,17 @@
ACLOCAL_AMFLAGS = -I m4
AUTOMAKE_OPTIONS = subdir-objects
IFACE = interface/class.c interface/stream_reader.c interface/logger.c
IFACE = interface/class.c interface/stream_reader.c interface/logger.c \
interface/stream_writer.c
CLASS = class.c interface.c
SOCKET = socket.c socket/accept.c socket/connect.c socket/listen.c
SERVER = server.c server/run.c server/close_conn.c
LOGGER = logger.c logger/stderr.c logger/syslog.c
MSG = http/message.c
REQ = http/request.c http/request/queue.c http/request/has_keep_alive.c
MSG = http/message.c http/message/queue.c
REQ = http/request.c http/request/has_keep_alive.c
RESP = http/response.c http/response/404.c http/response/size_get.c \
http/response/to_string.c
WRITER = http/response/writer.c http/response/writer/write.c
HEADER = http/header.c http/header/get.c http/header/add.c \
http/header/size_get.c http/header/to_string.c
PARSER = http/request/parser.c http/request/parser/get_header.c \
@ -23,5 +25,5 @@ bin_PROGRAMS = testserver
testserver_SOURCES = testserver.c \
$(IFACE) $(CLASS) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \
$(RESP) $(HEADER) $(PARSER) signalHandling.c daemonize.c
$(WRITER) $(RESP) $(HEADER) $(PARSER) signalHandling.c daemonize.c
testserver_CFLAGS = -Wall -I ../include/

10
src/http/request/queue.c → src/http/message/queue.c

@ -3,7 +3,7 @@
#include "class.h"
#include "interface/class.h"
#include "http/request/queue.h"
#include "http/message/queue.h"
static
void
@ -13,15 +13,15 @@ static
void
dtor(void * _this)
{
HttpRequestQueue this = _this;
HttpMessageQueue this = _this;
int i;
for (i=0; i<this->nrequests; i++) {
delete(&(this->requests)[i]);
for (i=0; i<this->nmsgs; i++) {
delete(&(this->msgs)[i]);
}
}
INIT_IFACE(Class, ctor, dtor, NULL);
CREATE_CLASS(HttpRequestQueue, NULL, IFACE(Class));
CREATE_CLASS(HttpMessageQueue, NULL, IFACE(Class));
// vim: set ts=4 sw=4:

10
src/http/request/parser.c

@ -8,10 +8,9 @@
#include "interface/stream_reader.h"
#include "http/request/parser.h"
#include "http/request/queue.h"
#include "http/message/queue.h"
#include "http/request.h"
void httpRequestParserParse(HttpRequestParser);
static
void
@ -19,7 +18,7 @@ ctor(void * _this, va_list * params)
{
HttpRequestParser this = _this;
this->request_queue = new(HttpRequestQueue);
this->request_queue = new(HttpMessageQueue);
this->buffer = malloc(HTTP_REQUEST_PARSER_READ_CHUNK);
this->buffer[0] = 0;
@ -33,6 +32,9 @@ dtor(void * _this)
free(this->buffer);
delete(&(this->request_queue));
if (NULL != this->cur_request)
delete(&(this->cur_request));
}
static
@ -46,7 +48,7 @@ _clone(void * _this, void * _base)
/**
* every parser has its own queue...
*/
this->request_queue = new(HttpRequestQueue);
this->request_queue = new(HttpMessageQueue);
this->buffer_used = base->buffer_used;
chunks = this->buffer_used / HTTP_REQUEST_PARSER_READ_CHUNK;

11
src/http/request/parser/parse.c

@ -36,10 +36,10 @@ httpRequestSkip(char ** data)
void
httpRequestParserParse(HttpRequestParser this)
{
char * line;
int cont = 1;
char * line;
int cont = 1;
while(cont) {
while (cont) {
switch(this->state) {
case HTTP_REQUEST_GARBAGE:
this->cur_data = this->buffer; // initialize static pointer
@ -82,8 +82,9 @@ httpRequestParserParse(HttpRequestParser this)
/**
* enqueue current request
*/
this->request_queue->requests[(this->request_queue->nrequests)++] =
this->cur_request;
this->request_queue->msgs[(this->request_queue->nmsgs)++] =
(HttpMessage)this->cur_request;
this->cur_request = NULL;
/**
* remove processed stuff from input buffer.

1
src/http/response.c

@ -13,7 +13,6 @@ void
ctor(void * _this, va_list * params)
{
HttpResponse this = _this;
char * status;
char * reason;
PARENTCALL(_this, Class, ctor, params);

47
src/http/response/writer.c

@ -0,0 +1,47 @@
#include <stdlib.h>
#include "class.h"
#include "interface/class.h"
#include "interface/stream_writer.h"
#include "http/message/queue.h"
#include "http/response/writer.h"
static
void
ctor(void * _this, va_list * params)
{
HttpResponseWriter this = _this;
this->response_queue = new(HttpMessageQueue);
}
static
void
dtor(void * _this)
{
HttpResponseWriter this = _this;
if (NULL != this->buffer) free(this->buffer);
delete(&(this->response_queue));
if (NULL != this->cur_response)
delete(&(this->cur_response));
}
static
void
_clone(void * _this, void * _base)
{
HttpResponseWriter this = _this;
//HttpResponseWriter base = _base;
this->response_queue = new(HttpMessageQueue);
}
INIT_IFACE(Class, ctor, dtor, _clone);
INIT_IFACE(StreamWriter,
(fptr_streamWriterWrite)httpResponseWriterWrite);
CREATE_CLASS(HttpResponseWriter, NULL, IFACE(Class), IFACE(StreamWriter));
// vim: set ts=4 sw=4:

79
src/http/response/writer/write.c

@ -0,0 +1,79 @@
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include "class.h"
#include "interface/class.h"
#include "http/response.h"
#include "http/response/writer.h"
size_t
httpResponseWriterWrite(HttpResponseWriter this, int fd)
{
HttpMessageQueue respq = this->response_queue;
int cont = 1;
size_t written = 0;
while (cont) {
switch (this->state) {
case HTTP_RESPONSE_NO:
if (NULL == this->cur_response && 0 < respq->nmsgs) {
this->cur_response = (HttpResponse)respq->msgs[0];
memmove(respq->msgs, &(respq->msgs[1]), --respq->nmsgs);
this->state = HTTP_RESPONSE_START;
}
else {
cont = 0;
}
break;
case HTTP_RESPONSE_START:
if (NULL == this->buffer) {
this->nbuffer = httpResponseSizeGet(this->cur_response);
this->buffer = calloc(1, this->nbuffer);
httpResponseToString(this->cur_response, this->buffer);
}
{
written = write(fd, this->buffer, this->nbuffer);
if (-1 == written) {
free (this->buffer);
this->buffer = NULL;
return written;
}
if (written == this->nbuffer) {
if (HTTP_MESSAGE_BUFFERED ==
((HttpMessage)(this->cur_response))->type) {
this->state = HTTP_RESPONSE_DONE;
}
else {
this->state = HTTP_RESPONSE_PIPE;
}
}
else {
this->nbuffer -= written;
memmove(this->buffer, this->buffer + written, this->nbuffer);
cont = 0;
}
}
break;
case HTTP_RESPONSE_PIPE:
break;
case HTTP_RESPONSE_DONE:
free (this->buffer);
this->buffer = NULL;
delete(&(this->cur_response));
this->state = HTTP_RESPONSE_NO;
break;
}
}
return written;
}
// vim: set ts=4 sw=4:

19
src/interface/stream_writer.c

@ -0,0 +1,19 @@
#include "class.h"
#include "interface/stream_writer.h"
const struct interface i_StreamWriter = {
"streamWriter",
1
};
size_t
streamWriterWrite(void * object, int fd)
{
size_t ret;
RETCALL(object, StreamWriter, write, ret, fd);
return ret;
}
// vim: set ts=4 sw=4:

5
src/server.c

@ -20,7 +20,6 @@ ctor(void * _this, va_list * params)
int flags;
this->logger = va_arg(* params, Logger);
this->reader = va_arg(* params, void*);
port = va_arg(* params, int);
backlog = va_arg(* params, unsigned int);
@ -47,9 +46,7 @@ dtor(void * _this)
if (this->sock->handle != (this->fds)[i].fd) {
delete(&(this->conns[(this->fds)[i].fd]).sock);
delete(&(this->conns[(this->fds)[i].fd]).reader);
if (this->conns[(this->fds)[i].fd].wbuf)
free(this->conns[(this->fds)[i].fd].wbuf);
delete(&(this->conns[(this->fds)[i].fd]).writer);
}
}

5
src/server/close_conn.c

@ -11,11 +11,8 @@ serverCloseConn(Server this, unsigned int i)
delete(&((this->conns)[fd].sock));
delete(&((this->conns)[fd].reader));
delete(&((this->conns)[fd].writer));
if ((this->conns)[fd].wbuf != NULL) {
free((this->conns)[fd].wbuf);
(this->conns)[fd].wbuf = NULL;
}
(this->conns)[fd].keep_alive = 0;
memset(&(this->fds[i]), 0, sizeof(struct pollfd));

3
src/server/handle_accept.c

@ -12,7 +12,8 @@ serverHandleAccept(Server this)
(this->conns)[acc->handle].sock = acc;
//* clone reader
(this->conns)[acc->handle].reader = clone(this->reader);
(this->conns)[acc->handle].reader = new(HttpRequestParser);
(this->conns)[acc->handle].writer = new(HttpResponseWriter);
(this->fds)[this->nfds].fd = acc->handle;
(this->fds)[this->nfds].events = POLLIN;

60
src/server/run.c

@ -13,13 +13,15 @@
#include "signalHandling.h"
#include "interface/class.h"
#include "interface/stream_reader.h"
#include "interface/stream_writer.h"
#include "interface/logger.h"
//* @TODO: to be removed
#include "http/request.h"
#include "http/request/parser.h"
#include "http/request/queue.h"
#include "http/message/queue.h"
#include "http/response.h"
#include "http/response/writer.h"
//* until here
#undef MAX
@ -102,43 +104,39 @@ serverRun(Server this)
}
else {
int j;
HttpRequestQueue queue =
((HttpRequestParser)(this->conns)[fd].reader)->request_queue;
for (j=0; j<queue->nrequests; j++) {
HttpResponse response;
HttpMessageQueue reqq = ((HttpRequestParser) \
(this->conns)[fd].reader)->request_queue;
HttpMessageQueue respq = ((HttpResponseWriter) \
(this->conns)[fd].writer)->response_queue;
for (j=0; j<reqq->nmsgs; j++) {
/**
* @TODO: for now simply remove request and send not found.
* Make this sane.
*/
response = httpResponse404();
HttpMessage response = (HttpMessage)httpResponse404();
if (httpRequestHasKeepAlive(queue->requests[j])) {
if (httpRequestHasKeepAlive((HttpRequest)reqq->msgs[j])) {
(this->conns)[fd].keep_alive = 1;
httpHeaderAdd(
&(((HttpMessage)response)->header),
&(response->header),
new(HttpHeader, "Connection", "Keep-Alive"));
}
else {
(this->conns)[fd].keep_alive = 0;
httpHeaderAdd(
&(((HttpMessage)response)->header),
&(response->header),
new(HttpHeader, "Connection", "Close"));
}
delete(&(queue->requests[j]));
(this->conns)[fd].wbuf = calloc(
1, httpResponseSizeGet(response) + 1);
httpResponseToString(response, (this->conns)[fd].wbuf);
delete(&response);
respq->msgs[(respq->nmsgs)++] = response;
response = NULL;
delete(&(reqq->msgs[j]));
(this->fds)[i].events |= POLLOUT;
}
queue->nrequests = 0;
reqq->nmsgs = 0;
}
}
}
@ -152,31 +150,13 @@ serverRun(Server this)
events--;
nwrites--;
size = write(
(this->fds)[i].fd,
(this->conns)[fd].wbuf,
strlen((this->conns)[fd].wbuf));
if (size == strlen((this->conns)[fd].wbuf) ||
-1 == size) {
if (-1 == size) {
loggerLog(this->logger, LOGGER_ERR,
"write error, closing connection");
}
size = streamWriterWrite((this->conns)[fd].writer, fd);
if ((this->conns)[fd].keep_alive) {
(this->fds)[i].events &= ~POLLOUT;
}
else {
serverCloseConn(this, i);
}
free((this->conns)[fd].wbuf);
(this->conns)[fd].wbuf = NULL;
if ((this->conns)[fd].keep_alive) {
(this->fds)[i].events &= ~POLLOUT;
}
else {
memmove((this->conns)[fd].wbuf,
(this->conns)[fd].wbuf + size,
strlen((this->conns)[fd].wbuf) - size + 1);
serverCloseConn(this, i);
}
}
}

6
src/testserver.c

@ -18,9 +18,8 @@ void daemonize(void);
int
main()
{
Logger logger = new(LoggerSyslog, LOGGER_ERR);
HttpRequestParser parser = new(HttpRequestParser);
Server server = new(Server, logger, parser, 11212, SOMAXCONN);
Logger logger = new(LoggerSyslog, LOGGER_ERR);
Server server = new(Server, logger, 11212, SOMAXCONN);
struct rlimit limit = {RLIM_INFINITY, RLIM_INFINITY};
setrlimit(RLIMIT_CPU, &limit);
@ -31,7 +30,6 @@ main()
delete(&server);
delete(&logger);
delete(&parser);
return 0;
}

Loading…
Cancel
Save