70 |
70 |
71 def ChainTask(self, task): |
71 def ChainTask(self, task): |
72 self.tail.Chain(task.head) |
72 self.tail.Chain(task.head) |
73 self.tail = task.tail |
73 self.tail = task.tail |
74 |
74 |
|
75 def GrabResult(self, data = None): |
|
76 if data is None: |
|
77 data = {} |
|
78 def SetResult(result): |
|
79 data["result"] = result |
|
80 return result |
|
81 def SetError(error): |
|
82 data["error"] = error |
|
83 self.AddCallback(SetResult, SetError) |
|
84 return data |
|
85 |
|
86 def SetupEvent(self, event = None, data = None): |
|
87 if not event: |
|
88 event = threading.Event() |
|
89 def SetEvent(dummy): |
|
90 event.set() |
|
91 self.AddCallback(SetEvent, SetEvent) |
|
92 return event |
|
93 |
|
94 ## |
|
95 # Helper class |
75 class ThreadedTask(Task): |
96 class ThreadedTask(Task): |
76 def __init__(self, func, *args, **kwargs): |
97 def __init__(self, func, *args, **kwargs): |
77 super(ThreadedTask, self).__init__(func, *args, **kwargs) |
98 super(ThreadedTask, self).__init__(func, *args, **kwargs) |
78 self.head.threaded = True |
99 self.head.threaded = True |
79 |
100 |
80 class Scheduler(threading.Thread): |
101 class Scheduler(threading.Thread): |
81 |
102 |
82 class _SchedulerStop(Exception): |
103 class _SchedulerStop(Exception): |
83 pass |
104 pass |
84 |
105 |
146 def RaiseSchedulerStop(): |
167 def RaiseSchedulerStop(): |
147 raise self._SchedulerStop |
168 raise self._SchedulerStop |
148 self.AddTask(Task(RaiseSchedulerStop)) |
169 self.AddTask(Task(RaiseSchedulerStop)) |
149 self.join() |
170 self.join() |
150 |
171 |
151 def AddTask(self, task, blocking = True): |
172 def AddTask(self, task): |
152 self.tasks.put(task, blocking) |
173 self.tasks.put(task) |
153 |
174 |
154 def _AddJob(self, task, cb, result, error, traceback): |
175 def _AddJob(self, task, cb, result, error, traceback): |
155 |
176 |
156 def DoIt(task, cb, result, error, traceback): |
177 def DoIt(task, cb, result, error, traceback): |
157 try: |
178 try: |
216 return name |
237 return name |
217 def Finalize(name): |
238 def Finalize(name): |
218 global count |
239 global count |
219 print name, "wakes up!" |
240 print name, "wakes up!" |
220 count -= 1 |
241 count -= 1 |
|
242 return name |
221 |
243 |
222 task = Task(Initialize, name, seconds) |
244 task = Task(Initialize, name, seconds) |
223 task.AddThreadedCallback(Blocking) |
245 task.AddThreadedCallback(Blocking) |
224 task.AddCallback(Finalize) |
246 task.AddCallback(Finalize) |
225 return task |
247 return task |
231 task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) |
253 task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) |
232 scheduler.AddTask(task) |
254 scheduler.AddTask(task) |
233 while count > 0: |
255 while count > 0: |
234 logging.debug("Count = %d", count) |
256 logging.debug("Count = %d", count) |
235 sleep(1) |
257 sleep(1) |
|
258 |
|
259 # Check for King Toto sleep |
|
260 task = AsyncCall("King Toto", 5) |
|
261 data = task.GrabResult() |
|
262 event = task.SetupEvent() |
|
263 scheduler.AddTask(task) |
|
264 try: |
|
265 event.wait(10) |
|
266 print "data = %r" % (data,) |
|
267 except: |
|
268 logging.exception("Error occured on wait") |
236 logging.info("Stopping scheduler") |
269 logging.info("Stopping scheduler") |
237 StopScheduler() |
270 StopScheduler() |
238 logging.info("The End.") |
271 logging.info("The End.") |