First version.
#!/usr/bin/env python2.6
import sys
import logging
import threading
import Queue
##
# A basic worker pool... should be replaced with a resizable one.
class ThreadPool(object):
    """A fixed-size pool of worker threads fed from one shared job queue.

    Jobs are zero-argument callables. ``None`` is reserved as the shutdown
    sentinel: the worker that dequeues it puts it back on the queue so the
    other workers see it too, then exits.
    """

    class _Worker(threading.Thread):
        """Thread that runs jobs taken from its pool's queue."""

        def __init__(self, uid, pool):
            """Create the worker.

            uid  -- number used to build the thread name ("Worker<uid>").
            pool -- the owning ThreadPool; its ``jobs`` queue is consumed.
            """
            threading.Thread.__init__(self, name="Worker%d" % uid)
            self.pool = pool

        def run(self):
            """Run queued jobs until the ``None`` sentinel is dequeued."""
            logging.info("%s started", self.name)
            while True:
                job = self.pool.jobs.get()
                # Identity test: only None is the sentinel, so a callable
                # that merely evaluates as false is still executed.
                if job is None:
                    # Hand the sentinel on to the next worker, then exit.
                    self.pool.jobs.put(job)
                    break
                logging.debug("%s is running %r", self.name, job)
                try:
                    job()
                except:
                    # Deliberately broad: a failing job is logged and must
                    # never take its worker thread down with it.
                    logging.exception("Job %r return an exception", job)
                logging.debug("Job %r completed.", job)
            logging.info("%s stopped", self.name)

    def __init__(self, size):
        """Create ``size`` workers (named Worker1..Worker<size>), not started."""
        super(ThreadPool, self).__init__()
        self.jobs = Queue.Queue()
        self._workers = [self._Worker(uid, self)
                         for uid in range(1, size + 1)]

    def Start(self):
        """Start every worker thread."""
        logging.info("Starting Workers pool")
        for worker in self._workers:
            worker.start()
        logging.info("Workers started")

    def Stop(self, now=False):
        """Ask the workers to exit and wait until they all have.

        now -- when true, jobs still waiting in the queue are discarded
               first; otherwise they are run before the workers exit.
               A job that is already running is always allowed to finish.
        """
        logging.info("Stopping Workers")
        if now:
            # Empty the queue in place. Rebinding self.jobs to a new queue
            # would strand workers already blocked in get() on the old one:
            # they would never receive the sentinel and join() would hang.
            self._DiscardPendingJobs()
        self.jobs.put(None)
        for worker in self._workers:
            worker.join()
        logging.info("Workers stopped")

    def _DiscardPendingJobs(self):
        """Remove every job currently waiting in the queue."""
        try:
            while True:
                self.jobs.get_nowait()
        except Queue.Empty:
            pass

    def AddJob(self, job):
        """Queue ``job`` (a zero-argument callable) for execution."""
        self.jobs.put(job)
##
# A task, representing a series of linked pairs of callback and error
# handler. The concept is similar to Twisted's, but you need to put all
# your callbacks on the task before giving it to the Scheduler. For this
# reason, you shouldn't call the Callback/Errorback methods yourself (or
# at least, don't put the task back in the scheduler after that).
class Task(object):
@staticmethod
def DefaultCallback(result):
return result
@staticmethod
def DefaultErrorback(error):
return error
def __init__(self, func = None, *args, **kwargs):
super(Task, self).__init__()
self.callbacks = []
if func:
def callback(result):
return func(*args, **kwargs)
self.AddCallback(callback)
def AddCallback(self, callback, errorback = None):
if errorback == None:
errorback = self.DefaultErrorback
self.callbacks.append((callback, errorback))
# permit chained calls
return self
def ChainTask(self, task):
return self.AddCallback(task.Callback, task.Errorback)
def Callback(self, result, error = None, traceback = None):
logging.debug("Handling task %r callbacks", self)
for cb, eb in self.callbacks:
try:
if error:
error = eb(error)
else:
result = cb(result)
except:
errtype, error, traceback = sys.exc_info()
if error:
raise error, None, traceback
return result
##
# Consume the callbacks with an error. Notes that error will
# raise if not handled and return value would be a result
def Errorback(self, error, traceback = None):
return self.Callback(None, error, traceback)
class Scheduler(threading.Thread):
class _SchedulerStop(Exception):
pass
def __init__(self, poolSize):
threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
self.pool = ThreadPool(poolSize)
self.tasks = Queue.Queue()
def ExecuteOne(self, blocking = True):
logging.debug("Looking for next task...")
try:
task = self.tasks.get(blocking)
except Queue.Empty:
logging.debug("No task to run")
return
return task.Callback(None)
def Run(self):
logging.info("Scheduler start")
while True:
try:
self.ExecuteOne()
except self._SchedulerStop:
break
except:
logging.exception("Unhandled task exception")
logging.info("Scheduler stop")
def Start(self):
self.pool.Start()
return self.start()
def Stop(self, now = False):
self.pool.Stop(now)
if now:
self.tasks = Queue.Queue()
def RaiseSchedulerStop():
raise self._SchedulerStop
self.AddTask(Task(RaiseSchedulerStop))
self.join()
def AddTask(self, task, blocking = True):
self.tasks.put(task, blocking)
##
# A job is a task run in a seperated thread. After the job run, a
# new Task is add to the scheduler either with a result or an error,
# so that only the task, not the callbacks, is run in the worker
# thread. Note the passed task is consumed after this call.
# A better design would have to allow Task to suspend or
# resume themself and to run in the thread if it want to, but this will
# required the callback to know about its Task and the scheduler
# itself, which solve nothing.
def AddJob(self, task, func, *args, **kwargs):
def Job():
try:
result = func(*args, **kwargs)
def returnResult():
return result
jobTask = Task(returnResult)
except:
errtype, error, traceback = sys.exc_info()
def raiseError():
raise error, None, traceback
jobTask = Task(raiseError)
jobTask.ChainTask(task)
self.AddTask(jobTask)
self.pool.AddJob(Job)
##
# This basically allow one callback to run in a seperated thread
# and then get the result to another deferred callback. Helas,
# this create two Tasks, one for the caller, one for the
# callbacks, with only the first one having to be passed to the
# scheduler. This should be more transparent: a callback which
# required to be run as a job should simply be mark like it and
# the current task suspend until the job is finished, then resume.
def CreateCallbackAsJob(self, src, cb):
dst = Task()
def CreateJobCallback(result):
def Job():
cb(result)
self.AddJob(dst, Job)
src.AddCallback(CreateJobCallback)
return dst
# The global scheduler: the Scheduler instance created by StartScheduler()
# and reset to None by StopScheduler(); None while no scheduler is running.
scheduler = None
def StartScheduler(size):
    """Create and start the global scheduler with ``size`` pool workers.

    A global scheduler that is already set is stopped first.
    """
    global scheduler
    previous = scheduler
    if previous:
        # Retire the scheduler currently installed before replacing it.
        StopScheduler()
    replacement = Scheduler(size)
    scheduler = replacement
    replacement.Start()
def StopScheduler(now=False):
    """Stop the global scheduler, if one is set, and reset it to None.

    now -- forwarded unchanged to the scheduler's Stop().
    """
    global scheduler
    if not scheduler:
        # Nothing is running: leave the global as it is.
        return
    scheduler.Stop(now)
    scheduler = None
if __name__ == '__main__':
from time import sleep
logging.getLogger().setLevel(logging.DEBUG)
# This function is a sample and shouldn't know about the scheduler
count = 0
def AsyncCall():
global count
count += 1
def Initialize(name, sleep):
print "Here", name
return name, sleep
def Blocking(args):
name, time = args
print name, "goes to bed"
sleep(time)
print name, ": ZZZ..."
return name
def Finalize(name):
global count
print name, "wakes up!"
count -= 1
dinit = Task(Initialize, "Toto", 5)
# How can I remove the scheduler from the picture ?
# The only way I can see is to have a special kind of Task
# and a suspended queue... May be this will also be more clean
dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
dfinal.AddCallback(Finalize)
# This is confusing but the reason is that the dfinal callback
# will be added to the scheduler by the job itself at the end of
# its execution.
return dinit
logging.info("Starting scheduler with 10 workers")
StartScheduler(10)
dasync = AsyncCall()
logging.info("Adding asynccall task")
scheduler.AddTask(dasync)
while count > 0:
logging.debug("Count = %d", count)
sleep(1)
logging.info("Stopping scheduler")
StopScheduler()
logging.info("The End.")