Browse Source

refs #19: Added a stream class. This is a delegate that uses the correct read and write methods for the given stream type. This is implemented as prerequesite for the SSL implemented

master
Georg Hopp 14 years ago
parent
commit
5b3fed549f
  1. 2
      configure.ac
  2. 18
      include/cbuf.h
  3. 10
      include/commons.h
  4. 5
      include/http/message.h
  5. 13
      include/http/parser.h
  6. 9
      include/http/worker.h
  7. 12
      include/http/writer.h
  8. 6
      include/interface/stream_reader.h
  9. 6
      include/interface/stream_writer.h
  10. 2
      include/server.h
  11. 27
      include/stream.h
  12. 5
      src/Makefile.am
  13. 4
      src/cbuf/is_locked.c
  14. 5
      src/cbuf/read.c
  15. 5
      src/cbuf/write.c
  16. 5
      src/http/message.c
  17. 8
      src/http/message/has_keep_alive.c
  18. 5
      src/http/parser/parse.c
  19. 3
      src/http/response/asset.c
  20. 5
      src/http/worker.c
  21. 6
      src/http/worker/process.c
  22. 5
      src/http/worker/write.c
  23. 5
      src/http/writer/write.c
  24. 5
      src/interface.c
  25. 5
      src/interface/stream_reader.c
  26. 5
      src/interface/stream_writer.c
  27. 1
      src/server/close_conn.c
  28. 2
      src/server/handle_accept.c
  29. 5
      src/server/read.c
  30. 5
      src/server/write.c
  31. 41
      src/stream.c
  32. 27
      src/stream/read.c
  33. 27
      src/stream/write.c
  34. 8
      src/utils/http.c
  35. 40
      src/webgameserver.c

2
configure.ac

@ -20,6 +20,8 @@ AM_PROG_CC_C_O
# # Checks for libraries.
# AC_CHECK_LIB([json], [json_object_new_object], [],
# [AC_MSG_ERROR([json-c library not found], [1])])
AC_CHECK_LIB([ssl], [SSL_library_init], [],
[AC_MSG_ERROR([openssl not found], [1])])
# Checks for header files.
AC_CHECK_HEADERS([stdarg.h string.h stdlib.h stdio.h unistd.h syslog.h sys/types.h json/json.h])

18
include/cbuf.h

@ -35,22 +35,18 @@
#include <sys/types.h>
#include "class.h"
#include "stream.h"
#define ECBUFOVFL 100
#include "commons.h"
#ifndef TRUE
#define TRUE ((void *)1)
#endif
#define ECBUFOVFL 100
#ifndef FALSE
#define FALSE ((void *)0)
#endif
CLASS(Cbuf) {
char * shm_name; // shared memory identifier
char * data;
void * lock;
Bool lock;
size_t bsize;
size_t bused;
@ -59,8 +55,8 @@ CLASS(Cbuf) {
size_t read;
};
ssize_t cbufRead(Cbuf, int fd);
ssize_t cbufWrite(Cbuf, int fd);
ssize_t cbufRead(Cbuf, Stream);
ssize_t cbufWrite(Cbuf, Stream);
char * cbufGetLine(Cbuf, char **);
char * cbufGetData(Cbuf, size_t);
@ -76,7 +72,7 @@ void cbufIncWrite(Cbuf this, size_t n);
size_t cbufGetFree(Cbuf this);
char cbufIsEmpty(Cbuf this);
void cbufSkipNonAlpha(Cbuf this);
void * cbufIsLocked(Cbuf this);
Bool cbufIsLocked(Cbuf this);
void cbufLock(Cbuf this);
void cbufRelease(Cbuf this);

10
include/commons.h

@ -0,0 +1,10 @@
#ifndef __COMMONS_H__
#define __COMMONS_H__
#define Bool char
#define TRUE 1
#define FALSE 0
#endif // __COMMONS_H__
// vim: set ts=4 sw=4:

5
include/http/message.h

@ -26,6 +26,7 @@
#include "class.h"
#include "http/header.h"
#include "stream.h"
typedef enum e_HttpMessageType {
HTTP_MESSAGE_BUFFERED=0,
@ -39,8 +40,8 @@ CLASS(HttpMessage) {
HttpHeader header;
HttpMessageType type;
int handle;
char * body;
Stream handle;
char * body;
int nbody;
int dbody;
};

13
include/http/parser.h

@ -28,14 +28,9 @@
#include "http/message.h"
#include "http/message/queue.h"
#include "cbuf.h"
#include "stream.h"
#ifndef TRUE
#define TRUE ((void *)1)
#endif
#ifndef FALSE
#define FALSE ((void *)0)
#endif
#include "commons.h"
#define PARSER_MAX_BUF 131072
@ -51,7 +46,7 @@ typedef enum e_HttpMessageState {
CLASS(HttpParser) {
Cbuf buffer;
void * ourLock;
Bool ourLock;
char * incomplete;
size_t isize;
@ -62,7 +57,7 @@ CLASS(HttpParser) {
HttpMessageState state;
};
ssize_t httpParserParse(void *, int);
ssize_t httpParserParse(void *, Stream);
void httpParserHeader(HttpParser, const char *, const char *);
void httpParserNewMessage(HttpParser, const char *, const char * lend);
size_t httpParserBody(HttpParser, const char *, size_t);

9
include/http/worker.h

@ -33,14 +33,7 @@
#include "cbuf.h"
#include "session.h"
#ifndef TRUE
#define TRUE ((void *)1)
#endif
#ifndef FALSE
#define FALSE ((void *)0)
#endif
#include "commons.h"
struct randval {
time_t timestamp;

12
include/http/writer.h

@ -30,14 +30,10 @@
#include "http/message.h"
#include "http/message/queue.h"
#include "cbuf.h"
#include "stream.h"
#ifndef TRUE
#define TRUE ((void *)1)
#endif
#include "commons.h"
#ifndef FALSE
#define FALSE ((void *)0)
#endif
#define WRITER_MAX_BUF 131072
@ -50,7 +46,7 @@ typedef enum e_HttpWriterState {
CLASS(HttpWriter) {
Cbuf buffer;
void * ourLock;
Bool ourLock;
HttpMessageQueue queue;
HttpMessage current;
@ -62,7 +58,7 @@ CLASS(HttpWriter) {
HttpWriterState state;
};
ssize_t httpWriterWrite(void *, int);
ssize_t httpWriterWrite(void *, Stream);
#endif // __HTTP_WRITER_H__

6
include/interface/stream_reader.h

@ -27,7 +27,9 @@
#include <sys/types.h>
typedef ssize_t (* fptr_streamReaderRead)(void *, int fd);
#include "stream.h"
typedef ssize_t (* fptr_streamReaderRead)(void *, Stream);
extern const struct interface i_StreamReader;
@ -36,7 +38,7 @@ struct i_StreamReader {
fptr_streamReaderRead read;
};
extern ssize_t streamReaderRead(void *, int fd);
extern ssize_t streamReaderRead(void *, Stream);
#endif // __STREAM_READER_H__

6
include/interface/stream_writer.h

@ -27,7 +27,9 @@
#include <sys/types.h>
typedef ssize_t (* fptr_streamWriterWrite)(void *, int fd);
#include "stream.h"
typedef ssize_t (* fptr_streamWriterWrite)(void *, Stream);
extern const struct interface i_StreamWriter;
@ -36,7 +38,7 @@ struct i_StreamWriter {
fptr_streamWriterWrite write;
};
extern ssize_t streamWriterWrite(void *, int fd);
extern ssize_t streamWriterWrite(void *, Stream);
#endif // __STREAM_WRITER_H__

2
include/server.h

@ -32,9 +32,11 @@
#include "class.h"
#include "socket.h"
#include "logger.h"
#include "stream.h"
struct conns {
Sock sock;
Stream stream;
void * worker;
};

27
include/stream.h

@ -0,0 +1,27 @@
#ifndef __STREAM_H__
#define __STREAM_H__
#include <sys/types.h>
#include <openssl/ssl.h>
#include "class.h"
typedef enum e_StreamHandleType {
STREAM_FD = 0,
STREAM_SSL
} StreamHandleType;
CLASS(Stream) {
StreamHandleType type;
union {
int fd;
SSL * ssl;
} handle;
};
ssize_t streamRead(Stream, void *, size_t);
ssize_t streamWrite(Stream, void *, size_t);
#endif // __STREAM_H__
// vim: set ts=4 sw=4:

5
src/Makefile.am

@ -5,6 +5,7 @@ IFACE = interface/class.c interface/stream_reader.c interface/logger.c \
interface/stream_writer.c interface/http_intro.c \
interface/subject.c interface/observer.c interface.c
SOCKET = socket.c socket/accept.c socket/connect.c socket/listen.c
STREAM = stream.c stream/read.c stream/write.c
SERVER = server.c server/run.c server/close_conn.c server/poll.c \
server/handle_accept.c server/read.c server/write.c
LOGGER = logger.c logger/stderr.c logger/syslog.c
@ -60,6 +61,6 @@ bin_PROGRAMS = webgameserver
webgameserver_SOURCES = webgameserver.c \
$(IFACE) $(SOCKET) $(SERVER) $(LOGGER) $(MSG) $(REQ) \
$(WRITER) $(RESP) $(HEADER) $(PARSER) $(WORKER) $(CB) \
$(UTILS) $(MSGQ) $(SESSION)
$(UTILS) $(MSGQ) $(SESSION) $(STREAM)
webgameserver_CFLAGS = -Wall -I ../include/
webgameserver_LDFLAGS = -lrt
webgameserver_LDFLAGS = -lrt -lssl

4
src/cbuf/is_locked.c

@ -22,7 +22,9 @@
#include "cbuf.h"
void *
#include "commons.h"
Bool
cbufIsLocked(Cbuf this)
{
return this->lock;

5
src/cbuf/read.c

@ -25,10 +25,11 @@
#include <errno.h>
#include "cbuf.h"
#include "stream.h"
ssize_t
cbufRead(Cbuf this, int fd)
cbufRead(Cbuf this, Stream st)
{
ssize_t rrsize = 0;
size_t rsize = cbufGetFree(this);
@ -38,7 +39,7 @@ cbufRead(Cbuf this, int fd)
return -1;
}
rrsize = read(fd, cbufGetWrite(this), rsize);
rrsize = streamRead(st, cbufGetWrite(this), rsize);
switch (rrsize) {
case 0:

5
src/cbuf/write.c

@ -24,17 +24,18 @@
#include <unistd.h>
#include "cbuf.h"
#include "stream.h"
ssize_t
cbufWrite(Cbuf this, int fd)
cbufWrite(Cbuf this, Stream st)
{
ssize_t wwsize = 0;
size_t wsize = this->bused;
if (0 == wsize) return 0;
wwsize = write(fd, cbufGetRead(this), wsize);
wwsize = streamWrite(st, cbufGetRead(this), wsize);
switch (wwsize) {
case -1:

5
src/http/message.c

@ -79,7 +79,10 @@ httpMessageDtor(void * _this)
break;
case HTTP_MESSAGE_PIPED:
if (2 < this->handle) close(this->handle);
if (2 < (this->handle->handle).fd) {
close((this->handle->handle).fd);
}
delete(this->handle);
break;
default:

8
src/http/message/has_keep_alive.c

@ -30,13 +30,7 @@
#include "utils/memory.h"
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
#include "commons.h"
char
httpMessageHasKeepAlive(HttpMessage message)

5
src/http/parser/parse.c

@ -26,9 +26,10 @@
#include "interface/class.h"
#include "interface/http_intro.h"
#include "cbuf.h"
#include "stream.h"
ssize_t
httpParserParse(void * _this, int fd)
httpParserParse(void * _this, Stream st)
{
HttpParser this = _this;
int cont = 1;
@ -51,7 +52,7 @@ httpParserParse(void * _this, int fd)
this->incomplete = NULL;
}
if (0 > (read = cbufRead(this->buffer, fd))) {
if (0 > (read = cbufRead(this->buffer, st))) {
return read;
}

3
src/http/response/asset.c

@ -28,6 +28,7 @@
#include "class.h"
#include "interface/class.h"
#include "stream.h"
#include "http/response.h"
#include "http/message.h"
@ -69,7 +70,7 @@ httpResponseAsset(
message = (HttpMessage)response;
message->type = HTTP_MESSAGE_PIPED;
message->handle = handle;
message->handle = new(Stream, STREAM_FD, handle);
message->nbody = st.st_size;
httpHeaderAdd(&(message->header),

5
src/http/worker.c

@ -29,6 +29,7 @@
#include <search.h>
#include "class.h"
#include "stream.h"
#include "http/worker.h"
#include "http/parser.h"
#include "http/writer.h"
@ -107,8 +108,8 @@ httpWorkerClone(void * _this, void * _base)
this->sroot = &(base->session);
}
ssize_t httpWorkerProcess(void *, int);
ssize_t httpWorkerWrite(void *, int);
ssize_t httpWorkerProcess(void *, Stream);
ssize_t httpWorkerWrite(void *, Stream);
INIT_IFACE(Class, httpWorkerCtor, httpWorkerDtor, httpWorkerClone);
INIT_IFACE(StreamReader, httpWorkerProcess);

6
src/http/worker/process.c

@ -35,6 +35,7 @@
#include "http/message/queue.h"
#include "http/parser.h"
#include "session.h"
#include "stream.h"
#include "utils/memory.h"
@ -42,14 +43,14 @@ HttpMessage httpWorkerGetAsset(HttpRequest, const char *, const char *, size_t);
void httpWorkerAddCommonHeader(HttpMessage, HttpMessage);
ssize_t
httpWorkerProcess(HttpWorker this, int fd)
httpWorkerProcess(HttpWorker this, Stream st)
{
time_t t;
struct tm * tmp;
char buffer[200];
ssize_t size;
if (0 < (size = httpParserParse(this->parser, fd))) {
if (0 < (size = httpParserParse(this->parser, st))) {
int i;
HttpMessageQueue reqq = this->parser->queue;
HttpMessageQueue respq = this->writer->queue;
@ -92,7 +93,6 @@ httpWorkerProcess(HttpWorker this, int fd)
if (0 == strcmp("POST", request->method)) {
if (0 == strcmp("/me/", request->uri)) {
char * delim = memchr(rmessage->body, '=', rmessage->nbody);
char * key = rmessage->body;
char * val;
size_t nkey, nval;
char buffer[200];

5
src/http/worker/write.c

@ -24,11 +24,12 @@
#include "http/worker.h"
#include "http/writer.h"
#include "stream.h"
ssize_t
httpWorkerWrite(HttpWorker this, int fd)
httpWorkerWrite(HttpWorker this, Stream st)
{
return httpWriterWrite(this->writer, fd);
return httpWriterWrite(this->writer, st);
}
// vim: set ts=4 sw=4:

5
src/http/writer/write.c

@ -28,12 +28,13 @@
#include "http/message.h"
#include "http/writer.h"
#include "cbuf.h"
#include "stream.h"
#define MIN(x,y) ((x) < (y) ? (x) : (y))
#define MAX(x,y) ((x) > (y) ? (x) : (y))
ssize_t
httpWriterWrite(void * _this, int fd)
httpWriterWrite(void * _this, Stream st)
{
HttpWriter this = _this;
HttpMessageQueue respq = this->queue;
@ -103,7 +104,7 @@ httpWriterWrite(void * _this, int fd)
* write
*/
{
ssize_t written = cbufWrite(this->buffer, fd);
ssize_t written = cbufWrite(this->buffer, st);
if (0 <= written) {
this->written += written;

5
src/interface.c

@ -24,10 +24,7 @@
#include <stdlib.h>
#include "interface.h"
#ifndef TRUE
#define TRUE 1
#endif // TRUE
#include "commons.h"
static
inline

5
src/interface/stream_reader.c

@ -22,6 +22,7 @@
#include "class.h"
#include "interface/stream_reader.h"
#include "stream.h"
const struct interface i_StreamReader = {
"streamReader",
@ -29,11 +30,11 @@ const struct interface i_StreamReader = {
};
ssize_t
streamReaderRead(void * object, int fd)
streamReaderRead(void * object, Stream st)
{
ssize_t ret;
RETCALL(object, StreamReader, read, ret, fd);
RETCALL(object, StreamReader, read, ret, st);
return ret;
}

5
src/interface/stream_writer.c

@ -22,6 +22,7 @@
#include "class.h"
#include "interface/stream_writer.h"
#include "stream.h"
const struct interface i_StreamWriter = {
"streamWriter",
@ -29,11 +30,11 @@ const struct interface i_StreamWriter = {
};
ssize_t
streamWriterWrite(void * object, int fd)
streamWriterWrite(void * object, Stream st)
{
ssize_t ret;
RETCALL(object, StreamWriter, write, ret, fd);
RETCALL(object, StreamWriter, write, ret, st);
return ret;
}

1
src/server/close_conn.c

@ -33,6 +33,7 @@ serverCloseConn(Server this, unsigned int i)
delete((this->conns)[fd].sock);
delete((this->conns)[fd].worker);
delete((this->conns)[fd].stream);
memset(&(this->fds[i]), 0, sizeof(struct pollfd));
}

2
src/server/handle_accept.c

@ -28,6 +28,7 @@
#include "server.h"
#include "interface/class.h"
#include "interface/logger.h"
#include "stream.h"
int
serverHandleAccept(Server this)
@ -47,6 +48,7 @@ serverHandleAccept(Server this)
// clone worker
(this->conns)[acc->handle].worker = clone(this->worker);
(this->conns)[acc->handle].stream = new(Stream, STREAM_FD, acc->handle);
(this->fds)[this->nfds].fd = acc->handle;
(this->fds)[this->nfds].events = POLLIN;

5
src/server/read.c

@ -40,7 +40,10 @@ serverRead(Server this, unsigned int i)
return -1;
}
switch ((size = streamReaderRead((this->conns)[fd].worker, fd))) {
switch ((size = streamReaderRead(
(this->conns)[fd].worker,
(this->conns)[fd].stream)))
{
case -2:
/**
* normal close: this must be mapped to -2 within the

5
src/server/write.c

@ -40,7 +40,10 @@ serverWrite(Server this, unsigned int i)
return -1;
}
remaining = streamWriterWrite((this->conns)[fd].worker, fd);
remaining = streamWriterWrite(
(this->conns)[fd].worker,
(this->conns)[fd].stream);
switch(remaining) {
case -1:
serverCloseConn(this, i);

41
src/stream.c

@ -0,0 +1,41 @@
#include <stdarg.h>
#include <openssl/ssl.h>
#include "class.h"
#include "interface/class.h"
#include "stream.h"
static
int
streamCtor(void * _this, va_list * params)
{
Stream this = _this;
this->type = va_arg(* params, StreamHandleType);
switch(this->type) {
case STREAM_FD:
(this->handle).fd = va_arg(* params, int);
break;
case STREAM_SSL:
(this->handle).ssl = va_arg(* params, SSL*);
break;
default:
return -1;
}
return 0;
}
static
void
streamDtor(void * _this)
{
}
INIT_IFACE(Class, streamCtor, streamDtor, NULL);
CREATE_CLASS(Stream, NULL, IFACE(Class));
// vim: set ts=4 sw=4:

27
src/stream/read.c

@ -0,0 +1,27 @@
#include <openssl/ssl.h>
#include <unistd.h>
#include "stream.h"
ssize_t
streamRead(Stream this, void * buf, size_t count)
{
ssize_t done;
switch(this->type) {
case STREAM_FD:
done = read((this->handle).fd, buf, count);
break;
case STREAM_SSL:
done = SSL_read((this->handle).ssl, buf, count);
break;
default:
break;
}
return done;
}
// vim: set ts=4 sw=4:

27
src/stream/write.c

@ -0,0 +1,27 @@
#include <openssl/ssl.h>
#include <unistd.h>
#include "stream.h"
ssize_t
streamWrite(Stream this, void * buf, size_t count)
{
ssize_t done;
switch(this->type) {
case STREAM_FD:
done = write((this->handle).fd, buf, count);
break;
case STREAM_SSL:
done = SSL_write((this->handle).ssl, buf, count);
break;
default:
break;
}
return done;
}
// vim: set ts=4 sw=4:

8
src/utils/http.c

@ -8,13 +8,7 @@
#include "interface/class.h"
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
#include "commons.h"
char
isHttpVersion(const char * str, size_t len)

40
src/webgameserver.c

@ -33,6 +33,7 @@
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <errno.h>
#include "server.h"
#include "logger.h"
@ -133,7 +134,7 @@ main()
shm_unlink("/fooshm");
close(shm);
logger = new(LoggerSyslog, LOGGER_ERR);
logger = new(LoggerStderr, LOGGER_DEBUG);
worker = new(HttpWorker, "testserver", value);
server = new(Server, logger, worker, 11212, SOMAXCONN);
@ -149,21 +150,34 @@ main()
do {
pid_t w;
w = waitpid(pid, &status, WUNTRACED | WCONTINUED);
w = waitpid(pid, &status, 0);
if (w == -1) {
perror("waitpid");
exit(EXIT_FAILURE);
while (w == -1) {
switch(errno) {
case EINTR: w = waitpid(pid, &status, 0);
break;
case ECHILD: perror("no child");
// DROP THROUGH
default: w = 0;
}
}
if (WIFEXITED(status)) {
printf("exited, status=%d\n", WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
printf("killed by signal %d\n", WTERMSIG(status));
} else if (WIFSTOPPED(status)) {
printf("stopped by signal %d\n", WSTOPSIG(status));
} else if (WIFCONTINUED(status)) {
printf("continued\n");
if (0 < w) {
if (WIFEXITED(status)) {
loggerLog(logger, LOGGER_INFO,
"child exited, status=%d\n",
WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
loggerLog(logger, LOGGER_INFO,
"killed by signal %d\n",
WTERMSIG(status));
} else if (WIFSTOPPED(status)) {
loggerLog(logger, LOGGER_INFO,
"stopped by signal %d\n",
WSTOPSIG(status));
} else if (WIFCONTINUED(status)) {
loggerLog(logger, LOGGER_INFO, "continued\n");
}
}
} while (!WIFEXITED(status) && !WIFSIGNALED(status));

Loading…
Cancel
Save