scheduler.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 17:06:53 -0400
changeset 2 a00ae018daf8
parent 1 ee10f7cde07d
child 3 00b6708d1852
permissions -rwxr-xr-x
Separate thread pool in another file and fix some issues.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     1
#!/usr/bin/env python2.6
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     2
import sys
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     3
import logging
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     4
import threading
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     5
import Queue
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
     6
from threadpool import ThreadPool
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     7
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     8
##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     9
# A task, representing a series of linked pairs of callback and error
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    10
# handler Concept is similar to twisted, but you need to put all your
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    11
# callback on the task before giving it to the Scheduler.  For this
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    12
# reason, you shouldnt call the callback/errorback method yourself (or
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    13
# at least, don't put it back in the scheduler after that).
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    14
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    15
class Task(object):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    16
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    17
    @staticmethod
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    18
    def DefaultCallback(result):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    19
        return result
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    20
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    21
    @staticmethod
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    22
    def DefaultErrorback(error):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    23
        return error
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    24
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    25
    def __init__(self, func = None, *args, **kwargs):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    26
        super(Task, self).__init__()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    27
        self.callbacks = []
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    28
        if func:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    29
            def callback(result):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    30
                return func(*args, **kwargs)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    31
            self.AddCallback(callback)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    32
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    33
    def AddCallback(self, callback, errorback = None):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    34
        if errorback == None:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    35
            errorback = self.DefaultErrorback
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    36
        self.callbacks.append((callback, errorback))
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    37
        # permit chained calls
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    38
        return self
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    39
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    40
    def ChainTask(self, task):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    41
        return self.AddCallback(task.Callback, task.Errorback)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    42
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    43
    def Callback(self, result, error = None, traceback = None):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    44
        logging.debug("Handling task %r callbacks", self)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    45
        for cb, eb in self.callbacks:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    46
            try:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    47
                if error:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    48
                    error = eb(error)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    49
                else:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    50
                    result = cb(result)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    51
            except:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    52
                errtype, error, traceback = sys.exc_info()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    53
        if error:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    54
            raise error, None, traceback
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    55
        return result
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    56
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    57
    ##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    58
    # Consume the callbacks with an error.  Notes that error will
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    59
    # raise if not handled and return value would be a result
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    60
    def Errorback(self, error, traceback = None):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    61
        return self.Callback(None, error, traceback)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    62
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    63
class Scheduler(threading.Thread):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    64
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    65
    class _SchedulerStop(Exception):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    66
        pass
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    67
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    68
    def __init__(self, poolSize):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    69
        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    70
        self.pool = ThreadPool(poolSize)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    71
        self.tasks = Queue.Queue()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    72
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    73
    def ExecuteOne(self, blocking = True):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    74
        logging.debug("Looking for next task...")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    75
        try:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    76
            task = self.tasks.get(blocking)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    77
        except Queue.Empty:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    78
            logging.debug("No task to run")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    79
            return
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    80
        return task.Callback(None)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    81
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    82
    def Run(self):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    83
        logging.info("Scheduler start")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    84
        while True:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    85
            try:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    86
                self.ExecuteOne()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    87
            except self._SchedulerStop:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    88
                break
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    89
            except:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    90
                logging.exception("Unhandled task exception")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    91
        logging.info("Scheduler stop")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    92
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    93
    def Start(self):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    94
        self.pool.Start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    95
        return self.start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    96
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    97
    def Stop(self, now = False):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    98
        self.pool.Stop(now)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    99
        if now:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   100
            self.tasks = Queue.Queue()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   101
        def RaiseSchedulerStop():
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   102
            raise self._SchedulerStop
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   103
        self.AddTask(Task(RaiseSchedulerStop))
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   104
        self.join()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   105
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   106
    def AddTask(self, task, blocking = True):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   107
        self.tasks.put(task, blocking)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   108
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   109
    ##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   110
    # A job is a task run in a seperated thread.  After the job run, a
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   111
    # new Task is add to the scheduler either with a result or an error,
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   112
    # so that only the task, not the callbacks, is run in the worker
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   113
    # thread. Note the passed task is consumed after this call.
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   114
    # A better design would have to allow Task to suspend or
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   115
    # resume themself and to run in the thread if it want to, but this will
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   116
    # required the callback to know about its Task and the scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   117
    # itself, which solve nothing.
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   118
    def AddJob(self, task, func, *args, **kwargs):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   119
        def Job():
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   120
            try:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   121
                result = func(*args, **kwargs)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   122
                def returnResult():
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   123
                    return result
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   124
                jobTask = Task(returnResult)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   125
            except:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   126
                errtype, error, traceback = sys.exc_info()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   127
                def raiseError():
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   128
                    raise error, None, traceback
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   129
                jobTask = Task(raiseError)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   130
            jobTask.ChainTask(task)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   131
            self.AddTask(jobTask)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   132
        self.pool.AddJob(Job)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   133
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   134
    ##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   135
    # This basically allow one callback to run in a seperated thread
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   136
    # and then get the result to another deferred callback.  Helas,
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   137
    # this create two Tasks, one for the caller, one for the
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   138
    # callbacks, with only the first one having to be passed to the
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   139
    # scheduler.  This should be more transparent: a callback which
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   140
    # required to be run as a job should simply be mark like it and
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   141
    # the current task suspend until the job is finished, then resume.
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   142
    def CreateCallbackAsJob(self, src, cb):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   143
        dst = Task()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   144
        def CreateJobCallback(result):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   145
            def Job():
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   146
                return cb(result)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   147
            self.AddJob(dst, Job)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   148
        src.AddCallback(CreateJobCallback)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   149
        return dst
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   150
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   151
# The global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   152
scheduler = None
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   153
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   154
def StartScheduler(size):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   155
    global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   156
    if scheduler:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   157
        StopScheduler()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   158
    scheduler = Scheduler(size)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   159
    scheduler.Start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   160
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   161
def StopScheduler(now = False):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   162
    global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   163
    if scheduler:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   164
        scheduler.Stop(now)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   165
    scheduler = None
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   166
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   167
if __name__ == '__main__':
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   168
    from time import sleep
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   169
    logging.getLogger().setLevel(logging.INFO)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   170
    # This function is a sample and shouldn't know about the scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   171
    count = 0
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   172
    def AsyncCall(name, seconds):
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   173
        global count
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   174
        count += 1
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   175
        def Initialize(name, seconds):
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   176
            print "Here", name
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   177
            return name, seconds
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   178
        def Blocking(args):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   179
            name, time = args
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   180
            print name, "goes to bed"
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   181
            sleep(time)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   182
            print name, ": ZZZ..."
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   183
            return name
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   184
        def Finalize(name):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   185
            global count
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   186
            print name, "wakes up!"
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   187
            count -= 1
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   188
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   189
        dinit = Task(Initialize, name, seconds)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   190
        # How can I remove the scheduler from the picture ?
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   191
        # The only way I can see is to have a special kind of Task
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   192
        # and a suspended queue... May be this will also be more clean
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   193
        dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   194
        dfinal.AddCallback(Finalize)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   195
        # This is confusing but the reason is that the dfinal callback
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   196
        # will be added to the scheduler by the job itself at the end of
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   197
        # its execution.
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   198
        return dinit
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   199
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   200
    logging.info("Starting scheduler with 10 workers")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   201
    StartScheduler(10)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   202
    logging.info("Adding asynccall task")
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   203
    for x in xrange(int(sys.argv[1])):
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   204
        dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   205
        scheduler.AddTask(dasync)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   206
    while count > 0:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   207
        logging.debug("Count = %d", count)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   208
        sleep(1)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   209
    logging.info("Stopping scheduler")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   210
    StopScheduler()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   211
    logging.info("The End.")