scheduler.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 21 Mar 2010 21:40:40 -0400
changeset 4 76ba9b3a9e1c
parent 3 00b6708d1852
child 5 eb1133af54ed
permissions -rwxr-xr-x
Add GrabResult and SetupEvent, for synchronicity.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     1
#!/usr/bin/env python2.6
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     2
import sys
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     3
import logging
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     4
import threading
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     5
import Queue
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
     6
from threadpool import ThreadPool
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     7
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     8
##
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
     9
# A task, representing a series of linked pairs of callback and error
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    10
# handler Concept is similar to twisted, but you need to put all your
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    11
# callback on the task before giving it to the Scheduler.  For this
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    12
# reason, you shouldnt call the callback/errorback method yourself (or
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    13
# at least, don't put it back in the scheduler after that).
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    14
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    15
class _Callback(object):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    16
    def __init__(self, callback, errorback, threaded = False):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    17
        self.callback = callback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    18
        self.errorback = errorback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    19
        self.next = None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    20
        self.threaded = threaded
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    21
    def Chain(self, next):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    22
        # Can only be called once
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    23
        assert(self.next is None)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    24
        self.next = next
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    25
        return next
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    26
    def Next(self):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    27
        return self.next
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    28
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    29
class Task(object):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    30
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    31
    @staticmethod
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    32
    def DefaultCallback(result):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    33
        return result
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    34
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    35
    @staticmethod
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    36
    def DefaultErrorback(error):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    37
        return error
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    38
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    39
    def __init__(self, func = None, *args, **kwargs):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    40
        super(Task, self).__init__()
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    41
        self.head = None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    42
        self.tail = None
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    43
        if func:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    44
            def callback(result):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    45
                return func(*args, **kwargs)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    46
            self.AddCallback(callback)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    47
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    48
    def _AddCallback(self, callback):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    49
        if self.head is None:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    50
            self.head = self.tail = callback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    51
        else:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    52
            self.tail = self.tail.Chain(callback)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    53
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    54
    def _GetNext(self):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    55
        head = self.head
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    56
        if head:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    57
            self.head = head.Next()
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    58
        return head
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    59
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    60
    def AddCallback(self, callback, errorback = None, threaded = False):
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    61
        if errorback == None:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    62
            errorback = self.DefaultErrorback
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    63
        cb = _Callback(callback, errorback, threaded)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    64
        self._AddCallback(cb)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    65
        # permit chained calls
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    66
        return self
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    67
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    68
    def AddThreadedCallback(self, callback, errorback = None):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    69
        return self.AddCallback(callback, errorback, True)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    70
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    71
    def ChainTask(self, task):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    72
        self.tail.Chain(task.head)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    73
        self.tail = task.tail
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
    74
4
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    75
    def GrabResult(self, data = None):
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    76
        if data is None:
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    77
            data = {}
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    78
        def SetResult(result):
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    79
            data["result"] = result
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    80
            return result
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    81
        def SetError(error):
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    82
            data["error"] = error
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    83
        self.AddCallback(SetResult, SetError)
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    84
        return data
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    85
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    86
    def SetupEvent(self, event = None, data = None):
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    87
        if not event:
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    88
            event = threading.Event()
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    89
        def SetEvent(dummy):
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    90
            event.set()
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    91
        self.AddCallback(SetEvent, SetEvent)
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    92
        return event
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    93
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    94
##
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    95
# Helper class
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    96
class ThreadedTask(Task):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    97
    def __init__(self, func, *args, **kwargs):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    98
        super(ThreadedTask, self).__init__(func, *args, **kwargs)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    99
        self.head.threaded = True
4
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   100
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   101
class Scheduler(threading.Thread):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   102
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   103
    class _SchedulerStop(Exception):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   104
        pass
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   105
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   106
    def __init__(self, poolSize):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   107
        threading.Thread.__init__(self, name = "Scheduler", target = self.Run)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   108
        self.pool = ThreadPool(poolSize)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   109
        self.tasks = Queue.Queue()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   110
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   111
    def ExecuteOne(self, blocking = True):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   112
        logging.debug("Looking for next task...")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   113
        try:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   114
            task = self.tasks.get(blocking)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   115
        except Queue.Empty:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   116
            logging.debug("No task to run")
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   117
            return None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   118
        result = None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   119
        error = None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   120
        traceback = None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   121
        while True:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   122
            cb = task._GetNext()
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   123
            if not cb:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   124
                # no more callback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   125
                break
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   126
            if cb.threaded:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   127
                # a threaded callback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   128
                self._AddJob(task, cb, result, error, traceback)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   129
                # don't pass Go, don't reclaim $200
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   130
                return None
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   131
            # Run the callback according to the current state
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   132
            try:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   133
                if error:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   134
                    error = cb.errorback(error)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   135
                else:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   136
                    result = cb.callback(result)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   137
            except:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   138
                errtype, error, traceback = sys.exc_info()
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   139
        if error:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   140
            raise error, None, traceback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   141
        else:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   142
            return result
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   143
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   144
    def Run(self):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   145
        logging.info("Scheduler start")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   146
        while True:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   147
            try:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   148
                self.ExecuteOne()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   149
            except self._SchedulerStop:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   150
                break
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   151
            except:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   152
                logging.exception("Unhandled task exception")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   153
        logging.info("Scheduler stop")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   154
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   155
    def Start(self):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   156
        self.pool.Start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   157
        return self.start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   158
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   159
    def Stop(self, now = False):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   160
        self.pool.Stop(now)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   161
        if now:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   162
            self.tasks = Queue.Queue()
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   163
        # We raise an exception to find if we stop stop the scheduler.
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   164
        # We could have use a None task, but this make it easier if we
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   165
        # want to add such mechanism public or we want to stop on
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   166
        # other exception
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   167
        def RaiseSchedulerStop():
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   168
            raise self._SchedulerStop
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   169
        self.AddTask(Task(RaiseSchedulerStop))
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   170
        self.join()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   171
4
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   172
    def AddTask(self, task):
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   173
        self.tasks.put(task)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   174
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   175
    def _AddJob(self, task, cb, result, error, traceback):
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   176
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   177
        def DoIt(task, cb, result, error, traceback):
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   178
            try:
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   179
                if error:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   180
                    error = cb.errorback(error)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   181
                else:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   182
                    result = cb.callback(result)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   183
            except:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   184
                errtype, error, traceback = sys.exc_info()
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   185
            if error:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   186
                def RaiseError():
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   187
                    raise error, None, traceback
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   188
                jobTask = Task(RaiseError)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   189
            else:
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   190
                def ReturnResult():
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   191
                    return result
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   192
                jobTask = Task(ReturnResult)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   193
            jobTask.ChainTask(task)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   194
            self.AddTask(jobTask)
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   195
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   196
        # This double wrap (Job over DoIt) seems necessary to make
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   197
        # error not look like a local of Job...
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   198
        def Job():
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   199
            return DoIt(task, cb, result, error, traceback)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   200
        self.pool.AddJob(Job)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   201
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   202
# The global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   203
scheduler = None
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   204
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   205
def StartScheduler(size):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   206
    global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   207
    if scheduler:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   208
        StopScheduler()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   209
    scheduler = Scheduler(size)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   210
    scheduler.Start()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   211
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   212
def StopScheduler(now = False):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   213
    global scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   214
    if scheduler:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   215
        scheduler.Stop(now)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   216
    scheduler = None
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   217
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   218
if __name__ == '__main__':
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   219
    from time import sleep
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   220
    logging.getLogger().setLevel(logging.INFO)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   221
    # This function is a sample and shouldn't know about the scheduler
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   222
    count = 0
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   223
    def AsyncCall(name, seconds):
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   224
        global count
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   225
        count += 1
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   226
        
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   227
        # Probably a bad example, since the callback
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   228
        # doesn't return the exact same type...
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   229
        def Initialize(name, seconds):
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   230
            print "Here", name
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   231
            return name, seconds
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   232
        def Blocking(args):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   233
            name, time = args
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   234
            print name, "goes to bed"
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   235
            sleep(time)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   236
            print name, ": ZZZ..."
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   237
            return name
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   238
        def Finalize(name):
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   239
            global count
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   240
            print name, "wakes up!"
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   241
            count -= 1
4
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   242
            return name
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   243
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   244
        task = Task(Initialize, name, seconds)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   245
        task.AddThreadedCallback(Blocking)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   246
        task.AddCallback(Finalize)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   247
        return task
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   248
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   249
    logging.info("Starting scheduler with 10 workers")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   250
    StartScheduler(10)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   251
    logging.info("Adding asynccall task")
2
a00ae018daf8 Separate thread pool in another file and fix some issues.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   252
    for x in xrange(int(sys.argv[1])):
3
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   253
        task = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0)
00b6708d1852 Rewrite the whole to use linked chain of callback, allowing the
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   254
        scheduler.AddTask(task)
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   255
    while count > 0:
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   256
        logging.debug("Count = %d", count)
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   257
        sleep(1)
4
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   258
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   259
    # Check for King Toto sleep
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   260
    task = AsyncCall("King Toto", 5)
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   261
    data = task.GrabResult()
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   262
    event = task.SetupEvent()
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   263
    scheduler.AddTask(task)
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   264
    try:
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   265
        event.wait(10)
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   266
        print "data = %r" % (data,)
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   267
    except:
76ba9b3a9e1c Add GrabResult and SetupEvent, for synchronicity.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
   268
        logging.exception("Error occured on wait")
0
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   269
    logging.info("Stopping scheduler")
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   270
    StopScheduler()
8ae7370093db First version.
Fabien Ninoles <fabien@tzone.org>
parents:
diff changeset
   271
    logging.info("The End.")