From 17cf44b9b92c27190cfba98b5a10a6a798940984 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Sat, 4 Oct 2014 10:44:24 +0100 Subject: [PATCH] Revert "fixes and additions for threaded code" This reverts commit f71cac22a3dc7ce41c180776124ec52bac46caa4. --- include/tr/comm_end_point.h | 3 +- include/tr/comm_manager.h | 3 -- include/tr/interface/comm_manager.h | 14 +++--- src/cep_write_buffered.c | 5 +- src/comm_end_point.c | 4 +- src/comm_end_point_read.c | 3 -- src/comm_manager.c | 34 ++++---------- src/comm_manager_epoll.c | 43 +++++++---------- src/comm_manager_poll.c | 54 ++++++++-------------- src/connection.c | 4 +- src/connector.c | 12 ++--- src/i_comm_manager.c | 72 +++++++++-------------------- src/io_handler.c | 28 ++--------- src/protocol_handler.c | 6 ++- src/server.c | 2 +- src/simple_client.c | 6 ++- src/threaded_server.c | 5 +- testers/build.sh | 4 +- testers/test_handler.c | 3 +- testers/testclient.c | 2 +- testers/testserver2.c | 2 +- 21 files changed, 102 insertions(+), 207 deletions(-) diff --git a/include/tr/comm_end_point.h b/include/tr/comm_end_point.h index ca16f84..2c57499 100644 --- a/include/tr/comm_end_point.h +++ b/include/tr/comm_end_point.h @@ -64,8 +64,7 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) { #define TR_CEP_EVENT_SHUT_READ 10 // CommManager #define TR_CEP_EVENT_SHUT_WRITE 11 // CommManager #define TR_CEP_EVENT_CLOSE 12 // CommManager -#define TR_CEP_EVENT_IO_DONE 13 // CommManager -#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_IO_DONE) +#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE) #define TR_cepSetClose(ep) ((ep)->do_close = 1) #define TR_cepHasProto(ep, proto) (TR_INSTANCE_OF(proto, TR_cepGetProto(ep))) diff --git a/include/tr/comm_manager.h b/include/tr/comm_manager.h index a384ca6..9acf049 100644 --- a/include/tr/comm_manager.h +++ b/include/tr/comm_manager.h @@ -24,7 +24,6 @@ #define __TR_COMM_MANAGER_H__ #include -#include #include "trbase.h" #include "trdata.h" @@ -41,8 +40,6 @@ TR_CLASS(TR_CommManager) { TR_Hash read; size_t n_endpoints; size_t max_handle; - unsigned long io_triggered; - pthread_mutex_t io_triggered_lock; }; TR_INSTANCE_INIT(TR_CommManager); TR_CLASSVARS_DECL(TR_CommManager) { diff --git a/include/tr/interface/comm_manager.h b/include/tr/interface/comm_manager.h index 700caaa..d4c073e 100644 --- a/include/tr/interface/comm_manager.h +++ b/include/tr/interface/comm_manager.h @@ -30,13 +30,13 @@ #include "tr/comm_end_point.h" -typedef void (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint); -typedef size_t (* fptr_TR_commManagerSelect)(void *, TR_Event, unsigned long); -typedef void (* fptr_TR_commManagerPollWrite)(void *, TR_Event); -typedef void (* fptr_TR_commManagerPollRead)(void *, TR_Event); -typedef void (* fptr_TR_commManagerDisableWrite)(void *, TR_Event); -typedef void (* fptr_TR_commManagerDisableRead)(void *, TR_Event); -typedef void (* fptr_TR_commManagerClose)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint); +typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, unsigned long); +typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerDisableRead)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event); TR_INTERFACE(TR_CommManager) { TR_IFID; diff --git a/src/cep_write_buffered.c b/src/cep_write_buffered.c index a1652e5..f35bb93 100644 --- a/src/cep_write_buffered.c +++ b/src/cep_write_buffered.c @@ -29,7 +29,7 @@ int TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size) { TR_RemoteData data; - size_t send; + int send; *size = 0; @@ -55,9 +55,6 @@ TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size) { TR_RemoteData new_data = NULL; - printf("[~DEBUG~] wrote %zd bytes\n", send); - fflush(stdout); - if (send != ((TR_SizedData)data)->size) { new_data = TR_new( TR_RemoteData, diff --git a/src/comm_end_point.c b/src/comm_end_point.c index 6bcd4f5..d0fc31c 100644 --- a/src/comm_end_point.c +++ b/src/comm_end_point.c @@ -41,7 +41,7 @@ commEndPointCtor(void * _this, va_list * params) this->transport = va_arg(*params, TR_Socket); this->protocol = va_arg(*params, TR_Protocol); this->read_chunk_size = va_arg(*params, int); - this->do_close = FALSE; + this->do_close = 0; this->write_buffer = TR_new(TR_Queue); return 0; @@ -101,7 +101,6 @@ commEndPointCvInit(TR_class_ptr cls) TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_READ); TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_WRITE); TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE); - TR_EVENT_CREATE(cls, TR_CEP_EVENT_IO_DONE); } const char * TR_cepEventStrings[] = { @@ -118,7 +117,6 @@ const char * TR_cepEventStrings[] = { "TR_CEP_EVENT_SHUT_READ", "TR_CEP_EVENT_SHUT_WRITE", "TR_CEP_EVENT_CLOSE", - "TR_CEP_EVENT_IO_DONE", }; intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1]; diff --git a/src/comm_end_point_read.c b/src/comm_end_point_read.c index b40417a..a9815e3 100644 --- a/src/comm_end_point_read.c +++ b/src/comm_end_point_read.c @@ -30,9 +30,6 @@ TR_commEndPointRead(TR_CommEndPoint this, TR_RemoteData * data_ptr) { *data_ptr = TR_socketRecv(this->transport, this->read_chunk_size); - printf("[~DEBUG~] read %zd bytes\n", ((TR_SizedData)*data_ptr)->size); - fflush(stdout); - if (! *data_ptr) return -1; // ment to trigger a close if (*data_ptr == (void*)-1) return -2; // remote close... shutdown if (*data_ptr == TR_emptyRemoteData) return FALSE; // read blocked diff --git a/src/comm_manager.c b/src/comm_manager.c index ab3e344..7f2545c 100644 --- a/src/comm_manager.c +++ b/src/comm_manager.c @@ -22,7 +22,6 @@ #include #include -#include #include "trbase.h" #include "trdata.h" @@ -52,8 +51,6 @@ commManagerCtor(void * _this, va_list * params) this->n_endpoints = sysconf(_SC_OPEN_MAX); this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints); - pthread_mutex_init(&this->io_triggered_lock, NULL); - return 0; } @@ -64,8 +61,6 @@ commManagerDtor(void * _this) TR_CommManager this = _this; nfds_t i; - pthread_mutex_destroy(&this->io_triggered_lock); - for (i=0; in_endpoints; i++) { TR_delete(this->endpoints[i]); } @@ -77,22 +72,11 @@ commManagerDtor(void * _this) static TR_EventDone -TR_commManagerWriteIsBlocked(void * _this, TR_Event event) +TR_commManagerEnableWrite(void * _this, TR_Event event) { TR_CommManager this = _this; - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); - - return TR_EVENT_DONE; -} - -static -TR_EventDone -TR_commManagerDecrementIoTriggerd(TR_CommManager this, TR_Event event) -{ - pthread_mutex_lock(&this->io_triggered_lock); - this->io_triggered--; - pthread_mutex_unlock(&this->io_triggered_lock); + TR_hashAdd(this->write, event->subject); return TR_EVENT_DONE; } @@ -119,6 +103,8 @@ static void commManagerCvInit(TR_class_ptr cls) { + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree); + TR_EVENT_HANDLER_SET_METHOD( cls, TR_EventDispatcher, TR_DISPATCHER_EVENT_DATA_WAIT, @@ -134,7 +120,7 @@ commManagerCvInit(TR_class_ptr cls) TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, TR_CEP_EVENT_WRITE_BLOCK, - TR_commManagerWriteIsBlocked); + TR_commManagerPollWrite); TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, TR_CEP_EVENT_READ_BLOCK, @@ -158,18 +144,14 @@ commManagerCvInit(TR_class_ptr cls) TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, TR_CEP_EVENT_DATA_READY, - TR_commManagerPollWrite); + TR_commManagerEnableWrite); TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, TR_CEP_EVENT_DATA_END, TR_commManagerDisableWrite); - TR_EVENT_HANDLER_SET_METHOD( - cls, TR_CommEndPoint, - TR_CEP_EVENT_IO_DONE, - TR_commManagerDecrementIoTriggerd); } -TR_INIT_HANDLER(TR_CommManager); +TR_INSTANCE(TR_Hash, commManagerEventMethods); TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL); TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL); TR_CREATE_CLASS( @@ -178,7 +160,7 @@ TR_CREATE_CLASS( commManagerCvInit, TR_IF(TR_Class), TR_IF(TR_CommManager)) = { - { TR_HANDLER_CVARS(TR_CommManager) } + { &(_commManagerEventMethods.data) } }; // vim: set ts=4 sw=4: diff --git a/src/comm_manager_epoll.c b/src/comm_manager_epoll.c index 587643c..72cdfe8 100644 --- a/src/comm_manager_epoll.c +++ b/src/comm_manager_epoll.c @@ -73,8 +73,8 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) int handle = endpoint->transport->handle; struct epoll_event event; - //this->events[handle] = EPOLLIN | EPOLLET; - this->events[handle] = EPOLLIN; + //this->events[handle] = EPOLLET; + this->events[handle] = 0; event.data.ptr = endpoint; event.events = this->events[handle]; @@ -82,19 +82,19 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) } static -size_t +void TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout) { TR_CommManagerEpoll this = _this; TR_CommManager cmgr = _this; int i, nevents; - //struct epoll_event _event; + struct epoll_event _event; nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout); for (i=0; itransport->handle; + int handle = endpoint->transport->handle; if ((events[i].events & EPOLLIN) == EPOLLIN) { if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) @@ -106,20 +106,20 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout) } } - //this->events[handle] &= ~EPOLLIN; - //_event.data.ptr = endpoint; - //_event.events = this->events[handle]; - //epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + this->events[handle] &= ~EPOLLIN; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); } if ((events[i].events & EPOLLOUT) == EPOLLOUT) { if (! event->subject->fin) { TR_hashAdd(cmgr->write, endpoint); } - //this->events[handle] &= ~EPOLLOUT; - //_event.data.ptr = endpoint; - //_event.events = this->events[handle]; - //epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + this->events[handle] &= ~EPOLLOUT; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); } if ((events[i].events & EPOLLHUP) == EPOLLHUP) { @@ -131,15 +131,6 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, unsigned long timeout) NULL)); } } - - if (nevents >= 0) { - return nevents; - } else { - perror("epoll"); - fflush(stderr); - fflush(stdout); - return 0; - } } static @@ -189,9 +180,9 @@ static void TR_commManagerEpollEnableRead(void * _this, TR_Event event) { -// if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { -// TR_commManagerEpollEnable(_this, EPOLLIN, event); -// } + if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { + TR_commManagerEpollEnable(_this, EPOLLIN, event); + } } static @@ -205,7 +196,7 @@ static void TR_commManagerEpollDisableRead(void * _this, TR_Event event) { -// TR_commManagerEpollDisable(_this, EPOLLIN, event); + TR_commManagerEpollDisable(_this, EPOLLIN, event); } static diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index 977e987..e5bcb16 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -71,57 +71,45 @@ TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint) TR_CommManagerPoll this = _this; this->fds[endpoint->transport->handle].fd = endpoint->transport->handle; - this->fds[endpoint->transport->handle].events = POLLIN; + this->fds[endpoint->transport->handle].events = 0; } static -size_t +void TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout) { TR_CommManagerPoll this = _this; TR_CommManager cmgr = _this; nfds_t i; - int nevents, doevents; - - for (i = 0; i < cmgr->max_handle+1; i++) { - printf("[=DEBUG=] handle %ld POLLIN? %s\n", i, - (this->fds[i].events & POLLIN) == POLLIN ? "YES" : "NO"); - fflush(stdout); - } + int nevents; - nevents = doevents = poll(this->fds, cmgr->max_handle+1, timeout); + nevents = poll(this->fds, cmgr->max_handle+1, timeout); - if (doevents) { + if (nevents) { for (i = 0; i < cmgr->max_handle+1; i++) { if (this->fds[i].revents != 0) { TR_CommEndPoint endpoint = cmgr->endpoints[i]; - doevents--; + nevents--; if ((this->fds[i].revents & POLLIN) == POLLIN) { if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) && ((TR_TcpSocket)endpoint->transport)->listen) { - pthread_mutex_lock(&cmgr->io_triggered_lock); TR_hashAdd(cmgr->accept, endpoint); - pthread_mutex_unlock(&cmgr->io_triggered_lock); } else { if (! event->subject->fin) { - pthread_mutex_lock(&cmgr->io_triggered_lock); TR_hashAdd(cmgr->read, endpoint); - pthread_mutex_unlock(&cmgr->io_triggered_lock); } } - //this->fds[endpoint->transport->handle].events &= ~POLLIN; + this->fds[endpoint->transport->handle].events &= ~POLLIN; } if ((this->fds[i].revents & POLLOUT) == POLLOUT) { if (! event->subject->fin) { - pthread_mutex_lock(&cmgr->io_triggered_lock); TR_hashAdd(cmgr->write, endpoint); - pthread_mutex_unlock(&cmgr->io_triggered_lock); } - //this->fds[endpoint->transport->handle].events &= - // ~(POLLOUT|POLLHUP); + this->fds[endpoint->transport->handle].events &= + ~(POLLOUT|POLLHUP); } if ((this->fds[i].revents & POLLHUP) == POLLHUP) { @@ -134,12 +122,10 @@ TR_commManagerPollSelect(void * _this, TR_Event event, unsigned long timeout) } this->fds[i].revents = 0; - if (doevents <= 0) break; + if (nevents <= 0) break; } } } - - return nevents; } static @@ -158,12 +144,12 @@ static void TR_commManagerPollEnableRead(void * _this, TR_Event event) { -// TR_CommManagerPoll this = _this; -// TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; -// -// if (! TR_socketFinRd(endpoint->transport)) { -// this->fds[endpoint->transport->handle].events |= POLLIN; -// } + TR_CommManagerPoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + if (! TR_socketFinRd(endpoint->transport)) { + this->fds[endpoint->transport->handle].events |= POLLIN; + } } static @@ -180,10 +166,10 @@ static void TR_commManagerPollDisableRead(void * _this, TR_Event event) { -// TR_CommManagerPoll this = _this; -// TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; -// -// this->fds[endpoint->transport->handle].events &= ~POLLIN; + TR_CommManagerPoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + this->fds[endpoint->transport->handle].events &= ~POLLIN; } static diff --git a/src/connection.c b/src/connection.c index a1038e2..e970b19 100644 --- a/src/connection.c +++ b/src/connection.c @@ -37,10 +37,10 @@ static int connectionCtor(void * _this, va_list * params) { - //TR_Connection this = _this; + TR_Connection this = _this; TR_PARENTCALL(TR_Connection, _this, TR_Class, ctor, params); - //this->current_message = NULL; + this->current_message = NULL; return 0; } diff --git a/src/connector.c b/src/connector.c index 33dcf1c..2875d2b 100644 --- a/src/connector.c +++ b/src/connector.c @@ -76,13 +76,6 @@ connectorAccept(void * _this, TR_Event event) socket = TR_socketAccept((TR_TcpSocket)connection->transport); } - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)connection, - TR_CEP_EVENT_IO_DONE, - NULL)); - if (! socket) { TR_eventHandlerIssueEvent( (TR_EventHandler)this, @@ -99,6 +92,8 @@ static void connectorCvInit(TR_class_ptr cls) { + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree); + TR_EVENT_HANDLER_SET_METHOD( cls, TR_ConnEntryPoint, @@ -106,7 +101,6 @@ connectorCvInit(TR_class_ptr cls) connectorAccept); } -TR_INIT_HANDLER(TR_Connector); TR_INSTANCE(TR_Hash, connectorEventMethods); TR_INIT_IFACE(TR_Class, connectorCtor, connectorDtor, NULL); TR_CREATE_CLASS( @@ -114,7 +108,7 @@ TR_CREATE_CLASS( TR_EventHandler, connectorCvInit, TR_IF(TR_Class)) = { - { TR_HANDLER_CVARS(TR_Connector) } + { &(_connectorEventMethods.data) } }; // vim: set ts=4 sw=4: diff --git a/src/i_comm_manager.c b/src/i_comm_manager.c index 4c120d5..d6f5344 100644 --- a/src/i_comm_manager.c +++ b/src/i_comm_manager.c @@ -20,11 +20,8 @@ * along with this program. If not, see . */ -#define _GNU_SOURCE - #include #include -#include #include "trbase.h" #include "trevent.h" @@ -62,8 +59,6 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint) TR_ISSUE_IO_READ_EVENT(this, endpoint); } - this->io_triggered++; - TR_CALL(_this, TR_CommManager, addEndpoint, endpoint); } @@ -95,35 +90,22 @@ TR_commManagerSelect(void * _this, TR_Event event) TR_Timer timer = (TR_Timer)event->data; TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject; unsigned long timeout; // milliseconds - char buffer[17]; - - pthread_getname_np(pthread_self(), buffer, 17); - - if (! this->io_triggered) { - printf("[DEBUG] [%s] io triggerd was empty\n", buffer); - fflush(stdout); - pthread_mutex_lock(&this->io_triggered_lock); - this->io_triggered = TR_hashEach(this->write, this, commManagerIssueWriteEvents); - this->io_triggered += TR_hashEach(this->accept, this, commManagerIssueAcceptEvents); - this->io_triggered += TR_hashEach(this->read, this, commManagerIssueReadEvents); - pthread_mutex_unlock(&this->io_triggered_lock); - } - printf("[DEBUG] [%s] io triggerd: %lu\n", buffer, this->io_triggered); - fflush(stdout); - - if (! this->io_triggered) { - if (NULL == timer) { - timeout = TR_eventDispatcherGetDataWaitTime(dispatcher); - } else { - timeout = TR_timerGet(timer, NULL); - } + if (! (TR_hashEmpty(this->read) + && TR_hashEmpty(this->write) + && TR_hashEmpty(this->accept))) { + timeout = 0; + } else if (NULL == timer) { + timeout = TR_eventDispatcherGetDataWaitTime(dispatcher); + } else { + timeout = TR_timerGet(timer, NULL); + } - printf("[DEBUG] [%s] select timeout: %lu\n", buffer, timeout); - fflush(stdout); + TR_CALL(_this, TR_CommManager, select, event, timeout); - TR_CALL(_this, TR_CommManager, select, event, timeout); - } + TR_hashEach(this->write, this, commManagerIssueWriteEvents); + TR_hashEach(this->accept, this, commManagerIssueAcceptEvents); + TR_hashEach(this->read, this, commManagerIssueReadEvents); return TR_EVENT_DONE; } @@ -133,13 +115,9 @@ TR_commManagerPollWrite(void * _this, TR_Event event) { TR_CommManager this = _this; + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) { - pthread_mutex_lock(&this->io_triggered_lock); - TR_hashAdd(this->write, event->subject); TR_CALL(_this, TR_CommManager, pollWrite, event); - pthread_mutex_unlock(&this->io_triggered_lock); - printf("[!DEBUG!] socket added to write hash\n"); - fflush(stdout); } return TR_EVENT_DONE; @@ -151,10 +129,6 @@ TR_commManagerPollRead(void * _this, TR_Event event) TR_CommManager this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - if (! TR_socketFinRd(endpoint->transport)) { - TR_CALL(_this, TR_CommManager, pollRead, event); - } - if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) && ((TR_TcpSocket)endpoint->transport)->listen) { TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject)); @@ -162,6 +136,10 @@ TR_commManagerPollRead(void * _this, TR_Event event) TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject)); } + if (! TR_socketFinRd(endpoint->transport)) { + TR_CALL(_this, TR_CommManager, pollRead, event); + } + return TR_EVENT_DONE; } @@ -178,17 +156,13 @@ TR_commManagerDisableRead(void * _this, TR_Event event) TR_EventDone TR_commManagerDisableWrite(void * _this, TR_Event event) { - TR_CommManager this = _this; - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + TR_CommManager this = _this; - if (! endpoint->write_buffer->nmsg) { - // TODO think about a better way... - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); - //if (! event->subject->fin) { - // TR_hashAdd(this->read, event->subject); - //} - TR_CALL(_this, TR_CommManager, disableWrite, event); + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); + if (! event->subject->fin) { + TR_hashAdd(this->read, event->subject); } + TR_CALL(_this, TR_CommManager, disableWrite, event); return TR_EVENT_DONE; } diff --git a/src/io_handler.c b/src/io_handler.c index ed060f0..b7b46d9 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -50,7 +50,6 @@ ioHandlerRead(void * _this, TR_Event event) TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; TR_Event revent; TR_RemoteData data; - char ip[16]; switch (TR_commEndPointRead(endpoint, &data)) { case FALSE: // EAGAIN @@ -84,13 +83,6 @@ ioHandlerRead(void * _this, TR_Event event) return TR_EVENT_DONE; } - TR_socketAddrIpStr(data->remote, ip, 16); - printf( - "DEBUG: remote ip: %s / port: %d\n", - ip, - TR_socketAddrPort(data->remote)); - fflush(stdout); - revent = TR_eventSubjectEmit( event->subject, TR_CEP_EVENT_NEW_DATA, @@ -98,13 +90,6 @@ ioHandlerRead(void * _this, TR_Event event) break; } - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_IO_DONE, - NULL)); - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); return TR_EVENT_DONE; @@ -162,13 +147,6 @@ ioHandlerWrite(void * _this, TR_Event event) endpoint->write_buffer_size -= written; - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_IO_DONE, - NULL)); - if (revent) { TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); } @@ -180,6 +158,8 @@ static void ioHandlerCvInit(TR_class_ptr cls) { + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree); + TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, @@ -192,14 +172,14 @@ ioHandlerCvInit(TR_class_ptr cls) ioHandlerWrite); } -TR_INIT_HANDLER(TR_IoHandler); +TR_INSTANCE(TR_Hash, ioHandlerEventMethods); TR_INIT_IFACE(TR_Class, ioHandlerCtor, ioHandlerDtor, NULL); TR_CREATE_CLASS( TR_IoHandler, TR_EventHandler, ioHandlerCvInit, TR_IF(TR_Class)) = { - { TR_HANDLER_CVARS(TR_IoHandler) } + { &(_ioHandlerEventMethods.data) } }; // vim: set ts=4 sw=4: diff --git a/src/protocol_handler.c b/src/protocol_handler.c index ad4fff2..e4c841c 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -125,6 +125,8 @@ static void protocolHandlerCvInit(TR_class_ptr cls) { + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree); + TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, @@ -142,14 +144,14 @@ protocolHandlerCvInit(TR_class_ptr cls) // protocolHandlerUpgrade); } -TR_INIT_HANDLER(TR_ProtocolHandler); +TR_INSTANCE(TR_Hash, protocolHandlerEventMethods); TR_INIT_IFACE(TR_Class, protocolHandlerCtor, protocolHandlerDtor, NULL); TR_CREATE_CLASS( TR_ProtocolHandler, TR_EventHandler, protocolHandlerCvInit, TR_IF(TR_Class)) = { - { TR_HANDLER_CVARS(TR_ProtocolHandler) } + { &(_protocolHandlerEventMethods.data) } }; // vim: set ts=4 sw=4: diff --git a/src/server.c b/src/server.c index 2b67715..7fda748 100644 --- a/src/server.c +++ b/src/server.c @@ -43,7 +43,7 @@ serverCtor(void * _this, va_list * params) { TR_Server this = _this; -#if 0 +#if 1 this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll); #else this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerPoll); diff --git a/src/simple_client.c b/src/simple_client.c index b0c9105..d11693b 100644 --- a/src/simple_client.c +++ b/src/simple_client.c @@ -135,6 +135,8 @@ static void simpleClientCvInit(TR_class_ptr cls) { + TR_CLASSVARS(TR_EventHandler, cls)->event_methods->tree = TR_new(TR_Tree); + TR_EVENT_HANDLER_SET_METHOD( cls, TR_EventDispatcher, @@ -147,14 +149,14 @@ simpleClientCvInit(TR_class_ptr cls) simpleClientHandleData); } -TR_INIT_HANDLER(TR_SimpleClient); +TR_INSTANCE(TR_Hash, simpleClientEventMethods); TR_INIT_IFACE(TR_Class, simpleClientCtor, simpleClientDtor, NULL); TR_CREATE_CLASS( TR_SimpleClient, TR_EventHandler, simpleClientCvInit, TR_IF(TR_Class)) = { - { TR_HANDLER_CVARS(TR_SimpleClient) } + { &(_simpleClientEventMethods.data) } }; // vim: set ts=4 sw=4: diff --git a/src/threaded_server.c b/src/threaded_server.c index 03a57f1..d652238 100644 --- a/src/threaded_server.c +++ b/src/threaded_server.c @@ -36,7 +36,6 @@ threadedServerCtor(void * _this, va_list * params) { TR_ThreadedServer this = _this; int i; - char buffer[16]; TR_PARENTCALL(TR_ThreadedServer, _this, TR_Class, ctor, params); @@ -44,11 +43,9 @@ threadedServerCtor(void * _this, va_list * params) this->threads = TR_malloc(sizeof(TR_EventThread) * this->n_threads); for (i=0; in_threads; i++) { - sprintf(buffer, "test%03d", i); this->threads[i] = TR_new( TR_EventThread, - ((TR_Server)this)->dispatcher, - buffer); + ((TR_Server)this)->dispatcher); } return 0; diff --git a/testers/build.sh b/testers/build.sh index a9ed44e..0b33782 100755 --- a/testers/build.sh +++ b/testers/build.sh @@ -1,10 +1,10 @@ #!/bin/bash #TRLIBS="-ltrbase -ltrhashing -ltrio -ltrdata -ltrevent -ltrcomm" TRLIBS="/usr/local/lib/libtrcomm.a /usr/local/lib/libtrevent.a /usr/local/lib/libtrdata.a /usr/local/lib/libtrio.a /usr/local/lib/libtrhashing.a /usr/local/lib/libtrbase.a" -LIBS="-lcrypto -lssl -lrt -luuid -lpthread" +LIBS="-lcrypto -lssl -lrt -luuid" gcc ${CFLAGS} -c -o test_handler.o test_handler.c gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver testserver.c test_handler.o ${TRLIBS} gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver2 testserver2.c test_handler.o ${TRLIBS} gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testtcp testclient.c ${TRLIBS} gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -DUDP=1 -o testudp testclient.c ${TRLIBS} -gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -o testserver_thread testserver_thread.c test_handler.o ${TRLIBS} +gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${LIBS} -lpthread -o testserver_thread testserver_thread.c test_handler.o ${TRLIBS} diff --git a/testers/test_handler.c b/testers/test_handler.c index 517ed1d..47524cc 100644 --- a/testers/test_handler.c +++ b/testers/test_handler.c @@ -15,7 +15,7 @@ testHandlerHeartbeat(TR_EventHandler this, TR_Event event) double size_msg = ((TestHandler)this)->size ? size / ((TestHandler)this)->handled : 0.0; - int div_count = ' '; + int div_count = 0; while (size > 1024. && div_count != 'G') { size /= 1024.; @@ -31,7 +31,6 @@ testHandlerHeartbeat(TR_EventHandler this, TR_Event event) ((TR_EventDispatcher)event->subject)->n_beats, ((TestHandler)this)->handled, size, div_count, size_msg); - fflush(stdout); ((TestHandler)this)->handled = 0; ((TestHandler)this)->size = 0; diff --git a/testers/testclient.c b/testers/testclient.c index bbf0219..91fb0d6 100644 --- a/testers/testclient.c +++ b/testers/testclient.c @@ -30,7 +30,7 @@ main (int argc, char * argv[]) TR_ProtoMessageRaw message; int i, j=0; - //TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2); + TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2); protocol = TR_new(TR_ProtocolRaw); #if UDP diff --git a/testers/testserver2.c b/testers/testserver2.c index dcf8ca2..ded7fc1 100644 --- a/testers/testserver2.c +++ b/testers/testserver2.c @@ -19,7 +19,7 @@ main (int argc, char * argv[]) TR_Protocol protocol = TR_new(TR_ProtocolRaw); TestHandler test_handler = TR_new(TestHandler); - //TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2); + TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger2); TR_serverAddHandler(server, (TR_EventHandler)test_handler); TR_serverBindTcp(server, "0.0.0.0", 5678, protocol);