10 # handler Concept is similar to twisted, but you need to put all your |
10 # handler Concept is similar to twisted, but you need to put all your |
11 # callback on the task before giving it to the Scheduler. For this |
11 # callback on the task before giving it to the Scheduler. For this |
12 # reason, you shouldn't call the callback/errorback method yourself (or |
12 # reason, you shouldn't call the callback/errorback method yourself (or |
13 # at least, don't put it back in the scheduler after that). |
13 # at least, don't put it back in the scheduler after that). |
14 |
14 |
|
class _Callback(object):
    """One link in a singly linked chain of callbacks.

    Holds a success handler, an error handler, the following link and a
    flag recording whether the link was registered as threaded.
    """

    def __init__(self, callback, errorback, threaded = False):
        self.callback = callback
        self.errorback = errorback
        # Following link; stays None until Chain() is called.
        self.next = None
        self.threaded = threaded

    def Chain(self, next):
        """Attach `next` as the following link and return it.

        Can only be called once per link.

        Raises:
            AssertionError: if a following link is already attached.
        """
        # Explicit raise instead of an `assert` statement: asserts are
        # stripped under `python -O`, which would let a second call
        # silently drop the links that were already attached.
        if self.next is not None:
            raise AssertionError("Chain() can only be called once")
        self.next = next
        return next

    def Next(self):
        """Return the following link, or None when this is the last one."""
        return self.next
|
28 |
class Task(object):
    """An ordered chain of callback/errorback pairs.

    The pairs are stored as a singly linked list of `_Callback` links:
    `head` is the next link `_GetNext` will hand out and `tail` is the
    last link appended. Both are None while the chain is empty.
    """

    @staticmethod
    def DefaultCallback(result):
        """Return `result` unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Return `error` unchanged."""
        return error

    def __init__(self, func = None, *args, **kwargs):
        """Create a task, optionally seeded with a first callback.

        When `func` is given, the first callback ignores the incoming
        result and returns `func(*args, **kwargs)`.
        """
        super(Task, self).__init__()
        self.head = None
        self.tail = None
        if func:
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def _AddCallback(self, callback):
        """Append an already built link at the end of the chain."""
        if self.head is None:
            self.head = self.tail = callback
        else:
            self.tail = self.tail.Chain(callback)

    def _GetNext(self):
        """Detach and return the next link, or None when none is left."""
        head = self.head
        if head is not None:
            self.head = head.Next()
            if self.head is None:
                # Chain exhausted: do not keep pointing at a consumed link.
                self.tail = None
        return head

    def AddCallback(self, callback, errorback = None, threaded = False):
        """Append a callback/errorback pair and return self.

        `errorback` defaults to `DefaultErrorback`; `threaded` is stored
        on the link as is.
        """
        if errorback is None:
            errorback = self.DefaultErrorback
        cb = _Callback(callback, errorback, threaded)
        self._AddCallback(cb)
        # permit chained calls
        return self

    def AddThreadedCallback(self, callback, errorback = None):
        """Same as AddCallback with `threaded` set to True."""
        return self.AddCallback(callback, errorback, True)

    def ChainTask(self, task):
        """Append the remaining links of `task` to this task.

        The links are shared with `task`, not copied. Returns self so
        calls can be chained, like AddCallback.
        """
        if task.head is None:
            # Nothing left to run in `task`; adopting its tail would make
            # later AddCallback calls chain onto an unreachable link.
            return self
        if self.head is None:
            # This chain is empty (or fully consumed): start from task's.
            self.head = task.head
        else:
            self.tail.Chain(task.head)
        self.tail = task.tail
        return self
class ThreadedTask(Task):
    """A Task whose initial callback is flagged as threaded."""

    def __init__(self, func, *args, **kwargs):
        super(ThreadedTask, self).__init__(func, *args, **kwargs)
        # Task.__init__ registers no callback when func is falsy, which
        # leaves head as None; only flag the link when there is one.
        if self.head is not None:
            self.head.threaded = True
|
62 |
|
63 class Scheduler(threading.Thread): |
80 class Scheduler(threading.Thread): |
64 |
81 |
class _SchedulerStop(Exception):
    """Raised by the task that Stop() queues to stop the scheduler."""
67 |
84 |
74 logging.debug("Looking for next task...") |
91 logging.debug("Looking for next task...") |
75 try: |
92 try: |
76 task = self.tasks.get(blocking) |
93 task = self.tasks.get(blocking) |
77 except Queue.Empty: |
94 except Queue.Empty: |
78 logging.debug("No task to run") |
95 logging.debug("No task to run") |
79 return |
96 return None |
80 return task.Callback(None) |
97 result = None |
|
98 error = None |
|
99 traceback = None |
|
100 while True: |
|
101 cb = task._GetNext() |
|
102 if not cb: |
|
103 # no more callback |
|
104 break |
|
105 if cb.threaded: |
|
106 # a threaded callback |
|
107 self._AddJob(task, cb, result, error, traceback) |
|
108 # don't pass Go, don't reclaim $200 |
|
109 return None |
|
110 # Run the callback according to the current state |
|
111 try: |
|
112 if error: |
|
113 error = cb.errorback(error) |
|
114 else: |
|
115 result = cb.callback(result) |
|
116 except: |
|
117 errtype, error, traceback = sys.exc_info() |
|
118 if error: |
|
119 raise error, None, traceback |
|
120 else: |
|
121 return result |
81 |
122 |
82 def Run(self): |
123 def Run(self): |
83 logging.info("Scheduler start") |
124 logging.info("Scheduler start") |
84 while True: |
125 while True: |
85 try: |
126 try: |
96 |
137 |
def Stop(self, now = False):
    """Stop the pool, queue a stop task and wait for the scheduler thread.

    Args:
        now: forwarded to the pool's Stop(); when true, tasks still
            waiting in the queue are discarded before the stop task is
            queued.
    """
    self.pool.Stop(now)
    if now:
        # Empty the queue in place instead of rebinding self.tasks: a
        # scheduler thread blocked in get() on the old queue object would
        # never see the stop task put on a new one, and join() would hang.
        try:
            while True:
                self.tasks.get_nowait()
        except Queue.Empty:
            pass
    # The scheduler is stopped by a task that raises _SchedulerStop. A
    # None task could have been used instead, but an exception makes it
    # easier to expose this mechanism publicly later, or to stop on
    # other exceptions.
    def RaiseSchedulerStop():
        raise self._SchedulerStop
    self.AddTask(Task(RaiseSchedulerStop))
    self.join()
105 |
150 |
def AddTask(self, task, blocking = True):
    """Put `task` on the scheduler's task queue.

    `blocking` is passed through as the second argument of the queue's
    put().
    """
    pending = self.tasks
    pending.put(task, blocking)
108 |
153 |
109 ## |
154 def _AddJob(self, task, cb, result, error, traceback): |
110 # A job is a task run in a seperated thread. After the job run, a |
155 |
111 # new Task is add to the scheduler either with a result or an error, |
156 def DoIt(task, cb, result, error, traceback): |
112 # so that only the task, not the callbacks, is run in the worker |
|
113 # thread. Note the passed task is consumed after this call. |
|
114 # A better design would have to allow Task to suspend or |
|
115 # resume themself and to run in the thread if it want to, but this will |
|
116 # required the callback to know about its Task and the scheduler |
|
117 # itself, which solve nothing. |
|
118 def AddJob(self, task, func, *args, **kwargs): |
|
119 def Job(): |
|
120 try: |
157 try: |
121 result = func(*args, **kwargs) |
158 if error: |
122 def returnResult(): |
159 error = cb.errorback(error) |
123 return result |
160 else: |
124 jobTask = Task(returnResult) |
161 result = cb.callback(result) |
125 except: |
162 except: |
126 errtype, error, traceback = sys.exc_info() |
163 errtype, error, traceback = sys.exc_info() |
127 def raiseError(): |
164 if error: |
|
165 def RaiseError(): |
128 raise error, None, traceback |
166 raise error, None, traceback |
129 jobTask = Task(raiseError) |
167 jobTask = Task(RaiseError) |
|
168 else: |
|
169 def ReturnResult(): |
|
170 return result |
|
171 jobTask = Task(ReturnResult) |
130 jobTask.ChainTask(task) |
172 jobTask.ChainTask(task) |
131 self.AddTask(jobTask) |
173 self.AddTask(jobTask) |
|
174 |
|
175 # This double wrap (Job over DoIt) seems necessary to make |
|
176 # error not look like a local of Job... |
|
177 def Job(): |
|
178 return DoIt(task, cb, result, error, traceback) |
132 self.pool.AddJob(Job) |
179 self.pool.AddJob(Job) |
133 |
|
134 ## |
|
135 # This basically allow one callback to run in a seperated thread |
|
136 # and then get the result to another deferred callback. Helas, |
|
137 # this create two Tasks, one for the caller, one for the |
|
138 # callbacks, with only the first one having to be passed to the |
|
139 # scheduler. This should be more transparent: a callback which |
|
140 # required to be run as a job should simply be mark like it and |
|
141 # the current task suspend until the job is finished, then resume. |
|
142 def CreateCallbackAsJob(self, src, cb): |
|
143 dst = Task() |
|
144 def CreateJobCallback(result): |
|
145 def Job(): |
|
146 return cb(result) |
|
147 self.AddJob(dst, Job) |
|
148 src.AddCallback(CreateJobCallback) |
|
149 return dst |
|
150 |
180 |
151 # The global scheduler |
181 # The global scheduler |
152 scheduler = None |
182 scheduler = None |
153 |
183 |
154 def StartScheduler(size): |
184 def StartScheduler(size): |
184 def Finalize(name): |
217 def Finalize(name): |
185 global count |
218 global count |
186 print name, "wakes up!" |
219 print name, "wakes up!" |
187 count -= 1 |
220 count -= 1 |
188 |
221 |
189 dinit = Task(Initialize, name, seconds) |
222 task = Task(Initialize, name, seconds) |
190 # How can I remove the scheduler from the picture ? |
223 task.AddThreadedCallback(Blocking) |
191 # The only way I can see is to have a special kind of Task |
224 task.AddCallback(Finalize) |
192 # and a suspended queue... May be this will also be more clean |
225 return task |
193 dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) |
|
194 dfinal.AddCallback(Finalize) |
|
195 # This is confusing but the reason is that the dfinal callback |
|
196 # will be added to the scheduler by the job itself at the end of |
|
197 # its execution. |
|
198 return dinit |
|
199 |
226 |
# Demo driver: start the scheduler, queue sys.argv[1] tasks, wait for
# `count` to drop to zero, then stop the scheduler.
logging.info("Starting scheduler with 10 workers")
StartScheduler(10)
logging.info("Adding asynccall task")
task_total = int(sys.argv[1])
for index in xrange(task_total):
    # Delays cycle through 0.0, 0.1, ..., 0.9
    delay = (index % 10) / 10.0
    scheduler.AddTask(AsyncCall("Toto%d" % (index + 1), delay))
while not count <= 0:
    logging.debug("Count = %d", count)
    sleep(1)
logging.info("Stopping scheduler")
StopScheduler()