1
2
3
4
5
6
7
8
9
10
11
12
13
14 from collections import deque
15 from twisted.internet import reactor, defer, error
16 from twisted.internet.protocol import ProcessProtocol
17 from twisted.python import failure
18 import time
19 import logging
20
21 log = logging.getLogger("zen.processqueue")
22
23
26
28 """
29 Asynchronously run processes. Processes are queued up and run in FIFO
30 order. Processes are run concurrently up to the configured amount.
31 """
32
34 """
35 Initialize the process queue; processes in the queue will not be executed
36 until start is called.
37 @param parallel: number of process to run concurrently
38 @type parallel: int
39 """
40 self._parallel=parallel
41 self._processes=deque()
42 self._started=False
43 self._num_running=0
44
45 self._maxQtime = 0
46 self._maxExecTime = 0
47 self._stopped=None
48
def queueProcess(self, executable, args=(), env={}, path=None, uid=None,
                 gid=None, usePTY=0, childFDs=None, processProtocol=None,
                 timeout=60, timeout_callback=None):
    """
    Add a process to the queue. The spawn arguments are similar to
    reactor.spawnProcess and are forwarded unchanged to the queue protocol.

    @param executable: program to run
    @param args: argument sequence for the process
    @param env: environment for the process; the default dict is shared
        between calls but is only forwarded here, never modified
    @param path: forwarded to the queue protocol
    @param uid: forwarded to the queue protocol
    @param gid: forwarded to the queue protocol
    @param usePTY: forwarded to the queue protocol
    @param childFDs: forwarded to the queue protocol
    @param processProtocol: optional protocol to control the process
    @type processProtocol: ProcessProtocol from twisted
    @param timeout: how many seconds to let the process execute
    @type timeout: int
    @param timeout_callback: callable to call if the process times out
    @type timeout_callback: callable w/ one arg
    @raise QueueStopped: if the queue has been stopped
    """
    if self._stopped:
        raise QueueStopped()

    # Both protocol flavours take the same trailing arguments.
    spawnArgs = (executable, args, env, path, uid, gid, usePTY, childFDs,
                 timeout, timeout_callback)
    if processProtocol:
        # Wrap the caller's protocol in the queue-aware decorator.
        processQProtocol = _ProcessQueueProtocolDecorator(processProtocol,
                                                          *spawnArgs)
    else:
        processQProtocol = _ProcessQueueProtocol(*spawnArgs)

    # Lazy %-args: the protocol is only formatted if debug is enabled.
    log.debug("Adding process %s to queue", processQProtocol)
    log.debug("Processes in queue: %s", len(self._processes))

    self._processes.append(processQProtocol)

    # A queue that is already started dispatches new work right away.
    if self._started:
        self._processQueue()
85
def stop(self):
    """
    Stop the process queue; no more processes will be accepted.

    @return: deferred that is called back when the process queue is empty
    @rtype: Deferred
    """
    # NOTE(review): the header line of this method is missing from the
    # reviewed text; the name `stop` is inferred from the docstring -- confirm.
    if self._stopped:
        # Already stopping/stopped: hand back the same deferred.
        return self._stopped
    self._stopped = defer.Deferred()
    # Nothing running and nothing queued: the queue is empty right now.
    if self._num_running == 0 and not self._processes:
        self._stopped.callback("process queue is empty and stopped")
    return self._stopped
96
def start(self):
    """
    Start processing the queue. Processes will only be executed when the
    reactor starts.
    """
    def _doStart():
        # Only the first call flips the flag and kicks off processing.
        if not self._started:
            self._started = True
            self._processQueue()

    # Deferred to the reactor so nothing is spawned before it is running.
    reactor.callLater(0, _doStart)
110
def _processQueue(self):
    """
    Start the next queued process if fewer than the configured number are
    running, and schedule another pass while work and capacity remain.
    """
    def processFinished(value, processProtocol):
        """
        Bookkeeping for a finished process; added with addBoth, so it runs
        for results and failures alike. Returns None.
        """
        self._num_running -= 1
        reactor.callLater(0, self._processQueue)

        execTime = processProtocol.execStopTime - processProtocol.execStartTime
        qTime = processProtocol.queueStopTime - processProtocol.queueStartTime
        self._maxQtime = max(self._maxQtime, qTime)
        self._maxExecTime = max(self._maxExecTime, execTime)
        log.debug("execution time %s seconds; queue time %s seconds; "
                  "process %s", execTime, qTime, processProtocol)
        # Fire the stop deferred once a stopped queue has fully drained.
        if (self._num_running == 0
                and self._stopped
                and not self._stopped.called
                and len(self._processes) == 0):
            self._stopped.callback("process queue is empty and stopped")

    log.debug("Number of process being executed: %s", self._num_running)
    if self._num_running < self._parallel and self._processes:
        processQProtocol = self._processes.popleft()
        self._num_running += 1
        d = processQProtocol.start()
        d.addBoth(processFinished, processQProtocol)

    # One process is started per pass; reschedule while capacity remains.
    if self._processes and self._num_running < self._parallel:
        reactor.callLater(0, self._processQueue)
141
143 """
144 For internal use by ProcessQueue
145 Protocol to run processes in ProcessQueue. Controls life cycle of process
146 including timing out long running processes
147 """
148
def __init__(self, executable, args=(), env={}, path=None,
             uid=None, gid=None, usePTY=0, childFDs=None, timeout=60,
             timeout_callback=None):
    """
    Record the spawn parameters and note the time the process was queued.

    @param executable: program to run
    @param args: argument sequence for the process
    @param env: environment for the process; stored as given, not copied
    @param path: stored for the later spawn call
    @param uid: stored for the later spawn call
    @param gid: stored for the later spawn call
    @param usePTY: stored for the later spawn call
    @param childFDs: stored for the later spawn call
    @param timeout: how many seconds to let the process execute
    @type timeout: int
    @param timeout_callback: callable to call if the process times out
    """
    # Spawn parameters, kept until the process is actually started.
    self._executable = executable
    self._args = args
    self._env = env
    self._path = path
    self._uid = uid
    self._gid = gid
    self._usePTY = usePTY
    self._childFDs = childFDs

    # Timeout handling state.
    self._time_out = timeout
    self._timeoutDeferred = None
    self._timeout_callback = timeout_callback

    # Timing marks: queue time starts now, the rest are filled in later.
    self.queueStartTime = time.time()
    self.queueStopTime = None
    self.execStartTime = None
    self.execStopTime = None
167
def __str__(self):
    """
    Describe the process for log messages: the space-joined argument list
    when one was given, otherwise the executable.
    """
    if self._args:
        return "process %s" % " ".join(self._args)
    # 1-tuple so a tuple-valued executable cannot be taken as format args.
    return "process %s" % (self._executable,)
173
def start(self):
    """
    Spawn the process and arm its timeout.

    @return: the deferred returned by createTimeout for this run
    @rtype: Deferred
    """
    log.debug("spawning %s ", self)
    # Queue time ends and execution time begins at the same instant.
    now = time.time()
    self.queueStopTime = now
    self.execStartTime = now
    reactor.spawnProcess(self, self._executable, self._args,
                         self._env, self._path, self._uid, self._gid,
                         self._usePTY, self._childFDs)
    self._timeoutDeferred = createTimeout(defer.Deferred(), self._time_out,
                                          self)
    # On failure, kill the process first, then notify the caller's callback.
    self._timeoutDeferred.addErrback(self._timedOut)
    if self._timeout_callback:
        self._timeoutDeferred.addErrback(self._timeout_callback)
    return self._timeoutDeferred
187
def _timedOut(self, value):
    """
    Kill a process if it takes too long.

    @param value: the errback argument; returned unchanged so errbacks
        added after this one still run
    """
    if not self.execStopTime:
        self.execStopTime = time.time()

    try:
        self.transport.signalProcess('KILL')
    except error.ProcessExitedAlready:
        log.debug("Process already exited: %s", self)
    else:
        log.warning("timed out after %s seconds: %s", self._time_out, self)
    return value
200
def processEnded(self, reason):
    """
    This will be called when the subprocess is finished.

    Calls the pending deferred back with an (exitCode, message) tuple.

    @type reason: L{twisted.python.failure.Failure}
    """
    # NOTE(review): the header line of this method is missing from the
    # reviewed text; the name is taken from the ProcessProtocol callback
    # this docstring describes -- confirm.
    if not self.execStopTime:
        self.execStopTime = time.time()

    # Detach the deferred first so it cannot be fired twice from here.
    deferred = self._timeoutDeferred
    self._timeoutDeferred = None
    if deferred is not None and not deferred.called:
        msg = reason.getErrorMessage()
        exitCode = reason.value.exitCode
        deferred.callback((exitCode, msg))
216
217
218
220 """
221 For internal use by ProcessQueue
222 Wraps an existing ProcessProtocol so that it can be run in a ProcessQueue
223 """
def __init__(self, protocol, executable, args=(), env={}, path=None,
             uid=None, gid=None, usePTY=0, childFDs=None, timeout=60,
             timeout_callback=None):
    """
    Set up the queue protocol state and remember the wrapped protocol.

    @param protocol: the existing protocol that process events are
        forwarded to
    @param executable: program to run
    @param args: argument sequence for the process
    @param env: environment for the process
    @param timeout: how many seconds to let the process execute
    @type timeout: int
    @param timeout_callback: callable to call if the process times out

    The remaining parameters (path, uid, gid, usePTY, childFDs) are passed
    straight through to _ProcessQueueProtocol.__init__.
    """
    _ProcessQueueProtocol.__init__(self, executable, args, env, path, uid,
                                   gid, usePTY, childFDs, timeout,
                                   timeout_callback)
    self._protocol = protocol
231
232
def outReceived(self, data):
    """
    Some data was received from stdout; forward it to the wrapped protocol.

    @param data: the data received
    """
    self._protocol.outReceived(data)
238
def errReceived(self, data):
    """
    Some data was received from stderr; forward it to the wrapped protocol.

    @param data: the data received
    """
    self._protocol.errReceived(data)
244
def inConnectionLost(self):
    """
    This will be called when stdin is closed; the wrapped protocol is told.
    """
    self._protocol.inConnectionLost()
250
def outConnectionLost(self):
    """
    This will be called when stdout is closed; the wrapped protocol is told.
    """
    self._protocol.outConnectionLost()
256
257
def errConnectionLost(self):
    """
    This will be called when stderr is closed; the wrapped protocol is told.
    """
    self._protocol.errConnectionLost()
263
272
class TimeoutError(Exception):
    """
    Error for a deferred call taking too long to complete
    """

    def __init__(self, *args):
        """
        @param args: context for the timeout; stored as the exception args
        """
        Exception.__init__(self)
        self.args = args
281
def createTimeout(deferred, seconds, obj):
    """
    Cause an error on a deferred when it is taking too long to complete.

    @param deferred: deferred to monitor for callback/errback
    @type deferred: Deferred
    @param seconds: Time to wait for a callback/errback on the deferred
    @type seconds: int
    @param obj: context for the TimeoutError when timeout occurs
    @type obj: anything
    @return: the same deferred that was passed in
    @rtype: Deferred
    """

    def _timeout(deferred, obj):
        "took too long... call an errback"
        deferred.errback(failure.Failure(TimeoutError(obj)))

    def _cb(arg, timer):
        "the process finished, possibly by timing out"
        # Only a timer that is still pending may be cancelled.
        if timer.active():
            timer.cancel()
        return arg

    timer = reactor.callLater(seconds, _timeout, deferred, obj)
    deferred.addBoth(_cb, timer)
    return deferred
306