scheduler.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 16:50:13 -0400
changeset 1 ee10f7cde07d
parent 0 test.py@8ae7370093db
child 2 a00ae018daf8
permissions -rwxr-xr-x
Rename test to scheduler.

#!/usr/bin/env python2.6
import sys
import logging
import threading
import Queue

##
# A basic worker pool... should be replaced with a resizable one.
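#
# A minimal usage sketch (illustrative only, not exercised by this module):
#     pool = ThreadPool(4)
#     pool.Start()
#     pool.AddJob(lambda: logging.info("hello from a worker"))
#     pool.Stop()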
class ThreadPool(object):
    
    class _Worker(threading.Thread):
        def __init__(self, uid, pool):
            threading.Thread.__init__(self, name = "Worker%d" % uid)
            self.pool = pool
        def run(self):
            logging.info("%s started", self.getName())
            while True:
                job = self.pool.jobs.get()
                if job:
                    logging.debug("%s is running %r", self.getName(), job)
                    try:
                        job()
                    except:
                        logging.exception("Job %r return an exception", job)
                    logging.debug("Job %r completed.", job)
                else:
                    self.pool.jobs.put(job)
                    break
            logging.info("%s stopped", self.getName())

    def __init__(self, size):
        super(ThreadPool, self).__init__()
        self.jobs = Queue.Queue()
        self._workers = [self._Worker(x+1, self) for x in xrange(size)]

    def Start(self):
        logging.info("Starting Workers pool")
        for worker in self._workers:
            worker.start()
        logging.info("Workers started")

    def Stop(self, now = False):
        logging.info("Stopping Workers")
        if now:
            # Drop any pending jobs so the workers stop as soon as their
            # current job (if any) is done.
            try:
                while True:
                    self.jobs.get_nowait()
            except Queue.Empty:
                pass
        self.jobs.put(None)
        for worker in self._workers:
            worker.join()
        logging.info("Workers stopped")

    def AddJob(self, job):
        self.jobs.put(job)

##
# A task, representing a series of linked pairs of callback and error
# handler.  The concept is similar to Twisted, but you need to add all
# your callbacks to the task before giving it to the Scheduler.  For this
# reason, you shouldn't call the Callback/Errorback methods yourself (or
# at least, don't put the task back in the scheduler after that).
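#
# For instance (a small sketch, assuming a Scheduler started via
# StartScheduler below):
#     task = Task(lambda: 21)
#     task.AddCallback(lambda result: result * 2)
#     task.AddCallback(lambda result: logging.info("result = %r", result))
#     scheduler.AddTask(task)   # the scheduler invokes task.Callback(None)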

class Task(object):

    @staticmethod
    def DefaultCallback(result):
        return result

    @staticmethod
    def DefaultErrorback(error):
        return error

    def __init__(self, func = None, *args, **kwargs):
        super(Task, self).__init__()
        self.callbacks = []
        if func:
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def AddCallback(self, callback, errorback = None):
        if errorback is None:
            errorback = self.DefaultErrorback
        self.callbacks.append((callback, errorback))
        # permit chained calls
        return self

    def ChainTask(self, task):
        return self.AddCallback(task.Callback, task.Errorback)

    def Callback(self, result, error = None, traceback = None):
        logging.debug("Handling task %r callbacks", self)
        for cb, eb in self.callbacks:
            try:
                if error:
                    error = eb(error)
                else:
                    result = cb(result)
            except:
                errtype, error, traceback = sys.exc_info()
        if error:
            raise error, None, traceback
        return result

    ##
    # Consume the callbacks with an error.  Note that the error will be
    # re-raised if no errorback handles it, and that the return value is a
    # result once the error has been handled.
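    #
    # For example (sketch): an errorback that returns a false value stops
    # the error from propagating:
    #     def Ignore(error):
    #         logging.warning("ignoring %r", error)
    #     task = Task().AddCallback(Task.DefaultCallback, Ignore)
    #     task.Errorback(ValueError("boom"))   # logged, not re-raised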
    def Errorback(self, error, traceback = None):
        return self.Callback(None, error, traceback)

class Scheduler(threading.Thread):

    class _SchedulerStop(Exception):
        pass

    def __init__(self, poolSize):
        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
        self.pool = ThreadPool(poolSize)
        self.tasks = Queue.Queue()

    def ExecuteOne(self, blocking = True):
        logging.debug("Looking for next task...")
        try:
            task = self.tasks.get(blocking)
        except Queue.Empty:
            logging.debug("No task to run")
            return
        return task.Callback(None)

    def Run(self):
        logging.info("Scheduler start")
        while True:
            try:
                self.ExecuteOne()
            except self._SchedulerStop:
                break
            except:
                logging.exception("Unhandled task exception")
        logging.info("Scheduler stop")

    def Start(self):
        self.pool.Start()
        return self.start()

    def Stop(self, now = False):
        self.pool.Stop(now)
        if now:
            # Drop any pending tasks so the stop task below is picked up
            # right away, even if the scheduler thread is already blocked
            # on the queue.
            try:
                while True:
                    self.tasks.get_nowait()
            except Queue.Empty:
                pass
        def RaiseSchedulerStop():
            raise self._SchedulerStop
        self.AddTask(Task(RaiseSchedulerStop))
        self.join()

    def AddTask(self, task, blocking = True):
        self.tasks.put(task, blocking)

    ##
    # A job is a task run in a separate thread.  After the job has run, a
    # new Task is added to the scheduler with either a result or an error,
    # so that only the function, not the task callbacks, runs in the
    # worker thread.  Note that the passed task is consumed by this call.
    # A better design would allow Tasks to suspend and resume themselves
    # and to run in the worker thread if they want to, but that would
    # require the callback to know about its Task and about the scheduler
    # itself, which solves nothing.
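    #
    # A hedged sketch (assuming a started scheduler):
    #     done = Task().AddCallback(lambda n: logging.info("job produced %r", n))
    #     scheduler.AddJob(done, pow, 2, 10)   # pow(2, 10) runs in a worker thread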
    def AddJob(self, task, func, *args, **kwargs):
        def Job():
            try:
                result = func(*args, **kwargs)
                def returnResult():
                    return result
                jobTask = Task(returnResult)
            except:
                errtype, error, traceback = sys.exc_info()
                def raiseError():
                    raise error, None, traceback
                jobTask = Task(raiseError)
            jobTask.ChainTask(task)
            self.AddTask(jobTask)
        self.pool.AddJob(Job)

    ##
    # This basically allows one callback to run in a separate thread and
    # then hand its result to another deferred callback.  Alas, this
    # creates two Tasks, one for the caller and one for the callbacks,
    # with only the first one having to be passed to the scheduler.  This
    # should be more transparent: a callback which needs to run as a job
    # should simply be marked as such, and the current task suspended
    # until the job is finished, then resumed.
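    #
    # A small sketch (illustrative values):
    #     dinit = Task(lambda: 42)
    #     dfinal = scheduler.CreateCallbackAsJob(dinit, lambda n: n * 2)  # runs in a worker
    #     dfinal.AddCallback(lambda n: logging.info("job result: %r", n))
    #     scheduler.AddTask(dinit)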
    def CreateCallbackAsJob(self, src, cb):
        dst = Task()
        def CreateJobCallback(result):
            def Job():
                # Forward cb's result so it reaches dst's callbacks.
                return cb(result)
            self.AddJob(dst, Job)
        src.AddCallback(CreateJobCallback)
        return dst

# The global scheduler
scheduler = None

def StartScheduler(size):
    global scheduler
    if scheduler:
        StopScheduler()
    scheduler = Scheduler(size)
    scheduler.Start()

def StopScheduler(now = False):
    global scheduler
    if scheduler:
        scheduler.Stop(now)
    scheduler = None
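
# Typical use of the module-level helpers (sketch):
#     StartScheduler(4)
#     scheduler.AddTask(Task(lambda: logging.info("hello")))
#     StopScheduler()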

if __name__ == '__main__':
    from time import sleep
    logging.basicConfig(level=logging.DEBUG)
    # This function is a sample and shouldn't know about the scheduler
    count = 0
    def AsyncCall():
        global count
        count += 1
        def Initialize(name, sleep):
            print "Here", name
            return name, sleep
        def Blocking(args):
            name, time = args
            print name, "goes to bed"
            sleep(time)
            print name, ": ZZZ..."
            return name
        def Finalize(name):
            global count
            print name, "wakes up!"
            count -= 1

        dinit = Task(Initialize, "Toto", 5)
        # How can I remove the scheduler from the picture?
        # The only way I can see is to have a special kind of Task
        # and a suspended queue... Maybe that would also be cleaner.
        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
        dfinal.AddCallback(Finalize)
        # This is confusing, but the reason is that the task driving dfinal's
        # callbacks is added to the scheduler by the job itself at the end of
        # its execution.
        return dinit

    logging.info("Starting scheduler with 10 workers")
    StartScheduler(10)
    dasync = AsyncCall()
    logging.info("Adding asynccall task")
    scheduler.AddTask(dasync)
    while count > 0:
        logging.debug("Count = %d", count)
        sleep(1)
    logging.info("Stopping scheduler")
    StopScheduler()
    logging.info("The End.")