1
2
3
4
5
6
7
8
9
10
11
12
13
__doc__ = """NetworkModel

Maintain the state of the network topology and associated meta-data.

"""

import collections
import logging
import os
import re
import time
from collections import defaultdict

from networkx import DiGraph, shortest_path, minimum_spanning_tree
from networkx.exception import NetworkXError, NetworkXException
from networkx.readwrite import read_graphml, write_graphml

import Globals
from zope.component import queryUtility

from Products.ZenCollector.interfaces import ICollector
from Products.ZenUtils.Utils import zenPath

log = logging.getLogger("zen.networkModel")
38
39
40
41
42
43
44
46
def __init__(self, version=4, loadCache=True):
    """
    Set up the network model for this collector.

    @parameter version: IP version of the topology (default 4); selects the
        cache file name and appears in log messages
    @type version: integer
    @parameter loadCache: if True, restore the topology from the cache
        file; otherwise start with a blank topology
    @type loadCache: boolean
    """
    # Collector preferences; the options read by this model include
    # monitor, name, cycle and savetopominutes.
    self._preferences = queryUtility(ICollector)._prefs

    self.version = version
    # Nodes queued to have their route traced
    self.notModeled = set()
    # Nodes whose traceroute timed out; excluded when choosing targets
    self.traceTimedOut = set()

    # Cloud name -> compiled regex matched against hops
    self.hopToCloudRules = {}

    self.downDevices = set()

    self.topology = None
    if loadCache:
        # reloadCache() also sets self.topologySaveTime
        self.reloadCache()
    else:
        self._newTopology()
        # Start the checkpoint clock here too: the checkpoint logic reads
        # self.topologySaveTime and would otherwise fail before the first
        # save.
        self.topologySaveTime = time.time()

def _newTopology(self):
    """
    Create a blank new topology.

    The graph is named after this collector (the ``monitor`` option) and
    records its creation time as a graph attribute.
    """
    collectorName = self._preferences.options.monitor
    self.topology = DiGraph(name=collectorName,
                            creationTime=time.time())

def getRoute(self, endpoint):
    """
    Given an endpoint name, get the route to it.
    If the endpoint doesn't exist, or no path to it is known, return an
    empty route.

    @parameter endpoint: name of the device
    @type endpoint: string
    @returns: list of devices from the collector to the endpoint
    @rtype: array
    """
    if endpoint not in self.topology:
        return []

    try:
        route = shortest_path(self.topology,
                              self._preferences.options.name,
                              endpoint)
    except NetworkXException:
        # NetworkXException is the common base class of NetworkXError and
        # NetworkXNoPath, so an endpoint that is present in the topology
        # but unreachable from the collector also yields an empty route.
        route = []

    return route

# NOTE(review): this copy has lost the method's `def` line (original
# line 97) and all indentation; restore both from version control. The
# body uses a single argument named `endpoint`.
98 """
99 Log the route from the collector to the endpoint device IP.
100
101 @parameter endpoint: device name, or the device's IP address
102 @type endpoint: string
103 """
# Map the name to a topology node key first; getRoute() looks the key up
# directly in the topology.
104 ip = self.resolveName(endpoint)
105 if not ip:
106 log.warn("The device '%s' was not found in the topology"
107 " -- try using the IP address", endpoint)
108 return
109 route = self.getRoute(ip)
110 if route:
111 log.info("Route to %s (%s): %s", endpoint, ip, route)
112 else:
113 log.warn("No route for the device %s (%s) was found",
114 endpoint, ip)
115
def search(self, key, criteria=None, regex=None):
    """
    Search the topology for any matches to the key.

    @parameter key: name of the node attribute to examine
    @type key: string
    @parameter criteria: value the attribute must be equal to
    @type criteria: string or callable
    @parameter regex: regular expression searched for (re.search) in the
        attribute's value; nodes without the attribute are tested against ''
    @type regex: string
    @returns: (node, attribute dict) pairs of the matching nodes, or an
        empty list when neither a usable criteria nor a regex was given
    @rtype: list

    NOTE(review): a callable criteria is not applied; it is handled exactly
    as if no criteria had been given. Confirm whether predicate support was
    intended before relying on it.
    """
    if criteria is not None and not callable(criteria):
        return [(node, data)
                for node, data in self.topology.nodes_iter(data=True)
                if data.get(key) == criteria]

    if regex is not None:
        pattern = re.compile(regex)
        return [(node, data)
                for node, data in self.topology.nodes_iter(data=True)
                if pattern.search(data.get(key, ''))]

    # Always a list, on any Python version, matching the branches above
    return []

def resolveName(self, deviceName):
    """
    Given a device name, resolve to an IP address using only the
    information in the topology.

    @parameter deviceName: a topology node key (IP address), or a regular
        expression searched for in the nodes' ``name`` attribute
    @type deviceName: string
    @returns: the matching node key, or None if nothing matches
    @rtype: string or None
    """
    if deviceName in self.topology:
        return deviceName

    # search() yields (node, attributes) pairs; the node key is the
    # address, so return that rather than the whole pair.
    for node, _attributes in self.search('name', regex=deviceName):
        return node
    return None

# NOTE(review): this copy has lost the method's `def` line (original
# line 149) and all indentation; restore both from version control. The
# body references no arguments other than `self`.
150 """
151 Checkpoint the topology to disk to preserve changes even
152 in the event of crashes etc.
153 """
# Save only once the last checkpoint is older than the `savetopominutes`
# option (the age is computed in minutes). self.topologySaveTime must
# already be set; reloadCache() and _saveTopology() assign it.
154 now = time.time()
155 checkpointAge = (now - self.topologySaveTime) / 60
156 if checkpointAge > self._preferences.options.savetopominutes:
157 self._saveTopology()
158
def _getTopoCacheName(self):
    """
    Return the topology cache file to use for this IP version.

    An explicit ``ipv<version>topofile`` option wins. Otherwise the file
    lives under perf/Daemons (via zenPath) and is named after the collector
    (``monitor`` option) and the IP version, so each IP version gets its
    own cache file.
    """
    topofile = getattr(self._preferences.options,
                       "ipv%dtopofile" % self.version, None)
    if topofile:
        return topofile
    return zenPath('perf/Daemons/%s.ipv%s.topology.graphml' % (
        self._preferences.options.monitor, self.version))

def reloadCache(self, topologyCache=None):
    """
    Restore any previously saved topology or create a new one.

    A cache that cannot be read leaves the current topology in place; a
    cache file that cannot be parsed is also renamed to ``<cache>.broken``.
    If there is still no topology afterwards, a blank one is created. The
    checkpoint time is reset in every case.

    @parameter topologyCache: file name or readable file-like object;
        defaults to the configured cache file
    @type topologyCache: string or file
    """
    if topologyCache is None:
        topologyCache = self._getTopoCacheName()

    isFileObject = hasattr(topologyCache, 'read')
    if isFileObject or os.path.exists(topologyCache):
        try:
            start = time.time()
            self.topology = read_graphml(topologyCache)
            log.info("Read %s nodes and %s edges from IPv%d topology cache in %0.1f s.",
                     self.topology.number_of_nodes(),
                     self.topology.number_of_edges(),
                     self.version,
                     time.time() - start)
        except IOError as ex:
            log.warning("Unable to read topology file %s because %s",
                        topologyCache, str(ex))
        except (SyntaxError, NetworkXError) as ex:
            # ElementTree's ParseError subclasses SyntaxError; networkx
            # raises NetworkXError for GraphML content it cannot handle.
            log.warning("Bad topology file %s: %s",
                        topologyCache, str(ex))
            if not isFileObject:
                try:
                    os.rename(topologyCache, topologyCache + '.broken')
                except (IOError, OSError) as renameError:
                    # os.rename() reports failures as OSError on Python 2
                    log.warning("Unable to move bad topology file out of the way: %s",
                                str(renameError))

    if self.topology is None:
        self._newTopology()

    # Start the checkpoint clock from now
    self.topologySaveTime = time.time()

def _saveTopology(self, topologyCache=None):
    """
    Allow the topology to be periodically saved.

    Nothing is written when the ``cycle`` option is off or there is no
    topology. An existing cache file is first renamed to
    ``<cache>.previous``. The per-node metadata returned by
    _stripUncacheableMetaData() (presumably removed from the nodes by that
    call so the graph can be serialized) is merged back into the nodes
    afterwards -- even if the write fails.

    @parameter topologyCache: file name or file-like object to write;
        defaults to the configured cache file
    @type topologyCache: string or file
    """
    if not self._preferences.options.cycle:
        return

    if topologyCache is None:
        topologyCache = self._getTopoCacheName()

    if self.topology is None:
        log.debug("Empty topology -- not saving")
        return

    if not hasattr(topologyCache, 'read') and os.path.exists(topologyCache):
        try:
            os.rename(topologyCache, topologyCache + '.previous')
        except (IOError, OSError) as ex:
            # os.rename() reports failures as OSError on Python 2
            log.warning("Unable to create backup topology file because %s",
                        str(ex))
    try:
        start = time.time()
        saved_metadata = self._stripUncacheableMetaData()
        try:
            write_graphml(self.topology, topologyCache)
        finally:
            # Restore the metadata whatever happened to the write, so a
            # failed save does not leave the in-memory nodes without it.
            for ipAddress, metadata in saved_metadata.items():
                self.topology.node[ipAddress].update(metadata)

        log.info("Saved %s nodes and %s edges in IPv%d topology cache in %0.1fs.",
                 self.topology.number_of_nodes(),
                 self.topology.number_of_edges(),
                 self.version,
                 time.time() - start)
    except IOError as ex:
        log.warning("Unable to write topology file %s because %s",
                    topologyCache, str(ex))
    # Restart the checkpoint clock even after a failed write
    self.topologySaveTime = time.time()

# NOTE(review): this copy has lost the method's `def` line (original
# line 257) and all indentation; restore both from version control. The
# body references no arguments other than `self`.
258 """
259 Determine the list of devices to traceroute and update
260 the topology map.
261 """
262
# Nodes queued in notModeled take priority: if any exist, only they are
# returned (minus those whose traceroute timed out).
263 if self.notModeled:
264 return list(self.notModeled - self.traceTimedOut)
265
266
# Otherwise return every node with no edges at all (degree 0) that has not
# timed out.
# NOTE(review): despite the docstring, this body only builds the list; it
# does not modify the topology itself.
267 return filter(lambda x: self.topology.degree(x) == 0 and \
268 x not in self.traceTimedOut,
269 self.topology.nodes_iter())
270
# NOTE(review): this copy has lost the method's `def` line (original
# line 271) and all indentation; restore both from version control. The
# body uses a single argument named `route` (presumably the list of hops
# reported by a traceroute).
272 """
273 Canonicalize the route (drop '*' hops, map cloud hops to their cloud)
274 and update the topology (if necessary).
275
276 @returns: was the topology updated or not?
277 @rtype: boolean
278 """
# route[-1] is taken as the destination; an empty route raises IndexError.
279 existingRoute = self.getRoute(route[-1])
280 canonRoute = self._canonicalizeRoute(route)
281 if existingRoute:
282 if existingRoute == canonRoute:
# NOTE(review): this returns a 2-tuple, which is truthy, whereas the final
# `return updated` yields a bool and the docstring promises a boolean;
# `return False` looks intended -- confirm against the callers.
283 return False, False
284
# Walk the canonical route from the collector outwards, adding any missing
# node and any missing edge from the previous hop.
285 lastHop = canonRoute.pop(0)
286 updated = False
287 while canonRoute:
288 hop = canonRoute.pop(0)
289
290 if hop not in self.topology:
291 self.topology.add_node(hop)
292 updated = True
293
# Skip self-loops, e.g. consecutive hops mapped to the same cloud.
294 if lastHop != hop and \
295 not self.topology.has_edge(lastHop, hop):
296 self.topology.add_edge(lastHop, hop)
297 updated = True
298 lastHop = hop
299
300 return updated
301
# NOTE(review): this copy has lost the method's `def` line (original
# line 302) and all indentation; restore both from version control. The
# body references no arguments other than `self`.
303 """
304 Clear the existing structure and create a spanning tree.
305 This is a dangerous operation.
306 """
# Nodes are kept; every edge is replaced by an edge of a minimum spanning
# tree computed on an undirected copy of the topology.
307 G = minimum_spanning_tree(self.topology.to_undirected())
308
309
310
311
312
# NOTE(review): the tree edges are re-added in whatever (u, v) order the
# undirected tree reports, so the original edge directions are not
# guaranteed to survive, and edge attributes are dropped (G.edges() is
# called without data). Confirm this is intended.
313 newEdgeList = G.edges()
314 oldEdgeList = self.topology.edges()
315 self.topology.remove_edges_from(oldEdgeList)
316 self.topology.add_edges_from(newEdgeList)
317
def _canonicalizeRoute(self, route):
    """
    Given a route, reduce it to a route that can exist in the topology.

    The result starts with the collector (the ``name`` option), drops '*'
    hops (presumably hops that did not answer) and maps every other hop
    through _cloudify(), in order. The caller's ``route`` is not modified.

    @parameter route: hops in order from the collector outwards
    @type route: list of strings
    @returns: canonical route, collector first
    @rtype: list of strings
    """
    canonRoute = [self._preferences.options.name]
    canonRoute.extend(self._cloudify(hop) for hop in route if hop != '*')
    return canonRoute

def _cloudify(self, hop):
    """
    If the hop is a part of the cloud, then return the cloud name.
    Else return the hop.

    If the cloud does not exist in the topology, creates it. A hop that is
    already a node in the topology is returned unchanged without consulting
    the cloud rules.

    @parameter hop: name or IP address
    @type hop: string
    @returns: the hop (if not a part of a cloud) or the cloud
    @rtype: string
    """
    if hop in self.topology:
        return hop

    # The first rule (in dict iteration order) whose regex matches at the
    # start of the hop wins; regex.match anchors at the beginning.
    for cloudName, regex in self.hopToCloudRules.items():
        if regex.match(hop):
            self._addCloud(cloudName)
            return cloudName
    return hop

def _addCloud(self, cloud):
    """
    If the cloud already exists, do nothing.
    Else, create the cloud and initialize it.

    A new cloud node is created with the attribute ping=False; an existing
    node's attributes are left untouched.

    @parameter cloud: name of the cloud
    @type cloud: string
    """
    if cloud not in self.topology:
        self.topology.add_node(cloud, ping=False)

# NOTE(review): this copy has lost the method's `def` line (original
# line 368) and all indentation; restore both from version control. The
# body uses a single argument named `device`.
369 """
370 Cleanly remove the device from the topology.
371
372 @parameter device: name (topology node key) of the device to remove
373 @type device: string
374 """
375 if device not in self.topology:
376 log.debug("Device %s not deleted as it is not in topology",
377 device)
378 return
# Before removing the node, queue its neighbors (except the hop that
# precedes it on its route) in notModeled so their routes are traced again.
# NOTE(review): assumes networkx 1.x, where neighbors() returns a list (it
# is mutated with .remove() below); for the DiGraph built by _newTopology
# these are the node's successors.
379 route = self.getRoute(device)
380 neighbors = self.topology.neighbors(device)
# NOTE(review): collections.Sequence is the Python 2 spelling; Python 3.10+
# only provides collections.abc.Sequence.
381 if route and isinstance(route, collections.Sequence) and len(route)>1:
382 route = route[:-1]
383 lastHop = route[-1]
384 if lastHop in neighbors:
385 neighbors.remove(lastHop)
386 self.notModeled.update(neighbors)
387 self.topology.remove_node(device)
388 log.debug("Deleted device %s from topology", device)
389
# NOTE(review): this copy has lost the method's `def` line (original
# line 390) and all indentation; restore both from version control. The
# body references no arguments other than `self`.
391 """
392 Prune out nodes from the topology which we don't monitor.
393 This means that if we model the route x -> y -> z
394 but we only ping nodes x + z, then if x and z are both down
395 we attribute z being down to x, rather than treating them as
396 independent.
397 """
398
# A node is kept only if its attribute dict contains a 'task' entry;
# rejected nodes are remembered in ignoreList.
399 seen = set()
400 ignoreList = set()
401 def keepNode(node):
402 if node in ignoreList:
403 return False
404 elif 'task' not in self.topology.node[node]:
405 ignoreList.add(node)
406 return False
407 return True
408
# Depth-first walk from every not-yet-seen node, using `queue` as a stack
# (assuming the `else:` of original line 430 belongs to the `for` loop;
# indentation is lost in this copy). tree maps each kept node to the kept
# nodes attached beneath it; the result is a new DiGraph and
# self.topology itself is not modified.
# NOTE(review): lastValidRoot is not restored when the walk backtracks, so
# a kept node found after backtracking is attached to the most recently
# kept node rather than to its nearest kept ancestor (with A -> B and
# A -> C all kept, C ends up under B). Also, a node already seen (e.g.
# walked as an earlier root) is never linked to a parent found later, so
# the result depends on nodes_iter() order. Confirm both are intended.
409 tree = {}
410 for root in self.topology.nodes_iter():
411 if root in seen:
412 continue
413
414 queue = [ root ]
415 lastValidRoot = None
416 while queue:
417 node = queue[-1]
418 if node not in seen:
419 seen.add(node)
420 if keepNode(node):
421 tree[node] = []
422 lastValidRoot = node
423
424 for w in self.topology.neighbors_iter(node):
425 if w not in seen:
426 queue.append(w)
427 if keepNode(w) and lastValidRoot:
428 tree[lastValidRoot].append(w)
429 break
430 else:
431 queue.pop()
432
433 return DiGraph(tree)
434