First version.
authorFabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 16:40:06 -0400
changeset 0 8ae7370093db
child 1 ee10f7cde07d
First version.
test.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test.py	Sun Mar 21 16:40:06 2010 -0400
@@ -0,0 +1,256 @@
+#!/usr/bin/env python2.6
+import sys
+import logging
+import threading
+import Queue
+
+##
+# A basic worker pool... should be replaced with a resizable one.
+class ThreadPool(object):
+    
+    class _Worker(threading.Thread):
+        def __init__(self, uid, pool):
+            threading.Thread.__init__(self, name = "Worker%d" % uid)
+            self.pool = pool
+        def run(self):
+            logging.info("%s started", self.getName())
+            while True:
+                job = self.pool.jobs.get()
+                if job:
+                    logging.debug("%s is running %r", self.getName(), job)
+                    try:
+                        job()
+                    except:
+                        logging.exception("Job %r return an exception", job)
+                    logging.debug("Job %r completed.", job)
+                else:
+                    self.pool.jobs.put(job)
+                    break
+            logging.info("%s stopped", self.getName())
+
+    def __init__(self, size):
+        super(ThreadPool, self).__init__()
+        self.jobs = Queue.Queue()
+        self._workers = [self._Worker(x+1, self) for x in xrange(size)]
+
+    def Start(self):
+        logging.info("Starting Workers pool")
+        for worker in self._workers:
+            worker.start()
+        logging.info("Workers started")
+
+    def Stop(self, now = False):
+        logging.info("Stopping Workers")
+        if now:
+            self.jobs = Queue.Queue()
+        self.jobs.put(None)
+        for worker in self._workers:
+            worker.join()
+        logging.info("Workers stopped")
+
+    def AddJob(self, job):
+        self.jobs.put(job)
+
+##
+# A task, representing a series of linked pairs of callback and error
+# handler Concept is similar to twisted, but you need to put all your
+# callback on the task before giving it to the Scheduler.  For this
+# reason, you shouldnt call the callback/errorback method yourself (or
+# at least, don't put it back in the scheduler after that).
+
+class Task(object):
+
+    @staticmethod
+    def DefaultCallback(result):
+        return result
+
+    @staticmethod
+    def DefaultErrorback(error):
+        return error
+
+    def __init__(self, func = None, *args, **kwargs):
+        super(Task, self).__init__()
+        self.callbacks = []
+        if func:
+            def callback(result):
+                return func(*args, **kwargs)
+            self.AddCallback(callback)
+
+    def AddCallback(self, callback, errorback = None):
+        if errorback == None:
+            errorback = self.DefaultErrorback
+        self.callbacks.append((callback, errorback))
+        # permit chained calls
+        return self
+
+    def ChainTask(self, task):
+        return self.AddCallback(task.Callback, task.Errorback)
+
+    def Callback(self, result, error = None, traceback = None):
+        logging.debug("Handling task %r callbacks", self)
+        for cb, eb in self.callbacks:
+            try:
+                if error:
+                    error = eb(error)
+                else:
+                    result = cb(result)
+            except:
+                errtype, error, traceback = sys.exc_info()
+        if error:
+            raise error, None, traceback
+        return result
+
+    ##
+    # Consume the callbacks with an error.  Notes that error will
+    # raise if not handled and return value would be a result
+    def Errorback(self, error, traceback = None):
+        return self.Callback(None, error, traceback)
+
+class Scheduler(threading.Thread):
+
+    class _SchedulerStop(Exception):
+        pass
+
+    def __init__(self, poolSize):
+        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
+        self.pool = ThreadPool(poolSize)
+        self.tasks = Queue.Queue()
+
+    def ExecuteOne(self, blocking = True):
+        logging.debug("Looking for next task...")
+        try:
+            task = self.tasks.get(blocking)
+        except Queue.Empty:
+            logging.debug("No task to run")
+            return
+        return task.Callback(None)
+
+    def Run(self):
+        logging.info("Scheduler start")
+        while True:
+            try:
+                self.ExecuteOne()
+            except self._SchedulerStop:
+                break
+            except:
+                logging.exception("Unhandled task exception")
+        logging.info("Scheduler stop")
+
+    def Start(self):
+        self.pool.Start()
+        return self.start()
+
+    def Stop(self, now = False):
+        self.pool.Stop(now)
+        if now:
+            self.tasks = Queue.Queue()
+        def RaiseSchedulerStop():
+            raise self._SchedulerStop
+        self.AddTask(Task(RaiseSchedulerStop))
+        self.join()
+
+    def AddTask(self, task, blocking = True):
+        self.tasks.put(task, blocking)
+
+    ##
+    # A job is a task run in a seperated thread.  After the job run, a
+    # new Task is add to the scheduler either with a result or an error,
+    # so that only the task, not the callbacks, is run in the worker
+    # thread. Note the passed task is consumed after this call.
+    # A better design would have to allow Task to suspend or
+    # resume themself and to run in the thread if it want to, but this will
+    # required the callback to know about its Task and the scheduler
+    # itself, which solve nothing.
+    def AddJob(self, task, func, *args, **kwargs):
+        def Job():
+            try:
+                result = func(*args, **kwargs)
+                def returnResult():
+                    return result
+                jobTask = Task(returnResult)
+            except:
+                errtype, error, traceback = sys.exc_info()
+                def raiseError():
+                    raise error, None, traceback
+                jobTask = Task(raiseError)
+            jobTask.ChainTask(task)
+            self.AddTask(jobTask)
+        self.pool.AddJob(Job)
+
+    ##
+    # This basically allow one callback to run in a seperated thread
+    # and then get the result to another deferred callback.  Helas,
+    # this create two Tasks, one for the caller, one for the
+    # callbacks, with only the first one having to be passed to the
+    # scheduler.  This should be more transparent: a callback which
+    # required to be run as a job should simply be mark like it and
+    # the current task suspend until the job is finished, then resume.
+    def CreateCallbackAsJob(self, src, cb):
+        dst = Task()
+        def CreateJobCallback(result):
+            def Job():
+                cb(result)
+            self.AddJob(dst, Job)
+        src.AddCallback(CreateJobCallback)
+        return dst
+
+# The global scheduler
+scheduler = None
+
+def StartScheduler(size):
+    global scheduler
+    if scheduler:
+        StopScheduler()
+    scheduler = Scheduler(size)
+    scheduler.Start()
+
+def StopScheduler(now = False):
+    global scheduler
+    if scheduler:
+        scheduler.Stop(now)
+    scheduler = None
+
+if __name__ == '__main__':
+    from time import sleep
+    logging.getLogger().setLevel(logging.DEBUG)
+    # This function is a sample and shouldn't know about the scheduler
+    count = 0
+    def AsyncCall():
+        global count
+        count += 1
+        def Initialize(name, sleep):
+            print "Here", name
+            return name, sleep
+        def Blocking(args):
+            name, time = args
+            print name, "goes to bed"
+            sleep(time)
+            print name, ": ZZZ..."
+            return name
+        def Finalize(name):
+            global count
+            print name, "wakes up!"
+            count -= 1
+
+        dinit = Task(Initialize, "Toto", 5)
+        # How can I remove the scheduler from the picture ?
+        # The only way I can see is to have a special kind of Task
+        # and a suspended queue... May be this will also be more clean
+        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
+        dfinal.AddCallback(Finalize)
+        # This is confusing but the reason is that the dfinal callback
+        # will be added to the scheduler by the job itself at the end of
+        # its execution.
+        return dinit
+
+    logging.info("Starting scheduler with 10 workers")
+    StartScheduler(10)
+    dasync = AsyncCall()
+    logging.info("Adding asynccall task")
+    scheduler.AddTask(dasync)
+    while count > 0:
+        logging.debug("Count = %d", count)
+        sleep(1)
+    logging.info("Stopping scheduler")
+    StopScheduler()
+    logging.info("The End.")