Rename test to scheduler.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/scheduler.py Sun Mar 21 16:50:13 2010 -0400
@@ -0,0 +1,256 @@
+#!/usr/bin/env python2.6
+import sys
+import logging
+import threading
+import Queue
+
+##
+# A basic fixed-size worker pool; should be replaced with a resizable one.
+class ThreadPool(object):
+    class _Worker(threading.Thread):
+        def __init__(self, uid, pool):
+            threading.Thread.__init__(self, name = "Worker%d" % uid)
+            self.pool = pool
+        def run(self):
+            logging.info("%s started", self.getName())
+            while True:
+                job = self.pool.jobs.get()
+                if not job: # stop sentinel: re-queue it for the others, exit
+                    self.pool.jobs.put(job)
+                    break
+                logging.debug("%s is running %r", self.getName(), job)
+                try:
+                    job()
+                except: # a failing job must not kill its worker
+                    logging.exception("Job %r return an exception", job)
+                logging.debug("Job %r completed.", job)
+            logging.info("%s stopped", self.getName())
+
+    def __init__(self, size):
+        super(ThreadPool, self).__init__()
+        self.jobs = Queue.Queue()
+        self._workers = [self._Worker(x+1, self) for x in xrange(size)]
+
+    def Start(self):
+        logging.info("Starting Workers pool")
+        for worker in self._workers:
+            worker.start()
+        logging.info("Workers started")
+
+    def Stop(self, now = False):
+        logging.info("Stopping Workers")
+        if now:
+            # Drop pending jobs in place: idle workers block on this queue.
+            with self.jobs.mutex:
+                self.jobs.queue.clear()
+        self.jobs.put(None)
+        for worker in self._workers:
+            worker.join()
+        logging.info("Workers stopped")
+
+    def AddJob(self, job):
+        self.jobs.put(job)
+
+##
+# A task, representing a series of linked pairs of callback and error
+# handler. The concept is similar to Twisted's, but you need to put all
+# your callbacks on the task before giving it to the Scheduler. For this
+# reason, you shouldn't call the Callback/Errorback methods yourself (or
+# at least, don't put the task back in the scheduler after that).
+
+class Task(object):
+    @staticmethod
+    def DefaultCallback(result):
+        return result
+
+    @staticmethod
+    def DefaultErrorback(error):
+        return error
+
+    def __init__(self, func = None, *args, **kwargs):
+        super(Task, self).__init__()
+        self.callbacks = []
+        if func:
+            def callback(result): # first link: the incoming result is ignored
+                return func(*args, **kwargs)
+            self.AddCallback(callback)
+
+    def AddCallback(self, callback, errorback = None):
+        if errorback is None:
+            errorback = self.DefaultErrorback
+        self.callbacks.append((callback, errorback))
+        # permit chained calls
+        return self
+
+    def ChainTask(self, task):
+        return self.AddCallback(task.Callback, task.Errorback)
+
+    # Run each pair in order: its errorback while an error is pending, else cb.
+    def Callback(self, result, error = None, traceback = None):
+        logging.debug("Handling task %r callbacks", self)
+        for cb, eb in self.callbacks:
+            try:
+                if error:
+                    error = eb(error)
+                else:
+                    result = cb(result)
+            except: # a raising link switches the chain to the error path
+                error, traceback = sys.exc_info()[1:]
+        if error:
+            raise error, None, traceback
+        return result
+
+    ##
+    # Consume the callbacks with an error. Note that the error is raised
+    # if no errorback handles it; otherwise the return value is a result.
+    def Errorback(self, error, traceback = None):
+        return self.Callback(None, error, traceback)
+
+class Scheduler(threading.Thread):
+    # Raised by the task that Stop() queues, to make Run() leave its loop.
+    class _SchedulerStop(Exception):
+        pass
+
+    def __init__(self, poolSize):
+        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
+        self.pool = ThreadPool(poolSize)
+        self.tasks = Queue.Queue()
+
+    def ExecuteOne(self, blocking = True):
+        logging.debug("Looking for next task...")
+        try:
+            task = self.tasks.get(blocking)
+        except Queue.Empty:
+            logging.debug("No task to run")
+            return
+        return task.Callback(None)
+
+    def Run(self):
+        logging.info("Scheduler start")
+        while True:
+            try:
+                self.ExecuteOne()
+            except self._SchedulerStop:
+                break
+            except: # a failing task must not stop the scheduler thread
+                logging.exception("Unhandled task exception")
+        logging.info("Scheduler stop")
+
+    def Start(self):
+        self.pool.Start()
+        return self.start()
+
+    def Stop(self, now = False):
+        self.pool.Stop(now)
+        if now:
+            # Clear in place: Run() may be blocked in get() on this queue.
+            with self.tasks.mutex:
+                self.tasks.queue.clear()
+        def RaiseSchedulerStop():
+            raise self._SchedulerStop
+        self.AddTask(Task(RaiseSchedulerStop))
+        self.join()
+
+    def AddTask(self, task, blocking = True):
+        self.tasks.put(task, blocking)
+
+    ##
+    # A job is a function run in a separate worker thread. After the job has
+    # run, a new Task carrying its result or its error is added to the
+    # scheduler and chained to `task`, so that only the job itself, not the
+    # callbacks, runs in the worker thread. Note that the passed task is
+    # consumed by this call. A better design would let a Task suspend and
+    # resume itself and run in a thread if it wants to, but the callback would
+    # then have to know about its Task and the scheduler, which solves nothing.
+    def AddJob(self, task, func, *args, **kwargs):
+        def Job():
+            try:
+                result = func(*args, **kwargs)
+                def returnResult():
+                    return result
+                jobTask = Task(returnResult)
+            except:
+                error, traceback = sys.exc_info()[1:]
+                def raiseError():
+                    raise error, None, traceback
+                jobTask = Task(raiseError)
+            jobTask.ChainTask(task)
+            self.AddTask(jobTask)
+        self.pool.AddJob(Job)
+
+    ##
+    # This lets one callback run in a separate thread and hands its result
+    # to another, deferred chain of callbacks. Alas, this creates two Tasks,
+    # one for the caller (src) and one for the callbacks (the returned dst),
+    # and only the first has to be passed to the scheduler. It should be more
+    # transparent: a callback that must run as a job should simply be marked
+    # as such, the current task suspending until the job is done, then resuming.
+    def CreateCallbackAsJob(self, src, cb):
+        dst = Task()
+        def CreateJobCallback(result):
+            def Job():
+                return cb(result) # the job's result feeds dst's callbacks
+            self.AddJob(dst, Job)
+        src.AddCallback(CreateJobCallback)
+        return dst
+
+# The global scheduler: None until StartScheduler(), and after StopScheduler().
+scheduler = None
+
+# (Re)create the global scheduler with a pool of `size` workers and start it.
+def StartScheduler(size):
+    global scheduler
+    StopScheduler() # does nothing unless a scheduler is currently set
+    scheduler = Scheduler(size)
+    scheduler.Start()
+
+def StopScheduler(now = False):
+    global scheduler
+    if scheduler: # nothing to do when no scheduler is set
+        scheduler.Stop(now) # `now` is forwarded to Scheduler.Stop
+        scheduler = None
+
+if __name__ == '__main__':
+    from time import sleep
+    logging.getLogger().setLevel(logging.DEBUG)
+    # This function is a sample and shouldn't know about the scheduler
+    count = 0 # AsyncCall chains started and not yet finalized
+    def AsyncCall():
+        global count
+        count += 1
+        def Initialize(name, sleep): # `sleep` here is a duration argument
+            print "Here", name
+            return name, sleep
+        def Blocking(args):
+            name, time = args
+            print name, "goes to bed"
+            sleep(time)
+            print name, ": ZZZ..."
+            return name
+        def Finalize(name):
+            global count
+            print name, "wakes up!"
+            count -= 1
+
+        dinit = Task(Initialize, "Toto", 5)
+        # How can I remove the scheduler from the picture?
+        # The only way I can see is to have a special kind of Task
+        # and a suspended queue... Maybe that would also be cleaner.
+        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
+        dfinal.AddCallback(Finalize)
+        # This is confusing, but only dinit is returned because the dfinal
+        # callbacks will be added to the scheduler by the job itself at the
+        # end of its execution.
+        return dinit
+
+    logging.info("Starting scheduler with 10 workers")
+    StartScheduler(10)
+    dasync = AsyncCall()
+    logging.info("Adding asynccall task")
+    scheduler.AddTask(dasync)
+    while count > 0: # poll until Finalize has run
+        logging.debug("Count = %d", count)
+        sleep(1)
+    logging.info("Stopping scheduler")
+    StopScheduler()
+    logging.info("The End.")
--- a/test.py Sun Mar 21 16:40:06 2010 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,256 +0,0 @@
-#!/usr/bin/env python2.6
-import sys
-import logging
-import threading
-import Queue
-
-##
-# A basic worker pool... should be replaced with a resizable one.
-class ThreadPool(object):
-
- class _Worker(threading.Thread):
- def __init__(self, uid, pool):
- threading.Thread.__init__(self, name = "Worker%d" % uid)
- self.pool = pool
- def run(self):
- logging.info("%s started", self.getName())
- while True:
- job = self.pool.jobs.get()
- if job:
- logging.debug("%s is running %r", self.getName(), job)
- try:
- job()
- except:
- logging.exception("Job %r return an exception", job)
- logging.debug("Job %r completed.", job)
- else:
- self.pool.jobs.put(job)
- break
- logging.info("%s stopped", self.getName())
-
- def __init__(self, size):
- super(ThreadPool, self).__init__()
- self.jobs = Queue.Queue()
- self._workers = [self._Worker(x+1, self) for x in xrange(size)]
-
- def Start(self):
- logging.info("Starting Workers pool")
- for worker in self._workers:
- worker.start()
- logging.info("Workers started")
-
- def Stop(self, now = False):
- logging.info("Stopping Workers")
- if now:
- self.jobs = Queue.Queue()
- self.jobs.put(None)
- for worker in self._workers:
- worker.join()
- logging.info("Workers stopped")
-
- def AddJob(self, job):
- self.jobs.put(job)
-
-##
-# A task, representing a series of linked pairs of callback and error
-# handler Concept is similar to twisted, but you need to put all your
-# callback on the task before giving it to the Scheduler. For this
-# reason, you shouldnt call the callback/errorback method yourself (or
-# at least, don't put it back in the scheduler after that).
-
-class Task(object):
-
- @staticmethod
- def DefaultCallback(result):
- return result
-
- @staticmethod
- def DefaultErrorback(error):
- return error
-
- def __init__(self, func = None, *args, **kwargs):
- super(Task, self).__init__()
- self.callbacks = []
- if func:
- def callback(result):
- return func(*args, **kwargs)
- self.AddCallback(callback)
-
- def AddCallback(self, callback, errorback = None):
- if errorback == None:
- errorback = self.DefaultErrorback
- self.callbacks.append((callback, errorback))
- # permit chained calls
- return self
-
- def ChainTask(self, task):
- return self.AddCallback(task.Callback, task.Errorback)
-
- def Callback(self, result, error = None, traceback = None):
- logging.debug("Handling task %r callbacks", self)
- for cb, eb in self.callbacks:
- try:
- if error:
- error = eb(error)
- else:
- result = cb(result)
- except:
- errtype, error, traceback = sys.exc_info()
- if error:
- raise error, None, traceback
- return result
-
- ##
- # Consume the callbacks with an error. Notes that error will
- # raise if not handled and return value would be a result
- def Errorback(self, error, traceback = None):
- return self.Callback(None, error, traceback)
-
-class Scheduler(threading.Thread):
-
- class _SchedulerStop(Exception):
- pass
-
- def __init__(self, poolSize):
- threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
- self.pool = ThreadPool(poolSize)
- self.tasks = Queue.Queue()
-
- def ExecuteOne(self, blocking = True):
- logging.debug("Looking for next task...")
- try:
- task = self.tasks.get(blocking)
- except Queue.Empty:
- logging.debug("No task to run")
- return
- return task.Callback(None)
-
- def Run(self):
- logging.info("Scheduler start")
- while True:
- try:
- self.ExecuteOne()
- except self._SchedulerStop:
- break
- except:
- logging.exception("Unhandled task exception")
- logging.info("Scheduler stop")
-
- def Start(self):
- self.pool.Start()
- return self.start()
-
- def Stop(self, now = False):
- self.pool.Stop(now)
- if now:
- self.tasks = Queue.Queue()
- def RaiseSchedulerStop():
- raise self._SchedulerStop
- self.AddTask(Task(RaiseSchedulerStop))
- self.join()
-
- def AddTask(self, task, blocking = True):
- self.tasks.put(task, blocking)
-
- ##
- # A job is a task run in a seperated thread. After the job run, a
- # new Task is add to the scheduler either with a result or an error,
- # so that only the task, not the callbacks, is run in the worker
- # thread. Note the passed task is consumed after this call.
- # A better design would have to allow Task to suspend or
- # resume themself and to run in the thread if it want to, but this will
- # required the callback to know about its Task and the scheduler
- # itself, which solve nothing.
- def AddJob(self, task, func, *args, **kwargs):
- def Job():
- try:
- result = func(*args, **kwargs)
- def returnResult():
- return result
- jobTask = Task(returnResult)
- except:
- errtype, error, traceback = sys.exc_info()
- def raiseError():
- raise error, None, traceback
- jobTask = Task(raiseError)
- jobTask.ChainTask(task)
- self.AddTask(jobTask)
- self.pool.AddJob(Job)
-
- ##
- # This basically allow one callback to run in a seperated thread
- # and then get the result to another deferred callback. Helas,
- # this create two Tasks, one for the caller, one for the
- # callbacks, with only the first one having to be passed to the
- # scheduler. This should be more transparent: a callback which
- # required to be run as a job should simply be mark like it and
- # the current task suspend until the job is finished, then resume.
- def CreateCallbackAsJob(self, src, cb):
- dst = Task()
- def CreateJobCallback(result):
- def Job():
- cb(result)
- self.AddJob(dst, Job)
- src.AddCallback(CreateJobCallback)
- return dst
-
-# The global scheduler
-scheduler = None
-
-def StartScheduler(size):
- global scheduler
- if scheduler:
- StopScheduler()
- scheduler = Scheduler(size)
- scheduler.Start()
-
-def StopScheduler(now = False):
- global scheduler
- if scheduler:
- scheduler.Stop(now)
- scheduler = None
-
-if __name__ == '__main__':
- from time import sleep
- logging.getLogger().setLevel(logging.DEBUG)
- # This function is a sample and shouldn't know about the scheduler
- count = 0
- def AsyncCall():
- global count
- count += 1
- def Initialize(name, sleep):
- print "Here", name
- return name, sleep
- def Blocking(args):
- name, time = args
- print name, "goes to bed"
- sleep(time)
- print name, ": ZZZ..."
- return name
- def Finalize(name):
- global count
- print name, "wakes up!"
- count -= 1
-
- dinit = Task(Initialize, "Toto", 5)
- # How can I remove the scheduler from the picture ?
- # The only way I can see is to have a special kind of Task
- # and a suspended queue... May be this will also be more clean
- dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
- dfinal.AddCallback(Finalize)
- # This is confusing but the reason is that the dfinal callback
- # will be added to the scheduler by the job itself at the end of
- # its execution.
- return dinit
-
- logging.info("Starting scheduler with 10 workers")
- StartScheduler(10)
- dasync = AsyncCall()
- logging.info("Adding asynccall task")
- scheduler.AddTask(dasync)
- while count > 0:
- logging.debug("Count = %d", count)
- sleep(1)
- logging.info("Stopping scheduler")
- StopScheduler()
- logging.info("The End.")