Separate task and scheduler.
import threading # Only used in SetupEvent
class _Callback(object):
def __init__(self, callback, errorback, threaded = False):
self.callback = callback
self.errorback = errorback
self.next = None
self.threaded = threaded
def Chain(self, next):
# Can only be called once
assert(self.next is None)
self.next = next
return next
def Next(self):
return self.next
##
# A task, representing a series of linked pairs of callback and error
# handler. The concept is similar to Twisted, but you need to put all your
# callbacks on the task before giving it to the Scheduler. For this
# reason, you shouldn't call the callback/errorback method yourself (or
# at least, don't put it back in the scheduler after that).
class Task(object):
    """An ordered chain of callback/errorback pairs.

    The chain is a singly linked list of ``_Callback`` nodes; ``head`` is the
    next node to be handed out by ``_GetNext`` and ``tail`` is the node that
    new callbacks are appended after.
    """

    @staticmethod
    def DefaultCallback(result):
        """Return ``result`` unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Return ``error`` unchanged."""
        return error

    def __init__(self, func=None, *args, **kwargs):
        """Create a task, optionally seeded with a first callback.

        Args:
            func: if truthy, a callable registered as the first callback.
            *args: positional arguments passed to ``func``.
            **kwargs: keyword arguments passed to ``func``.
        """
        super(Task, self).__init__()
        self.head = None
        self.tail = None
        if func:
            # The incoming result is deliberately ignored: ``func`` is called
            # only with the arguments captured at construction time.
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def _AddCallback(self, callback):
        """Append an already-built node to the end of the chain."""
        if self.head is None:
            self.head = self.tail = callback
        else:
            self.tail = self.tail.Chain(callback)

    def _GetNext(self):
        """Detach and return the first node, or None when none is left."""
        head = self.head
        if head:
            self.head = head.Next()
        return head

    def AddCallback(self, callback, errorback=None, threaded=False):
        """Append a callback/errorback pair to the chain.

        Args:
            callback: callable stored as the success handler.
            errorback: callable stored as the error handler; defaults to
                ``DefaultErrorback`` when None.
            threaded: flag stored on the created node.

        Returns:
            This task, to permit chained calls.
        """
        # Identity test: ``== None`` would invoke a custom ``__eq__`` on the
        # supplied object.
        if errorback is None:
            errorback = self.DefaultErrorback
        self._AddCallback(_Callback(callback, errorback, threaded))
        return self

    def AddThreadedCallback(self, callback, errorback=None):
        """Same as ``AddCallback`` with ``threaded`` set to True."""
        return self.AddCallback(callback, errorback, True)

    def ChainTask(self, task):
        """Append all nodes of ``task`` to the end of this task's chain.

        The nodes are shared with ``task``, not copied. Chaining an empty
        task is a no-op, and chaining onto an empty task adopts the other
        task's chain.
        """
        if task.head is None:
            # Nothing to append. Without this guard ``self.tail`` would be
            # overwritten with None while ``self.head`` is still set, and the
            # next AddCallback would fail.
            return
        if self.head is None:
            # Same rule as _AddCallback: an empty chain starts at the new
            # node instead of dereferencing the tail.
            self.head = task.head
        else:
            self.tail.Chain(task.head)
        self.tail = task.tail

    def GrabResult(self, data=None):
        """Append a pair of handlers that record the outcome in a dict.

        Args:
            data: dict to fill; a new one is created when None.

        Returns:
            The dict, which receives a ``"result"`` or ``"error"`` key when
            the corresponding handler is called.
        """
        if data is None:
            data = {}

        def SetResult(result):
            data["result"] = result
            return result

        def SetError(error):
            # NOTE(review): unlike SetResult this returns None rather than
            # passing the error on -- confirm against the scheduler whether
            # that is intended.
            data["error"] = error

        self.AddCallback(SetResult, SetError)
        return data

    def SetupEvent(self, event=None, data=None):
        """Append a pair of handlers that set an event on either outcome.

        Args:
            event: object with a ``set()`` method; a new ``threading.Event``
                is created when this is falsy.
            data: unused; kept so existing callers keep working.

        Returns:
            The event that will be set.
        """
        if not event:
            event = threading.Event()

        def SetEvent(dummy):
            event.set()

        self.AddCallback(SetEvent, SetEvent)
        return event
##
# Helper class
class ThreadedTask(Task):
    """Task whose first callback is flagged as threaded."""

    def __init__(self, func, *args, **kwargs):
        """Create the task and set ``threaded`` on its first node.

        Args:
            func: callable registered as the first callback.
            *args: positional arguments passed to ``func``.
            **kwargs: keyword arguments passed to ``func``.
        """
        super(ThreadedTask, self).__init__(func, *args, **kwargs)
        # Task.__init__ adds no node when ``func`` is falsy, leaving ``head``
        # as None; skip the flag then instead of raising AttributeError.
        if self.head is not None:
            self.head.threaded = True