Package Products :: Package ZenUtils :: Module Executor
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenUtils.Executor

 1  ########################################################################### 
 2  # 
 3  # This program is part of Zenoss Core, an open source monitoring platform. 
 4  # Copyright (C) 2010, Zenoss Inc. 
 5  # 
 6  # This program is free software; you can redistribute it and/or modify it 
 7  # under the terms of the GNU General Public License version 2 or (at your 
 8  # option) any later version as published by the Free Software Foundation. 
 9  # 
10  # For complete information please visit: http://www.zenoss.com/oss/ 
11  # 
12  ########################################################################### 
13  from twisted.internet.defer import Deferred, maybeDeferred 
14  from twisted.internet import reactor 
15 16 -class TwistedExecutor(object):
17 """ 18 Executes up to N callables at a time. N is determined by the maxParrallel 19 used to construct an instance, unlimited by default. 20 """
21 - def __init__(self, maxParrallel=None):
22 self._max = maxParrallel 23 self._running = 0 24 self._taskQueue = []
25
26 - def setMax(self, max):
27 self._max = max 28 reactor.callLater(0, self._runTask)
29
30 - def getMax(self):
31 return self._max
32 33 @property
34 - def running(self):
35 return self._running
36 37 @property
38 - def queued(self):
39 return len(self._taskQueue)
40 41
42 - def submit(self, callable, *args, **kw):
43 """ 44 submit a callable to be executed. A deferred will be returned with the 45 the result of the callable. 46 """ 47 deferred = Deferred() 48 deferred.addBoth(self._taskFinished) 49 task = ExecutorTask(deferred, callable, *args, **kw) 50 self._taskQueue.append(task) 51 reactor.callLater(0, self._runTask) 52 return deferred
53
54 - def _runTask(self):
55 if self._taskQueue and (self._max is None or self._running < self._max): 56 self._running += 1 57 task = self._taskQueue.pop(0) 58 task() 59 reactor.callLater(0, self._runTask)
60
61 - def _taskFinished(self, result):
62 self._running -= 1 63 reactor.callLater(0, self._runTask) 64 return result
65
66 67 -class ExecutorTask(object):
68 """ 69 Used by TwistedExecutor to execute queued tasks 70 """
71 - def __init__(self, deferred, callable, *args, **kw):
72 self._callable = callable 73 self._args = args 74 self._kw = kw 75 self._deferred = deferred
76
77 - def __call__(self):
78 deferred = maybeDeferred(self._callable,*self._args, **self._kw) 79 deferred.addCallback(self._finished) 80 deferred.addErrback(self._error)
81
82 - def _finished(self, result):
83 self._deferred.callback(result)
84
85 - def _error(self, result):
86 self._deferred.errback(result)
87