Browse Source

add communication manager. These are the things that do poll, select, waitformultipleobjects or whatever.

1.0.0
Georg Hopp 11 years ago
parent
commit
31b1b404c6
  1. 48
      include/tr/comm_manager.h
  2. 45
      include/tr/comm_manager_poll.h
  3. 56
      include/tr/interface/comm_manager.h
  4. 1
      src/comm_end_point.c
  5. 141
      src/comm_manager.c
  6. 185
      src/comm_manager_poll.c
  7. 48
      src/comm_manager_shutdown.c
  8. 148
      src/i_comm_manager.c

48
include/tr/comm_manager.h

@ -0,0 +1,48 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TR_COMM_MANAGER_H__
#define __TR_COMM_MANAGER_H__
#include <sys/types.h>
#include "trbase.h"
#include "trevent.h"
TR_CLASS(TR_CommManager) {
TR_EXTENDS(TR_EventHandler);
TR_CommEndPoint * endpoints;
nfds_t n_endpoints;
};
TR_INSTANCE_INIT(TR_CommManager);
TR_CLASSVARS_DECL(TR_CommManager) {
TR_CV_EXTENDS(TR_EventHandler);
};
void TR_commManagerAddEndpoint(void *, TR_CommEndPoint);
int TR_commManagerShutdown(void * _this, TR_Event event);
#endif // __TR_COMM_MANAGER_H__
// vim: set ts=4 sw=4:

45
include/tr/comm_manager_poll.h

@ -0,0 +1,45 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TR_COMM_MANAGER_POLL_H__
#define __TR_COMM_MANAGER_POLL_H__
#include <sys/types.h>
#include <poll.h>
#include "trbase.h"
#include "trevent.h"
TR_CLASS(TR_CommManagerPoll) {
TR_EXTENDS(TR_CommManager);
struct pollfd * fds;
};
TR_INSTANCE_INIT(TR_CommManagerPoll);
TR_CLASSVARS_DECL(TR_CommManagerPoll) {
TR_CV_EXTENDS(TR_EventHandler);
};
#endif // __TR_COMM_MANAGER_POLL_H__
// vim: set ts=4 sw=4:

56
include/tr/interface/comm_manager.h

@ -0,0 +1,56 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TR_INTERFACE_COMM_MANAGER_H__
#define __TR_INTERFACE_COMM_MANAGER_H__
#include <sys/types.h>
#include "trbase.h"
#include "trevent.h"
#include "tr/comm_end_point.h"
typedef void (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint);
typedef void (* fptr_TR_commManagerSelect)(void *, TR_Event, int);
typedef void (* fptr_TR_commManagerEnableWrite)(void *, TR_Event);
typedef void (* fptr_TR_commManagerDisableWrite)(void *, TR_Event);
typedef void (* fptr_TR_commManagerClose)(void *, TR_Event);
typedef void (* fptr_TR_commManagerShutdownRead)(void *, TR_Event);
typedef void (* fptr_TR_commManagerShutdownRead)(void *, TR_Event);
TR_INTERFACE(TR_CommManager) {
TR_IFID;
fptr_TR_commManagerAddEndpoint addEndpoint;
fptr_TR_commManagerSelect select;
fptr_TR_commManagerEnableWrite enableWrite;
fptr_TR_commManagerDisableWrite disableWrite;
fptr_TR_commManagerClose close;
fptr_TR_commManagerShutdownRead shutdownWrite;
fptr_TR_commManagerShutdownRead shutdownRead;
};
void TR_commManagerAddEndpoint(void *, TR_CommEndPoint);
#endif // __TR_INTERFACE_COMM_MANAGER_H__
// vim: set ts=4 sw=4:

1
src/comm_end_point.c

@ -54,6 +54,7 @@ commEndPointDtor(void * _this)
{
TR_CommEndPoint this = _this;
TR_delete(this->transport);
TR_delete(this->read_buffer);
TR_delete(this->write_buffer);
}

141
src/comm_manager.c

@ -0,0 +1,141 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <unistd.h>
#include "trbase.h"
#include "trevent.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/comm_manager.h"
#include "tr/interface/comm_manager.h"
static
int
commManagerCtor(void * _this, va_list * params)
{
TR_CommManager this = _this;
this->n_endpoints = sysconf(_SC_OPEN_MAX);
this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints);
return 0;
}
static
void
commManagerDtor(void * _this)
{
TR_CommManager this = _this;
nfds_t i;
for (i = 0; i < this->n_endpoints; i++) {
TR_delete(this->endpoints[i]);
}
TR_PARENTCALL(_this, TR_Class, dtor);
}
static
int
TR__commManagerAddEndpoint(void * _this, Event event)
{
TR_commManagerAddEndpoint(
(TR_CommManager)_this,
(TR_CommEndPoint)event->subject);
return 1;
}
void TR_commManagerSelect(void *, TR_Event, int);
void TR_commManagerEnableWrite(void *, TR_Event);
void TR_commManagerDisableWrite(void *, TR_Event);
void TR_commManagerClose(void *, TR_Event);
void TR_commManagerShutdownRead(void *, TR_Event);
void TR_commManagerShutdownRead(void *, TR_Event);
static
void
commManagerCvInit(TR_class_ptr cls)
{
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_EventDispatcher,
TR_DISPATCHER_EVENT_DATA_WAIT,
TR_commManagerSelect);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_EventDispatcher,
TR_DISPATCHER_EVENT_SHUTDOWN,
TR_commManagerShutdown);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_Connection,
TR_CON_EVENT_NEW_CON,
TR__commManagerAddEndpoint);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_Connection,
TR_CON_EVENT_PENDING_DATA,
TR_commManagerEnableWrite);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_Connection,
TR_CON_EVENT_END_DATA,
TR_commManagerDisableWrite);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_CommEndPoint,
TR_CEP_EVENT_CLOSE,
TR_commManagerClose);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_CommEndPoint,
TR_CEP_EVENT_SHUT_READ,
TR_commManagerShutdownRead);
TR_EVENT_HANDLER_SET_METHOD(
cls,
TR_CommEndPoint,
TR_CEP_EVENT_SHUT_READ,
TR_commManagerShutdownWrite);
}
TR_INSTANCE(TR_Hash, _event_methods);
TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
TR_CREATE_CLASS(
TR_CommManager,
TR_EventHandler,
commManagerCvInit,
TR_IF(TR_Class),
TR_IF(TR_CommManager)) = {
{ &(__event_methods.data) }
};
// vim: set ts=4 sw=4:

185
src/comm_manager_poll.c

@ -0,0 +1,185 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <unistd.h>
#include "trbase.h"
#include "trevent.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/comm_manager.h"
#include "tr/interface/comm_manager.h"
static
int
commManagerPollCtor(void * _this, va_list * params)
{
TR_CommManagerPoll this = _this;
TR_CommManager cmgr = _this;
nfds_t i;
TR_PARENTCALL(_this, TR_Class, ctor, params);
this->fds = TR_malloc(sizeof(struct pollfd) * cmgr->n_endpoints);
for (i = 0; i < cmgr->n_endpoints; i++) {
this->fds[i] = {-1, 0, 0};
}
return 0;
}
static
void
commManagerPollDtor(void * _this)
{
TR_CommManagerPoll this = _this;
TR_MEM_FREE(this->fds);
TR_PARENTCALL(_this, TR_Class, dtor);
}
static
void
TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
TR_CommManagerPoll this = _this;
this->fds[endpoint->transport->handle].fd = endpoint->transport->handle;
this->fds[endpoint->transport->handle].events = POLLIN;
}
static
void
TR_commManagerSelect(void * _this, TR_Event event, int timeout)
{
TR_CommManagerPoll this = _this;
TR_CommManager cmgr = _this;
nfds_t i;
int nevents;
nevents = poll(this->fds, cmgr->n_endpoints, timeout);
for (i = 0; i < cmgr->n_endpoints, i++) {
TR_CommEndPoint endpoint = this->endpoints[i];
if (this->fds[i].revents & POLLIN == POLLIN) {
nevents--;
if (TR_INSTANCE_OF(TR_TcpSocket, endpoints->transport)
&& ((TR_TcpSocket)endpoint->transport)->listen) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
(TR_Connection)endpoint,
TR_CON_EVENT_ACC_READY,
NULL);
} else {
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
endpoint,
TR_CET_EVENT_READ_READY,
NULL);
}
}
if (this->fds[i].revents & POLLOUT == POLLOUT) {
nevents--;
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
endpoint,
TR_CET_EVENT_WRITE_READY,
NULL);
}
if (nevents <= 0) break;
}
}
static
void
TR_commManagerEnableWrite(void * _this, TR_Event event)
{
TR_CommManagerPoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
if (! TR_socketFinWr(endpoint->transport)) {
this->fds[endpoint->transport->handle].event |= POLLOUT;
}
}
static
int
TR_commManagerDisableWrite(void * _this, TR_Event event)
{
TR_CommManagerPoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
this->fds[endpoint->transport->handle].event &= ~POLLOUT;
}
static
int
TR_commManagerClose(void * _this, TR_Event event)
{
TR_CommManagerPoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
this->fds[endpoint->transport->handle].event = 0;
this->fds[endpoint->transport->handle].fs = -1;
}
static
int
TR_commManagerDisableRead(void * _this, TR_Event event)
{
TR_CommManagerPoll this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
this->fds[endpoint->transport->handle].event &= ~POLLIN;
}
static
int
TR_commManagerShutdownRead(void * _this, TR_Event event)
{
}
static
void
commManagerPollCvInit(TR_class_ptr cls) {
TR_INHERIT_CLASSVARS(TR_CommManagerPoll, TR_CommManager);
}
TR_INIT_IFACE(TR_Class, commManagerPollCtor, commManagerPollDtor, NULL);
TR_INIT_IFACE(
TR_CommManager,
TR_commManagerPollAddEndpoint,
TR_commManagerPollSelect,
TR_commManagerPollEnableWrite,
TR_commManagerPollDisableWrite,
TR_commManagerPollClose,
TR_commManagerPollDisableRead,
TR_commManagerPollDisableWrite);
TR_CREATE_CLASS(
TR_CommManagerPoll,
TR_CommManager,
TR_CommManagerPollCvInit,
TR_IF(TR_Class));
// vim: set ts=4 sw=4:

48
src/comm_manager_shutdown.c

@ -0,0 +1,48 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trbase.h"
#include "trio.h"
#include "trevent.h"
#include "tr/comm_manager.h"
int
TR_commManagerShutdown(void * _this, TR_Event event)
{
TR_CommManager this = _this;
nfds_t i;
for (i=0; i<this->n_endpoints; i++) {
if (this->endpoints[handle]) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
(TR_EventSubject)this->endpoints[handle],
TR_CEP_EVENT_CLOSE,
NULL);
}
}
return 0;
}
// vim: set ts=4 sw=4:

148
src/i_comm_manager.c

@ -0,0 +1,148 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include "trbase.h"
#include "trevent.h"
#include "tr/interface/comm_manager.h"
#include "tr/comm_end_point.h"
TR_CREATE_INTERFACE(TR_CommManager, 7);
void
TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint)
{
TR_ConnManager this = _this;
if (this->endpoints[endpoint->transport->handle]) {
// this should never happen, but if so we assume this is a leftover
// that still has to be deleted.
TR_delete(this->endpoints[endpoint->transport->handle]);
}
this->endpoints[endpoint->transport->handle] = endpoint;
TR_CALL(_this, TR_CommManager, addEndpoint, endpoint);
}
int
TR_commManagerSelect(void * _this, TR_Event event)
{
int timeout; // milliseconds
int * timeoutptr = event->data;
TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject;
if (NULL == timeoutptr) {
timeout = TR_eventDispatcherGetDataWaitTime(dispatcher);
} else {
timeout = *timeoutptr;
TR_MEM_FREE(timeoutptr);
}
TR_CALL(_this, TR_CommManager, select, event, timeout);
return 1;
}
int
TR_commManagerEnableWrite(void * _this, TR_Event event)
{
TR_CALL(_this, TR_CommManager, enableWrite, event);
return 1;
}
int
TR_commManagerDisableWrite(void * _this, TR_Event event)
{
TR_EventHandler this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
TR_CALL(_this, TR_CommManager, disableWrite, event);
if (TR_socketFinRd(endpoint->transport)) {
TR_eventHandlerIssueEvent(
this,
event->subject,
TR_CEP_EVENT_SHUT_READ,
NULL);
}
return 1;
}
int
TR_commManagerClose(void * _this, TR_Event event)
{
TR_ConnManager this = _this;
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
TR_socketShutdown(endpoint->transport);
TR_CALL(_this, TR_CommManager, close, event);
TR_delete(this->endpoints[endpoint->transport->handle]);
return 0;
}
int
TR_commManagerShutdownRead(void * _this, TR_Event event)
{
TR_CALL(_this, TR_CommManager, shutdownRead, event);
if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
// close
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
} else if (TR_cepHasPendingData(endpoint)) {
// handle pending data... close is issued from disableWrite
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
} else {
TR_cepSetClose(endpoint);
}
return 0;
}
int
TR_commManagerShutdownWrite(void * _this, TR_Event event)
{
TR_CALL(_this, TR_CommManager, shutdownWrite, event);
if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
event->subject,
TR_CEP_EVENT_CLOSE,
NULL);
}
return 0;
}
// vim: set ts=4 sw=4:
Loading…
Cancel
Save