From fcbef2f039c18c51006d259445e9023cb04bbd5d Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Wed, 24 Sep 2014 20:53:54 +0100 Subject: [PATCH] first try for a threaded event dispatcher, but this is not correctly working right now. --- include/Makefile.am | 3 +- include/tr/event_dispatcher.h | 13 ++++++- include/tr/event_thread.h | 44 +++++++++++++++++++++ include/trevent.h | 1 + src/Makefile.am | 7 +++- src/event_dispatcher.c | 9 ++++- src/event_dispatcher_start.c | 26 +++++++++++-- src/event_handler_handle_event.c | 21 ++++++----- src/event_handler_issue_event.c | 9 +++-- src/event_thread.c | 56 +++++++++++++++++++++++++++ src/event_thread_join.c | 65 ++++++++++++++++++++++++++++++++ src/event_thread_start.c | 57 ++++++++++++++++++++++++++++ 12 files changed, 288 insertions(+), 23 deletions(-) create mode 100644 include/tr/event_thread.h create mode 100644 src/event_thread.c create mode 100644 src/event_thread_join.c create mode 100644 src/event_thread_start.c diff --git a/include/Makefile.am b/include/Makefile.am index af2b7ed..ac977c1 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -2,4 +2,5 @@ nobase_include_HEADERS = trevent.h \ tr/event.h \ tr/event_handler.h \ tr/event_subject.h \ - tr/event_dispatcher.h + tr/event_dispatcher.h \ + tr/event_thread.h diff --git a/include/tr/event_dispatcher.h b/include/tr/event_dispatcher.h index c198303..20f38c4 100644 --- a/include/tr/event_dispatcher.h +++ b/include/tr/event_dispatcher.h @@ -25,6 +25,7 @@ #include #include +#include #include "trbase.h" #include "trdata.h" @@ -49,6 +50,10 @@ TR_CLASS(TR_EventDispatcher) { TR_EXTENDS(TR_EventSubject); TR_Queue events; + pthread_mutex_t events_lock; + pthread_cond_t events_cond; + pthread_t events_wait; + TR_Hash handler; TR_EventHandler default_handler; int running; @@ -78,8 +83,12 @@ TR_eventDispatcherGetDataWaitTime(TR_EventDispatcher); void TR_eventDispatcherStart(TR_EventDispatcher); void TR_eventDispatcherShutdown(TR_EventDispatcher); -#define TR_eventDispatcherEnqueueEvent(disp,ev) \ - (TR_queuePut((disp)->events, (ev))) +#define TR_eventDispatcherEnqueueEvent(disp,ev) \ + pthread_mutex_lock(&((disp)->events_lock)); \ + TR_queuePut((disp)->events, (ev)); \ + pthread_cond_broadcast(&((disp)->events_cond)); \ + pthread_mutex_unlock(&((disp)->events_lock)) + #define TR_eventDispatcherStop(disp) \ (((TR_EventDispatcher)disp)->running = 0) #define TR_eventDispatcherSetHeartbeat(disp, beat) \ diff --git a/include/tr/event_thread.h b/include/tr/event_thread.h new file mode 100644 index 0000000..896fb1c --- /dev/null +++ b/include/tr/event_thread.h @@ -0,0 +1,44 @@ +/** + * \file + * + * \author Georg Hopp + * + * \copyright + * Copyright © 2014 Georg Hopp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __TR_EVENT_THREAD_H__ +#define __TR_EVENT_THREAD_H__ + +#include + +#include "trbase.h" +#include "event_dispatcher.h" + +TR_CLASS(TR_EventThread) { + TR_EventDispatcher dispatcher; + pthread_t handle; +}; +TR_INSTANCE_INIT(TR_EventThread); +TR_CLASSVARS_DECL(TR_EventThread) {}; + +void TR_eventThreadStart(TR_EventThread); +void TR_eventThreadJoin(TR_EventThread); + +#endif // __TR_EVENT_THREAD_H__ + +// vim: set ts=4 sw=4: + diff --git a/include/trevent.h b/include/trevent.h index 944f15e..ff38210 100644 --- a/include/trevent.h +++ b/include/trevent.h @@ -5,6 +5,7 @@ #include "tr/event_handler.h" #include "tr/event_subject.h" #include "tr/event_dispatcher.h" +#include "tr/event_thread.h" #endif // __TR_EVENT_H__ diff --git a/src/Makefile.am b/src/Makefile.am index 2210eb8..a5109c7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,10 +17,13 @@ TREVENT = event.c \ event_handler_class_cleanup.c \ event_subject.c \ event_subject_emit.c \ - event_subject_id.c + event_subject_id.c \ + event_thread.c \ + event_thread_start.c \ + event_thread_join.c lib_LTLIBRARIES = libtrevent.la libtrevent_la_SOURCES = $(TREVENT) -libtrevent_la_CFLAGS = $(AM_CFLAGS) +libtrevent_la_CFLAGS = $(AM_CFLAGS) -pthread libtrevent_la_LIBADD = diff --git a/src/event_dispatcher.c b/src/event_dispatcher.c index e1efd29..9c2ae00 100644 --- a/src/event_dispatcher.c +++ b/src/event_dispatcher.c @@ -24,6 +24,7 @@ #include #include #include +#include #include "trbase.h" #include "trdata.h" @@ -65,7 +66,10 @@ int eventDispatcherCtor(void * _this, va_list * params) { TR_EventDispatcher this = _this; - this->events = TR_new(TR_Queue); + this->events = TR_new(TR_Queue); + pthread_mutex_init(&(this->events_lock), NULL); + pthread_cond_init(&(this->events_cond), NULL); + this->handler = TR_new(TR_Hash); this->heartbeat = TR_new(TR_Timer, TR_TBASE_MIL, 1000); this->mode = va_arg(*params, TR_EventDispatcherMode); @@ -98,6 +102,9 @@ eventDispatcherDtor(void * _this) { TR_delete(this->heartbeat); TR_delete(this->handler); TR_delete(this->events); + + pthread_mutex_destroy(&(this->events_lock)); + pthread_cond_destroy(&(this->events_cond)); } static diff --git a/src/event_dispatcher_start.c b/src/event_dispatcher_start.c index 1f2b3b5..7e38517 100644 --- a/src/event_dispatcher_start.c +++ b/src/event_dispatcher_start.c @@ -40,6 +40,8 @@ TR_eventDispatcherStart(TR_EventDispatcher this) TR_Queue handler_queue; TR_HashValue handler_queue_hv; + pthread_mutex_lock(&(this->events_lock)); + TR_eventDispatcherGetBeatTime(this); if (this->n_beats) { @@ -48,15 +50,27 @@ TR_eventDispatcherStart(TR_EventDispatcher this) TR_DISPATCHER_EVENT_HEARTBEAT, NULL); } else if (TR_queueEmpty(this->events)) { - int evtid = TR_EVD_CLIENT == this->mode - ? TR_DISPATCHER_EVENT_USER_WAIT - : TR_DISPATCHER_EVENT_DATA_WAIT; + if (! this->events_wait) { + int evtid = TR_EVD_CLIENT == this->mode + ? TR_DISPATCHER_EVENT_USER_WAIT + : TR_DISPATCHER_EVENT_DATA_WAIT; - event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, NULL); + this->events_wait = pthread_self(); + event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, NULL); + } else { + pthread_cond_wait(&(this->events_cond), &(this->events_lock)); + event = NULL; + } } else { event = TR_queueGet(this->events); } + pthread_mutex_unlock(&(this->events_lock)); + + if (! event) { + continue; + } + handler_queue_hv = TR_hashGetByVal( this->handler, TR_sdbm( @@ -89,6 +103,10 @@ TR_eventDispatcherStart(TR_EventDispatcher this) } else { TR_delete(event); } + + if (pthread_equal(this->events_wait, pthread_self())) { + this->events_wait = FALSE; + } } } diff --git a/src/event_handler_handle_event.c b/src/event_handler_handle_event.c index fa2b4ca..23f5a30 100644 --- a/src/event_handler_handle_event.c +++ b/src/event_handler_handle_event.c @@ -32,26 +32,29 @@ TR_EventDone TR_eventHandlerHandleEvent(TR_EventHandler this, TR_Event event) { + TR_EventDone retval; TR_EventMethod_fptr event_func = NULL; TR_HashValue handle_func_hv = TR_hashGetByVal( TR_CLASSVARS(TR_EventHandler, TR_GET_CLASS(this))->event_methods, TR_sdbm((unsigned char *)&event->id, sizeof(event->id))); - TR_loggerLog(TR_logger, TR_LOGGER_DEBUG, - "%zd - HANDLE(%zd): %s event on %p with no. %d", - this->dispatcher[0]->events->nmsg, - event->subject->emitted, - TR_getEventString(event), - event->subject, - event->serial); - if (! handle_func_hv) { return 0; } event_func = *(TR_EventMethod_fptr *)handle_func_hv->value; - return event_func(this, event); + retval = event_func(this, event); + + TR_loggerLog(TR_logger, TR_LOGGER_DEBUG, + "[%ld] - HANDLE(%zd): %s event on %p with no. %d", + pthread_self(), + event->subject->emitted, + TR_getEventString(event), + event->subject, + event->serial); + + return retval; } // vim: set ts=4 sw=4: diff --git a/src/event_handler_issue_event.c b/src/event_handler_issue_event.c index 1ef66f5..e339c0b 100644 --- a/src/event_handler_issue_event.c +++ b/src/event_handler_issue_event.c @@ -21,6 +21,7 @@ */ #include +#include #include "trbase.h" @@ -34,15 +35,15 @@ TR_eventHandlerIssueEvent(TR_EventHandler this, TR_Event event) int i; for (i=0; indispatcher; i++) { - TR_eventDispatcherEnqueueEvent(this->dispatcher[i], event); - TR_loggerLog(TR_logger, TR_LOGGER_DEBUG, - "%zd - ISSUE(%zd): %s event on %p with no. %d", - this->dispatcher[i]->events->nmsg, + "[%ld] - ISSUE(%zd): %s event on %p with no. %d", + pthread_self(), event->subject->emitted, TR_getEventString(event), event->subject, event->serial); + + TR_eventDispatcherEnqueueEvent(this->dispatcher[i], event); } return TRUE; diff --git a/src/event_thread.c b/src/event_thread.c new file mode 100644 index 0000000..4b3062e --- /dev/null +++ b/src/event_thread.c @@ -0,0 +1,56 @@ +/** + * \file + * + * \author Georg Hopp + * + * \copyright + * Copyright © 2014 Georg Hopp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include + +#include "trbase.h" + +#include "tr/event_dispatcher.h" +#include "tr/event_thread.h" + +static +int +eventThreadCtor(void * _this, va_list * params) +{ + TR_EventThread this = _this; + + this->dispatcher = va_arg(*params, TR_EventDispatcher); + + return 0; +} + +static +void +eventThreadDtor(void * _this) +{ + TR_EventThread this = _this; + + if (this->handle) { + pthread_cancel(this->handle); + pthread_join(this->handle, NULL); + } +} + +TR_INIT_IFACE(TR_Class, eventThreadCtor, eventThreadDtor, NULL); +TR_CREATE_CLASS(TR_EventThread, NULL, NULL, TR_IF(TR_Class)); + +// vim: set ts=4 sw=4: diff --git a/src/event_thread_join.c b/src/event_thread_join.c new file mode 100644 index 0000000..11ab0ef --- /dev/null +++ b/src/event_thread_join.c @@ -0,0 +1,65 @@ +/** + * \file + * + * \author Georg Hopp + * + * \copyright + * Copyright © 2014 Georg Hopp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include + +#include "trbase.h" + +#include "tr/event_thread.h" + +void +TR_eventThreadJoin(TR_EventThread this) +{ + int error = pthread_join(this->handle, NULL); + + /* + * AFAIC there is no error condition from this function that + * should lead to a retry. Additionally pthread_join is + * continued after a signal, so all I do is log error and + * continue with the next. + */ + switch (error) { + case EDEADLK: + TR_loggerLog( + TR_logger, + TR_LOGGER_WARNING, + "Thread deadlock detected"); + break; + + case EINVAL: + TR_loggerLog( + TR_logger, + TR_LOGGER_WARNING, + "Tried to join a non joinable thread"); + break; + + case ESRCH: + TR_loggerLog( + TR_logger, + TR_LOGGER_WARNING, + "Tried to join non existent thread"); + break; + } +} + +// vim: set ts=4 sw=4: diff --git a/src/event_thread_start.c b/src/event_thread_start.c new file mode 100644 index 0000000..deca49f --- /dev/null +++ b/src/event_thread_start.c @@ -0,0 +1,57 @@ +/** + * \file + * + * \author Georg Hopp + * + * \copyright + * Copyright © 2014 Georg Hopp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include + +#include "trbase.h" + +#include "tr/event_dispatcher.h" +#include "tr/event_thread.h" + +static +void * +TR_eventStreadRun(void * message) +{ + TR_EventThread this = message; + TR_eventDispatcherStart(this->dispatcher); + return NULL; +} + +void +TR_eventThreadStart(TR_EventThread this) +{ + int error = pthread_create( + &this->handle, + NULL, + TR_eventStreadRun, + (void *)this); + + if (error) { + TR_loggerLog( + TR_logger, + TR_LOGGER_ERR, + "Thread creation failed with error code: %d", + error); + } +} + +// vim: set ts=4 sw=4: