scheduler.py
changeset 5 eb1133af54ed
parent 4 76ba9b3a9e1c
child 6 6657247ddbbf
equal deleted inserted replaced
4:76ba9b3a9e1c 5:eb1133af54ed
     2 import sys
     2 import sys
     3 import logging
     3 import logging
     4 import threading
     4 import threading
     5 import Queue
     5 import Queue
     6 from threadpool import ThreadPool
     6 from threadpool import ThreadPool
     7 
     7 from task import Task, ThreadedTask
     8 ##
       
     9 # A task, representing a series of linked pairs of callback and error
       
    10 # handler Concept is similar to twisted, but you need to put all your
       
    11 # callback on the task before giving it to the Scheduler.  For this
       
    12 # reason, you shouldnt call the callback/errorback method yourself (or
       
    13 # at least, don't put it back in the scheduler after that).
       
    14 
       
    15 class _Callback(object):
       
    16     def __init__(self, callback, errorback, threaded = False):
       
    17         self.callback = callback
       
    18         self.errorback = errorback
       
    19         self.next = None
       
    20         self.threaded = threaded
       
    21     def Chain(self, next):
       
    22         # Can only be called once
       
    23         assert(self.next is None)
       
    24         self.next = next
       
    25         return next
       
    26     def Next(self):
       
    27         return self.next
       
    28 
       
    29 class Task(object):
       
    30 
       
    31     @staticmethod
       
    32     def DefaultCallback(result):
       
    33         return result
       
    34 
       
    35     @staticmethod
       
    36     def DefaultErrorback(error):
       
    37         return error
       
    38 
       
    39     def __init__(self, func = None, *args, **kwargs):
       
    40         super(Task, self).__init__()
       
    41         self.head = None
       
    42         self.tail = None
       
    43         if func:
       
    44             def callback(result):
       
    45                 return func(*args, **kwargs)
       
    46             self.AddCallback(callback)
       
    47 
       
    48     def _AddCallback(self, callback):
       
    49         if self.head is None:
       
    50             self.head = self.tail = callback
       
    51         else:
       
    52             self.tail = self.tail.Chain(callback)
       
    53 
       
    54     def _GetNext(self):
       
    55         head = self.head
       
    56         if head:
       
    57             self.head = head.Next()
       
    58         return head
       
    59 
       
    60     def AddCallback(self, callback, errorback = None, threaded = False):
       
    61         if errorback == None:
       
    62             errorback = self.DefaultErrorback
       
    63         cb = _Callback(callback, errorback, threaded)
       
    64         self._AddCallback(cb)
       
    65         # permit chained calls
       
    66         return self
       
    67 
       
    68     def AddThreadedCallback(self, callback, errorback = None):
       
    69         return self.AddCallback(callback, errorback, True)
       
    70 
       
    71     def ChainTask(self, task):
       
    72         self.tail.Chain(task.head)
       
    73         self.tail = task.tail
       
    74 
       
    75     def GrabResult(self, data = None):
       
    76         if data is None:
       
    77             data = {}
       
    78         def SetResult(result):
       
    79             data["result"] = result
       
    80             return result
       
    81         def SetError(error):
       
    82             data["error"] = error
       
    83         self.AddCallback(SetResult, SetError)
       
    84         return data
       
    85 
       
    86     def SetupEvent(self, event = None, data = None):
       
    87         if not event:
       
    88             event = threading.Event()
       
    89         def SetEvent(dummy):
       
    90             event.set()
       
    91         self.AddCallback(SetEvent, SetEvent)
       
    92         return event
       
    93 
       
    94 ##
       
    95 # Helper class
       
    96 class ThreadedTask(Task):
       
    97     def __init__(self, func, *args, **kwargs):
       
    98         super(ThreadedTask, self).__init__(func, *args, **kwargs)
       
    99         self.head.threaded = True
       
   100 
     8 
   101 class Scheduler(threading.Thread):
     9 class Scheduler(threading.Thread):
   102 
    10 
   103     class _SchedulerStop(Exception):
    11     class _SchedulerStop(Exception):
   104         pass
    12         pass