Browse Source

adapt on separation of issue and emit of events

1.0.0
Georg Hopp 11 years ago
parent
commit
18fcc58723
  1. 17
      src/comm_manager_poll.c
  2. 7
      src/comm_manager_shutdown.c
  3. 7
      src/connector.c
  4. 28
      src/i_comm_manager.c
  5. 31
      src/io_handler.c
  6. 14
      src/protocol_handler.c

17
src/comm_manager_poll.c

@ -87,30 +87,33 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
TR_CommEndPoint endpoint = cmgr->endpoints[i]; TR_CommEndPoint endpoint = cmgr->endpoints[i];
if ((this->fds[i].revents & POLLIN) == POLLIN) { if ((this->fds[i].revents & POLLIN) == POLLIN) {
TR_Event event;
nevents--; nevents--;
if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport)
&& ((TR_TcpSocket)endpoint->transport)->listen) { && ((TR_TcpSocket)endpoint->transport)->listen) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
event = TR_eventSubjectEmit(
(TR_EventSubject)endpoint, (TR_EventSubject)endpoint,
TR_CET_EVENT_ACC_READY, TR_CET_EVENT_ACC_READY,
NULL); NULL);
} else { } else {
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
event = TR_eventSubjectEmit(
(TR_EventSubject)endpoint, (TR_EventSubject)endpoint,
TR_CEP_EVENT_READ_READY, TR_CEP_EVENT_READ_READY,
NULL); NULL);
} }
TR_eventHandlerIssueEvent((TR_EventHandler)this, event);
} }
if ((this->fds[i].revents & POLLOUT) == POLLOUT) { if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
nevents--; nevents--;
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)this, (TR_EventHandler)this,
(TR_EventSubject)endpoint,
TR_CEP_EVENT_WRITE_READY,
NULL);
TR_eventSubjectEmit(
(TR_EventSubject)endpoint,
TR_CEP_EVENT_WRITE_READY,
NULL));
} }
if (nevents <= 0) break; if (nevents <= 0) break;

7
src/comm_manager_shutdown.c

@ -38,9 +38,10 @@ TR_commManagerShutdown(void * _this, TR_Event event)
if (this->endpoints[i]) { if (this->endpoints[i]) {
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
(TR_EventSubject)this->endpoints[i],
TR_CEP_EVENT_CLOSE,
NULL);
TR_eventSubjectEmit(
(TR_EventSubject)this->endpoints[i],
TR_CEP_EVENT_CLOSE,
NULL));
} }
} }

7
src/connector.c

@ -67,9 +67,10 @@ connectorAccept(void * _this, TR_Event event)
8192); 8192);
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)this, (TR_EventHandler)this,
(TR_EventSubject)new_con,
TR_CON_EVENT_NEW_CON,
NULL);
TR_eventSubjectEmit(
(TR_EventSubject)new_con,
TR_CON_EVENT_NEW_CON,
NULL));
socket = TR_socketAccept((TR_TcpSocket)connection->transport); socket = TR_socketAccept((TR_TcpSocket)connection->transport);
} }

28
src/i_comm_manager.c

@ -83,9 +83,10 @@ TR_commManagerDisableWrite(void * _this, TR_Event event)
if (TR_socketFinRd(endpoint->transport)) { if (TR_socketFinRd(endpoint->transport)) {
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
this, this,
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL));
} }
return 1; return 1;
@ -113,16 +114,18 @@ TR_commManagerShutdownRead(void * _this, TR_Event event)
// close // close
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
} else if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { } else if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
// handle pending data... close is issued from disableWrite // handle pending data... close is issued from disableWrite
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
} else { } else {
TR_cepSetClose((TR_CommEndPoint)event->subject); TR_cepSetClose((TR_CommEndPoint)event->subject);
} }
@ -138,9 +141,10 @@ TR_commManagerShutdownWrite(void * _this, TR_Event event)
if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
} }
return 0; return 0;

31
src/io_handler.c

@ -54,25 +54,28 @@ ioHandlerRead(void * _this, TR_Event event)
case -1: case -1:
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
break; break;
case -2: case -2:
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL));
break; break;
case TRUE: case TRUE:
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_NEW_DATA,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_NEW_DATA,
NULL));
break; break;
} }
@ -86,19 +89,21 @@ ioHandlerWrite(void * _this, TR_Event event)
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
if (TR_cepWriteBuffered(endpoint)) { if (TR_cepWriteBuffered(endpoint)) {
TR_Event event;
if (TR_cepHasPendingData(endpoint)) { if (TR_cepHasPendingData(endpoint)) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
event = TR_eventSubjectEmit(
event->subject, event->subject,
TR_CEP_EVENT_PENDING_DATA, TR_CEP_EVENT_PENDING_DATA,
NULL); NULL);
} else { } else {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
event = TR_eventSubjectEmit(
event->subject, event->subject,
TR_CEP_EVENT_END_DATA, TR_CEP_EVENT_END_DATA,
NULL); NULL);
} }
TR_eventHandlerIssueEvent((TR_EventHandler)_this, event);
} }
return TRUE; return TRUE;

14
src/protocol_handler.c

@ -56,9 +56,10 @@ protocolHandlerParse(void * _this, TR_Event event)
if (message) { if (message) {
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_NEW_MSG,
message);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_NEW_MSG,
message));
if (message->close) { if (message->close) {
// also check that we are a response. Well this is how it is done // also check that we are a response. Well this is how it is done
@ -86,9 +87,10 @@ protocolHandlerCompose(void * _this, TR_Event event)
if (TR_cepCompose(endpoint, message)) { if (TR_cepCompose(endpoint, message)) {
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_WRITE_READY,
NULL);
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_WRITE_READY,
NULL));
TR_delete(message); TR_delete(message);
} }

Loading…
Cancel
Save