Browse Source

changed socket handling according to my definition...and make sockets nonblocking as the answer of poll is just a guess...

release0.1.5
Georg Hopp 12 years ago
parent
commit
56fdd4bd00
  1. 3
      TODO
  2. 1
      include/socket.h
  3. 17
      src/cbuf/read.c
  4. 45
      src/http/parser/parse.c
  5. 14
      src/http/worker/process.c
  6. 1
      src/server/handle_accept.c
  7. 37
      src/server/run.c
  8. 1
      src/server/server.c
  9. 2
      src/socket/Makefile.am
  10. 8
      src/socket/accept.c
  11. 1
      src/socket/socket.c
  12. 25
      src/stream/read.c
  13. 1
      src/taskrambler.c

3
TODO

@ -17,3 +17,6 @@ VERY BIG TODO:
GET /images/waldschrat.jpg HTTP/1.1^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D GET /images/waldschrat.jpg HTTP/1.1^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D^[[D
^[[D = backspace... ^[[D = backspace...
- the unexpected connection close seems to occur just on concurrency over
1000 paralell connections...maybe this gives a hint.

1
include/socket.h

@ -40,6 +40,7 @@ CLASS(Sock) {
void socketConnect(Sock this, const char * addr, char (*)[16]); void socketConnect(Sock this, const char * addr, char (*)[16]);
void socketListen(Sock this, int backlog); void socketListen(Sock this, int backlog);
Sock socketAccept(Sock this, char (*remoteAddr)[16]); Sock socketAccept(Sock this, char (*remoteAddr)[16]);
void socketNonblock(Sock this);
#endif // __SOCKET_H__ #endif // __SOCKET_H__

17
src/cbuf/read.c

@ -31,8 +31,8 @@
ssize_t ssize_t
cbufRead(Cbuf this, Stream st) cbufRead(Cbuf this, Stream st)
{ {
ssize_t rrsize = 0;
size_t rsize = cbufGetFree(this);
size_t rsize = cbufGetFree(this);
ssize_t rrsize;
if (0 == rsize) { if (0 == rsize) {
errno = ECBUFOVFL; errno = ECBUFOVFL;
@ -41,17 +41,8 @@ cbufRead(Cbuf this, Stream st)
rrsize = streamRead(st, cbufGetWrite(this), rsize); rrsize = streamRead(st, cbufGetWrite(this), rsize);
switch (rrsize) {
case 0:
rrsize = -2;
// DROP THROUGH
case -1:
break;
default:
cbufIncWrite(this, rrsize);
break;
if (0 < rrsize) {
cbufIncWrite(this, rrsize);
} }
return rrsize; return rrsize;

45
src/http/parser/parse.c

@ -61,6 +61,8 @@ httpParserParse(void * _this, Stream st)
} }
if (0 > (read = cbufRead(this->buffer, st))) { if (0 > (read = cbufRead(this->buffer, st))) {
cbufRelease(this->buffer);
this->ourLock = FALSE;
return read; return read;
} }
@ -75,10 +77,9 @@ httpParserParse(void * _this, Stream st)
cbufRelease(this->buffer); cbufRelease(this->buffer);
this->ourLock = FALSE; this->ourLock = FALSE;
cont = 0; cont = 0;
break;
} }
break;
case HTTP_MESSAGE_START: case HTTP_MESSAGE_START:
if (NULL == (line = cbufGetLine(this->buffer, &line_end))) { if (NULL == (line = cbufGetLine(this->buffer, &line_end))) {
if (! cbufIsEmpty(this->buffer)) { if (! cbufIsEmpty(this->buffer)) {
@ -98,17 +99,16 @@ httpParserParse(void * _this, Stream st)
if (NULL == this->current) { if (NULL == this->current) {
cbufRelease(this->buffer); cbufRelease(this->buffer);
this->ourLock = FALSE; this->ourLock = FALSE;
return -1;
return -2; // a server error occured can't process...
} }
httpParserRequestVars(this); httpParserRequestVars(this);
this->state = HTTP_MESSAGE_INTRO_DONE; this->state = HTTP_MESSAGE_INTRO_DONE;
break;
case HTTP_MESSAGE_INTRO_DONE: case HTTP_MESSAGE_INTRO_DONE:
if (NULL == (line = cbufGetLine(this->buffer, &line_end))) { if (NULL == (line = cbufGetLine(this->buffer, &line_end))) {
if (! cbufIsEmpty(this->buffer)) { if (! cbufIsEmpty(this->buffer)) {
this->isize = this->buffer->bused;
this->isize = this->buffer->bused;
this->incomplete = memMalloc(this->isize); this->incomplete = memMalloc(this->isize);
memcpy(this->incomplete, memcpy(this->incomplete,
cbufGetData(this->buffer, this->isize), cbufGetData(this->buffer, this->isize),
@ -120,35 +120,34 @@ httpParserParse(void * _this, Stream st)
break; break;
} }
if (0 == strlen(line)) {
this->state = HTTP_MESSAGE_HEADERS_DONE;
if (0 != strlen(line)) {
httpParserHeader(this, line, line_end);
break; break;
} }
httpParserHeader(this, line, line_end);
break;
this->state = HTTP_MESSAGE_HEADERS_DONE;
case HTTP_MESSAGE_HEADERS_DONE: case HTTP_MESSAGE_HEADERS_DONE:
if (this->current->dbody == this->current->nbody) { if (this->current->dbody == this->current->nbody) {
this->state = HTTP_MESSAGE_DONE; this->state = HTTP_MESSAGE_DONE;
break;
}
} else {
if (cbufIsEmpty(this->buffer)) {
cbufRelease(this->buffer);
this->ourLock = FALSE;
cont = 0;
break;
}
cbufIncRead(
this->buffer,
httpParserBody(
this,
cbufGetRead(this->buffer),
this->buffer->bused));
if (cbufIsEmpty(this->buffer)) {
cbufRelease(this->buffer);
this->ourLock = FALSE;
cont = 0;
break; break;
} }
cbufIncRead(
this->buffer,
httpParserBody(
this,
cbufGetRead(this->buffer),
this->buffer->bused));
break;
case HTTP_MESSAGE_DONE: case HTTP_MESSAGE_DONE:
{ {
HttpHeader enc = hashGet( HttpHeader enc = hashGet(

14
src/http/worker/process.c

@ -54,10 +54,13 @@ char * httpWorkerGetMimeType(HttpWorker, const char * extension);
ssize_t ssize_t
httpWorkerProcess(HttpWorker this, Stream st) httpWorkerProcess(HttpWorker this, Stream st)
{ {
ssize_t size;
ssize_t requests = httpParserParse(this->parser, st);
if (0 < (size = httpParserParse(this->parser, st))) {
if (0 > requests) {
return requests;
}
if (0 < requests) {
while (! queueEmpty(this->parser->queue)) { while (! queueEmpty(this->parser->queue)) {
HttpRequest request = queueGet(this->parser->queue); HttpRequest request = queueGet(this->parser->queue);
HttpMessage response = NULL; HttpMessage response = NULL;
@ -212,18 +215,13 @@ httpWorkerProcess(HttpWorker this, Stream st)
} }
httpWorkerAddCommonHeader((HttpMessage)request, response); httpWorkerAddCommonHeader((HttpMessage)request, response);
delete(request); delete(request);
queuePut(this->writer->queue, response); queuePut(this->writer->queue, response);
size = this->writer->queue->nmsg;
response = NULL; response = NULL;
} }
} }
return size;
return this->writer->queue->nmsg;
} }
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

1
src/server/handle_accept.c

@ -44,6 +44,7 @@ serverHandleAccept(Server this, unsigned int i)
} }
acc = socketAccept((0 == i)? this->sock : this->sockSSL, &remoteAddr); acc = socketAccept((0 == i)? this->sock : this->sockSSL, &remoteAddr);
socketNonblock(acc);
if (-1 != acc->handle) { if (-1 != acc->handle) {
switch(i) { switch(i) {

37
src/server/run.c

@ -33,11 +33,12 @@ ssize_t serverWrite(Server, unsigned int);
void void
serverRun(Server this) serverRun(Server this)
{ {
int events = 0;
loggerLog(this->logger, LOGGER_INFO, "service started"); loggerLog(this->logger, LOGGER_INFO, "service started");
while (!doShutdown) //! until error or signal while (!doShutdown) //! until error or signal
{ {
int events = 0;
unsigned int i; unsigned int i;
if (0 == events) { if (0 == events) {
@ -71,12 +72,25 @@ serverRun(Server this)
if (0 != ((this->fds)[i].revents & POLLIN)) { if (0 != ((this->fds)[i].revents & POLLIN)) {
ssize_t processed = serverRead(this, i); ssize_t processed = serverRead(this, i);
if (0 < processed) {
(this->fds)[i].revents &= ~POLLIN;
// don't poll this one until I say.
(this->fds)[i].events &= ~POLLIN;
if (0 > processed) {
events--; events--;
switch (processed) {
case -1: // poll me again
(this->fds)[i].events |= POLLIN;
(this->fds)[i].revents &= ~POLLIN;
break;
case -2: // close me...
serverCloseConn(this, i);
break;
}
} }
if (processed > 0) {
if (0 < processed) {
(this->fds)[i].events |= POLLOUT; (this->fds)[i].events |= POLLOUT;
} }
} }
@ -87,11 +101,20 @@ serverRun(Server this)
if (0 != ((this->fds)[i].revents & POLLOUT)) { if (0 != ((this->fds)[i].revents & POLLOUT)) {
ssize_t remaining = serverWrite(this, i); ssize_t remaining = serverWrite(this, i);
if (0 > remaining) {
(this->fds)[i].events &= ~POLLOUT;
if (0 >= remaining) {
/*
* 0 means queue was empty...try again next
* time...no need to poll again.
* Anyway, most likely we need to read again
* so lets finish this event for now.
*/
events--; events--;
switch (remaining) { switch (remaining) {
case -1: // poll me again case -1: // poll me again
(this->fds)[i].events |= POLLOUT;
(this->fds)[i].revents &= ~POLLOUT; (this->fds)[i].revents &= ~POLLOUT;
break; break;
@ -100,10 +123,6 @@ serverRun(Server this)
break; break;
} }
} }
if (0 == remaining) {
(this->fds)[i].events &= ~POLLOUT;
}
} }
if (0 > events) if (0 > events)

1
src/server/server.c

@ -70,6 +70,7 @@ serverCtor(void * _this, va_list * params)
this->conns = memCalloc(sizeof(struct conns), this->max_fds); this->conns = memCalloc(sizeof(struct conns), this->max_fds);
this->sock = new(Sock, this->logger, port); this->sock = new(Sock, this->logger, port);
socketNonblock(this->sock);
flags = fcntl(this->sock->handle, F_GETFL, 0); flags = fcntl(this->sock->handle, F_GETFL, 0);
fcntl(this->sock->handle, F_SETFL, flags | O_NONBLOCK); fcntl(this->sock->handle, F_SETFL, flags | O_NONBLOCK);

2
src/socket/Makefile.am

@ -2,5 +2,5 @@ ACLOCAL_AMFLAGS = -I m4
noinst_LIBRARIES = libsocket.a noinst_LIBRARIES = libsocket.a
libsocket_a_SOURCES = socket.c accept.c connect.c listen.c
libsocket_a_SOURCES = socket.c accept.c connect.c listen.c nonblock.c
libsocket_a_CFLAGS = $(CFLAGS) -Wall -I ../../include/ libsocket_a_CFLAGS = $(CFLAGS) -Wall -I ../../include/

8
src/socket/accept.c

@ -22,6 +22,7 @@
#include <errno.h> // for errno #include <errno.h> // for errno
#include <unistd.h> #include <unistd.h>
#include <fcntl.h>
#include "socket.h" #include "socket.h"
#include "class.h" #include "class.h"
@ -36,13 +37,6 @@ socketAccept(Sock this, char (*remoteAddr)[16])
// Set the size of the in-out parameter // Set the size of the in-out parameter
len = sizeof(this->addr); len = sizeof(this->addr);
/**
* \todo Uhh, this is bad. we open a new socket additionally to
* the one we get from the accept call.
* i have to change the socket constructor to be able to create
* the data structure without creation of a socket at all.
* For now i simply close the socket here.... :D
*/
sock = new(Sock, this->log, -1); sock = new(Sock, this->log, -1);
// Wait for a client to connect // Wait for a client to connect

1
src/socket/socket.c

@ -35,6 +35,7 @@ socketCtor(void * _this, va_list * params)
Sock this = _this; Sock this = _this;
int reUse = 1; //! \todo make this configurable int reUse = 1; //! \todo make this configurable
int port; int port;
int nonblock;
this->log = va_arg(* params, Logger); this->log = va_arg(* params, Logger);
port = va_arg(* params, int); port = va_arg(* params, int);

25
src/stream/read.c

@ -22,6 +22,7 @@
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include "stream.h" #include "stream.h"
@ -31,8 +32,30 @@ streamRead(Stream this, void * buf, size_t count)
ssize_t done; ssize_t done;
switch(this->type) { switch(this->type) {
ssize_t _read;
case STREAM_FD: case STREAM_FD:
done = read((this->handle).fd, buf, count);
_read = read((this->handle).fd, buf, count);
if (_read < 0) {
switch (errno) {
case EINTR:
case ENOMEM:
done = 0;
break;
case (EAGAIN|EWOULDBLOCK):
done = -1;
break;
default:
done = -2;
break;
}
} else if (_read == 0) {
done = -2;
} else {
done = _read;
}
break; break;
case STREAM_SSL: case STREAM_SSL:

1
src/taskrambler.c

@ -45,6 +45,7 @@
#include "utils/signalHandling.h" #include "utils/signalHandling.h"
#include "utils/memory.h" #include "utils/memory.h"
#include "utils/mime_type.h"
#define DEFAULT_SECS 10 #define DEFAULT_SECS 10
//#define DEFAULT_USECS (1000000 / HZ * 2) //#define DEFAULT_USECS (1000000 / HZ * 2)

Loading…
Cancel
Save