diff --git a/include/tr/comm_manager_epoll.h b/include/tr/comm_manager_epoll.h index 63ea5fb..a515726 100644 --- a/include/tr/comm_manager_epoll.h +++ b/include/tr/comm_manager_epoll.h @@ -33,10 +33,8 @@ TR_CLASS(TR_CommManagerEpoll) { TR_EXTENDS(TR_CommManager); - int handle; - struct epoll_event * events; - TR_Queue read_ready; - TR_Queue write_ready; + int handle; + uint32_t * events; }; TR_INSTANCE_INIT(TR_CommManagerEpoll); TR_CLASSVARS_DECL(TR_CommManagerEpoll) { diff --git a/src/cep_write_buffered.c b/src/cep_write_buffered.c index c41af6c..8ebb030 100644 --- a/src/cep_write_buffered.c +++ b/src/cep_write_buffered.c @@ -31,21 +31,12 @@ TR_cepWriteBuffered(TR_CommEndPoint this) TR_RemoteData data; int send; -// fprintf(stderr, "%s(%p): before get write data: %p / %zd messages\n", -// __func__, this, data, this->write_buffer->nmsg); data = TR_cepNextWriteData(this); -// fprintf(stderr, "%s(%p): get write data: %p / %zd messages\n", -// __func__, this, data, this->write_buffer->nmsg); -// fflush(stderr); - send = TR_socketSend(this->transport, data); switch (send) { case FALSE: // EAGAIN TR_queuePutFirst(this->write_buffer, data); -// fprintf(stderr, "%s(%p): put first write data: %p / %zd messages\n", -// __func__, this, data, this->write_buffer->nmsg); -// fflush(stderr); break; case -1: // FAILURE diff --git a/src/comm_manager_epoll.c b/src/comm_manager_epoll.c index 2d7a0a7..4d1495d 100644 --- a/src/comm_manager_epoll.c +++ b/src/comm_manager_epoll.c @@ -47,12 +47,8 @@ commManagerEpollCtor(void * _this, va_list * params) TR_CommManager cmgr = _this; TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params); - this->handle = epoll_create(cmgr->n_endpoints); - this->read_ready = TR_new(TR_Queue); - this->write_ready = TR_new(TR_Queue); - - this->read_ready->free_msgs = 0; - this->write_ready->free_msgs = 0; + this->handle = epoll_create(cmgr->n_endpoints); + this->events = TR_calloc(cmgr->n_endpoints, sizeof(uint32_t)); return 0; } @@ -63,9 +59,7 @@ commManagerEpollDtor(void * _this) { TR_CommManagerEpoll this = _this; - TR_delete(this->read_ready); - TR_delete(this->write_ready); - + TR_MEM_FREE(this->events); close(this->handle); TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor); } @@ -78,8 +72,9 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) int handle = endpoint->transport->handle; struct epoll_event event; - event.data.ptr = endpoint; - event.events = EPOLLIN | EPOLLOUT | EPOLLET; + this->events[handle] = EPOLLIN; + event.data.ptr = endpoint; + event.events = this->events[handle]; epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &event); } @@ -90,11 +85,6 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) { TR_CommManagerEpoll this = _this; int i, nevents; - TR_Queue node; - - if (0 != (this->read_ready->nmsg & this->write_ready->nmsg)) { - timeout = 0; - } nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout); @@ -110,40 +100,16 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) TR_CET_EVENT_ACC_READY, NULL)); } else { - if (! ((TR_EventSubject)endpoint)->fin) { - TR_queuePut(this->read_ready, endpoint); - } + TR_eventHandlerIssueEvent((TR_EventHandler)this, + TR_eventSubjectEmit( + (TR_EventSubject)endpoint, + TR_CEP_EVENT_READ_READY, + NULL)); } } if ((events[i].events & EPOLLOUT) == EPOLLOUT) { - if (TR_cepHasPendingData(endpoint) && - ! ((TR_EventSubject)endpoint)->fin) { - TR_queuePut(this->write_ready, endpoint); - } - } - } - - /* now issue reads and write events */ - for (node=this->read_ready->first; node; node=node->next) { - TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg; - - if (! TR_socketFinRd(endpoint->transport)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_READ_READY, - NULL)); - } - } - - for (node=this->write_ready->first; node; node=node->next) { - TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg; - - if (! TR_socketFinWr(endpoint->transport)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)this, + TR_eventHandlerIssueEvent((TR_EventHandler)this, TR_eventSubjectEmit( (TR_EventSubject)endpoint, TR_CEP_EVENT_WRITE_READY, @@ -154,80 +120,84 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) static void -TR_commManagerEpollRemoveWrite(void * _this, TR_Event event) +TR_commManagerEpollEnableWrite(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - TR_queueDelete(this->write_ready, endpoint); + if (! TR_socketFinWr(endpoint->transport)) { + int handle = endpoint->transport->handle; + struct epoll_event _event; + + this->events[handle] |= EPOLLOUT; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + } } static void -TR_commManagerEpollRemoveRead(void * _this, TR_Event event) +TR_commManagerEpollDisableWrite(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + int handle = endpoint->transport->handle; + struct epoll_event _event; + + this->events[handle] &= ~EPOLLOUT; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; - TR_queueDelete(this->read_ready, endpoint); + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); } static void -TR_commManagerEpollClose(void * _this, TR_Event event) +TR_commManagerEpollEnableRead(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - TR_queueDelete(this->read_ready, endpoint); - TR_queueDelete(this->write_ready, endpoint); + if (! TR_socketFinRd(endpoint->transport)) { + int handle = endpoint->transport->handle; + struct epoll_event _event; - epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL); + this->events[handle] |= EPOLLIN; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + } } static void -TR_commManagerEpollShutRead(void * _this, TR_Event event) +TR_commManagerEpollDisableRead(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + int handle = endpoint->transport->handle; struct epoll_event _event; - TR_queueDelete(this->read_ready, endpoint); - + this->events[handle] &= ~EPOLLIN; _event.data.ptr = endpoint; - _event.events = EPOLLOUT | EPOLLET; + _event.events = this->events[handle]; - epoll_ctl( - this->handle, - EPOLL_CTL_MOD, - endpoint->transport->handle, - &_event); + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); } static void -TR_commManagerEpollShutWrite(void * _this, TR_Event event) +TR_commManagerEpollClose(void * _this, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - struct epoll_event _event; - - TR_queueDelete(this->write_ready, endpoint); - _event.data.ptr = endpoint; - _event.events = EPOLLIN | EPOLLET; - - epoll_ctl( - this->handle, - EPOLL_CTL_MOD, - endpoint->transport->handle, - &_event); + epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL); } - -static void TR_commManagerEpollNoop(void * _this, TR_Event event) {} - static void TR_commManagerEpollCvInit(TR_class_ptr cls) { @@ -237,14 +207,14 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) { TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL); TR_INIT_IFACE( TR_CommManager, - TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON - TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT - TR_commManagerEpollRemoveWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK - TR_commManagerEpollNoop, // TR_CEP_EVENT_END_DATA - TR_commManagerEpollRemoveRead, // TR_CEP_EVENT_READ_BLOCK - TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE - TR_commManagerEpollShutWrite, // TR_CEP_EVENT_SHUT_READ - TR_commManagerEpollShutRead); // TR_CEP_EVENT_SHUT_WRITE + TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON + TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT + TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK + TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_END_DATA + TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK + TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE + TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_SHUT_READ + TR_commManagerEpollEnableRead); // TR_CEP_EVENT_SHUT_WRITE TR_CREATE_CLASS( TR_CommManagerEpoll, TR_CommManager, diff --git a/src/connection.c b/src/connection.c index cb8a92a..9f17f93 100644 --- a/src/connection.c +++ b/src/connection.c @@ -123,11 +123,6 @@ connectionCompose(void * _this, TR_ProtoMessage message) } TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); -// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n", -// __func__, (TR_CommEndPoint)_this, data, -// ((TR_CommEndPoint)_this)->write_buffer->nmsg); -// fflush(stderr); - return TRUE; } diff --git a/src/datagram_service.c b/src/datagram_service.c index 05af42f..c1115a1 100644 --- a/src/datagram_service.c +++ b/src/datagram_service.c @@ -110,10 +110,6 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message) } TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); -// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n", -// __func__, (TR_CommEndPoint)_this, data, -// ((TR_CommEndPoint)_this)->write_buffer->nmsg); -// fflush(stderr); return TRUE; } diff --git a/src/io_handler.c b/src/io_handler.c index 0c0277c..e56f9f2 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -86,10 +86,11 @@ static TR_EventDone ioHandlerWrite(void * _this, TR_Event event) { - TR_Event revent = NULL, - close_event = NULL; + TR_Event revent = NULL, + close_event = NULL; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) { + switch (TR_cepWriteBuffered(endpoint)) { case FALSE: // EAGAIN revent = TR_eventSubjectEmit( event->subject,