import threading  # Only used in SetupEvent


class _Callback(object):
    """One link of a task chain: a callback paired with its error handler.

    Attributes are plain data read by whoever walks the chain:
    ``callback`` and ``errorback`` are the two callables, ``threaded`` is
    the flag given at construction, and ``next`` is the following link
    (``None`` for the last one).
    """

    def __init__(self, callback, errorback, threaded=False):
        self.callback = callback
        self.errorback = errorback
        self.next = None
        self.threaded = threaded

    def Chain(self, next):
        """Attach ``next`` after this link and return ``next``.

        Can only be called once per link: the assertion fails if a
        successor has already been attached.
        """
        assert(self.next is None)
        self.next = next
        return next

    def Next(self):
        """Return the following link, or ``None`` at the end of the chain."""
        return self.next


###
# A task, representing a series of linked pairs of callback and error
# handler. Concept is similar to twisted, but you need to put all your
# callbacks on the task before giving it to the Scheduler. For this
# reason, you shouldn't call the callback/errorback method yourself (or
# at least, don't put it back in the scheduler after that).
class Task(object):
    """Singly linked chain of (callback, errorback) pairs.

    ``head`` is the next link to be handed out by ``_GetNext`` and
    ``tail`` is the link new callbacks are appended after.
    """

    @staticmethod
    def DefaultCallback(result):
        """Pass ``result`` through unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Pass ``error`` through unchanged."""
        return error

    def __init__(self, func=None, *args, **kwargs):
        """Create a task, optionally seeded with a first callback.

        When ``func`` is truthy, the first callback ignores the incoming
        result and returns ``func(*args, **kwargs)``.
        """
        super(Task, self).__init__()
        self.head = None
        self.tail = None
        if func:
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def _AddCallback(self, callback):
        """Append an already built ``_Callback`` link to the chain."""
        if self.head is None:
            self.head = self.tail = callback
        else:
            self.tail = self.tail.Chain(callback)

    def _GetNext(self):
        """Pop and return the first link, or ``None`` when none is left."""
        head = self.head
        if head:
            self.head = head.Next()
        return head

    def AddCallback(self, callback, errorback=None, threaded=False):
        """Append a callback/errorback pair and return ``self``.

        ``errorback`` defaults to ``DefaultErrorback``. Returning ``self``
        permits chained calls.
        """
        if errorback is None:
            errorback = self.DefaultErrorback
        self._AddCallback(_Callback(callback, errorback, threaded))
        # permit chained calls
        return self

    def AddThreadedCallback(self, callback, errorback=None):
        """Same as ``AddCallback`` with ``threaded`` set to ``True``."""
        return self.AddCallback(callback, errorback, True)

    def ChainTask(self, task):
        """Append all links of ``task`` after the links of this task.

        The links are shared with ``task``, not copied. Chaining an empty
        task is a no-op, and chaining onto a task with no pending link
        simply adopts the links of ``task``.
        """
        if task.head is None:
            # Nothing to append; keep our own tail intact so later
            # AddCallback calls still have a link to chain onto.
            return
        if self.head is None:
            # Mirrors _AddCallback: with no pending link, start over from
            # the incoming chain instead of dereferencing a missing tail.
            self.head = task.head
        else:
            self.tail.Chain(task.head)
        self.tail = task.tail

    def GrabResult(self, data=None):
        """Append a link recording the outcome into a dict; return the dict.

        The callback stores its argument under ``"result"`` and returns
        it; the errorback stores its argument under ``"error"`` and
        returns ``None``. A new dict is created when ``data`` is ``None``.
        """
        if data is None:
            data = {}

        def SetResult(result):
            data["result"] = result
            return result

        def SetError(error):
            data["error"] = error

        self.AddCallback(SetResult, SetError)
        return data

    def SetupEvent(self, event=None, data=None):
        """Append a link that sets an event on success or error.

        A ``threading.Event`` is created when ``event`` is falsy. The
        event is returned. ``data`` is accepted but not used.
        """
        if not event:
            event = threading.Event()

        def SetEvent(dummy):
            event.set()

        self.AddCallback(SetEvent, SetEvent)
        return event


###
# Helper class
class ThreadedTask(Task):
    """Task whose first callback is flagged as threaded.

    ``func`` must be truthy: otherwise no first link exists and setting
    the flag raises ``AttributeError``.
    """

    def __init__(self, func, *args, **kwargs):
        super(ThreadedTask, self).__init__(func, *args, **kwargs)
        self.head.threaded = True