|
1 #!/usr/bin/env python2.6 |
|
2 import sys |
|
3 import logging |
|
4 import threading |
|
5 import Queue |
|
6 |
|
class ThreadPool(object):
    """A basic, fixed-size pool of worker threads sharing one job queue.

    Jobs are zero-argument callables submitted with AddJob. ``None`` is
    reserved as the stop sentinel and is never run as a job.

    TODO: should be replaced with a resizable pool.
    """

    class _Worker(threading.Thread):
        """Worker thread: runs jobs from the pool queue until it gets None."""

        def __init__(self, uid, pool):
            threading.Thread.__init__(self, name = "Worker%d" % uid)
            self.pool = pool

        def run(self):
            logging.info("%s started", self.name)
            while True:
                job = self.pool.jobs.get()
                if job is None:
                    # Put the sentinel back so every other worker sees it too;
                    # a single put(None) therefore stops the whole pool.
                    self.pool.jobs.put(job)
                    break
                logging.debug("%s is running %r", self.name, job)
                try:
                    job()
                except:
                    # A failing job must not kill the worker: log and go on.
                    logging.exception("Job %r return an exception", job)
                logging.debug("Job %r completed.", job)
            logging.info("%s stopped", self.name)

    def __init__(self, size):
        """Create `size` workers named Worker1..WorkerN; call Start to run them."""
        super(ThreadPool, self).__init__()
        self.jobs = Queue.Queue()
        self._workers = [self._Worker(uid, self) for uid in range(1, size + 1)]

    def Start(self):
        """Start every worker thread."""
        logging.info("Starting Workers pool")
        for worker in self._workers:
            worker.start()
        logging.info("Workers started")

    def Stop(self, now = False):
        """Stop the workers and wait for all of them to exit.

        By default, jobs already queued are run before the workers stop.
        With now=True, queued jobs that have not started yet are discarded;
        jobs that are already running still finish.
        """
        logging.info("Stopping Workers")
        if now:
            # Empty the queue in place. Rebinding self.jobs to a new Queue
            # would leave the workers blocked in get() on the old one: they
            # would never receive the sentinel and join() would hang.
            self._DiscardPendingJobs()
        self.jobs.put(None)
        for worker in self._workers:
            worker.join()
        logging.info("Workers stopped")

    def _DiscardPendingJobs(self):
        """Remove every job currently waiting in the queue without running it."""
        while True:
            try:
                self.jobs.get_nowait()
            except Queue.Empty:
                return

    def AddJob(self, job):
        """Queue a zero-argument callable to be run by one of the workers."""
        self.jobs.put(job)
|
53 |
|
54 ## |
|
55 # A task, representing a series of linked pairs of callback and error |
|
56 # handler Concept is similar to twisted, but you need to put all your |
|
57 # callback on the task before giving it to the Scheduler. For this |
|
58 # reason, you shouldnt call the callback/errorback method yourself (or |
|
59 # at least, don't put it back in the scheduler after that). |
|
60 |
|
61 class Task(object): |
|
62 |
|
63 @staticmethod |
|
64 def DefaultCallback(result): |
|
65 return result |
|
66 |
|
67 @staticmethod |
|
68 def DefaultErrorback(error): |
|
69 return error |
|
70 |
|
71 def __init__(self, func = None, *args, **kwargs): |
|
72 super(Task, self).__init__() |
|
73 self.callbacks = [] |
|
74 if func: |
|
75 def callback(result): |
|
76 return func(*args, **kwargs) |
|
77 self.AddCallback(callback) |
|
78 |
|
79 def AddCallback(self, callback, errorback = None): |
|
80 if errorback == None: |
|
81 errorback = self.DefaultErrorback |
|
82 self.callbacks.append((callback, errorback)) |
|
83 # permit chained calls |
|
84 return self |
|
85 |
|
86 def ChainTask(self, task): |
|
87 return self.AddCallback(task.Callback, task.Errorback) |
|
88 |
|
89 def Callback(self, result, error = None, traceback = None): |
|
90 logging.debug("Handling task %r callbacks", self) |
|
91 for cb, eb in self.callbacks: |
|
92 try: |
|
93 if error: |
|
94 error = eb(error) |
|
95 else: |
|
96 result = cb(result) |
|
97 except: |
|
98 errtype, error, traceback = sys.exc_info() |
|
99 if error: |
|
100 raise error, None, traceback |
|
101 return result |
|
102 |
|
103 ## |
|
104 # Consume the callbacks with an error. Notes that error will |
|
105 # raise if not handled and return value would be a result |
|
106 def Errorback(self, error, traceback = None): |
|
107 return self.Callback(None, error, traceback) |
|
108 |
|
109 class Scheduler(threading.Thread): |
|
110 |
|
111 class _SchedulerStop(Exception): |
|
112 pass |
|
113 |
|
114 def __init__(self, poolSize): |
|
115 threading.Thread.__init__(self, name = "Scheduler", target = self.Run) |
|
116 self.pool = ThreadPool(poolSize) |
|
117 self.tasks = Queue.Queue() |
|
118 |
|
119 def ExecuteOne(self, blocking = True): |
|
120 logging.debug("Looking for next task...") |
|
121 try: |
|
122 task = self.tasks.get(blocking) |
|
123 except Queue.Empty: |
|
124 logging.debug("No task to run") |
|
125 return |
|
126 return task.Callback(None) |
|
127 |
|
128 def Run(self): |
|
129 logging.info("Scheduler start") |
|
130 while True: |
|
131 try: |
|
132 self.ExecuteOne() |
|
133 except self._SchedulerStop: |
|
134 break |
|
135 except: |
|
136 logging.exception("Unhandled task exception") |
|
137 logging.info("Scheduler stop") |
|
138 |
|
139 def Start(self): |
|
140 self.pool.Start() |
|
141 return self.start() |
|
142 |
|
143 def Stop(self, now = False): |
|
144 self.pool.Stop(now) |
|
145 if now: |
|
146 self.tasks = Queue.Queue() |
|
147 def RaiseSchedulerStop(): |
|
148 raise self._SchedulerStop |
|
149 self.AddTask(Task(RaiseSchedulerStop)) |
|
150 self.join() |
|
151 |
|
152 def AddTask(self, task, blocking = True): |
|
153 self.tasks.put(task, blocking) |
|
154 |
|
155 ## |
|
156 # A job is a task run in a seperated thread. After the job run, a |
|
157 # new Task is add to the scheduler either with a result or an error, |
|
158 # so that only the task, not the callbacks, is run in the worker |
|
159 # thread. Note the passed task is consumed after this call. |
|
160 # A better design would have to allow Task to suspend or |
|
161 # resume themself and to run in the thread if it want to, but this will |
|
162 # required the callback to know about its Task and the scheduler |
|
163 # itself, which solve nothing. |
|
164 def AddJob(self, task, func, *args, **kwargs): |
|
165 def Job(): |
|
166 try: |
|
167 result = func(*args, **kwargs) |
|
168 def returnResult(): |
|
169 return result |
|
170 jobTask = Task(returnResult) |
|
171 except: |
|
172 errtype, error, traceback = sys.exc_info() |
|
173 def raiseError(): |
|
174 raise error, None, traceback |
|
175 jobTask = Task(raiseError) |
|
176 jobTask.ChainTask(task) |
|
177 self.AddTask(jobTask) |
|
178 self.pool.AddJob(Job) |
|
179 |
|
180 ## |
|
181 # This basically allow one callback to run in a seperated thread |
|
182 # and then get the result to another deferred callback. Helas, |
|
183 # this create two Tasks, one for the caller, one for the |
|
184 # callbacks, with only the first one having to be passed to the |
|
185 # scheduler. This should be more transparent: a callback which |
|
186 # required to be run as a job should simply be mark like it and |
|
187 # the current task suspend until the job is finished, then resume. |
|
188 def CreateCallbackAsJob(self, src, cb): |
|
189 dst = Task() |
|
190 def CreateJobCallback(result): |
|
191 def Job(): |
|
192 cb(result) |
|
193 self.AddJob(dst, Job) |
|
194 src.AddCallback(CreateJobCallback) |
|
195 return dst |
|
196 |
|
# Process-wide Scheduler instance, or None when no scheduler is running.
scheduler = None
|
199 |
|
def StartScheduler(size):
    """(Re)start the global scheduler with a pool of `size` worker threads.

    A scheduler that is already set is stopped first via StopScheduler().
    """
    global scheduler
    if scheduler:
        StopScheduler()
    created = Scheduler(size)
    scheduler = created
    created.Start()
|
206 |
|
def StopScheduler(now = False):
    """Stop the global scheduler, if one is set, and clear it.

    `now` is forwarded to the scheduler's Stop method.
    """
    global scheduler
    running = scheduler
    if not running:
        return
    running.Stop(now)
    scheduler = None
|
212 |
|
213 if __name__ == '__main__': |
|
214 from time import sleep |
|
215 logging.getLogger().setLevel(logging.DEBUG) |
|
216 # This function is a sample and shouldn't know about the scheduler |
|
217 count = 0 |
|
218 def AsyncCall(): |
|
219 global count |
|
220 count += 1 |
|
221 def Initialize(name, sleep): |
|
222 print "Here", name |
|
223 return name, sleep |
|
224 def Blocking(args): |
|
225 name, time = args |
|
226 print name, "goes to bed" |
|
227 sleep(time) |
|
228 print name, ": ZZZ..." |
|
229 return name |
|
230 def Finalize(name): |
|
231 global count |
|
232 print name, "wakes up!" |
|
233 count -= 1 |
|
234 |
|
235 dinit = Task(Initialize, "Toto", 5) |
|
236 # How can I remove the scheduler from the picture ? |
|
237 # The only way I can see is to have a special kind of Task |
|
238 # and a suspended queue... May be this will also be more clean |
|
239 dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) |
|
240 dfinal.AddCallback(Finalize) |
|
241 # This is confusing but the reason is that the dfinal callback |
|
242 # will be added to the scheduler by the job itself at the end of |
|
243 # its execution. |
|
244 return dinit |
|
245 |
|
246 logging.info("Starting scheduler with 10 workers") |
|
247 StartScheduler(10) |
|
248 dasync = AsyncCall() |
|
249 logging.info("Adding asynccall task") |
|
250 scheduler.AddTask(dasync) |
|
251 while count > 0: |
|
252 logging.debug("Count = %d", count) |
|
253 sleep(1) |
|
254 logging.info("Stopping scheduler") |
|
255 StopScheduler() |
|
256 logging.info("The End.") |