Browse Source

finish edge level trigger for socket notification

1.0.0
Georg Hopp 12 years ago
parent
commit
2885147eef
  1. 21
      include/tr/comm_end_point.h
  2. 2
      include/tr/interface/comm_manager.h
  3. 18
      src/cep_write_buffered.c
  4. 1
      src/comm_end_point.c
  5. 8
      src/comm_manager.c
  6. 14
      src/comm_manager_poll.c
  7. 9
      src/connector.c
  8. 10
      src/i_comm_manager.c
  9. 107
      src/io_handler.c
  10. 4
      testers/testclient.sh

21
include/tr/comm_end_point.h

@ -45,16 +45,17 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) {
};
#define TR_CEP_EVENT_READ_READY 0
#define TR_CEP_EVENT_WRITE_READY 1
#define TR_CEP_EVENT_UPGRADE 2
#define TR_CEP_EVENT_NEW_DATA 3
#define TR_CEP_EVENT_PENDING_DATA 4
#define TR_CEP_EVENT_END_DATA 5
#define TR_CEP_EVENT_NEW_MSG 6
#define TR_CEP_EVENT_SEND_MSG 7
#define TR_CEP_EVENT_SHUT_READ 8
#define TR_CEP_EVENT_SHUT_WRITE 9
#define TR_CEP_EVENT_CLOSE 10
#define TR_CEP_EVENT_READ_BLOCK 1
#define TR_CEP_EVENT_WRITE_READY 2
#define TR_CEP_EVENT_UPGRADE 3
#define TR_CEP_EVENT_NEW_DATA 4
#define TR_CEP_EVENT_PENDING_DATA 5
#define TR_CEP_EVENT_END_DATA 6
#define TR_CEP_EVENT_NEW_MSG 7
#define TR_CEP_EVENT_SEND_MSG 8
#define TR_CEP_EVENT_SHUT_READ 9
#define TR_CEP_EVENT_SHUT_WRITE 10
#define TR_CEP_EVENT_CLOSE 11
#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE)
#define TR_cepSetClose(ep) ((ep)->do_close = 1)

2
include/tr/interface/comm_manager.h

@ -34,6 +34,7 @@ typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint)
typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int);
typedef TR_EventDone (* fptr_TR_commManagerEnableWrite)(void *, TR_Event);
typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
typedef TR_EventDone (* fptr_TR_commManagerEnableRead)(void *, TR_Event);
typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event);
typedef TR_EventDone (* fptr_TR_commManagerShutdownRead)(void *, TR_Event);
typedef TR_EventDone (* fptr_TR_commManagerShutdownWrite)(void *, TR_Event);
@ -44,6 +45,7 @@ TR_INTERFACE(TR_CommManager) {
fptr_TR_commManagerSelect select;
fptr_TR_commManagerEnableWrite enableWrite;
fptr_TR_commManagerDisableWrite disableWrite;
fptr_TR_commManagerEnableRead enableRead;
fptr_TR_commManagerClose close;
fptr_TR_commManagerShutdownRead shutdownWrite;
fptr_TR_commManagerShutdownWrite shutdownRead;

18
src/cep_write_buffered.c

@ -29,19 +29,19 @@ int
TR_cepWriteBuffered(TR_CommEndPoint this)
{
TR_RemoteData data = TR_cepNextWriteData(this);
int send = 0;
int send = TR_socketSend(this->transport, data);
while (data) {
int current_send = TR_socketSend(this->transport, data);
switch (send) {
case FALSE: // EAGAIN
case -1: // FAILURE
case -2: // remote close
return send;
send += current_send;
// TODO if nothing was send put it back into the queue..
// and stop loop. (This was a close.)
data = TR_cepNextWriteData(this);
default:
break;
}
return TRUE;
return send;
}
// vim: set ts=4 sw=4:

1
src/comm_end_point.c

@ -64,6 +64,7 @@ void
commEndPointCvInit(TR_class_ptr cls)
{
TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_READY);
TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK);
TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_READY);
TR_EVENT_CREATE(cls, TR_CEP_EVENT_UPGRADE);
TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA);

8
src/comm_manager.c

@ -72,6 +72,7 @@ TR__commManagerAddEndpoint(void * _this, TR_Event event)
TR_EventDone TR_commManagerSelect(void *, TR_Event, int);
TR_EventDone TR_commManagerEnableWrite(void *, TR_Event);
TR_EventDone TR_commManagerDisableWrite(void *, TR_Event);
TR_EventDone TR_commManagerEnableRead(void *, TR_Event);
TR_EventDone TR_commManagerClose(void *, TR_Event);
TR_EventDone TR_commManagerShutdownRead(void *, TR_Event);
TR_EventDone TR_commManagerShutdownWrite(void *, TR_Event);
@ -105,6 +106,11 @@ commManagerCvInit(TR_class_ptr cls)
TR_CommEndPoint,
TR_CEP_EVENT_END_DATA,
TR_commManagerDisableWrite);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_CommEndPoint,
TR_CEP_EVENT_READ_BLOCK,
TR_commManagerEnableRead);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_CommEndPoint,
@ -124,7 +130,7 @@ commManagerCvInit(TR_class_ptr cls)
TR_INSTANCE(TR_Hash, commManagerEventMethods);
TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
TR_CREATE_CLASS(
TR_CommManager,
TR_EventHandler,

14
src/comm_manager_poll.c

@ -104,6 +104,7 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
}
TR_eventHandlerIssueEvent((TR_EventHandler)this, event);
this->fds[i].fd = -1; // this deactivates poll...
}
if ((this->fds[i].revents & POLLOUT) == POLLOUT) {
@ -114,6 +115,8 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout)
(TR_EventSubject)endpoint,
TR_CEP_EVENT_WRITE_READY,
NULL));
// deactivate write poll...
this->fds[endpoint->transport->handle].events &= ~POLLOUT;
}
if (nevents <= 0) break;
@ -143,6 +146,16 @@ TR_commManagerPollDisableWrite(void * _this, TR_Event event)
this->fds[endpoint->transport->handle].events &= ~POLLOUT;
}
static
void
TR_commManagerPollEnableRead(void * _this, TR_Event event)
{
TR_CommManagerPoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
}
static
void
TR_commManagerPollClose(void * _this, TR_Event event)
@ -177,6 +190,7 @@ TR_INIT_IFACE(
TR_commManagerPollSelect,
TR_commManagerPollEnableWrite,
TR_commManagerPollDisableWrite,
TR_commManagerPollEnableRead,
TR_commManagerPollClose,
TR_commManagerPollDisableRead,
TR_commManagerPollDisableWrite);

9
src/connector.c

@ -74,6 +74,15 @@ connectorAccept(void * _this, TR_Event event)
socket = TR_socketAccept((TR_TcpSocket)connection->transport);
}
/*
* reenable socket for poll
*/
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
(TR_EventSubject)connection,
TR_CEP_EVENT_READ_BLOCK,
NULL));
/**
* TODO we need to identify socket failures and close socket then.
*/

10
src/i_comm_manager.c

@ -30,7 +30,7 @@
#include "tr/comm_end_point.h"
#include "tr/comm_manager.h"
TR_CREATE_INTERFACE(TR_CommManager, 7);
TR_CREATE_INTERFACE(TR_CommManager, 8);
void
TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
@ -97,6 +97,14 @@ TR_commManagerDisableWrite(void * _this, TR_Event event)
return TR_EVENT_DONE;
}
TR_EventDone
TR_commManagerEnableRead(void * _this, TR_Event event)
{
TR_CALL(_this, TR_CommManager, enableRead, event);
return TR_EVENT_DONE;
}
TR_EventDone
TR_commManagerClose(void * _this, TR_Event event)
{

107
src/io_handler.c

@ -44,75 +44,98 @@ static
TR_EventDone
ioHandlerRead(void * _this, TR_Event event)
{
TR_Event revent;
TR_EventDone done = TR_EVENT_DONE;
switch (TR_cepBufferRead((TR_CommEndPoint)event->subject)) {
default:
case FALSE:
case FALSE: // EAGAIN
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_READ_BLOCK,
NULL);
break;
case -1: // error
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
break;
default:
case -2: // remote close
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL));
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL);
break;
case TRUE:
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_NEW_DATA,
NULL));
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_NEW_DATA,
NULL);
done = TR_EVENT_PENDING;
break;
}
return TR_EVENT_DONE;
TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
return done;
}
static
TR_EventDone
ioHandlerWrite(void * _this, TR_Event event)
{
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
TR_Event revent, close_event = NULL;
TR_EventDone done = TR_EVENT_DONE;
switch (TR_cepWriteBuffered((TR_CommEndPoint)event->subject)) {
case FALSE: // EAGAIN
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_PENDING_DATA,
NULL);
break;
if (TR_cepWriteBuffered(endpoint)) {
if (TR_cepHasPendingData(endpoint)) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_PENDING_DATA,
NULL));
} else {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
case -1: // FAILURE
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
break;
case -2: // remote close
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SHUT_WRITE,
NULL);
break;
default:
if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
done = TR_EVENT_PENDING;
} else {
revent = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_END_DATA,
NULL));
if (TRUE == endpoint->do_close) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
NULL);
if (TRUE == ((TR_CommEndPoint)event->subject)->do_close) {
close_event = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
NULL);
}
}
}
}
return TR_EVENT_DONE;
TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent);
if (close_event) {
TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event);
}
return done;
}
static

4
testers/testclient.sh

@ -2,9 +2,9 @@
pids=""
i=0
while [ $i -lt 400 ]
while [ $i -lt 20 ]
do
dd if=/dev/zero bs=8192 count=2500 | nc 192.168.2.13 5678 &
dd if=/dev/zero bs=8192 count=25000 | nc -u localhost 5678 &
pids="${pids} $!"
i=$((i + 1))
done

Loading…
Cancel
Save