1
2
3
4
5
6
7
8
9
10
11
12
13
__doc__ = """zenprocess

Gets SNMP process data from a device's HOST-RESOURCES-MIB
and stores process performance in RRD files.
"""
19
20 import logging
21 import sys
22 import re
23 from md5 import md5
24 from pprint import pformat
25
26 import Globals
27
28 import zope.component
29 import zope.interface
30
31 from Products.ZenCollector.daemon import CollectorDaemon
32 from Products.ZenCollector.interfaces import ICollectorPreferences,\
33 IScheduledTask, IEventService, IDataService, IConfigurationListener
34 from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\
35 TaskStates
36 from Products.ZenEvents import Event
37 from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess, \
38 Status_Perf
39 from Products.ZenUtils.observable import ObservableMixin
40 from Products.ZenUtils.Chain import Chain
41
42
43
44
45 from Products.ZenUtils.Utils import unused
46 from Products.ZenCollector.services.config import DeviceProxy
47 unused(DeviceProxy)
48 from Products.ZenHub.services.ProcessConfig import ProcessProxy
49 unused(ProcessProxy)
50 from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo
51 unused(SnmpConnInfo)
52
53 from twisted.internet import defer, error
54 from twisted.python.failure import Failure
55 from pynetsnmp.twistedsnmp import Snmpv3Error
56
# Module-level logger for the zenprocess collector.
log = logging.getLogger("zen.zenprocess")


# OIDs from the HOST-RESOURCES-MIB (RFC 2790) queried by this collector.
HOSTROOT = '.1.3.6.1.2.1.25'
# hrSWRun subtree (running software).
RUNROOT = HOSTROOT + '.4'
# hrSWRunTable columns; each row's last OID component is the pid.
NAMETABLE = RUNROOT + '.2.1.2'   # hrSWRunName
PATHTABLE = RUNROOT + '.2.1.4'   # hrSWRunPath
ARGSTABLE = RUNROOT + '.2.1.5'   # hrSWRunParameters
# hrSWRunPerf subtree (per-process performance).
PERFROOT = HOSTROOT + '.5'
# hrSWRunPerfTable column prefixes. The trailing '.' lets callers append a
# pid directly, e.g. CPU + str(pid).
CPU = PERFROOT + '.1.1.1.'       # hrSWRunPerfCPU
MEM = PERFROOT + '.1.1.2.'       # hrSWRunPerfMem (KBytes per RFC 2790)


# 2**32 - 1, the largest unsigned 32-bit value. Written without the
# Python 2 'L' suffix: the value is identical (Python 2 promotes to long
# automatically when needed) and the literal stays valid on Python 3.
WRAP = 0xffffffff
75 zope.interface.implements(ICollectorPreferences)
76
78 """
79 Constructs a new ZenProcessPreferences instance and provide default
80 values for needed attributes.
81 """
82 self.collectorName = "zenprocess"
83 self.defaultRRDCreateCommand = None
84 self.configCycleInterval = 20
85
86
87 self.processCycleInterval = 3 * 60
88
89
90 self.options = None
91
92
93
94 self.configurationService = 'Products.ZenHub.services.ProcessConfig'
95
96 @property
98 """
99 defined as a property since it is needed by the interface
100 """
101 return self.processCycleInterval
102
104 """
105 Build a list of command-line options
106 """
107 parser.add_option('--showprocs',
108 dest='showprocs',
109 action="store_true",
110 default=False,
111 help="Show the list of processes found." \
112 "For debugging purposes only.")
113 parser.add_option('--showrawtables',
114 dest='showrawtables',
115 action="store_true",
116 default=False,
117 help="Show the raw SNMP processes data returned " \
118 "from the device. For debugging purposes only.")
119 parser.add_option('--captureFilePrefix', dest='captureFilePrefix',
120 default='',
121 help="Directory and filename to use as a template"
122 " to store SNMP results from device.")
123
124 - def postStartup(self):
126
136
137 - def update(self, deviceProxy):
149
150 @property
152 """
153 The ProcessStats: processes configured to be monitored
154 """
155 return self._processes.values()
156
157 @property
159 """
160 returns the pids from being tracked
161 """
162 return self._pidToProcess.keys()
163
164 @property
166 """
167 returns ProcessStats for which we have a pid
168 """
169 return self._pidToProcess.values()
170
183
184 - def update(self, processProxy):
186
188 """
189 Override the Python default to represent ourselves as a string
190 """
191 return str(self._config.name)
192 __repr__ = __str__
193
def match(self, name, args, useMd5Digest=True):
    """
    Decide whether a process reported by the device belongs to this
    ProcessStats.

    The configured regex is applied with re.search (an unanchored search,
    so a hit anywhere in the string counts). It is searched against the
    bare process name when the configuration ignores parameters or there
    are no arguments; otherwise it is searched against "name args".

    @parameter name: name of a process to compare
    @type name: string
    @parameter args: argument list of the process
    @type args: string
    @parameter useMd5Digest: when True and this object holds a stored
        digest, reject a regex hit whose MD5 of args differs from it
    @type useMd5Digest: boolean
    @return: does the name match this process's info?
    @rtype: Boolean
    """
    config = self._config
    if config.name is None:
        return False

    # Fold the arguments into the searched string unless the configuration
    # says to ignore them (or there are none to fold in).
    if config.ignoreParameters or not args:
        processName = name
    else:
        processName = '%s %s' % (name, args)

    if re.search(config.regex, processName) is None:
        return False

    # A regex hit may still be a different instance of the same program.
    # When a digest of the expected arguments is stored, require these
    # arguments to hash to the same value. Only hash when there is a
    # stored digest to compare against.
    if useMd5Digest and self.digest:
        return md5(args).hexdigest() == self.digest

    return True
228
237
239 """
240 """
241 return self.cpu
242
247
249 """
250 """
251 return sum(x.memory for x in self._pids.values() if x.memory is not None)
252
254 """
255 """
256 if pid in self._pids:
257 del self._pids[pid]
258
260 """
261 Helper class to track process id information
262 """
263 cpu = None
264 memory = None
265
267 """
268 """
269 if n is not None:
270 try:
271 n = int(n)
272 except ValueError:
273 log.warning("Bad value for CPU: '%s'", n)
274
275 if self.cpu is None or n is None:
276 self.cpu = n
277 return None
278 diff = n - self.cpu
279 if diff < 0:
280
281 n = None
282 diff = None
283 self.cpu = n
284 return diff
285
287 """
288 """
289 self.memory = n
290
292 """
293 Override the Python default to represent ourselves as a string
294 """
295 return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
296 __repr__ = __str__
297
299 zope.interface.implements(IConfigurationListener)
300
def deleted(self, configurationId):
    """
    Called when a configuration is deleted from the collector.

    Drops any cached per-device statistics held in
    ZenProcessTask.DEVICE_STATS under that configuration id; a missing
    entry is ignored.

    @param configurationId: id of the removed configuration
    """
    # Lazy %-args: the message is only built when debug logging is enabled.
    log.debug('ConfigListener: configuration %s deleted', configurationId)
    ZenProcessTask.DEVICE_STATS.pop(configurationId, None)
307
def added(self, configuration):
    """
    Called when a configuration is added to the collector.

    Only logs the addition; no per-device state is created here.

    @param configuration: the newly added configuration
    """
    # Lazy %-args: the message is only built when debug logging is enabled.
    log.debug('ConfigListener: configuration %s added', configuration)
314
def updated(self, newConfiguration):
    """
    Called when a configuration is updated in the collector.

    Only logs the update; ZenProcessTask.DEVICE_STATS is not touched here.

    @param newConfiguration: the replacement configuration
    """
    # Lazy %-args: the message is only built when debug logging is enabled.
    log.debug('ConfigListener: configuration %s updated', newConfiguration)
320
324 """
325 A scheduled task that finds instances of configure processes and collects
326 metrics on the processes
327 """
328 zope.interface.implements(IScheduledTask)
329
330
331 DEVICE_STATS = {}
332
333
334 RESTARTED = 0
335 MISSING = 0
336
337 STATE_CONNECTING = 'CONNECTING'
338 STATE_SCANNING_PROCS = 'SCANNING_PROCESSES'
339 STATE_FETCH_PERF = 'FETCH_PERF_DATA'
340 STATE_STORE_PERF = 'STORE_PERF_DATA'
341 STATE_PARSING_TABLE_DATA = 'PARSING_TABLE_DATA'
342
343 statusEvent = { 'eventClass' : Status_OSProcess,
344 'eventGroup' : 'Process' }
345
def __init__(self,
             deviceId,
             taskName,
             scheduleIntervalSeconds,
             taskConfig):
    """
    Set up a process-scan task for a single device.

    @param deviceId: configuration id of the task (stored as configId)
    @param taskName: name of this task
    @param scheduleIntervalSeconds: seconds between runs (stored as interval)
    @param taskConfig: device proxy; its name, manageIp, zMaxOIDPerRequest
        and snmpConnInfo attributes are read here
    """
    super(ZenProcessTask, self).__init__()

    # Public task attributes (the class declares IScheduledTask).
    self.name = taskName
    self.configId = deviceId
    self.interval = scheduleIntervalSeconds
    self.state = TaskStates.STATE_IDLE

    # Cache the device details this task needs.
    device = taskConfig
    self._device = device
    self._devId = device.name
    self._manageIp = device.manageIp
    self._maxOidsPerRequest = device.zMaxOIDPerRequest

    # Collaborating services, looked up through zope.component.
    lookup = zope.component.queryUtility
    self._dataService = lookup(IDataService)
    self._eventService = lookup(IEventService)
    self._preferences = lookup(ICollectorPreferences, "zenprocess")

    # The SNMP session is opened lazily; only the connection info is kept.
    self.snmpProxy = None
    self.snmpConnInfo = device.snmpConnInfo

    # Per-device statistics live in the class-level DEVICE_STATS cache,
    # keyed by device name, so they outlive individual task instances.
    stats = ZenProcessTask.DEVICE_STATS.get(self._devId)
    if not stats:
        stats = DeviceStats(device)
        ZenProcessTask.DEVICE_STATS[self._devId] = stats
    else:
        stats.update(device)
    self._deviceStats = stats
378
380 """
381 Contact to one device and return a deferred which gathers data from
382 the device.
383
384 @return: job to scan a device
385 @rtype: Twisted deferred object
386 """
387
388 d = defer.maybeDeferred(self._connect)
389 d.addCallbacks(self._connectCallback, self._failure)
390 d.addCallback(self._collectCallback)
391
392
393
394 d.addBoth(self._finished)
395
396
397
398 return d
399
401 """
402 Twisted errBack to log the exception for a single device.
403
404 @parameter reason: explanation of the failure
405 @type reason: Twisted error instance
406 """
407 msg = 'Unable to read processes on device %s' % self._devId
408 if isinstance(reason.value, error.TimeoutError):
409 log.debug('Timeout on device %s' % self._devId)
410 msg = '%s; Timeout on device' % msg
411 elif isinstance(reason.value, Snmpv3Error):
412 msg = ("Cannot connect to SNMP agent on {0._devId}: {1.value}").format(self, reason)
413 log.debug(msg)
414 else:
415 msg = '%s; error: %s' % (msg, reason.getErrorMessage())
416 log.error('Error on device %s; %s' % (self._devId,
417 reason.getErrorMessage()))
418
419 self._eventService.sendEvent(self.statusEvent,
420 eventClass=Status_Snmp,
421 component="process",
422 device=self._devId,
423 summary=msg,
424 severity=Event.Error)
425 return reason
426
428 """
429 Callback called after a successful connect to the remote device.
430 """
431 log.debug("Connected to %s [%s]", self._devId, self._manageIp)
432 return result
433
451
453 """
454 Callback activated when the task is complete
455 """
456 if not isinstance(result, Failure):
457 log.debug("Device %s [%s] scanned successfully",
458 self._devId, self._manageIp)
459 else:
460 log.debug("Device %s [%s] scanned failed, %s",
461 self._devId, self._manageIp, result.getErrorMessage())
462
463 try:
464 self._close()
465 except Exception, e:
466 log.warn("Failed to close device %s: error %s" %
467 (self._devId, str(e)))
468
469
470 return result
471
474
476 """
477 Store SNMP results into files for unit-testing.
478 """
479
480 if not hasattr(self, 'captureSerialNum'):
481 self.captureSerialNum = 0
482
483 log.debug("Capturing packet from %s", hostname)
484 name = "%s-%s-%d" % (self._preferences.options.captureFilePrefix,
485 hostname, self.captureSerialNum)
486 try:
487 capFile = open(name, "w")
488 capFile.write(pformat(data))
489 capFile.close()
490 self.captureSerialNum += 1
491 except Exception, ex:
492 log.warn("Couldn't write capture data to '%s' because %s",
493 name, str(ex) )
494
496 for procStats, pConfig in restarted.iteritems():
497 droppedPids = []
498 for pid in beforeByConfig[procStats]:
499 if pid not in afterByConfig[procStats]:
500 droppedPids.append(pid)
501 summary = 'Process restarted: %s' % pConfig.originalName
502 message = '%s\n Using regex \'%s\' Discarded dead pid(s) %s Using new pid(s) %s'\
503 % (summary, pConfig.regex, droppedPids, afterByConfig[procStats])
504 self._eventService.sendEvent(self.statusEvent,
505 device=self._devId,
506 summary=summary,
507 message=message,
508 component=pConfig.originalName,
509 eventKey=pConfig.processClass,
510 severity=pConfig.severity)
511 log.info("(%s) %s" % (self._devId, message))
512
528
562
580
582 (afterByConfig, afterPidToProcessStats,
583 beforeByConfig, newPids, restarted, deadPids) = results
584 self.sendRestartEvents(afterByConfig, beforeByConfig, restarted)
585 self.sendFoundProcsEvents(afterByConfig, restarted)
586 for pid in newPids:
587 log.debug("Found new %s pid %d on %s" % (
588 afterPidToProcessStats[pid]._config.originalName, pid,
589 self._devId))
590 self._deviceStats._pidToProcess = afterPidToProcessStats
591 self.sendMissingProcsEvents(afterByConfig)
592
593 pidCounts = dict((p, 0) for p in self._deviceStats.processStats)
594 for procStat in self._deviceStats.monitoredProcs:
595 pidCounts[procStat] += 1
596 for procName, count in pidCounts.items():
597 self._save(procName, 'count_count', count, 'GAUGE')
598 return "Sent events"
599
601 """
602 Determine the up/down/restarted status of processes.
603
604 @parameter procs: array of pid, (name, args) info
605 @type procs: list
606 """
607
608 beforePids = set(self._deviceStats.pids)
609 beforeByConfig = reverseDict(self._deviceStats._pidToProcess)
610 afterPidToProcessStats = {}
611 for pid, (name, args) in procs:
612 pStats = self._deviceStats._pidToProcess.get(pid)
613 if pStats:
614 if pStats.match(name, args):
615 afterPidToProcessStats[pid] = pStats
616 continue
617 elif pStats.match(name, args, useMd5Digest=False):
618
619
620 afterPidToProcessStats[pid] = pStats
621 continue
622
623
624 for pStats in self._deviceStats.processStats:
625 if pStats.match(name, args):
626 log.debug("Found process %d on %s",
627 pid, pStats._config.name)
628 afterPidToProcessStats[pid] = pStats
629 break
630
631
632
633 missingModeledStats = set(self._deviceStats.processStats) - \
634 set(self._deviceStats.monitoredProcs)
635 if missingModeledStats:
636 log.info("Searching for possible matches for %s", missingModeledStats)
637 for pStats in missingModeledStats:
638 for pid, (name, args) in procs:
639 if pid in afterPidToProcessStats:
640 continue
641
642 if pStats.match(name, args, useMd5Digest=False):
643 afterPidToProcessStats[pid] = pStats
644 break
645
646 afterPids = set(afterPidToProcessStats.keys())
647 afterByConfig = reverseDict(afterPidToProcessStats)
648 newPids = afterPids - beforePids
649 deadPids = beforePids - afterPids
650
651 restarted = {}
652 for pid in deadPids:
653 procStats = self._deviceStats._pidToProcess[pid]
654 procStats.discardPid(pid)
655 if procStats in afterByConfig:
656 ZenProcessTask.RESTARTED += 1
657 pConfig = procStats._config
658 if pConfig.restart:
659 restarted[procStats] = pConfig
660
661 return (afterByConfig, afterPidToProcessStats,
662 beforeByConfig, newPids, restarted, deadPids)
663
665 """
666 Get performance data for all the monitored processes on a device
667
668 @parameter results: results of SNMP table gets
669 @type results: list of (success, result) tuples
670 """
671 self.state = ZenProcessTask.STATE_FETCH_PERF
672
673 oids = []
674 for pid in self._deviceStats.pids:
675 oids.extend([CPU + str(pid), MEM + str(pid)])
676 if not oids:
677 return defer.succeed(([]))
678 d = Chain(self._get, iter(chunk(oids, self._maxOidsPerRequest))).run()
679 d.addCallback(self._storePerfStats)
680 d.addErrback(self._failure)
681 return d
682
684 """
685 Save the process performance data in RRD files
686
687 @parameter results: results of SNMP table gets
688 @type results: list of (success, result) tuples
689 @parameter device: proxy object to the remote computer
690 @type device: Device object
691 """
692 self.state = ZenProcessTask.STATE_STORE_PERF
693 for success, result in results:
694 if not success:
695
696 return result
697 self._clearSnmpError('Process table up for device %s' % self._devId)
698 parts = {}
699 for success, values in results:
700 if success:
701 parts.update(values)
702 results = parts
703 byConf = reverseDict(self._deviceStats._pidToProcess)
704 for procStat, pids in byConf.items():
705 if len(pids) != 1:
706 log.debug("There are %d pids by the name %s",
707 len(pids), procStat._config.name)
708 procName = procStat._config.name
709 for pid in pids:
710 cpu = results.get(CPU + str(pid), None)
711 mem = results.get(MEM + str(pid), None)
712 procStat.updateCpu(pid, cpu)
713 procStat.updateMemory(pid, mem)
714 self._save(procName, 'cpu_cpu', procStat.getCpu(),
715 'DERIVE', min=0)
716 self._save(procName, 'mem_mem',
717 procStat.getMemory() * 1024, 'GAUGE')
718 return results
719
721 """
722 Perform SNMP getTable for specified OIDs
723
724 @parameter oids: OIDs to gather
725 @type oids: list of strings
726 @return: Twisted deferred
727 @rtype: Twisted deferred
728 """
729 repetitions = self._maxOidsPerRequest / len(oids)
730 t = self.snmpProxy.getTable(oids,
731 timeout=self.snmpConnInfo.zSnmpTimeout,
732 retryCount=self.snmpConnInfo.zSnmpTries,
733 maxRepetitions=repetitions,
734 limit=sys.maxint)
735 return t
736
def _get(self, oids):
    """
    Issue an SNMP GET for the given OIDs through this task's SNMP proxy,
    using the configured timeout and retry count.

    @parameter oids: OIDs to fetch
    @type oids: list of strings
    @return: the result of snmpProxy.get (a Twisted deferred)
    @rtype: Twisted deferred
    """
    fetch = self.snmpProxy.get
    connInfo = self.snmpConnInfo
    return fetch(oids, connInfo.zSnmpTimeout, connInfo.zSnmpTries)
749
751 """
752 Create a connection to the remote device
753 """
754 self.state = ZenProcessTask.STATE_CONNECTING
755 if (self.snmpProxy is None or
756 self.snmpProxy.snmpConnInfo != self.snmpConnInfo):
757 self.snmpProxy = self.snmpConnInfo.createSession()
758 self.snmpProxy.open()
759
761 """
762 Close down the connection to the remote device
763 """
764 if self.snmpProxy:
765 self.snmpProxy.close()
766 self.snmpProxy = None
767
768
770 """
771 Display the processes in a sane manner.
772
773 @parameter procs: list of (pid, (name, args))
774 @type procs: list of tuples
775 """
776 device_name = self._devId
777 proc_list = [ '%s %s %s' % (pid, name, args) for pid, (name, args) \
778 in sorted(procs)]
779 proc_list.append('')
780 log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))
781
796
def _save(self, pidName, statName, value, rrdType, min='U'):
    """
    Save a value into an RRD file.

    The datapoint is written via the data service to
    'Devices/<device>/os/processes/<pidName>/<statName>'. A write failure
    is not raised: it is logged at CRITICAL, and a critical Status_Perf
    event is sent with device set to the collector's configured monitor
    (options.monitor).

    @param pidName: name of the monitored process, used as a path
        component (formatted with %s, so any object with a meaningful
        __str__ is accepted)
    @type pidName: string
    @param statName: metric name
    @type statName: string
    @param value: data to be stored
    @type value: number
    @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER, GAUGE)
    @type rrdType: string
    @param min: minimum value passed through to writeRRD; defaults to 'U'
        (presumably RRDtool's "unknown", i.e. no minimum -- confirm)
    @type min: number or string
    """
    deviceName = self._devId
    path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName)
    try:
        self._dataService.writeRRD(path, value, rrdType, min=min)
    except Exception as ex:
        # 'except ... as' is valid on Python 2.6+ and Python 3, unlike the
        # old 'except Exception, ex' form.
        summary = "Unable to save data for process-monitor RRD %s" % path
        log.critical(summary)

        message = "Data was value= %s, type=%s" % (value, rrdType)
        log.critical(message)
        log.exception(ex)

        # Imported lazily: only needed on this failure path.
        import traceback
        trace_info = traceback.format_exc()

        self._eventService.sendEvent(dict(
            dedupid="%s|%s" % (self._preferences.options.monitor,
                               'RRD write failure'),
            severity=Event.Critical,
            device=self._preferences.options.monitor,
            eventClass=Status_Perf,
            component="RRD",
            pidName=pidName,
            statName=statName,
            path=path,
            message=message,
            traceback=trace_info,
            summary=summary))
840
843 """
844 Parse the process tables and reconstruct the list of processes
845 that are on the device.
846
847 @parameter showrawtables: log the raw table info?
848 @type showrawtables: boolean
849 @parameter results: results of SNMP table gets ie (OID + pid, value)
850 @type results: dictionary of dictionaries
851 @return: maps relating names and pids to each other
852 @rtype: dictionary, list of tuples
853 """
854 def extract(dictionary, oid, value):
855 """
856 Helper function to extract SNMP table data.
857 """
858 pid = int(oid.split('.')[-1])
859 dictionary[pid] = value
860
861 names, paths, args = {}, {}, {}
862 if showrawtables:
863 log.info("NAMETABLE = %r", results[NAMETABLE])
864 for row in results[NAMETABLE].items():
865 extract(names, *row)
866
867 if showrawtables:
868 log.info("PATHTABLE = %r", results[PATHTABLE])
869 for row in results[PATHTABLE].items():
870 extract(paths, *row)
871
872 if showrawtables:
873 log.info("ARGSTABLE = %r", results[ARGSTABLE])
874 for row in results[ARGSTABLE].items():
875 extract(args, *row)
876
877 procs = []
878 for pid, name in names.items():
879 path = paths.get(pid, '')
880 if path and path.find('\\') == -1:
881 name = path
882 arg = args.get(pid, '')
883 procs.append( (pid, (name, arg) ) )
884
885 return args, procs
886
888 """
889 Return a dictionary with keys and values swapped:
890 all values are lists to handle the different keys mapping to the same value
891 """
892 result = {}
893 for a, v in d.items():
894 result.setdefault(v, []).append(a)
895 return result
896
898 """
899 Break lst into n-sized chunks
900 """
901 return [lst[i:i+n] for i in range(0, len(lst), n)]
902
903
904
if __name__ == '__main__':
    # Wire the collector together: the zenprocess preferences, a task
    # splitter whose factory builds ZenProcessTask instances, and the
    # ConfigListener that reacts to configuration add/update/delete.
    preferences = ZenProcessPreferences()
    taskSplitter = SimpleTaskSplitter(SimpleTaskFactory(ZenProcessTask))
    daemon = CollectorDaemon(preferences, taskSplitter, ConfigListener())
    daemon.run()
912