Browse Source

Revert "Revert "more thread syncs.""

This reverts commit 48a57a14f5.
1.0.0
Georg Hopp 11 years ago
parent
commit
9e0658637e
  1. 3
      include/tr/event_dispatcher.h
  2. 7
      include/tr/event_handler.h
  3. 16
      include/tr/event_subject.h
  4. 5
      include/tr/event_thread.h
  5. 8
      src/event.c
  6. 1
      src/event_dispatcher.c
  7. 40
      src/event_dispatcher_start.c
  8. 12
      src/event_handler_handle_event.c
  9. 11
      src/event_handler_issue_event.c
  10. 17
      src/event_subject.c
  11. 3
      src/event_subject_emit.c
  12. 8
      src/event_thread.c
  13. 3
      src/event_thread_start.c

3
include/tr/event_dispatcher.h

@ -52,7 +52,8 @@ TR_CLASS(TR_EventDispatcher) {
TR_Queue events; TR_Queue events;
pthread_mutex_t events_lock; pthread_mutex_t events_lock;
pthread_cond_t events_cond; pthread_cond_t events_cond;
pthread_t events_wait;
pthread_t events_poll;
size_t events_handling;
TR_Hash handler; TR_Hash handler;
TR_EventHandler default_handler; TR_EventHandler default_handler;

7
include/tr/event_handler.h

@ -24,6 +24,7 @@
#define __TR_EVENT_HANDLER_H__ #define __TR_EVENT_HANDLER_H__
#include <sys/types.h> #include <sys/types.h>
#include <pthread.h>
#include "trbase.h" #include "trbase.h"
#include "trdata.h" #include "trdata.h"
@ -68,6 +69,12 @@ void TR__eventHandlerClassCleanup(TR_class_ptr);
sizeof(TR_EventMethod_fptr))); \ sizeof(TR_EventMethod_fptr))); \
} while(0) } while(0)
#define TR_INIT_HANDLER(cname) \
TR_INSTANCE(TR_Tree, cname##EventMethodsTree, NULL, PTHREAD_MUTEX_INITIALIZER); \
TR_INSTANCE(TR_Hash, cname##EventMethods, &(_##cname##EventMethodsTree.data), 0)
#define TR_HANDLER_CVARS(cname) &(_##cname##EventMethods.data)
#endif // __TR_EVENT_HANDLER_H__ #endif // __TR_EVENT_HANDLER_H__
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

16
include/tr/event_subject.h

@ -25,12 +25,14 @@
#include <sys/types.h> #include <sys/types.h>
#include <stdint.h> #include <stdint.h>
#include <pthread.h>
#include "trbase.h" #include "trbase.h"
TR_CLASS(TR_EventSubject) { TR_CLASS(TR_EventSubject) {
int fin;
size_t emitted;
int fin;
size_t emitted;
pthread_mutex_t lock;
}; };
TR_INSTANCE_INIT(TR_EventSubject); TR_INSTANCE_INIT(TR_EventSubject);
TR_CLASSVARS_DECL(TR_EventSubject) { TR_CLASSVARS_DECL(TR_EventSubject) {
@ -57,6 +59,16 @@ TR_CLASSVARS_DECL(TR_EventSubject) {
intptr_t TR__eventSubjectId(TR_class_ptr, size_t); intptr_t TR__eventSubjectId(TR_class_ptr, size_t);
TR_Event TR_eventSubjectEmit(TR_EventSubject, int, void *); TR_Event TR_eventSubjectEmit(TR_EventSubject, int, void *);
static
inline
void
TR_eventSubjectAbsorb(TR_EventSubject this)
{
pthread_mutex_lock(&this->lock);
this->emitted = this->emitted == 0 ? 0 : this->emitted - 1;
pthread_mutex_unlock(&this->lock);
}
#define TR_eventSubjectFinalize(es) ((es)->fin = TRUE) #define TR_eventSubjectFinalize(es) ((es)->fin = TRUE)
#endif // __TR_EVENT_SUBJECT_H__ #endif // __TR_EVENT_SUBJECT_H__

5
include/tr/event_thread.h

@ -29,8 +29,9 @@
#include "event_dispatcher.h" #include "event_dispatcher.h"
TR_CLASS(TR_EventThread) { TR_CLASS(TR_EventThread) {
TR_EventDispatcher dispatcher;
pthread_t handle;
TR_EventDispatcher dispatcher;
pthread_t handle;
char * name;
}; };
TR_INSTANCE_INIT(TR_EventThread); TR_INSTANCE_INIT(TR_EventThread);
TR_CLASSVARS_DECL(TR_EventThread) {}; TR_CLASSVARS_DECL(TR_EventThread) {};

8
src/event.c

@ -41,13 +41,13 @@ eventCtor(void * _this, va_list * params)
return 0; return 0;
} }
static void eventDtor(void * _this)
static
void
eventDtor(void * _this)
{ {
TR_Event this = _this; TR_Event this = _this;
this->subject->emitted =
this->subject->emitted == 0
? 0 : this->subject->emitted - 1;
TR_eventSubjectAbsorb(this->subject);
if (0 == this->subject->emitted && this->subject->fin) { if (0 == this->subject->emitted && this->subject->fin) {
TR_delete(this->subject); TR_delete(this->subject);

1
src/event_dispatcher.c

@ -57,6 +57,7 @@ init_signals(void)
signal(SIGABRT, terminate); signal(SIGABRT, terminate);
signal(SIGALRM, SIG_IGN); signal(SIGALRM, SIG_IGN);
signal(SIGURG, SIG_IGN); signal(SIGURG, SIG_IGN);
signal(SIGUSR1, SIG_IGN);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
} }

40
src/event_dispatcher_start.c

@ -20,6 +20,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _GNU_SOURCE
#include "trbase.h" #include "trbase.h"
#include "trdata.h" #include "trdata.h"
#include "trhash.h" #include "trhash.h"
@ -49,28 +51,39 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
(TR_EventSubject)this, (TR_EventSubject)this,
TR_DISPATCHER_EVENT_HEARTBEAT, TR_DISPATCHER_EVENT_HEARTBEAT,
NULL); NULL);
} else if (TR_queueEmpty(this->events)) {
if (! this->events_wait) {
} else {
event = TR_queueGet(this->events);
if (! (event || this->events_poll || this->events_handling)) {
int evtid = TR_EVD_CLIENT == this->mode int evtid = TR_EVD_CLIENT == this->mode
? TR_DISPATCHER_EVENT_USER_WAIT ? TR_DISPATCHER_EVENT_USER_WAIT
: TR_DISPATCHER_EVENT_DATA_WAIT; : TR_DISPATCHER_EVENT_DATA_WAIT;
this->events_wait = pthread_self();
this->events_poll = pthread_self();
event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, NULL); event = TR_eventSubjectEmit((TR_EventSubject)this, evtid, NULL);
} else {
pthread_cond_wait(&(this->events_cond), &(this->events_lock));
event = NULL;
} }
} else {
event = TR_queueGet(this->events);
} }
pthread_mutex_unlock(&(this->events_lock));
if (! event) { if (! event) {
char buffer[17];
pthread_getname_np(pthread_self(), buffer, 17);
TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,
"[%s] - enter cond wait",
buffer);
pthread_cond_wait(&(this->events_cond), &(this->events_lock));
TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,
"[%s] - leave cond wait",
buffer);
event = NULL;
pthread_mutex_unlock(&(this->events_lock));
continue; continue;
} else {
this->events_handling++;
} }
pthread_mutex_unlock(&(this->events_lock));
handler_queue_hv = TR_hashGetByVal( handler_queue_hv = TR_hashGetByVal(
this->handler, this->handler,
TR_sdbm( TR_sdbm(
@ -104,9 +117,12 @@ TR_eventDispatcherStart(TR_EventDispatcher this)
TR_delete(event); TR_delete(event);
} }
if (pthread_equal(this->events_wait, pthread_self())) {
this->events_wait = FALSE;
pthread_mutex_lock(&(this->events_lock));
this->events_handling--;
if (pthread_equal(this->events_poll, pthread_self())) {
this->events_poll = FALSE;
} }
pthread_mutex_unlock(&(this->events_lock));
} }
} }

12
src/event_handler_handle_event.c

@ -20,6 +20,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _GNU_SOURCE
#include <stdio.h> #include <stdio.h>
#include "trbase.h" #include "trbase.h"
@ -32,7 +34,7 @@
TR_EventDone TR_EventDone
TR_eventHandlerHandleEvent(TR_EventHandler this, TR_Event event) TR_eventHandlerHandleEvent(TR_EventHandler this, TR_Event event)
{ {
TR_EventDone retval;
char buffer[17];
TR_EventMethod_fptr event_func = NULL; TR_EventMethod_fptr event_func = NULL;
TR_HashValue handle_func_hv = TR_hashGetByVal( TR_HashValue handle_func_hv = TR_hashGetByVal(
TR_CLASSVARS(TR_EventHandler, TR_GET_CLASS(this))->event_methods, TR_CLASSVARS(TR_EventHandler, TR_GET_CLASS(this))->event_methods,
@ -44,17 +46,17 @@ TR_eventHandlerHandleEvent(TR_EventHandler this, TR_Event event)
event_func = *(TR_EventMethod_fptr *)handle_func_hv->value; event_func = *(TR_EventMethod_fptr *)handle_func_hv->value;
retval = event_func(this, event);
pthread_getname_np(pthread_self(), buffer, 17);
TR_loggerLog(TR_logger, TR_LOGGER_DEBUG, TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,
"[%ld] - HANDLE(%zd): %s event on %p with no. %d",
pthread_self(),
"[%s] - HANDLE(%zd): %s event on %p with no. %d",
buffer,
event->subject->emitted, event->subject->emitted,
TR_getEventString(event), TR_getEventString(event),
event->subject, event->subject,
event->serial); event->serial);
return retval;
return event_func(this, event);
} }
// vim: set ts=4 sw=4: // vim: set ts=4 sw=4:

11
src/event_handler_issue_event.c

@ -20,6 +20,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _GNU_SOURCE
#include <stdio.h> #include <stdio.h>
#include <pthread.h> #include <pthread.h>
@ -32,12 +34,15 @@ int
TR_eventHandlerIssueEvent(TR_EventHandler this, TR_Event event) TR_eventHandlerIssueEvent(TR_EventHandler this, TR_Event event)
{ {
if (event) { if (event) {
int i;
int i;
char buffer[17];
pthread_getname_np(pthread_self(), buffer, 17);
for (i=0; i<this->ndispatcher; i++) { for (i=0; i<this->ndispatcher; i++) {
TR_loggerLog(TR_logger, TR_LOGGER_DEBUG, TR_loggerLog(TR_logger, TR_LOGGER_DEBUG,
"[%ld] - ISSUE(%zd): %s event on %p with no. %d",
pthread_self(),
"[%s] - ISSUE(%zd): %s event on %p with no. %d",
buffer,
event->subject->emitted, event->subject->emitted,
TR_getEventString(event), TR_getEventString(event),
event->subject, event->subject,

17
src/event_subject.c

@ -27,8 +27,21 @@
#include "tr/logger.h" #include "tr/logger.h"
#include "trbase.h" #include "trbase.h"
static int eventSubjectCtor(void * _this, va_list * params) { return 0; }
static void eventSubjectDtor(void * _this) {}
static
int
eventSubjectCtor(void * _this, va_list * params)
{
pthread_mutex_init(&((TR_EventSubject)_this)->lock, NULL);
return 0;
}
static
void
eventSubjectDtor(void * _this)
{
pthread_mutex_destroy(&((TR_EventSubject)_this)->lock);
}
TR_INIT_IFACE(TR_Class, eventSubjectCtor, eventSubjectDtor, NULL); TR_INIT_IFACE(TR_Class, eventSubjectCtor, eventSubjectDtor, NULL);
TR_CREATE_CLASS(TR_EventSubject, NULL, NULL, TR_IF(TR_Class)) = { TR_CREATE_CLASS(TR_EventSubject, NULL, NULL, TR_IF(TR_Class)) = {

3
src/event_subject_emit.c

@ -22,6 +22,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdint.h> #include <stdint.h>
#include <pthread.h>
#include "tr/event.h" #include "tr/event.h"
#include "tr/event_subject.h" #include "tr/event_subject.h"
@ -37,7 +38,9 @@ TR_eventSubjectEmit(TR_EventSubject this, int idx, void * data)
if (id && ! this->fin) { if (id && ! this->fin) {
event = TR_new(TR_Event, id, this); event = TR_new(TR_Event, id, this);
TR_eventSetData(event, data); TR_eventSetData(event, data);
pthread_mutex_lock(&this->lock);
this->emitted++; this->emitted++;
pthread_mutex_unlock(&this->lock);
} }
return event; return event;

8
src/event_thread.c

@ -31,9 +31,13 @@ static
int int
eventThreadCtor(void * _this, va_list * params) eventThreadCtor(void * _this, va_list * params)
{ {
TR_EventThread this = _this;
TR_EventThread this = _this;
char * name;
this->dispatcher = va_arg(*params, TR_EventDispatcher); this->dispatcher = va_arg(*params, TR_EventDispatcher);
name = va_arg(*params, char *);
if (name) this->name = TR_strdup(name);
return 0; return 0;
} }
@ -48,6 +52,8 @@ eventThreadDtor(void * _this)
pthread_cancel(this->handle); pthread_cancel(this->handle);
pthread_join(this->handle, NULL); pthread_join(this->handle, NULL);
} }
TR_MEM_FREE(this->name);
} }
TR_INIT_IFACE(TR_Class, eventThreadCtor, eventThreadDtor, NULL); TR_INIT_IFACE(TR_Class, eventThreadCtor, eventThreadDtor, NULL);

3
src/event_thread_start.c

@ -20,6 +20,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _GNU_SOURCE
#include <pthread.h> #include <pthread.h>
#include "trbase.h" #include "trbase.h"
@ -32,6 +34,7 @@ void *
TR_eventStreadRun(void * message) TR_eventStreadRun(void * message)
{ {
TR_EventThread this = message; TR_EventThread this = message;
if (this->name) pthread_setname_np(pthread_self(), this->name);
TR_eventDispatcherStart(this->dispatcher); TR_eventDispatcherStart(this->dispatcher);
return NULL; return NULL;
} }

Loading…
Cancel
Save