scheduler.py
changeset 1 ee10f7cde07d
parent 0 8ae7370093db
child 2 a00ae018daf8
equal deleted inserted replaced
0:8ae7370093db 1:ee10f7cde07d
       
     1 #!/usr/bin/env python2.6
       
     2 import sys
       
     3 import logging
       
     4 import threading
       
     5 import Queue
       
     6 
       
     7 ##
       
     8 # A basic worker pool... should be replaced with a resizable one.
       
     9 class ThreadPool(object):
       
    10     
       
    11     class _Worker(threading.Thread):
       
    12         def __init__(self, uid, pool):
       
    13             threading.Thread.__init__(self, name = "Worker%d" % uid)
       
    14             self.pool = pool
       
    15         def run(self):
       
    16             logging.info("%s started", self.getName())
       
    17             while True:
       
    18                 job = self.pool.jobs.get()
       
    19                 if job:
       
    20                     logging.debug("%s is running %r", self.getName(), job)
       
    21                     try:
       
    22                         job()
       
    23                     except:
       
    24                         logging.exception("Job %r return an exception", job)
       
    25                     logging.debug("Job %r completed.", job)
       
    26                 else:
       
    27                     self.pool.jobs.put(job)
       
    28                     break
       
    29             logging.info("%s stopped", self.getName())
       
    30 
       
    31     def __init__(self, size):
       
    32         super(ThreadPool, self).__init__()
       
    33         self.jobs = Queue.Queue()
       
    34         self._workers = [self._Worker(x+1, self) for x in xrange(size)]
       
    35 
       
    36     def Start(self):
       
    37         logging.info("Starting Workers pool")
       
    38         for worker in self._workers:
       
    39             worker.start()
       
    40         logging.info("Workers started")
       
    41 
       
    42     def Stop(self, now = False):
       
    43         logging.info("Stopping Workers")
       
    44         if now:
       
    45             self.jobs = Queue.Queue()
       
    46         self.jobs.put(None)
       
    47         for worker in self._workers:
       
    48             worker.join()
       
    49         logging.info("Workers stopped")
       
    50 
       
    51     def AddJob(self, job):
       
    52         self.jobs.put(job)
       
    53 
       
    54 ##
       
    55 # A task, representing a series of linked pairs of callback and error
       
    56 # handler Concept is similar to twisted, but you need to put all your
       
    57 # callback on the task before giving it to the Scheduler.  For this
       
    58 # reason, you shouldnt call the callback/errorback method yourself (or
       
    59 # at least, don't put it back in the scheduler after that).
       
    60 
       
    61 class Task(object):
       
    62 
       
    63     @staticmethod
       
    64     def DefaultCallback(result):
       
    65         return result
       
    66 
       
    67     @staticmethod
       
    68     def DefaultErrorback(error):
       
    69         return error
       
    70 
       
    71     def __init__(self, func = None, *args, **kwargs):
       
    72         super(Task, self).__init__()
       
    73         self.callbacks = []
       
    74         if func:
       
    75             def callback(result):
       
    76                 return func(*args, **kwargs)
       
    77             self.AddCallback(callback)
       
    78 
       
    79     def AddCallback(self, callback, errorback = None):
       
    80         if errorback == None:
       
    81             errorback = self.DefaultErrorback
       
    82         self.callbacks.append((callback, errorback))
       
    83         # permit chained calls
       
    84         return self
       
    85 
       
    86     def ChainTask(self, task):
       
    87         return self.AddCallback(task.Callback, task.Errorback)
       
    88 
       
    89     def Callback(self, result, error = None, traceback = None):
       
    90         logging.debug("Handling task %r callbacks", self)
       
    91         for cb, eb in self.callbacks:
       
    92             try:
       
    93                 if error:
       
    94                     error = eb(error)
       
    95                 else:
       
    96                     result = cb(result)
       
    97             except:
       
    98                 errtype, error, traceback = sys.exc_info()
       
    99         if error:
       
   100             raise error, None, traceback
       
   101         return result
       
   102 
       
   103     ##
       
   104     # Consume the callbacks with an error.  Notes that error will
       
   105     # raise if not handled and return value would be a result
       
   106     def Errorback(self, error, traceback = None):
       
   107         return self.Callback(None, error, traceback)
       
   108 
       
   109 class Scheduler(threading.Thread):
       
   110 
       
   111     class _SchedulerStop(Exception):
       
   112         pass
       
   113 
       
   114     def __init__(self, poolSize):
       
   115         threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
       
   116         self.pool = ThreadPool(poolSize)
       
   117         self.tasks = Queue.Queue()
       
   118 
       
   119     def ExecuteOne(self, blocking = True):
       
   120         logging.debug("Looking for next task...")
       
   121         try:
       
   122             task = self.tasks.get(blocking)
       
   123         except Queue.Empty:
       
   124             logging.debug("No task to run")
       
   125             return
       
   126         return task.Callback(None)
       
   127 
       
   128     def Run(self):
       
   129         logging.info("Scheduler start")
       
   130         while True:
       
   131             try:
       
   132                 self.ExecuteOne()
       
   133             except self._SchedulerStop:
       
   134                 break
       
   135             except:
       
   136                 logging.exception("Unhandled task exception")
       
   137         logging.info("Scheduler stop")
       
   138 
       
   139     def Start(self):
       
   140         self.pool.Start()
       
   141         return self.start()
       
   142 
       
   143     def Stop(self, now = False):
       
   144         self.pool.Stop(now)
       
   145         if now:
       
   146             self.tasks = Queue.Queue()
       
   147         def RaiseSchedulerStop():
       
   148             raise self._SchedulerStop
       
   149         self.AddTask(Task(RaiseSchedulerStop))
       
   150         self.join()
       
   151 
       
   152     def AddTask(self, task, blocking = True):
       
   153         self.tasks.put(task, blocking)
       
   154 
       
   155     ##
       
   156     # A job is a task run in a seperated thread.  After the job run, a
       
   157     # new Task is add to the scheduler either with a result or an error,
       
   158     # so that only the task, not the callbacks, is run in the worker
       
   159     # thread. Note the passed task is consumed after this call.
       
   160     # A better design would have to allow Task to suspend or
       
   161     # resume themself and to run in the thread if it want to, but this will
       
   162     # required the callback to know about its Task and the scheduler
       
   163     # itself, which solve nothing.
       
   164     def AddJob(self, task, func, *args, **kwargs):
       
   165         def Job():
       
   166             try:
       
   167                 result = func(*args, **kwargs)
       
   168                 def returnResult():
       
   169                     return result
       
   170                 jobTask = Task(returnResult)
       
   171             except:
       
   172                 errtype, error, traceback = sys.exc_info()
       
   173                 def raiseError():
       
   174                     raise error, None, traceback
       
   175                 jobTask = Task(raiseError)
       
   176             jobTask.ChainTask(task)
       
   177             self.AddTask(jobTask)
       
   178         self.pool.AddJob(Job)
       
   179 
       
   180     ##
       
   181     # This basically allow one callback to run in a seperated thread
       
   182     # and then get the result to another deferred callback.  Helas,
       
   183     # this create two Tasks, one for the caller, one for the
       
   184     # callbacks, with only the first one having to be passed to the
       
   185     # scheduler.  This should be more transparent: a callback which
       
   186     # required to be run as a job should simply be mark like it and
       
   187     # the current task suspend until the job is finished, then resume.
       
   188     def CreateCallbackAsJob(self, src, cb):
       
   189         dst = Task()
       
   190         def CreateJobCallback(result):
       
   191             def Job():
       
   192                 cb(result)
       
   193             self.AddJob(dst, Job)
       
   194         src.AddCallback(CreateJobCallback)
       
   195         return dst
       
   196 
       
# The global scheduler: the Scheduler instance created by
# StartScheduler(), or None when StopScheduler() has reset it or no
# scheduler was started yet.
scheduler = None
       
   199 
       
   200 def StartScheduler(size):
       
   201     global scheduler
       
   202     if scheduler:
       
   203         StopScheduler()
       
   204     scheduler = Scheduler(size)
       
   205     scheduler.Start()
       
   206 
       
   207 def StopScheduler(now = False):
       
   208     global scheduler
       
   209     if scheduler:
       
   210         scheduler.Stop(now)
       
   211     scheduler = None
       
   212 
       
   213 if __name__ == '__main__':
       
   214     from time import sleep
       
   215     logging.getLogger().setLevel(logging.DEBUG)
       
   216     # This function is a sample and shouldn't know about the scheduler
       
   217     count = 0
       
   218     def AsyncCall():
       
   219         global count
       
   220         count += 1
       
   221         def Initialize(name, sleep):
       
   222             print "Here", name
       
   223             return name, sleep
       
   224         def Blocking(args):
       
   225             name, time = args
       
   226             print name, "goes to bed"
       
   227             sleep(time)
       
   228             print name, ": ZZZ..."
       
   229             return name
       
   230         def Finalize(name):
       
   231             global count
       
   232             print name, "wakes up!"
       
   233             count -= 1
       
   234 
       
   235         dinit = Task(Initialize, "Toto", 5)
       
   236         # How can I remove the scheduler from the picture ?
       
   237         # The only way I can see is to have a special kind of Task
       
   238         # and a suspended queue... May be this will also be more clean
       
   239         dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
       
   240         dfinal.AddCallback(Finalize)
       
   241         # This is confusing but the reason is that the dfinal callback
       
   242         # will be added to the scheduler by the job itself at the end of
       
   243         # its execution.
       
   244         return dinit
       
   245 
       
   246     logging.info("Starting scheduler with 10 workers")
       
   247     StartScheduler(10)
       
   248     dasync = AsyncCall()
       
   249     logging.info("Adding asynccall task")
       
   250     scheduler.AddTask(dasync)
       
   251     while count > 0:
       
   252         logging.debug("Count = %d", count)
       
   253         sleep(1)
       
   254     logging.info("Stopping scheduler")
       
   255     StopScheduler()
       
   256     logging.info("The End.")