author | Fabien Ninoles <fabien@tzone.org> |
Sun, 21 Mar 2010 21:40:40 -0400 | |
changeset 4 | 76ba9b3a9e1c |
parent 3 | 00b6708d1852 |
child 5 | eb1133af54ed |
permissions | -rwxr-xr-x |
0 | 1 |
#!/usr/bin/env python2.6 |
2 |
import sys |
|
3 |
import logging |
|
4 |
import threading |
|
5 |
import Queue |
|
2
a00ae018daf8
Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
6 |
from threadpool import ThreadPool |
0 | 7 |
|
8 |
##
# A task represents a series of linked pairs of callback and error
# handler. The concept is similar to Twisted, but you need to put all
# your callbacks on the task before giving it to the Scheduler. For this
# reason, you shouldn't call the callback/errorback methods yourself (or
# at least, don't put the task back in the scheduler after that).
|
14 |
||
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
15 |
class _Callback(object): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
16 |
def __init__(self, callback, errorback, threaded = False): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
17 |
self.callback = callback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
18 |
self.errorback = errorback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
19 |
self.next = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
20 |
self.threaded = threaded |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
21 |
def Chain(self, next): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
22 |
# Can only be called once |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
23 |
assert(self.next is None) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
24 |
self.next = next |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
25 |
return next |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
26 |
def Next(self): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
27 |
return self.next |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
28 |
|
0 | 29 |
class Task(object):
    """An ordered chain of callback / errorback pairs.

    `head` is the next link to be consumed and `tail` the last link
    appended; both are None while the task is empty. Links are consumed
    from the head with _GetNext(), which is what Scheduler.ExecuteOne
    does.
    """

    @staticmethod
    def DefaultCallback(result):
        """Return `result` unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Return `error` unchanged."""
        return error

    def __init__(self, func = None, *args, **kwargs):
        """Create a task, optionally seeded with a first callback.

        When `func` is given, the first callback ignores the value passed
        to it and returns func(*args, **kwargs).
        """
        super(Task, self).__init__()
        self.head = None
        self.tail = None
        if func:
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def _AddCallback(self, callback):
        """Append the link `callback` at the end of the chain."""
        if self.head is None:
            self.head = self.tail = callback
        else:
            self.tail = self.tail.Chain(callback)

    def _GetNext(self):
        """Detach and return the head link, or None if none is left."""
        head = self.head
        if head is not None:
            self.head = head.Next()
        return head

    def AddCallback(self, callback, errorback = None, threaded = False):
        """Append a callback / errorback pair and return self.

        `errorback` defaults to DefaultErrorback. Returning self permits
        chained calls.
        """
        if errorback is None:
            errorback = self.DefaultErrorback
        cb = _Callback(callback, errorback, threaded)
        self._AddCallback(cb)
        # permit chained calls
        return self

    def AddThreadedCallback(self, callback, errorback = None):
        """Same as AddCallback, with the pair flagged as threaded."""
        return self.AddCallback(callback, errorback, True)

    def ChainTask(self, task):
        """Append the remaining links of `task` after this task's links.

        The links are shared with `task`, not copied.
        """
        if task.head is None:
            # Nothing to append; keep our own tail untouched.
            return
        if self.head is None:
            # No pending link on our side: the appended chain becomes ours.
            self.head = task.head
        else:
            self.tail.Chain(task.head)
        self.tail = task.tail

    def GrabResult(self, data = None):
        """Append a pair that records the outcome into a dict.

        The callback stores its argument under "result" and returns it.
        The errorback stores its argument under "error" and returns None.
        Returns the dict (`data`, or a new one when `data` is None).
        """
        if data is None:
            data = {}
        def SetResult(result):
            data["result"] = result
            return result
        def SetError(error):
            data["error"] = error
        self.AddCallback(SetResult, SetError)
        return data

    def SetupEvent(self, event = None, data = None):
        """Append a pair that sets an event on either path.

        A new threading.Event is created when `event` is None. `data` is
        accepted but not used. Returns the event.
        """
        if event is None:
            event = threading.Event()
        def SetEvent(dummy):
            event.set()
        self.AddCallback(SetEvent, SetEvent)
        return event
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
93 |
|
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
94 |
## |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
95 |
# Helper class |
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
96 |
class ThreadedTask(Task):
    """A Task whose first callback is flagged as threaded."""

    def __init__(self, func, *args, **kwargs):
        """Build the task from `func` and flag its first link as threaded."""
        super(ThreadedTask, self).__init__(func, *args, **kwargs)
        # Task.__init__ creates no link when `func` is falsy, so there may
        # be no head to flag.
        if self.head is not None:
            self.head.threaded = True
4
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
100 |
|
0 | 101 |
class Scheduler(threading.Thread): |
102 |
||
103 |
class _SchedulerStop(Exception): |
|
104 |
pass |
|
105 |
||
106 |
def __init__(self, poolSize): |
|
107 |
threading.Thread.__init__(self, name = "Scheduler", target = self.Run) |
|
108 |
self.pool = ThreadPool(poolSize) |
|
109 |
self.tasks = Queue.Queue() |
|
110 |
||
111 |
def ExecuteOne(self, blocking = True): |
|
112 |
logging.debug("Looking for next task...") |
|
113 |
try: |
|
114 |
task = self.tasks.get(blocking) |
|
115 |
except Queue.Empty: |
|
116 |
logging.debug("No task to run") |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
117 |
return None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
118 |
result = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
119 |
error = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
120 |
traceback = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
121 |
while True: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
122 |
cb = task._GetNext() |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
123 |
if not cb: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
124 |
# no more callback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
125 |
break |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
126 |
if cb.threaded: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
127 |
# a threaded callback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
128 |
self._AddJob(task, cb, result, error, traceback) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
129 |
# don't pass Go, don't reclaim $200 |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
130 |
return None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
131 |
# Run the callback according to the current state |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
132 |
try: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
133 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
134 |
error = cb.errorback(error) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
135 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
136 |
result = cb.callback(result) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
137 |
except: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
138 |
errtype, error, traceback = sys.exc_info() |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
139 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
140 |
raise error, None, traceback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
141 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
142 |
return result |
0 | 143 |
|
144 |
def Run(self): |
|
145 |
logging.info("Scheduler start") |
|
146 |
while True: |
|
147 |
try: |
|
148 |
self.ExecuteOne() |
|
149 |
except self._SchedulerStop: |
|
150 |
break |
|
151 |
except: |
|
152 |
logging.exception("Unhandled task exception") |
|
153 |
logging.info("Scheduler stop") |
|
154 |
||
155 |
def Start(self): |
|
156 |
self.pool.Start() |
|
157 |
return self.start() |
|
158 |
||
159 |
def Stop(self, now = False): |
|
160 |
self.pool.Stop(now) |
|
161 |
if now: |
|
162 |
self.tasks = Queue.Queue() |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
163 |
# We raise an exception to find if we stop stop the scheduler. |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
164 |
# We could have use a None task, but this make it easier if we |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
165 |
# want to add such mechanism public or we want to stop on |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
166 |
# other exception |
0 | 167 |
def RaiseSchedulerStop(): |
168 |
raise self._SchedulerStop |
|
169 |
self.AddTask(Task(RaiseSchedulerStop)) |
|
170 |
self.join() |
|
171 |
||
4
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
172 |
def AddTask(self, task): |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
173 |
self.tasks.put(task) |
0 | 174 |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
175 |
def _AddJob(self, task, cb, result, error, traceback): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
176 |
|
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
177 |
def DoIt(task, cb, result, error, traceback): |
0 | 178 |
try: |
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
179 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
180 |
error = cb.errorback(error) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
181 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
182 |
result = cb.callback(result) |
0 | 183 |
except: |
184 |
errtype, error, traceback = sys.exc_info() |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
185 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
186 |
def RaiseError(): |
0 | 187 |
raise error, None, traceback |
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
188 |
jobTask = Task(RaiseError) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
189 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
190 |
def ReturnResult(): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
191 |
return result |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
192 |
jobTask = Task(ReturnResult) |
0 | 193 |
jobTask.ChainTask(task) |
194 |
self.AddTask(jobTask) |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
195 |
|
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
196 |
# This double wrap (Job over DoIt) seems necessary to make |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
197 |
# error not look like a local of Job... |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
198 |
def Job(): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
199 |
return DoIt(task, cb, result, error, traceback) |
0 | 200 |
self.pool.AddJob(Job) |
201 |
||
202 |
# The global scheduler: set by StartScheduler(), reset to None by
# StopScheduler().
scheduler = None
|
204 |
||
205 |
def StartScheduler(size):
    """Create and start the global scheduler, built as Scheduler(size).

    A scheduler that is already set is stopped first.
    """
    global scheduler
    if scheduler is not None:
        StopScheduler()
    scheduler = Scheduler(size)
    scheduler.Start()
|
211 |
||
212 |
def StopScheduler(now = False):
    """Stop the global scheduler, if any, and reset it to None.

    `now` is passed through to Scheduler.Stop().
    """
    global scheduler
    if scheduler is not None:
        try:
            scheduler.Stop(now)
        finally:
            # Reset the global even when Stop() raises, so a later
            # StartScheduler() does not try to stop the same object again.
            scheduler = None
|
217 |
||
218 |
if __name__ == '__main__':
    # Demo: queue a number of "sleeper" tasks (count taken from the first
    # command-line argument), wait for them, then run one more task
    # synchronously using GrabResult and SetupEvent.
    from time import sleep
    logging.getLogger().setLevel(logging.INFO)
    # This function is a sample and shouldn't know about the scheduler
    count = 0
    def AsyncCall(name, seconds):
        global count
        count += 1

        # Probably a bad example, since the callbacks
        # don't return the exact same type...
        def Initialize(name, seconds):
            print("Here %s" % (name,))
            return name, seconds
        def Blocking(args):
            name, time = args
            print("%s goes to bed" % (name,))
            sleep(time)
            print("%s : ZZZ..." % (name,))
            return name
        def Finalize(name):
            global count
            print("%s wakes up!" % (name,))
            count -= 1
            return name

        task = Task(Initialize, name, seconds)
        task.AddThreadedCallback(Blocking)
        task.AddCallback(Finalize)
        return task

    # Fall back to 10 tasks when no count is given, instead of failing
    # with an IndexError.
    if len(sys.argv) > 1:
        taskCount = int(sys.argv[1])
    else:
        taskCount = 10

    logging.info("Starting scheduler with 10 workers")
    StartScheduler(10)
    try:
        logging.info("Adding asynccall task")
        for x in xrange(taskCount):
            task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
            scheduler.AddTask(task)
        while count > 0:
            logging.debug("Count = %d", count)
            sleep(1)

        # Check for King Toto sleep
        task = AsyncCall("King Toto", 5)
        data = task.GrabResult()
        event = task.SetupEvent()
        scheduler.AddTask(task)
        try:
            event.wait(10)
            print("data = %r" % (data,))
        except Exception:
            logging.exception("Error occured on wait")
    finally:
        # Always stop the scheduler, even if the demo fails or is
        # interrupted, so the process is not left waiting on its threads.
        logging.info("Stopping scheduler")
        StopScheduler()
    logging.info("The End.")