Browse Source

completely drop edge triggerd behaviour as I don't see any real benefit in it and it makes things unnessecary complicated.

1.0.0
Georg Hopp 11 years ago
parent
commit
4a64f7dd36
  1. 6
      include/tr/comm_manager_epoll.h
  2. 9
      src/cep_write_buffered.c
  3. 142
      src/comm_manager_epoll.c
  4. 5
      src/connection.c
  5. 4
      src/datagram_service.c
  6. 7
      src/io_handler.c

6
include/tr/comm_manager_epoll.h

@ -33,10 +33,8 @@
TR_CLASS(TR_CommManagerEpoll) {
TR_EXTENDS(TR_CommManager);
int handle;
struct epoll_event * events;
TR_Queue read_ready;
TR_Queue write_ready;
int handle;
uint32_t * events;
};
TR_INSTANCE_INIT(TR_CommManagerEpoll);
TR_CLASSVARS_DECL(TR_CommManagerEpoll) {

9
src/cep_write_buffered.c

@ -31,21 +31,12 @@ TR_cepWriteBuffered(TR_CommEndPoint this)
TR_RemoteData data;
int send;
// fprintf(stderr, "%s(%p): before get write data: %p / %zd messages\n",
// __func__, this, data, this->write_buffer->nmsg);
data = TR_cepNextWriteData(this);
// fprintf(stderr, "%s(%p): get write data: %p / %zd messages\n",
// __func__, this, data, this->write_buffer->nmsg);
// fflush(stderr);
send = TR_socketSend(this->transport, data);
switch (send) {
case FALSE: // EAGAIN
TR_queuePutFirst(this->write_buffer, data);
// fprintf(stderr, "%s(%p): put first write data: %p / %zd messages\n",
// __func__, this, data, this->write_buffer->nmsg);
// fflush(stderr);
break;
case -1: // FAILURE

142
src/comm_manager_epoll.c

@ -47,12 +47,8 @@ commManagerEpollCtor(void * _this, va_list * params)
TR_CommManager cmgr = _this;
TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, ctor, params);
this->handle = epoll_create(cmgr->n_endpoints);
this->read_ready = TR_new(TR_Queue);
this->write_ready = TR_new(TR_Queue);
this->read_ready->free_msgs = 0;
this->write_ready->free_msgs = 0;
this->handle = epoll_create(cmgr->n_endpoints);
this->events = TR_calloc(cmgr->n_endpoints, sizeof(uint32_t));
return 0;
}
@ -63,9 +59,7 @@ commManagerEpollDtor(void * _this)
{
TR_CommManagerEpoll this = _this;
TR_delete(this->read_ready);
TR_delete(this->write_ready);
TR_MEM_FREE(this->events);
close(this->handle);
TR_PARENTCALL(TR_CommManagerEpoll, _this, TR_Class, dtor);
}
@ -78,8 +72,9 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint)
int handle = endpoint->transport->handle;
struct epoll_event event;
event.data.ptr = endpoint;
event.events = EPOLLIN | EPOLLOUT | EPOLLET;
this->events[handle] = EPOLLIN;
event.data.ptr = endpoint;
event.events = this->events[handle];
epoll_ctl(this->handle, EPOLL_CTL_ADD, handle, &event);
}
@ -90,11 +85,6 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
{
TR_CommManagerEpoll this = _this;
int i, nevents;
TR_Queue node;
if (0 != (this->read_ready->nmsg & this->write_ready->nmsg)) {
timeout = 0;
}
nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout);
@ -110,40 +100,16 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
TR_CET_EVENT_ACC_READY,
NULL));
} else {
if (! ((TR_EventSubject)endpoint)->fin) {
TR_queuePut(this->read_ready, endpoint);
}
TR_eventHandlerIssueEvent((TR_EventHandler)this,
TR_eventSubjectEmit(
(TR_EventSubject)endpoint,
TR_CEP_EVENT_READ_READY,
NULL));
}
}
if ((events[i].events & EPOLLOUT) == EPOLLOUT) {
if (TR_cepHasPendingData(endpoint) &&
! ((TR_EventSubject)endpoint)->fin) {
TR_queuePut(this->write_ready, endpoint);
}
}
}
/* now issue reads and write events */
for (node=this->read_ready->first; node; node=node->next) {
TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg;
if (! TR_socketFinRd(endpoint->transport)) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
TR_eventSubjectEmit(
(TR_EventSubject)endpoint,
TR_CEP_EVENT_READ_READY,
NULL));
}
}
for (node=this->write_ready->first; node; node=node->next) {
TR_CommEndPoint endpoint = (TR_CommEndPoint)node->msg;
if (! TR_socketFinWr(endpoint->transport)) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
TR_eventHandlerIssueEvent((TR_EventHandler)this,
TR_eventSubjectEmit(
(TR_EventSubject)endpoint,
TR_CEP_EVENT_WRITE_READY,
@ -154,80 +120,84 @@ TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout)
static
void
TR_commManagerEpollRemoveWrite(void * _this, TR_Event event)
TR_commManagerEpollEnableWrite(void * _this, TR_Event event)
{
TR_CommManagerEpoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
TR_queueDelete(this->write_ready, endpoint);
if (! TR_socketFinWr(endpoint->transport)) {
int handle = endpoint->transport->handle;
struct epoll_event _event;
this->events[handle] |= EPOLLOUT;
_event.data.ptr = endpoint;
_event.events = this->events[handle];
epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
}
}
static
void
TR_commManagerEpollRemoveRead(void * _this, TR_Event event)
TR_commManagerEpollDisableWrite(void * _this, TR_Event event)
{
TR_CommManagerEpoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
int handle = endpoint->transport->handle;
struct epoll_event _event;
this->events[handle] &= ~EPOLLOUT;
_event.data.ptr = endpoint;
_event.events = this->events[handle];
TR_queueDelete(this->read_ready, endpoint);
epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
}
static
void
TR_commManagerEpollClose(void * _this, TR_Event event)
TR_commManagerEpollEnableRead(void * _this, TR_Event event)
{
TR_CommManagerEpoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
TR_queueDelete(this->read_ready, endpoint);
TR_queueDelete(this->write_ready, endpoint);
if (! TR_socketFinRd(endpoint->transport)) {
int handle = endpoint->transport->handle;
struct epoll_event _event;
epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL);
this->events[handle] |= EPOLLIN;
_event.data.ptr = endpoint;
_event.events = this->events[handle];
epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
}
}
static
void
TR_commManagerEpollShutRead(void * _this, TR_Event event)
TR_commManagerEpollDisableRead(void * _this, TR_Event event)
{
TR_CommManagerEpoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
int handle = endpoint->transport->handle;
struct epoll_event _event;
TR_queueDelete(this->read_ready, endpoint);
this->events[handle] &= ~EPOLLIN;
_event.data.ptr = endpoint;
_event.events = EPOLLOUT | EPOLLET;
_event.events = this->events[handle];
epoll_ctl(
this->handle,
EPOLL_CTL_MOD,
endpoint->transport->handle,
&_event);
epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event);
}
static
void
TR_commManagerEpollShutWrite(void * _this, TR_Event event)
TR_commManagerEpollClose(void * _this, TR_Event event)
{
TR_CommManagerEpoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
struct epoll_event _event;
TR_queueDelete(this->write_ready, endpoint);
_event.data.ptr = endpoint;
_event.events = EPOLLIN | EPOLLET;
epoll_ctl(
this->handle,
EPOLL_CTL_MOD,
endpoint->transport->handle,
&_event);
epoll_ctl(this->handle, EPOLL_CTL_DEL, endpoint->transport->handle, NULL);
}
static void TR_commManagerEpollNoop(void * _this, TR_Event event) {}
static
void
TR_commManagerEpollCvInit(TR_class_ptr cls) {
@ -237,14 +207,14 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) {
TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL);
TR_INIT_IFACE(
TR_CommManager,
TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON
TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT
TR_commManagerEpollRemoveWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
TR_commManagerEpollNoop, // TR_CEP_EVENT_END_DATA
TR_commManagerEpollRemoveRead, // TR_CEP_EVENT_READ_BLOCK
TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE
TR_commManagerEpollShutWrite, // TR_CEP_EVENT_SHUT_READ
TR_commManagerEpollShutRead); // TR_CEP_EVENT_SHUT_WRITE
TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON
TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT
TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK
TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_END_DATA
TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK
TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE
TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_SHUT_READ
TR_commManagerEpollEnableRead); // TR_CEP_EVENT_SHUT_WRITE
TR_CREATE_CLASS(
TR_CommManagerEpoll,
TR_CommManager,

5
src/connection.c

@ -123,11 +123,6 @@ connectionCompose(void * _this, TR_ProtoMessage message)
}
TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n",
// __func__, (TR_CommEndPoint)_this, data,
// ((TR_CommEndPoint)_this)->write_buffer->nmsg);
// fflush(stderr);
return TRUE;
}

4
src/datagram_service.c

@ -110,10 +110,6 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message)
}
TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data);
// fprintf(stderr, "%s(%p): put write data: %p / %zd messages\n",
// __func__, (TR_CommEndPoint)_this, data,
// ((TR_CommEndPoint)_this)->write_buffer->nmsg);
// fflush(stderr);
return TRUE;
}

7
src/io_handler.c

@ -86,10 +86,11 @@ static
TR_EventDone
ioHandlerWrite(void * _this, TR_Event event)
{
TR_Event revent = NULL,
close_event = NULL;
TR_Event revent = NULL,
close_event = NULL;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) {
switch (TR_cepWriteBuffered(endpoint)) {
case FALSE: // EAGAIN
revent = TR_eventSubjectEmit(
event->subject,

Loading…
Cancel
Save