#!/usr/bin/env python2.6
import sys
import logging
import threading
import Queue


### A basic worker pool... should be replaced with a resizable one.
class ThreadPool(object):
    """Fixed-size pool of worker threads consuming jobs from a shared queue.

    A job is any callable taking no arguments.  A falsy item (None) on the
    queue acts as a shutdown sentinel for the workers.
    """

    class _Worker(threading.Thread):
        """One worker thread: loops over the pool's queue and runs each job."""

        def __init__(self, uid, pool):
            # uid is only used to build a readable thread name.
            threading.Thread.__init__(self, name="Worker%d" % uid)
            self.pool = pool

        def run(self):
            logging.info("%s started", self.getName())
            while True:
                job = self.pool.jobs.get()
                if job:
                    logging.debug("%s is running %r", self.getName(), job)
                    try:
                        job()
                    except Exception:
                        # Log and keep going: one failing job must not
                        # kill the worker thread.
                        logging.exception("Job %r raised an exception", job)
                    logging.debug("Job %r completed.", job)
                else:
                    # Shutdown sentinel: put it back so the next worker
                    # sees it too, then exit this thread.
                    self.pool.jobs.put(job)
                    break
            logging.info("%s stopped", self.getName())

    def __init__(self, size):
        """Create `size` workers; none is started until Start() is called."""
        super(ThreadPool, self).__init__()
        self.jobs = Queue.Queue()
        self._workers = [self._Worker(x + 1, self) for x in xrange(size)]

    def Start(self):
        """Start every worker thread."""
        logging.info("Starting Workers pool")
        for worker in self._workers:
            worker.start()
        logging.info("Workers started")

    def Stop(self, now=False):
        """Stop the workers.

        If `now` is true, pending jobs are discarded (the queue is swapped
        for a fresh one); otherwise workers drain the queue first.  Blocks
        until every worker thread has exited.
        """
        logging.info("Stopping Workers")
        if now:
            self.jobs = Queue.Queue()
        self.jobs.put(None)  # shutdown sentinel, recycled by each worker
        for worker in self._workers:
            worker.join()
        logging.info("Workers stopped")

    def AddJob(self, job):
        """Queue a no-argument callable for execution by a worker."""
        self.jobs.put(job)


### A task, representing a series of linked pairs of callback and error
# handler.  Concept is similar to twisted, but you need to put all your
# callback on the task before giving it to the Scheduler.
For this# reason, you shouldnt call the callback/errorback method yourself (or# at least, don't put it back in the scheduler after that).class Task(object): @staticmethod def DefaultCallback(result): return result @staticmethod def DefaultErrorback(error): return error def __init__(self, func = None, *args, **kwargs): super(Task, self).__init__() self.callbacks = [] if func: def callback(result): return func(*args, **kwargs) self.AddCallback(callback) def AddCallback(self, callback, errorback = None): if errorback == None: errorback = self.DefaultErrorback self.callbacks.append((callback, errorback)) # permit chained calls return self def ChainTask(self, task): return self.AddCallback(task.Callback, task.Errorback) def Callback(self, result, error = None, traceback = None): logging.debug("Handling task %r callbacks", self) for cb, eb in self.callbacks: try: if error: error = eb(error) else: result = cb(result) except: errtype, error, traceback = sys.exc_info() if error: raise error, None, traceback return result ## # Consume the callbacks with an error. 
Notes that error will # raise if not handled and return value would be a result def Errorback(self, error, traceback = None): return self.Callback(None, error, traceback)class Scheduler(threading.Thread): class _SchedulerStop(Exception): pass def __init__(self, poolSize): threading.Thread.__init__(self, name = "Scheduler", target = self.Run) self.pool = ThreadPool(poolSize) self.tasks = Queue.Queue() def ExecuteOne(self, blocking = True): logging.debug("Looking for next task...") try: task = self.tasks.get(blocking) except Queue.Empty: logging.debug("No task to run") return return task.Callback(None) def Run(self): logging.info("Scheduler start") while True: try: self.ExecuteOne() except self._SchedulerStop: break except: logging.exception("Unhandled task exception") logging.info("Scheduler stop") def Start(self): self.pool.Start() return self.start() def Stop(self, now = False): self.pool.Stop(now) if now: self.tasks = Queue.Queue() def RaiseSchedulerStop(): raise self._SchedulerStop self.AddTask(Task(RaiseSchedulerStop)) self.join() def AddTask(self, task, blocking = True): self.tasks.put(task, blocking) ## # A job is a task run in a seperated thread. After the job run, a # new Task is add to the scheduler either with a result or an error, # so that only the task, not the callbacks, is run in the worker # thread. Note the passed task is consumed after this call. # A better design would have to allow Task to suspend or # resume themself and to run in the thread if it want to, but this will # required the callback to know about its Task and the scheduler # itself, which solve nothing. 
def AddJob(self, task, func, *args, **kwargs): def Job(): try: result = func(*args, **kwargs) def returnResult(): return result jobTask = Task(returnResult) except: errtype, error, traceback = sys.exc_info() def raiseError(): raise error, None, traceback jobTask = Task(raiseError) jobTask.ChainTask(task) self.AddTask(jobTask) self.pool.AddJob(Job) ## # This basically allow one callback to run in a seperated thread # and then get the result to another deferred callback. Helas, # this create two Tasks, one for the caller, one for the # callbacks, with only the first one having to be passed to the # scheduler. This should be more transparent: a callback which # required to be run as a job should simply be mark like it and # the current task suspend until the job is finished, then resume. def CreateCallbackAsJob(self, src, cb): dst = Task() def CreateJobCallback(result): def Job(): cb(result) self.AddJob(dst, Job) src.AddCallback(CreateJobCallback) return dst# The global schedulerscheduler = Nonedef StartScheduler(size): global scheduler if scheduler: StopScheduler() scheduler = Scheduler(size) scheduler.Start()def StopScheduler(now = False): global scheduler if scheduler: scheduler.Stop(now) scheduler = Noneif __name__ == '__main__': from time import sleep logging.getLogger().setLevel(logging.DEBUG) # This function is a sample and shouldn't know about the scheduler count = 0 def AsyncCall(): global count count += 1 def Initialize(name, sleep): print "Here", name return name, sleep def Blocking(args): name, time = args print name, "goes to bed" sleep(time) print name, ": ZZZ..." return name def Finalize(name): global count print name, "wakes up!" count -= 1 dinit = Task(Initialize, "Toto", 5) # How can I remove the scheduler from the picture ? # The only way I can see is to have a special kind of Task # and a suspended queue... 
May be this will also be more clean dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) dfinal.AddCallback(Finalize) # This is confusing but the reason is that the dfinal callback # will be added to the scheduler by the job itself at the end of # its execution. return dinit logging.info("Starting scheduler with 10 workers") StartScheduler(10) dasync = AsyncCall() logging.info("Adding asynccall task") scheduler.AddTask(dasync) while count > 0: logging.debug("Count = %d", count) sleep(1) logging.info("Stopping scheduler") StopScheduler() logging.info("The End.")