Rename test to scheduler.
authorFabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 16:50:13 -0400
changeset 1 ee10f7cde07d
parent 0 8ae7370093db
child 2 a00ae018daf8
Rename test to scheduler.
scheduler.py
test.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scheduler.py	Sun Mar 21 16:50:13 2010 -0400
@@ -0,0 +1,256 @@
+#!/usr/bin/env python2.6
+import sys
+import logging
+import threading
+import Queue
+
+##
+# A basic worker pool... should be replaced with a resizable one.
+class ThreadPool(object):
+    
+    class _Worker(threading.Thread):
+        def __init__(self, uid, pool):
+            threading.Thread.__init__(self, name = "Worker%d" % uid)
+            self.pool = pool
+        def run(self):
+            logging.info("%s started", self.getName())
+            while True:
+                job = self.pool.jobs.get()
+                if job:
+                    logging.debug("%s is running %r", self.getName(), job)
+                    try:
+                        job()
+                    except:
+                        logging.exception("Job %r return an exception", job)
+                    logging.debug("Job %r completed.", job)
+                else:
+                    self.pool.jobs.put(job)
+                    break
+            logging.info("%s stopped", self.getName())
+
+    def __init__(self, size):
+        super(ThreadPool, self).__init__()
+        self.jobs = Queue.Queue()
+        self._workers = [self._Worker(x+1, self) for x in xrange(size)]
+
+    def Start(self):
+        logging.info("Starting Workers pool")
+        for worker in self._workers:
+            worker.start()
+        logging.info("Workers started")
+
+    def Stop(self, now = False):
+        logging.info("Stopping Workers")
+        if now:
+            self.jobs = Queue.Queue()
+        self.jobs.put(None)
+        for worker in self._workers:
+            worker.join()
+        logging.info("Workers stopped")
+
+    def AddJob(self, job):
+        self.jobs.put(job)
+
+##
+# A task, representing a series of linked pairs of callback and error
+# handler Concept is similar to twisted, but you need to put all your
+# callback on the task before giving it to the Scheduler.  For this
+# reason, you shouldnt call the callback/errorback method yourself (or
+# at least, don't put it back in the scheduler after that).
+
+class Task(object):
+
+    @staticmethod
+    def DefaultCallback(result):
+        return result
+
+    @staticmethod
+    def DefaultErrorback(error):
+        return error
+
+    def __init__(self, func = None, *args, **kwargs):
+        super(Task, self).__init__()
+        self.callbacks = []
+        if func:
+            def callback(result):
+                return func(*args, **kwargs)
+            self.AddCallback(callback)
+
+    def AddCallback(self, callback, errorback = None):
+        if errorback == None:
+            errorback = self.DefaultErrorback
+        self.callbacks.append((callback, errorback))
+        # permit chained calls
+        return self
+
+    def ChainTask(self, task):
+        return self.AddCallback(task.Callback, task.Errorback)
+
+    def Callback(self, result, error = None, traceback = None):
+        logging.debug("Handling task %r callbacks", self)
+        for cb, eb in self.callbacks:
+            try:
+                if error:
+                    error = eb(error)
+                else:
+                    result = cb(result)
+            except:
+                errtype, error, traceback = sys.exc_info()
+        if error:
+            raise error, None, traceback
+        return result
+
+    ##
+    # Consume the callbacks with an error.  Notes that error will
+    # raise if not handled and return value would be a result
+    def Errorback(self, error, traceback = None):
+        return self.Callback(None, error, traceback)
+
+class Scheduler(threading.Thread):
+
+    class _SchedulerStop(Exception):
+        pass
+
+    def __init__(self, poolSize):
+        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
+        self.pool = ThreadPool(poolSize)
+        self.tasks = Queue.Queue()
+
+    def ExecuteOne(self, blocking = True):
+        logging.debug("Looking for next task...")
+        try:
+            task = self.tasks.get(blocking)
+        except Queue.Empty:
+            logging.debug("No task to run")
+            return
+        return task.Callback(None)
+
+    def Run(self):
+        logging.info("Scheduler start")
+        while True:
+            try:
+                self.ExecuteOne()
+            except self._SchedulerStop:
+                break
+            except:
+                logging.exception("Unhandled task exception")
+        logging.info("Scheduler stop")
+
+    def Start(self):
+        self.pool.Start()
+        return self.start()
+
+    def Stop(self, now = False):
+        self.pool.Stop(now)
+        if now:
+            self.tasks = Queue.Queue()
+        def RaiseSchedulerStop():
+            raise self._SchedulerStop
+        self.AddTask(Task(RaiseSchedulerStop))
+        self.join()
+
+    def AddTask(self, task, blocking = True):
+        self.tasks.put(task, blocking)
+
+    ##
+    # A job is a task run in a seperated thread.  After the job run, a
+    # new Task is add to the scheduler either with a result or an error,
+    # so that only the task, not the callbacks, is run in the worker
+    # thread. Note the passed task is consumed after this call.
+    # A better design would have to allow Task to suspend or
+    # resume themself and to run in the thread if it want to, but this will
+    # required the callback to know about its Task and the scheduler
+    # itself, which solve nothing.
+    def AddJob(self, task, func, *args, **kwargs):
+        def Job():
+            try:
+                result = func(*args, **kwargs)
+                def returnResult():
+                    return result
+                jobTask = Task(returnResult)
+            except:
+                errtype, error, traceback = sys.exc_info()
+                def raiseError():
+                    raise error, None, traceback
+                jobTask = Task(raiseError)
+            jobTask.ChainTask(task)
+            self.AddTask(jobTask)
+        self.pool.AddJob(Job)
+
+    ##
+    # This basically allow one callback to run in a seperated thread
+    # and then get the result to another deferred callback.  Helas,
+    # this create two Tasks, one for the caller, one for the
+    # callbacks, with only the first one having to be passed to the
+    # scheduler.  This should be more transparent: a callback which
+    # required to be run as a job should simply be mark like it and
+    # the current task suspend until the job is finished, then resume.
+    def CreateCallbackAsJob(self, src, cb):
+        dst = Task()
+        def CreateJobCallback(result):
+            def Job():
+                cb(result)
+            self.AddJob(dst, Job)
+        src.AddCallback(CreateJobCallback)
+        return dst
+
+# The global scheduler
+scheduler = None
+
+def StartScheduler(size):
+    global scheduler
+    if scheduler:
+        StopScheduler()
+    scheduler = Scheduler(size)
+    scheduler.Start()
+
+def StopScheduler(now = False):
+    global scheduler
+    if scheduler:
+        scheduler.Stop(now)
+    scheduler = None
+
+if __name__ == '__main__':
+    from time import sleep
+    logging.getLogger().setLevel(logging.DEBUG)
+    # This function is a sample and shouldn't know about the scheduler
+    count = 0
+    def AsyncCall():
+        global count
+        count += 1
+        def Initialize(name, sleep):
+            print "Here", name
+            return name, sleep
+        def Blocking(args):
+            name, time = args
+            print name, "goes to bed"
+            sleep(time)
+            print name, ": ZZZ..."
+            return name
+        def Finalize(name):
+            global count
+            print name, "wakes up!"
+            count -= 1
+
+        dinit = Task(Initialize, "Toto", 5)
+        # How can I remove the scheduler from the picture ?
+        # The only way I can see is to have a special kind of Task
+        # and a suspended queue... May be this will also be more clean
+        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
+        dfinal.AddCallback(Finalize)
+        # This is confusing but the reason is that the dfinal callback
+        # will be added to the scheduler by the job itself at the end of
+        # its execution.
+        return dinit
+
+    logging.info("Starting scheduler with 10 workers")
+    StartScheduler(10)
+    dasync = AsyncCall()
+    logging.info("Adding asynccall task")
+    scheduler.AddTask(dasync)
+    while count > 0:
+        logging.debug("Count = %d", count)
+        sleep(1)
+    logging.info("Stopping scheduler")
+    StopScheduler()
+    logging.info("The End.")
--- a/test.py	Sun Mar 21 16:40:06 2010 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,256 +0,0 @@
-#!/usr/bin/env python2.6
-import sys
-import logging
-import threading
-import Queue
-
-##
-# A basic worker pool... should be replaced with a resizable one.
-class ThreadPool(object):
-    
-    class _Worker(threading.Thread):
-        def __init__(self, uid, pool):
-            threading.Thread.__init__(self, name = "Worker%d" % uid)
-            self.pool = pool
-        def run(self):
-            logging.info("%s started", self.getName())
-            while True:
-                job = self.pool.jobs.get()
-                if job:
-                    logging.debug("%s is running %r", self.getName(), job)
-                    try:
-                        job()
-                    except:
-                        logging.exception("Job %r return an exception", job)
-                    logging.debug("Job %r completed.", job)
-                else:
-                    self.pool.jobs.put(job)
-                    break
-            logging.info("%s stopped", self.getName())
-
-    def __init__(self, size):
-        super(ThreadPool, self).__init__()
-        self.jobs = Queue.Queue()
-        self._workers = [self._Worker(x+1, self) for x in xrange(size)]
-
-    def Start(self):
-        logging.info("Starting Workers pool")
-        for worker in self._workers:
-            worker.start()
-        logging.info("Workers started")
-
-    def Stop(self, now = False):
-        logging.info("Stopping Workers")
-        if now:
-            self.jobs = Queue.Queue()
-        self.jobs.put(None)
-        for worker in self._workers:
-            worker.join()
-        logging.info("Workers stopped")
-
-    def AddJob(self, job):
-        self.jobs.put(job)
-
-##
-# A task, representing a series of linked pairs of callback and error
-# handler Concept is similar to twisted, but you need to put all your
-# callback on the task before giving it to the Scheduler.  For this
-# reason, you shouldnt call the callback/errorback method yourself (or
-# at least, don't put it back in the scheduler after that).
-
-class Task(object):
-
-    @staticmethod
-    def DefaultCallback(result):
-        return result
-
-    @staticmethod
-    def DefaultErrorback(error):
-        return error
-
-    def __init__(self, func = None, *args, **kwargs):
-        super(Task, self).__init__()
-        self.callbacks = []
-        if func:
-            def callback(result):
-                return func(*args, **kwargs)
-            self.AddCallback(callback)
-
-    def AddCallback(self, callback, errorback = None):
-        if errorback == None:
-            errorback = self.DefaultErrorback
-        self.callbacks.append((callback, errorback))
-        # permit chained calls
-        return self
-
-    def ChainTask(self, task):
-        return self.AddCallback(task.Callback, task.Errorback)
-
-    def Callback(self, result, error = None, traceback = None):
-        logging.debug("Handling task %r callbacks", self)
-        for cb, eb in self.callbacks:
-            try:
-                if error:
-                    error = eb(error)
-                else:
-                    result = cb(result)
-            except:
-                errtype, error, traceback = sys.exc_info()
-        if error:
-            raise error, None, traceback
-        return result
-
-    ##
-    # Consume the callbacks with an error.  Notes that error will
-    # raise if not handled and return value would be a result
-    def Errorback(self, error, traceback = None):
-        return self.Callback(None, error, traceback)
-
-class Scheduler(threading.Thread):
-
-    class _SchedulerStop(Exception):
-        pass
-
-    def __init__(self, poolSize):
-        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
-        self.pool = ThreadPool(poolSize)
-        self.tasks = Queue.Queue()
-
-    def ExecuteOne(self, blocking = True):
-        logging.debug("Looking for next task...")
-        try:
-            task = self.tasks.get(blocking)
-        except Queue.Empty:
-            logging.debug("No task to run")
-            return
-        return task.Callback(None)
-
-    def Run(self):
-        logging.info("Scheduler start")
-        while True:
-            try:
-                self.ExecuteOne()
-            except self._SchedulerStop:
-                break
-            except:
-                logging.exception("Unhandled task exception")
-        logging.info("Scheduler stop")
-
-    def Start(self):
-        self.pool.Start()
-        return self.start()
-
-    def Stop(self, now = False):
-        self.pool.Stop(now)
-        if now:
-            self.tasks = Queue.Queue()
-        def RaiseSchedulerStop():
-            raise self._SchedulerStop
-        self.AddTask(Task(RaiseSchedulerStop))
-        self.join()
-
-    def AddTask(self, task, blocking = True):
-        self.tasks.put(task, blocking)
-
-    ##
-    # A job is a task run in a seperated thread.  After the job run, a
-    # new Task is add to the scheduler either with a result or an error,
-    # so that only the task, not the callbacks, is run in the worker
-    # thread. Note the passed task is consumed after this call.
-    # A better design would have to allow Task to suspend or
-    # resume themself and to run in the thread if it want to, but this will
-    # required the callback to know about its Task and the scheduler
-    # itself, which solve nothing.
-    def AddJob(self, task, func, *args, **kwargs):
-        def Job():
-            try:
-                result = func(*args, **kwargs)
-                def returnResult():
-                    return result
-                jobTask = Task(returnResult)
-            except:
-                errtype, error, traceback = sys.exc_info()
-                def raiseError():
-                    raise error, None, traceback
-                jobTask = Task(raiseError)
-            jobTask.ChainTask(task)
-            self.AddTask(jobTask)
-        self.pool.AddJob(Job)
-
-    ##
-    # This basically allow one callback to run in a seperated thread
-    # and then get the result to another deferred callback.  Helas,
-    # this create two Tasks, one for the caller, one for the
-    # callbacks, with only the first one having to be passed to the
-    # scheduler.  This should be more transparent: a callback which
-    # required to be run as a job should simply be mark like it and
-    # the current task suspend until the job is finished, then resume.
-    def CreateCallbackAsJob(self, src, cb):
-        dst = Task()
-        def CreateJobCallback(result):
-            def Job():
-                cb(result)
-            self.AddJob(dst, Job)
-        src.AddCallback(CreateJobCallback)
-        return dst
-
-# The global scheduler
-scheduler = None
-
-def StartScheduler(size):
-    global scheduler
-    if scheduler:
-        StopScheduler()
-    scheduler = Scheduler(size)
-    scheduler.Start()
-
-def StopScheduler(now = False):
-    global scheduler
-    if scheduler:
-        scheduler.Stop(now)
-    scheduler = None
-
-if __name__ == '__main__':
-    from time import sleep
-    logging.getLogger().setLevel(logging.DEBUG)
-    # This function is a sample and shouldn't know about the scheduler
-    count = 0
-    def AsyncCall():
-        global count
-        count += 1
-        def Initialize(name, sleep):
-            print "Here", name
-            return name, sleep
-        def Blocking(args):
-            name, time = args
-            print name, "goes to bed"
-            sleep(time)
-            print name, ": ZZZ..."
-            return name
-        def Finalize(name):
-            global count
-            print name, "wakes up!"
-            count -= 1
-
-        dinit = Task(Initialize, "Toto", 5)
-        # How can I remove the scheduler from the picture ?
-        # The only way I can see is to have a special kind of Task
-        # and a suspended queue... May be this will also be more clean
-        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
-        dfinal.AddCallback(Finalize)
-        # This is confusing but the reason is that the dfinal callback
-        # will be added to the scheduler by the job itself at the end of
-        # its execution.
-        return dinit
-
-    logging.info("Starting scheduler with 10 workers")
-    StartScheduler(10)
-    dasync = AsyncCall()
-    logging.info("Adding asynccall task")
-    scheduler.AddTask(dasync)
-    while count > 0:
-        logging.debug("Count = %d", count)
-        sleep(1)
-    logging.info("Stopping scheduler")
-    StopScheduler()
-    logging.info("The End.")