Seperate task and scheduler.
--- a/scheduler.py Sun Mar 21 21:40:40 2010 -0400
+++ b/scheduler.py Sun Mar 21 21:47:48 2010 -0400
@@ -4,99 +4,7 @@
import threading
import Queue
from threadpool import ThreadPool
-
-##
-# A task, representing a series of linked pairs of callback and error
-# handler Concept is similar to twisted, but you need to put all your
-# callback on the task before giving it to the Scheduler. For this
-# reason, you shouldnt call the callback/errorback method yourself (or
-# at least, don't put it back in the scheduler after that).
-
-class _Callback(object):
- def __init__(self, callback, errorback, threaded = False):
- self.callback = callback
- self.errorback = errorback
- self.next = None
- self.threaded = threaded
- def Chain(self, next):
- # Can only be called once
- assert(self.next is None)
- self.next = next
- return next
- def Next(self):
- return self.next
-
-class Task(object):
-
- @staticmethod
- def DefaultCallback(result):
- return result
-
- @staticmethod
- def DefaultErrorback(error):
- return error
-
- def __init__(self, func = None, *args, **kwargs):
- super(Task, self).__init__()
- self.head = None
- self.tail = None
- if func:
- def callback(result):
- return func(*args, **kwargs)
- self.AddCallback(callback)
-
- def _AddCallback(self, callback):
- if self.head is None:
- self.head = self.tail = callback
- else:
- self.tail = self.tail.Chain(callback)
-
- def _GetNext(self):
- head = self.head
- if head:
- self.head = head.Next()
- return head
-
- def AddCallback(self, callback, errorback = None, threaded = False):
- if errorback == None:
- errorback = self.DefaultErrorback
- cb = _Callback(callback, errorback, threaded)
- self._AddCallback(cb)
- # permit chained calls
- return self
-
- def AddThreadedCallback(self, callback, errorback = None):
- return self.AddCallback(callback, errorback, True)
-
- def ChainTask(self, task):
- self.tail.Chain(task.head)
- self.tail = task.tail
-
- def GrabResult(self, data = None):
- if data is None:
- data = {}
- def SetResult(result):
- data["result"] = result
- return result
- def SetError(error):
- data["error"] = error
- self.AddCallback(SetResult, SetError)
- return data
-
- def SetupEvent(self, event = None, data = None):
- if not event:
- event = threading.Event()
- def SetEvent(dummy):
- event.set()
- self.AddCallback(SetEvent, SetEvent)
- return event
-
-##
-# Helper class
-class ThreadedTask(Task):
- def __init__(self, func, *args, **kwargs):
- super(ThreadedTask, self).__init__(func, *args, **kwargs)
- self.head.threaded = True
+from task import Task, ThreadedTask
class Scheduler(threading.Thread):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/task.py Sun Mar 21 21:47:48 2010 -0400
@@ -0,0 +1,95 @@
+import threading # Only used in SetupEvent
+
+class _Callback(object):
+ def __init__(self, callback, errorback, threaded = False):
+ self.callback = callback
+ self.errorback = errorback
+ self.next = None
+ self.threaded = threaded
+ def Chain(self, next):
+ # Can only be called once
+ assert(self.next is None)
+ self.next = next
+ return next
+ def Next(self):
+ return self.next
+
+##
+# A task, representing a series of linked pairs of callback and error
+# handler Concept is similar to twisted, but you need to put all your
+# callback on the task before giving it to the Scheduler. For this
+# reason, you shouldnt call the callback/errorback method yourself (or
+# at least, don't put it back in the scheduler after that).
+
+class Task(object):
+
+ @staticmethod
+ def DefaultCallback(result):
+ return result
+
+ @staticmethod
+ def DefaultErrorback(error):
+ return error
+
+ def __init__(self, func = None, *args, **kwargs):
+ super(Task, self).__init__()
+ self.head = None
+ self.tail = None
+ if func:
+ def callback(result):
+ return func(*args, **kwargs)
+ self.AddCallback(callback)
+
+ def _AddCallback(self, callback):
+ if self.head is None:
+ self.head = self.tail = callback
+ else:
+ self.tail = self.tail.Chain(callback)
+
+ def _GetNext(self):
+ head = self.head
+ if head:
+ self.head = head.Next()
+ return head
+
+ def AddCallback(self, callback, errorback = None, threaded = False):
+ if errorback == None:
+ errorback = self.DefaultErrorback
+ cb = _Callback(callback, errorback, threaded)
+ self._AddCallback(cb)
+ # permit chained calls
+ return self
+
+ def AddThreadedCallback(self, callback, errorback = None):
+ return self.AddCallback(callback, errorback, True)
+
+ def ChainTask(self, task):
+ self.tail.Chain(task.head)
+ self.tail = task.tail
+
+ def GrabResult(self, data = None):
+ if data is None:
+ data = {}
+ def SetResult(result):
+ data["result"] = result
+ return result
+ def SetError(error):
+ data["error"] = error
+ self.AddCallback(SetResult, SetError)
+ return data
+
+ def SetupEvent(self, event = None, data = None):
+ if not event:
+ event = threading.Event()
+ def SetEvent(dummy):
+ event.set()
+ self.AddCallback(SetEvent, SetEvent)
+ return event
+
+##
+# Helper class
+class ThreadedTask(Task):
+ def __init__(self, func, *args, **kwargs):
+ super(ThreadedTask, self).__init__(func, *args, **kwargs)
+ self.head.threaded = True
+