diff --git a/include/tr/comm_end_point.h b/include/tr/comm_end_point.h index 27bce0e..3983810 100644 --- a/include/tr/comm_end_point.h +++ b/include/tr/comm_end_point.h @@ -45,16 +45,17 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) { }; #define TR_CEP_EVENT_READ_READY 0 -#define TR_CEP_EVENT_WRITE_READY 1 -#define TR_CEP_EVENT_UPGRADE 2 -#define TR_CEP_EVENT_NEW_DATA 3 -#define TR_CEP_EVENT_PENDING_DATA 4 -#define TR_CEP_EVENT_END_DATA 5 -#define TR_CEP_EVENT_NEW_MSG 6 -#define TR_CEP_EVENT_SEND_MSG 7 -#define TR_CEP_EVENT_SHUT_READ 8 -#define TR_CEP_EVENT_SHUT_WRITE 9 -#define TR_CEP_EVENT_CLOSE 10 +#define TR_CEP_EVENT_READ_BLOCK 1 +#define TR_CEP_EVENT_WRITE_READY 2 +#define TR_CEP_EVENT_UPGRADE 3 +#define TR_CEP_EVENT_NEW_DATA 4 +#define TR_CEP_EVENT_PENDING_DATA 5 +#define TR_CEP_EVENT_END_DATA 6 +#define TR_CEP_EVENT_NEW_MSG 7 +#define TR_CEP_EVENT_SEND_MSG 8 +#define TR_CEP_EVENT_SHUT_READ 9 +#define TR_CEP_EVENT_SHUT_WRITE 10 +#define TR_CEP_EVENT_CLOSE 11 #define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE) #define TR_cepSetClose(ep) ((ep)->do_close = 1) diff --git a/include/tr/interface/comm_manager.h b/include/tr/interface/comm_manager.h index c557278..8da1034 100644 --- a/include/tr/interface/comm_manager.h +++ b/include/tr/interface/comm_manager.h @@ -34,6 +34,7 @@ typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint) typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int); typedef TR_EventDone (* fptr_TR_commManagerEnableWrite)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerEnableRead)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerShutdownRead)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerShutdownWrite)(void *, TR_Event); @@ -44,6 +45,7 @@ TR_INTERFACE(TR_CommManager) { fptr_TR_commManagerSelect select; fptr_TR_commManagerEnableWrite enableWrite; fptr_TR_commManagerDisableWrite disableWrite; + fptr_TR_commManagerEnableRead enableRead; fptr_TR_commManagerClose close; fptr_TR_commManagerShutdownRead shutdownWrite; fptr_TR_commManagerShutdownWrite shutdownRead; diff --git a/src/cep_write_buffered.c b/src/cep_write_buffered.c index bed3920..ab8f5cf 100644 --- a/src/cep_write_buffered.c +++ b/src/cep_write_buffered.c @@ -29,19 +29,19 @@ int TR_cepWriteBuffered(TR_CommEndPoint this) { TR_RemoteData data = TR_cepNextWriteData(this); - int send = 0; + int send = TR_socketSend(this->transport, data); - while (data) { - int current_send = TR_socketSend(this->transport, data); + switch (send) { + case FALSE: // EAGAIN + case -1: // FAILURE + case -2: // remote close + return send; - send += current_send; - // TODO if nothing was send put it back into the queue.. - // and stop loop. (This was a close.) - - data = TR_cepNextWriteData(this); + default: + break; } - return TRUE; + return send; } // vim: set ts=4 sw=4: diff --git a/src/comm_end_point.c b/src/comm_end_point.c index 2a1dac8..fe899ce 100644 --- a/src/comm_end_point.c +++ b/src/comm_end_point.c @@ -64,6 +64,7 @@ void commEndPointCvInit(TR_class_ptr cls) { TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_READY); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK); TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_READY); TR_EVENT_CREATE(cls, TR_CEP_EVENT_UPGRADE); TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA); diff --git a/src/comm_manager.c b/src/comm_manager.c index 7869dd1..7291ff1 100644 --- a/src/comm_manager.c +++ b/src/comm_manager.c @@ -72,6 +72,7 @@ TR__commManagerAddEndpoint(void * _this, TR_Event event) TR_EventDone TR_commManagerSelect(void *, TR_Event, int); TR_EventDone TR_commManagerEnableWrite(void *, TR_Event); TR_EventDone TR_commManagerDisableWrite(void *, TR_Event); +TR_EventDone TR_commManagerEnableRead(void *, TR_Event); TR_EventDone TR_commManagerClose(void *, TR_Event); TR_EventDone TR_commManagerShutdownRead(void *, TR_Event); TR_EventDone TR_commManagerShutdownWrite(void *, TR_Event); @@ -105,6 +106,11 @@ commManagerCvInit(TR_class_ptr cls) TR_CommEndPoint, TR_CEP_EVENT_END_DATA, TR_commManagerDisableWrite); + TR_EVENT_HANDLER_SET_METHOD( + cls, + TR_CommEndPoint, + TR_CEP_EVENT_READ_BLOCK, + TR_commManagerEnableRead); TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, @@ -124,7 +130,7 @@ commManagerCvInit(TR_class_ptr cls) TR_INSTANCE(TR_Hash, commManagerEventMethods); TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL); -TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL); +TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); TR_CREATE_CLASS( TR_CommManager, TR_EventHandler, diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index 2ea01af..95dcef7 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -104,6 +104,7 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) } TR_eventHandlerIssueEvent((TR_EventHandler)this, event); + this->fds[i].fd = -1; // this deactivates poll... } if ((this->fds[i].revents & POLLOUT) == POLLOUT) { @@ -114,6 +115,8 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) (TR_EventSubject)endpoint, TR_CEP_EVENT_WRITE_READY, NULL)); + // deactivate write poll... + this->fds[endpoint->transport->handle].events &= ~POLLOUT; } if (nevents <= 0) break; @@ -143,6 +146,16 @@ TR_commManagerPollDisableWrite(void * _this, TR_Event event) this->fds[endpoint->transport->handle].events &= ~POLLOUT; } +static +void +TR_commManagerPollEnableRead(void * _this, TR_Event event) +{ + TR_CommManagerPoll this = _this; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + this->fds[endpoint->transport->handle].fd = endpoint->transport->handle; +} + static void TR_commManagerPollClose(void * _this, TR_Event event) @@ -177,6 +190,7 @@ TR_INIT_IFACE( TR_commManagerPollSelect, TR_commManagerPollEnableWrite, TR_commManagerPollDisableWrite, + TR_commManagerPollEnableRead, TR_commManagerPollClose, TR_commManagerPollDisableRead, TR_commManagerPollDisableWrite); diff --git a/src/connector.c b/src/connector.c index 8384c9e..ec6b95e 100644 --- a/src/connector.c +++ b/src/connector.c @@ -74,6 +74,15 @@ connectorAccept(void * _this, TR_Event event) socket = TR_socketAccept((TR_TcpSocket)connection->transport); } + /* + * reenable socket for poll + */ + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + TR_eventSubjectEmit( + (TR_EventSubject)connection, + TR_CEP_EVENT_READ_BLOCK, + NULL)); /** * TODO we need to identify socket failures and close socket then. */ diff --git a/src/i_comm_manager.c b/src/i_comm_manager.c index 781d78d..dba01b0 100644 --- a/src/i_comm_manager.c +++ b/src/i_comm_manager.c @@ -30,7 +30,7 @@ #include "tr/comm_end_point.h" #include "tr/comm_manager.h" -TR_CREATE_INTERFACE(TR_CommManager, 7); +TR_CREATE_INTERFACE(TR_CommManager, 8); void TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint) @@ -97,6 +97,14 @@ TR_commManagerDisableWrite(void * _this, TR_Event event) return TR_EVENT_DONE; } +TR_EventDone +TR_commManagerEnableRead(void * _this, TR_Event event) +{ + TR_CALL(_this, TR_CommManager, enableRead, event); + + return TR_EVENT_DONE; +} + TR_EventDone TR_commManagerClose(void * _this, TR_Event event) { diff --git a/src/io_handler.c b/src/io_handler.c index dc3d60d..deb9cb5 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -44,75 +44,98 @@ static TR_EventDone ioHandlerRead(void * _this, TR_Event event) { + TR_Event revent; + TR_EventDone done = TR_EVENT_DONE; + switch (TR_cepBufferRead((TR_CommEndPoint)event->subject)) { - default: - case FALSE: + case FALSE: // EAGAIN + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_READ_BLOCK, + NULL); break; case -1: // error - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_CLOSE, - NULL)); + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL); break; + default: case -2: // remote close - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_SHUT_READ, - NULL)); + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SHUT_READ, + NULL); break; case TRUE: - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_NEW_DATA, - NULL)); + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_NEW_DATA, + NULL); + + done = TR_EVENT_PENDING; break; } - return TR_EVENT_DONE; + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + return done; } static TR_EventDone ioHandlerWrite(void * _this, TR_Event event) { - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + TR_Event revent, close_event = NULL; + TR_EventDone done = TR_EVENT_DONE; + + switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) { + case FALSE: // EAGAIN + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_PENDING_DATA, + NULL); + break; - if (TR_cepWriteBuffered(endpoint)) { - if (TR_cepHasPendingData(endpoint)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_PENDING_DATA, - NULL)); - } else { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( + case -1: // FAILURE + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL); + break; + + case -2: // remote close + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SHUT_WRITE, + NULL); + break; + + default: + if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { + done = TR_EVENT_PENDING; + } else { + revent = TR_eventSubjectEmit( event->subject, TR_CEP_EVENT_END_DATA, - NULL)); - if (TRUE == endpoint->do_close) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( + NULL); + + if (TRUE == ((TR_CommEndPoint)event->subject)->do_close) { + close_event = TR_eventSubjectEmit( event->subject, TR_CEP_EVENT_CLOSE, - NULL)); + NULL); + } } - } } - return TR_EVENT_DONE; + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + if (close_event) { + TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event); + } + return done; } static diff --git a/testers/testclient.sh b/testers/testclient.sh index e7cb4ff..cb54218 100755 --- a/testers/testclient.sh +++ b/testers/testclient.sh @@ -2,9 +2,9 @@ pids="" i=0 -while [ $i -lt 400 ] +while [ $i -lt 20 ] do - dd if=/dev/zero bs=8192 count=2500 | nc 192.168.2.13 5678 & + dd if=/dev/zero bs=8192 count=25000 | nc -u localhost 5678 & pids="${pids} $!" i=$((i + 1)) done