scheduler.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 17:06:53 -0400
changeset 2 a00ae018daf8
parent 1 ee10f7cde07d
child 3 00b6708d1852
permissions -rwxr-xr-x
Separate thread pool in another file and fix some issues.

#!/usr/bin/env python2.6
import sys
import logging
import threading
import Queue
from threadpool import ThreadPool

##
# A task, representing a series of linked pairs of callback and error
# handler.  The concept is similar to Twisted's Deferred, but you need to
# put all your callbacks on the task before giving it to the Scheduler.
# For this reason, you shouldn't call the Callback/Errorback methods
# yourself (or at least, don't put the task back in the scheduler after
# that).

class Task(object):
    """A chain of (callback, errorback) pairs, similar in spirit to Twisted's
    Deferred.

    Attach every callback before handing the task to the Scheduler, which
    fires the chain with Callback(None).  While the chain is succeeding,
    each callback receives the previous result; once a callback raises,
    the following errorbacks receive the exception instead.
    """

    @staticmethod
    def DefaultCallback(result):
        """Pass a result through unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Pass an error through unchanged, i.e. leave it unhandled."""
        return error

    def __init__(self, func = None, *args, **kwargs):
        """Create a task.

        If func is given, the chain starts with a callback that ignores the
        incoming result and returns func(*args, **kwargs).
        """
        super(Task, self).__init__()
        # (callback, errorback) pairs, run in insertion order.
        self.callbacks = []
        if func is not None:
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def AddCallback(self, callback, errorback = None):
        """Append a (callback, errorback) pair and return self for chaining.

        A None callback passes the result through (DefaultCallback) and a
        None errorback leaves the error unhandled (DefaultErrorback), so
        AddCallback(None, eb) adds an error handler only.
        """
        if callback is None:
            callback = self.DefaultCallback
        if errorback is None:
            errorback = self.DefaultErrorback
        self.callbacks.append((callback, errorback))
        # permit chained calls
        return self

    def ChainTask(self, task):
        """Feed this chain's result (or error) into another task's chain.

        Only the error, not its traceback, reaches task.Errorback.
        """
        return self.AddCallback(task.Callback, task.Errorback)

    def Callback(self, result, error = None, traceback = None):
        """Run the chain, starting from result (or from error, if given).

        For each pair, the callback runs while there is no pending error and
        the errorback runs while there is one.  A callback that raises turns
        the chain into an error chain.  An errorback that returns an
        exception (typically the one it received) keeps the chain failing.
        One that returns any other value recovers, and that value becomes
        the result for the remaining callbacks.

        Returns the final result, or re-raises the pending error (with its
        traceback) if no errorback recovered from it.
        """
        logging.debug("Handling task %r callbacks", self)
        for cb, eb in self.callbacks:
            try:
                if error is None:
                    result = cb(result)
                else:
                    handled = eb(error)
                    # "is error" also covers old-style exception instances,
                    # which are not BaseException subclasses in Python 2.
                    if handled is error or isinstance(handled, BaseException):
                        error = handled
                    else:
                        # Recovered: continue down the callback side.
                        result, error, traceback = handled, None, None
            except:
                error, traceback = sys.exc_info()[1:]
        if error is not None:
            raise error, None, traceback
        return result

    ##
    # Consume the callbacks with an error.  Note that the error is
    # re-raised if not handled; otherwise the return value is the result.
    def Errorback(self, error, traceback = None):
        """Run the chain starting from an error instead of a result."""
        return self.Callback(None, error, traceback)

class Scheduler(threading.Thread):
    """A thread that fires queued Tasks one at a time, plus a ThreadPool for
    blocking jobs.

    Task callbacks run on this scheduler thread.  Only the functions given
    to AddJob run on the pool's workers.
    """

    class _SchedulerStop(Exception):
        """Raised by the sentinel task queued by Stop() to end Run()."""
        pass

    def __init__(self, poolSize):
        """Create the (not yet started) scheduler thread and a ThreadPool
        built with poolSize."""
        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
        self.pool = ThreadPool(poolSize)
        self.tasks = Queue.Queue()

    def ExecuteOne(self, blocking = True):
        """Take the next task off the queue and fire it with Callback(None).

        Returns the task's result, or None when blocking is false and the
        queue is empty.  Errors left unhandled by the task propagate.
        """
        logging.debug("Looking for next task...")
        try:
            task = self.tasks.get(blocking)
        except Queue.Empty:
            logging.debug("No task to run")
            return None
        return task.Callback(None)

    def Run(self):
        """Thread body: execute tasks until the stop sentinel is reached.

        Any other exception escaping a task is logged and the loop goes on.
        """
        logging.info("Scheduler start")
        while True:
            try:
                self.ExecuteOne()
            except self._SchedulerStop:
                break
            except:
                logging.exception("Unhandled task exception")
        logging.info("Scheduler stop")

    def Start(self):
        """Start the worker pool, then the scheduler thread."""
        self.pool.Start()
        return self.start()

    def Stop(self, now = False):
        """Stop the pool, then the scheduler thread, and wait for it to exit.

        now is forwarded to ThreadPool.Stop().  If it is true, tasks still
        waiting in the queue are also discarded.  Otherwise they run before
        the stop sentinel.
        """
        self.pool.Stop(now)
        if now:
            # Drain the existing queue instead of replacing it.  The scheduler
            # thread may already be blocked in get() on this very queue and
            # would never see a stop task put on a new one.
            try:
                while True:
                    self.tasks.get_nowait()
            except Queue.Empty:
                pass
        def RaiseSchedulerStop():
            raise self._SchedulerStop
        self.AddTask(Task(RaiseSchedulerStop))
        self.join()

    def AddTask(self, task, blocking = True):
        """Queue a task to be fired by the scheduler thread."""
        self.tasks.put(task, blocking)

    ##
    # A job is a function run in a separate (pool) thread.  After the job
    # runs, a new Task is added to the scheduler carrying either its result
    # or its error, so that only the function, not the callbacks, runs in
    # the worker thread.  Note that the passed task is consumed by this call.
    # A better design would allow a Task to suspend and resume itself and
    # to run in the thread if it wants to, but this would require the
    # callback to know about its Task and the scheduler itself, which
    # solves nothing.
    def AddJob(self, task, func, *args, **kwargs):
        """Run func(*args, **kwargs) on the pool, then feed its result (or
        error) into task's chain on the scheduler thread."""
        def Job():
            try:
                result = func(*args, **kwargs)
            except:
                error, traceback = sys.exc_info()[1:]
                def raiseError():
                    raise error, None, traceback
                jobTask = Task(raiseError)
            else:
                def returnResult():
                    return result
                jobTask = Task(returnResult)
            jobTask.ChainTask(task)
            self.AddTask(jobTask)
        self.pool.AddJob(Job)

    ##
    # This basically allows one callback to run in a separate thread and
    # then hands its result to another deferred callback.  Alas, this
    # creates two Tasks, one for the caller and one for the callbacks, and
    # only the first one has to be passed to the scheduler.  This should be
    # more transparent: a callback which needs to run as a job should simply
    # be marked as such, and the current task suspended until the job is
    # finished, then resumed.
    def CreateCallbackAsJob(self, src, cb):
        """Append a callback to src that runs cb(result) as a pool job.

        src's own chain continues with None afterwards.  The returned Task
        receives cb's result (or error) when the job finishes and is queued
        on the scheduler automatically, so only src needs AddTask.
        """
        dst = Task()
        def CreateJobCallback(result):
            def Job():
                return cb(result)
            self.AddJob(dst, Job)
        src.AddCallback(CreateJobCallback)
        return dst

# The global Scheduler instance.  StartScheduler() replaces it and
# StopScheduler() stops it and resets it to None; None until the first
# StartScheduler() call.
scheduler = None

def StartScheduler(size):
    """Install and start a new global Scheduler built with `size`.

    A previously installed scheduler is first shut down through
    StopScheduler() with its default arguments.
    """
    global scheduler
    previous = scheduler
    if previous:
        StopScheduler()
    replacement = Scheduler(size)
    scheduler = replacement
    replacement.Start()

def StopScheduler(now = False):
    """Stop the global scheduler, if one is installed, then clear it.

    `now` is forwarded to Scheduler.Stop(); the global is reset to None
    only after Stop() has returned.
    """
    global scheduler
    active = scheduler
    if active:
        active.Stop(now)
    scheduler = None

if __name__ == '__main__':
    from time import sleep
    logging.getLogger().setLevel(logging.INFO)
    # This function is a sample and shouldn't know about the scheduler
    count = 0
    def AsyncCall(name, seconds):
        global count
        count += 1
        def Initialize(name, seconds):
            print "Here", name
            return name, seconds
        def Blocking(args):
            name, time = args
            print name, "goes to bed"
            sleep(time)
            print name, ": ZZZ..."
            return name
        def Finalize(name):
            global count
            print name, "wakes up!"
            count -= 1

        dinit = Task(Initialize, name, seconds)
        # How can I remove the scheduler from the picture ?
        # The only way I can see is to have a special kind of Task
        # and a suspended queue... May be this will also be more clean
        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
        dfinal.AddCallback(Finalize)
        # This is confusing but the reason is that the dfinal callback
        # will be added to the scheduler by the job itself at the end of
        # its execution.
        return dinit

    logging.info("Starting scheduler with 10 workers")
    StartScheduler(10)
    logging.info("Adding asynccall task")
    for x in xrange(int(sys.argv[1])):
        dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
        scheduler.AddTask(dasync)
    while count > 0:
        logging.debug("Count = %d", count)
        sleep(1)
    logging.info("Stopping scheduler")
    StopScheduler()
    logging.info("The End.")