From 473ed763c4b84d59cda1ae91271f0262c9029000 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Wed, 27 Aug 2014 18:37:35 +0100 Subject: [PATCH] general as well as edge level handling fixes. --- include/tr/comm_manager_epoll.h | 4 +- src/comm_manager_epoll.c | 99 +++++++++++++++++++++------------ src/comm_manager_poll.c | 32 ++++++----- src/connection.c | 14 +++-- src/datagram_service.c | 14 +++-- src/i_comm_manager.c | 29 +++++++--- src/io_handler.c | 28 +++++++--- src/protocol_handler.c | 20 ++++--- src/server.c | 5 +- testers/test_handler.c | 19 ++++--- 10 files changed, 172 insertions(+), 92 deletions(-) diff --git a/include/tr/comm_manager_epoll.h b/include/tr/comm_manager_epoll.h index d7f4051..acf100c 100644 --- a/include/tr/comm_manager_epoll.h +++ b/include/tr/comm_manager_epoll.h @@ -24,6 +24,7 @@ #define __TR_COMM_MANAGER_EPOLL_H__ #include +#include #include "trbase.h" #include "trevent.h" @@ -31,7 +32,8 @@ TR_CLASS(TR_CommManagerEpoll) { TR_EXTENDS(TR_CommManager); - int handle; + int handle; + struct epoll_event * events; }; TR_INSTANCE_INIT(TR_CommManagerEpoll); TR_CLASSVARS_DECL(TR_CommManagerEpoll) { diff --git a/src/comm_manager_epoll.c b/src/comm_manager_epoll.c index 019566f..ca79db3 100644 --- a/src/comm_manager_epoll.c +++ b/src/comm_manager_epoll.c @@ -34,9 +34,11 @@ #include "tr/connection.h" #include "tr/connect_entry_point.h" -#define MAXEVENTS 64 +#define MAXEVENTS 256 -struct epoll_event events[64]; +struct epoll_event events[MAXEVENTS]; + +extern int count_write_ready; static int @@ -44,9 +46,16 @@ commManagerEpollCtor(void * _this, va_list * params) { TR_CommManagerEpoll this = _this; TR_CommManager cmgr = _this; + nfds_t i; TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params); this->handle = epoll_create(cmgr->n_endpoints); + this->events = TR_malloc(sizeof(struct epoll_event) * cmgr->n_endpoints); + for (i = 0; i < cmgr->n_endpoints; i++) { + this->events[i].data.ptr = NULL; + this->events[i].events = EPOLLET | EPOLLONESHOT; + } + return 0; } @@ -58,19 +67,21 @@ commManagerEpollDtor(void * _this) TR_CommManagerEpoll this = _this; close(this->handle); + TR_MEM_FREE(this->events); + TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor); } static void TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) { - TR_CommManagerEpoll this = _this; - struct epoll_event event; + TR_CommManagerEpoll this = _this; + int handle = endpoint->transport->handle; - event.data.ptr = endpoint; - event.events = EPOLLIN | EPOLLET; + this->events[handle].data.ptr = endpoint; + this->events[handle].events |= EPOLLIN; - epoll_ctl(this->handle, EPOLL_CTL_ADD, endpoint->transport->handle, &event); + epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &(this->events[handle])); } static @@ -85,7 +96,7 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) for (i=0; itransport) @@ -102,15 +113,18 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) } TR_eventHandlerIssueEvent((TR_EventHandler)this, event); + this->events[i].events &= ~EPOLLIN; } - if ((events[i].events & POLLOUT) == POLLOUT) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_WRITE_READY, - NULL)); + if ((events[i].events & EPOLLOUT) == EPOLLOUT) { + TR_Event _event = TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_WRITE_READY, + NULL); + + TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); + this->events[i].events &= ~EPOLLOUT; + count_write_ready++; } } } @@ -123,16 +137,12 @@ TR_commManagerEpollEnableWrite(void * _this, TR_Event event) TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; if (! TR_socketFinWr(endpoint->transport)) { - struct epoll_event epevent; + int handle = endpoint->transport->handle; - epevent.data.ptr = endpoint; - epevent.events = EPOLLOUT | EPOLLIN | EPOLLET; + this->events[handle].data.ptr = endpoint; + this->events[handle].events |= EPOLLOUT; - epoll_ctl( - this->handle, - EPOLL_CTL_MOD, - endpoint->transport->handle, - &epevent); + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); } } @@ -142,38 +152,57 @@ TR_commManagerEpollDisableWrite(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - struct epoll_event epevent; + int handle = endpoint->transport->handle; - epevent.data.ptr = endpoint; - epevent.events = EPOLLIN | EPOLLET; + this->events[handle].data.ptr = endpoint; + this->events[handle].events &= ~EPOLLOUT; - epoll_ctl( - this->handle, - EPOLL_CTL_MOD, - endpoint->transport->handle, - &epevent); + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); } static void -TR_commManagerEpollClose(void * _this, TR_Event event) +TR_commManagerEpollEnableRead(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL); + if (! TR_socketFinRd(endpoint->transport)) { + int handle = endpoint->transport->handle; + + this->events[handle].data.ptr = endpoint; + this->events[handle].events |= EPOLLIN; + + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); + } } static void -TR_commManagerEpollEnableRead(void * _this, TR_Event event) +TR_commManagerEpollDisableRead(void * _this, TR_Event event) { + TR_CommManagerEpoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + int handle = endpoint->transport->handle; + + this->events[handle].data.ptr = endpoint; + this->events[handle].events &= ~EPOLLIN; + + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &(this->events[handle])); } static void -TR_commManagerEpollDisableRead(void * _this, TR_Event event) +TR_commManagerEpollClose(void * _this, TR_Event event) { + TR_CommManagerEpoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + int handle = endpoint->transport->handle; + + this->events[handle].data.ptr = NULL; + this->events[handle].events = EPOLLET | EPOLLONESHOT; + + epoll_ctl(this->handle, EPOLL_CTL_DEL, handle, NULL); } static diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index 95dcef7..bd21383 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -104,18 +104,20 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) } TR_eventHandlerIssueEvent((TR_EventHandler)this, event); - this->fds[i].fd = -1; // this deactivates poll... + // deactivate read poll mimic edge level behaviour + this->fds[endpoint->transport->handle].events &= ~POLLIN; } if ((this->fds[i].revents & POLLOUT) == POLLOUT) { + TR_Event _event = TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_WRITE_READY, + NULL); + + TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); nevents--; - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_WRITE_READY, - NULL)); - // deactivate write poll... + + // deactivate write poll mimic edge level behaviour this->fds[endpoint->transport->handle].events &= ~POLLOUT; } @@ -153,28 +155,30 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event) TR_CommManagerPoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - this->fds[endpoint->transport->handle].fd = endpoint->transport->handle; + if (! TR_socketFinRd(endpoint->transport)) { + this->fds[endpoint->transport->handle].events |= POLLIN; + } } static void -TR_commManagerPollClose(void * _this, TR_Event event) +TR_commManagerPollDisableRead(void * _this, TR_Event event) { TR_CommManagerPoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - this->fds[endpoint->transport->handle].events = 0; - this->fds[endpoint->transport->handle].fd = -1; + this->fds[endpoint->transport->handle].events &= ~POLLIN; } static void -TR_commManagerPollDisableRead(void * _this, TR_Event event) +TR_commManagerPollClose(void * _this, TR_Event event) { TR_CommManagerPoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - this->fds[endpoint->transport->handle].events &= ~POLLIN; + this->fds[endpoint->transport->handle].events = 0; + this->fds[endpoint->transport->handle].fd = -1; } static diff --git a/src/connection.c b/src/connection.c index 922d102..ac09d0c 100644 --- a/src/connection.c +++ b/src/connection.c @@ -112,12 +112,18 @@ connectionNextMessage(void * _this) } static -void +int connectionCompose(void * _this, TR_ProtoMessage message) { - TR_queuePut( - ((TR_CommEndPoint)_this)->write_buffer, - TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message)); + TR_RemoteData data = + TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message); + + if (! data) { + return FALSE; + } + + TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); + return TRUE; } static diff --git a/src/datagram_service.c b/src/datagram_service.c index c7379a8..069e920 100644 --- a/src/datagram_service.c +++ b/src/datagram_service.c @@ -99,12 +99,18 @@ datagramServiceNextMessage(void * _this) } static -void +int datagramServiceCompose(void * _this, TR_ProtoMessage message) { - TR_queuePut( - ((TR_CommEndPoint)_this)->write_buffer, - TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message)); + TR_RemoteData data = + TR_protoCompose(((TR_CommEndPoint)_this)->protocol, message); + + if (! data) { + return FALSE; + } + + TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); + return TRUE; } intptr_t datagramService_events[TR_CEP_EVENT_MAX + 1]; diff --git a/src/i_comm_manager.c b/src/i_comm_manager.c index 8081c07..8eb4efd 100644 --- a/src/i_comm_manager.c +++ b/src/i_comm_manager.c @@ -112,13 +112,19 @@ TR_commManagerClose(void * _this, TR_Event event) TR_CommManager this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - TR_socketShutdown(endpoint->transport); TR_CALL(_this, TR_CommManager, close, event); + if (! TR_socketFinRdWr(endpoint->transport)) { + TR_socketShutdown(endpoint->transport); + } + if (endpoint->transport->handle == this->max_handle) { while (! this->endpoints[--this->max_handle]); } - TR_delete(this->endpoints[endpoint->transport->handle]); + + TR_eventSubjectFinalize( + (TR_EventSubject)this->endpoints[endpoint->transport->handle]); + this->endpoints[endpoint->transport->handle] = NULL; return TR_EVENT_DONE; } @@ -128,6 +134,10 @@ TR_commManagerShutdownRead(void * _this, TR_Event event) { TR_CALL(_this, TR_CommManager, shutdownRead, event); + if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { + TR_socketShutdownRead(((TR_CommEndPoint)event->subject)->transport); + } + if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) { // close TR_eventHandlerIssueEvent( @@ -136,16 +146,15 @@ TR_commManagerShutdownRead(void * _this, TR_Event event) event->subject, TR_CEP_EVENT_CLOSE, NULL)); - } else if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { - // handle pending data... close is issued from disableWrite + } + + if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, TR_eventSubjectEmit( event->subject, - TR_CEP_EVENT_CLOSE, + TR_CEP_EVENT_SHUT_WRITE, NULL)); - } else { - TR_cepSetClose((TR_CommEndPoint)event->subject); } return TR_EVENT_DONE; @@ -156,7 +165,11 @@ TR_commManagerShutdownWrite(void * _this, TR_Event event) { TR_CALL(_this, TR_CommManager, shutdownWrite, event); - if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { + if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) { + TR_socketShutdownWrite(((TR_CommEndPoint)event->subject)->transport); + } + + if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, TR_eventSubjectEmit( diff --git a/src/io_handler.c b/src/io_handler.c index deb9cb5..867f892 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -29,6 +29,9 @@ #include "tr/comm_end_point.h" #include "tr/interface/comm_end_point.h" +extern int count_write_ready; +int count_write_ready_handle = 0; + static int ioHandlerCtor(void * _this, va_list * params) @@ -89,14 +92,22 @@ TR_EventDone ioHandlerWrite(void * _this, TR_Event event) { TR_Event revent, close_event = NULL; - TR_EventDone done = TR_EVENT_DONE; + + count_write_ready_handle++; switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) { case FALSE: // EAGAIN - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_PENDING_DATA, - NULL); + if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_PENDING_DATA, + NULL); + } else { + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_END_DATA, + NULL); + } break; case -1: // FAILURE @@ -114,9 +125,7 @@ ioHandlerWrite(void * _this, TR_Event event) break; default: - if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { - done = TR_EVENT_PENDING; - } else { + if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { revent = TR_eventSubjectEmit( event->subject, TR_CEP_EVENT_END_DATA, @@ -135,7 +144,8 @@ ioHandlerWrite(void * _this, TR_Event event) if (close_event) { TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event); } - return done; + + return TR_EVENT_DONE; } static diff --git a/src/protocol_handler.c b/src/protocol_handler.c index d39512b..d61155b 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -32,6 +32,8 @@ #include "tr/comm_end_point.h" #include "tr/interface/comm_end_point.h" +int count_write_ready = 0; + static int protocolHandlerCtor(void * _this, va_list * params) @@ -85,14 +87,18 @@ protocolHandlerCompose(void * _this, TR_Event event) } if (TR_cepCompose(endpoint, message)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_WRITE_READY, - NULL)); - TR_delete(message); + TR_Event _event = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_WRITE_READY, + NULL); + + TR_eventHandlerIssueEvent((TR_EventHandler)_this, _event); + + count_write_ready++; + } else { + //printf("%s: compose failed\n", __func__); } + TR_delete(message); return TR_EVENT_DONE; } diff --git a/src/server.c b/src/server.c index 89238a3..adb4ef8 100644 --- a/src/server.c +++ b/src/server.c @@ -43,8 +43,11 @@ serverCtor(void * _this, va_list * params) { TR_Server this = _this; - //this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll); +#if 1 + this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerEpoll); +#else this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerPoll); +#endif this->dispatcher = TR_new(TR_EventDispatcher, TR_EVD_SERVER, NULL, 100); this->connector = TR_new(TR_Connector); this->io_handler = TR_new(TR_IoHandler); diff --git a/testers/test_handler.c b/testers/test_handler.c index d8a9643..85fe103 100644 --- a/testers/test_handler.c +++ b/testers/test_handler.c @@ -21,18 +21,19 @@ static TR_EventDone testHandlerNewMessage(TR_EventHandler this, TR_Event event) { - TR_ProtoMessageRaw msg = event->data; - TR_SizedData data = (TR_SizedData)msg->data; - char buf[data->size + 1]; - int i; +// TR_ProtoMessageRaw msg = event->data; +// TR_SizedData data = (TR_SizedData)msg->data; +// char buf[data->size + 1]; +// int i; ((TestHandler)this)->handled++; - memcpy(buf, data->data, data->size); - buf[data->size] = 0; - for (i = 0; buf[i]; i++) { - if (! isprint(buf[i])) buf[i] = '.'; - } +// printf("handled data %p\n", event->data); +// memcpy(buf, data->data, data->size); +// buf[data->size] = 0; +// for (i = 0; buf[i]; i++) { +// if (! isprint(buf[i])) buf[i] = '.'; +// } // printf("echo message: %s(%zd)\n", buf, data->size); TR_eventHandlerIssueEvent(