Package Products :: Package ZenStatus :: Module CorrelatorTask
[hide private]
[frames | no frames]

Source Code for Module Products.ZenStatus.CorrelatorTask

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """TopologyCorrelatorTask 
 15   
 16  Examines the pending event queue and determines what events need to be sent. 
 17  The correlator can reschedule ping jobs to delay or obsess under stress 
 18  conditions. 
 19   
 20  The correlator is also the only job to send out events for the managedIps. 
 21   
 22  The correlator may not have all of the devices that are associated with an 
 23  IP realm or even a regular network. The implication is that two collectors 
 24  on the same server will split up the devices into two trees, neither of 
 25  which may have knowledge of the complete network topology. 
 26   
 27  This same issue occurs for layer 2 devices, which may dole out the work 
 28  to determine up/down status of connected devices over multiple collectors. 
 29   
 30  The simple case of just the Zenoss master server can be made to work, as 
 31  zenping will have full knowledge of the topology. 
 32  """ 
 33   
 34  import logging 
 35  log = logging.getLogger("zen.correlatorTask") 
 36   
 37  from networkx import dfs_tree 
 38   
 39  import Globals 
 40  import zope.interface 
 41  import zope.component 
 42   
 43  from twisted.internet import defer 
 44   
 45  from Products.ZenCollector.interfaces import IDataService,\ 
 46                                               ICollector,\ 
 47                                               IEventService,\ 
 48                                               IScheduledTask 
 49  from Products.ZenCollector.tasks import BaseTask,\ 
 50                                          TaskStates 
 51   
 52  from Products.ZenEvents.ZenEventClasses import Status_Ping 
 53  from Products.ZenCollector.tasks import SimpleTaskFactory 
 54  from interfaces import IPingCorrelatorTaskFactory 
 55   
# Template of event fields: the ping status event class, the 'zenping'
# component and the 'Ping' event group.
# NOTE(review): nothing in the code visible in this module references this
# dict -- confirm whether other modules import it before removing.
STATUS_EVENT = { 
                'eventClass' : Status_Ping,
                'component' : 'zenping',
                'eventGroup' : 'Ping' }

# Event 'suppressed' state; _sendPingDown passes this as the eventState of
# the events sent for the victims of a downed root device.
SUPPRESSED = 2
 63   
class TopologyCorrelatorTask(BaseTask):
    """
    Scheduled task that walks the graph of down devices, picks out the
    roots of each downed subtree and sends ping-down events: a regular
    event for every root and a suppressed event for every node below it.
    """
    zope.interface.implements(IScheduledTask)

    def __init__(self,
                 taskName, configId,
                 scheduleIntervalSeconds=60,
                 taskConfig=None):
        """
        Bind the task to the collector daemon's IPv4 or IPv6 network model.

        @param taskName: the unique identifier for this task; a name that
                         ends with 'IPv6' selects the daemon's IPv6 network,
                         any other name selects the IPv4 network
        @type taskName: string
        @param configId: identifier stored on the task as configId
        @type configId: string
        @param scheduleIntervalSeconds: the interval at which this task will
                                        be collected
        @type scheduleIntervalSeconds: int
        @param taskConfig: the configuration for this task; must not be None
        @raise TypeError: if taskConfig is None
        """
        # NOTE(review): the base class is handed taskConfig=None rather than
        # the taskConfig argument -- confirm this is intentional.
        super(TopologyCorrelatorTask, self).__init__(
            taskName, configId,
            scheduleIntervalSeconds, taskConfig=None
        )

        # Attributes required by the scheduled-task interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds

        # This needs to run after other processes have stopped
        self.stopPhase = 'before'
        self.stopOrder = 10

        if taskConfig is None:
            raise TypeError("taskConfig cannot be None")
        self._preferences = taskConfig

        collector = zope.component.getUtility(ICollector)
        self._daemon = collector
        wantsIPv6 = taskName.endswith('IPv6')
        self.version = 6 if wantsIPv6 else 4
        self._network = (collector.ipv6network if wantsIPv6
                         else collector.network)

        # Keep direct references to the pieces of the network model that
        # the correlation pass works on.
        network = self._network
        self._topology = network.topology
        self._downDevices = network.downDevices
        self._notModeled = network.notModeled

        self._dataService = zope.component.queryUtility(IDataService)
        self._eventService = zope.component.queryUtility(IEventService)

        self._lastErrorMsg = ''

    def doTask(self):
        """
        Determine root cause for IPs being down and send events.

        @return: an already-fired deferred carrying the summary string of
                 how many root-cause and victim events were sent
        """
        log.debug('---- IPv%d correlation begins ----', self.version)

        # Prune out intermediate devices that we don't ping
        pingGraph = self._network.subgraphPingNodes()

        # The subgraph() call preserves the base graph's structure (DAG), but
        # returns the connected subtrees (ie all down nodes and their
        # connected edges). A copy(), not deepcopy() is done to the meta-data.
        downGraph = pingGraph.subgraph(self._downDevices)

        rootCount = 0
        victimCount = 0
        # Every down node is a candidate root of a downed subtree
        for candidate in downGraph.nodes_iter():
            if downGraph.predecessors(candidate):
                # Has a predecessor in the down graph, so it is not a root
                continue

            # Everything reachable from the root, minus the root itself
            affected = dfs_tree(downGraph, source=candidate).nodes()
            affected.remove(candidate)

            rootCount += 1
            victimCount += len(affected)
            self._sendPingDown(candidate, affected)

        summary = 'IPv%d correlator sent %d root cause and %d victim events' % (
            self.version, rootCount, victimCount)
        log.debug(summary)
        log.debug('---- IPv%d correlation ends ----', self.version)
        return defer.succeed(summary)

    def _sendPingDown(self, root, victims):
        """
        Send ping down events for the subtree of devices rooted at the root.
        The root gets a regular event; each victim gets an event whose
        eventState is SUPPRESSED and whose root device is the given root.

        @parameter root: IP address of the root device of the downed subtree
        @type root: string
        @parameter victims: IP addresses of the consequences of the downed
                            root device
        @type victims: array of strings
        """
        log.debug("Sending down events for %s and affected devices %s",
                  root, victims)

        # The ping job for each node hangs off the 'task' entry of the
        # node's data in the topology graph.
        rootJob = self._topology.node[root]['task'].pingjob
        self.sendPingEvent(rootJob, root=root)

        for victim in victims:
            victimJob = self._topology.node[victim]['task'].pingjob
            self.sendPingEvent(victimJob, root=root,
                               eventState=SUPPRESSED)

    def sendPingEvent(self, pj, root, eventState=None):
        """
        Send an event based on a ping job to the event backend.

        @param pj: ping job supplying message, severity, hostname, ipaddr
                   and iface for the event
        @param root: value stored in the event's rootDevice field
        @param eventState: optional event state; only added to the event
                           when it is not None
        """
        summary = pj.message
        if not summary:
            # No message on the job: derive one from the severity
            if pj.severity == 0:
                template = "Device %s is UP!"
            else:
                template = "Device %s is DOWN!"
            summary = template % pj.hostname

        event = {
            'device': pj.hostname,
            'ipAddress': pj.ipaddr,
            'summary': summary,
            'severity': pj.severity,
            'eventClass': Status_Ping,
            'eventGroup': 'Ping',
            'rootDevice': root,
            'component': pj.iface,
        }
        if eventState is not None:
            event['eventState'] = eventState
        self._eventService.sendEvent(event)

    def displayStatistics(self):
        """
        Called by the collector framework scheduler, and allows us to
        see how each task is doing.

        @return: one line with the task name and the node, edge and
                 down-device counts, followed by the last error message
                 on its own line when one is set
        @rtype: string
        """
        network = self._network
        nodeCount = network.topology.number_of_nodes()
        edgeCount = network.topology.number_of_edges()
        downCount = len(network.downDevices)

        report = "%s nodes: %d edges: %d down: %d\n" % (
            self.name, nodeCount, edgeCount, downCount)
        if not self._lastErrorMsg:
            return report
        return report + "%s\n" % self._lastErrorMsg

    def cleanup(self):
        """
        Do one last round of correlation before exiting zenping.

        @return: whatever doTask() returns
        """
        return self.doTask()
207 208
209 -class PingCorrelatorTaskFactory(SimpleTaskFactory):
210 zope.interface.implements(IPingCorrelatorTaskFactory) 211
212 - def __init__(self):
214 215 #if __name__=='__main__': 216 # TODO: Debugging tool 217 # TODO: read a topology file and use that to populate the graph, 218 # TODO: then read a log file to determine how well the correlation 219 # TODO: technique works. 220