--- a/scheduler.py Sun Mar 21 16:50:13 2010 -0400
+++ b/scheduler.py Sun Mar 21 17:06:53 2010 -0400
@@ -3,53 +3,7 @@
import logging
import threading
import Queue
-
-##
-# A basic worker pool... should be replaced with a resizable one.
-class ThreadPool(object):
-
- class _Worker(threading.Thread):
- def __init__(self, uid, pool):
- threading.Thread.__init__(self, name = "Worker%d" % uid)
- self.pool = pool
- def run(self):
- logging.info("%s started", self.getName())
- while True:
- job = self.pool.jobs.get()
- if job:
- logging.debug("%s is running %r", self.getName(), job)
- try:
- job()
- except:
- logging.exception("Job %r return an exception", job)
- logging.debug("Job %r completed.", job)
- else:
- self.pool.jobs.put(job)
- break
- logging.info("%s stopped", self.getName())
-
- def __init__(self, size):
- super(ThreadPool, self).__init__()
- self.jobs = Queue.Queue()
- self._workers = [self._Worker(x+1, self) for x in xrange(size)]
-
- def Start(self):
- logging.info("Starting Workers pool")
- for worker in self._workers:
- worker.start()
- logging.info("Workers started")
-
- def Stop(self, now = False):
- logging.info("Stopping Workers")
- if now:
- self.jobs = Queue.Queue()
- self.jobs.put(None)
- for worker in self._workers:
- worker.join()
- logging.info("Workers stopped")
-
- def AddJob(self, job):
- self.jobs.put(job)
+from threadpool import ThreadPool
##
# A task, representing a series of linked pairs of callback and error
@@ -189,7 +143,7 @@
dst = Task()
def CreateJobCallback(result):
def Job():
- cb(result)
+ return cb(result)
self.AddJob(dst, Job)
src.AddCallback(CreateJobCallback)
return dst
@@ -212,15 +166,15 @@
if __name__ == '__main__':
from time import sleep
- logging.getLogger().setLevel(logging.DEBUG)
+ logging.getLogger().setLevel(logging.INFO)
# This function is a sample and shouldn't know about the scheduler
count = 0
- def AsyncCall():
+ def AsyncCall(name, seconds):
global count
count += 1
- def Initialize(name, sleep):
+ def Initialize(name, seconds):
print "Here", name
- return name, sleep
+ return name, seconds
def Blocking(args):
name, time = args
print name, "goes to bed"
@@ -232,7 +186,7 @@
print name, "wakes up!"
count -= 1
- dinit = Task(Initialize, "Toto", 5)
+ dinit = Task(Initialize, name, seconds)
# How can I remove the scheduler from the picture ?
# The only way I can see is to have a special kind of Task
# and a suspended queue... May be this will also be more clean
@@ -245,9 +199,10 @@
logging.info("Starting scheduler with 10 workers")
StartScheduler(10)
- dasync = AsyncCall()
logging.info("Adding asynccall task")
- scheduler.AddTask(dasync)
+ for x in xrange(int(sys.argv[1])):
+ dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
+ scheduler.AddTask(dasync)
while count > 0:
logging.debug("Count = %d", count)
sleep(1)