Package Products :: Package ZenRRD :: Module zenprocess
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenRRD.zenprocess

  1  #! /usr/bin/env python 
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, 2009 Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 or (at your 
  9  # option) any later version as published by the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14  __doc__= """zenprocess 
 15   
 16  Gets SNMP process data from a device's HOST-RESOURCES-MIB 
 17  and store process performance in RRD files. 
 18  """ 
 19   
 20  import logging 
 21  import sys 
 22  import re 
 23  from md5 import md5 
 24  from pprint import pformat 
 25   
 26  import Globals 
 27   
 28  import zope.component 
 29  import zope.interface 
 30   
 31  from Products.ZenCollector.daemon import CollectorDaemon 
 32  from Products.ZenCollector.interfaces import ICollectorPreferences,\ 
 33      IScheduledTask, IEventService, IDataService, IConfigurationListener 
 34  from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\ 
 35      TaskStates 
 36  from Products.ZenEvents import Event 
 37  from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess, \ 
 38          Status_Perf 
 39  from Products.ZenUtils.observable import ObservableMixin 
 40  from Products.ZenUtils.Chain import Chain 
 41   
 42  # We retrieve our configuration data remotely via a Twisted PerspectiveBroker 
 43  # connection. To do so, we need to import the class that will be used by the 
 44  # configuration service to send the data over, i.e. DeviceProxy. 
 45  from Products.ZenUtils.Utils import unused 
 46  from Products.ZenCollector.services.config import DeviceProxy 
 47  unused(DeviceProxy) 
 48  from Products.ZenHub.services.ProcessConfig import ProcessProxy 
 49  unused(ProcessProxy) 
 50  from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo 
 51  unused(SnmpConnInfo) 
 52   
 53  from twisted.internet import defer, error 
 54  from twisted.python.failure import Failure 
 55  from pynetsnmp.twistedsnmp import Snmpv3Error 
 56   
 57  log = logging.getLogger("zen.zenprocess") 
 58   
 59  # HOST-RESOURCES-MIB OIDs used 
 60  HOSTROOT  ='.1.3.6.1.2.1.25' 
 61  RUNROOT   = HOSTROOT + '.4' 
 62  NAMETABLE = RUNROOT + '.2.1.2' 
 63  PATHTABLE = RUNROOT + '.2.1.4' 
 64  ARGSTABLE = RUNROOT + '.2.1.5' 
 65  PERFROOT  = HOSTROOT + '.5' 
 66  CPU       = PERFROOT + '.1.1.1.'        # note trailing dot 
 67  MEM       = PERFROOT + '.1.1.2.'        # note trailing dot 
 68   
 69  # Max size for CPU numbers 
 70  WRAP = 0xffffffffL 
# Create an implementation of the ICollectorPreferences interface so that the
# ZenCollector framework can configure itself from our preferences.
class ZenProcessPreferences(object):
    """
    Preferences object handed to the ZenCollector framework for zenprocess.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new ZenProcessPreferences instance and provide default
        values for needed attributes.
        """
        self.collectorName = "zenprocess"
        self.defaultRRDCreateCommand = None
        self.configCycleInterval = 20 # minutes

        #will be updated based on Performance Config property of same name
        self.processCycleInterval = 3 * 60

        #will be filled in based on buildOptions
        self.options = None

        # the configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.ProcessConfig'

    @property
    def cycleInterval(self):
        """
        defined as a property since it is needed by the interface

        @return: the current value of processCycleInterval
        """
        return self.processCycleInterval

    def buildOptions(self, parser):
        """
        Build a list of command-line options

        @parameter parser: option parser to register the options on; only
                           its add_option() method is used
        """
        parser.add_option('--showprocs',
                          dest='showprocs',
                          action="store_true",
                          default=False,
                          # the separating space was missing, which rendered
                          # the help as "found.For debugging"
                          help="Show the list of processes found. "
                               "For debugging purposes only.")
        parser.add_option('--showrawtables',
                          dest='showrawtables',
                          action="store_true",
                          default=False,
                          help="Show the raw SNMP processes data returned "
                               "from the device. For debugging purposes only.")
        parser.add_option('--captureFilePrefix', dest='captureFilePrefix',
                          default='',
                          help="Directory and filename to use as a template"
                               " to store SNMP results from device.")

    def postStartup(self):
        """
        Nothing to do after startup.
        """
        pass
126
class DeviceStats:
    """
    Per-device bookkeeping: the configured processes (as ProcessStats) and
    the pids currently attributed to them.
    """

    def __init__(self, deviceProxy):
        """
        @parameter deviceProxy: device configuration; its 'processes'
                                mapping (id -> process config) is read
        """
        self.config = deviceProxy
        # map pid number to ProcessStats object
        self._pidToProcess = {}
        # map ProcessProxy id to ProcessStats object
        self._processes = {}
        for procId, process in deviceProxy.processes.items():
            self._processes[procId] = ProcessStats(process)

    def update(self, deviceProxy):
        """
        Bring the configured processes in line with a new configuration:
        existing entries get the new config, new ids get a fresh
        ProcessStats, and ids no longer configured are dropped.

        @parameter deviceProxy: device configuration; its 'processes'
                                mapping (id -> process config) is read
        """
        # ids we track now that the new configuration has not mentioned (yet)
        stale = set(self._processes)
        for procId, process in deviceProxy.processes.items():
            stale.discard(procId)
            if procId in self._processes:
                self._processes[procId].update(process)
            else:
                self._processes[procId] = ProcessStats(process)

        #delete the left overs
        for procId in stale:
            del self._processes[procId]

    @property
    def processStats(self):
        """
        The ProcessStats: processes configured to be monitored
        """
        return self._processes.values()

    @property
    def pids(self):
        """
        returns the pids being tracked
        """
        return self._pidToProcess.keys()

    @property
    def monitoredProcs(self):
        """
        returns ProcessStats for which we have a pid
        (one entry per pid, so a ProcessStats may appear more than once)
        """
        return self._pidToProcess.values()
170
class ProcessStats:
    """
    Statistics for one configured process: its configuration, the MD5 digest
    carried in the configured name (if any), the accumulated CPU counter and
    the Pid objects currently attributed to it.
    """

    def __init__(self, processProxy):
        """
        @parameter processProxy: process configuration; the attributes
                                 name, regex and ignoreParameters are used
        """
        self._pids={}
        self._config = processProxy
        self.cpu = 0
        self.digest = ''
        if not self._config.ignoreParameters and self._config.name:
            # The modeler plugin computes the MD5 hash of the args,
            # and then tosses that into the name of the process.
            # Split only on the LAST space: the name itself may contain
            # spaces, and splitting on all of them would lose the digest.
            result = self._config.name.rsplit(' ', 1)
            if len(result) == 2 and result[1] != '':
                self.digest = result[1]

    def update(self, processProxy):
        """
        Replace the process configuration.
        (The digest computed at construction is left untouched.)
        """
        self._config = processProxy

    def __str__(self):
        """
        Override the Python default to represent ourselves as a string
        """
        return str(self._config.name)
    __repr__ = __str__

    def match(self, name, args, useMd5Digest=True):
        """
        Perform exact comparisons on the process names.

        @parameter name: name of a process to compare
        @type name: string
        @parameter args: argument list of the process
        @type args: string
        @parameter useMd5Digest: ignore true result if MD5 doesn't match the process name?
        @type useMd5Digest: boolean
        @return: does the name match this process's info?
        @rtype: Boolean
        """
        if self._config.name is None:
            return False

        # SNMP agents return a 'flexible' number of characters,
        # so exact matching isn't always reliable.
        if self._config.ignoreParameters or not args:
            processName = name
        else:
            processName = '%s %s' % (name, args)

        # Make the comparison
        result = re.search(self._config.regex, processName) is not None

        # We have a match, but it might not be for us.
        # Only hash the args when there is a digest to compare against.
        if result and useMd5Digest and self.digest:
            # Compare this arg list against the digest of this proc
            if md5(args).hexdigest() != self.digest:
                result = False

        return result

    def updateCpu(self, pid, value):
        """
        Feed a new CPU counter sample for a pid and add the resulting
        delta (if any) to the accumulated counter, wrapping at WRAP.
        """
        pid = self._pids.setdefault(pid, Pid())
        cpu = pid.updateCpu(value)
        if cpu is not None:
            self.cpu += cpu
            self.cpu %= WRAP

    def getCpu(self):
        """
        @return: the accumulated CPU counter
        """
        return self.cpu

    def updateMemory(self, pid, value):
        """
        Record the latest memory value for a pid.
        """
        self._pids.setdefault(pid, Pid()).memory = value

    def getMemory(self):
        """
        @return: sum of the memory values of all pids that have one
        """
        return sum(x.memory for x in self._pids.values() if x.memory is not None)

    def discardPid(self, pid):
        """
        Forget a pid; unknown pids are ignored.
        """
        self._pids.pop(pid, None)
258
class Pid:
    """
    Helper class to track process id information
    """
    # last CPU counter sample seen (None until a usable sample arrives)
    cpu = None
    # last memory value recorded
    memory = None

    def updateCpu(self, n):
        """
        Record a new CPU counter sample and return the increase since the
        previous one.

        @parameter n: new counter value (anything int() accepts) or None
        @return: the difference to the previous sample, or None when there
                 is no previous sample, the new sample is unusable, or the
                 counter fell backwards
        """
        if n is not None:
            try:
                n = int(n)
            except (TypeError, ValueError):
                log.warning("Bad value for CPU: '%s'", n)
                # treat an unparsable sample as "no sample" instead of
                # storing the bad value as the counter
                n = None

        if self.cpu is None or n is None:
            self.cpu = n
            return None
        diff = n - self.cpu
        if diff < 0:
            # don't provide a value when the counter falls backwards
            n = None
            diff = None
        self.cpu = n
        return diff

    def updateMemory(self, n):
        """
        Record the latest memory value.
        """
        self.memory = n

    def __str__(self):
        """
        Override the Python default to represent ourselves as a string
        """
        return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
    __repr__ = __str__
297
class ConfigListener(object):
    """
    Configuration listener that logs configuration changes and drops the
    cached per-device statistics when a configuration is deleted.
    """
    zope.interface.implements(IConfigurationListener)

    def deleted(self, configurationId):
        """
        Called when a configuration is deleted from the collector
        """
        # pass arguments to the logger instead of pre-formatting with '%'
        log.debug('ConfigListener: configuration %s deleted', configurationId)
        ZenProcessTask.DEVICE_STATS.pop(configurationId, None)

    def added(self, configuration):
        """
        Called when a configuration is added to the collector
        """
        log.debug('ConfigListener: configuration %s added', configuration)

    def updated(self, newConfiguration):
        """
        Called when a configuration is updated in collector
        """
        log.debug('ConfigListener: configuration %s updated', newConfiguration)
320
# Create an implementation of the IScheduledTask interface that will perform
# the actual collection work needed by this collector.
class ZenProcessTask(ObservableMixin):
    """
    A scheduled task that finds instances of configured processes and collects
    metrics on the processes
    """
    zope.interface.implements(IScheduledTask)

    #Keep state about process stats across task updates
    DEVICE_STATS = {}

    #counter to keep track of total restarted and missing processes
    RESTARTED = 0
    MISSING = 0

    STATE_CONNECTING = 'CONNECTING'
    STATE_SCANNING_PROCS = 'SCANNING_PROCESSES'
    STATE_FETCH_PERF = 'FETCH_PERF_DATA'
    STATE_STORE_PERF = 'STORE_PERF_DATA'
    STATE_PARSING_TABLE_DATA = 'PARSING_TABLE_DATA'

    statusEvent = { 'eventClass' : Status_OSProcess,
                    'eventGroup' : 'Process' }

    def __init__(self,
                 deviceId,
                 taskName,
                 scheduleIntervalSeconds,
                 taskConfig):
        """
        Set up the task for one device and attach it to the DeviceStats
        kept in DEVICE_STATS for that device (created on first use,
        updated with the new configuration otherwise).

        @parameter deviceId: stored as the task's configId
        @parameter taskName: stored as the task's name
        @parameter scheduleIntervalSeconds: stored as the task's interval
        @parameter taskConfig: device configuration (a DeviceProxy)
        """
        super(ZenProcessTask, self).__init__()

        #needed for interface
        self.name = taskName
        self.configId = deviceId
        self.interval = scheduleIntervalSeconds
        self.state = TaskStates.STATE_IDLE

        #the task config corresponds to a DeviceProxy
        self._device = taskConfig
        self._devId = self._device.name
        self._manageIp = self._device.manageIp
        self._maxOidsPerRequest = self._device.zMaxOIDPerRequest

        self._dataService = zope.component.queryUtility(IDataService)
        self._eventService = zope.component.queryUtility(IEventService)
        self._preferences = zope.component.queryUtility(ICollectorPreferences,
                                                        "zenprocess")
        self.snmpProxy = None
        self.snmpConnInfo = self._device.snmpConnInfo

        self._deviceStats = ZenProcessTask.DEVICE_STATS.get(self._devId)
        if self._deviceStats:
            self._deviceStats.update(self._device)
        else:
            self._deviceStats = DeviceStats(self._device)
            ZenProcessTask.DEVICE_STATS[self._devId] = self._deviceStats

    def doTask(self):
        """
        Contact to one device and return a deferred which gathers data from
        the device.

        @return: job to scan a device
        @rtype: Twisted deferred object
        """
        # see if we need to connect first before doing any collection
        d = defer.maybeDeferred(self._connect)
        d.addCallbacks(self._connectCallback, self._failure)
        d.addCallback(self._collectCallback)

        # Add the _finished callback to be called in both success and error
        # scenarios.
        d.addBoth(self._finished)

        # returning a Deferred will keep the framework from assuming the task
        # is done until the Deferred actually completes
        return d

    def _failure(self, reason):
        """
        Twisted errBack to log the exception for a single device.

        @parameter reason: explanation of the failure
        @type reason: Twisted error instance
        @return: the same failure, so that it keeps propagating
        """
        msg = 'Unable to read processes on device %s' % self._devId
        if isinstance(reason.value, error.TimeoutError):
            log.debug('Timeout on device %s', self._devId)
            msg = '%s; Timeout on device' % msg
        elif isinstance(reason.value, Snmpv3Error):
            msg = ("Cannot connect to SNMP agent on {0._devId}: {1.value}").format(self, reason)
            log.debug(msg)
        else:
            msg = '%s; error: %s' % (msg, reason.getErrorMessage())
            log.error('Error on device %s; %s', self._devId,
                      reason.getErrorMessage())

        self._eventService.sendEvent(self.statusEvent,
                                     eventClass=Status_Snmp,
                                     component="process",
                                     device=self._devId,
                                     summary=msg,
                                     severity=Event.Error)
        return reason

    def _connectCallback(self, result):
        """
        Callback called after a successful connect to the remote device.
        """
        log.debug("Connected to %s [%s]", self._devId, self._manageIp)
        return result

    def _collectCallback(self, result):
        """
        Callback called after a connect or previous collection so that another
        collection can take place.

        @return: Twisted deferred for the table scan followed by the
                 performance fetch
        """
        log.debug("Scanning for processes from %s [%s]",
                  self._devId, self._manageIp)

        self.state = ZenProcessTask.STATE_SCANNING_PROCS
        tables = [NAMETABLE, PATHTABLE, ARGSTABLE]
        d = self._getTables(tables)
        d.addCallback(self._parseProcessNames)
        d.addCallback(self._determineProcessStatus)
        d.addCallback(self._sendProcessEvents)
        d.addErrback(self._failure)
        # only reached when the scan succeeded: _failure re-returns the
        # failure, which skips this callback
        d.addCallback(self._fetchPerf)
        return d

    def _finished(self, result):
        """
        Callback activated when the task is complete

        @return: the unchanged result or failure
        """
        if not isinstance(result, Failure):
            log.debug("Device %s [%s] scanned successfully",
                      self._devId, self._manageIp)
        else:
            log.debug("Device %s [%s] scanned failed, %s",
                      self._devId, self._manageIp, result.getErrorMessage())

        try:
            self._close()
        except Exception as e:
            log.warn("Failed to close device %s: error %s",
                     self._devId, str(e))
        # give the result to the rest of the callback/errchain so that the
        # ZenCollector framework can keep track of the success/failure rate
        return result

    def cleanup(self):
        """
        Close the SNMP connection, if any.
        """
        return self._close()

    def capturePacket(self, hostname, data):
        """
        Store SNMP results into files for unit-testing.

        @parameter hostname: used in the capture file name
        @parameter data: results to write (pretty-printed)
        """
        # Prep for using capture replay module later
        if not hasattr(self, 'captureSerialNum'):
            self.captureSerialNum = 0

        log.debug("Capturing packet from %s", hostname)
        name = "%s-%s-%d" % (self._preferences.options.captureFilePrefix,
                             hostname, self.captureSerialNum)
        try:
            # 'with' closes the file even when the write fails
            with open(name, "w") as capFile:
                capFile.write(pformat(data))
            self.captureSerialNum += 1
        except Exception as ex:
            log.warn("Couldn't write capture data to '%s' because %s",
                     name, str(ex) )

    def sendRestartEvents(self, afterByConfig, beforeByConfig, restarted):
        """
        Send an event for every restarted process.

        @parameter afterByConfig: ProcessStats -> pids found in this scan
        @parameter beforeByConfig: ProcessStats -> pids known before this scan
        @parameter restarted: ProcessStats -> process config of the
                              processes to report
        """
        for procStats, pConfig in restarted.items():
            droppedPids = [pid for pid in beforeByConfig[procStats]
                           if pid not in afterByConfig[procStats]]
            summary = 'Process restarted: %s' % pConfig.originalName
            message = '%s\n Using regex \'%s\' Discarded dead pid(s) %s Using new pid(s) %s'\
                % (summary, pConfig.regex, droppedPids, afterByConfig[procStats])
            self._eventService.sendEvent(self.statusEvent,
                                         device=self._devId,
                                         summary=summary,
                                         message=message,
                                         component=pConfig.originalName,
                                         eventKey=pConfig.processClass,
                                         severity=pConfig.severity)
            log.info("(%s) %s", self._devId, message)

    def sendFoundProcsEvents(self, afterByConfig, restarted):
        """
        Send a clear event for every process found in this scan that is not
        being reported as restarted.
        """
        # report alive processes
        for processStat in afterByConfig.keys():
            if processStat in restarted: continue
            summary = "Process up: %s" % processStat._config.originalName
            message = '%s\n Using regex \'%s\' with pid\'s %s '\
                % (summary, processStat._config.regex, afterByConfig[processStat])
            self._eventService.sendEvent(self.statusEvent,
                                         device=self._devId,
                                         summary=summary,
                                         message=message,
                                         component=processStat._config.originalName,
                                         eventKey=processStat._config.processClass,
                                         severity=Event.Clear)
            log.debug("(%s) %s", self._devId, message)

    def _parseProcessNames(self, results):
        """
        Parse the process tables and reconstruct the list of processes
        that are on the device.

        @parameter results: results of SNMP table gets
        @type results: dictionary of dictionaries
        @return: list of (pid, (name, args)), or a failed deferred when the
                 name table is missing or empty
        """
        self.state = ZenProcessTask.STATE_PARSING_TABLE_DATA
        if not results or not results.get(NAMETABLE):
            summary = 'Device %s does not publish HOST-RESOURCES-MIB' % \
                      self._devId
            resolution="Verify with snmpwalk %s %s" % \
                       (self._devId, NAMETABLE )

            self._eventService.sendEvent(self.statusEvent,
                                         device=self._devId,
                                         summary=summary,
                                         resolution=resolution,
                                         severity=Event.Error)
            log.info(summary)
            # Failure needs an exception, not a bare string
            return defer.fail(Exception(summary))

        if self._preferences.options.captureFilePrefix:
            self.capturePacket(self._devId, results)

        summary = 'Process table up for device %s' % self._devId
        self._clearSnmpError(summary)
        showrawtables = self._preferences.options.showrawtables
        procs = mapResultsToDicts(showrawtables, results)[1]
        if self._preferences.options.showprocs:
            self._showProcessList( procs )
        return procs

    def sendMissingProcsEvents(self, afterByConfig):
        """
        Send an event for every configured process without a pid in this scan.

        @parameter afterByConfig: ProcessStats -> pids found in this scan
        """
        # Look for missing processes
        for procStat in self._deviceStats.processStats:
            if procStat not in afterByConfig:
                procConfig = procStat._config
                ZenProcessTask.MISSING += 1
                summary = 'Process not running: %s' % procConfig.originalName
                message = "%s\n Using regex \'%s\' \nAll Processes have stopped since the last model occurred. Last Modification time (%s)" \
                    % (summary,procConfig.regex,self._device.lastmodeltime)
                self._eventService.sendEvent(self.statusEvent,
                                             device=self._devId,
                                             summary=summary,
                                             message=message,
                                             component=procConfig.originalName,
                                             eventKey=procConfig.processClass,
                                             severity=procConfig.severity)
                log.warning("(%s) %s", self._devId, message)

    def _sendProcessEvents(self, results):
        """
        Send restart/up/missing events, adopt the new pid map and store the
        number of pids per configured process.

        @parameter results: the tuple returned by _determineProcessStatus
        @return: the string "Sent events"
        """
        (afterByConfig, afterPidToProcessStats,
         beforeByConfig, newPids, restarted, deadPids) = results
        self.sendRestartEvents(afterByConfig, beforeByConfig, restarted)
        self.sendFoundProcsEvents(afterByConfig, restarted)
        for pid in newPids:
            log.debug("Found new %s pid %d on %s",
                      afterPidToProcessStats[pid]._config.originalName, pid,
                      self._devId)
        self._deviceStats._pidToProcess = afterPidToProcessStats
        self.sendMissingProcsEvents(afterByConfig)
        # Store the total number of each process into an RRD
        pidCounts = dict((p, 0) for p in self._deviceStats.processStats)
        for procStat in self._deviceStats.monitoredProcs:
            # A pid can still be attributed to a process that has since
            # been removed from the configuration; counting it would
            # raise KeyError and fail the whole scan.
            if procStat in pidCounts:
                pidCounts[procStat] += 1
            else:
                log.debug("(%s) pid attributed to %s, which is no longer "
                          "configured", self._devId, procStat)
        for procName, count in pidCounts.items():
            self._save(procName, 'count_count', count, 'GAUGE')
        return "Sent events"

    def _determineProcessStatus(self, procs):
        """
        Determine the up/down/restarted status of processes.

        @parameter procs: array of pid, (name, args) info
        @type procs: list
        @return: (afterByConfig, afterPidToProcessStats, beforeByConfig,
                  newPids, restarted, deadPids)
        @rtype: tuple
        """
        # look for changes in processes
        beforePids = set(self._deviceStats.pids)
        beforeByConfig = reverseDict(self._deviceStats._pidToProcess)
        afterPidToProcessStats = {}
        for pid, (name, args) in procs:
            pStats = self._deviceStats._pidToProcess.get(pid)
            if pStats:
                if pStats.match(name, args):
                    afterPidToProcessStats[pid] = pStats
                    continue
                elif pStats.match(name, args, useMd5Digest=False):
                    # In this case, our raw SNMP data from the
                    # remote agent got futzed
                    afterPidToProcessStats[pid] = pStats
                    continue

            # Search for the first match in our list of regexes
            for pStats in self._deviceStats.processStats:
                if pStats.match(name, args):
                    log.debug("Found process %d on %s",
                              pid, pStats._config.name)
                    afterPidToProcessStats[pid] = pStats
                    break

        # If the hashes get trashed by the SNMP agent, try to
        # make sensible guesses.
        # "Missing" is judged against the pids found in THIS scan; the
        # previous scan's map would flag every process on the first run.
        missingModeledStats = set(self._deviceStats.processStats) - \
                              set(afterPidToProcessStats.values())
        if missingModeledStats:
            log.info("Searching for possible matches for %s", missingModeledStats)
            for pStats in missingModeledStats:
                for pid, (name, args) in procs:
                    if pid in afterPidToProcessStats:
                        continue

                    if pStats.match(name, args, useMd5Digest=False):
                        afterPidToProcessStats[pid] = pStats
                        break

        afterPids = set(afterPidToProcessStats.keys())
        afterByConfig = reverseDict(afterPidToProcessStats)
        newPids = afterPids - beforePids
        deadPids = beforePids - afterPids

        restarted = {}
        for pid in deadPids:
            procStats = self._deviceStats._pidToProcess[pid]
            procStats.discardPid(pid)
            # a dead pid whose process still has other pids is a restart
            if procStats in afterByConfig:
                ZenProcessTask.RESTARTED += 1
                pConfig = procStats._config
                if pConfig.restart:
                    restarted[procStats] = pConfig

        return (afterByConfig, afterPidToProcessStats,
                beforeByConfig, newPids, restarted, deadPids)

    def _fetchPerf(self, results):
        """
        Get performance data for all the monitored processes on a device

        @parameter results: result of the previous callback (not used)
        @return: Twisted deferred
        """
        self.state = ZenProcessTask.STATE_FETCH_PERF

        oids = []
        for pid in self._deviceStats.pids:
            oids.extend([CPU + str(pid), MEM + str(pid)])
        if not oids:
            return defer.succeed(([]))
        d = Chain(self._get, iter(chunk(oids, self._maxOidsPerRequest))).run()
        d.addCallback(self._storePerfStats)
        d.addErrback(self._failure)
        return d

    def _storePerfStats(self, results):
        """
        Save the process performance data in RRD files

        @parameter results: results of SNMP gets
        @type results: list of (success, result) tuples
        @return: the first failed result if there is one, otherwise the
                 merged OID -> value dictionary
        """
        self.state = ZenProcessTask.STATE_STORE_PERF
        for success, result in results:
            if not success:
                #return the failure
                return result
        self._clearSnmpError('Process table up for device %s' % self._devId)
        # merge the per-request dictionaries into one OID -> value map
        parts = {}
        for success, values in results:
            if success:
                parts.update(values)
        results = parts
        byConf = reverseDict(self._deviceStats._pidToProcess)
        for procStat, pids in byConf.items():
            if len(pids) != 1:
                log.debug("There are %d pids by the name %s",
                          len(pids), procStat._config.name)
            procName = procStat._config.name
            for pid in pids:
                cpu = results.get(CPU + str(pid), None)
                mem = results.get(MEM + str(pid), None)
                procStat.updateCpu(pid, cpu)
                procStat.updateMemory(pid, mem)
            self._save(procName, 'cpu_cpu', procStat.getCpu(),
                       'DERIVE', min=0)
            self._save(procName, 'mem_mem',
                       procStat.getMemory() * 1024, 'GAUGE')
        return results

    def _getTables(self, oids):
        """
        Perform SNMP getTable for specified OIDs

        @parameter oids: OIDs to gather
        @type oids: list of strings
        @return: Twisted deferred
        @rtype: Twisted deferred
        """
        # never ask for zero repetitions, which would happen whenever the
        # per-request limit is smaller than the number of tables
        repetitions = max(1, self._maxOidsPerRequest // len(oids))
        t = self.snmpProxy.getTable(oids,
                                    timeout=self.snmpConnInfo.zSnmpTimeout,
                                    retryCount=self.snmpConnInfo.zSnmpTries,
                                    maxRepetitions=repetitions,
                                    limit=sys.maxint)
        return t

    def _get(self, oids):
        """
        Perform SNMP get for specified OIDs

        @parameter oids: OIDs to gather
        @type oids: list of strings
        @return: Twisted deferred
        @rtype: Twisted deferred
        """
        return self.snmpProxy.get(oids,
                                  self.snmpConnInfo.zSnmpTimeout,
                                  self.snmpConnInfo.zSnmpTries)

    def _connect(self):
        """
        Create a connection to the remote device
        (an existing session with the same connection info is reused)
        """
        self.state = ZenProcessTask.STATE_CONNECTING
        if (self.snmpProxy is None or
            self.snmpProxy.snmpConnInfo != self.snmpConnInfo):
            self.snmpProxy = self.snmpConnInfo.createSession()
            self.snmpProxy.open()

    def _close(self):
        """
        Close down the connection to the remote device
        """
        if self.snmpProxy:
            self.snmpProxy.close()
        self.snmpProxy = None

    def _showProcessList(self, procs):
        """
        Display the processes in a sane manner.

        @parameter procs: list of (pid, (name, args))
        @type procs: list of tuples
        """
        device_name = self._devId
        proc_list = [ '%s %s %s' % (pid, name, args) for pid, (name, args) \
                         in sorted(procs)]
        proc_list.append('')
        log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))

    def _clearSnmpError(self, message):
        """
        Send an event to clear other events.

        @parameter message: clear text
        @type message: string
        """
        self._eventService.sendEvent(self.statusEvent,
                                     eventClass=Status_Snmp,
                                     component="process",
                                     device=self._devId,
                                     summary=message,
                                     agent='zenprocess',
                                     severity=Event.Clear)

    def _save(self, pidName, statName, value, rrdType, min='U'):
        """
        Save a value into an RRD file

        @param pidName: name of the monitored process (used in the RRD path)
        @type pidName: string
        @param statName: metric name
        @type statName: string
        @param value: data to be stored
        @type value: number
        @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER)
        @type rrdType: string
        @param min: passed through to the data service as 'min'
        """
        deviceName = self._devId
        path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
        try:
            self._dataService.writeRRD(path, value, rrdType, min=min)
        except Exception as ex:
            # a failed write is reported but never fails the collection
            summary= "Unable to save data for process-monitor RRD %s" % \
                              path
            log.critical( summary )

            message= "Data was value= %s, type=%s" % \
                     ( value, rrdType )
            log.critical( message )
            log.exception( ex )

            import traceback
            trace_info= traceback.format_exc()

            self._eventService.sendEvent(dict(
                dedupid="%s|%s" % (self._preferences.options.monitor,
                                   'RRD write failure'),
                severity=Event.Critical,
                device=self._preferences.options.monitor,
                eventClass=Status_Perf,
                component="RRD",
                pidName=pidName,
                statName=statName,
                path=path,
                message=message,
                traceback=trace_info,
                summary=summary))
840
def mapResultsToDicts(showrawtables, results):
    """
    Parse the process tables and reconstruct the list of processes
    that are on the device.

    @parameter showrawtables: log the raw table info?
    @type showrawtables: boolean
    @parameter results: results of SNMP table gets ie (OID + pid, value)
    @type results: dictionary of dictionaries
    @return: maps relating names and pids to each other
    @rtype: dictionary, list of tuples
    """
    def tableByPid(label, tableOid):
        """
        Helper function to extract SNMP table data, keyed by pid
        (the last component of each OID).
        """
        # a table the agent did not return is treated as empty
        table = results.get(tableOid) or {}
        if showrawtables:
            log.info("%s = %r", label, table)
        byPid = {}
        for oid, value in table.items():
            byPid[int(oid.split('.')[-1])] = value
        return byPid

    names = tableByPid("NAMETABLE", NAMETABLE)
    paths = tableByPid("PATHTABLE", PATHTABLE)
    args = tableByPid("ARGSTABLE", ARGSTABLE)

    procs = []
    for pid, name in names.items():
        path = paths.get(pid, '')
        # prefer the path over the bare name, unless it contains a backslash
        if path and '\\' not in path:
            name = path
        procs.append( (pid, (name, args.get(pid, '')) ) )

    return args, procs
def reverseDict(d):
    """
    Invert a dictionary: each value of d becomes a key that maps to the
    list of all keys of d which had that value.
    """
    inverted = {}
    for key in d:
        value = d[key]
        if value not in inverted:
            inverted[value] = []
        inverted[value].append(key)
    return inverted
896
def chunk(lst, n):
    """
    Split lst into consecutive slices of length n
    (the final slice holds whatever is left over)
    """
    pieces = []
    for start in range(0, len(lst), n):
        pieces.append(lst[start:start + n])
    return pieces
# Collector Daemon Main entry point
#
if __name__ == '__main__':
    # preferences first, then the task machinery, then the daemon itself
    preferences = ZenProcessPreferences()
    splitter = SimpleTaskSplitter(SimpleTaskFactory(ZenProcessTask))
    CollectorDaemon(preferences, splitter, ConfigListener()).run()