Rewrite the whole to use linked chain of callback, allowing the
authorFabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 20:56:44 -0400
changeset 3 00b6708d1852
parent 2 a00ae018daf8
child 4 76ba9b3a9e1c
Rewrite the whole to use linked chain of callback, allowing the reactor to be specified at the very end.
scheduler.py
--- a/scheduler.py	Sun Mar 21 17:06:53 2010 -0400
+++ b/scheduler.py	Sun Mar 21 20:56:44 2010 -0400
@@ -12,6 +12,20 @@
 # reason, you shouldnt call the callback/errorback method yourself (or
 # at least, don't put it back in the scheduler after that).
 
+class _Callback(object):
+    def __init__(self, callback, errorback, threaded = False):
+        self.callback = callback
+        self.errorback = errorback
+        self.next = None
+        self.threaded = threaded
+    def Chain(self, next):
+        # Can only be called once
+        assert(self.next is None)
+        self.next = next
+        return next
+    def Next(self):
+        return self.next
+
 class Task(object):
 
     @staticmethod
@@ -24,42 +38,45 @@
 
     def __init__(self, func = None, *args, **kwargs):
         super(Task, self).__init__()
-        self.callbacks = []
+        self.head = None
+        self.tail = None
         if func:
             def callback(result):
                 return func(*args, **kwargs)
             self.AddCallback(callback)
 
-    def AddCallback(self, callback, errorback = None):
+    def _AddCallback(self, callback):
+        if self.head is None:
+            self.head = self.tail = callback
+        else:
+            self.tail = self.tail.Chain(callback)
+
+    def _GetNext(self):
+        head = self.head
+        if head:
+            self.head = head.Next()
+        return head
+
+    def AddCallback(self, callback, errorback = None, threaded = False):
         if errorback == None:
             errorback = self.DefaultErrorback
-        self.callbacks.append((callback, errorback))
+        cb = _Callback(callback, errorback, threaded)
+        self._AddCallback(cb)
         # permit chained calls
         return self
 
-    def ChainTask(self, task):
-        return self.AddCallback(task.Callback, task.Errorback)
+    def AddThreadedCallback(self, callback, errorback = None):
+        return self.AddCallback(callback, errorback, True)
 
-    def Callback(self, result, error = None, traceback = None):
-        logging.debug("Handling task %r callbacks", self)
-        for cb, eb in self.callbacks:
-            try:
-                if error:
-                    error = eb(error)
-                else:
-                    result = cb(result)
-            except:
-                errtype, error, traceback = sys.exc_info()
-        if error:
-            raise error, None, traceback
-        return result
+    def ChainTask(self, task):
+        self.tail.Chain(task.head)
+        self.tail = task.tail
 
-    ##
-    # Consume the callbacks with an error.  Notes that error will
-    # raise if not handled and return value would be a result
-    def Errorback(self, error, traceback = None):
-        return self.Callback(None, error, traceback)
-
+class ThreadedTask(Task):
+    def __init__(self, func, *args, **kwargs):
+        super(ThreadedTask, self).__init__(func, *args, **kwargs)
+        self.head.threaded = True
+    
 class Scheduler(threading.Thread):
 
     class _SchedulerStop(Exception):
@@ -76,8 +93,32 @@
             task = self.tasks.get(blocking)
         except Queue.Empty:
             logging.debug("No task to run")
-            return
-        return task.Callback(None)
+            return None
+        result = None
+        error = None
+        traceback = None
+        while True:
+            cb = task._GetNext()
+            if not cb:
+                # no more callback
+                break
+            if cb.threaded:
+                # a threaded callback
+                self._AddJob(task, cb, result, error, traceback)
+                # don't pass Go, don't reclaim $200
+                return None
+            # Run the callback according to the current state
+            try:
+                if error:
+                    error = cb.errorback(error)
+                else:
+                    result = cb.callback(result)
+            except:
+                errtype, error, traceback = sys.exc_info()
+        if error:
+            raise error, None, traceback
+        else:
+            return result
 
     def Run(self):
         logging.info("Scheduler start")
@@ -98,6 +139,10 @@
         self.pool.Stop(now)
         if now:
             self.tasks = Queue.Queue()
+        # We raise an exception to find if we stop stop the scheduler.
+        # We could have use a None task, but this make it easier if we
+        # want to add such mechanism public or we want to stop on
+        # other exception
         def RaiseSchedulerStop():
             raise self._SchedulerStop
         self.AddTask(Task(RaiseSchedulerStop))
@@ -106,48 +151,33 @@
     def AddTask(self, task, blocking = True):
         self.tasks.put(task, blocking)
 
-    ##
-    # A job is a task run in a seperated thread.  After the job run, a
-    # new Task is add to the scheduler either with a result or an error,
-    # so that only the task, not the callbacks, is run in the worker
-    # thread. Note the passed task is consumed after this call.
-    # A better design would have to allow Task to suspend or
-    # resume themself and to run in the thread if it want to, but this will
-    # required the callback to know about its Task and the scheduler
-    # itself, which solve nothing.
-    def AddJob(self, task, func, *args, **kwargs):
-        def Job():
+    def _AddJob(self, task, cb, result, error, traceback):
+
+        def DoIt(task, cb, result, error, traceback):
             try:
-                result = func(*args, **kwargs)
-                def returnResult():
-                    return result
-                jobTask = Task(returnResult)
+                if error:
+                    error = cb.errorback(error)
+                else:
+                    result = cb.callback(result)
             except:
                 errtype, error, traceback = sys.exc_info()
-                def raiseError():
+            if error:
+                def RaiseError():
                     raise error, None, traceback
-                jobTask = Task(raiseError)
+                jobTask = Task(RaiseError)
+            else:
+                def ReturnResult():
+                    return result
+                jobTask = Task(ReturnResult)
             jobTask.ChainTask(task)
             self.AddTask(jobTask)
+
+        # This double wrap (Job over DoIt) seems necessary to make
+        # error not look like a local of Job...
+        def Job():
+            return DoIt(task, cb, result, error, traceback)
         self.pool.AddJob(Job)
 
-    ##
-    # This basically allow one callback to run in a seperated thread
-    # and then get the result to another deferred callback.  Helas,
-    # this create two Tasks, one for the caller, one for the
-    # callbacks, with only the first one having to be passed to the
-    # scheduler.  This should be more transparent: a callback which
-    # required to be run as a job should simply be mark like it and
-    # the current task suspend until the job is finished, then resume.
-    def CreateCallbackAsJob(self, src, cb):
-        dst = Task()
-        def CreateJobCallback(result):
-            def Job():
-                return cb(result)
-            self.AddJob(dst, Job)
-        src.AddCallback(CreateJobCallback)
-        return dst
-
 # The global scheduler
 scheduler = None
 
@@ -172,6 +202,9 @@
     def AsyncCall(name, seconds):
         global count
         count += 1
+        
+        # Probably a bad example, since the callback
+        # doesn't return the exact same type...
         def Initialize(name, seconds):
             print "Here", name
             return name, seconds
@@ -186,23 +219,17 @@
             print name, "wakes up!"
             count -= 1
 
-        dinit = Task(Initialize, name, seconds)
-        # How can I remove the scheduler from the picture ?
-        # The only way I can see is to have a special kind of Task
-        # and a suspended queue... May be this will also be more clean
-        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
-        dfinal.AddCallback(Finalize)
-        # This is confusing but the reason is that the dfinal callback
-        # will be added to the scheduler by the job itself at the end of
-        # its execution.
-        return dinit
+        task = Task(Initialize, name, seconds)
+        task.AddThreadedCallback(Blocking)
+        task.AddCallback(Finalize)
+        return task
 
     logging.info("Starting scheduler with 10 workers")
     StartScheduler(10)
     logging.info("Adding asynccall task")
     for x in xrange(int(sys.argv[1])):
-        dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
-        scheduler.AddTask(dasync)
+        task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
+        scheduler.AddTask(task)
     while count > 0:
         logging.debug("Count = %d", count)
         sleep(1)