Communication protocol handler for the taskrambler framework. This does not contain specific protocol implementations but the abstract code to handle them.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

184 lines
4.6 KiB

/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <unistd.h>
#include <poll.h>
#include <pthread.h>
#include "trbase.h"
#include "trdata.h"
#include "trevent.h"
#include "tr/comm_end_point.h"
#include "tr/connection.h"
#include "tr/comm_manager.h"
#include "tr/interface/comm_manager.h"
static
int
commManagerCtor(void * _this, va_list * params)
{
TR_CommManager this = _this;
TR_PARENTCALL(TR_CommManager, _this, TR_Class, ctor, params);
this->accept = TR_new(TR_Hash);
this->write = TR_new(TR_Hash);
this->read = TR_new(TR_Hash);
this->accept->cleanup_no_free = TRUE;
this->write->cleanup_no_free = TRUE;
this->read->cleanup_no_free = TRUE;
this->n_endpoints = sysconf(_SC_OPEN_MAX);
this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints);
pthread_mutex_init(&this->io_triggered_lock, NULL);
return 0;
}
static
void
commManagerDtor(void * _this)
{
TR_CommManager this = _this;
nfds_t i;
pthread_mutex_destroy(&this->io_triggered_lock);
for (i=0; i<this->n_endpoints; i++) {
TR_delete(this->endpoints[i]);
}
TR_MEM_FREE(this->endpoints);
TR_delete(this->read);
TR_delete(this->write);
TR_delete(this->accept);
}
static
TR_EventDone
TR_commManagerWriteIsBlocked(void * _this, TR_Event event)
{
TR_CommManager this = _this;
TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject));
return TR_EVENT_DONE;
}
static
TR_EventDone
TR_commManagerDecrementIoTriggerd(TR_CommManager this, TR_Event event)
{
pthread_mutex_lock(&this->io_triggered_lock);
this->io_triggered--;
pthread_mutex_unlock(&this->io_triggered_lock);
return TR_EVENT_DONE;
}
static
TR_EventDone
TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event)
{
TR_commManagerAddEndpoint(this, (TR_CommEndPoint)event->subject);
return TR_EVENT_DONE;
}
TR_EventDone TR_commManagerSelect(void *, TR_Event);
TR_EventDone TR_commManagerPollWrite(void *, TR_Event);
TR_EventDone TR_commManagerPollRead(void *, TR_Event);
TR_EventDone TR_commManagerDisableRead(void *, TR_Event);
TR_EventDone TR_commManagerDisableWrite(void *, TR_Event);
TR_EventDone TR_commManagerClose(void *, TR_Event);
TR_EventDone TR_commManagerShutdownRead(TR_CommManager, TR_Event);
TR_EventDone TR_commManagerShutdownWrite(TR_CommManager, TR_Event);
static
void
commManagerCvInit(TR_class_ptr cls)
{
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_EventDispatcher,
TR_DISPATCHER_EVENT_DATA_WAIT,
TR_commManagerSelect);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_EventDispatcher,
TR_DISPATCHER_EVENT_SHUTDOWN,
TR_commManagerShutdown);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_Connection,
TR_CON_EVENT_NEW_CON,
TR_commManagerAddEndpointEvt);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_WRITE_BLOCK,
TR_commManagerWriteIsBlocked);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_READ_BLOCK,
TR_commManagerPollRead);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_WBUF_FULL,
TR_commManagerDisableRead);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_CLOSE,
TR_commManagerClose);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_SHUT_READ,
TR_commManagerShutdownRead);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_SHUT_WRITE,
TR_commManagerShutdownWrite);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_DATA_READY,
TR_commManagerPollWrite);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_DATA_END,
TR_commManagerDisableWrite);
TR_EVENT_HANDLER_SET_METHOD(
cls, TR_CommEndPoint,
TR_CEP_EVENT_IO_DONE,
TR_commManagerDecrementIoTriggerd);
}
TR_INIT_HANDLER(TR_CommManager);
TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL);
TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
TR_CREATE_CLASS(
TR_CommManager,
TR_EventHandler,
commManagerCvInit,
TR_IF(TR_Class),
TR_IF(TR_CommManager)) = {
{ TR_HANDLER_CVARS(TR_CommManager) }
};
// vim: set ts=4 sw=4: