From e3a6d0e91989180ae0b849b6eafd18c98a0566c0 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Sat, 13 Sep 2014 07:41:31 +0100 Subject: [PATCH] change the whole thing to be more edge triggered no matter wich poll method is used. In fact this works now with edge triggered epoll --- .gitignore | 2 +- include/tr/_comm_manager.h | 49 ++++++ include/tr/comm_end_point.h | 40 +++-- include/tr/comm_manager.h | 7 +- include/tr/interface/comm_end_point.h | 8 +- include/tr/interface/comm_manager.h | 20 +-- include/tr/io_handler.h | 1 + include/tr/server.h | 1 + src/Makefile.am | 16 +- src/cep_write_buffered.c | 59 +++++-- src/comm_end_point.c | 59 +++++-- ...ep_buffer_read.c => comm_end_point_read.c} | 11 +- src/comm_manager.c | 91 ++++++----- src/comm_manager_epoll.c | 104 ++++-------- src/comm_manager_poll.c | 73 +++------ src/comm_manager_shutdown.c | 15 +- ...te_data.c => comm_manager_shutdown_read.c} | 28 +++- ...d_data.c => comm_manager_shutdown_write.c} | 20 ++- src/connection.c | 90 ++++++----- src/connector.c | 9 ++ src/datagram_service.c | 61 +++----- src/i_comm_end_point.c | 8 +- src/i_comm_manager.c | 148 ++++++++---------- src/io_handler.c | 110 +++++++------ src/protocol_handler.c | 40 ++--- testers/test_handler.c | 28 ++-- testers/testclient.sh | 9 +- 27 files changed, 609 insertions(+), 498 deletions(-) create mode 100644 include/tr/_comm_manager.h rename src/{cep_buffer_read.c => comm_end_point_read.c} (70%) rename src/{cep_append_write_data.c => comm_manager_shutdown_read.c} (56%) rename src/{cep_append_read_data.c => comm_manager_shutdown_write.c} (64%) diff --git a/.gitignore b/.gitignore index 8d3543e..9c409e8 100644 --- a/.gitignore +++ b/.gitignore @@ -40,4 +40,4 @@ test-driver /assets/html/_documentation.html tags /trcomm.h* -/testers/testserver +/testers/testserver* diff --git a/include/tr/_comm_manager.h b/include/tr/_comm_manager.h new file mode 100644 index 0000000..5c5f72a --- /dev/null +++ b/include/tr/_comm_manager.h @@ -0,0 +1,49 @@ +/** + * \file + * + * \author Georg Hopp + * + * \copyright + * Copyright © 2014 Georg Hopp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __INT_TR_COMM_MANAGER_H__ +#define __INT_TR_COMM_MANAGER_H__ + +#include "tr/comm_end_point.h" +#include "tr/connect_entry_point.h" + +#define TR_ISSUE_IO_EVENT(this, type, subject) \ + TR_eventHandlerIssueEvent((TR_EventHandler)(this), TR_eventSubjectEmit( \ + (TR_EventSubject)(subject), type, NULL)) + +#define TR_ISSUE_IO_ACC_EVENT(this, subject) \ + (TR_ISSUE_IO_EVENT(this, TR_CET_EVENT_ACC_READY, subject)) +#define TR_ISSUE_IO_READ_EVENT(this, subject) \ + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_DO_READ, subject)) +#define TR_ISSUE_IO_WRITE_EVENT(this, subject) \ + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_DO_WRITE, subject)) +#define TR_ISSUE_IO_CLOSE_EVENT(this, subject) \ + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_CLOSE, subject)) +#define TR_ISSUE_IO_SHUT_READ_EVENT(this, subject) \ + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_SHUT_READ, subject)) +#define TR_ISSUE_IO_SHUT_WRITE_EVENT(this, subject) \ + (TR_ISSUE_IO_EVENT(this, TR_CEP_EVENT_SHUT_WRITE, subject)) + +#endif // __INT_TR_COMM_MANAGER_H__ + +// vim: set ts=4 sw=4: + diff --git a/include/tr/comm_end_point.h b/include/tr/comm_end_point.h index 3983810..92a6b84 100644 --- a/include/tr/comm_end_point.h +++ b/include/tr/comm_end_point.h @@ -29,34 +29,40 @@ #include "trevent.h" #include "trdata.h" +/* + * Read ahead limits. + * These values should be conficurable in the future. + */ +#define CEP_WRITE_BUFFER_THRESHOLD 128 * 1024 + TR_CLASS(TR_CommEndPoint) { TR_EXTENDS(TR_EventSubject); void * protocol; // will be type TR_Protocol as soon as it is there. TR_Socket transport; - size_t read_chunk_size; int do_close; - TR_Queue read_buffer; TR_Queue write_buffer; + size_t write_buffer_size; + size_t read_chunk_size; // bytes }; TR_INSTANCE_INIT(TR_CommEndPoint); TR_CLASSVARS_DECL(TR_CommEndPoint) { TR_CV_EXTENDS(TR_EventSubject); }; -#define TR_CEP_EVENT_READ_READY 0 -#define TR_CEP_EVENT_READ_BLOCK 1 -#define TR_CEP_EVENT_WRITE_READY 2 -#define TR_CEP_EVENT_UPGRADE 3 -#define TR_CEP_EVENT_NEW_DATA 4 -#define TR_CEP_EVENT_PENDING_DATA 5 -#define TR_CEP_EVENT_END_DATA 6 -#define TR_CEP_EVENT_NEW_MSG 7 -#define TR_CEP_EVENT_SEND_MSG 8 -#define TR_CEP_EVENT_SHUT_READ 9 -#define TR_CEP_EVENT_SHUT_WRITE 10 -#define TR_CEP_EVENT_CLOSE 11 -#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE) +#define TR_CEP_EVENT_DO_READ 0 // IoHandler +#define TR_CEP_EVENT_DO_WRITE 1 // IoHandler +#define TR_CEP_EVENT_READ_BLOCK 2 // CommManager +#define TR_CEP_EVENT_WRITE_BLOCK 3 // CommManager +#define TR_CEP_EVENT_NEW_DATA 4 // ProtocolHandler +#define TR_CEP_EVENT_NEW_MSG 5 // Application +#define TR_CEP_EVENT_MSG_READY 6 // ProtocolHandler +#define TR_CEP_EVENT_DATA_READY 7 // CommManager +#define TR_CEP_EVENT_DATA_END 8 // CommManager +#define TR_CEP_EVENT_SHUT_READ 9 // CommManager +#define TR_CEP_EVENT_SHUT_WRITE 10 // CommManager +#define TR_CEP_EVENT_CLOSE 11 // CommManager +#define TR_CEP_EVENT_MAX ((size_t)TR_CEP_EVENT_CLOSE) #define TR_cepSetClose(ep) ((ep)->do_close = 1) #define TR_cepHasProto(ep, proto) (TR_INSTANCE_OF(proto, TR_cepGetProto(ep))) @@ -67,8 +73,8 @@ TR_CLASSVARS_DECL(TR_CommEndPoint) { void TR_cepAppendReadData(TR_CommEndPoint, TR_RemoteData); void TR_cepAppendWriteData(TR_CommEndPoint, TR_RemoteData); -int TR_cepBufferRead(TR_CommEndPoint); -int TR_cepWriteBuffered(TR_CommEndPoint); +int TR_commEndPointRead(TR_CommEndPoint, TR_RemoteData *); +int TR_cepWriteBuffered(TR_CommEndPoint, size_t *); #endif // __TR_COMM_END_POINT_H__ diff --git a/include/tr/comm_manager.h b/include/tr/comm_manager.h index 6cfabcb..9acf049 100644 --- a/include/tr/comm_manager.h +++ b/include/tr/comm_manager.h @@ -26,6 +26,7 @@ #include #include "trbase.h" +#include "trdata.h" #include "trevent.h" #include "tr/comm_end_point.h" @@ -34,6 +35,9 @@ TR_CLASS(TR_CommManager) { TR_EXTENDS(TR_EventHandler); TR_CommEndPoint * endpoints; + TR_Hash accept; + TR_Hash write; + TR_Hash read; size_t n_endpoints; size_t max_handle; }; @@ -42,8 +46,7 @@ TR_CLASSVARS_DECL(TR_CommManager) { TR_CV_EXTENDS(TR_EventHandler); }; -void TR_commManagerAddEndpoint(void *, TR_CommEndPoint); -TR_EventDone TR_commManagerShutdown(void * _this, TR_Event event); +TR_EventDone TR_commManagerShutdown(TR_CommManager, TR_Event event); #endif // __TR_COMM_MANAGER_H__ diff --git a/include/tr/interface/comm_end_point.h b/include/tr/interface/comm_end_point.h index 4ce0006..6f40e0a 100644 --- a/include/tr/interface/comm_end_point.h +++ b/include/tr/interface/comm_end_point.h @@ -31,8 +31,8 @@ #include "tr/comm_end_point.h" #include "tr/proto_message.h" -typedef TR_ProtoMessage (* fptr_TR_cepNextMessage)(void *); -typedef int (* fptr_TR_cepCompose)(void *, TR_ProtoMessage); +typedef TR_ProtoMessage (* fptr_TR_cepNextMessage)(void *, TR_RemoteData *); +typedef size_t (* fptr_TR_cepCompose)(void *, TR_ProtoMessage); TR_INTERFACE(TR_CommEndPoint) { TR_IFID; @@ -40,8 +40,8 @@ TR_INTERFACE(TR_CommEndPoint) { fptr_TR_cepCompose compose; }; -TR_ProtoMessage TR_cepNextMessage(void *); -int TR_cepCompose(void *, TR_ProtoMessage); +TR_ProtoMessage TR_cepNextMessage(void *, TR_RemoteData *); +size_t TR_cepCompose(void *, TR_ProtoMessage); #endif // __TR_INTERFACE_COMM_END_POINT_H__ diff --git a/include/tr/interface/comm_manager.h b/include/tr/interface/comm_manager.h index cec7e31..112e1e4 100644 --- a/include/tr/interface/comm_manager.h +++ b/include/tr/interface/comm_manager.h @@ -32,23 +32,17 @@ typedef TR_EventDone (* fptr_TR_commManagerAddEndpoint)(void *, TR_CommEndPoint); typedef TR_EventDone (* fptr_TR_commManagerSelect)(void *, TR_Event, int); -typedef TR_EventDone (* fptr_TR_commManagerEnableWrite)(void *, TR_Event); -typedef TR_EventDone (* fptr_TR_commManagerDisableWrite)(void *, TR_Event); -typedef TR_EventDone (* fptr_TR_commManagerEnableRead)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerPollWrite)(void *, TR_Event); +typedef TR_EventDone (* fptr_TR_commManagerPollRead)(void *, TR_Event); typedef TR_EventDone (* fptr_TR_commManagerClose)(void *, TR_Event); -typedef TR_EventDone (* fptr_TR_commManagerShutdownRead)(void *, TR_Event); -typedef TR_EventDone (* fptr_TR_commManagerShutdownWrite)(void *, TR_Event); TR_INTERFACE(TR_CommManager) { TR_IFID; - fptr_TR_commManagerAddEndpoint addEndpoint; - fptr_TR_commManagerSelect select; - fptr_TR_commManagerEnableWrite enableWrite; - fptr_TR_commManagerDisableWrite disableWrite; - fptr_TR_commManagerEnableRead enableRead; - fptr_TR_commManagerClose close; - fptr_TR_commManagerShutdownWrite shutdownWrite; - fptr_TR_commManagerShutdownRead shutdownRead; + fptr_TR_commManagerAddEndpoint addEndpoint; + fptr_TR_commManagerSelect select; + fptr_TR_commManagerPollWrite pollWrite; + fptr_TR_commManagerPollRead pollRead; + fptr_TR_commManagerClose close; }; void TR_commManagerAddEndpoint(void *, TR_CommEndPoint); diff --git a/include/tr/io_handler.h b/include/tr/io_handler.h index 97aa8cc..78dba7c 100644 --- a/include/tr/io_handler.h +++ b/include/tr/io_handler.h @@ -24,6 +24,7 @@ #define __TR_IO_HANDLER_H__ #include +#include #include "trbase.h" #include "trevent.h" diff --git a/include/tr/server.h b/include/tr/server.h index 6646ea7..9a56213 100644 --- a/include/tr/server.h +++ b/include/tr/server.h @@ -33,6 +33,7 @@ #include "tr/io_handler.h" #include "tr/protocol_handler.h" #include "tr/protocol.h" +#include "tr/interface/comm_manager.h" TR_CLASS(TR_Server) { TR_CommManager comm_manager; diff --git a/src/Makefile.am b/src/Makefile.am index 84c6105..bb54599 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,21 +3,21 @@ AUTOMAKE_OPTIONS = subdir-objects AM_CFLAGS += -I../include/ -TRCOMM = cep_append_read_data.c \ - cep_append_write_data.c \ - cet_accept.c \ - cep_buffer_read.c \ +TRCOMM = cet_accept.c \ cep_write_buffered.c \ + comm_end_point_read.c \ comm_end_point.c \ - comm_manager.c \ - comm_manager_poll.c \ - comm_manager_epoll.c \ - comm_manager_shutdown.c \ conn_entry_point.c \ connection.c \ connector.c \ datagram_service.c \ datagram_entry_point.c \ + comm_manager.c \ + comm_manager_poll.c \ + comm_manager_epoll.c \ + comm_manager_shutdown.c \ + comm_manager_shutdown_read.c \ + comm_manager_shutdown_write.c \ io_handler.c \ proto_message.c \ protocol.c \ diff --git a/src/cep_write_buffered.c b/src/cep_write_buffered.c index 8ebb030..f35bb93 100644 --- a/src/cep_write_buffered.c +++ b/src/cep_write_buffered.c @@ -26,24 +26,57 @@ #include "tr/comm_end_point.h" int -TR_cepWriteBuffered(TR_CommEndPoint this) +TR_cepWriteBuffered(TR_CommEndPoint this, size_t * size) { TR_RemoteData data; int send; + *size = 0; + data = TR_cepNextWriteData(this); - send = TR_socketSend(this->transport, data); - - switch (send) { - case FALSE: // EAGAIN - TR_queuePutFirst(this->write_buffer, data); - break; - - case -1: // FAILURE - case -2: // remote close - default: - TR_delete(data); - break; + + while(data) { + send = TR_socketSend(this->transport, data); + + switch (send) { + case FALSE: // EAGAIN + case -3: // remote not ready + TR_queuePutFirst(this->write_buffer, data); + break; + + case -1: // FAILURE + case -2: // remote close + TR_delete(data); + TR_queueDestroy(this->write_buffer); + *size = this->write_buffer_size; + break; + + default: + { + TR_RemoteData new_data = NULL; + + if (send != ((TR_SizedData)data)->size) { + new_data = TR_new( + TR_RemoteData, + ((TR_SizedData)data)->data + send, + ((TR_SizedData)data)->size - send, + data->remote); + } else { + new_data = TR_cepNextWriteData(this); + } + + *size += send; + TR_delete(data); + data = new_data; + } + break; + } + + if (send <= 0) break; + } + + if (! data) { + return -4; // no more data to send } return send; diff --git a/src/comm_end_point.c b/src/comm_end_point.c index 5e02cf6..472aa8d 100644 --- a/src/comm_end_point.c +++ b/src/comm_end_point.c @@ -42,7 +42,6 @@ commEndPointCtor(void * _this, va_list * params) this->protocol = va_arg(*params, TR_Protocol); this->read_chunk_size = va_arg(*params, int); this->do_close = 0; - this->read_buffer = TR_new(TR_Queue); this->write_buffer = TR_new(TR_Queue); return 0; @@ -55,52 +54,80 @@ commEndPointDtor(void * _this) TR_CommEndPoint this = _this; TR_delete(this->transport); - TR_delete(this->read_buffer); TR_delete(this->write_buffer); } +static +unsigned long +commEndPointGetHash(void * _this) +{ + return (unsigned long)((TR_CommEndPoint)_this)->transport->handle; +} + +static +void +commEndPointHandleDouble(void * _current, void * _new) +{ + TR_CommEndPoint current = _current; + TR_CommEndPoint new = _new; + + // add will delete _new after this function is processed, so it's + // neccessary to cleanup _current and reinit it with whatever was + // in _new. + // This will only be called if _current and _new are different. + commEndPointDtor(current); + + current->transport = new->transport; + current->protocol = new->protocol; + current->read_chunk_size = new->read_chunk_size; + current->do_close = new->do_close; + current->write_buffer = new->write_buffer; +} + static void commEndPointCvInit(TR_class_ptr cls) { - TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_READY); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DO_READ); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DO_WRITE); TR_EVENT_CREATE(cls, TR_CEP_EVENT_READ_BLOCK); - TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_READY); - TR_EVENT_CREATE(cls, TR_CEP_EVENT_UPGRADE); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_WRITE_BLOCK); TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_DATA); - TR_EVENT_CREATE(cls, TR_CEP_EVENT_PENDING_DATA); - TR_EVENT_CREATE(cls, TR_CEP_EVENT_END_DATA); TR_EVENT_CREATE(cls, TR_CEP_EVENT_NEW_MSG); - TR_EVENT_CREATE(cls, TR_CEP_EVENT_SEND_MSG); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_MSG_READY); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DATA_READY); + TR_EVENT_CREATE(cls, TR_CEP_EVENT_DATA_END); TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_READ); TR_EVENT_CREATE(cls, TR_CEP_EVENT_SHUT_WRITE); TR_EVENT_CREATE(cls, TR_CEP_EVENT_CLOSE); } const char * TR_cepEventStrings[] = { - "TR_CEP_EVENT_READ_READY", + "TR_CEP_EVENT_DO_READ", + "TR_CEP_EVENT_DO_WRITE", "TR_CEP_EVENT_READ_BLOCK", - "TR_CEP_EVENT_WRITE_READY", - "TR_CEP_EVENT_UPGRADE", + "TR_CEP_EVENT_WRITE_BLOCK", "TR_CEP_EVENT_NEW_DATA", - "TR_CEP_EVENT_PENDING_DATA", - "TR_CEP_EVENT_END_DATA", "TR_CEP_EVENT_NEW_MSG", - "TR_CEP_EVENT_SEND_MSG", + "TR_CEP_EVENT_MSG_READY", + "TR_CEP_EVENT_DATA_READY", + "TR_CEP_EVENT_DATA_END", "TR_CEP_EVENT_SHUT_READ", "TR_CEP_EVENT_SHUT_WRITE", - "TR_CEP_EVENT_CLOSE" + "TR_CEP_EVENT_CLOSE", }; intptr_t comm_end_point_events[TR_CEP_EVENT_MAX + 1]; TR_INIT_IFACE(TR_Class, commEndPointCtor, commEndPointDtor, NULL); TR_INIT_IFACE(TR_CommEndPoint, NULL, NULL); +TR_INIT_IFACE(TR_Hashable, commEndPointGetHash, commEndPointHandleDouble); TR_CREATE_CLASS( TR_CommEndPoint, TR_EventSubject, commEndPointCvInit, TR_IF(TR_Class), - TR_IF(TR_CommEndPoint)) = { + TR_IF(TR_CommEndPoint), + TR_IF(TR_Hashable)) = { { TR_cepEventStrings, TR_CEP_EVENT_MAX + 1, diff --git a/src/cep_buffer_read.c b/src/comm_end_point_read.c similarity index 70% rename from src/cep_buffer_read.c rename to src/comm_end_point_read.c index c6ad1e2..a9815e3 100644 --- a/src/cep_buffer_read.c +++ b/src/comm_end_point_read.c @@ -26,15 +26,14 @@ #include "tr/comm_end_point.h" int -TR_cepBufferRead(TR_CommEndPoint this) +TR_commEndPointRead(TR_CommEndPoint this, TR_RemoteData * data_ptr) { - TR_RemoteData data = TR_socketRecv(this->transport, this->read_chunk_size); + *data_ptr = TR_socketRecv(this->transport, this->read_chunk_size); - if (! data) return -1; // ment to trigger a close - if (data == (void*)-1) return -2; // remote close... shutdown - if (data == TR_emptyRemoteData) return FALSE; + if (! *data_ptr) return -1; // ment to trigger a close + if (*data_ptr == (void*)-1) return -2; // remote close... shutdown + if (*data_ptr == TR_emptyRemoteData) return FALSE; // read blocked - TR_cepAppendReadData(this, data); return TRUE; } diff --git a/src/comm_manager.c b/src/comm_manager.c index 7291ff1..87c0a97 100644 --- a/src/comm_manager.c +++ b/src/comm_manager.c @@ -24,6 +24,7 @@ #include #include "trbase.h" +#include "trdata.h" #include "trevent.h" #include "tr/comm_end_point.h" @@ -39,6 +40,9 @@ commManagerCtor(void * _this, va_list * params) TR_PARENTCALL(TR_CommManager, _this, TR_Class, ctor, params); + this->accept = TR_new(TR_Hash); + this->write = TR_new(TR_Hash); + this->read = TR_new(TR_Hash); this->n_endpoints = sysconf(_SC_OPEN_MAX); this->endpoints = TR_calloc(sizeof(TR_CommEndPoint), this->n_endpoints); @@ -56,81 +60,98 @@ commManagerDtor(void * _this) TR_delete(this->endpoints[i]); } TR_MEM_FREE(this->endpoints); + TR_delete(this->read); + TR_delete(this->write); + TR_delete(this->accept); } static TR_EventDone -TR__commManagerAddEndpoint(void * _this, TR_Event event) +TR_commManagerEnableWrite(void * _this, TR_Event event) { - TR_commManagerAddEndpoint( - (TR_CommManager)_this, - (TR_CommEndPoint)event->subject); + TR_CommManager this = _this; + + TR_hashAdd(this->write, event->subject); + + return TR_EVENT_DONE; +} + +static +TR_EventDone +TR_commManagerDisableWrite(void * _this, TR_Event event) +{ + TR_CommManager this = _this; + + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); + + return TR_EVENT_DONE; +} + +static +TR_EventDone +TR_commManagerAddEndpointEvt(TR_CommManager this, TR_Event event) +{ + TR_commManagerAddEndpoint(this, (TR_CommEndPoint)event->subject); return TR_EVENT_DONE; } TR_EventDone TR_commManagerSelect(void *, TR_Event, int); -TR_EventDone TR_commManagerEnableWrite(void *, TR_Event); -TR_EventDone TR_commManagerDisableWrite(void *, TR_Event); -TR_EventDone TR_commManagerEnableRead(void *, TR_Event); +TR_EventDone TR_commManagerPollWrite(void *, TR_Event); +TR_EventDone TR_commManagerPollRead(void *, TR_Event); TR_EventDone TR_commManagerClose(void *, TR_Event); -TR_EventDone TR_commManagerShutdownRead(void *, TR_Event); -TR_EventDone TR_commManagerShutdownWrite(void *, TR_Event); +TR_EventDone TR_commManagerShutdownRead(TR_CommManager, TR_Event); +TR_EventDone TR_commManagerShutdownWrite(TR_CommManager, TR_Event); static void commManagerCvInit(TR_class_ptr cls) { TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_EventDispatcher, + cls, TR_EventDispatcher, TR_DISPATCHER_EVENT_DATA_WAIT, TR_commManagerSelect); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_EventDispatcher, + cls, TR_EventDispatcher, TR_DISPATCHER_EVENT_SHUTDOWN, TR_commManagerShutdown); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_Connection, + cls, TR_Connection, TR_CON_EVENT_NEW_CON, - TR__commManagerAddEndpoint); - TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, - TR_CEP_EVENT_PENDING_DATA, - TR_commManagerEnableWrite); + TR_commManagerAddEndpointEvt); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, - TR_CEP_EVENT_END_DATA, - TR_commManagerDisableWrite); + cls, TR_CommEndPoint, + TR_CEP_EVENT_WRITE_BLOCK, + TR_commManagerPollWrite); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, + cls, TR_CommEndPoint, TR_CEP_EVENT_READ_BLOCK, - TR_commManagerEnableRead); + TR_commManagerPollRead); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, + cls, TR_CommEndPoint, TR_CEP_EVENT_CLOSE, TR_commManagerClose); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, + cls, TR_CommEndPoint, TR_CEP_EVENT_SHUT_READ, TR_commManagerShutdownRead); TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, + cls, TR_CommEndPoint, TR_CEP_EVENT_SHUT_WRITE, TR_commManagerShutdownWrite); + TR_EVENT_HANDLER_SET_METHOD( + cls, TR_CommEndPoint, + TR_CEP_EVENT_DATA_READY, + TR_commManagerEnableWrite); + TR_EVENT_HANDLER_SET_METHOD( + cls, TR_CommEndPoint, + TR_CEP_EVENT_DATA_END, + TR_commManagerDisableWrite); } TR_INSTANCE(TR_Hash, commManagerEventMethods); TR_INIT_IFACE(TR_Class, commManagerCtor, commManagerDtor, NULL); -TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); +TR_INIT_IFACE(TR_CommManager, NULL, NULL, NULL, NULL, NULL); TR_CREATE_CLASS( TR_CommManager, TR_EventHandler, diff --git a/src/comm_manager_epoll.c b/src/comm_manager_epoll.c index 4d1495d..e09b2a9 100644 --- a/src/comm_manager_epoll.c +++ b/src/comm_manager_epoll.c @@ -34,8 +34,9 @@ #include "tr/comm_end_point.h" #include "tr/connection.h" #include "tr/connect_entry_point.h" +#include "tr/_comm_manager.h" -#define MAXEVENTS 256 +#define MAXEVENTS 1024 struct epoll_event events[MAXEVENTS]; @@ -72,7 +73,7 @@ TR_commManagerEpollAddEndpoint(void * _this, TR_CommEndPoint endpoint) int handle = endpoint->transport->handle; struct epoll_event event; - this->events[handle] = EPOLLIN; + this->events[handle] = EPOLLET; event.data.ptr = endpoint; event.events = this->events[handle]; @@ -84,69 +85,52 @@ void TR_commManagerEpollSelect(void * _this, TR_Event event, int timeout) { TR_CommManagerEpoll this = _this; + TR_CommManager cmgr = _this; int i, nevents; + struct epoll_event _event; nevents = epoll_wait(this->handle, events, MAXEVENTS, timeout); for (i=0; itransport->handle; if ((events[i].events & EPOLLIN) == EPOLLIN) { if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) && ((TR_TcpSocket)endpoint->transport)->listen) { - TR_eventHandlerIssueEvent((TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CET_EVENT_ACC_READY, - NULL)); + TR_hashAdd(cmgr->accept, endpoint); } else { - TR_eventHandlerIssueEvent((TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_READ_READY, - NULL)); + TR_hashAdd(cmgr->read, endpoint); + } + + this->events[handle] &= ~EPOLLIN; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); } if ((events[i].events & EPOLLOUT) == EPOLLOUT) { - TR_eventHandlerIssueEvent((TR_EventHandler)this, - TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_WRITE_READY, - NULL)); + TR_hashAdd(cmgr->write, endpoint); + this->events[handle] &= ~EPOLLOUT; + _event.data.ptr = endpoint; + _event.events = this->events[handle]; + epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); } } } static +inline void -TR_commManagerEpollEnableWrite(void * _this, TR_Event event) -{ - TR_CommManagerEpoll this = _this; - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - - if (! TR_socketFinWr(endpoint->transport)) { - int handle = endpoint->transport->handle; - struct epoll_event _event; - - this->events[handle] |= EPOLLOUT; - _event.data.ptr = endpoint; - _event.events = this->events[handle]; - - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); - } -} - -static -void -TR_commManagerEpollDisableWrite(void * _this, TR_Event event) +TR_commManagerEpollEnable(void * _this, uint32_t mask, TR_Event event) { TR_CommManagerEpoll this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; int handle = endpoint->transport->handle; struct epoll_event _event; - this->events[handle] &= ~EPOLLOUT; + this->events[handle] |= mask; _event.data.ptr = endpoint; _event.events = this->events[handle]; @@ -155,37 +139,20 @@ TR_commManagerEpollDisableWrite(void * _this, TR_Event event) static void -TR_commManagerEpollEnableRead(void * _this, TR_Event event) +TR_commManagerEpollEnableWrite(void * _this, TR_Event event) { - TR_CommManagerEpoll this = _this; - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - - if (! TR_socketFinRd(endpoint->transport)) { - int handle = endpoint->transport->handle; - struct epoll_event _event; - - this->events[handle] |= EPOLLIN; - _event.data.ptr = endpoint; - _event.events = this->events[handle]; - - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) { + TR_commManagerEpollEnable(_this, EPOLLOUT, event); } } static void -TR_commManagerEpollDisableRead(void * _this, TR_Event event) +TR_commManagerEpollEnableRead(void * _this, TR_Event event) { - TR_CommManagerEpoll this = _this; - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - int handle = endpoint->transport->handle; - struct epoll_event _event; - - this->events[handle] &= ~EPOLLIN; - _event.data.ptr = endpoint; - _event.events = this->events[handle]; - - epoll_ctl(this->handle, EPOLL_CTL_MOD, handle, &_event); + if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { + TR_commManagerEpollEnable(_this, EPOLLIN, event); + } } static @@ -207,14 +174,11 @@ TR_commManagerEpollCvInit(TR_class_ptr cls) { TR_INIT_IFACE(TR_Class, commManagerEpollCtor, commManagerEpollDtor, NULL); TR_INIT_IFACE( TR_CommManager, - TR_commManagerEpollAddEndpoint, // TR_CON_EVENT_NEW_CON - TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT - TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK - TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_END_DATA - TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK - TR_commManagerEpollClose, // TR_CEP_EVENT_CLOSE - TR_commManagerEpollDisableWrite, // TR_CEP_EVENT_SHUT_READ - TR_commManagerEpollEnableRead); // TR_CEP_EVENT_SHUT_WRITE + TR_commManagerEpollAddEndpoint, + TR_commManagerEpollSelect, // TR_DISPATCHER_EVENT_DATA_WAIT + TR_commManagerEpollEnableWrite, // TR_CEP_EVENT_PENDING_DATA => WRITE_BLOCK + TR_commManagerEpollEnableRead, // TR_CEP_EVENT_READ_BLOCK + TR_commManagerEpollClose); // TR_CEP_EVENT_CLOSE TR_CREATE_CLASS( TR_CommManagerEpoll, TR_CommManager, diff --git a/src/comm_manager_poll.c b/src/comm_manager_poll.c index 8f092b2..b92bc96 100644 --- a/src/comm_manager_poll.c +++ b/src/comm_manager_poll.c @@ -32,6 +32,7 @@ #include "tr/comm_end_point.h" #include "tr/connection.h" #include "tr/connect_entry_point.h" +#include "tr/_comm_manager.h" static int @@ -69,9 +70,9 @@ TR_commManagerPollAddEndpoint(void * _this, TR_CommEndPoint endpoint) TR_CommManagerPoll this = _this; this->fds[endpoint->transport->handle].fd = endpoint->transport->handle; - this->fds[endpoint->transport->handle].events = POLLIN; + this->fds[endpoint->transport->handle].events = 0; } - + static void TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) @@ -85,39 +86,28 @@ TR_commManagerPollSelect(void * _this, TR_Event event, int timeout) if (nevents) { for (i = 0; i < cmgr->max_handle+1; i++) { - TR_CommEndPoint endpoint = cmgr->endpoints[i]; - - if ((this->fds[i].revents & POLLIN) == POLLIN) { - TR_Event event; + if (this->fds[i].revents != 0) { + TR_CommEndPoint endpoint = cmgr->endpoints[i]; nevents--; - if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) - && ((TR_TcpSocket)endpoint->transport)->listen) { - event = TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CET_EVENT_ACC_READY, - NULL); - } else { - event = TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_READ_READY, - NULL); - } - TR_eventHandlerIssueEvent((TR_EventHandler)this, event); - } + if ((this->fds[i].revents & POLLIN) == POLLIN) { + if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) + && ((TR_TcpSocket)endpoint->transport)->listen) { + TR_hashAdd(cmgr->accept, endpoint); + } else { + TR_hashAdd(cmgr->read, endpoint); + } + this->fds[endpoint->transport->handle].events &= ~POLLIN; + } - if ((this->fds[i].revents & POLLOUT) == POLLOUT) { - nevents--; - TR_Event _event = TR_eventSubjectEmit( - (TR_EventSubject)endpoint, - TR_CEP_EVENT_WRITE_READY, - NULL); + if ((this->fds[i].revents & POLLOUT) == POLLOUT) { + TR_hashAdd(cmgr->write, endpoint); + this->fds[endpoint->transport->handle].events &= ~POLLOUT; + } - TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); + if (nevents <= 0) break; } - - if (nevents <= 0) break; } } } @@ -134,16 +124,6 @@ TR_commManagerPollEnableWrite(void * _this, TR_Event event) } } -static -void -TR_commManagerPollDisableWrite(void * _this, TR_Event event) -{ - TR_CommManagerPoll this = _this; - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - - this->fds[endpoint->transport->handle].events &= ~POLLOUT; -} - static void TR_commManagerPollEnableRead(void * _this, TR_Event event) @@ -156,16 +136,6 @@ TR_commManagerPollEnableRead(void * _this, TR_Event event) } } -static -void -TR_commManagerPollDisableRead(void * _this, TR_Event event) -{ - TR_CommManagerPoll this = _this; - TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - - this->fds[endpoint->transport->handle].events &= ~POLLIN; -} - static void TR_commManagerPollClose(void * _this, TR_Event event) @@ -189,11 +159,8 @@ TR_INIT_IFACE( TR_commManagerPollAddEndpoint, TR_commManagerPollSelect, TR_commManagerPollEnableWrite, - TR_commManagerPollDisableWrite, TR_commManagerPollEnableRead, - TR_commManagerPollClose, - TR_commManagerPollDisableWrite, - TR_commManagerPollDisableRead); + TR_commManagerPollClose); TR_CREATE_CLASS( TR_CommManagerPoll, TR_CommManager, diff --git a/src/comm_manager_shutdown.c b/src/comm_manager_shutdown.c index 7bd7b25..7126312 100644 --- a/src/comm_manager_shutdown.c +++ b/src/comm_manager_shutdown.c @@ -27,22 +27,17 @@ #include "trevent.h" #include "tr/comm_manager.h" +#include "tr/_comm_manager.h" TR_EventDone -TR_commManagerShutdown(void * _this, TR_Event event) +TR_commManagerShutdown(TR_CommManager this, TR_Event event) { - TR_CommManager this = _this; - nfds_t i; + nfds_t i; for (i=0; i<=this->max_handle; i++) { if (this->endpoints[i]) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - (TR_EventSubject)this->endpoints[i], - TR_CEP_EVENT_CLOSE, - NULL)); - } + TR_ISSUE_IO_CLOSE_EVENT(this, this->endpoints[i]); + } } return TR_EVENT_DONE; diff --git a/src/cep_append_write_data.c b/src/comm_manager_shutdown_read.c similarity index 56% rename from src/cep_append_write_data.c rename to src/comm_manager_shutdown_read.c index c4d5964..29eab94 100644 --- a/src/cep_append_write_data.c +++ b/src/comm_manager_shutdown_read.c @@ -21,14 +21,34 @@ */ #include "trbase.h" -#include "trio.h" +#include "trdata.h" +#include "trevent.h" #include "tr/comm_end_point.h" +#include "tr/comm_manager.h" +#include "tr/_comm_manager.h" -void -TR_cepAppendWriteData(TR_CommEndPoint this, TR_RemoteData data) +TR_EventDone +TR_commManagerShutdownRead(TR_CommManager this, TR_Event event) { - TR_queuePut(this->write_buffer, data); + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + if (! TR_socketFinRd(endpoint->transport)) { + TR_socketShutdownRead(endpoint->transport); + } + + if (TR_socketFinRdWr(endpoint->transport)) { + // close + TR_ISSUE_IO_CLOSE_EVENT(this, event->subject); + } + + if (! TR_cepHasPendingData(endpoint)) { + TR_ISSUE_IO_SHUT_WRITE_EVENT(this, event->subject); + } + + TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject)); + + return TR_EVENT_DONE; } // vim: set ts=4 sw=4: diff --git a/src/cep_append_read_data.c b/src/comm_manager_shutdown_write.c similarity index 64% rename from src/cep_append_read_data.c rename to src/comm_manager_shutdown_write.c index 9885d42..6fcfc58 100644 --- a/src/cep_append_read_data.c +++ b/src/comm_manager_shutdown_write.c @@ -21,14 +21,26 @@ */ #include "trbase.h" -#include "trio.h" +#include "trevent.h" #include "tr/comm_end_point.h" +#include "tr/comm_manager.h" +#include "tr/_comm_manager.h" -void -TR_cepAppendReadData(TR_CommEndPoint this, TR_RemoteData data) +TR_EventDone +TR_commManagerShutdownWrite(TR_CommManager this, TR_Event event) { - TR_queuePut(this->read_buffer, data); + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + + if (! TR_socketFinWr(endpoint->transport)) { + TR_socketShutdownWrite(endpoint->transport); + } + + TR_ISSUE_IO_CLOSE_EVENT(this, event->subject); + + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); + + return TR_EVENT_DONE; } // vim: set ts=4 sw=4: diff --git a/src/connection.c b/src/connection.c index 9f17f93..f3b3b16 100644 --- a/src/connection.c +++ b/src/connection.c @@ -57,62 +57,74 @@ connectionDtor(void * _this) static TR_ProtoMessage -connectionNextMessage(void * _this) +connectionNextMessage(void * _this, TR_RemoteData * data) { TR_Connection this = _this; TR_CommEndPoint comm = _this; - TR_RemoteData data = TR_queueGet(comm->read_buffer); TR_ProtoMessage ret_message = NULL; + TR_RemoteData new_data = NULL; size_t end; - if (NULL == data) return ret_message; - - if (! this->current_message || this->current_message->ready) - { - this->current_message = - TR_protoCreateMessage(comm->protocol, data->remote); - } + if (*data) { + if (! this->current_message || this->current_message->ready) + { + this->current_message = + TR_protoCreateMessage(comm->protocol, (*data)->remote); + } - end = TR_protoParse(comm->protocol, this->current_message, data); + end = TR_protoParse(comm->protocol, this->current_message, *data); - if (end != ((TR_SizedData)data)->size) { /** - * TODO - * This means that the parser has not consumed all of the data. - * We do not know the reason, but with HTTP this should only occur - * when the message is complete... anyway, to prevent us from - * looping forever because a protocol implementation is buggy - * we should close the connection after end was 0 the second time. - * This can be done by firing a close event. + * We define that the only valid reason for a protocol parser to not + * consume all data is, that the current message is complete. + * When a parser returns a not completely consumed data then we first + * check if the current message is ready. If it is we create a + * new data object from the remaining data and return it to the caller + * along with the message. The caller (which is the protocol handler) + * can then call this again with the remaining data. + * If the message is not ready we drop the data silently because there + * is either wrong data or a bug in the parser and the data will never + * be consumed correctly. + * INFO: Usually we do not free data here at all. We leave this to the + * protocol implementation. The protocol might take the data without + * copying it at all or if it copies it is responsible for the free too. + * Only if we got a wrong behaviour of the protocol we free the data + * ourself. + * IMPORTANT: The protocol should never free the data when it does not + * consume it completely. + * IMPORTANT: To keep this maintainable we must write a log message here + * when we drop data. This message should be a WARNING as the protocol + * might want to drop data intentionally... (probably this is not + * true and we should make it an ERROR). */ - switch(end) { - default: - { - TR_RemoteData new_data = TR_new( - TR_RemoteData, - ((TR_SizedData)data)->data + end, - ((TR_SizedData)data)->size - end, - data->remote); - TR_delete(data); - data = new_data; - } - // intended drop through - - case 0: - TR_queuePutFirst(comm->read_buffer, data); + if (this->current_message->ready) { + if (end != ((TR_SizedData)*data)->size) { + new_data = TR_new( + TR_RemoteData, + ((TR_SizedData)*data)->data + end, + ((TR_SizedData)*data)->size - end, + (*data)->remote); + } + ret_message = this->current_message; + this->current_message = NULL; + } else { + if (end != ((TR_SizedData)*data)->size) { + TR_delete(*data); + TR_loggerLog( + TR_logger, + TR_LOGGER_WARNING, + "Drop data not consumed by protocol."); + } } - } - if (this->current_message->ready) { - ret_message = this->current_message; - this->current_message = NULL; + *data = new_data; } return ret_message; } static -int +size_t connectionCompose(void * _this, TR_ProtoMessage message) { TR_RemoteData data = @@ -123,7 +135,7 @@ connectionCompose(void * _this, TR_ProtoMessage message) } TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); - return TRUE; + return ((TR_SizedData)data)->size; } static diff --git a/src/connector.c b/src/connector.c index 7652811..3ddfdf9 100644 --- a/src/connector.c +++ b/src/connector.c @@ -76,6 +76,15 @@ connectorAccept(void * _this, TR_Event event) socket = TR_socketAccept((TR_TcpSocket)connection->transport); } + if (! socket) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)this, + TR_eventSubjectEmit( + (TR_EventSubject)connection, + TR_CEP_EVENT_READ_BLOCK, + NULL)); + } + return TR_EVENT_DONE; } diff --git a/src/datagram_service.c b/src/datagram_service.c index c1115a1..54de58e 100644 --- a/src/datagram_service.c +++ b/src/datagram_service.c @@ -51,48 +51,31 @@ datagramServiceDtor(void * _this) static TR_ProtoMessage -datagramServiceNextMessage(void * _this) +datagramServiceNextMessage(void * _this, TR_RemoteData * data) { - TR_CommEndPoint comm = _this; - TR_RemoteData data = TR_queueGet(comm->read_buffer); - TR_ProtoMessage ret_message = NULL; - size_t end; - - if (NULL == data) return ret_message; - - ret_message = TR_protoCreateMessage(comm->protocol, data->remote); - end = TR_protoParse(comm->protocol, ret_message, data); - - if (end != ((TR_SizedData)data)->size) { - /** - * TODO - * This means that the parser has not consumed all of the data. - * We do not know the reason, but with HTTP this should only occur - * when the message is complete... anyway, to prevent us from - * looping forever because a protocol implementation is buggy - * we should close the connection after end was 0 the second time. - * This can be done by firing a close event. + TR_CommEndPoint comm = _this; + TR_ProtoMessage ret_message = NULL; + + if (*data) { + ret_message = TR_protoCreateMessage(comm->protocol, (*data)->remote); + TR_protoParse(comm->protocol, ret_message, *data); + + /* + * In UDP I don't care about remaining data. UDP is an all or nothing + * approach. If the parser is unable to create a message from the data I + * drop the data. + * Here the protocol must not free the data if it does not create a + * complete message from it. If a message was created the original data + * might or might not traval with the created message. In any case the + * protocol is then responsible for freeing the data. */ - switch(end) { - default: - { - TR_RemoteData new_data = TR_new( - TR_RemoteData, - ((TR_SizedData)data)->data + end, - ((TR_SizedData)data)->size - end, - data->remote); - TR_delete(data); - data = new_data; - } - // intended drop through - - case 0: - TR_queuePutFirst(comm->read_buffer, data); + + if (! ret_message->ready) { + TR_delete(*data); + TR_delete(ret_message); } - } - if (! ret_message->ready) { - TR_delete(ret_message); + *data = NULL; } return ret_message; @@ -110,7 +93,7 @@ datagramServiceCompose(void * _this, TR_ProtoMessage message) } TR_queuePut(((TR_CommEndPoint)_this)->write_buffer, data); - return TRUE; + return ((TR_SizedData)data)->size; } intptr_t datagramService_events[TR_CEP_EVENT_MAX + 1]; diff --git a/src/i_comm_end_point.c b/src/i_comm_end_point.c index 8a18571..eb09882 100644 --- a/src/i_comm_end_point.c +++ b/src/i_comm_end_point.c @@ -31,17 +31,17 @@ TR_CREATE_INTERFACE(TR_CommEndPoint, 2); TR_ProtoMessage -TR_cepNextMessage(void * _this) +TR_cepNextMessage(void * _this, TR_RemoteData * data) { TR_ProtoMessage callret; - TR_RETCALL(_this, TR_CommEndPoint, nextMessage, callret); + TR_RETCALL(_this, TR_CommEndPoint, nextMessage, callret, data); return callret; } -int +size_t TR_cepCompose(void * _this, TR_ProtoMessage message) { - int callret; + size_t callret; TR_RETCALL(_this, TR_CommEndPoint, compose, callret, message); return callret; } diff --git a/src/i_comm_manager.c b/src/i_comm_manager.c index c9ae933..70b18e4 100644 --- a/src/i_comm_manager.c +++ b/src/i_comm_manager.c @@ -29,8 +29,9 @@ #include "tr/interface/comm_manager.h" #include "tr/comm_end_point.h" #include "tr/comm_manager.h" +#include "tr/_comm_manager.h" -TR_CREATE_INTERFACE(TR_CommManager, 8); +TR_CREATE_INTERFACE(TR_CommManager, 5); void TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint) @@ -48,17 +49,53 @@ TR_commManagerAddEndpoint(void * _this, TR_CommEndPoint endpoint) : this->max_handle; this->endpoints[endpoint->transport->handle] = endpoint; + + if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) + && ((TR_TcpSocket)endpoint->transport)->listen) { + TR_hashAdd(this->accept, endpoint); + TR_ISSUE_IO_ACC_EVENT(this, endpoint); + } else { + TR_hashAdd(this->read, endpoint); + TR_ISSUE_IO_READ_EVENT(this, endpoint); + } + TR_CALL(_this, TR_CommManager, addEndpoint, endpoint); } +static +void +commManagerIssueAcceptEvents(const void * endpoint, const void * comm_manager) +{ + TR_ISSUE_IO_ACC_EVENT(comm_manager, endpoint); +} + +static +void +commManagerIssueWriteEvents(const void * endpoint, const void * comm_manager) +{ + TR_ISSUE_IO_WRITE_EVENT(comm_manager, endpoint); +} + +static +void +commManagerIssueReadEvents(const void * endpoint, const void * comm_manager) +{ + TR_ISSUE_IO_READ_EVENT(comm_manager, endpoint); +} + TR_EventDone TR_commManagerSelect(void * _this, TR_Event event) { + TR_CommManager this = _this; int timeout; // milliseconds int * timeoutptr = event->data; TR_EventDispatcher dispatcher = (TR_EventDispatcher)event->subject; - if (NULL == timeoutptr) { + if (! (TR_hashEmpty(this->read) + && TR_hashEmpty(this->write) + && TR_hashEmpty(this->accept))) { + timeout = 0; + } else if (NULL == timeoutptr) { timeout = TR_eventDispatcherGetDataWaitTime(dispatcher); } else { timeout = *timeoutptr; @@ -66,41 +103,42 @@ TR_commManagerSelect(void * _this, TR_Event event) TR_CALL(_this, TR_CommManager, select, event, timeout); + TR_hashEach(this->write, this, commManagerIssueWriteEvents); + TR_hashEach(this->accept, this, commManagerIssueAcceptEvents); + TR_hashEach(this->read, this, commManagerIssueReadEvents); + return TR_EVENT_DONE; } TR_EventDone -TR_commManagerEnableWrite(void * _this, TR_Event event) +TR_commManagerPollWrite(void * _this, TR_Event event) { - TR_CALL(_this, TR_CommManager, enableWrite, event); + TR_CommManager this = _this; + + TR_hashDeleteByVal(this->write, TR_hashableGetHash(event->subject)); + if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) { + TR_CALL(_this, TR_CommManager, pollWrite, event); + } return TR_EVENT_DONE; } TR_EventDone -TR_commManagerDisableWrite(void * _this, TR_Event event) +TR_commManagerPollRead(void * _this, TR_Event event) { - TR_EventHandler this = _this; + TR_CommManager this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - TR_CALL(_this, TR_CommManager, disableWrite, event); - - if (TR_socketFinRd(endpoint->transport)) { - TR_eventHandlerIssueEvent( - this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_SHUT_READ, - NULL)); + if (TR_INSTANCE_OF(TR_TcpSocket, endpoint->transport) + && ((TR_TcpSocket)endpoint->transport)->listen) { + TR_hashDeleteByVal(this->accept, TR_hashableGetHash(event->subject)); + } else { + TR_hashDeleteByVal(this->read, TR_hashableGetHash(event->subject)); } - return TR_EVENT_DONE; -} - -TR_EventDone -TR_commManagerEnableRead(void * _this, TR_Event event) -{ - TR_CALL(_this, TR_CommManager, enableRead, event); + if (! TR_socketFinRd(endpoint->transport)) { + TR_CALL(_this, TR_CommManager, pollRead, event); + } return TR_EVENT_DONE; } @@ -110,74 +148,26 @@ TR_commManagerClose(void * _this, TR_Event event) { TR_CommManager this = _this; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - - TR_CALL(_this, TR_CommManager, close, event); + int handle = endpoint->transport->handle; if (! TR_socketFinRdWr(endpoint->transport)) { TR_socketShutdown(endpoint->transport); } - if (endpoint->transport->handle == this->max_handle) { + if (handle == this->max_handle) { while (! this->endpoints[--this->max_handle]); } - if (this->endpoints[endpoint->transport->handle]) { - TR_eventSubjectFinalize( - (TR_EventSubject)this->endpoints[endpoint->transport->handle]); - this->endpoints[endpoint->transport->handle] = NULL; - } - - return TR_EVENT_DONE; -} - -TR_EventDone -TR_commManagerShutdownRead(void * _this, TR_Event event) -{ - TR_CALL(_this, TR_CommManager, shutdownRead, event); - - if (! TR_socketFinRd(((TR_CommEndPoint)event->subject)->transport)) { - TR_socketShutdownRead(((TR_CommEndPoint)event->subject)->transport); + if (this->endpoints[handle]) { + TR_eventSubjectFinalize((TR_EventSubject)this->endpoints[handle]); + this->endpoints[handle] = NULL; + TR_hashDeleteByVal(this->write, TR_hashableGetHash(endpoint)); + TR_hashDeleteByVal(this->read, TR_hashableGetHash(endpoint)); } - if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) { - // close - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_CLOSE, - NULL)); - } - - if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_SHUT_WRITE, - NULL)); - } - - return TR_EVENT_DONE; -} - -TR_EventDone -TR_commManagerShutdownWrite(void * _this, TR_Event event) -{ - TR_CALL(_this, TR_CommManager, shutdownWrite, event); - - if (! TR_socketFinWr(((TR_CommEndPoint)event->subject)->transport)) { - TR_socketShutdownWrite(((TR_CommEndPoint)event->subject)->transport); - } + TR_CALL(_this, TR_CommManager, close, event); - if (TR_socketFinRdWr(((TR_CommEndPoint)event->subject)->transport)) { - TR_eventHandlerIssueEvent( - (TR_EventHandler)_this, - TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_CLOSE, - NULL)); - } + this->endpoints[handle] = NULL; return TR_EVENT_DONE; } diff --git a/src/io_handler.c b/src/io_handler.c index e56f9f2..a6dd819 100644 --- a/src/io_handler.c +++ b/src/io_handler.c @@ -21,9 +21,12 @@ */ #include +#include +#include #include "trbase.h" #include "trevent.h" +#include "trio.h" #include "tr/io_handler.h" #include "tr/comm_end_point.h" @@ -44,40 +47,47 @@ static TR_EventDone ioHandlerRead(void * _this, TR_Event event) { - TR_Event revent; + TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + TR_Event revent; + TR_RemoteData data; - switch (TR_cepBufferRead((TR_CommEndPoint)event->subject)) { - case FALSE: // EAGAIN - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_READ_BLOCK, - NULL); - break; + if (endpoint->write_buffer_size < CEP_WRITE_BUFFER_THRESHOLD) { + switch (TR_commEndPointRead(endpoint, &data)) { + case FALSE: // EAGAIN + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_READ_BLOCK, + NULL); + break; - case -1: // error - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); - break; + case -1: // error + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_CLOSE, + NULL); + break; - default: - case -2: // remote close - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_SHUT_READ, - NULL); - break; + default: + case -2: // remote close + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_SHUT_READ, + NULL); + break; - case TRUE: - revent = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_NEW_DATA, - NULL); - break; - } + case -3: // read limit + return TR_EVENT_DONE; - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + case TRUE: + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_NEW_DATA, + data); + break; + } + + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + } return TR_EVENT_DONE; } @@ -86,15 +96,15 @@ static TR_EventDone ioHandlerWrite(void * _this, TR_Event event) { - TR_Event revent = NULL, - close_event = NULL; + TR_Event revent = NULL; TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; + size_t written; - switch (TR_cepWriteBuffered(endpoint)) { + switch (TR_cepWriteBuffered(endpoint, &written)) { case FALSE: // EAGAIN revent = TR_eventSubjectEmit( event->subject, - TR_CEP_EVENT_PENDING_DATA, // is WRITE_BLOCK + TR_CEP_EVENT_WRITE_BLOCK, NULL); break; @@ -112,24 +122,31 @@ ioHandlerWrite(void * _this, TR_Event event) NULL); break; + case -3: // remote end not ready + break; + + case -4: // no more data to send + revent = TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_DATA_END, + NULL); + break; + default: - if (! TR_cepHasPendingData((TR_CommEndPoint)event->subject)) { + // TODO This still looks wrong... + if (TRUE == endpoint->do_close) { revent = TR_eventSubjectEmit( event->subject, - TR_CEP_EVENT_END_DATA, + TR_CEP_EVENT_CLOSE, NULL); - - if (TRUE == ((TR_CommEndPoint)event->subject)->do_close) { - close_event = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_CLOSE, - NULL); - } } } - TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); - TR_eventHandlerIssueEvent((TR_EventHandler)_this, close_event); + endpoint->write_buffer_size -= written; + + if (revent) { + TR_eventHandlerIssueEvent((TR_EventHandler)_this, revent); + } return TR_EVENT_DONE; } @@ -141,13 +158,12 @@ ioHandlerCvInit(TR_class_ptr cls) TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, - TR_CEP_EVENT_READ_READY, + TR_CEP_EVENT_DO_READ, ioHandlerRead); - TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, - TR_CEP_EVENT_WRITE_READY, + TR_CEP_EVENT_DO_WRITE, ioHandlerWrite); } diff --git a/src/protocol_handler.c b/src/protocol_handler.c index e621dc8..f78bb60 100644 --- a/src/protocol_handler.c +++ b/src/protocol_handler.c @@ -51,9 +51,10 @@ protocolHandlerParse(void * _this, TR_Event event) * TODO No upgrade for now. Add it later on. */ TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; - TR_ProtoMessage message = TR_cepNextMessage(endpoint); + TR_RemoteData data = event->data; + TR_ProtoMessage message; - if (message) { + while ((message = TR_cepNextMessage(endpoint, &data))) { TR_eventHandlerIssueEvent( (TR_EventHandler)_this, TR_eventSubjectEmit( @@ -62,8 +63,8 @@ protocolHandlerParse(void * _this, TR_Event event) message)); if (message->close) { - // also check that we are a response. Well this is how it is done - // in the python code... + // also check that we are a response. Well this is + // how it is done in the python code... TR_cepSetClose(endpoint); } } @@ -77,6 +78,7 @@ protocolHandlerCompose(void * _this, TR_Event event) { TR_CommEndPoint endpoint = (TR_CommEndPoint)event->subject; TR_ProtoMessage message = (TR_ProtoMessage)event->data; + size_t message_size; if (message->close) { // also check that we are a response. Well this is how it is done @@ -84,13 +86,17 @@ protocolHandlerCompose(void * _this, TR_Event event) TR_cepSetClose(endpoint); } - if (TR_cepCompose(endpoint, message)) { - TR_Event _event = TR_eventSubjectEmit( - event->subject, - TR_CEP_EVENT_WRITE_READY, - NULL); + if ((message_size = TR_cepCompose(endpoint, message))) { + endpoint->write_buffer_size += message_size; - TR_eventHandlerIssueEvent((TR_EventHandler)_this, _event); + if (endpoint->write_buffer->nmsg == 1) { + TR_eventHandlerIssueEvent( + (TR_EventHandler)_this, + TR_eventSubjectEmit( + event->subject, + TR_CEP_EVENT_DATA_READY, + NULL)); + } } TR_delete(message); @@ -112,18 +118,16 @@ protocolHandlerCvInit(TR_class_ptr cls) TR_CommEndPoint, TR_CEP_EVENT_NEW_DATA, protocolHandlerParse); - TR_EVENT_HANDLER_SET_METHOD( cls, TR_CommEndPoint, - TR_CEP_EVENT_SEND_MSG, + TR_CEP_EVENT_MSG_READY, protocolHandlerCompose); - - TR_EVENT_HANDLER_SET_METHOD( - cls, - TR_CommEndPoint, - TR_CEP_EVENT_UPGRADE, - protocolHandlerUpgrade); +// TR_EVENT_HANDLER_SET_METHOD( +// cls, +// TR_CommEndPoint, +// TR_CEP_EVENT_UPGRADE, +// protocolHandlerUpgrade); } TR_INSTANCE(TR_Hash, protocolHandlerEventMethods); diff --git a/testers/test_handler.c b/testers/test_handler.c index 21d8a73..6bf319e 100644 --- a/testers/test_handler.c +++ b/testers/test_handler.c @@ -39,7 +39,7 @@ testHandlerNewMessage(TR_EventHandler this, TR_Event event) _event = TR_eventSubjectEmit( event->subject, - TR_CEP_EVENT_SEND_MSG, + TR_CEP_EVENT_MSG_READY, event->data); TR_eventHandlerIssueEvent((TR_EventHandler)this, _event); @@ -56,14 +56,14 @@ testHandlerClose(TR_EventHandler this, TR_Event event) return TR_EVENT_PENDING; } -static -TR_EventDone -testHandlerUpgrade(TR_EventHandler this, TR_Event event) -{ - printf("upgrade: %"PRIdPTR"\n", event->id); - - return TR_EVENT_PENDING; -} +//static +//TR_EventDone +//testHandlerUpgrade(TR_EventHandler this, TR_Event event) +//{ +// printf("upgrade: %"PRIdPTR"\n", event->id); +// +// return TR_EVENT_PENDING; +//} static int @@ -101,11 +101,11 @@ testHandlerCvInit(TR_class_ptr class) TR_CommEndPoint, TR_CEP_EVENT_CLOSE, testHandlerClose); - TR_EVENT_HANDLER_SET_METHOD( - class, - TR_CommEndPoint, - TR_CEP_EVENT_UPGRADE, - testHandlerUpgrade); +// TR_EVENT_HANDLER_SET_METHOD( +// class, +// TR_CommEndPoint, +// TR_CEP_EVENT_UPGRADE, +// testHandlerUpgrade); } TR_INSTANCE(TR_Hash, testHandlerEventMethods); diff --git a/testers/testclient.sh b/testers/testclient.sh index 50aac64..b8433ab 100755 --- a/testers/testclient.sh +++ b/testers/testclient.sh @@ -1,10 +1,15 @@ #!/bin/sh +BS=8192 +COUNT=25000 +CONCURENT=200 +IP="192.168.2.13" pids="" i=0 -while [ $i -lt 800 ] + +while [ $i -lt ${CONCURENT} ] do - dd if=/dev/zero bs=8192 count=2500 | nc 192.168.2.13 5678 & + dd if=/dev/zero bs=${BS} count=${COUNT} | nc ${IP} 5678 >/dev/null & pids="${pids} $!" i=$((i + 1)) done