diff --git a/include/tr/comm_end_point.h b/include/tr/comm_end_point.h index 92a6b84..3df7884 100644 --- a/include/tr/comm_end_point.h +++ b/include/tr/comm_end_point.h @@ -54,14 +54,15 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) { #define TR_CEP_EVENT_DO_WRITE 1 // IoHandler #define TR_CEP_EVENT_READ_BLOCK 2 // CommManager #define TR_CEP_EVENT_WRITE_BLOCK 3 // CommManager -#define TR_CEP_EVENT_NEW_DATA 4 // ProtocolHandler -#define TR_CEP_EVENT_NEW_MSG 5 // Application -#define TR_CEP_EVENT_MSG_READY 6 // ProtocolHandler -#define TR_CEP_EVENT_DATA_READY 7 // CommManager -#define TR_CEP_EVENT_DATA_END 8 // CommManager -#define TR_CEP_EVENT_SHUT_READ 9 // CommManager -#define TR_CEP_EVENT_SHUT_WRITE 10 // CommManager -#define TR_CEP_EVENT_CLOSE 11 // CommManager +#define TR_CEP_EVENT_WBUF_FULL 4 // CommManager +#define TR_CEP_EVENT_NEW_DATA 5 // ProtocolHandler +#define TR_CEP_EVENT_NEW_MSG 6 // Application +#define TR_CEP_EVENT_MSG_READY 7 // ProtocolHandler +#define TR_CEP_EVENT_DATA_READY 8 // CommManager +#define TR_CEP_EVENT_DATA_END 9 // CommManager +#define TR_CEP_EVENT_SHUT_READ 10 // CommManager +#define TR_CEP_EVENT_SHUT_WRITE 11 // CommManager +#define TR_CEP_EVENT_CLOSE 12 // CommManager #define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE) #define TR_cepSetClose(ep) ((ep)->do_close = 1) diff --git a/include/tr/interface/comm_manager.h b/include/tr/interface/comm_manager.h index 112e1e4..74f0a43 100644 --- a/include/tr/interface/comm_manager.h +++ b/include/tr/interface/comm_manager.h @@ -34,15 +34,19 @@ typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint) typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int); typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerDisableRead)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event); TR_INTERFACE(TR_CommManager) { TR_IFID; - fptr_TR_commManagerAddEndpoint addEndpoint; - fptr_TR_commManagerSelect select; - fptr_TR_commManagerPollWrite pollWrite; - fptr_TR_commManagerPollRead pollRead; - fptr_TR_commManagerClose close; + fptr_TR_commManagerAddEndpoint addEndpoint; + fptr_TR_commManagerSelect select; + fptr_TR_commManagerPollWrite pollWrite; + fptr_TR_commManagerPollRead pollRead; + fptr_TR_commManagerDisableWrite disableWrite; + fptr_TR_commManagerDisableRead disableRead; + fptr_TR_commManagerClose close; }; void TR_commManagerAddEndpoint(void *, TR_CommEndPoint); diff --git a/src/comm_end_point.c b/src/comm_end_point.c index 472aa8d..d0fc31c 100644 --- a/src/comm_end_point.c +++ b/src/comm_end_point.c @@ -92,6 +92,7 @@ commEndPointCvInit(TR_class_ptr cls) TR_EVENT_CREATE(cls, TR_CEP_EVENT_DO_WRITE); TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK); TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_BLOCK); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_WBUF_FULL); TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA); TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_MSG); TR_EVENT_CREATE(cls, TR_CEP_EVENT_MSG_READY); @@ -107,6 +108,7 @@ const char * TR_cepEventStrings[] = { "TR_CEP_EVENT_DO_WRITE", "TR_CEP_EVENT_READ_BLOCK", "TR_CEP_EVENT_WRITE_BLOCK", + "TR_CEP_EVENT_WBUF_FULL", "TR_CEP_EVENT_NEW_DATA", "TR_CEP_EVENT_NEW_MSG", "TR_CEP_EVENT_MSG_READY", diff --git a/src/comm_manager.c b/src/comm_manager.c index 87c0a97..01a0a7e 100644 --- a/src/comm_manager.c +++ b/src/comm_manager.c @@ -76,17 +76,6 @@ TR_commManagerEnableWrite(void * _this, TR_Event event) return TR_EVENT_DONE; } -static -TR_EventDone -TR_commManagerDisableWrite(void * _this, TR_Event event) -{ - TR_CommManager this = _this; - - TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); - - return TR_EVENT_DONE; -} - static TR_EventDone TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event) @@ -99,6 +88,8 @@ TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event) TR_EventDone TR_commManagerSelect(void *, TR_Event, int); TR_EventDone TR_commManagerPollWrite(void *, TR_Event); TR_EventDone TR_commManagerPollRead(void *, TR_Event); +TR_EventDone TR_commManagerDisableRead(void *, TR_Event); +TR_EventDone TR_commManagerDisableWrite(void *, TR_Event); TR_EventDone TR_commManagerClose(void *, TR_Event); TR_EventDone TR_commManagerShutdownRead(TR_CommManager, TR_Event); TR_EventDone TR_commManagerShutdownWrite(TR_CommManager, TR_Event); @@ -127,6 +118,10 @@ commManagerCvInit(TR_class_ptr cls) cls, TR_CommEndPoint, TR_CEP_EVENT_READ_BLOCK, TR_commManagerPollRead); + TR_EVENT_HANDLER_SET_METHOD( + cls, TR_CommEndPoint, + TR_CEP_EVENT_WBUF_FULL, + TR_commManagerDisableRead); TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, TR_CEP_EVENT_CLOSE, @@ -151,7 +146,7 @@ commManagerCvInit(TR_class_ptr cls) TR_INSTANCE(TR_Hash, commManagerEventMethods); TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL); -TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL); +TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL); TR_CREATE_CLASS( TR_CommManager, TR_EventHandler, diff --git a/src/comm_manager_epoll.c b/src/comm_manager_epoll.c index e09b2a9..164034b 100644 --- a/src/comm_manager_epoll.c +++ b/src/comm_manager_epoll.c @@ -73,7 +73,8 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) int handle = endpoint->transport->handle; struct epoll_event event; - this->events[handle] = EPOLLET; + //this->events[handle] = EPOLLET; + this->events[handle] = 0; event.data.ptr = endpoint; event.events = this->events[handle]; @@ -100,8 +101,9 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) && ((TR_TcpSocket)endpoint->transport)->listen) { TR_hashAdd(cmgr->accept, endpoint); } else { - TR_hashAdd(cmgr->read, endpoint); - + if (! event->subject->fin) { + TR_hashAdd(cmgr->read, endpoint); + } } this->events[handle] &= ~EPOLLIN; @@ -111,15 +113,43 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) } if ((events[i].events & EPOLLOUT) == EPOLLOUT) { + if (! event->subject->fin) { TR_hashAdd(cmgr->write, endpoint); - this->events[handle] &= ~EPOLLOUT; - _event.data.ptr = endpoint; - _event.events = this->events[handle]; - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + } + this->events[handle] &= ~EPOLLOUT; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + } + + if ((events[i].events & EPOLLHUP) == EPOLLHUP) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_SHUT_WRITE, + NULL)); } } } +static +inline +void +TR_commManagerEpollDisable(void * _this, uint32_t mask, TR_Event event) +{ + TR_CommManagerEpoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + int handle = endpoint->transport->handle; + struct epoll_event _event; + + this->events[handle] &= ~mask; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); +} + static inline void @@ -155,6 +185,20 @@ TR_commManagerEpollEnableRead(void * _this, TR_Event event) } } +static +void +TR_commManagerEpollDisableWrite(void * _this, TR_Event event) +{ + TR_commManagerEpollDisable(_this, EPOLLOUT, event); +} + +static +void +TR_commManagerEpollDisableRead(void * _this, TR_Event event) +{ + TR_commManagerEpollDisable(_this, EPOLLIN, event); +} + static void TR_commManagerEpollClose(void * _this, TR_Event event) @@ -178,6 +222,8 @@ TR_INIT_IFACE( TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK + TR_commManagerEpollDisableWrite, + TR_commManagerEpollDisableRead, TR_commManagerEpollClose); // TR_CEP_EVENT_CLOSE TR_CREATE_CLASS( TR_CommManagerEpoll, diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index b92bc96..4bd3e5f 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -96,16 +96,31 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) && ((TR_TcpSocket)endpoint->transport)->listen) { TR_hashAdd(cmgr->accept, endpoint); } else { - TR_hashAdd(cmgr->read, endpoint); + if (! event->subject->fin) { + TR_hashAdd(cmgr->read, endpoint); + } } this->fds[endpoint->transport->handle].events &= ~POLLIN; } if ((this->fds[i].revents & POLLOUT) == POLLOUT) { - TR_hashAdd(cmgr->write, endpoint); - this->fds[endpoint->transport->handle].events &= ~POLLOUT; + if (! event->subject->fin) { + TR_hashAdd(cmgr->write, endpoint); + } + this->fds[endpoint->transport->handle].events &= + ~(POLLOUT|POLLHUP); + } + + if ((this->fds[i].revents & POLLHUP) == POLLHUP) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_SHUT_WRITE, + NULL)); } + this->fds[i].revents = 0; if (nevents <= 0) break; } } @@ -120,7 +135,7 @@ TR_commManagerPollEnableWrite(void * _this, TR_Event event) TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; if (! TR_socketFinWr(endpoint->transport)) { - this->fds[endpoint->transport->handle].events |= POLLOUT; + this->fds[endpoint->transport->handle].events |= POLLOUT|POLLHUP; } } @@ -136,6 +151,26 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event) } } +static +void +TR_commManagerPollDisableWrite(void * _this, TR_Event event) +{ + TR_CommManagerPoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + this->fds[endpoint->transport->handle].events &= ~(POLLOUT|POLLHUP); +} + +static +void +TR_commManagerPollDisableRead(void * _this, TR_Event event) +{ + TR_CommManagerPoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + this->fds[endpoint->transport->handle].events &= ~POLLIN; +} + static void TR_commManagerPollClose(void * _this, TR_Event event) @@ -160,6 +195,8 @@ TR_INIT_IFACE( TR_commManagerPollSelect, TR_commManagerPollEnableWrite, TR_commManagerPollEnableRead, + TR_commManagerPollDisableWrite, + TR_commManagerPollDisableRead, TR_commManagerPollClose); TR_CREATE_CLASS( TR_CommManagerPoll, diff --git a/src/i_comm_manager.c b/src/i_comm_manager.c index 70b18e4..00fc9f4 100644 --- a/src/i_comm_manager.c +++ b/src/i_comm_manager.c @@ -143,6 +143,30 @@ TR_commManagerPollRead(void * _this, TR_Event event) return TR_EVENT_DONE; } +TR_EventDone +TR_commManagerDisableRead(void * _this, TR_Event event) +{ + TR_CommManager this = _this; + TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject)); + TR_CALL(_this, TR_CommManager, disableRead, event); + + return TR_EVENT_DONE; +} + +TR_EventDone +TR_commManagerDisableWrite(void * _this, TR_Event event) +{ + TR_CommManager this = _this; + + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); + if (! event->subject->fin) { + TR_hashAdd(this->read, event->subject); + } + TR_CALL(_this, TR_CommManager, disableWrite, event); + + return TR_EVENT_DONE; +} + TR_EventDone TR_commManagerClose(void * _this, TR_Event event) { @@ -155,12 +179,14 @@ TR_commManagerClose(void * _this, TR_Event event) } if (handle == this->max_handle) { - while (! this->endpoints[--this->max_handle]); + while (! this->endpoints[--this->max_handle] && + this->max_handle > 0); } if (this->endpoints[handle]) { TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]); - this->endpoints[handle] = NULL; + TR_CALL(_this, TR_CommManager, disableWrite, event); + TR_CALL(_this, TR_CommManager, disableRead, event); TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint)); TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint)); } diff --git a/src/io_handler.c b/src/io_handler.c index a6dd819..55126e2 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -51,44 +51,42 @@ ioHandlerRead(void * _this, TR_Event event) TR_Event revent; TR_RemoteData data; - if (endpoint->write_buffer_size < CEP_WRITE_BUFFER_THRESHOLD) { - switch (TR_commEndPointRead(endpoint, &data)) { - case FALSE: // EAGAIN - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_READ_BLOCK, - NULL); - break; - - case -1: // error - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); - break; + switch (TR_commEndPointRead(endpoint, &data)) { + case FALSE: // EAGAIN + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_READ_BLOCK, + NULL); + break; - default: - case -2: // remote close - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_SHUT_READ, - NULL); - break; + case -1: // error + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL); + break; - case -3: // read limit - return TR_EVENT_DONE; + default: + case -2: // remote close + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SHUT_READ, + NULL); + break; - case TRUE: - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_NEW_DATA, - data); - break; - } + case -3: // read limit + return TR_EVENT_DONE; - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + case TRUE: + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_NEW_DATA, + data); + break; } + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + return TR_EVENT_DONE; } diff --git a/src/protocol_handler.c b/src/protocol_handler.c index f78bb60..8340a7c 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -89,6 +89,15 @@ protocolHandlerCompose(void * _this, TR_Event event) if ((message_size = TR_cepCompose(endpoint, message))) { endpoint->write_buffer_size += message_size; + if (endpoint->write_buffer_size >= CEP_WRITE_BUFFER_THRESHOLD) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_WBUF_FULL, + NULL)); + } + if (endpoint->write_buffer->nmsg == 1) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, diff --git a/testers/test_handler.c b/testers/test_handler.c index 6bf319e..51ef39d 100644 --- a/testers/test_handler.c +++ b/testers/test_handler.c @@ -11,7 +11,9 @@ static TR_EventDone testHandlerHeartbeat(TR_EventHandler this, TR_Event event) { - printf("handled: %llu/s\n", ((TestHandler)this)->handled); + printf("%zd beats since last beat / handled: %llu/s\n", + ((TR_EventDispatcher)event->subject)->n_beats, + ((TestHandler)this)->handled); ((TestHandler)this)->handled = 0; return TR_EVENT_DONE; @@ -51,7 +53,7 @@ static TR_EventDone testHandlerClose(TR_EventHandler this, TR_Event event) { - puts("close"); +// puts("close"); return TR_EVENT_PENDING; } diff --git a/testers/testclient.sh b/testers/testclient.sh index b8433ab..4df73f9 100755 --- a/testers/testclient.sh +++ b/testers/testclient.sh @@ -1,15 +1,19 @@ #!/bin/sh BS=8192 -COUNT=25000 +COUNT=10000 CONCURENT=200 IP="192.168.2.13" pids="" i=0 +MESSAGE="GET / HTTP/1.1\r\nConnection: keep-alive\r\n\r\n" + while [ $i -lt ${CONCURENT} ] do - dd if=/dev/zero bs=${BS} count=${COUNT} | nc ${IP} 5678 >/dev/null & + dd if=/dev/zero bs=${BS} count=${COUNT} | nc -q 1 ${IP} 5678 >/dev/null & + #echo -en "${MESSAGE}" | nc -q 1 ${IP} 5678 & + pids="${pids} $!" i=$((i + 1)) done