diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index 5f40ab9..9d2d76c 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -87,30 +87,33 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) TR_CommEndPoint endpoint = cmgr->endpoints[i]; if ((this->fds[i].revents & POLLIN) == POLLIN) { + TR_Event event; + nevents--; if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) && ((TR_TcpSocket)endpoint->transport)->listen) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, + event = TR_eventSubjectEmit( (TR_EventSubject)endpoint, TR_CET_EVENT_ACC_READY, NULL); } else { - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, + event = TR_eventSubjectEmit( (TR_EventSubject)endpoint, TR_CEP_EVENT_READ_READY, NULL); } + + TR_eventHandlerIssueEvent((TR_EventHandler)this, event); } if ((this->fds[i].revents & POLLOUT) == POLLOUT) { nevents--; TR_eventHandlerIssueEvent( (TR_EventHandler)this, - (TR_EventSubject)endpoint, - TR_CEP_EVENT_WRITE_READY, - NULL); + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_WRITE_READY, + NULL)); } if (nevents <= 0) break; diff --git a/src/comm_manager_shutdown.c b/src/comm_manager_shutdown.c index 474dde5..a47a9a9 100644 --- a/src/comm_manager_shutdown.c +++ b/src/comm_manager_shutdown.c @@ -38,9 +38,10 @@ TR_commManagerShutdown(void * _this, TR_Event event) if (this->endpoints[i]) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - (TR_EventSubject)this->endpoints[i], - TR_CEP_EVENT_CLOSE, - NULL); + TR_eventSubjectEmit( + (TR_EventSubject)this->endpoints[i], + TR_CEP_EVENT_CLOSE, + NULL)); } } diff --git a/src/connector.c b/src/connector.c index 04817f1..02de626 100644 --- a/src/connector.c +++ b/src/connector.c @@ -67,9 +67,10 @@ connectorAccept(void * _this, TR_Event event) 8192); TR_eventHandlerIssueEvent( (TR_EventHandler)this, - (TR_EventSubject)new_con, - TR_CON_EVENT_NEW_CON, - NULL); + TR_eventSubjectEmit( + (TR_EventSubject)new_con, + TR_CON_EVENT_NEW_CON, + NULL)); socket = TR_socketAccept((TR_TcpSocket)connection->transport); } diff --git a/src/i_comm_manager.c b/src/i_comm_manager.c index 76e49be..35574dc 100644 --- a/src/i_comm_manager.c +++ b/src/i_comm_manager.c @@ -83,9 +83,10 @@ TR_commManagerDisableWrite(void * _this, TR_Event event) if (TR_socketFinRd(endpoint->transport)) { TR_eventHandlerIssueEvent( this, - event->subject, - TR_CEP_EVENT_SHUT_READ, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SHUT_READ, + NULL)); } return 1; @@ -113,16 +114,18 @@ TR_commManagerShutdownRead(void * _this, TR_Event event) // close TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL)); } else if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { // handle pending data... close is issued from disableWrite TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL)); } else { TR_cepSetClose((TR_CommEndPoint)event->subject); } @@ -138,9 +141,10 @@ TR_commManagerShutdownWrite(void * _this, TR_Event event) if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL)); } return 0; diff --git a/src/io_handler.c b/src/io_handler.c index 8dbef63..a02a462 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -54,25 +54,28 @@ ioHandlerRead(void * _this, TR_Event event) case -1: TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL)); break; case -2: TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_SHUT_READ, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SHUT_READ, + NULL)); break; case TRUE: TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_NEW_DATA, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_NEW_DATA, + NULL)); break; } @@ -86,19 +89,21 @@ ioHandlerWrite(void * _this, TR_Event event) TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; if (TR_cepWriteBuffered(endpoint)) { + TR_Event event; + if (TR_cepHasPendingData(endpoint)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, + event = TR_eventSubjectEmit( event->subject, TR_CEP_EVENT_PENDING_DATA, NULL); } else { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, + event = TR_eventSubjectEmit( event->subject, TR_CEP_EVENT_END_DATA, NULL); } + + TR_eventHandlerIssueEvent((TR_EventHandler)_this, event); } return TRUE; diff --git a/src/protocol_handler.c b/src/protocol_handler.c index 04db6e4..10f7381 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -56,9 +56,10 @@ protocolHandlerParse(void * _this, TR_Event event) if (message) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_NEW_MSG, - message); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_NEW_MSG, + message)); if (message->close) { // also check that we are a response. Well this is how it is done @@ -86,9 +87,10 @@ protocolHandlerCompose(void * _this, TR_Event event) if (TR_cepCompose(endpoint, message)) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, - event->subject, - TR_CEP_EVENT_WRITE_READY, - NULL); + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_WRITE_READY, + NULL)); TR_delete(message); }