# HG changeset patch # User Fabien Ninoles # Date 1269204613 14400 # Node ID ee10f7cde07d942aa2cb9eb2582bf375a95d613a # Parent 8ae7370093dbfe76470d1646d2525f47a766bdb8 Rename test to scheduler. diff -r 8ae7370093db -r ee10f7cde07d scheduler.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scheduler.py Sun Mar 21 16:50:13 2010 -0400 @@ -0,0 +1,256 @@ +#!/usr/bin/env python2.6 +import sys +import logging +import threading +import Queue + +## +# A basic worker pool... should be replaced with a resizable one. +class ThreadPool(object): + + class _Worker(threading.Thread): + def __init__(self, uid, pool): + threading.Thread.__init__(self, name = "Worker%d" % uid) + self.pool = pool + def run(self): + logging.info("%s started", self.getName()) + while True: + job = self.pool.jobs.get() + if job: + logging.debug("%s is running %r", self.getName(), job) + try: + job() + except: + logging.exception("Job %r return an exception", job) + logging.debug("Job %r completed.", job) + else: + self.pool.jobs.put(job) + break + logging.info("%s stopped", self.getName()) + + def __init__(self, size): + super(ThreadPool, self).__init__() + self.jobs = Queue.Queue() + self._workers = [self._Worker(x+1, self) for x in xrange(size)] + + def Start(self): + logging.info("Starting Workers pool") + for worker in self._workers: + worker.start() + logging.info("Workers started") + + def Stop(self, now = False): + logging.info("Stopping Workers") + if now: + self.jobs = Queue.Queue() + self.jobs.put(None) + for worker in self._workers: + worker.join() + logging.info("Workers stopped") + + def AddJob(self, job): + self.jobs.put(job) + +## +# A task, representing a series of linked pairs of callback and error +# handler Concept is similar to twisted, but you need to put all your +# callback on the task before giving it to the Scheduler. For this +# reason, you shouldnt call the callback/errorback method yourself (or +# at least, don't put it back in the scheduler after that). + +class Task(object): + + @staticmethod + def DefaultCallback(result): + return result + + @staticmethod + def DefaultErrorback(error): + return error + + def __init__(self, func = None, *args, **kwargs): + super(Task, self).__init__() + self.callbacks = [] + if func: + def callback(result): + return func(*args, **kwargs) + self.AddCallback(callback) + + def AddCallback(self, callback, errorback = None): + if errorback == None: + errorback = self.DefaultErrorback + self.callbacks.append((callback, errorback)) + # permit chained calls + return self + + def ChainTask(self, task): + return self.AddCallback(task.Callback, task.Errorback) + + def Callback(self, result, error = None, traceback = None): + logging.debug("Handling task %r callbacks", self) + for cb, eb in self.callbacks: + try: + if error: + error = eb(error) + else: + result = cb(result) + except: + errtype, error, traceback = sys.exc_info() + if error: + raise error, None, traceback + return result + + ## + # Consume the callbacks with an error. Notes that error will + # raise if not handled and return value would be a result + def Errorback(self, error, traceback = None): + return self.Callback(None, error, traceback) + +class Scheduler(threading.Thread): + + class _SchedulerStop(Exception): + pass + + def __init__(self, poolSize): + threading.Thread.__init__(self, name = "Scheduler", target = self.Run) + self.pool = ThreadPool(poolSize) + self.tasks = Queue.Queue() + + def ExecuteOne(self, blocking = True): + logging.debug("Looking for next task...") + try: + task = self.tasks.get(blocking) + except Queue.Empty: + logging.debug("No task to run") + return + return task.Callback(None) + + def Run(self): + logging.info("Scheduler start") + while True: + try: + self.ExecuteOne() + except self._SchedulerStop: + break + except: + logging.exception("Unhandled task exception") + logging.info("Scheduler stop") + + def Start(self): + self.pool.Start() + return self.start() + + def Stop(self, now = False): + self.pool.Stop(now) + if now: + self.tasks = Queue.Queue() + def RaiseSchedulerStop(): + raise self._SchedulerStop + self.AddTask(Task(RaiseSchedulerStop)) + self.join() + + def AddTask(self, task, blocking = True): + self.tasks.put(task, blocking) + + ## + # A job is a task run in a seperated thread. After the job run, a + # new Task is add to the scheduler either with a result or an error, + # so that only the task, not the callbacks, is run in the worker + # thread. Note the passed task is consumed after this call. + # A better design would have to allow Task to suspend or + # resume themself and to run in the thread if it want to, but this will + # required the callback to know about its Task and the scheduler + # itself, which solve nothing. + def AddJob(self, task, func, *args, **kwargs): + def Job(): + try: + result = func(*args, **kwargs) + def returnResult(): + return result + jobTask = Task(returnResult) + except: + errtype, error, traceback = sys.exc_info() + def raiseError(): + raise error, None, traceback + jobTask = Task(raiseError) + jobTask.ChainTask(task) + self.AddTask(jobTask) + self.pool.AddJob(Job) + + ## + # This basically allow one callback to run in a seperated thread + # and then get the result to another deferred callback. Helas, + # this create two Tasks, one for the caller, one for the + # callbacks, with only the first one having to be passed to the + # scheduler. This should be more transparent: a callback which + # required to be run as a job should simply be mark like it and + # the current task suspend until the job is finished, then resume. + def CreateCallbackAsJob(self, src, cb): + dst = Task() + def CreateJobCallback(result): + def Job(): + cb(result) + self.AddJob(dst, Job) + src.AddCallback(CreateJobCallback) + return dst + +# The global scheduler +scheduler = None + +def StartScheduler(size): + global scheduler + if scheduler: + StopScheduler() + scheduler = Scheduler(size) + scheduler.Start() + +def StopScheduler(now = False): + global scheduler + if scheduler: + scheduler.Stop(now) + scheduler = None + +if __name__ == '__main__': + from time import sleep + logging.getLogger().setLevel(logging.DEBUG) + # This function is a sample and shouldn't know about the scheduler + count = 0 + def AsyncCall(): + global count + count += 1 + def Initialize(name, sleep): + print "Here", name + return name, sleep + def Blocking(args): + name, time = args + print name, "goes to bed" + sleep(time) + print name, ": ZZZ..." + return name + def Finalize(name): + global count + print name, "wakes up!" + count -= 1 + + dinit = Task(Initialize, "Toto", 5) + # How can I remove the scheduler from the picture ? + # The only way I can see is to have a special kind of Task + # and a suspended queue... May be this will also be more clean + dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) + dfinal.AddCallback(Finalize) + # This is confusing but the reason is that the dfinal callback + # will be added to the scheduler by the job itself at the end of + # its execution. + return dinit + + logging.info("Starting scheduler with 10 workers") + StartScheduler(10) + dasync = AsyncCall() + logging.info("Adding asynccall task") + scheduler.AddTask(dasync) + while count > 0: + logging.debug("Count = %d", count) + sleep(1) + logging.info("Stopping scheduler") + StopScheduler() + logging.info("The End.") diff -r 8ae7370093db -r ee10f7cde07d test.py --- a/test.py Sun Mar 21 16:40:06 2010 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,256 +0,0 @@ -#!/usr/bin/env python2.6 -import sys -import logging -import threading -import Queue - -## -# A basic worker pool... should be replaced with a resizable one. -class ThreadPool(object): - - class _Worker(threading.Thread): - def __init__(self, uid, pool): - threading.Thread.__init__(self, name = "Worker%d" % uid) - self.pool = pool - def run(self): - logging.info("%s started", self.getName()) - while True: - job = self.pool.jobs.get() - if job: - logging.debug("%s is running %r", self.getName(), job) - try: - job() - except: - logging.exception("Job %r return an exception", job) - logging.debug("Job %r completed.", job) - else: - self.pool.jobs.put(job) - break - logging.info("%s stopped", self.getName()) - - def __init__(self, size): - super(ThreadPool, self).__init__() - self.jobs = Queue.Queue() - self._workers = [self._Worker(x+1, self) for x in xrange(size)] - - def Start(self): - logging.info("Starting Workers pool") - for worker in self._workers: - worker.start() - logging.info("Workers started") - - def Stop(self, now = False): - logging.info("Stopping Workers") - if now: - self.jobs = Queue.Queue() - self.jobs.put(None) - for worker in self._workers: - worker.join() - logging.info("Workers stopped") - - def AddJob(self, job): - self.jobs.put(job) - -## -# A task, representing a series of linked pairs of callback and error -# handler Concept is similar to twisted, but you need to put all your -# callback on the task before giving it to the Scheduler. For this -# reason, you shouldnt call the callback/errorback method yourself (or -# at least, don't put it back in the scheduler after that). - -class Task(object): - - @staticmethod - def DefaultCallback(result): - return result - - @staticmethod - def DefaultErrorback(error): - return error - - def __init__(self, func = None, *args, **kwargs): - super(Task, self).__init__() - self.callbacks = [] - if func: - def callback(result): - return func(*args, **kwargs) - self.AddCallback(callback) - - def AddCallback(self, callback, errorback = None): - if errorback == None: - errorback = self.DefaultErrorback - self.callbacks.append((callback, errorback)) - # permit chained calls - return self - - def ChainTask(self, task): - return self.AddCallback(task.Callback, task.Errorback) - - def Callback(self, result, error = None, traceback = None): - logging.debug("Handling task %r callbacks", self) - for cb, eb in self.callbacks: - try: - if error: - error = eb(error) - else: - result = cb(result) - except: - errtype, error, traceback = sys.exc_info() - if error: - raise error, None, traceback - return result - - ## - # Consume the callbacks with an error. Notes that error will - # raise if not handled and return value would be a result - def Errorback(self, error, traceback = None): - return self.Callback(None, error, traceback) - -class Scheduler(threading.Thread): - - class _SchedulerStop(Exception): - pass - - def __init__(self, poolSize): - threading.Thread.__init__(self, name = "Scheduler", target = self.Run) - self.pool = ThreadPool(poolSize) - self.tasks = Queue.Queue() - - def ExecuteOne(self, blocking = True): - logging.debug("Looking for next task...") - try: - task = self.tasks.get(blocking) - except Queue.Empty: - logging.debug("No task to run") - return - return task.Callback(None) - - def Run(self): - logging.info("Scheduler start") - while True: - try: - self.ExecuteOne() - except self._SchedulerStop: - break - except: - logging.exception("Unhandled task exception") - logging.info("Scheduler stop") - - def Start(self): - self.pool.Start() - return self.start() - - def Stop(self, now = False): - self.pool.Stop(now) - if now: - self.tasks = Queue.Queue() - def RaiseSchedulerStop(): - raise self._SchedulerStop - self.AddTask(Task(RaiseSchedulerStop)) - self.join() - - def AddTask(self, task, blocking = True): - self.tasks.put(task, blocking) - - ## - # A job is a task run in a seperated thread. After the job run, a - # new Task is add to the scheduler either with a result or an error, - # so that only the task, not the callbacks, is run in the worker - # thread. Note the passed task is consumed after this call. - # A better design would have to allow Task to suspend or - # resume themself and to run in the thread if it want to, but this will - # required the callback to know about its Task and the scheduler - # itself, which solve nothing. - def AddJob(self, task, func, *args, **kwargs): - def Job(): - try: - result = func(*args, **kwargs) - def returnResult(): - return result - jobTask = Task(returnResult) - except: - errtype, error, traceback = sys.exc_info() - def raiseError(): - raise error, None, traceback - jobTask = Task(raiseError) - jobTask.ChainTask(task) - self.AddTask(jobTask) - self.pool.AddJob(Job) - - ## - # This basically allow one callback to run in a seperated thread - # and then get the result to another deferred callback. Helas, - # this create two Tasks, one for the caller, one for the - # callbacks, with only the first one having to be passed to the - # scheduler. This should be more transparent: a callback which - # required to be run as a job should simply be mark like it and - # the current task suspend until the job is finished, then resume. - def CreateCallbackAsJob(self, src, cb): - dst = Task() - def CreateJobCallback(result): - def Job(): - cb(result) - self.AddJob(dst, Job) - src.AddCallback(CreateJobCallback) - return dst - -# The global scheduler -scheduler = None - -def StartScheduler(size): - global scheduler - if scheduler: - StopScheduler() - scheduler = Scheduler(size) - scheduler.Start() - -def StopScheduler(now = False): - global scheduler - if scheduler: - scheduler.Stop(now) - scheduler = None - -if __name__ == '__main__': - from time import sleep - logging.getLogger().setLevel(logging.DEBUG) - # This function is a sample and shouldn't know about the scheduler - count = 0 - def AsyncCall(): - global count - count += 1 - def Initialize(name, sleep): - print "Here", name - return name, sleep - def Blocking(args): - name, time = args - print name, "goes to bed" - sleep(time) - print name, ": ZZZ..." - return name - def Finalize(name): - global count - print name, "wakes up!" - count -= 1 - - dinit = Task(Initialize, "Toto", 5) - # How can I remove the scheduler from the picture ? - # The only way I can see is to have a special kind of Task - # and a suspended queue... May be this will also be more clean - dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) - dfinal.AddCallback(Finalize) - # This is confusing but the reason is that the dfinal callback - # will be added to the scheduler by the job itself at the end of - # its execution. - return dinit - - logging.info("Starting scheduler with 10 workers") - StartScheduler(10) - dasync = AsyncCall() - logging.info("Adding asynccall task") - scheduler.AddTask(dasync) - while count > 0: - logging.debug("Count = %d", count) - sleep(1) - logging.info("Stopping scheduler") - StopScheduler() - logging.info("The End.")