diff -r 8ae7370093db -r ee10f7cde07d scheduler.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scheduler.py Sun Mar 21 16:50:13 2010 -0400 @@ -0,0 +1,256 @@ +#!/usr/bin/env python2.6 +import sys +import logging +import threading +import Queue + +## +# A basic worker pool... should be replaced with a resizable one. +class ThreadPool(object): + + class _Worker(threading.Thread): + def __init__(self, uid, pool): + threading.Thread.__init__(self, name = "Worker%d" % uid) + self.pool = pool + def run(self): + logging.info("%s started", self.getName()) + while True: + job = self.pool.jobs.get() + if job: + logging.debug("%s is running %r", self.getName(), job) + try: + job() + except: + logging.exception("Job %r return an exception", job) + logging.debug("Job %r completed.", job) + else: + self.pool.jobs.put(job) + break + logging.info("%s stopped", self.getName()) + + def __init__(self, size): + super(ThreadPool, self).__init__() + self.jobs = Queue.Queue() + self._workers = [self._Worker(x+1, self) for x in xrange(size)] + + def Start(self): + logging.info("Starting Workers pool") + for worker in self._workers: + worker.start() + logging.info("Workers started") + + def Stop(self, now = False): + logging.info("Stopping Workers") + if now: + self.jobs = Queue.Queue() + self.jobs.put(None) + for worker in self._workers: + worker.join() + logging.info("Workers stopped") + + def AddJob(self, job): + self.jobs.put(job) + +## +# A task, representing a series of linked pairs of callback and error +# handler Concept is similar to twisted, but you need to put all your +# callback on the task before giving it to the Scheduler. For this +# reason, you shouldnt call the callback/errorback method yourself (or +# at least, don't put it back in the scheduler after that). + +class Task(object): + + @staticmethod + def DefaultCallback(result): + return result + + @staticmethod + def DefaultErrorback(error): + return error + + def __init__(self, func = None, *args, **kwargs): + super(Task, self).__init__() + self.callbacks = [] + if func: + def callback(result): + return func(*args, **kwargs) + self.AddCallback(callback) + + def AddCallback(self, callback, errorback = None): + if errorback == None: + errorback = self.DefaultErrorback + self.callbacks.append((callback, errorback)) + # permit chained calls + return self + + def ChainTask(self, task): + return self.AddCallback(task.Callback, task.Errorback) + + def Callback(self, result, error = None, traceback = None): + logging.debug("Handling task %r callbacks", self) + for cb, eb in self.callbacks: + try: + if error: + error = eb(error) + else: + result = cb(result) + except: + errtype, error, traceback = sys.exc_info() + if error: + raise error, None, traceback + return result + + ## + # Consume the callbacks with an error. Notes that error will + # raise if not handled and return value would be a result + def Errorback(self, error, traceback = None): + return self.Callback(None, error, traceback) + +class Scheduler(threading.Thread): + + class _SchedulerStop(Exception): + pass + + def __init__(self, poolSize): + threading.Thread.__init__(self, name = "Scheduler", target = self.Run) + self.pool = ThreadPool(poolSize) + self.tasks = Queue.Queue() + + def ExecuteOne(self, blocking = True): + logging.debug("Looking for next task...") + try: + task = self.tasks.get(blocking) + except Queue.Empty: + logging.debug("No task to run") + return + return task.Callback(None) + + def Run(self): + logging.info("Scheduler start") + while True: + try: + self.ExecuteOne() + except self._SchedulerStop: + break + except: + logging.exception("Unhandled task exception") + logging.info("Scheduler stop") + + def Start(self): + self.pool.Start() + return self.start() + + def Stop(self, now = False): + self.pool.Stop(now) + if now: + self.tasks = Queue.Queue() + def RaiseSchedulerStop(): + raise self._SchedulerStop + self.AddTask(Task(RaiseSchedulerStop)) + self.join() + + def AddTask(self, task, blocking = True): + self.tasks.put(task, blocking) + + ## + # A job is a task run in a seperated thread. After the job run, a + # new Task is add to the scheduler either with a result or an error, + # so that only the task, not the callbacks, is run in the worker + # thread. Note the passed task is consumed after this call. + # A better design would have to allow Task to suspend or + # resume themself and to run in the thread if it want to, but this will + # required the callback to know about its Task and the scheduler + # itself, which solve nothing. + def AddJob(self, task, func, *args, **kwargs): + def Job(): + try: + result = func(*args, **kwargs) + def returnResult(): + return result + jobTask = Task(returnResult) + except: + errtype, error, traceback = sys.exc_info() + def raiseError(): + raise error, None, traceback + jobTask = Task(raiseError) + jobTask.ChainTask(task) + self.AddTask(jobTask) + self.pool.AddJob(Job) + + ## + # This basically allow one callback to run in a seperated thread + # and then get the result to another deferred callback. Helas, + # this create two Tasks, one for the caller, one for the + # callbacks, with only the first one having to be passed to the + # scheduler. This should be more transparent: a callback which + # required to be run as a job should simply be mark like it and + # the current task suspend until the job is finished, then resume. + def CreateCallbackAsJob(self, src, cb): + dst = Task() + def CreateJobCallback(result): + def Job(): + cb(result) + self.AddJob(dst, Job) + src.AddCallback(CreateJobCallback) + return dst + +# The global scheduler +scheduler = None + +def StartScheduler(size): + global scheduler + if scheduler: + StopScheduler() + scheduler = Scheduler(size) + scheduler.Start() + +def StopScheduler(now = False): + global scheduler + if scheduler: + scheduler.Stop(now) + scheduler = None + +if __name__ == '__main__': + from time import sleep + logging.getLogger().setLevel(logging.DEBUG) + # This function is a sample and shouldn't know about the scheduler + count = 0 + def AsyncCall(): + global count + count += 1 + def Initialize(name, sleep): + print "Here", name + return name, sleep + def Blocking(args): + name, time = args + print name, "goes to bed" + sleep(time) + print name, ": ZZZ..." + return name + def Finalize(name): + global count + print name, "wakes up!" + count -= 1 + + dinit = Task(Initialize, "Toto", 5) + # How can I remove the scheduler from the picture ? + # The only way I can see is to have a special kind of Task + # and a suspended queue... May be this will also be more clean + dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) + dfinal.AddCallback(Finalize) + # This is confusing but the reason is that the dfinal callback + # will be added to the scheduler by the job itself at the end of + # its execution. + return dinit + + logging.info("Starting scheduler with 10 workers") + StartScheduler(10) + dasync = AsyncCall() + logging.info("Adding asynccall task") + scheduler.AddTask(dasync) + while count > 0: + logging.debug("Count = %d", count) + sleep(1) + logging.info("Stopping scheduler") + StopScheduler() + logging.info("The End.")