1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """TopologyCorrelatorTask
15
16 Examines the pending event queue and determines what events need to be sent.
17 The correlator can reschedule ping jobs to delay or obsess under stress
18 conditions.
19
20 The correlator is also the only job to send out events for the managedIps.
21
22 The correlator may not have all of the devices that are associated with an
23 IP realm or even a regular network. The implication is that two collectors
24 on the same server will split up the devices into two trees, neither of
25 which may have knowledge of the complete network topology.
26
The same issue applies to layer 2 devices: the work of determining the
up/down status of their connected devices may be split across multiple collectors.
29
30 The simple case of just the Zenoss master server can be made to work, as
31 zenping will have full knowledge of the topology.
32 """
33
34 import logging
35 log = logging.getLogger("zen.correlatorTask")
36
37 from networkx import dfs_tree
38
39 import Globals
40 import zope.interface
41 import zope.component
42
43 from twisted.internet import defer
44
45 from Products.ZenCollector.interfaces import IDataService,\
46 ICollector,\
47 IEventService,\
48 IScheduledTask
49 from Products.ZenCollector.tasks import BaseTask,\
50 TaskStates
51
52 from Products.ZenEvents.ZenEventClasses import Status_Ping
53 from Products.ZenCollector.tasks import SimpleTaskFactory
54 from interfaces import IPingCorrelatorTaskFactory
55
# Common fields for ping status events. NOTE(review): presumably merged into
# each event by sendPingEvent, which is not visible in this file -- confirm.
STATUS_EVENT = dict(
    eventClass=Status_Ping,
    component='zenping',
    eventGroup='Ping',
)

# eventState value attached to victim (non-root-cause) ping-down events.
SUPPRESSED = 2
63
# Declare that instances provide IScheduledTask. implements() is zope's
# class-advice form and has to run inside a class body.
# NOTE(review): the enclosing class header line (presumably
# `class TopologyCorrelatorTask(...)`, going by the super() call in
# __init__) is missing from this file and must be restored.
zope.interface.implements(IScheduledTask)
66
def __init__(self,
             taskName, configId,
             scheduleIntervalSeconds=60,
             taskConfig=None):
    """
    Bind the correlator to the collector's IPv4 or IPv6 network.

    @param taskName: the unique identifier for this task; a name ending in
        'IPv6' selects the collector's IPv6 network, any other name IPv4
    @type taskName: string
    @param configId: configuration identifier, passed to the parent class
        and stored on self.configId
    @param scheduleIntervalSeconds: the interval at which this task will be
        collected
    @type scheduleIntervalSeconds: int
    @param taskConfig: the configuration for this task; required despite
        the None default, and stored as self._preferences
    @raise TypeError: if taskConfig is None
    """
    # Validate first so a bad call fails before the parent class is set up.
    if taskConfig is None:
        raise TypeError("taskConfig cannot be None")

    # Forward the real configuration. The parent previously received a
    # hard-coded None and never saw taskConfig.
    super(TopologyCorrelatorTask, self).__init__(
        taskName, configId,
        scheduleIntervalSeconds, taskConfig=taskConfig
    )

    self.name = taskName
    self.configId = configId
    self.state = TaskStates.STATE_IDLE
    self.interval = scheduleIntervalSeconds

    # NOTE(review): presumably shutdown-ordering hints read by the collector
    # framework (stop in the 'before' phase, order 10) -- confirm.
    self.stopPhase = 'before'
    self.stopOrder = 10

    self._preferences = taskConfig

    # Pick the address family this task correlates from the task name.
    self._daemon = zope.component.getUtility(ICollector)
    if taskName.endswith('IPv6'):
        self.version = 6
        self._network = self._daemon.ipv6network
    else:
        self.version = 4
        self._network = self._daemon.network

    # Cache references to the selected network's topology structures.
    self._topology = self._network.topology
    self._downDevices = self._network.downDevices
    self._notModeled = self._network.notModeled

    # Optional services: queryUtility returns None when none is registered.
    self._dataService = zope.component.queryUtility(IDataService)
    self._eventService = zope.component.queryUtility(IEventService)

    # Last error message; shown in the statistics output when non-empty.
    self._lastErrorMsg = ''
116
118 """
119 Determine root cause for IPs being down and send events.
120 """
121 log.debug('---- IPv%d correlation begins ----', self.version)
122
123 pingGraph = self._network.subgraphPingNodes()
124
125
126
127
128 downGraph = pingGraph.subgraph(self._downDevices)
129
130
131 rootEvents = 0
132 victimEvents = 0
133 for root in downGraph.nodes_iter():
134 if not downGraph.predecessors(root):
135
136 victims = dfs_tree(downGraph, source=root).nodes()
137 victims.remove(root)
138 rootEvents += 1
139 victimEvents += len(victims)
140 self._sendPingDown(root, victims)
141
142 stats = 'IPv%d correlator sent %d root cause and %d victim events' % (
143 self.version, rootEvents, victimEvents)
144 log.debug(stats)
145 log.debug('---- IPv%d correlation ends ----', self.version)
146 return defer.succeed(stats)
147
# NOTE(review): the `def` line for this method (invoked as
# self._sendPingDown(root, victims)) is missing from the file; restore it
# and re-indent this body before use.
"""
Send a ping-down event for a root-cause device, then a suppressed event
for each device that went down behind it.

@param root: IP address of the root-cause device of the downed subtree
@type root: string
@param victims: IP addresses of the down devices reachable from root
@type victims: list of strings
"""
log.debug("Sending down events for %s and affected devices %s",
          root, victims)

# The root-cause event is sent without an explicit eventState...
rootJob = self._topology.node[root]['task'].pingjob
self.sendPingEvent(rootJob, root=root)

# ...while each victim event names the root and is marked SUPPRESSED.
# NOTE(review): Graph.node was removed in networkx 2.4 (use .nodes there).
for victimIp in victims:
    victimJob = self._topology.node[victimIp]['task'].pingjob
    self.sendPingEvent(victimJob, root=root, eventState=SUPPRESSED)
166
188
190 """
191 Called by the collector framework scheduler, and allows us to
192 see how each task is doing.
193 """
194 nodes = self._network.topology.number_of_nodes()
195 edges = self._network.topology.number_of_edges()
196 down = len(self._network.downDevices)
197 display = "%s nodes: %d edges: %d down: %d\n" % (
198 self.name, nodes, edges, down)
199
200 if self._lastErrorMsg:
201 display += "%s\n" % self._lastErrorMsg
202 return display
203
207
208
214
215
216
217
218
219
220