author | Fabien Ninoles <fabien@tzone.org> |
Mon, 07 Jun 2010 23:56:53 -0400 | |
changeset 6 | 6657247ddbbf |
parent 5 | eb1133af54ed |
permissions | -rwxr-xr-x |
0 | 1 |
#!/usr/bin/env python2.6 |
2 |
import sys |
|
3 |
import logging |
|
4 |
import threading |
|
5 |
import Queue |
|
2
a00ae018daf8
Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
6 |
from threadpool import ThreadPool |
5
eb1133af54ed
Seperate task and scheduler.
Fabien Ninoles <fabien@tzone.org>
parents:
4
diff
changeset
|
7 |
from task import Task, ThreadedTask |
4
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
8 |
|
0 | 9 |
class Scheduler(threading.Thread): |
10 |
||
11 |
class _SchedulerStop(Exception): |
|
12 |
pass |
|
13 |
||
14 |
def __init__(self, poolSize): |
|
15 |
threading.Thread.__init__(self, name = "Scheduler", target = self.Run) |
|
16 |
self.pool = ThreadPool(poolSize) |
|
17 |
self.tasks = Queue.Queue() |
|
18 |
||
19 |
def ExecuteOne(self, blocking = True): |
|
20 |
logging.debug("Looking for next task...") |
|
21 |
try: |
|
22 |
task = self.tasks.get(blocking) |
|
23 |
except Queue.Empty: |
|
24 |
logging.debug("No task to run") |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
25 |
return None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
26 |
result = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
27 |
error = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
28 |
traceback = None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
29 |
while True: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
30 |
cb = task._GetNext() |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
31 |
if not cb: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
32 |
# no more callback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
33 |
break |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
34 |
if cb.threaded: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
35 |
# a threaded callback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
36 |
self._AddJob(task, cb, result, error, traceback) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
37 |
# don't pass Go, don't reclaim $200 |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
38 |
return None |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
39 |
# Run the callback according to the current state |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
40 |
try: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
41 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
42 |
error = cb.errorback(error) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
43 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
44 |
result = cb.callback(result) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
45 |
except: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
46 |
errtype, error, traceback = sys.exc_info() |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
47 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
48 |
raise error, None, traceback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
49 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
50 |
return result |
0 | 51 |
|
52 |
def Run(self): |
|
53 |
logging.info("Scheduler start") |
|
54 |
while True: |
|
55 |
try: |
|
56 |
self.ExecuteOne() |
|
57 |
except self._SchedulerStop: |
|
58 |
break |
|
59 |
except: |
|
60 |
logging.exception("Unhandled task exception") |
|
61 |
logging.info("Scheduler stop") |
|
62 |
||
63 |
def Start(self): |
|
64 |
self.pool.Start() |
|
65 |
return self.start() |
|
66 |
||
67 |
def Stop(self, now = False): |
|
68 |
self.pool.Stop(now) |
|
69 |
if now: |
|
70 |
self.tasks = Queue.Queue() |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
71 |
# We raise an exception to find if we stop stop the scheduler. |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
72 |
# We could have use a None task, but this make it easier if we |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
73 |
# want to add such mechanism public or we want to stop on |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
74 |
# other exception |
0 | 75 |
def RaiseSchedulerStop(): |
76 |
raise self._SchedulerStop |
|
77 |
self.AddTask(Task(RaiseSchedulerStop)) |
|
78 |
self.join() |
|
79 |
||
4
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
80 |
def AddTask(self, task): |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
81 |
self.tasks.put(task) |
0 | 82 |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
83 |
def _AddJob(self, task, cb, result, error, traceback): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
84 |
|
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
85 |
def DoIt(task, cb, result, error, traceback): |
0 | 86 |
try: |
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
87 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
88 |
error = cb.errorback(error) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
89 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
90 |
result = cb.callback(result) |
0 | 91 |
except: |
92 |
errtype, error, traceback = sys.exc_info() |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
93 |
if error: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
94 |
def RaiseError(): |
0 | 95 |
raise error, None, traceback |
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
96 |
jobTask = Task(RaiseError) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
97 |
else: |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
98 |
def ReturnResult(): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
99 |
return result |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
100 |
jobTask = Task(ReturnResult) |
0 | 101 |
jobTask.ChainTask(task) |
102 |
self.AddTask(jobTask) |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
103 |
|
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
104 |
# This double wrap (Job over DoIt) seems necessary to make |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
105 |
# error not look like a local of Job... |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
106 |
def Job(): |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
107 |
return DoIt(task, cb, result, error, traceback) |
0 | 108 |
self.pool.AddJob(Job) |
109 |
||
110 |
# The global scheduler |
|
111 |
scheduler = None |
|
112 |
||
113 |
def StartScheduler(size): |
|
114 |
global scheduler |
|
115 |
if scheduler: |
|
116 |
StopScheduler() |
|
117 |
scheduler = Scheduler(size) |
|
118 |
scheduler.Start() |
|
119 |
||
120 |
def StopScheduler(now = False): |
|
121 |
global scheduler |
|
122 |
if scheduler: |
|
123 |
scheduler.Stop(now) |
|
124 |
scheduler = None |
|
125 |
||
126 |
if __name__ == '__main__': |
|
127 |
from time import sleep |
|
2
a00ae018daf8
Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
128 |
logging.getLogger().setLevel(logging.INFO) |
6
6657247ddbbf
Add slightly better command line handling.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
129 |
if len(sys.argv) == 1: |
6657247ddbbf
Add slightly better command line handling.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
130 |
numberOfTasks = 100 |
6657247ddbbf
Add slightly better command line handling.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
131 |
else: |
6657247ddbbf
Add slightly better command line handling.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
132 |
numberOfTasks = int(sys.argv[1]) |
0 | 133 |
# This function is a sample and shouldn't know about the scheduler |
134 |
count = 0 |
|
2
a00ae018daf8
Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
135 |
def AsyncCall(name, seconds): |
0 | 136 |
global count |
137 |
count += 1 |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
138 |
|
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
139 |
# Probably a bad example, since the callback |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
140 |
# doesn't return the exact same type... |
2
a00ae018daf8
Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
141 |
def Initialize(name, seconds): |
0 | 142 |
print "Here", name |
2
a00ae018daf8
Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
143 |
return name, seconds |
0 | 144 |
def Blocking(args): |
145 |
name, time = args |
|
146 |
print name, "goes to bed" |
|
147 |
sleep(time) |
|
148 |
print name, ": ZZZ..." |
|
149 |
return name |
|
150 |
def Finalize(name): |
|
151 |
global count |
|
152 |
print name, "wakes up!" |
|
153 |
count -= 1 |
|
4
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
154 |
return name |
0 | 155 |
|
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
156 |
task = Task(Initialize, name, seconds) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
157 |
task.AddThreadedCallback(Blocking) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
158 |
task.AddCallback(Finalize) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
159 |
return task |
0 | 160 |
|
161 |
logging.info("Starting scheduler with 10 workers") |
|
162 |
StartScheduler(10) |
|
163 |
logging.info("Adding asynccall task") |
|
6
6657247ddbbf
Add slightly better command line handling.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
164 |
for x in xrange(numberOfTasks): |
3
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
165 |
task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) |
00b6708d1852
Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents:
2
diff
changeset
|
166 |
scheduler.AddTask(task) |
0 | 167 |
while count > 0: |
168 |
logging.debug("Count = %d", count) |
|
169 |
sleep(1) |
|
4
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
170 |
|
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
171 |
# Check for King Toto sleep |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
172 |
task = AsyncCall("King Toto", 5) |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
173 |
data = task.GrabResult() |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
174 |
event = task.SetupEvent() |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
175 |
scheduler.AddTask(task) |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
176 |
try: |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
177 |
event.wait(10) |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
178 |
print "data = %r" % (data,) |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
179 |
except: |
76ba9b3a9e1c
Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents:
3
diff
changeset
|
180 |
logging.exception("Error occured on wait") |
0 | 181 |
logging.info("Stopping scheduler") |
182 |
StopScheduler() |
|
183 |
logging.info("The End.") |