test.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 16:40:06 -0400
changeset 0 8ae7370093db
permissions -rwxr-xr-x
First version.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     1
#!/usr/bin/env python2.6
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     2
import sys
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     3
import logging
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     4
import threading
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     5
import Queue
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     6
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     7
##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     8
# A basic worker pool... should be replaced with a resizable one.
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     9
class ThreadPool(object):
    """Fixed-size pool of worker threads fed from one shared FIFO queue.

    Jobs are zero-argument callables.  A ``None`` entry in the queue is the
    shutdown sentinel: the worker that dequeues it puts it back for the
    other workers and then exits.
    """

    class _Worker(threading.Thread):
        """Thread that runs jobs from its pool's queue until it sees ``None``."""

        def __init__(self, uid, pool):
            threading.Thread.__init__(self, name="Worker%d" % uid)
            self.pool = pool

        def run(self):
            """Consume jobs until the ``None`` sentinel is dequeued."""
            logging.info("%s started", self.getName())
            while True:
                job = self.pool.jobs.get()
                if job is None:
                    # Put the sentinel back so that every other worker
                    # gets to see it as well.
                    self.pool.jobs.put(job)
                    break
                logging.debug("%s is running %r", self.getName(), job)
                try:
                    job()
                except:
                    # Deliberately broad: whatever a job raises is logged
                    # and the worker keeps serving the queue.
                    logging.exception("Job %r return an exception", job)
                logging.debug("Job %r completed.", job)
            logging.info("%s stopped", self.getName())

    def __init__(self, size):
        """Create ``size`` workers; they do not run until Start() is called."""
        super(ThreadPool, self).__init__()
        self.jobs = Queue.Queue()
        self._workers = [self._Worker(uid + 1, self) for uid in range(size)]

    def Start(self):
        """Start every worker thread."""
        logging.info("Starting Workers pool")
        for worker in self._workers:
            worker.start()
        logging.info("Workers started")

    def Stop(self, now=False):
        """Ask the workers to exit and wait until all of them have.

        With ``now`` false, jobs already queued are run before the workers
        exit.  With ``now`` true, jobs that have not been picked up yet are
        discarded; a job that is already running is still allowed to finish.
        """
        logging.info("Stopping Workers")
        if now:
            # Empty the queue in place.  Rebinding self.jobs to a new queue
            # would strand any worker already blocked in get() on the old
            # one: it would never see the sentinel and join() would hang.
            try:
                while True:
                    self.jobs.get_nowait()
            except Queue.Empty:
                pass
        self.jobs.put(None)
        for worker in self._workers:
            worker.join()
        logging.info("Workers stopped")

    def AddJob(self, job):
        """Queue ``job`` (a zero-argument callable) for execution."""
        self.jobs.put(job)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    53
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    54
##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    55
# A task, representing a series of linked pairs of callback and error
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    56
# handler.  The concept is similar to Twisted, but you need to put all your
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    57
# callbacks on the task before giving it to the Scheduler.  For this
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    58
# reason, you shouldn't call the callback/errorback method yourself (or
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    59
# at least, don't put it back in the scheduler after that).
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    60
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    61
class Task(object):
    """An ordered chain of (callback, errorback) pairs.

    Callback() walks the chain once.  While no error is pending, each
    callback receives the previous result and its return value becomes the
    new result.  While an error is pending, each errorback receives the
    error and its return value becomes the new pending error; returning a
    false value (e.g. ``None``) marks the error as handled.  An exception
    raised by either kind of handler becomes the pending error.
    """

    @staticmethod
    def DefaultCallback(result):
        """Pass the result through unchanged."""
        return result

    @staticmethod
    def DefaultErrorback(error):
        """Pass the error through unchanged, i.e. leave it unhandled."""
        return error

    @staticmethod
    def _Reraise(error, traceback):
        """Raise ``error`` with ``traceback`` on Python 2 and Python 3."""
        if sys.version_info[0] >= 3:
            if traceback is not None:
                error = error.with_traceback(traceback)
            raise error
        # The three-expression raise is a syntax error on Python 3, so it
        # is only compiled when actually running on Python 2.
        exec("raise error, None, traceback")

    def __init__(self, func=None, *args, **kwargs):
        """Create a task, optionally seeded with a first callback.

        When ``func`` is given, the first callback ignores the incoming
        result and returns ``func(*args, **kwargs)``.
        """
        super(Task, self).__init__()
        self.callbacks = []
        if func is not None:
            def callback(result):
                return func(*args, **kwargs)
            self.AddCallback(callback)

    def AddCallback(self, callback, errorback=None):
        """Append a (callback, errorback) pair and return ``self``.

        ``errorback`` defaults to DefaultErrorback, which leaves a pending
        error untouched.
        """
        if errorback is None:
            errorback = self.DefaultErrorback
        self.callbacks.append((callback, errorback))
        # permit chained calls
        return self

    def ChainTask(self, task):
        """Run ``task``'s whole chain as the next step of this one."""
        def errorback(error):
            # task.Errorback() returns the chained task's *result* once the
            # error has been handled (and raises otherwise).  Handing that
            # value back as-is would make Callback() treat a truthy result
            # as a still-pending error, so report "handled" as None.
            task.Errorback(error)
            return None
        return self.AddCallback(task.Callback, errorback)

    def Callback(self, result, error=None, traceback=None):
        """Consume the chain, starting with ``result`` (or ``error``).

        Returns the final result.  If an error is still pending after the
        last pair, it is raised with the most recently captured traceback.
        """
        logging.debug("Handling task %r callbacks", self)
        for cb, eb in self.callbacks:
            try:
                if error:
                    error = eb(error)
                else:
                    result = cb(result)
            except:
                # Anything a handler raises becomes the pending error.
                error, traceback = sys.exc_info()[1:]
        if error:
            self._Reraise(error, traceback)
        return result

    ##
    # Consume the callbacks with an error.  Note that the error is raised
    # if no errorback handles it; otherwise the return value is the result.
    def Errorback(self, error, traceback=None):
        return self.Callback(None, error, traceback)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   108
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   109
class Scheduler(threading.Thread):
    """Thread that runs queued Tasks one at a time.

    Blocking work is handed to an internal ThreadPool through AddJob(); the
    outcome of each job comes back to the scheduler thread as a new Task.
    """

    class _SchedulerStop(Exception):
        """Raised by the stop task queued in Stop() to end Run()."""
        pass

    def __init__(self, poolSize):
        """Create the scheduler thread and a pool of ``poolSize`` workers."""
        threading.Thread.__init__(self, name="Scheduler", target=self.Run)
        self.pool = ThreadPool(poolSize)
        self.tasks = Queue.Queue()

    def ExecuteOne(self, blocking=True):
        """Run the next queued task and return its result.

        Returns ``None`` without running anything when ``blocking`` is
        false and the queue is empty.  Exceptions left unhandled by the
        task propagate to the caller.
        """
        logging.debug("Looking for next task...")
        try:
            task = self.tasks.get(blocking)
        except Queue.Empty:
            logging.debug("No task to run")
            return
        return task.Callback(None)

    def Run(self):
        """Thread body: execute tasks until the stop task is reached."""
        logging.info("Scheduler start")
        while True:
            try:
                self.ExecuteOne()
            except self._SchedulerStop:
                break
            except:
                # A failing task must not take the scheduler down.
                logging.exception("Unhandled task exception")
        logging.info("Scheduler stop")

    def Start(self):
        """Start the worker pool, then the scheduler thread."""
        self.pool.Start()
        return self.start()

    def Stop(self, now=False):
        """Stop the worker pool, then the scheduler thread, and wait for both.

        With ``now`` true, tasks that have not been executed yet are
        discarded instead of being run first.
        """
        self.pool.Stop(now)
        if now:
            # Empty the queue in place.  Rebinding self.tasks to a new queue
            # would leave the scheduler thread blocked in get() on the old
            # one, where the stop task below would never arrive.
            try:
                while True:
                    self.tasks.get_nowait()
            except Queue.Empty:
                pass

        def RaiseSchedulerStop():
            raise self._SchedulerStop
        self.AddTask(Task(RaiseSchedulerStop))
        self.join()

    def AddTask(self, task, blocking=True):
        """Queue ``task`` for execution on the scheduler thread."""
        self.tasks.put(task, blocking)

    ##
    # A job is a task run in a separate thread.  After the job has run, a
    # new Task is added to the scheduler with either a result or an error,
    # so that only the job itself, not the callbacks, runs in the worker
    # thread.  Note that the passed task is consumed after this call.
    # A better design would allow a Task to suspend or resume itself and
    # to run in the thread if it wants to, but this would require the
    # callback to know about its Task and the scheduler itself, which
    # solves nothing.
    def AddJob(self, task, func, *args, **kwargs):
        def Job():
            try:
                result = func(*args, **kwargs)
            except:
                error, traceback = sys.exc_info()[1:]

                def deliver():
                    # Hand the error over together with the traceback
                    # captured in the worker thread.
                    return task.Errorback(error, traceback)
            else:
                def deliver():
                    return task.Callback(result)
            self.AddTask(Task(deliver))
        self.pool.AddJob(Job)

    ##
    # This basically allows one callback to run in a separate thread and
    # then passes its result to another deferred callback.  Alas, this
    # creates two Tasks, one for the caller and one for the callbacks,
    # with only the first one having to be passed to the scheduler.  This
    # should be more transparent: a callback which needs to run as a job
    # should simply be marked as such, and the current task suspended
    # until the job is finished, then resumed.
    def CreateCallbackAsJob(self, src, cb):
        dst = Task()

        def CreateJobCallback(result):
            def Job():
                # Return the value so that it becomes the result that
                # dst's callbacks receive.
                return cb(result)
            self.AddJob(dst, Job)
        src.AddCallback(CreateJobCallback)
        return dst
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   196
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   197
# The global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   198
# Assigned by StartScheduler() and reset to None by StopScheduler().
scheduler = None
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   199
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   200
def StartScheduler(size):
    """Install and start a new global scheduler with ``size`` workers.

    A scheduler that is already installed is stopped first.
    """
    global scheduler
    if scheduler:
        StopScheduler()
    replacement = Scheduler(size)
    scheduler = replacement
    replacement.Start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   206
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   207
def StopScheduler(now=False):
    """Stop the global scheduler, if there is one, and clear the global.

    ``now`` is forwarded to the scheduler's Stop() method.
    """
    global scheduler
    current = scheduler
    if current:
        current.Stop(now)
    scheduler = None
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   212
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   213
if __name__ == '__main__':
    from time import sleep
    logging.getLogger().setLevel(logging.DEBUG)
    # Number of sample calls still in flight; the main loop below polls it.
    count = 0

    # This function is a sample and shouldn't know about the scheduler
    def AsyncCall():
        global count
        count += 1

        def Initialize(name, delay):
            print("Here %s" % (name,))
            return name, delay

        def Blocking(args):
            name, delay = args
            print("%s goes to bed" % (name,))
            sleep(delay)
            print("%s : ZZZ..." % (name,))
            return name

        def Finalize(name):
            global count
            print("%s wakes up!" % (name,))
            count -= 1

        dinit = Task(Initialize, "Toto", 5)
        # How can I remove the scheduler from the picture?
        # The only way I can see is to have a special kind of Task
        # and a suspended queue... Maybe this would also be cleaner.
        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
        dfinal.AddCallback(Finalize)
        # This is confusing, but the reason is that the dfinal callback
        # will be added to the scheduler by the job itself at the end of
        # its execution.
        return dinit

    logging.info("Starting scheduler with 10 workers")
    StartScheduler(10)
    try:
        dasync = AsyncCall()
        logging.info("Adding asynccall task")
        scheduler.AddTask(dasync)
        while count > 0:
            logging.debug("Count = %d", count)
            sleep(1)
    finally:
        # The scheduler and worker threads are not daemon threads, so they
        # must be stopped even on an error or Ctrl-C, or the process would
        # never exit.
        logging.info("Stopping scheduler")
        StopScheduler()
    logging.info("The End.")