scheduler.py
changeset 2 a00ae018daf8
parent 1 ee10f7cde07d
child 3 00b6708d1852
equal deleted inserted replaced
1:ee10f7cde07d 2:a00ae018daf8
     1 #!/usr/bin/env python2.6
     1 #!/usr/bin/env python2.6
     2 import sys
     2 import sys
     3 import logging
     3 import logging
     4 import threading
     4 import threading
     5 import Queue
     5 import Queue
     6 
     6 from threadpool import ThreadPool
     7 ##
       
     8 # A basic worker pool... should be replaced with a resizable one.
       
     9 class ThreadPool(object):
       
    10     
       
    11     class _Worker(threading.Thread):
       
    12         def __init__(self, uid, pool):
       
    13             threading.Thread.__init__(self, name = "Worker%d" % uid)
       
    14             self.pool = pool
       
    15         def run(self):
       
    16             logging.info("%s started", self.getName())
       
    17             while True:
       
    18                 job = self.pool.jobs.get()
       
    19                 if job:
       
    20                     logging.debug("%s is running %r", self.getName(), job)
       
    21                     try:
       
    22                         job()
       
    23                     except:
       
    24                         logging.exception("Job %r return an exception", job)
       
    25                     logging.debug("Job %r completed.", job)
       
    26                 else:
       
    27                     self.pool.jobs.put(job)
       
    28                     break
       
    29             logging.info("%s stopped", self.getName())
       
    30 
       
    31     def __init__(self, size):
       
    32         super(ThreadPool, self).__init__()
       
    33         self.jobs = Queue.Queue()
       
    34         self._workers = [self._Worker(x+1, self) for x in xrange(size)]
       
    35 
       
    36     def Start(self):
       
    37         logging.info("Starting Workers pool")
       
    38         for worker in self._workers:
       
    39             worker.start()
       
    40         logging.info("Workers started")
       
    41 
       
    42     def Stop(self, now = False):
       
    43         logging.info("Stopping Workers")
       
    44         if now:
       
    45             self.jobs = Queue.Queue()
       
    46         self.jobs.put(None)
       
    47         for worker in self._workers:
       
    48             worker.join()
       
    49         logging.info("Workers stopped")
       
    50 
       
    51     def AddJob(self, job):
       
    52         self.jobs.put(job)
       
    53 
     7 
    54 ##
     8 ##
    55 # A task, representing a series of linked pairs of callback and error
     9 # A task, representing a series of linked pairs of callback and error
    56 # handler. The concept is similar to Twisted's, but you need to put all your
    10 # handler. The concept is similar to Twisted's, but you need to put all your
    57 # callbacks on the task before giving it to the Scheduler.  For this
    11 # callbacks on the task before giving it to the Scheduler.  For this
   187     # the current task suspend until the job is finished, then resume.
   141     # the current task suspend until the job is finished, then resume.
   188     def CreateCallbackAsJob(self, src, cb):
   142     def CreateCallbackAsJob(self, src, cb):
   189         dst = Task()
   143         dst = Task()
   190         def CreateJobCallback(result):
   144         def CreateJobCallback(result):
   191             def Job():
   145             def Job():
   192                 cb(result)
   146                 return cb(result)
   193             self.AddJob(dst, Job)
   147             self.AddJob(dst, Job)
   194         src.AddCallback(CreateJobCallback)
   148         src.AddCallback(CreateJobCallback)
   195         return dst
   149         return dst
   196 
   150 
   197 # The global scheduler
   151 # The global scheduler
   210         scheduler.Stop(now)
   164         scheduler.Stop(now)
   211     scheduler = None
   165     scheduler = None
   212 
   166 
   213 if __name__ == '__main__':
   167 if __name__ == '__main__':
   214     from time import sleep
   168     from time import sleep
   215     logging.getLogger().setLevel(logging.DEBUG)
   169     logging.getLogger().setLevel(logging.INFO)
   216     # This function is a sample and shouldn't know about the scheduler
   170     # This function is a sample and shouldn't know about the scheduler
   217     count = 0
   171     count = 0
   218     def AsyncCall():
   172     def AsyncCall(name, seconds):
   219         global count
   173         global count
   220         count += 1
   174         count += 1
   221         def Initialize(name, sleep):
   175         def Initialize(name, seconds):
   222             print "Here", name
   176             print "Here", name
   223             return name, sleep
   177             return name, seconds
   224         def Blocking(args):
   178         def Blocking(args):
   225             name, time = args
   179             name, time = args
   226             print name, "goes to bed"
   180             print name, "goes to bed"
   227             sleep(time)
   181             sleep(time)
   228             print name, ": ZZZ..."
   182             print name, ": ZZZ..."
   230         def Finalize(name):
   184         def Finalize(name):
   231             global count
   185             global count
   232             print name, "wakes up!"
   186             print name, "wakes up!"
   233             count -= 1
   187             count -= 1
   234 
   188 
   235         dinit = Task(Initialize, "Toto", 5)
   189         dinit = Task(Initialize, name, seconds)
   236         # How can I remove the scheduler from the picture ?
   190         # How can I remove the scheduler from the picture ?
   237         # The only way I can see is to have a special kind of Task
   191         # The only way I can see is to have a special kind of Task
   238         # and a suspended queue... May be this will also be more clean
   192         # and a suspended queue... May be this will also be more clean
   239         dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
   193         dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
   240         dfinal.AddCallback(Finalize)
   194         dfinal.AddCallback(Finalize)
   243         # its execution.
   197         # its execution.
   244         return dinit
   198         return dinit
   245 
   199 
   246     logging.info("Starting scheduler with 10 workers")
   200     logging.info("Starting scheduler with 10 workers")
   247     StartScheduler(10)
   201     StartScheduler(10)
   248     dasync = AsyncCall()
       
   249     logging.info("Adding asynccall task")
   202     logging.info("Adding asynccall task")
   250     scheduler.AddTask(dasync)
   203     for x in xrange(int(sys.argv[1])):
       
   204         dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
       
   205         scheduler.AddTask(dasync)
   251     while count > 0:
   206     while count > 0:
   252         logging.debug("Count = %d", count)
   207         logging.debug("Count = %d", count)
   253         sleep(1)
   208         sleep(1)
   254     logging.info("Stopping scheduler")
   209     logging.info("Stopping scheduler")
   255     StopScheduler()
   210     StopScheduler()