scheduler.py
changeset 2 a00ae018daf8
parent 1 ee10f7cde07d
child 3 00b6708d1852
--- a/scheduler.py	Sun Mar 21 16:50:13 2010 -0400
+++ b/scheduler.py	Sun Mar 21 17:06:53 2010 -0400
@@ -3,53 +3,7 @@
 import logging
 import threading
 import Queue
-
-##
-# A basic worker pool... should be replaced with a resizable one.
-class ThreadPool(object):
-    
-    class _Worker(threading.Thread):
-        def __init__(self, uid, pool):
-            threading.Thread.__init__(self, name = "Worker%d" % uid)
-            self.pool = pool
-        def run(self):
-            logging.info("%s started", self.getName())
-            while True:
-                job = self.pool.jobs.get()
-                if job:
-                    logging.debug("%s is running %r", self.getName(), job)
-                    try:
-                        job()
-                    except:
-                        logging.exception("Job %r return an exception", job)
-                    logging.debug("Job %r completed.", job)
-                else:
-                    self.pool.jobs.put(job)
-                    break
-            logging.info("%s stopped", self.getName())
-
-    def __init__(self, size):
-        super(ThreadPool, self).__init__()
-        self.jobs = Queue.Queue()
-        self._workers = [self._Worker(x+1, self) for x in xrange(size)]
-
-    def Start(self):
-        logging.info("Starting Workers pool")
-        for worker in self._workers:
-            worker.start()
-        logging.info("Workers started")
-
-    def Stop(self, now = False):
-        logging.info("Stopping Workers")
-        if now:
-            self.jobs = Queue.Queue()
-        self.jobs.put(None)
-        for worker in self._workers:
-            worker.join()
-        logging.info("Workers stopped")
-
-    def AddJob(self, job):
-        self.jobs.put(job)
+from threadpool import ThreadPool
 
 ##
 # A task, representing a series of linked pairs of callback and error
@@ -189,7 +143,7 @@
         dst = Task()
         def CreateJobCallback(result):
             def Job():
-                cb(result)
+                return cb(result)
             self.AddJob(dst, Job)
         src.AddCallback(CreateJobCallback)
         return dst
@@ -212,15 +166,15 @@
 
 if __name__ == '__main__':
     from time import sleep
-    logging.getLogger().setLevel(logging.DEBUG)
+    logging.getLogger().setLevel(logging.INFO)
     # This function is a sample and shouldn't know about the scheduler
     count = 0
-    def AsyncCall():
+    def AsyncCall(name, seconds):
         global count
         count += 1
-        def Initialize(name, sleep):
+        def Initialize(name, seconds):
             print "Here", name
-            return name, sleep
+            return name, seconds
         def Blocking(args):
             name, time = args
             print name, "goes to bed"
@@ -232,7 +186,7 @@
             print name, "wakes up!"
             count -= 1
 
-        dinit = Task(Initialize, "Toto", 5)
+        dinit = Task(Initialize, name, seconds)
         # How can I remove the scheduler from the picture ?
         # The only way I can see is to have a special kind of Task
         # and a suspended queue... May be this will also be more clean
@@ -245,9 +199,10 @@
 
     logging.info("Starting scheduler with 10 workers")
     StartScheduler(10)
-    dasync = AsyncCall()
     logging.info("Adding asynccall task")
-    scheduler.AddTask(dasync)
+    for x in xrange(int(sys.argv[1])):
+        dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
+        scheduler.AddTask(dasync)
     while count > 0:
         logging.debug("Count = %d", count)
         sleep(1)