You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
133 lines
4.1 KiB
133 lines
4.1 KiB
"""
|
|
Dispatch Events to registered handlers.
|
|
|
|
Author: Georg Hopp <ghopp@spamtitan.com>
|
|
"""
|
|
import sys
|
|
import time
|
|
import threading
|
|
from collections import deque
|
|
|
|
from Event import Event
|
|
from EventHandler import EventHandler
|
|
from EventSubject import EventSubject
|
|
|
|
class DefaultHandler(EventHandler):
|
|
def handleEvent(self, event):
|
|
return True
|
|
|
|
SERVER = 0x00
|
|
CLIENT = 0x01
|
|
|
|
class EventDispatcher(EventSubject):
|
|
_EVENTS = {
|
|
'heartbeat' : 0x01,
|
|
'user_wait' : 0x02,
|
|
'data_wait' : 0x03,
|
|
'shutdown' : 0x04
|
|
}
|
|
|
|
def __init__(self, mode = SERVER, default_handler = DefaultHandler()):
|
|
super(EventDispatcher, self).__init__()
|
|
|
|
self._events = deque([])
|
|
self._handler = {}
|
|
self._default_handler = default_handler
|
|
self._running = False
|
|
self._heartbeat = 0.0
|
|
self._nextbeat = 0.0
|
|
self._mode = mode
|
|
self._queue_lock = threading.Lock()
|
|
self._event_wait = threading.Condition()
|
|
self._data_wait_id = EventDispatcher.eventId('data_wait')
|
|
self._user_wait_id = EventDispatcher.eventId('user_wait')
|
|
|
|
def registerHandler(self, handler):
|
|
for eid in handler.getHandledIds():
|
|
if eid in self._handler:
|
|
self._handler[eid].append(handler)
|
|
else:
|
|
self._handler[eid] = [handler]
|
|
|
|
handler.setDispatcher(self)
|
|
|
|
def setHeartbeat(self, heartbeat):
|
|
self._heartbeat = heartbeat
|
|
if self._heartbeat:
|
|
self._nextbeat = time.time() + self._heartbeat
|
|
else:
|
|
self._nextbeat = 0.0
|
|
|
|
def getBeattime(self):
|
|
return self._nextbeat - time.time()
|
|
|
|
def getDataWaitTime(self):
|
|
if self._mode == SERVER:
|
|
return self.getBeattime()
|
|
|
|
# here comes a timeout into play.... currently I expect
|
|
# the stuff to work...
|
|
# TODO add timeout
|
|
return 0.0
|
|
|
|
def queueEvent(self, event):
|
|
self._queue_lock.acquire()
|
|
self._events.append(event)
|
|
self._queue_lock.release()
|
|
self._event_wait.acquire()
|
|
self._event_wait.notify_all()
|
|
self._event_wait.release()
|
|
|
|
def start(self, name):
|
|
self._running = True
|
|
|
|
while self._running or self._events:
|
|
now = time.time()
|
|
if self._nextbeat and self._nextbeat <= now:
|
|
self._nextbeat += self._heartbeat
|
|
self.queueEvent(self.emit('heartbeat'))
|
|
|
|
current = None
|
|
if not self._events:
|
|
if not name:
|
|
if self._mode == CLIENT:
|
|
current = self.emit('user_wait')
|
|
else:
|
|
current = self.emit('data_wait')
|
|
else:
|
|
self._event_wait.acquire()
|
|
self._event_wait.wait()
|
|
self._event_wait.release()
|
|
|
|
self._queue_lock.acquire()
|
|
if (not current) and self._events:
|
|
current = self._events.popleft()
|
|
self._queue_lock.release()
|
|
|
|
if current:
|
|
if current.type not in self._handler:
|
|
#print '[%s] handle: %s(%d) on %s: %s' % (
|
|
# name, current.name, current.sno, hex(id(current.subject)), 'default')
|
|
self._default_handler.handleEvent(current)
|
|
else:
|
|
for handler in self._handler[current.type]:
|
|
#print '[%s] handle: %s(%d) on %s: %s' % (
|
|
# name, current.name, current.sno, hex(id(current.subject)),
|
|
# handler.__class__.__name__)
|
|
if handler.handleEvent(current):
|
|
break
|
|
|
|
# if we leave the loop eventually inform all other threads
|
|
# so they can quit too.
|
|
self._event_wait.acquire()
|
|
self._event_wait.notify_all()
|
|
self._event_wait.release()
|
|
|
|
def stop(self):
|
|
self._running = False
|
|
|
|
def shutdown(self):
|
|
self.queueEvent(self.emit('shutdown'))
|
|
self.stop()
|
|
|
|
# vim: set ft=python et ts=8 sw=4 sts=4:
|