--- a/scheduler.py Sun Mar 21 17:06:53 2010 -0400
+++ b/scheduler.py Sun Mar 21 20:56:44 2010 -0400
@@ -12,6 +12,20 @@
# reason, you shouldnt call the callback/errorback method yourself (or
# at least, don't put it back in the scheduler after that).
+class _Callback(object):
+ def __init__(self, callback, errorback, threaded = False):
+ self.callback = callback
+ self.errorback = errorback
+ self.next = None
+ self.threaded = threaded
+ def Chain(self, next):
+ # Can only be called once
+ assert(self.next is None)
+ self.next = next
+ return next
+ def Next(self):
+ return self.next
+
class Task(object):
@staticmethod
@@ -24,42 +38,45 @@
def __init__(self, func = None, *args, **kwargs):
super(Task, self).__init__()
- self.callbacks = []
+ self.head = None
+ self.tail = None
if func:
def callback(result):
return func(*args, **kwargs)
self.AddCallback(callback)
- def AddCallback(self, callback, errorback = None):
+ def _AddCallback(self, callback):
+ if self.head is None:
+ self.head = self.tail = callback
+ else:
+ self.tail = self.tail.Chain(callback)
+
+ def _GetNext(self):
+ head = self.head
+ if head:
+ self.head = head.Next()
+ return head
+
+ def AddCallback(self, callback, errorback = None, threaded = False):
if errorback == None:
errorback = self.DefaultErrorback
- self.callbacks.append((callback, errorback))
+ cb = _Callback(callback, errorback, threaded)
+ self._AddCallback(cb)
# permit chained calls
return self
- def ChainTask(self, task):
- return self.AddCallback(task.Callback, task.Errorback)
+ def AddThreadedCallback(self, callback, errorback = None):
+ return self.AddCallback(callback, errorback, True)
- def Callback(self, result, error = None, traceback = None):
- logging.debug("Handling task %r callbacks", self)
- for cb, eb in self.callbacks:
- try:
- if error:
- error = eb(error)
- else:
- result = cb(result)
- except:
- errtype, error, traceback = sys.exc_info()
- if error:
- raise error, None, traceback
- return result
+ def ChainTask(self, task):
+ self.tail.Chain(task.head)
+ self.tail = task.tail
- ##
- # Consume the callbacks with an error. Notes that error will
- # raise if not handled and return value would be a result
- def Errorback(self, error, traceback = None):
- return self.Callback(None, error, traceback)
-
+class ThreadedTask(Task):
+ def __init__(self, func, *args, **kwargs):
+ super(ThreadedTask, self).__init__(func, *args, **kwargs)
+ self.head.threaded = True
+
class Scheduler(threading.Thread):
class _SchedulerStop(Exception):
@@ -76,8 +93,32 @@
task = self.tasks.get(blocking)
except Queue.Empty:
logging.debug("No task to run")
- return
- return task.Callback(None)
+ return None
+ result = None
+ error = None
+ traceback = None
+ while True:
+ cb = task._GetNext()
+ if not cb:
+ # no more callback
+ break
+ if cb.threaded:
+ # a threaded callback
+ self._AddJob(task, cb, result, error, traceback)
+ # don't pass Go, don't reclaim $200
+ return None
+ # Run the callback according to the current state
+ try:
+ if error:
+ error = cb.errorback(error)
+ else:
+ result = cb.callback(result)
+ except:
+ errtype, error, traceback = sys.exc_info()
+ if error:
+ raise error, None, traceback
+ else:
+ return result
def Run(self):
logging.info("Scheduler start")
@@ -98,6 +139,10 @@
self.pool.Stop(now)
if now:
self.tasks = Queue.Queue()
+ # We raise an exception to find if we stop stop the scheduler.
+ # We could have use a None task, but this make it easier if we
+ # want to add such mechanism public or we want to stop on
+ # other exception
def RaiseSchedulerStop():
raise self._SchedulerStop
self.AddTask(Task(RaiseSchedulerStop))
@@ -106,48 +151,33 @@
def AddTask(self, task, blocking = True):
self.tasks.put(task, blocking)
- ##
- # A job is a task run in a seperated thread. After the job run, a
- # new Task is add to the scheduler either with a result or an error,
- # so that only the task, not the callbacks, is run in the worker
- # thread. Note the passed task is consumed after this call.
- # A better design would have to allow Task to suspend or
- # resume themself and to run in the thread if it want to, but this will
- # required the callback to know about its Task and the scheduler
- # itself, which solve nothing.
- def AddJob(self, task, func, *args, **kwargs):
- def Job():
+ def _AddJob(self, task, cb, result, error, traceback):
+
+ def DoIt(task, cb, result, error, traceback):
try:
- result = func(*args, **kwargs)
- def returnResult():
- return result
- jobTask = Task(returnResult)
+ if error:
+ error = cb.errorback(error)
+ else:
+ result = cb.callback(result)
except:
errtype, error, traceback = sys.exc_info()
- def raiseError():
+ if error:
+ def RaiseError():
raise error, None, traceback
- jobTask = Task(raiseError)
+ jobTask = Task(RaiseError)
+ else:
+ def ReturnResult():
+ return result
+ jobTask = Task(ReturnResult)
jobTask.ChainTask(task)
self.AddTask(jobTask)
+
+ # This double wrap (Job over DoIt) seems necessary to make
+ # error not look like a local of Job...
+ def Job():
+ return DoIt(task, cb, result, error, traceback)
self.pool.AddJob(Job)
- ##
- # This basically allow one callback to run in a seperated thread
- # and then get the result to another deferred callback. Helas,
- # this create two Tasks, one for the caller, one for the
- # callbacks, with only the first one having to be passed to the
- # scheduler. This should be more transparent: a callback which
- # required to be run as a job should simply be mark like it and
- # the current task suspend until the job is finished, then resume.
- def CreateCallbackAsJob(self, src, cb):
- dst = Task()
- def CreateJobCallback(result):
- def Job():
- return cb(result)
- self.AddJob(dst, Job)
- src.AddCallback(CreateJobCallback)
- return dst
-
# The global scheduler
scheduler = None
@@ -172,6 +202,9 @@
def AsyncCall(name, seconds):
global count
count += 1
+
+ # Probably a bad example, since the callback
+ # doesn't return the exact same type...
def Initialize(name, seconds):
print "Here", name
return name, seconds
@@ -186,23 +219,17 @@
print name, "wakes up!"
count -= 1
- dinit = Task(Initialize, name, seconds)
- # How can I remove the scheduler from the picture ?
- # The only way I can see is to have a special kind of Task
- # and a suspended queue... May be this will also be more clean
- dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
- dfinal.AddCallback(Finalize)
- # This is confusing but the reason is that the dfinal callback
- # will be added to the scheduler by the job itself at the end of
- # its execution.
- return dinit
+ task = Task(Initialize, name, seconds)
+ task.AddThreadedCallback(Blocking)
+ task.AddCallback(Finalize)
+ return task
logging.info("Starting scheduler with 10 workers")
StartScheduler(10)
logging.info("Adding asynccall task")
for x in xrange(int(sys.argv[1])):
- dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
- scheduler.AddTask(dasync)
+ task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
+ scheduler.AddTask(task)
while count > 0:
logging.debug("Count = %d", count)
sleep(1)