diff --git a/src/cep_buffer_read.c b/src/cep_buffer_read.c index fa81a15..c6ad1e2 100644 --- a/src/cep_buffer_read.c +++ b/src/cep_buffer_read.c @@ -30,13 +30,11 @@ TR_cepBufferRead(TR_CommEndPoint this) { TR_RemoteData data = TR_socketRecv(this->transport, this->read_chunk_size); - if (! data) return FALSE; - - while (data) { - TR_cepAppendReadData(this, data); - data = TR_socketRecv(this->transport, this->read_chunk_size); - } + if (! data) return -1; // ment to trigger a close + if (data == (void*)-1) return -2; // remote close... shutdown + if (data == TR_emptyRemoteData) return FALSE; + TR_cepAppendReadData(this, data); return TRUE; } diff --git a/src/comm_manager.c b/src/comm_manager.c index 72bf4e1..313bf19 100644 --- a/src/comm_manager.c +++ b/src/comm_manager.c @@ -55,6 +55,7 @@ commManagerDtor(void * _this) for (i = 0; i < this->n_endpoints; i++) { TR_delete(this->endpoints[i]); } + TR_MEM_FREE(this->endpoints); } static diff --git a/src/connection.c b/src/connection.c index f052618..922d102 100644 --- a/src/connection.c +++ b/src/connection.c @@ -59,56 +59,56 @@ static TR_ProtoMessage connectionNextMessage(void * _this) { - TR_Connection this = _this; - TR_CommEndPoint comm = _this; - TR_RemoteData data = TR_queueGet(comm->read_buffer); + TR_Connection this = _this; + TR_CommEndPoint comm = _this; + TR_RemoteData data = TR_queueGet(comm->read_buffer); + TR_ProtoMessage ret_message = NULL; size_t end; - if (data && (! this->current_message || this->current_message->ready)) + if (NULL == data) return ret_message; + + if (! this->current_message || this->current_message->ready) { this->current_message = TR_protoCreateMessage(comm->protocol, data->remote); } - while (NULL != data) { - end = TR_protoParse(comm->protocol, this->current_message, data); - - if (end != ((TR_SizedData)data)->size) { - /** - * TODO - * This means that the parser has not consumed all of the data. - * We do not know the reason, but with HTTP this should only occur - * when the message is complete... anyway, to prevent us from - * looping forever because a protocol implementation is buggy - * we should close the connection after end was 0 the second time. - * This can be done by firing a close event. - */ - switch(end) { - default: - { - TR_RemoteData new_data = TR_new( - TR_RemoteData, - ((TR_SizedData)data)->data + end, - ((TR_SizedData)data)->size - end, - data->remote); - TR_delete(data); - data = new_data; - } - // intended drop through - - case 0: - TR_queuePutFirst(comm->read_buffer, data); - } - } - - if (this->current_message->ready) { - return this->current_message; + end = TR_protoParse(comm->protocol, this->current_message, data); + + if (end != ((TR_SizedData)data)->size) { + /** + * TODO + * This means that the parser has not consumed all of the data. + * We do not know the reason, but with HTTP this should only occur + * when the message is complete... anyway, to prevent us from + * looping forever because a protocol implementation is buggy + * we should close the connection after end was 0 the second time. + * This can be done by firing a close event. + */ + switch(end) { + default: + { + TR_RemoteData new_data = TR_new( + TR_RemoteData, + ((TR_SizedData)data)->data + end, + ((TR_SizedData)data)->size - end, + data->remote); + TR_delete(data); + data = new_data; + } + // intended drop through + + case 0: + TR_queuePutFirst(comm->read_buffer, data); } + } - data = TR_queueGet(comm->read_buffer); + if (this->current_message->ready) { + ret_message = this->current_message; + this->current_message = NULL; } - return NULL; + return ret_message; } static diff --git a/src/io_handler.c b/src/io_handler.c index 8459247..8dbef63 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -46,12 +46,34 @@ ioHandlerRead(void * _this, TR_Event event) { TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - if (TR_cepBufferRead(endpoint)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_NEW_DATA, - NULL); + switch (TR_cepBufferRead(endpoint)) { + default: + case FALSE: + break; + + case -1: + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + event->subject, + TR_CEP_EVENT_CLOSE, + NULL); + break; + + case -2: + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + event->subject, + TR_CEP_EVENT_SHUT_READ, + NULL); + break; + + case TRUE: + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + event->subject, + TR_CEP_EVENT_NEW_DATA, + NULL); + break; } return TRUE; diff --git a/src/protocol_handler.c b/src/protocol_handler.c index 88651c9..04db6e4 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -53,7 +53,7 @@ protocolHandlerParse(void * _this, TR_Event event) TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; TR_ProtoMessage message = TR_cepNextMessage(endpoint); - while (message) { + if (message) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, event->subject, @@ -65,8 +65,6 @@ protocolHandlerParse(void * _this, TR_Event event) // in the python code... TR_cepSetClose(endpoint); } - - message = TR_cepNextMessage(endpoint); } return TRUE; @@ -79,18 +77,19 @@ protocolHandlerCompose(void * _this, TR_Event event) TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; TR_ProtoMessage message = (TR_ProtoMessage)event->data; + if (message->close) { + // also check that we are a response. Well this is how it is done + // in the python code... + TR_cepSetClose(endpoint); + } + if (TR_cepCompose(endpoint, message)) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, event->subject, TR_CEP_EVENT_WRITE_READY, NULL); - } - - if (message->close) { - // also check that we are a response. Well this is how it is done - // in the python code... - TR_cepSetClose(endpoint); + TR_delete(message); } return TRUE; diff --git a/testers/leak.log b/testers/leak.log new file mode 100644 index 0000000..35c7a21 --- /dev/null +++ b/testers/leak.log @@ -0,0 +1,68 @@ +==25008== Memcheck, a memory error detector +==25008== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al. +==25008== Using Valgrind-3.9.0 and LibVEX; rerun with -h for copyright info +==25008== Command: ./testserver +==25008== +==25008== Conditional jump or move depends on uninitialised value(s) +==25008== at 0x4017A74: index (strchr.S:77) +==25008== by 0x400743D: expand_dynamic_string_token (dl-load.c:425) +==25008== by 0x400802D: _dl_map_object (dl-load.c:2302) +==25008== by 0x400138D: map_doit (rtld.c:626) +==25008== by 0x400E993: _dl_catch_error (dl-error.c:187) +==25008== by 0x4000B30: do_preload (rtld.c:815) +==25008== by 0x4004122: dl_main (rtld.c:1632) +==25008== by 0x401533B: _dl_sysdep_start (dl-sysdep.c:249) +==25008== by 0x4004A4C: _dl_start (rtld.c:331) +==25008== by 0x40011A7: ??? (in /lib64/ld-2.19.so) +==25008== +==25008== +==25008== HEAP SUMMARY: +==25008== in use at exit: 1,024 bytes in 4 blocks +==25008== total heap usage: 12,272 allocs, 12,268 frees, 1,601,216 bytes allocated +==25008== +==25008== 256 bytes in 1 blocks are definitely lost in loss record 1 of 4 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291) +==25008== by 0x4E3497C: newElement (memory.c:82) +==25008== by 0x4E34B19: TR_malloc (memory.c:442) +==25008== by 0x4E34B50: TR_calloc (memory.c:460) +==25008== by 0x4E35144: TR_classNewv (i_class.c:55) +==25008== by 0x4E3532E: TR_classNew (i_class.c:81) +==25008== by 0x4018DA: main (testserver.c:123) +==25008== +==25008== 256 bytes in 1 blocks are definitely lost in loss record 2 of 4 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291) +==25008== by 0x4E3497C: newElement (memory.c:82) +==25008== by 0x4E34B19: TR_malloc (memory.c:442) +==25008== by 0x4E34B50: TR_calloc (memory.c:460) +==25008== by 0x4E35144: TR_classNewv (i_class.c:55) +==25008== by 0x4E3532E: TR_classNew (i_class.c:81) +==25008== by 0x4018F2: main (testserver.c:124) +==25008== +==25008== 256 bytes in 1 blocks are definitely lost in loss record 3 of 4 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291) +==25008== by 0x4E3497C: newElement (memory.c:82) +==25008== by 0x4E34B19: TR_malloc (memory.c:442) +==25008== by 0x4E34B50: TR_calloc (memory.c:460) +==25008== by 0x4E35144: TR_classNewv (i_class.c:55) +==25008== by 0x4E3532E: TR_classNew (i_class.c:81) +==25008== by 0x40190A: main (testserver.c:125) +==25008== +==25008== 256 bytes in 1 blocks are definitely lost in loss record 4 of 4 +==25008== at 0x4C28730: malloc (vg_replace_malloc.c:291) +==25008== by 0x4E3497C: newElement (memory.c:82) +==25008== by 0x4E34B19: TR_malloc (memory.c:442) +==25008== by 0x4E34B50: TR_calloc (memory.c:460) +==25008== by 0x4E35144: TR_classNewv (i_class.c:55) +==25008== by 0x4E3532E: TR_classNew (i_class.c:81) +==25008== by 0x401920: main (testserver.c:126) +==25008== +==25008== LEAK SUMMARY: +==25008== definitely lost: 1,024 bytes in 4 blocks +==25008== indirectly lost: 0 bytes in 0 blocks +==25008== possibly lost: 0 bytes in 0 blocks +==25008== still reachable: 0 bytes in 0 blocks +==25008== suppressed: 0 bytes in 0 blocks +==25008== +==25008== For counts of detected and suppressed errors, rerun with: -v +==25008== Use --track-origins=yes to see where uninitialised values come from +==25008== ERROR SUMMARY: 5 errors from 5 contexts (suppressed: 0 from 0) diff --git a/testers/testserver.c b/testers/testserver.c index 78ea34c..b05b434 100644 --- a/testers/testserver.c +++ b/testers/testserver.c @@ -1,4 +1,5 @@ #include +#include #include "trbase.h" #include "trcomm.h" @@ -6,6 +7,7 @@ TR_CLASS(TestHandler) { TR_EXTENDS(TR_EventHandler); + unsigned long long handled; }; TR_INSTANCE_INIT(TestHandler); TR_CLASSVARS_DECL(TestHandler) { @@ -16,7 +18,8 @@ static int testHandlerHeartbeat(TR_EventHandler this, TR_Event event) { - puts("heartbeat"); + printf("handled: %llu/s\n", ((TestHandler)this)->handled); + ((TestHandler)this)->handled = 0; return FALSE; } @@ -24,7 +27,26 @@ static int testHandlerNewMessage(TR_EventHandler this, TR_Event event) { - puts("new message"); + TR_ProtoMessageRaw msg = event->data; + TR_SizedData data = (TR_SizedData)msg->data; + char buf[data->size + 1]; + int i; + + ((TestHandler)this)->handled++; + + memcpy(buf, data->data, data->size); + buf[data->size] = 0; + for (i = 0; buf[i]; i++) { + if (! isprint(buf[i])) buf[i] = '.'; + } +// printf("echo message: %s(%zd)\n", buf, data->size); + + TR_eventHandlerIssueEvent( + (TR_EventHandler)this, + event->subject, + TR_CEP_EVENT_SEND_MSG, + event->data); + return FALSE; } @@ -49,6 +71,8 @@ int testHandlerCtor(void * _this, va_list * params) { TR_PARENTCALL(TestHandler, _this, TR_Class, ctor, params); + ((TestHandler)_this)->handled = 0; + return 0; } @@ -102,6 +126,10 @@ main (int argc, char * argv[]) { TR_CommManager cmgr = (TR_CommManager)TR_new(TR_CommManagerPoll); TR_EventDispatcher dispatcher = TR_new(TR_EventDispatcher); + TR_Connector connector = TR_new(TR_Connector); + TR_IoHandler io_handler = TR_new(TR_IoHandler); + TR_ProtocolHandler protocol_handler = TR_new(TR_ProtocolHandler); + TestHandler test_handler = TR_new(TestHandler); TR_ConnEntryPoint ep; TR_TcpSocket ep_sock; TR_Protocol protocol; @@ -109,14 +137,14 @@ main (int argc, char * argv[]) TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger); TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)cmgr); - TR_eventDispatcherRegisterHandler(dispatcher, - (TR_EventHandler)TR_new(TR_Connector)); - TR_eventDispatcherRegisterHandler(dispatcher, - (TR_EventHandler)TR_new(TR_IoHandler)); - TR_eventDispatcherRegisterHandler(dispatcher, - (TR_EventHandler)TR_new(TR_ProtocolHandler)); - TR_eventDispatcherRegisterHandler(dispatcher, - (TR_EventHandler)TR_new(TestHandler)); + TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)connector); + TR_eventDispatcherRegisterHandler(dispatcher, (TR_EventHandler)io_handler); + TR_eventDispatcherRegisterHandler( + dispatcher, + (TR_EventHandler)protocol_handler); + TR_eventDispatcherRegisterHandler( + dispatcher, + (TR_EventHandler)test_handler); protocol = TR_new(TR_ProtocolRaw); ep_sock = TR_new(TR_TcpSocket, TR_logger, "0.0.0.0", 5678, 0); @@ -126,10 +154,25 @@ main (int argc, char * argv[]) TR_eventDispatcherSetHeartbeat(dispatcher, 1000); TR_eventDispatcherStart(dispatcher); - TR_eventHandlerClassCleanup(TestHandler); + + puts("cleanup..."); TR_delete(cmgr); TR_delete(dispatcher); + TR_delete(connector); + TR_delete(io_handler); + TR_delete(protocol_handler); + TR_delete(test_handler); + TR_delete(protocol); + //TR_delete(ep); + + TR_eventHandlerClassCleanup(TestHandler); + TR_eventHandlerClassCleanup(TR_ProtocolHandler); + TR_eventHandlerClassCleanup(TR_IoHandler); + TR_eventHandlerClassCleanup(TR_Connector); + TR_eventHandlerClassCleanup(TR_CommManagerPoll); + + TR_cleanup(); return 0; }