scheduler.py
changeset 4 76ba9b3a9e1c
parent 3 00b6708d1852
child 5 eb1133af54ed
equal deleted inserted replaced
3:00b6708d1852 4:76ba9b3a9e1c
    70 
    70 
    def ChainTask(self, task):
        """Append another task's chain to the end of this task.

        The current tail is chained to ``task.head``; afterwards
        ``task.tail`` becomes the tail of this task.
        """
        current_tail = self.tail
        current_tail.Chain(task.head)
        self.tail = task.tail
    74 
    74 
       
    def GrabResult(self, data=None):
        """Register callbacks that record the task outcome in a dict.

        data -- dict to fill in; a new empty dict is created when None.

        The success callback stores its argument under "result" and
        returns it unchanged; the error callback stores its argument
        under "error" and returns nothing. Both are registered through
        self.AddCallback. Returns the dict so the caller can read it later.
        """
        outcome = {} if data is None else data

        def on_success(value):
            outcome["result"] = value
            return value

        def on_failure(err):
            outcome["error"] = err

        self.AddCallback(on_success, on_failure)
        return outcome
       
    85 
       
    def SetupEvent(self, event=None, data=None):
        """Register callbacks that set an event when the task finishes.

        event -- object with a set() method; a new threading.Event is
                 created when None.
        data  -- unused here; kept so existing callers keep working.

        The event is set on both the success and the error path.
        Returns the event so the caller can wait on it.
        """
        # Compare against None instead of testing truthiness, so an event
        # object supplied by the caller is never swapped for a new one
        # merely because it evaluates as false.
        if event is None:
            event = threading.Event()

        def SetEventOnResult(result):
            event.set()
            # Hand the result back like GrabResult's success callback does.
            # NOTE(review): presumably the chain feeds a callback's return
            # value to the next callback; without this, a GrabResult added
            # after SetupEvent would record None -- confirm against Task.
            return result

        def SetEventOnError(error):
            # Error path keeps the original behaviour: signal, return nothing.
            event.set()

        self.AddCallback(SetEventOnResult, SetEventOnError)
        return event
       
    93 
       
    94 ##
       
    95 # Helper class
    class ThreadedTask(Task):
        """Task whose head is flagged as threaded right after construction.

        NOTE(review): how the ``threaded`` flag is consumed is not visible
        in this view -- presumably the scheduler runs such a job in a
        separate thread; confirm against the scheduler code.
        """

        def __init__(self, func, *args, **kwargs):
            # Build the task exactly as Task does, then mark its head.
            super(ThreadedTask, self).__init__(func, *args, **kwargs)
            head = self.head
            head.threaded = True
    79     
   100 
    80 class Scheduler(threading.Thread):
   101 class Scheduler(threading.Thread):
    81 
   102 
    82     class _SchedulerStop(Exception):
   103     class _SchedulerStop(Exception):
    83         pass
   104         pass
    84 
   105 
   146         def RaiseSchedulerStop():
   167         def RaiseSchedulerStop():
   147             raise self._SchedulerStop
   168             raise self._SchedulerStop
   148         self.AddTask(Task(RaiseSchedulerStop))
   169         self.AddTask(Task(RaiseSchedulerStop))
   149         self.join()
   170         self.join()
   150 
   171 
   151     def AddTask(self, task, blocking = True):
   172     def AddTask(self, task):
   152         self.tasks.put(task, blocking)
   173         self.tasks.put(task)
   153 
   174 
   154     def _AddJob(self, task, cb, result, error, traceback):
   175     def _AddJob(self, task, cb, result, error, traceback):
   155 
   176 
   156         def DoIt(task, cb, result, error, traceback):
   177         def DoIt(task, cb, result, error, traceback):
   157             try:
   178             try:
   216             return name
   237             return name
   217         def Finalize(name):
   238         def Finalize(name):
   218             global count
   239             global count
   219             print name, "wakes up!"
   240             print name, "wakes up!"
   220             count -= 1
   241             count -= 1
       
   242             return name
   221 
   243 
   222         task = Task(Initialize, name, seconds)
   244         task = Task(Initialize, name, seconds)
   223         task.AddThreadedCallback(Blocking)
   245         task.AddThreadedCallback(Blocking)
   224         task.AddCallback(Finalize)
   246         task.AddCallback(Finalize)
   225         return task
   247         return task
   231         task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
   253         task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
   232         scheduler.AddTask(task)
   254         scheduler.AddTask(task)
   233     while count > 0:
   255     while count > 0:
   234         logging.debug("Count = %d", count)
   256         logging.debug("Count = %d", count)
   235         sleep(1)
   257         sleep(1)
       
   258 
       
   259     # Check for King Toto sleep
       
   260     task = AsyncCall("King Toto", 5)
       
   261     data = task.GrabResult()
       
   262     event = task.SetupEvent()
       
   263     scheduler.AddTask(task)
       
   264     try:
       
   265         event.wait(10)
       
   266         print "data = %r" % (data,)
       
   267     except:
       
   268         logging.exception("Error occured on wait")
   236     logging.info("Stopping scheduler")
   269     logging.info("Stopping scheduler")
   237     StopScheduler()
   270     StopScheduler()
   238     logging.info("The End.")
   271     logging.info("The End.")