Browse Source

fix close handling and introduce server class

1.0.0
Georg Hopp 11 years ago
parent
commit
079be38153
  1. 1
      include/Makefile.am
  2. 63
      include/tr/server.h
  3. 1
      include/trcomm.h
  4. 2
      src/Makefile.am
  5. 4
      src/i_comm_manager.c
  6. 32
      src/io_handler.c
  7. 83
      src/server.c
  8. 35
      src/server_start.c
  9. 4
      testers/build.sh
  10. 119
      testers/test_handler.c
  11. 18
      testers/test_handler.h
  12. 149
      testers/testserver.c
  13. 54
      testers/testserver2.c

1
include/Makefile.am

@ -12,6 +12,7 @@ nobase_include_HEADERS = trcomm.h \
tr/protocol/raw.h \ tr/protocol/raw.h \
tr/protocol/message_raw.h \ tr/protocol/message_raw.h \
tr/protocol_handler.h \ tr/protocol_handler.h \
tr/server.h \
tr/interface/comm_end_point.h \ tr/interface/comm_end_point.h \
tr/interface/comm_manager.h \ tr/interface/comm_manager.h \
tr/interface/protocol.h tr/interface/protocol.h

63
include/tr/server.h

@ -0,0 +1,63 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TR_SERVER_H__
#define __TR_SERVER_H__
#include <sys/types.h>
#include "trbase.h"
#include "trevent.h"
#include "tr/comm_manager.h"
#include "tr/connector.h"
#include "tr/io_handler.h"
#include "tr/protocol_handler.h"
TR_CLASS(TR_Server) {
TR_CommManager comm_manager;
TR_EventDispatcher dispatcher;
TR_Connector connector;
TR_IoHandler io_handler;
TR_ProtocolHandler protocol_handler;
};
TR_INSTANCE_INIT(TR_Server);
TR_CLASSVARS_DECL(TR_Server) {};
#define TR_serverAddEndpoint(srv, ep) \
TR_commManagerAddEndpoint((srv)->comm_manager, (ep))
#define TR_serverAddHandler(srv, handler) \
TR_eventDispatcherRegisterHandler((srv)->dispatcher, (handler))
void TR_serverStart(TR_Server, int);
#define TR_serverClassCleanup() \
TR_eventHandlerClassCleanup(TR_ProtocolHandler); \
TR_eventHandlerClassCleanup(TR_IoHandler); \
TR_eventHandlerClassCleanup(TR_Connector); \
TR_eventHandlerClassCleanup(TR_CommManagerPoll)
#endif // __TR_SERVER_H__
// vim: set ts=4 sw=4:

1
include/trcomm.h

@ -14,6 +14,7 @@
#include "tr/protocol/raw.h" #include "tr/protocol/raw.h"
#include "tr/protocol/message_raw.h" #include "tr/protocol/message_raw.h"
#include "tr/protocol_handler.h" #include "tr/protocol_handler.h"
#include "tr/server.h"
#include "tr/interface/comm_end_point.h" #include "tr/interface/comm_end_point.h"
#include "tr/interface/comm_manager.h" #include "tr/interface/comm_manager.h"
#include "tr/interface/protocol.h" #include "tr/interface/protocol.h"

2
src/Makefile.am

@ -24,6 +24,8 @@ TRCOMM = cep_append_read_data.c \
protocol_handler.c \ protocol_handler.c \
protocol_message_raw.c \ protocol_message_raw.c \
protocol_raw.c \ protocol_raw.c \
server.c \
server_start.c \
i_comm_end_point.c \ i_comm_end_point.c \
i_comm_manager.c \ i_comm_manager.c \
i_protocol.c i_protocol.c

4
src/i_comm_manager.c

@ -112,7 +112,7 @@ TR_commManagerShutdownRead(void * _this, TR_Event event)
{ {
TR_CALL(_this, TR_CommManager, shutdownRead, event); TR_CALL(_this, TR_CommManager, shutdownRead, event);
if (TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) {
if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) {
// close // close
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,
@ -120,7 +120,7 @@ TR_commManagerShutdownRead(void * _this, TR_Event event)
event->subject, event->subject,
TR_CEP_EVENT_CLOSE, TR_CEP_EVENT_CLOSE,
NULL)); NULL));
} else if (TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
} else if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) {
// handle pending data... close is issued from disableWrite // handle pending data... close is issued from disableWrite
TR_eventHandlerIssueEvent( TR_eventHandlerIssueEvent(
(TR_EventHandler)_this, (TR_EventHandler)_this,

32
src/io_handler.c

@ -87,21 +87,29 @@ ioHandlerWrite(void * _this, TR_Event event)
TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject;
if (TR_cepWriteBuffered(endpoint)) { if (TR_cepWriteBuffered(endpoint)) {
TR_Event new_event;
if (TR_cepHasPendingData(endpoint)) { if (TR_cepHasPendingData(endpoint)) {
new_event = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_PENDING_DATA,
NULL);
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_PENDING_DATA,
NULL));
} else { } else {
new_event = TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_END_DATA,
NULL);
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_END_DATA,
NULL));
if (TRUE == endpoint->do_close) {
TR_eventHandlerIssueEvent(
(TR_EventHandler)_this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_CLOSE,
NULL));
}
} }
TR_eventHandlerIssueEvent((TR_EventHandler)_this, new_event);
} }
return TR_EVENT_DONE; return TR_EVENT_DONE;

83
src/server.c

@ -0,0 +1,83 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdarg.h>
#include <sys/types.h>
#include "trbase.h"
#include "trio.h"
#include "trevent.h"
#include "tr/server.h"
#include "tr/comm_manager.h"
#include "tr/comm_manager_poll.h"
#include "tr/connector.h"
#include "tr/io_handler.h"
#include "tr/protocol_handler.h"
static
int
serverCtor(void * _this, va_list * params)
{
TR_Server this = _this;
this->comm_manager = (TR_CommManager)TR_new(TR_CommManagerPoll);
this->dispatcher = TR_new(TR_EventDispatcher);
this->connector = TR_new(TR_Connector);
this->io_handler = TR_new(TR_IoHandler);
this->protocol_handler = TR_new(TR_ProtocolHandler);
TR_eventDispatcherRegisterHandler(
this->dispatcher,
(TR_EventHandler)this->comm_manager);
TR_eventDispatcherRegisterHandler(
this->dispatcher,
(TR_EventHandler)this->connector);
TR_eventDispatcherRegisterHandler(
this->dispatcher,
(TR_EventHandler)this->io_handler);
TR_eventDispatcherRegisterHandler(
this->dispatcher,
(TR_EventHandler)this->protocol_handler);
return 0;
}
static
void
serverDtor(void * _this)
{
TR_Server this = _this;
TR_delete(this->protocol_handler);
TR_delete(this->io_handler);
TR_delete(this->connector);
TR_delete(this->dispatcher);
TR_delete(this->comm_manager);
}
TR_INIT_IFACE(TR_Class, serverCtor, serverDtor, NULL);
TR_CREATE_CLASS(TR_Server, NULL, NULL, TR_IF(TR_Class));
// vim: set ts=4 sw=4:

35
src/server_start.c

@ -0,0 +1,35 @@
/**
* \file
*
* \author Georg Hopp
*
* \copyright
* Copyright © 2014 Georg Hopp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trbase.h"
#include "trevent.h"
#include "tr/server.h"
void
TR_serverStart(TR_Server this, int heartbeat)
{
TR_eventDispatcherSetHeartbeat(this->dispatcher, 1000);
TR_eventDispatcherStart(this->dispatcher);
}
// vim: set ts=4 sw=4:

4
testers/build.sh

@ -1,4 +1,6 @@
#!/bin/bash #!/bin/bash
TRLIBS="-ltrbase -ltrhashing -ltrio -ltrdata -ltrevent -ltrcomm" TRLIBS="-ltrbase -ltrhashing -ltrio -ltrdata -ltrevent -ltrcomm"
LIBS="-lcrypto -lssl -lrt -luuid" LIBS="-lcrypto -lssl -lrt -luuid"
gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${TRLIBS} ${LIBS} -o testserver testserver.c
gcc ${CFLAGS} -c -o test_handler.o test_handler.c
gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${TRLIBS} ${LIBS} -o testserver testserver.c test_handler.o
gcc ${CFLAGS} -I/usr/local/include -L/usr/local/lib ${TRLIBS} ${LIBS} -o testserver2 testserver2.c test_handler.o

119
testers/test_handler.c

@ -0,0 +1,119 @@
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include "trbase.h"
#include "trcomm.h"
#include "trevent.h"
#include "test_handler.h"
static
TR_EventDone
testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
{
printf("handled: %llu/s\n", ((TestHandler)this)->handled);
((TestHandler)this)->handled = 0;
return TR_EVENT_DONE;
}
static
TR_EventDone
testHandlerNewMessage(TR_EventHandler this, TR_Event event)
{
TR_ProtoMessageRaw msg = event->data;
TR_SizedData data = (TR_SizedData)msg->data;
char buf[data->size + 1];
int i;
((TestHandler)this)->handled++;
memcpy(buf, data->data, data->size);
buf[data->size] = 0;
for (i = 0; buf[i]; i++) {
if (! isprint(buf[i])) buf[i] = '.';
}
// printf("echo message: %s(%zd)\n", buf, data->size);
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SEND_MSG,
event->data));
return TR_EVENT_DONE;
}
static
TR_EventDone
testHandlerClose(TR_EventHandler this, TR_Event event)
{
puts("close");
return TR_EVENT_PENDING;
}
static
TR_EventDone
testHandlerUpgrade(TR_EventHandler this, TR_Event event)
{
printf("upgrade: %"PRIdPTR"\n", event->id);
return TR_EVENT_PENDING;
}
static
int
testHandlerCtor(void * _this, va_list * params)
{
TR_PARENTCALL(TestHandler, _this, TR_Class, ctor, params);
((TestHandler)_this)->handled = 0;
return 0;
}
static
void
testHandlerDtor(void * _this, va_list * params)
{
TR_PARENTCALL(TestHandler, _this, TR_Class, dtor);
}
static
void
testHandlerCvInit(TR_class_ptr class)
{
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_EventDispatcher,
TR_DISPATCHER_EVENT_HEARTBEAT,
testHandlerHeartbeat);
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_CommEndPoint,
TR_CEP_EVENT_NEW_MSG,
testHandlerNewMessage);
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_CommEndPoint,
TR_CEP_EVENT_CLOSE,
testHandlerClose);
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_CommEndPoint,
TR_CEP_EVENT_UPGRADE,
testHandlerUpgrade);
}
TR_INSTANCE(TR_Hash, testHandlerEventMethods);
TR_INIT_IFACE(TR_Class, testHandlerCtor, testHandlerDtor, NULL);
TR_CREATE_CLASS(
TestHandler,
TR_EventHandler,
testHandlerCvInit,
TR_IF(TR_Class)) = {
{ &(_testHandlerEventMethods.data) }
};
// vim: set ts=4 sw=4:

18
testers/test_handler.h

@ -0,0 +1,18 @@
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include "trbase.h"
#include "trcomm.h"
#include "trevent.h"
TR_CLASS(TestHandler) {
TR_EXTENDS(TR_EventHandler);
unsigned long long handled;
};
TR_INSTANCE_INIT(TestHandler);
TR_CLASSVARS_DECL(TestHandler) {
TR_CV_EXTENDS(TR_EventHandler);
};
// vim: set ts=4 sw=4:

149
testers/testserver.c

@ -6,123 +6,7 @@
#include "trcomm.h" #include "trcomm.h"
#include "trevent.h" #include "trevent.h"
TR_CLASS(TestHandler) {
TR_EXTENDS(TR_EventHandler);
unsigned long long handled;
};
TR_INSTANCE_INIT(TestHandler);
TR_CLASSVARS_DECL(TestHandler) {
TR_CV_EXTENDS(TR_EventHandler);
};
static
TR_EventDone
testHandlerHeartbeat(TR_EventHandler this, TR_Event event)
{
printf("handled: %llu/s\n", ((TestHandler)this)->handled);
((TestHandler)this)->handled = 0;
return TR_EVENT_DONE;
}
static
TR_EventDone
testHandlerNewMessage(TR_EventHandler this, TR_Event event)
{
TR_ProtoMessageRaw msg = event->data;
TR_SizedData data = (TR_SizedData)msg->data;
char buf[data->size + 1];
int i;
((TestHandler)this)->handled++;
memcpy(buf, data->data, data->size);
buf[data->size] = 0;
for (i = 0; buf[i]; i++) {
if (! isprint(buf[i])) buf[i] = '.';
}
printf("echo message: %s(%zd)\n", buf, data->size);
TR_eventHandlerIssueEvent(
(TR_EventHandler)this,
TR_eventSubjectEmit(
event->subject,
TR_CEP_EVENT_SEND_MSG,
event->data));
return TR_EVENT_DONE;
}
static
TR_EventDone
testHandlerClose(TR_EventHandler this, TR_Event event)
{
puts("close");
return TR_EVENT_PENDING;
}
static
TR_EventDone
testHandlerUpgrade(TR_EventHandler this, TR_Event event)
{
printf("upgrade: %"PRIdPTR"\n", event->id);
return TR_EVENT_PENDING;
}
static
int
testHandlerCtor(void * _this, va_list * params)
{
TR_PARENTCALL(TestHandler, _this, TR_Class, ctor, params);
((TestHandler)_this)->handled = 0;
return 0;
}
static
void
testHandlerDtor(void * _this, va_list * params)
{
TR_PARENTCALL(TestHandler, _this, TR_Class, dtor);
}
static
void
testHandlerCvInit(TR_class_ptr class)
{
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_EventDispatcher,
TR_DISPATCHER_EVENT_HEARTBEAT,
testHandlerHeartbeat);
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_CommEndPoint,
TR_CEP_EVENT_NEW_MSG,
testHandlerNewMessage);
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_CommEndPoint,
TR_CEP_EVENT_CLOSE,
testHandlerClose);
TR_EVENT_HANDLER_SET_METHOD(
class,
TR_CommEndPoint,
TR_CEP_EVENT_UPGRADE,
testHandlerUpgrade);
}
TR_INSTANCE(TR_Hash, testHandlerEventMethods);
TR_INIT_IFACE(TR_Class, testHandlerCtor, testHandlerDtor, NULL);
TR_CREATE_CLASS(
TestHandler,
TR_EventHandler,
testHandlerCvInit,
TR_IF(TR_Class)) = {
{ &(_testHandlerEventMethods.data) }
};
#include "test_handler.h"
TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_DEBUG}); TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_DEBUG});
@ -135,13 +19,10 @@ main (int argc, char * argv[])
TR_IoHandler io_handler = TR_new(TR_IoHandler); TR_IoHandler io_handler = TR_new(TR_IoHandler);
TR_ProtocolHandler protocol_handler = TR_new(TR_ProtocolHandler); TR_ProtocolHandler protocol_handler = TR_new(TR_ProtocolHandler);
TestHandler test_handler = TR_new(TestHandler); TestHandler test_handler = TR_new(TestHandler);
#if 0
TR_ConnEntryPoint ep;
TR_TcpSocket ep_sock;
#else
TR_DatagramService ep;
TR_UdpSocket ep_sock;
#endif
TR_ConnEntryPoint tcp_ep;
TR_TcpSocket tcp_ep_sock;
TR_DatagramService udp_ep;
TR_UdpSocket udp_ep_sock;
TR_Protocol protocol; TR_Protocol protocol;
TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger); TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger);
@ -157,17 +38,15 @@ main (int argc, char * argv[])
(TR_EventHandler)test_handler); (TR_EventHandler)test_handler);
protocol = TR_new(TR_ProtocolRaw); protocol = TR_new(TR_ProtocolRaw);
#if 0
ep_sock = TR_new(TR_TcpSocket, TR_logger, "0.0.0.0", 5678, 0);
ep = TR_new(TR_ConnEntryPoint, ep_sock, protocol);
#else
ep_sock = TR_new(TR_UdpSocket, TR_logger, "0.0.0.0", 5678, 0);
TR_socketBind((TR_Socket)ep_sock);
TR_socketNonblock((TR_Socket)ep_sock);
ep = TR_new(TR_DatagramService, ep_sock, protocol);
#endif
TR_commManagerAddEndpoint(cmgr, (TR_CommEndPoint)ep);
tcp_ep_sock = TR_new(TR_TcpSocket, TR_logger, "0.0.0.0", 5678, 0);
tcp_ep = TR_new(TR_ConnEntryPoint, tcp_ep_sock, protocol);
udp_ep_sock = TR_new(TR_UdpSocket, TR_logger, "0.0.0.0", 5678, 0);
TR_socketBind((TR_Socket)udp_ep_sock);
TR_socketNonblock((TR_Socket)udp_ep_sock);
udp_ep = TR_new(TR_DatagramService, udp_ep_sock, protocol);
TR_commManagerAddEndpoint(cmgr, (TR_CommEndPoint)tcp_ep);
TR_commManagerAddEndpoint(cmgr, (TR_CommEndPoint)udp_ep);
TR_eventDispatcherSetHeartbeat(dispatcher, 1000); TR_eventDispatcherSetHeartbeat(dispatcher, 1000);
TR_eventDispatcherStart(dispatcher); TR_eventDispatcherStart(dispatcher);

54
testers/testserver2.c

@ -0,0 +1,54 @@
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include "trbase.h"
#include "trcomm.h"
#include "trio.h"
#include "trevent.h"
#include "test_handler.h"
TR_INSTANCE(TR_LoggerSyslog, mylogger, {TR_LOGGER_DEBUG});
int
main (int argc, char * argv[])
{
TR_Server server = TR_new(TR_Server);
TR_Protocol protocol = TR_new(TR_ProtocolRaw);
TestHandler test_handler = TR_new(TestHandler);
TR_Socket socket;
TR_logger = TR_INSTANCE_CAST(TR_Logger, mylogger);
TR_serverAddHandler(server, (TR_EventHandler)test_handler);
socket = (TR_Socket)TR_new(TR_TcpSocket, TR_logger, "0.0.0.0", 5678, 0);
TR_serverAddEndpoint(
server,
TR_new(TR_ConnEntryPoint, socket, protocol));
socket = TR_new(TR_UdpSocket, TR_logger, "0.0.0.0", 5678, 0);
TR_socketBind((TR_Socket)socket);
TR_socketNonblock((TR_Socket)socket);
TR_serverAddEndpoint(
server,
TR_new(TR_DatagramService, socket, protocol));
TR_serverStart(server, 1000);
puts("cleanup...");
TR_delete(server);
TR_delete(test_handler);
TR_delete(protocol);
//TR_delete(ep);
TR_eventHandlerClassCleanup(TestHandler);
TR_serverClassCleanup();
TR_cleanup();
return 0;
}
// vim: set ts=4 sw=4:
Loading…
Cancel
Save