From 521003633534d3e7ce8dd9eaa9ea65f9205c67e0 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Sat, 30 Aug 2014 19:17:58 +0100 Subject: [PATCH] generally epoll is working... sadly I removed the is_writing flag in the CommEndPoint which was neccessary... I have to add it again. --- include/tr/comm_manager.h | 5 +- include/tr/comm_manager_epoll.h | 3 + include/tr/comm_manager_poll.h | 3 +- include/tr/interface/comm_manager.h | 4 +- src/cep_write_buffered.c | 16 ++- src/comm_end_point.c | 16 +++ src/comm_manager_epoll.c | 168 ++++++++++++++++------------ src/comm_manager_poll.c | 14 +-- src/comm_manager_shutdown.c | 2 +- src/conn_entry_point.c | 9 +- src/connection.c | 14 ++- src/datagram_entry_point.c | 5 +- src/datagram_service.c | 9 +- src/io_handler.c | 17 ++- src/protocol_handler.c | 2 - testers/test_handler.c | 13 ++- testers/testclient.sh | 4 +- testers/testserver2.c | 2 +- 18 files changed, 187 insertions(+), 119 deletions(-) diff --git a/include/tr/comm_manager.h b/include/tr/comm_manager.h index 2f0ba91..6cfabcb 100644 --- a/include/tr/comm_manager.h +++ b/include/tr/comm_manager.h @@ -24,7 +24,6 @@ #define __TR_COMM_MANAGER_H__ #include -#include #include "trbase.h" #include "trevent.h" @@ -35,8 +34,8 @@ TR_CLASS(TR_CommManager) { TR_EXTENDS(TR_EventHandler); TR_CommEndPoint * endpoints; - nfds_t n_endpoints; - nfds_t max_handle; + size_t n_endpoints; + size_t max_handle; }; TR_INSTANCE_INIT(TR_CommManager); TR_CLASSVARS_DECL(TR_CommManager) { diff --git a/include/tr/comm_manager_epoll.h b/include/tr/comm_manager_epoll.h index acf100c..63ea5fb 100644 --- a/include/tr/comm_manager_epoll.h +++ b/include/tr/comm_manager_epoll.h @@ -27,6 +27,7 @@ #include #include "trbase.h" +#include "trdata.h" #include "trevent.h" TR_CLASS(TR_CommManagerEpoll) { @@ -34,6 +35,8 @@ TR_CLASS(TR_CommManagerEpoll) { int handle; struct epoll_event * events; + TR_Queue read_ready; + TR_Queue write_ready; }; TR_INSTANCE_INIT(TR_CommManagerEpoll); TR_CLASSVARS_DECL(TR_CommManagerEpoll) { diff --git a/include/tr/comm_manager_poll.h b/include/tr/comm_manager_poll.h index 78d4fde..aaf88f2 100644 --- a/include/tr/comm_manager_poll.h +++ b/include/tr/comm_manager_poll.h @@ -32,7 +32,7 @@ TR_CLASS(TR_CommManagerPoll) { TR_EXTENDS(TR_CommManager); - struct pollfd * fds; + struct pollfd * fds; }; TR_INSTANCE_INIT(TR_CommManagerPoll); TR_CLASSVARS_DECL(TR_CommManagerPoll) { @@ -42,4 +42,3 @@ TR_CLASSVARS_DECL(TR_CommManagerPoll) { #endif // __TR_COMM_MANAGER_POLL_H__ // vim: set ts=4 sw=4: - diff --git a/include/tr/interface/comm_manager.h b/include/tr/interface/comm_manager.h index 8da1034..cec7e31 100644 --- a/include/tr/interface/comm_manager.h +++ b/include/tr/interface/comm_manager.h @@ -47,8 +47,8 @@ TR_INTERFACE(TR_CommManager) { fptr_TR_commManagerDisableWrite disableWrite; fptr_TR_commManagerEnableRead enableRead; fptr_TR_commManagerClose close; - fptr_TR_commManagerShutdownRead shutdownWrite; - fptr_TR_commManagerShutdownWrite shutdownRead; + fptr_TR_commManagerShutdownWrite shutdownWrite; + fptr_TR_commManagerShutdownRead shutdownRead; }; void TR_commManagerAddEndpoint(void *, TR_CommEndPoint); diff --git a/src/cep_write_buffered.c b/src/cep_write_buffered.c index b871d6a..c41af6c 100644 --- a/src/cep_write_buffered.c +++ b/src/cep_write_buffered.c @@ -28,12 +28,24 @@ int TR_cepWriteBuffered(TR_CommEndPoint this) { - TR_RemoteData data = TR_cepNextWriteData(this); - int send = TR_socketSend(this->transport, data); + TR_RemoteData data; + int send; + +// fprintf(stderr, "%s(%p): before get write data: %p / %zd messages\n", +// __func__, this, data, this->write_buffer->nmsg); + data = TR_cepNextWriteData(this); +// fprintf(stderr, "%s(%p): get write data: %p / %zd messages\n", +// __func__, this, data, this->write_buffer->nmsg); +// fflush(stderr); + + send = TR_socketSend(this->transport, data); switch (send) { case FALSE: // EAGAIN TR_queuePutFirst(this->write_buffer, data); +// fprintf(stderr, "%s(%p): put first write data: %p / %zd messages\n", +// __func__, this, data, this->write_buffer->nmsg); +// fflush(stderr); break; case -1: // FAILURE diff --git a/src/comm_end_point.c b/src/comm_end_point.c index fe899ce..5e02cf6 100644 --- a/src/comm_end_point.c +++ b/src/comm_end_point.c @@ -77,6 +77,21 @@ commEndPointCvInit(TR_class_ptr cls) TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE); } +const char * TR_cepEventStrings[] = { + "TR_CEP_EVENT_READ_READY", + "TR_CEP_EVENT_READ_BLOCK", + "TR_CEP_EVENT_WRITE_READY", + "TR_CEP_EVENT_UPGRADE", + "TR_CEP_EVENT_NEW_DATA", + "TR_CEP_EVENT_PENDING_DATA", + "TR_CEP_EVENT_END_DATA", + "TR_CEP_EVENT_NEW_MSG", + "TR_CEP_EVENT_SEND_MSG", + "TR_CEP_EVENT_SHUT_READ", + "TR_CEP_EVENT_SHUT_WRITE", + "TR_CEP_EVENT_CLOSE" +}; + intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1]; TR_INIT_IFACE(TR_Class, commEndPointCtor, commEndPointDtor, NULL); TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL); @@ -87,6 +102,7 @@ TR_CREATE_CLASS( TR_IF(TR_Class), TR_IF(TR_CommEndPoint)) = { { + TR_cepEventStrings, TR_CEP_EVENT_MAX + 1, comm_end_point_events } diff --git a/src/comm_manager_epoll.c b/src/comm_manager_epoll.c index a1f3ca8..2d7a0a7 100644 --- a/src/comm_manager_epoll.c +++ b/src/comm_manager_epoll.c @@ -25,6 +25,7 @@ #include #include "trbase.h" +#include "trdata.h" #include "trevent.h" #include "tr/comm_manager.h" @@ -44,16 +45,14 @@ commManagerEpollCtor(void * _this, va_list * params) { TR_CommManagerEpoll this = _this; TR_CommManager cmgr = _this; - nfds_t i; TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params); - this->handle = epoll_create(cmgr->n_endpoints); - this->events = TR_malloc(sizeof(struct epoll_event) * cmgr->n_endpoints); - for (i = 0; i < cmgr->n_endpoints; i++) { - this->events[i].data.ptr = NULL; - this->events[i].events = EPOLLET | EPOLLONESHOT; - } + this->handle = epoll_create(cmgr->n_endpoints); + this->read_ready = TR_new(TR_Queue); + this->write_ready = TR_new(TR_Queue); + this->read_ready->free_msgs = 0; + this->write_ready->free_msgs = 0; return 0; } @@ -64,8 +63,10 @@ commManagerEpollDtor(void * _this) { TR_CommManagerEpoll this = _this; + TR_delete(this->read_ready); + TR_delete(this->write_ready); + close(this->handle); - TR_MEM_FREE(this->events); TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor); } @@ -75,11 +76,12 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) { TR_CommManagerEpoll this = _this; int handle = endpoint->transport->handle; + struct epoll_event event; - this->events[handle].data.ptr = endpoint; - this->events[handle].events |= EPOLLIN; + event.data.ptr = endpoint; + event.events = EPOLLIN | EPOLLOUT | EPOLLET; - epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &(this->events[handle])); + epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &event); } static @@ -88,6 +90,11 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) { TR_CommManagerEpoll this = _this; int i, nevents; + TR_Queue node; + + if (0 != (this->read_ready->nmsg & this->write_ready->nmsg)) { + timeout = 0; + } nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout); @@ -95,113 +102,132 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) TR_CommEndPoint endpoint = (TR_CommEndPoint)events[i].data.ptr; if ((events[i].events & EPOLLIN) == EPOLLIN) { - TR_Event event; - if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) && ((TR_TcpSocket)endpoint->transport)->listen) { - event = TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CET_EVENT_ACC_READY, - NULL); + TR_eventHandlerIssueEvent((TR_EventHandler)this, + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CET_EVENT_ACC_READY, + NULL)); } else { - event = TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_READ_READY, - NULL); + if (! ((TR_EventSubject)endpoint)->fin) { + TR_queuePut(this->read_ready, endpoint); + } } - - TR_eventHandlerIssueEvent((TR_EventHandler)this, event); - this->events[i].events &= ~EPOLLIN; } if ((events[i].events & EPOLLOUT) == EPOLLOUT) { - TR_Event _event = TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_WRITE_READY, - NULL); + if (TR_cepHasPendingData(endpoint) && + ! ((TR_EventSubject)endpoint)->fin) { + TR_queuePut(this->write_ready, endpoint); + } + } + } + + /* now issue reads and write events */ + for (node=this->read_ready->first; node; node=node->next) { + TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg; + + if (! TR_socketFinRd(endpoint->transport)) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)this, + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_READ_READY, + NULL)); + } + } - TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); - this->events[i].events &= ~EPOLLOUT; + for (node=this->write_ready->first; node; node=node->next) { + TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg; + + if (! TR_socketFinWr(endpoint->transport)) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)this, + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_WRITE_READY, + NULL)); } } } static void -TR_commManagerEpollEnableWrite(void * _this, TR_Event event) +TR_commManagerEpollRemoveWrite(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - if (! TR_socketFinWr(endpoint->transport)) { - int handle = endpoint->transport->handle; - - this->events[handle].data.ptr = endpoint; - this->events[handle].events |= EPOLLOUT; - - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); - } + TR_queueDelete(this->write_ready, endpoint); } static void -TR_commManagerEpollDisableWrite(void * _this, TR_Event event) +TR_commManagerEpollRemoveRead(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - int handle = endpoint->transport->handle; - - this->events[handle].data.ptr = endpoint; - this->events[handle].events &= ~EPOLLOUT; - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); + TR_queueDelete(this->read_ready, endpoint); } static void -TR_commManagerEpollEnableRead(void * _this, TR_Event event) +TR_commManagerEpollClose(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - if (! TR_socketFinRd(endpoint->transport)) { - int handle = endpoint->transport->handle; - - this->events[handle].data.ptr = endpoint; - this->events[handle].events |= EPOLLIN; + TR_queueDelete(this->read_ready, endpoint); + TR_queueDelete(this->write_ready, endpoint); - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); - } + epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL); } static void -TR_commManagerEpollDisableRead(void * _this, TR_Event event) +TR_commManagerEpollShutRead(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - int handle = endpoint->transport->handle; + struct epoll_event _event; + + TR_queueDelete(this->read_ready, endpoint); - this->events[handle].data.ptr = endpoint; - this->events[handle].events &= ~EPOLLIN; + _event.data.ptr = endpoint; + _event.events = EPOLLOUT | EPOLLET; - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); + epoll_ctl( + this->handle, + EPOLL_CTL_MOD, + endpoint->transport->handle, + &_event); } static void -TR_commManagerEpollClose(void * _this, TR_Event event) +TR_commManagerEpollShutWrite(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - int handle = endpoint->transport->handle; + struct epoll_event _event; + + TR_queueDelete(this->write_ready, endpoint); - this->events[handle].data.ptr = NULL; - this->events[handle].events = EPOLLET | EPOLLONESHOT; + _event.data.ptr = endpoint; + _event.events = EPOLLIN | EPOLLET; - epoll_ctl(this->handle, EPOLL_CTL_DEL, handle, NULL); + epoll_ctl( + this->handle, + EPOLL_CTL_MOD, + endpoint->transport->handle, + &_event); } + +static void TR_commManagerEpollNoop(void * _this, TR_Event event) {} + static void TR_commManagerEpollCvInit(TR_class_ptr cls) { @@ -211,14 +237,14 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) { TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL); TR_INIT_IFACE( TR_CommManager, - TR_commManagerEpollAddEndpoint, - TR_commManagerEpollSelect, - TR_commManagerEpollEnableWrite, - TR_commManagerEpollDisableWrite, - TR_commManagerEpollEnableRead, - TR_commManagerEpollClose, - TR_commManagerEpollDisableRead, - TR_commManagerEpollDisableWrite); + TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON + TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT + TR_commManagerEpollRemoveWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK + TR_commManagerEpollNoop, // TR_CEP_EVENT_END_DATA + TR_commManagerEpollRemoveRead, // TR_CEP_EVENT_READ_BLOCK + TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE + TR_commManagerEpollShutWrite, // TR_CEP_EVENT_SHUT_READ + TR_commManagerEpollShutRead); // TR_CEP_EVENT_SHUT_WRITE TR_CREATE_CLASS( TR_CommManagerEpoll, TR_CommManager, diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index bd21383..8f092b2 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -42,6 +42,7 @@ commManagerPollCtor(void * _this, va_list * params) nfds_t i; TR_PARENTCALL(TR_CommManagerPoll, _this, TR_Class, ctor, params); + this->fds = TR_malloc(sizeof(struct pollfd) * cmgr->n_endpoints); for (i = 0; i < cmgr->n_endpoints; i++) { this->fds[i].fd = -1; @@ -59,7 +60,6 @@ commManagerPollDtor(void * _this) TR_CommManagerPoll this = _this; TR_MEM_FREE(this->fds); - TR_PARENTCALL(TR_CommManagerPoll, _this, TR_Class, dtor); } static @@ -67,6 +67,7 @@ void TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint) { TR_CommManagerPoll this = _this; + this->fds[endpoint->transport->handle].fd = endpoint->transport->handle; this->fds[endpoint->transport->handle].events = POLLIN; } @@ -104,21 +105,16 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) } TR_eventHandlerIssueEvent((TR_EventHandler)this, event); - // deactivate read poll mimic edge level behaviour - this->fds[endpoint->transport->handle].events &= ~POLLIN; } if ((this->fds[i].revents & POLLOUT) == POLLOUT) { + nevents--; TR_Event _event = TR_eventSubjectEmit( (TR_EventSubject)endpoint, TR_CEP_EVENT_WRITE_READY, NULL); TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); - nevents--; - - // deactivate write poll mimic edge level behaviour - this->fds[endpoint->transport->handle].events &= ~POLLOUT; } if (nevents <= 0) break; @@ -196,8 +192,8 @@ TR_INIT_IFACE( TR_commManagerPollDisableWrite, TR_commManagerPollEnableRead, TR_commManagerPollClose, - TR_commManagerPollDisableRead, - TR_commManagerPollDisableWrite); + TR_commManagerPollDisableWrite, + TR_commManagerPollDisableRead); TR_CREATE_CLASS( TR_CommManagerPoll, TR_CommManager, diff --git a/src/comm_manager_shutdown.c b/src/comm_manager_shutdown.c index 3c84b2e..7bd7b25 100644 --- a/src/comm_manager_shutdown.c +++ b/src/comm_manager_shutdown.c @@ -34,7 +34,7 @@ TR_commManagerShutdown(void * _this, TR_Event event) TR_CommManager this = _this; nfds_t i; - for (i=0; in_endpoints; i++) { + for (i=0; i<=this->max_handle; i++) { if (this->endpoints[i]) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, diff --git a/src/conn_entry_point.c b/src/conn_entry_point.c index 1da4e7f..11b6f19 100644 --- a/src/conn_entry_point.c +++ b/src/conn_entry_point.c @@ -60,6 +60,10 @@ connEntryPointCvInit(TR_class_ptr cls) TR_EVENT_CREATE(cls, TR_CET_EVENT_ACC_READY); } +const char * TR_cetEventStrings[] = { + "TR_CET_EVENT_ACC_READY" +}; + intptr_t connEntryPoint_events[TR_CET_EVENT_MAX + 1]; TR_INIT_IFACE(TR_Class, connEntryPointCtor, connEntryPointDtor, NULL); TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL); @@ -70,8 +74,9 @@ TR_CREATE_CLASS( TR_IF(TR_Class), TR_IF(TR_CommEndPoint)) = { {{ - TR_CET_EVENT_MAX + 1, - connEntryPoint_events + TR_cetEventStrings, + TR_CET_EVENT_MAX + 1, + connEntryPoint_events }} }; diff --git a/src/connection.c b/src/connection.c index ac09d0c..cb8a92a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -123,6 +123,11 @@ connectionCompose(void * _this, TR_ProtoMessage message) } TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); +// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n", +// __func__, (TR_CommEndPoint)_this, data, +// ((TR_CommEndPoint)_this)->write_buffer->nmsg); +// fflush(stderr); + return TRUE; } @@ -133,6 +138,10 @@ connectionCvInit(TR_class_ptr cls) TR_EVENT_CREATE(cls, TR_CON_EVENT_NEW_CON); } +const char * TR_connectionEventStrings[] = { + "TR_CON_EVENT_NEW_CON" +}; + intptr_t connection_events[TR_CON_EVENT_MAX + 1]; TR_INIT_IFACE(TR_Class, connectionCtor, connectionDtor, NULL); TR_INIT_IFACE( @@ -146,8 +155,9 @@ TR_CREATE_CLASS( TR_IF(TR_Class), TR_IF(TR_CommEndPoint)) = { {{ - TR_CON_EVENT_MAX + 1, - connection_events + TR_connectionEventStrings, + TR_CON_EVENT_MAX + 1, + connection_events }} }; diff --git a/src/datagram_entry_point.c b/src/datagram_entry_point.c index 4eda275..d78a1ca 100644 --- a/src/datagram_entry_point.c +++ b/src/datagram_entry_point.c @@ -56,8 +56,9 @@ TR_CREATE_CLASS( NULL, TR_IF(TR_Class)) = { {{ - TR_CEP_EVENT_MAX + 1, - datagramEntryPoint_events + NULL, + TR_CEP_EVENT_MAX + 1, + datagramEntryPoint_events }} }; diff --git a/src/datagram_service.c b/src/datagram_service.c index 069e920..05af42f 100644 --- a/src/datagram_service.c +++ b/src/datagram_service.c @@ -110,6 +110,10 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message) } TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); +// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n", +// __func__, (TR_CommEndPoint)_this, data, +// ((TR_CommEndPoint)_this)->write_buffer->nmsg); +// fflush(stderr); return TRUE; } @@ -126,8 +130,9 @@ TR_CREATE_CLASS( TR_IF(TR_Class), TR_IF(TR_CommEndPoint)) = { {{ - TR_CEP_EVENT_MAX + 1, - datagramService_events + NULL, + TR_CEP_EVENT_MAX + 1, + datagramService_events }} }; diff --git a/src/io_handler.c b/src/io_handler.c index d649d60..0c0277c 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -44,8 +44,7 @@ static TR_EventDone ioHandlerRead(void * _this, TR_Event event) { - TR_Event revent; - TR_EventDone done = TR_EVENT_DONE; + TR_Event revent; switch (TR_cepBufferRead((TR_CommEndPoint)event->subject)) { case FALSE: // EAGAIN @@ -75,26 +74,26 @@ ioHandlerRead(void * _this, TR_Event event) event->subject, TR_CEP_EVENT_NEW_DATA, NULL); - - done = TR_EVENT_PENDING; break; } TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); - return done; + + return TR_EVENT_DONE; } static TR_EventDone ioHandlerWrite(void * _this, TR_Event event) { - TR_Event revent, close_event = NULL; + TR_Event revent = NULL, + close_event = NULL; switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) { case FALSE: // EAGAIN revent = TR_eventSubjectEmit( event->subject, - TR_CEP_EVENT_PENDING_DATA, + TR_CEP_EVENT_PENDING_DATA, // is WRITE_BLOCK NULL); break; @@ -129,9 +128,7 @@ ioHandlerWrite(void * _this, TR_Event event) } TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); - if (close_event) { - TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event); - } + TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event); return TR_EVENT_DONE; } diff --git a/src/protocol_handler.c b/src/protocol_handler.c index 95dfd3f..e621dc8 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -91,8 +91,6 @@ protocolHandlerCompose(void * _this, TR_Event event) NULL); TR_eventHandlerIssueEvent((TR_EventHandler)_this, _event); - } else { - //printf("%s: compose failed\n", __func__); } TR_delete(message); diff --git a/testers/test_handler.c b/testers/test_handler.c index 85fe103..21d8a73 100644 --- a/testers/test_handler.c +++ b/testers/test_handler.c @@ -25,6 +25,7 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event) // TR_SizedData data = (TR_SizedData)msg->data; // char buf[data->size + 1]; // int i; + TR_Event _event; ((TestHandler)this)->handled++; @@ -36,12 +37,12 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event) // } // printf("echo message: %s(%zd)\n", buf, data->size); - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_SEND_MSG, - event->data)); + _event = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SEND_MSG, + event->data); + + TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); return TR_EVENT_DONE; } diff --git a/testers/testclient.sh b/testers/testclient.sh index 4429a54..be3e8d9 100755 --- a/testers/testclient.sh +++ b/testers/testclient.sh @@ -2,9 +2,9 @@ pids="" i=0 -while [ $i -lt 100 ] +while [ $i -lt 120 ] do - dd if=/dev/zero bs=8192 count=25000 | nc 192.168.2.13 5678 & + dd if=/dev/zero bs=8192 count=2500 | nc 192.168.2.13 5678 & pids="${pids} $!" i=$((i + 1)) done diff --git a/testers/testserver2.c b/testers/testserver2.c index 3ef132a..584be80 100644 --- a/testers/testserver2.c +++ b/testers/testserver2.c @@ -9,7 +9,7 @@ #include "test_handler.h" -TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_DEBUG}); +TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_INFO}); int main (int argc, char * argv[])