scheduler.py
changeset 3 00b6708d1852
parent 2 a00ae018daf8
child 4 76ba9b3a9e1c
equal deleted inserted replaced
2:a00ae018daf8 3:00b6708d1852
    10 # handler Concept is similar to twisted, but you need to put all your
    10 # handler Concept is similar to twisted, but you need to put all your
    11 # callback on the task before giving it to the Scheduler.  For this
    11 # callback on the task before giving it to the Scheduler.  For this
    12 # reason, you shouldnt call the callback/errorback method yourself (or
    12 # reason, you shouldnt call the callback/errorback method yourself (or
    13 # at least, don't put it back in the scheduler after that).
    13 # at least, don't put it back in the scheduler after that).
    14 
    14 
       
    15 class _Callback(object):
       
    16     def __init__(self, callback, errorback, threaded = False):
       
    17         self.callback = callback
       
    18         self.errorback = errorback
       
    19         self.next = None
       
    20         self.threaded = threaded
       
    21     def Chain(self, next):
       
    22         # Can only be called once
       
    23         assert(self.next is None)
       
    24         self.next = next
       
    25         return next
       
    26     def Next(self):
       
    27         return self.next
       
    28 
    15 class Task(object):
    29 class Task(object):
    16 
    30 
    17     @staticmethod
    31     @staticmethod
    18     def DefaultCallback(result):
    32     def DefaultCallback(result):
    19         return result
    33         return result
    22     def DefaultErrorback(error):
    36     def DefaultErrorback(error):
    23         return error
    37         return error
    24 
    38 
    25     def __init__(self, func = None, *args, **kwargs):
    39     def __init__(self, func = None, *args, **kwargs):
    26         super(Task, self).__init__()
    40         super(Task, self).__init__()
    27         self.callbacks = []
    41         self.head = None
       
    42         self.tail = None
    28         if func:
    43         if func:
    29             def callback(result):
    44             def callback(result):
    30                 return func(*args, **kwargs)
    45                 return func(*args, **kwargs)
    31             self.AddCallback(callback)
    46             self.AddCallback(callback)
    32 
    47 
    33     def AddCallback(self, callback, errorback = None):
    48     def _AddCallback(self, callback):
       
    49         if self.head is None:
       
    50             self.head = self.tail = callback
       
    51         else:
       
    52             self.tail = self.tail.Chain(callback)
       
    53 
       
    54     def _GetNext(self):
       
    55         head = self.head
       
    56         if head:
       
    57             self.head = head.Next()
       
    58         return head
       
    59 
       
    60     def AddCallback(self, callback, errorback = None, threaded = False):
    34         if errorback == None:
    61         if errorback == None:
    35             errorback = self.DefaultErrorback
    62             errorback = self.DefaultErrorback
    36         self.callbacks.append((callback, errorback))
    63         cb = _Callback(callback, errorback, threaded)
       
    64         self._AddCallback(cb)
    37         # permit chained calls
    65         # permit chained calls
    38         return self
    66         return self
    39 
    67 
       
    68     def AddThreadedCallback(self, callback, errorback = None):
       
    69         return self.AddCallback(callback, errorback, True)
       
    70 
    40     def ChainTask(self, task):
    71     def ChainTask(self, task):
    41         return self.AddCallback(task.Callback, task.Errorback)
    72         self.tail.Chain(task.head)
    42 
    73         self.tail = task.tail
    43     def Callback(self, result, error = None, traceback = None):
    74 
    44         logging.debug("Handling task %r callbacks", self)
    75 class ThreadedTask(Task):
    45         for cb, eb in self.callbacks:
    76     def __init__(self, func, *args, **kwargs):
    46             try:
    77         super(ThreadedTask, self).__init__(func, *args, **kwargs)
    47                 if error:
    78         self.head.threaded = True
    48                     error = eb(error)
    79     
    49                 else:
       
    50                     result = cb(result)
       
    51             except:
       
    52                 errtype, error, traceback = sys.exc_info()
       
    53         if error:
       
    54             raise error, None, traceback
       
    55         return result
       
    56 
       
    57     ##
       
    58     # Consume the callbacks with an error.  Notes that error will
       
    59     # raise if not handled and return value would be a result
       
    60     def Errorback(self, error, traceback = None):
       
    61         return self.Callback(None, error, traceback)
       
    62 
       
    63 class Scheduler(threading.Thread):
    80 class Scheduler(threading.Thread):
    64 
    81 
    65     class _SchedulerStop(Exception):
    82     class _SchedulerStop(Exception):
    66         pass
    83         pass
    67 
    84 
    74         logging.debug("Looking for next task...")
    91         logging.debug("Looking for next task...")
    75         try:
    92         try:
    76             task = self.tasks.get(blocking)
    93             task = self.tasks.get(blocking)
    77         except Queue.Empty:
    94         except Queue.Empty:
    78             logging.debug("No task to run")
    95             logging.debug("No task to run")
    79             return
    96             return None
    80         return task.Callback(None)
    97         result = None
       
    98         error = None
       
    99         traceback = None
       
   100         while True:
       
   101             cb = task._GetNext()
       
   102             if not cb:
       
   103                 # no more callback
       
   104                 break
       
   105             if cb.threaded:
       
   106                 # a threaded callback
       
   107                 self._AddJob(task, cb, result, error, traceback)
       
   108                 # don't pass Go, don't reclaim $200
       
   109                 return None
       
   110             # Run the callback according to the current state
       
   111             try:
       
   112                 if error:
       
   113                     error = cb.errorback(error)
       
   114                 else:
       
   115                     result = cb.callback(result)
       
   116             except:
       
   117                 errtype, error, traceback = sys.exc_info()
       
   118         if error:
       
   119             raise error, None, traceback
       
   120         else:
       
   121             return result
    81 
   122 
    82     def Run(self):
   123     def Run(self):
    83         logging.info("Scheduler start")
   124         logging.info("Scheduler start")
    84         while True:
   125         while True:
    85             try:
   126             try:
    96 
   137 
    97     def Stop(self, now = False):
   138     def Stop(self, now = False):
    98         self.pool.Stop(now)
   139         self.pool.Stop(now)
    99         if now:
   140         if now:
   100             self.tasks = Queue.Queue()
   141             self.tasks = Queue.Queue()
       
   142         # We raise an exception to find if we stop stop the scheduler.
       
   143         # We could have use a None task, but this make it easier if we
       
   144         # want to add such mechanism public or we want to stop on
       
   145         # other exception
   101         def RaiseSchedulerStop():
   146         def RaiseSchedulerStop():
   102             raise self._SchedulerStop
   147             raise self._SchedulerStop
   103         self.AddTask(Task(RaiseSchedulerStop))
   148         self.AddTask(Task(RaiseSchedulerStop))
   104         self.join()
   149         self.join()
   105 
   150 
   106     def AddTask(self, task, blocking = True):
   151     def AddTask(self, task, blocking = True):
   107         self.tasks.put(task, blocking)
   152         self.tasks.put(task, blocking)
   108 
   153 
   109     ##
   154     def _AddJob(self, task, cb, result, error, traceback):
   110     # A job is a task run in a seperated thread.  After the job run, a
   155 
   111     # new Task is add to the scheduler either with a result or an error,
   156         def DoIt(task, cb, result, error, traceback):
   112     # so that only the task, not the callbacks, is run in the worker
       
   113     # thread. Note the passed task is consumed after this call.
       
   114     # A better design would have to allow Task to suspend or
       
   115     # resume themself and to run in the thread if it want to, but this will
       
   116     # required the callback to know about its Task and the scheduler
       
   117     # itself, which solve nothing.
       
   118     def AddJob(self, task, func, *args, **kwargs):
       
   119         def Job():
       
   120             try:
   157             try:
   121                 result = func(*args, **kwargs)
   158                 if error:
   122                 def returnResult():
   159                     error = cb.errorback(error)
   123                     return result
   160                 else:
   124                 jobTask = Task(returnResult)
   161                     result = cb.callback(result)
   125             except:
   162             except:
   126                 errtype, error, traceback = sys.exc_info()
   163                 errtype, error, traceback = sys.exc_info()
   127                 def raiseError():
   164             if error:
       
   165                 def RaiseError():
   128                     raise error, None, traceback
   166                     raise error, None, traceback
   129                 jobTask = Task(raiseError)
   167                 jobTask = Task(RaiseError)
       
   168             else:
       
   169                 def ReturnResult():
       
   170                     return result
       
   171                 jobTask = Task(ReturnResult)
   130             jobTask.ChainTask(task)
   172             jobTask.ChainTask(task)
   131             self.AddTask(jobTask)
   173             self.AddTask(jobTask)
       
   174 
       
   175         # This double wrap (Job over DoIt) seems necessary to make
       
   176         # error not look like a local of Job...
       
   177         def Job():
       
   178             return DoIt(task, cb, result, error, traceback)
   132         self.pool.AddJob(Job)
   179         self.pool.AddJob(Job)
   133 
       
   134     ##
       
   135     # This basically allow one callback to run in a seperated thread
       
   136     # and then get the result to another deferred callback.  Helas,
       
   137     # this create two Tasks, one for the caller, one for the
       
   138     # callbacks, with only the first one having to be passed to the
       
   139     # scheduler.  This should be more transparent: a callback which
       
   140     # required to be run as a job should simply be mark like it and
       
   141     # the current task suspend until the job is finished, then resume.
       
   142     def CreateCallbackAsJob(self, src, cb):
       
   143         dst = Task()
       
   144         def CreateJobCallback(result):
       
   145             def Job():
       
   146                 return cb(result)
       
   147             self.AddJob(dst, Job)
       
   148         src.AddCallback(CreateJobCallback)
       
   149         return dst
       
   150 
   180 
   151 # The global scheduler
   181 # The global scheduler
   152 scheduler = None
   182 scheduler = None
   153 
   183 
   154 def StartScheduler(size):
   184 def StartScheduler(size):
   170     # This function is a sample and shouldn't know about the scheduler
   200     # This function is a sample and shouldn't know about the scheduler
   171     count = 0
   201     count = 0
   172     def AsyncCall(name, seconds):
   202     def AsyncCall(name, seconds):
   173         global count
   203         global count
   174         count += 1
   204         count += 1
       
   205         
       
   206         # Probably a bad example, since the callback
       
   207         # doesn't return the exact same type...
   175         def Initialize(name, seconds):
   208         def Initialize(name, seconds):
   176             print "Here", name
   209             print "Here", name
   177             return name, seconds
   210             return name, seconds
   178         def Blocking(args):
   211         def Blocking(args):
   179             name, time = args
   212             name, time = args
   184         def Finalize(name):
   217         def Finalize(name):
   185             global count
   218             global count
   186             print name, "wakes up!"
   219             print name, "wakes up!"
   187             count -= 1
   220             count -= 1
   188 
   221 
   189         dinit = Task(Initialize, name, seconds)
   222         task = Task(Initialize, name, seconds)
   190         # How can I remove the scheduler from the picture ?
   223         task.AddThreadedCallback(Blocking)
   191         # The only way I can see is to have a special kind of Task
   224         task.AddCallback(Finalize)
   192         # and a suspended queue... May be this will also be more clean
   225         return task
   193         dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
       
   194         dfinal.AddCallback(Finalize)
       
   195         # This is confusing but the reason is that the dfinal callback
       
   196         # will be added to the scheduler by the job itself at the end of
       
   197         # its execution.
       
   198         return dinit
       
   199 
   226 
   200     logging.info("Starting scheduler with 10 workers")
   227     logging.info("Starting scheduler with 10 workers")
   201     StartScheduler(10)
   228     StartScheduler(10)
   202     logging.info("Adding asynccall task")
   229     logging.info("Adding asynccall task")
   203     for x in xrange(int(sys.argv[1])):
   230     for x in xrange(int(sys.argv[1])):
   204         dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
   231         task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
   205         scheduler.AddTask(dasync)
   232         scheduler.AddTask(task)
   206     while count > 0:
   233     while count > 0:
   207         logging.debug("Count = %d", count)
   234         logging.debug("Count = %d", count)
   208         sleep(1)
   235         sleep(1)
   209     logging.info("Stopping scheduler")
   236     logging.info("Stopping scheduler")
   210     StopScheduler()
   237     StopScheduler()