Package Products :: Package ZenUtils :: Module ProcessQueue
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenUtils.ProcessQueue

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2009, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  from collections import deque 
 15  from twisted.internet import reactor, defer, error 
 16  from twisted.internet.protocol import ProcessProtocol 
 17  from twisted.python import failure 
 18  import time 
 19  import logging 
 20   
 21  log = logging.getLogger("zen.processqueue") 
 22   
 23   
24 -class QueueStopped(Exception):
25 pass
26
27 -class ProcessQueue(object):
28 """ 29 Ansynchronously run processes. Processes are queued up and run in a FIFO 30 order. Processes are run in concurrently up to the configured amount. 31 """ 32
33 - def __init__(self, parallel=10):
34 """ 35 initialize the process queue; process in queue will not executed until 36 start is called. 37 @param parallel: number of process to run concurrently 38 @type parallel: int 39 """ 40 self._parallel=parallel 41 self._processes=deque() 42 self._started=False 43 self._num_running=0 44 45 self._maxQtime = 0 46 self._maxExecTime = 0 47 self._stopped=None
48
49 - def queueProcess(self, executable, args=(), env={}, path=None, uid=None, 50 gid=None, usePTY=0, childFDs=None, processProtocol=None, 51 timeout=60, timeout_callback=None):
52 """ 53 add a process to the queue. args are similar to reactor.spawnProcess 54 @param processProtocol: optional protocol to control the process 55 @type processProtocol: ProcessProtocol from twisted 56 @param timeout: how many seconds to let the process execute 57 @type timeout: int 58 @param timeout_callback: callable to call if the process times out 59 @type timeout_callback: callable w/ one arg 60 @raise QueueStopped: if the queue has been stopped 61 """ 62 if self._stopped: 63 raise QueueStopped() 64 processQProtocol = None 65 if processProtocol: 66 processQProtocol = _ProcessQueueProtocolDecorator(processProtocol, 67 executable, args, 68 env, path, uid, 69 gid, usePTY, 70 childFDs,timeout, 71 timeout_callback) 72 else: 73 processQProtocol = _ProcessQueueProtocol(executable, args, 74 env, path, uid, 75 gid, usePTY, 76 childFDs,timeout, 77 timeout_callback) 78 log.debug("Adding process %s to queue" % processQProtocol) 79 log.debug("Processes in queue: %s" % len(self._processes)) 80 81 self._processes.append(processQProtocol) 82 83 if self._started: 84 self._processQueue()
85
86 - def stop(self):
87 """ 88 stops the process queue; no more processes will be accepted. deferred 89 will be called back when process queue is empty 90 """ 91 if self._stopped: return self._stopped 92 self._stopped = defer.Deferred() 93 if self._num_running ==0 and len(self._processes) == 0: 94 self._stopped.callback("process queue is empty and stopped") 95 return self._stopped
96
97 - def start(self):
98 """ 99 start processing the queue. Processes will only be executed when the 100 reactor starts 101 """ 102 def _doStart(): 103 # don't want to actually start unless reactor is running to prevent 104 #zombie processes 105 if not self._started: 106 self._started=True 107 self._processQueue()
108 109 reactor.callLater(0,_doStart)
110
111 - def _processQueue(self):
112 def processFinished(value, processProtocol): 113 self._num_running -= 1 114 reactor.callLater(0, self._processQueue) 115 116 execTime = processProtocol.execStopTime - processProtocol.execStartTime 117 qTime = processProtocol.queueStopTime - processProtocol.queueStartTime 118 self._maxQtime = max(self._maxQtime, qTime) 119 self._maxExecTime = max(self._maxExecTime, execTime) 120 log.debug("execution time %s seconds; queue time %s seconds; " 121 "process %s" 122 % ( execTime, qTime, processProtocol)) 123 if (self._num_running == 0 124 and self._stopped 125 and not self._stopped.called 126 and len(self._processes) == 0): 127 self._stopped.callback("process queue is empty and stopped")
128 log.debug("Number of process being executed: %s" % self._num_running) 129 if self._num_running < self._parallel: 130 processQProtocol = None 131 if self._processes: 132 processQProtocol = self._processes.popleft() 133 if processQProtocol: 134 self._num_running += 1 135 d = processQProtocol.start() 136 d.addBoth(processFinished, processQProtocol) 137 138 if self._processes and self._num_running < self._parallel: 139 reactor.callLater(0, self._processQueue) 140 return 141
142 -class _ProcessQueueProtocol(ProcessProtocol):
143 """ 144 For interal use by ProcessQueue 145 Protocol to run processes in ProcessQueue. Controls life cycle or process 146 including timing out long running processes 147 """ 148
149 - def __init__(self, executable, args=(), env={}, path=None, 150 uid=None, gid=None, usePTY=0, childFDs=None, timeout=60, 151 timeout_callback=None):
152 self._executable=executable 153 self._args=args 154 self._env=env 155 self._path=path 156 self._uid=uid 157 self._gid=gid 158 self._usePTY=usePTY 159 self._childFDs=childFDs 160 self._time_out=timeout 161 self._timeoutDeferred=None 162 self._timeout_callback=timeout_callback 163 self.queueStartTime = time.time() 164 self.queueStopTime = None 165 self.execStartTime = None 166 self.execStopTime = None
167
168 - def __str__(self):
169 if self._args: 170 return"process %s" % " ".join(self._args) 171 else: 172 return "process %s" % self._executable
173
174 - def start(self):
175 log.debug("spawning %s " % self) 176 now = time.time() 177 self.queueStopTime = now 178 self.execStartTime = now 179 reactor.spawnProcess(self, self._executable, self._args, 180 self._env, self._path, self._uid, self._gid, 181 self._usePTY, self._childFDs) 182 self._timeoutDeferred = createTimeout(defer.Deferred(), self._time_out, self) 183 self._timeoutDeferred.addErrback(self._timedOut) 184 if self._timeout_callback: 185 self._timeoutDeferred.addErrback(self._timeout_callback) 186 return self._timeoutDeferred
187
188 - def _timedOut(self, value):
189 "Kill a process if it takes too long" 190 try: 191 if not self.execStopTime: 192 self.execStopTime = time.time() 193 194 self.transport.signalProcess('KILL') 195 log.warning("timed out after %s seconds: %s" % (self._time_out, 196 self)) 197 except error.ProcessExitedAlready: 198 log.debug("Process already exited: %s" % self) 199 return value
200
201 - def processEnded(self, reason):
202 """ 203 This will be called when the subprocess is finished. 204 205 @type reason: L{twisted.python.failure.Failure} 206 """ 207 if not self.execStopTime: 208 self.execStopTime = time.time() 209 210 deferred = self._timeoutDeferred 211 self._timeoutDeferred = None 212 if deferred and not deferred.called: 213 msg = reason.getErrorMessage() 214 exitCode = reason.value.exitCode 215 deferred.callback((exitCode,msg))
216 217 218
219 -class _ProcessQueueProtocolDecorator(_ProcessQueueProtocol):
220 """ 221 For interal use by ProcessQueue 222 Wraps an existing ProcessProtocol so that it can be run in a ProcessQueue 223 """
224 - def __init__(self, protocol, executable, args=(), env={}, path=None, 225 uid=None, gid=None, usePTY=0, childFDs=None, timeout=60, 226 timeout_callback=None):
227 _ProcessQueueProtocol.__init__(self, executable, args, env, path, uid, 228 gid, usePTY, childFDs, timeout, 229 timeout_callback) 230 self._protocol = protocol
231 232
233 - def outReceived(self, data):
234 """ 235 Some data was received from stdout. 236 """ 237 self._protocol.outReceived(data)
238
239 - def errReceived(self, data):
240 """ 241 Some data was received from stderr. 242 """ 243 self._protocol.errReceived(data)
244
245 - def inConnectionLost(self):
246 """ 247 This will be called when stdin is closed. 248 """ 249 self._protocol.inConnectionLost()
250
251 - def outConnectionLost(self):
252 """ 253 This will be called when stdout is closed. 254 """ 255 self._protocol.outConnectionLost()
256 257
258 - def errConnectionLost(self):
259 """ 260 This will be called when stderr is closed. 261 """ 262 self._protocol.errConnectionLost()
263
264 - def processEnded(self, reason):
265 """ 266 This will be called when the subprocess is finished. 267 268 @type reason: L{twisted.python.failure.Failure} 269 """ 270 _ProcessQueueProtocol.processEnded(self, reason) 271 self._protocol.processEnded(reason)
272
273 -class TimeoutError(Exception):
274 """ 275 Error for a defered call taking too long to complete 276 """ 277
278 - def __init__(self, *args):
279 Exception.__init__(self) 280 self.args = args
281
282 -def createTimeout(deferred, seconds, obj):
283 """ 284 Cause an error on a deferred when it is taking too long to complete. 285 @param deferred: deferred to monitor for callback/errback 286 @type deferred: Deferred 287 @param seconds: Time to wait for a callback/errback on the deferred 288 @type seconds: int 289 @pram obj: context for the TimeoutError when timeout occurs 290 @type obj: anything 291 """ 292 293 def _timeout(deferred, obj): 294 "took too long... call an errback" 295 deferred.errback(failure.Failure(TimeoutError(obj)))
296 297 def _cb(arg, timer): 298 "the process finished, possibly by timing out" 299 if not timer.called: 300 timer.cancel() 301 return arg 302 303 timer = reactor.callLater(seconds, _timeout, deferred, obj) 304 deferred.addBoth(_cb, timer) 305 return deferred 306