Package Products :: Package ZenRRD :: Module zencommand
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenRRD.zencommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2009, 2010 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """ZenCommand 
 15   
 16  Run Command plugins periodically. 
 17   
 18  """ 
 19   
 20  import random 
 21  import time 
 22  from pprint import pformat 
 23  import logging 
 24  log = logging.getLogger("zen.zencommand") 
 25  import traceback 
 26  from copy import copy 
 27   
 28  from twisted.internet import reactor, defer, error 
 29  from twisted.internet.protocol import ProcessProtocol 
 30  from twisted.python.failure import Failure 
 31   
 32  from twisted.spread import pb 
 33   
 34  import Globals 
 35  import zope.interface 
 36   
 37  from Products.ZenUtils.Utils import unused, getExitMessage, readable_time 
 38  from Products.DataCollector.SshClient import SshClient 
 39  from Products.ZenEvents.ZenEventClasses import Clear, Error, Cmd_Fail, Cmd_Ok 
 40  from Products.ZenRRD.CommandParser import ParsedResults 
 41   
 42  from Products.ZenCollector.daemon import CollectorDaemon 
 43  from Products.ZenCollector.interfaces import ICollectorPreferences,\ 
 44                                               IDataService,\ 
 45                                               IEventService,\ 
 46                                               IScheduledTask 
 47  from Products.ZenCollector.tasks import SimpleTaskFactory,\ 
 48                                          SubConfigurationTaskSplitter,\ 
 49                                          TaskStates, \ 
 50                                          BaseTask 
 51  from Products.ZenCollector.pools import getPool 
 52  from Products.ZenEvents import Event 
 53  from Products.ZenUtils.Executor import TwistedExecutor 
 54   
 55  from Products.DataCollector import Plugins 
 56  unused(Plugins) 
 57   
 58  MAX_CONNECTIONS = 250 
 59  MAX_BACK_OFF_MINUTES = 20 
 60   
 61  # We retrieve our configuration data remotely via a Twisted PerspectiveBroker 
 62  # connection. To do so, we need to import the class that will be used by the 
 63  # configuration service to send the data over, i.e. DeviceProxy. 
 64  from Products.ZenCollector.services.config import DeviceProxy 
 65  unused(DeviceProxy) 
 66   
 67  COLLECTOR_NAME = "zencommand" 
 68  POOL_NAME = 'SshConfigs' 
 69   
class SshPerformanceCollectionPreferences(object):
    """
    Collector preferences for the zencommand daemon: identifies the
    configuration service, cycle intervals, and command-line options.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new SshPerformanceCollectionPreferences instance and
        provides default values for needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        self.configCycleInterval = 20  # minutes
        self.cycleInterval = 5 * 60  # seconds

        # The configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.CommandPerformanceConfig'

        # Provide a reasonable default for the max number of tasks
        self.maxTasks = 50

        # Will be filled in based on buildOptions
        self.options = None

    def buildOptions(self, parser):
        """
        Add zencommand-specific options to the daemon's option parser.

        @param parser: optparse-style parser supplied by the framework
        """
        parser.add_option('--showrawresults',
                          dest='showrawresults',
                          action="store_true",
                          default=False,
                          help="Show the raw RRD values. For debugging purposes only.")

        # type='int' is required: optparse passes command-line values through
        # as strings, and SshPerformanceCollectionTask multiplies this value
        # by 60 ('20' * 60 would be string repetition, not arithmetic).
        parser.add_option('--maxbackoffminutes',
                          dest='maxbackoffminutes',
                          type='int',
                          default=MAX_BACK_OFF_MINUTES,
                          help="When a device fails to respond, increase the time to" \
                               " check on the device until this limit.")

        parser.add_option('--showfullcommand',
                          dest='showfullcommand',
                          action="store_true",
                          default=False,
                          help="Display the entire command and command-line arguments, " \
                               " including any passwords.")

    def postStartup(self):
        """Hook called after daemon startup; nothing to do here."""
        pass
class SshPerCycletimeTaskSplitter(SubConfigurationTaskSplitter):
    """
    Split a device's configuration into one task per unique combination of
    device id, collection cycle time, and transport (SSH vs local).
    """
    subconfigName = 'datasources'

    def makeConfigKey(self, config, subconfig):
        """Return the grouping key for a datasource subconfiguration."""
        transport = 'Remote' if subconfig.useSsh else 'Local'
        return (config.id, subconfig.cycleTime, transport)
class TimeoutError(Exception):
    """
    Raised (via errback) when a deferred call takes too long to complete.
    """

    def __init__(self, *args):
        """Record the object(s) associated with the timeout in ``args``."""
        super(TimeoutError, self).__init__()
        self.args = args
def timeoutCommand(deferred, seconds, obj):
    """
    Arrange for *deferred* to errback with TimeoutError(obj) if it has not
    fired within *seconds*; the timer is attached as ``deferred.mytimer``.
    """

    def _fire_timeout(d, what):
        # The operation took too long -- push a TimeoutError down the chain.
        d.errback(Failure(TimeoutError(what)))

    def _cleanup(result, pending):
        # The command finished (possibly by timing out); cancel the timer
        # if it has not fired yet, then pass the result through unchanged.
        if not pending.called:
            pending.cancel()
        return result

    delayed = reactor.callLater(seconds, _fire_timeout, deferred, obj)
    deferred.mytimer = delayed
    deferred.addBoth(_cleanup, delayed)
    return deferred
class ProcessRunner(ProcessProtocol):
    """
    Provide deferred process execution for a *single* command

    The command is run locally through /bin/sh; stdout/stderr are
    accumulated and the deferred returned by start() fires with this
    ProcessRunner instance when the process ends.
    """
    # Deferred returned by start(); fired (once) from processEnded.
    stopped = None
    # Exit code of the child process, set in processEnded.
    exitCode = None
    # Accumulated stdout of the child process.
    output = ''
    # Accumulated stderr of the child process.
    stderr = ''

    def start(self, cmd):
        """
        Kick off the process: run it local

        @param cmd: command configuration (a Cmd object) providing the
            command string, environment, and device timeout settings
        @return: deferred that fires with self on completion, or errbacks
            with TimeoutError if zCommandCommandTimeout elapses first
        """
        log.debug('Running %s', cmd.command.split()[0])

        self._cmd = cmd
        shell = '/bin/sh'
        # 'exec' replaces the shell with the command so signals (e.g. the
        # KILL sent by timeout()) reach the command itself, not the shell.
        self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
        self.command = ' '.join(self.cmdline)

        reactor.spawnProcess(self, shell, self.cmdline, env=cmd.env)

        d = timeoutCommand(defer.Deferred(), cmd.deviceConfig.zCommandCommandTimeout, cmd)
        self.stopped = d
        self.stopped.addErrback(self.timeout)
        return d

    def timeout(self, value):
        """
        Kill a process if it takes too long

        @param value: the Failure that triggered the errback (passed through)
        """
        try:
            self.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            log.debug("Command already exited: %s", self.command.split()[0])
        return value

    def outReceived(self, data):
        """
        Store up the output as it arrives from the process
        """
        self.output += data

    def errReceived(self, data):
        """
        Store up the output as it arrives from the process
        """
        self.stderr += data

    def processEnded(self, reason):
        """
        Notify the starter that their process is complete

        @param reason: Failure wrapping a ProcessDone/ProcessTerminated
            whose .value.exitCode holds the child's exit status
        """
        self.exitCode = reason.value.exitCode
        if self.exitCode is not None:
            msg = """Datasource: %s Received exit code: %s Output:\n%r"""
            data = [self._cmd.ds, self.exitCode, self.output]
            if self.stderr:
                msg += "\nStandard Error:\n%r"
                data.append(self.stderr)
            log.debug(msg, *data)

        # Fire the deferred handed out by start() exactly once; it may
        # already have been fired by the timeout path.
        if self.stopped:
            d, self.stopped = self.stopped, None
            if not d.called:
                d.callback(self)
class MySshClient(SshClient):
    """
    Connection to SSH server at the remote device

    Extends SshClient to hand each command's result back through a
    per-command deferred, and to track which collection tasks are
    currently sharing this (pooled) connection.
    """
    def __init__(self, *args, **kw):
        # defers maps command string -> Deferred awaiting its result;
        # _taskList holds the tasks currently using this connection.
        SshClient.__init__(self, *args, **kw)
        self.defers = {}
        self._taskList = set()

    def addCommand(self, command):
        """
        Run a command against the server

        @param command: command string to execute remotely
        @return: deferred that fires with (data, exitCode) via addResult
        """
        d = defer.Deferred()
        self.defers[command] = d
        SshClient.addCommand(self, command)
        return d

    def addResult(self, command, data, code):
        """
        Forward the results of the command execution to the starter
        """
        # don't call the CollectorClient.addResult which adds the result to a
        # member variable for zenmodeler
        d = self.defers.pop(command, None)
        if d is None:
            log.error("Internal error where deferred object not in dictionary." \
                      " Command = '%s' Data = '%s' Code = '%s'",
                      command.split()[0], data, code)
        elif not d.called:
            d.callback((data, code))

    def clientConnectionLost(self, connector, reason):
        # Remove this connection from the shared pool so the next collection
        # cycle builds a fresh one instead of reusing a dead connection.
        connection_description = '%s:*****@%s:%s' % (self.username, self.ip, self.port)
        # Connection was lost, but could be because we just closed it. Not necessarily cause for concern.
        log.debug("Connection %s lost." % connection_description)
        pool = getPool('SSH Connections')
        # NOTE(review): this key must stay in sync with
        # SshPerformanceCollectionTask._getPoolKey — confirm when changing.
        poolkey = hash((self.username, self.password, self.ip, self.port))
        if poolkey in pool:
            # Clean it up so the next time around the task will get a new connection
            log.debug("Deleting connection %s from pool." % connection_description)
            del pool[poolkey]

    def check(self, ip, timeout=2):
        """
        Turn off blocking SshClient.test method
        """
        return True

    def clientFinished(self):
        """
        We don't need to track commands/results when they complete
        """
        # Drop per-run bookkeeping inherited from the modeler-oriented client.
        SshClient.clientFinished(self)
        self.cmdmap = {}
        self._commands = []
        self.results = []

    def clientConnectionFailed(self, connector, reason):
        """
        If we didn't connect let the modeler know

        @param connector: connector associated with this failure
        @type connector: object
        @param reason: failure object
        @type reason: object
        """
        self.clientFinished()
        message= reason.getErrorMessage()
        # Iterate over a copy: task.connectionFailed may mutate _taskList.
        for task in list(self._taskList):
            task.connectionFailed(message)
class SshOptions:
    """
    Lightweight options holder handed to SshClient in place of the parsed
    command-line options object it normally expects.
    """
    # Class-level defaults expected by SshClient.
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        """Capture SSH credential and session settings on the instance."""
        settings = (
            ('username', username),
            ('password', password),
            ('loginTimeout', loginTimeout),
            ('commandTimeout', commandTimeout),
            ('keyPath', keyPath),
            ('concurrentSessions', concurrentSessions),
        )
        for attribute, value in settings:
            setattr(self, attribute, value)
class SshRunner(object):
    """
    Run a single command across a cached SSH connection

    Mirrors ProcessRunner's interface (start/output/stderr/exitCode) so the
    task code can treat local and remote execution uniformly.
    """

    def __init__(self, connection):
        # connection is a MySshClient shared via the 'SSH Connections' pool.
        self._connection = connection
        self.exitCode = None
        self.output = None
        self.stderr = None

    def start(self, cmd):
        "Initiate a command on the remote device"
        # NOTE(review): self.defer is created with a canceller but is never
        # returned or chained; the deferred actually returned comes from
        # addCommand via timeoutCommand (which owns .mytimer). The canceller
        # appears intended for that deferred — confirm before relying on
        # cancellation behavior here.
        self.defer = defer.Deferred(canceller=self._canceller)
        try:
            d = timeoutCommand(self._connection.addCommand(cmd.command),
                               self._connection.commandTimeout,
                               cmd)
        except Exception, ex:
            log.warning('Error starting command: %s', ex)
            return defer.fail(ex)
        d.addErrback(self.timeout)
        d.addBoth(self.processEnded)
        return d

    def _canceller(self, deferToCancel):
        # Stop the pending timeout timer attached by timeoutCommand.
        if not deferToCancel.mytimer.called:
            deferToCancel.mytimer.cancel()
        return None

    def timeout(self, arg):
        "Deal with slow executing command/connection (close it)"
        # We could send a kill signal, but then we would need to track
        # the command channel to send it. Just close the connection.
        return arg

    def processEnded(self, value):
        "Deliver ourselves to the starter with the proper attributes"
        # Failures (e.g. TimeoutError) pass through untouched; successful
        # results arrive as the (data, exitCode) tuple from addResult.
        if isinstance(value, Failure):
            return value
        self.output, self.exitCode = value
        return self
class DataPointConfig(pb.Copyable, pb.RemoteCopy):
    """
    Serializable (PerspectiveBroker) configuration for one RRD datapoint:
    identity, storage location, RRD type/bounds, and creation command.
    """
    id = ''
    component = ''
    rrdPath = ''
    rrdType = None
    rrdCreateCommand = ''
    rrdMin = None
    rrdMax = None

    def __init__(self):
        # Per-datapoint values filled in by the parsers.
        self.data = {}

    def __repr__(self):
        """Pretty-print the collected data alongside the datapoint id."""
        return pformat((self.data, self.id))

pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
class Cmd(pb.Copyable, pb.RemoteCopy):
    """
    Holds the config of every command to be run

    Sent over PerspectiveBroker from ZenHub; also carries runtime state
    (lastStart/lastStop/result) while a command executes.
    """
    device = ''
    command = None
    ds = ''
    useSsh = False
    cycleTime = None
    eventClass = None
    eventKey = None
    severity = 3
    lastStart = 0
    lastStop = 0
    result = None
    env = None

    def __init__(self):
        # DataPointConfig objects associated with this datasource.
        self.points = []

    def processCompleted(self, pr):
        """
        Return back the datasource with the ProcessRunner/SshRunner stored in
        the the 'result' attribute.
        """
        self.result = pr
        self.lastStop = time.time()

        # Check for a condition that could cause zencommand to stop cycling.
        # http://dev.zenoss.org/trac/ticket/4936
        if self.lastStop < self.lastStart:
            log.debug('System clock went back?')
            self.lastStop = self.lastStart

        if isinstance(pr, Failure):
            return pr

        elapsed = self.lastStop - self.lastStart
        log.debug('Process %s stopped (%s), %.2f seconds elapsed',
                  self.name, pr.exitCode, elapsed)
        return self

    def getEventKey(self, point):
        """Event key extended with the datapoint's file name."""
        # fetch datapoint name from filename path and add it to the event key
        datapointName = point.rrdPath.split('/')[-1]
        return '|'.join([self.eventKey, datapointName])

    def commandKey(self):
        "Provide a value that establishes the uniqueness of this command"
        parts = [self.useSsh, self.cycleTime, self.severity, self.command]
        return '%'.join(str(part) for part in parts)

    def __str__(self):
        fields = [self.ds, 'useSSH=%s' % self.useSsh, self.cycleTime]
        return ' '.join(str(field) for field in fields)

pb.setUnjellyableForClass(Cmd, Cmd)


# Template for connection/command failure events sent by the task.
STATUS_EVENT = { 'eventClass' : Cmd_Fail,
                 'component' : 'command',
               }
class SshPerformanceCollectionTask(BaseTask):
    """
    A task that performs periodic performance collection for devices providing
    data via SSH connections.

    Lifecycle per cycle (see doTask): connect (SSH only, pooled) ->
    run all datasource commands -> parse results -> write RRD values ->
    send accumulated events.
    """
    zope.interface.implements(IScheduledTask)
    # NOTE(review): this class calls zope.component.queryUtility below, but
    # only 'import zope.interface' is visible at the top of this file —
    # confirm zope.component is importable (likely via a side effect of
    # another import) before refactoring imports.

    STATE_CONNECTING = 'CONNECTING'
    STATE_FETCH_DATA = 'FETCH_DATA'
    STATE_PARSE_DATA = 'PARSING_DATA'
    STATE_STORE_PERF = 'STORE_PERF_DATA'

    def __init__(self,
                 taskName,
                 configId,
                 scheduleIntervalSeconds,
                 taskConfig):
        """
        @param taskName: the unique identifier for this task
        @type taskName: string
        @param configId: configuration to watch
        @type configId: string
        @param scheduleIntervalSeconds: the interval at which this task will be
               collected
        @type scheduleIntervalSeconds: int
        @param taskConfig: the configuration for this task
        """
        super(SshPerformanceCollectionTask, self).__init__(
            taskName, configId,
            scheduleIntervalSeconds, taskConfig
        )

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds

        # The taskConfig corresponds to a DeviceProxy
        self._device = taskConfig

        self._devId = self._device.id
        self._manageIp = self._device.manageIp

        self._dataService = zope.component.queryUtility(IDataService)
        self._eventService = zope.component.queryUtility(IEventService)

        self._preferences = zope.component.queryUtility(ICollectorPreferences,
                                                        COLLECTOR_NAME)
        self._lastErrorMsg = ''

        # NOTE(review): assumes options.maxbackoffminutes is numeric; if it
        # came from the command line without type='int' it would be a string.
        self._maxbackoffseconds = self._preferences.options.maxbackoffminutes * 60

        # Limit concurrent command executions per the device's zProperty.
        self._concurrentSessions = taskConfig.zSshConcurrentSessions
        self._executor = TwistedExecutor(self._concurrentSessions)
        # All datasources in a task share the same transport (see
        # SshPerCycletimeTaskSplitter.makeConfigKey), so the first one decides.
        self._useSsh = taskConfig.datasources[0].useSsh
        self._connection = None

        self._datasources = taskConfig.datasources
        self.pool = getPool('SSH Connections')
        self.executed = 0

    def __str__(self):
        """Human-readable summary used in scheduler displays/logs."""
        return "COMMAND schedule Name: %s configId: %s Datasources: %d" % (
            self.name, self.configId, len(self._datasources))

    def cleanup(self):
        """Framework hook: drop the pooled connection and detach this task."""
        self._cleanUpPool()
        self._close()

    def _getPoolKey(self):
        """
        Get the key under which the client should be stored in the pool.
        """
        # Must match the key computed in MySshClient.clientConnectionLost.
        username = self._device.zCommandUsername
        password = self._device.zCommandPassword
        ip = self._manageIp
        port = self._device.zCommandPort
        return hash((username, password, ip, port))

    def _cleanUpPool(self):
        """
        Close the connection currently associated with this task.
        """
        poolkey = self._getPoolKey()
        if poolkey in self.pool:
            client = self.pool[poolkey]
            tasklist = client._taskList
            if not tasklist:
                # No other tasks, so safe to clean up
                transport = client.transport
                if transport:
                    transport.loseConnection()
                del self.pool[poolkey]

    def doTask(self):
        """
        Contact to one device and return a deferred which gathers data from
        the device.

        @return: Deferred actions to run against a device configuration
        @rtype: Twisted deferred object
        """
        # See if we need to connect first before doing any collection
        d = defer.maybeDeferred(self._connect)
        d.addCallbacks(self._connectCallback, self._failure)
        d.addCallback(self._fetchPerf)

        # Call _finished for both success and error scenarios
        d.addBoth(self._finished)

        # Wait until the Deferred actually completes
        return d

    def _connect(self):
        """
        If a local datasource executor, do nothing.

        If an SSH datasource executor, create a connection to object the remote device.
        Make a new SSH connection object if there isn't one available. This doesn't
        actually connect to the device.

        @return: None for local execution, otherwise the (possibly pooled)
            MySshClient — note this is the client object, not a deferred
            that waits for the connection to be established.
        """
        if not self._useSsh:
            return defer.succeed(None)

        connection = self.pool.get(self._getPoolKey(), None)
        if connection is None:
            self.state = SshPerformanceCollectionTask.STATE_CONNECTING
            log.debug("Creating connection object to %s", self._devId)
            username = self._device.zCommandUsername
            password = self._device.zCommandPassword
            loginTimeout = self._device.zCommandLoginTimeout
            commandTimeout = self._device.zCommandCommandTimeout
            keypath = self._device.zKeyPath
            options = SshOptions(username, password,
                                 loginTimeout, commandTimeout,
                                 keypath, self._concurrentSessions)

            connection = MySshClient(self._devId, self._manageIp,
                                     self._device.zCommandPort, options=options)
            connection.sendEvent = self._eventService.sendEvent

            self.pool[self._getPoolKey()] = connection

            # Opens SSH connection to device
            connection.run()

        self._connection = connection
        # Register so connection failures can pause this task, and so the
        # pool knows the connection is in use (see _cleanUpPool/_close).
        self._connection._taskList.add(self)
        return connection

    def _close(self):
        """
        If a local datasource executor, do nothing.

        If an SSH datasource executor, relinquish a connection to the remote device.
        """
        if self._connection:
            self._connection._taskList.discard(self)
            # Only the last task using the connection actually closes it.
            if not self._connection._taskList:
                if self._getPoolKey() in self.pool:
                    client = self.pool[self._getPoolKey()]
                    client.clientFinished()
                    client.channelClosed()
            self._connection = None

    def connectionFailed(self, msg):
        """
        This method is called by the SSH client when the connection fails.

        @parameter msg: message indicating the cause of the problem
        @type msg: string
        """
        # Note: Raising an exception and then catching it doesn't work
        # as it appears that the exception is discarded in PBDaemon.py
        self.state = TaskStates.STATE_PAUSED
        log.error("Pausing task %s as %s [%s] connection failure: %s",
                  self.name, self._devId, self._manageIp, msg)
        self._eventService.sendEvent(STATUS_EVENT,
                                     device=self._devId,
                                     summary=msg,
                                     component=COLLECTOR_NAME,
                                     severity=Event.Error)
        # NOTE(review): _commandsToExecute is only assigned in _fetchPerf;
        # a connection failure before any fetch would raise AttributeError
        # here — confirm the SSH client can't call this earlier.
        self._commandsToExecute.cancel()

    def _failure(self, reason):
        """
        Twisted errBack to log the exception for a single device.

        @parameter reason: explanation of the failure
        @type reason: Twisted error instance
        """
        # Decode the exception
        if isinstance(reason.value, TimeoutError):
            # timeoutCommand packs the offending Cmd into the exception args.
            cmd, = reason.value.args
            msg = "Command timed out on device %s: %r" % (
                self._devId, cmd.command.split()[0])
            log.warning(msg)
            ev = self._makeCmdEvent(cmd, cmd.severity, msg)
            self._eventService.sendEvent(ev)

            # Don't log a traceback by not returning a result
            reason = None

        elif isinstance(reason.value, defer.CancelledError):
            # The actual issue is logged by connectionFailed
            # Don't log a traceback by not returning a result
            msg = "Task %s paused due to connection error" % self.name
            reason = None

        else:
            msg = reason.getErrorMessage()
            if not msg:  # Sometimes we get blank error messages
                msg = reason.__class__
            msg = '%s %s' % (self._devId, msg)
            # Leave 'reason' alone to generate a traceback

        # Only log when the message changes, to avoid flooding the log on
        # a persistently failing device.
        if self._lastErrorMsg != msg:
            self._lastErrorMsg = msg
            if msg:
                log.error(msg)

        if reason:
            self._eventService.sendEvent(STATUS_EVENT,
                                         device=self._devId,
                                         summary=msg,
                                         severity=Event.Error)

        if self._useSsh:
            # Back off the collection interval (up to _maxbackoffseconds).
            self._delayNextCheck()

        return reason

    def _connectCallback(self, result):
        """
        Callback called after a successful connect to the remote device.
        """
        if self._useSsh:
            log.debug("Connected to %s [%s]", self._devId, self._manageIp)
        else:
            log.debug("Running command(s) locally")
        return result

    def _addDatasource(self, datasource):
        """
        Add a new instantiation of ProcessRunner or SshRunner
        for every datasource.

        @return: deferred fired with the datasource (via processCompleted)
        """
        if self._preferences.options.showfullcommand:
            log.info("Datasource %s command: %s", datasource.name,
                     datasource.command)

        if self._useSsh:
            runner = SshRunner(self._connection)
        else:
            runner = ProcessRunner()

        d = runner.start(datasource)
        datasource.lastStart = time.time()
        d.addBoth(datasource.processCompleted)
        return d

    def _fetchPerf(self, ignored):
        """
        Get performance data for all the monitored components on a device

        @parameter ignored: required to keep Twisted's callback chain happy
        @type ignored: result of previous callback
        """
        self.state = SshPerformanceCollectionTask.STATE_FETCH_DATA

        # The keys are the datasource commands, which are by definition unique
        # to the command run.
        cacheableDS = {}

        # Bundle up the list of tasks
        deferredCmds = []
        for datasource in self._datasources:
            datasource.deviceConfig = self._device

            # Only run each distinct command once; later datasources with the
            # same command reuse the first one's output in _parseResults.
            if datasource.command in cacheableDS:
                cacheableDS[datasource.command].append(datasource)
                continue
            cacheableDS[datasource.command] = []

            # The executor enforces the concurrent-session limit.
            task = self._executor.submit(self._addDatasource, datasource)
            deferredCmds.append(task)

        # Run the tasks
        dl = defer.DeferredList(deferredCmds, consumeErrors=True)
        dl.addCallback(self._parseResults, cacheableDS)
        dl.addCallback(self._storeResults)
        dl.addCallback(self._updateStatus)

        # Save the list in case we need to cancel the commands
        self._commandsToExecute = dl
        return dl

    def _parseResults(self, resultList, cacheableDS):
        """
        Interpret the results retrieved from the commands and pass on
        the datapoint values and events.

        @parameter resultList: results of running the commands in a DeferredList
        @type resultList: array of (boolean, datasource)
        @parameter cacheableDS: other datasources that can use the same results
        @type cacheableDS: dictionary of arrays of datasources
        """
        self.state = SshPerformanceCollectionTask.STATE_PARSE_DATA
        parseableResults = []
        for success, datasource in resultList:
            results = ParsedResults()
            if not success:
                # In this case, our datasource is actually a defer.Failure
                reason = datasource
                # NOTE(review): assumes the failure is a TimeoutError whose
                # args hold the Cmd (as built by timeoutCommand); any other
                # exception type would unpack incorrectly here — confirm.
                datasource, = reason.value.args
                msg = "Datasource %s command timed out" % (
                    datasource.name)
                ev = self._makeCmdEvent(datasource, msg)
                results.events.append(ev)

            else:
                # Re-use our results for any similar datasources
                cachedDsList = cacheableDS.get(datasource.command)
                if cachedDsList:
                    for ds in cachedDsList:
                        ds.result = copy(datasource.result)
                        results = ParsedResults()
                        self._processDatasourceResults(ds, results)
                        parseableResults.append( (ds, results) )
                    results = ParsedResults()

                self._processDatasourceResults(datasource, results)

            parseableResults.append( (datasource, results) )
        return parseableResults

    def _makeParser(self, datasource, eventList):
        """
        Create a parser object to process data

        @parameter datasource: datasource containg information
        @type datasource: Cmd object
        @parameter eventList: list of events
        @type eventList: list of dictionaries
        @return: the parser instance, or None if creation failed (in which
            case an error event has been appended to eventList)
        """
        parser = None
        try:
            parser = datasource.parser.create()
        except Exception, ex:
            msg = "Error loading parser %s" % datasource.parser
            log.exception("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            ev['message'] = traceback.format_exc()
            eventList.append(ev)
        return parser

    def _processDatasourceResults(self, datasource, results):
        """
        Process a single datasource's results

        @parameter datasource: datasource containg information
        @type datasource: Cmd object
        @parameter results: empty results object
        @type results: ParsedResults object
        """
        showcommand = self._preferences.options.showfullcommand
        if not datasource.result.output:
            msg = "No data returned for command"
            if showcommand:
                msg += ": %s" % datasource.command
            log.warn("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            if showcommand:
                ev['command'] = datasource.command
            results.events.append(ev)
            return

        parser = self._makeParser(datasource, results.events)
        if not parser:
            return

        try:
            parser.preprocessResults(datasource, log)
            parser.processResults(datasource, results)
            if not results.events and parser.createDefaultEventUsingExitCode:
                # Add a failsafe event guessing at the error codes
                self._addDefaultEvent(datasource, results)
            if datasource.result.stderr:
                self._addStderrMsg(datasource.result.stderr,
                                   results.events)
        except Exception, ex:
            msg = "Error running parser %s" % datasource.parser
            log.exception("%s %s %s", self.name, datasource.name, msg)
            ev = self._makeCmdEvent(datasource, msg)
            ev['message'] = traceback.format_exc()
            ev['output'] = datasource.result.output
            results.events.append(ev)

    def _addDefaultEvent(self, datasource, results):
        """
        If there is no event, send one based on the exit code.
        """
        exitCode = datasource.result.exitCode
        if exitCode == 0:
            msg = ''
            severity = 0
        else:
            msg = 'Datasource: %s - Code: %s - Msg: %s' % (
                datasource.name, exitCode, getExitMessage(exitCode))
            severity = datasource.severity

        ev = self._makeCmdEvent(datasource, msg, severity)
        results.events.append(ev)

    def _addStderrMsg(self, stderrMsg, eventList):
        """
        Add the stderr output to error events.

        @parameter stderrMsg: stderr output from the command
        @type stderrMsg: string
        @parameter eventList: list of events
        @type eventList: list of dictionaries
        """
        for event in eventList:
            if event['severity'] not in ('Clear', 'Info', 'Debug'):
                event['stderr'] = stderrMsg

    def _storeResults(self, resultList):
        """
        Store the values in RRD files

        @parameter resultList: results of running the commands
        @type resultList: array of (datasource, dictionary)
        @return: resultList unchanged, for the next callback in the chain
        """
        self.state = SshPerformanceCollectionTask.STATE_STORE_PERF
        for datasource, results in resultList:
            for dp, value in results.values:
                threshData = {
                    'eventKey': datasource.getEventKey(dp),
                    'component': dp.component,
                }
                self._dataService.writeRRD(
                    dp.rrdPath,
                    value,
                    dp.rrdType,
                    dp.rrdCreateCommand,
                    datasource.cycleTime,
                    dp.rrdMin,
                    dp.rrdMax,
                    threshData)

        return resultList

    def _updateStatus(self, resultList):
        """
        Send any accumulated events

        @parameter resultList: results of running the commands
        @type resultList: array of (datasource, dictionary)
        @return: resultList unchanged, for the next callback in the chain
        """
        for datasource, results in resultList:
            self._clearEvent(datasource, results.events)
            for ev in results.events:
                self._eventService.sendEvent(ev, device=self._devId)
        return resultList

    def _clearEvent(self, datasource, eventList):
        """
        Ensure that a CLEAR event is sent for any command that
        successfully completes.
        """
        # If the result is a Failure, no exitCode exists
        exitCode = getattr(datasource.result, 'exitCode', -1)
        if exitCode is None or exitCode != 0:
            return

        clearEvents = [ev for ev in eventList if ev['severity'] == Clear]
        if not clearEvents:
            msg = 'Datasource %s command completed successfully' % (
                datasource.name)
            ev = self._makeCmdEvent(datasource, msg, severity=Clear)
            eventList.append(ev)

    def _makeCmdEvent(self, datasource, msg, severity=None):
        """
        Create an event using the info in the Cmd object.

        @parameter datasource: source of the event fields
        @type datasource: Cmd object
        @parameter msg: event summary
        @type msg: string
        @parameter severity: event severity; defaults to the datasource's
        @return: event dictionary
        """
        severity = datasource.severity if severity is None else severity
        ev = dict(
            device=self._devId,
            component=datasource.component,
            eventClass=datasource.eventClass,
            eventKey=datasource.eventKey,
            severity=severity,
            summary=msg
        )
        return ev

    def _finished(self, result):
        """
        Callback activated when the task is complete

        @parameter result: results of the task
        @type result: deferred object
        """
        if not isinstance(result, Failure):
            # Success: undo any back-off applied by earlier failures.
            self._returnToNormalSchedule()

        try:
            self._close()
        except Exception, ex:
            log.warn("Failed to close device %s: error %s" %
                     (self._devId, str(ex)))

        # Return the result so the framework can track success/failure
        return result

    def displayStatistics(self):
        """
        Called by the collector framework scheduler, and allows us to
        see how each task is doing.
        """
        display = "%s useSSH: %s\n" % (
            self.name, self._useSsh)
        if self._lastErrorMsg:
            display += "%s\n" % self._lastErrorMsg
        return display


if __name__ == '__main__':
    # Required for passing classes from zenhub to here
    from Products.ZenRRD.zencommand import Cmd, DataPointConfig

    myPreferences = SshPerformanceCollectionPreferences()
    myTaskFactory = SimpleTaskFactory(SshPerformanceCollectionTask)
    myTaskSplitter = SshPerCycletimeTaskSplitter(myTaskFactory)
    daemon = CollectorDaemon(myPreferences, myTaskSplitter)
    daemon.run()