diff -r a00ae018daf8 -r 00b6708d1852 scheduler.py --- a/scheduler.py Sun Mar 21 17:06:53 2010 -0400 +++ b/scheduler.py Sun Mar 21 20:56:44 2010 -0400 @@ -12,6 +12,20 @@ # reason, you shouldnt call the callback/errorback method yourself (or # at least, don't put it back in the scheduler after that). +class _Callback(object): + def __init__(self, callback, errorback, threaded = False): + self.callback = callback + self.errorback = errorback + self.next = None + self.threaded = threaded + def Chain(self, next): + # Can only be called once + assert(self.next is None) + self.next = next + return next + def Next(self): + return self.next + class Task(object): @staticmethod @@ -24,42 +38,45 @@ def __init__(self, func = None, *args, **kwargs): super(Task, self).__init__() - self.callbacks = [] + self.head = None + self.tail = None if func: def callback(result): return func(*args, **kwargs) self.AddCallback(callback) - def AddCallback(self, callback, errorback = None): + def _AddCallback(self, callback): + if self.head is None: + self.head = self.tail = callback + else: + self.tail = self.tail.Chain(callback) + + def _GetNext(self): + head = self.head + if head: + self.head = head.Next() + return head + + def AddCallback(self, callback, errorback = None, threaded = False): if errorback == None: errorback = self.DefaultErrorback - self.callbacks.append((callback, errorback)) + cb = _Callback(callback, errorback, threaded) + self._AddCallback(cb) # permit chained calls return self - def ChainTask(self, task): - return self.AddCallback(task.Callback, task.Errorback) + def AddThreadedCallback(self, callback, errorback = None): + return self.AddCallback(callback, errorback, True) - def Callback(self, result, error = None, traceback = None): - logging.debug("Handling task %r callbacks", self) - for cb, eb in self.callbacks: - try: - if error: - error = eb(error) - else: - result = cb(result) - except: - errtype, error, traceback = sys.exc_info() - if error: - raise error, None, traceback - return result + def ChainTask(self, task): + self.tail.Chain(task.head) + self.tail = task.tail - ## - # Consume the callbacks with an error. Notes that error will - # raise if not handled and return value would be a result - def Errorback(self, error, traceback = None): - return self.Callback(None, error, traceback) - +class ThreadedTask(Task): + def __init__(self, func, *args, **kwargs): + super(ThreadedTask, self).__init__(func, *args, **kwargs) + self.head.threaded = True + class Scheduler(threading.Thread): class _SchedulerStop(Exception): @@ -76,8 +93,32 @@ task = self.tasks.get(blocking) except Queue.Empty: logging.debug("No task to run") - return - return task.Callback(None) + return None + result = None + error = None + traceback = None + while True: + cb = task._GetNext() + if not cb: + # no more callback + break + if cb.threaded: + # a threaded callback + self._AddJob(task, cb, result, error, traceback) + # don't pass Go, don't reclaim $200 + return None + # Run the callback according to the current state + try: + if error: + error = cb.errorback(error) + else: + result = cb.callback(result) + except: + errtype, error, traceback = sys.exc_info() + if error: + raise error, None, traceback + else: + return result def Run(self): logging.info("Scheduler start") @@ -98,6 +139,10 @@ self.pool.Stop(now) if now: self.tasks = Queue.Queue() + # We raise an exception to find if we stop stop the scheduler. + # We could have use a None task, but this make it easier if we + # want to add such mechanism public or we want to stop on + # other exception def RaiseSchedulerStop(): raise self._SchedulerStop self.AddTask(Task(RaiseSchedulerStop)) @@ -106,48 +151,33 @@ def AddTask(self, task, blocking = True): self.tasks.put(task, blocking) - ## - # A job is a task run in a seperated thread. After the job run, a - # new Task is add to the scheduler either with a result or an error, - # so that only the task, not the callbacks, is run in the worker - # thread. Note the passed task is consumed after this call. - # A better design would have to allow Task to suspend or - # resume themself and to run in the thread if it want to, but this will - # required the callback to know about its Task and the scheduler - # itself, which solve nothing. - def AddJob(self, task, func, *args, **kwargs): - def Job(): + def _AddJob(self, task, cb, result, error, traceback): + + def DoIt(task, cb, result, error, traceback): try: - result = func(*args, **kwargs) - def returnResult(): - return result - jobTask = Task(returnResult) + if error: + error = cb.errorback(error) + else: + result = cb.callback(result) except: errtype, error, traceback = sys.exc_info() - def raiseError(): + if error: + def RaiseError(): raise error, None, traceback - jobTask = Task(raiseError) + jobTask = Task(RaiseError) + else: + def ReturnResult(): + return result + jobTask = Task(ReturnResult) jobTask.ChainTask(task) self.AddTask(jobTask) + + # This double wrap (Job over DoIt) seems necessary to make + # error not look like a local of Job... + def Job(): + return DoIt(task, cb, result, error, traceback) self.pool.AddJob(Job) - ## - # This basically allow one callback to run in a seperated thread - # and then get the result to another deferred callback. Helas, - # this create two Tasks, one for the caller, one for the - # callbacks, with only the first one having to be passed to the - # scheduler. This should be more transparent: a callback which - # required to be run as a job should simply be mark like it and - # the current task suspend until the job is finished, then resume. - def CreateCallbackAsJob(self, src, cb): - dst = Task() - def CreateJobCallback(result): - def Job(): - return cb(result) - self.AddJob(dst, Job) - src.AddCallback(CreateJobCallback) - return dst - # The global scheduler scheduler = None @@ -172,6 +202,9 @@ def AsyncCall(name, seconds): global count count += 1 + + # Probably a bad example, since the callback + # doesn't return the exact same type... def Initialize(name, seconds): print "Here", name return name, seconds @@ -186,23 +219,17 @@ print name, "wakes up!" count -= 1 - dinit = Task(Initialize, name, seconds) - # How can I remove the scheduler from the picture ? - # The only way I can see is to have a special kind of Task - # and a suspended queue... May be this will also be more clean - dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) - dfinal.AddCallback(Finalize) - # This is confusing but the reason is that the dfinal callback - # will be added to the scheduler by the job itself at the end of - # its execution. - return dinit + task = Task(Initialize, name, seconds) + task.AddThreadedCallback(Blocking) + task.AddCallback(Finalize) + return task logging.info("Starting scheduler with 10 workers") StartScheduler(10) logging.info("Adding asynccall task") for x in xrange(int(sys.argv[1])): - dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) - scheduler.AddTask(dasync) + task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) + scheduler.AddTask(task) while count > 0: logging.debug("Count = %d", count) sleep(1)