2 import sys |
2 import sys |
3 import logging |
3 import logging |
4 import threading |
4 import threading |
5 import Queue |
5 import Queue |
6 from threadpool import ThreadPool |
6 from threadpool import ThreadPool |
7 |
7 from task import Task, ThreadedTask |
8 ## |
|
# A task, representing a series of linked pairs of callback and error
# handler. The concept is similar to Twisted, but you need to put all your
# callbacks on the task before giving it to the Scheduler. For this
# reason, you shouldn't call the callback/errorback methods yourself (or
# at least, don't put it back in the scheduler after that).
|
14 |
|
class _Callback(object):
    """One link of a callback chain.

    Holds a callback/errorback pair, a ``threaded`` flag and a reference
    to the link that follows it (``None`` until one is attached).
    """

    def __init__(self, callback, errorback, threaded=False):
        self.callback, self.errorback = callback, errorback
        self.next = None
        self.threaded = threaded

    def Chain(self, next):
        """Attach ``next`` after this link and return it.

        A link may be chained only once; this is enforced with an
        assertion.
        """
        already_linked = self.next is not None
        assert not already_linked
        self.next = next
        return self.next

    def Next(self):
        """Return the following link, or ``None`` if nothing is attached."""
        return self.next
|
28 |
|
class Task(object):
    """A series of linked callback/errorback pairs.

    The links form a singly linked list: ``head`` is the next link to be
    handed out by ``_GetNext`` and ``tail`` is the last link added.  Both
    are ``None`` while the task is empty.
    """

    @staticmethod
    def DefaultCallback(result):
        """Return ``result`` unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Return ``error`` unchanged."""
        return error

    def __init__(self, func=None, *args, **kwargs):
        """Create a task, optionally seeded with a first callback.

        When ``func`` is truthy, a callback is added that calls
        ``func(*args, **kwargs)``; the value passed to that callback is
        ignored.
        """
        super(Task, self).__init__()
        self.head = None
        self.tail = None
        if func:
            def callback(result):
                # The incoming result is deliberately not forwarded to func.
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def _AddCallback(self, callback):
        """Append an already-built link to the end of the chain."""
        if self.head is None:
            # Empty (or fully consumed) chain: start a fresh one.
            self.head = self.tail = callback
        else:
            self.tail = self.tail.Chain(callback)

    def _GetNext(self):
        """Pop and return the first link, or ``None`` when none is left."""
        head = self.head
        if head is not None:
            self.head = head.Next()
        return head

    def AddCallback(self, callback, errorback=None, threaded=False):
        """Append a callback/errorback pair and return ``self``.

        ``errorback`` defaults to ``DefaultErrorback`` when it is ``None``.
        ``threaded`` is stored on the link as-is.
        """
        if errorback is None:
            errorback = self.DefaultErrorback
        self._AddCallback(_Callback(callback, errorback, threaded))
        # permit chained calls
        return self

    def AddThreadedCallback(self, callback, errorback=None):
        """Same as ``AddCallback`` with ``threaded`` set to ``True``."""
        return self.AddCallback(callback, errorback, True)

    def ChainTask(self, task):
        """Append the links of ``task`` after this task's links.

        The link objects are shared with ``task``, not copied.  Chaining
        an empty ``task`` is a no-op, and chaining onto an empty task
        adopts the links of ``task`` directly.
        """
        if task.head is None:
            # Nothing to append; leave our tail untouched so that later
            # additions keep working.
            return
        if self.head is None:
            # Same rule as _AddCallback: with no pending link, restart
            # the chain instead of linking behind a stale tail.
            self.head = task.head
        else:
            self.tail.Chain(task.head)
        self.tail = task.tail

    def GrabResult(self, data=None):
        """Add a link that records the outcome in a dict and return it.

        The callback stores its argument under ``"result"`` and returns
        it; the errorback stores its argument under ``"error"``.  A new
        dict is created when ``data`` is ``None``.
        """
        if data is None:
            data = {}

        def SetResult(result):
            data["result"] = result
            return result

        def SetError(error):
            # NOTE(review): unlike SetResult this returns None rather than
            # the error -- confirm this is what the scheduler expects.
            data["error"] = error

        self.AddCallback(SetResult, SetError)
        return data

    def SetupEvent(self, event=None, data=None):
        """Add a link that sets an event on both outcomes; return the event.

        A new ``threading.Event`` is created when ``event`` is ``None``.
        ``data`` is unused; it is kept so existing callers keep working.
        """
        if event is None:
            event = threading.Event()

        def SetEvent(dummy):
            event.set()

        self.AddCallback(SetEvent, SetEvent)
        return event
|
93 |
|
94 ## |
|
95 # Helper class |
|
class ThreadedTask(Task):
    """Task whose first link (the one wrapping ``func``) is marked threaded."""

    def __init__(self, func, *args, **kwargs):
        super(ThreadedTask, self).__init__(func, *args, **kwargs)
        # The base constructor has just created the link that wraps func.
        first_link = self.head
        first_link.threaded = True
|
100 |
8 |
101 class Scheduler(threading.Thread): |
9 class Scheduler(threading.Thread): |
102 |
10 |
103 class _SchedulerStop(Exception): |
11 class _SchedulerStop(Exception): |
104 pass |
12 pass |