Package Products :: Package ZenStatus :: Module NetworkModel
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenStatus.NetworkModel

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """NetworkModel 
 15   
 16  Maintain the state of the network topology and associated meta-data. 
 17   
 18  """ 
 19   
 20  import time 
 21  import os 
 22  import re 
 23  import logging 
 24  import collections 
 25  log = logging.getLogger("zen.networkModel") 
 26   
 27  from collections import defaultdict 
 28  from networkx import DiGraph, shortest_path, minimum_spanning_tree 
 29  from networkx.readwrite import read_graphml, write_graphml 
 30  from networkx.exception import NetworkXError 
 31   
 32  import Globals 
 33  from zope.component import queryUtility 
 34   
 35  from Products.ZenCollector.interfaces import ICollector 
 36   
 37  from Products.ZenUtils.Utils import zenPath 
 38   
 39   
 40  # TODO: Examine how bad the memory usage is and see if it's worth it 
 41  #       to convert the node labels to being integers rather than strings -- 
 42  #       or if that even affects the memory usage. 
 43  #from Products.ZenUtils.IpUtil import ipToDecimal 
 44   
class NetworkModel(object):
    """
    Maintain a directed graph of the network topology as seen from this
    collector, together with per-node meta-data.

    Nodes are device names / IP addresses (or cloud names).  An edge
    ``a -> b`` is added by updateTopology() when ``b`` directly follows
    ``a`` on a canonicalized route that starts at the collector.
    """

    def __init__(self, version=4, loadCache=True):
        """
        @parameter version: IP version of this topology (selects the cache
            file name and appears in log messages)
        @type version: integer
        @parameter loadCache: restore the topology from the on-disk cache
            (falling back to an empty topology) instead of starting empty
        @type loadCache: boolean
        """
        self._preferences = queryUtility(ICollector)._prefs

        self.version = version
        # Recently added devices that still need to be traced/modeled
        self.notModeled = set()
        # Devices skipped by disconnectedNodes(); presumably those whose
        # trace timed out -- confirm against callers
        self.traceTimedOut = set()

        # cloud name -> compiled regex; hops matching the regex are
        # collapsed into the cloud node (see _cloudify)
        self.hopToCloudRules = {}

        self.downDevices = set()

        self.topology = None
        # Set here as well as in reloadCache(): without it, saveTopology()
        # raises AttributeError when the cache is not loaded.
        self.topologySaveTime = time.time()
        if loadCache:
            self.reloadCache()
        else:
            self._newTopology()

    def _newTopology(self):
        """
        Create a blank new topology named after this collector's monitor.
        """
        collectorName = self._preferences.options.monitor
        self.topology = DiGraph(name=collectorName,
                                creationTime=time.time())

    def getRoute(self, endpoint):
        """
        Given an endpoint name, get the route to it.
        If the endpoint doesn't exist, or is not connected to the collector,
        return an empty route.

        @parameter endpoint: name of the device
        @type endpoint: string
        @returns: list of devices from the collector to the endpoint
        @rtype: list
        """
        if endpoint not in self.topology:
            return []

        # NetworkXException is the common base of NetworkXError (what older
        # networkx releases raise for "no path") and NetworkXNoPath /
        # NodeNotFound (raised by newer releases).
        from networkx.exception import NetworkXException

        # Since we don't allow cycles or multiple references,
        # the shortest path should also be the ONLY path
        try:
            return shortest_path(self.topology,
                                 self._preferences.options.name,
                                 endpoint)
        except (NetworkXException, KeyError):
            # Node is in the topology but not connected (KeyError: older
            # networkx may raise it if the collector node itself is absent)
            return []

    def showRoute(self, endpoint):
        """
        Log the route from the collector to the endpoint device.

        @parameter endpoint: name or IP address of the device
        @type endpoint: string
        """
        ip = self.resolveName(endpoint)
        if not ip:
            log.warning("The device '%s' was not found in the topology"
                        " -- try using the IP address", endpoint)
            return
        route = self.getRoute(ip)
        if route:
            log.info("Route to %s (%s): %s", endpoint, ip, route)
        else:
            log.warning("No route for the device %s (%s) was found",
                        endpoint, ip)

    def search(self, key, criteria=None, regex=None):
        """
        Search the topology for nodes whose meta-data matches.

        @parameter key: name of the node attribute to examine
        @type key: string
        @parameter criteria: value the attribute must equal, or a callable
            that receives the attribute value (None when absent) and
            returns true for a match.  Takes precedence over regex.
        @type criteria: string or callable
        @parameter regex: regular expression searched for in the attribute
            value (a missing attribute is treated as '')
        @type regex: string
        @returns: matching (node, data) pairs; empty when neither criteria
            nor regex is given
        @rtype: list
        @raise re.error: if regex is not a valid regular expression
        """
        if criteria is not None:
            if callable(criteria):
                matches = lambda data: criteria(data.get(key))
            else:
                matches = lambda data: data.get(key) == criteria
        elif regex is not None:
            cregex = re.compile(regex)
            matches = lambda data: cregex.search(data.get(key, ''))
        else:
            return []
        return [(node, data)
                for node, data in self.topology.nodes_iter(data=True)
                if matches(data)]

    def resolveName(self, deviceName):
        """
        Given a device name, resolve to an IP address using only the
        information in the topology.

        Lookup order: a node keyed by deviceName itself, then a node whose
        'name' attribute equals deviceName, then a node whose 'name'
        attribute matches deviceName used as a regular expression.

        @returns: the node key (address), or None if nothing matched
        @rtype: string or None
        """
        if deviceName in self.topology:
            return deviceName

        matches = self.search('name', criteria=deviceName)
        if not matches:
            try:
                matches = self.search('name', regex=deviceName)
            except re.error:
                # Not a usable regular expression -- treat as no match
                return None
        if matches:
            # search() yields (node, data) pairs; the node is the address
            return matches[0][0]
        return None

    def saveTopology(self):
        """
        Checkpoint the topology to disk to preserve changes even
        in the event of crashes etc.  Only writes once the last save is
        older than options.savetopominutes.
        """
        now = time.time()
        checkpointAge = (now - self.topologySaveTime) / 60
        if checkpointAge > self._preferences.options.savetopominutes:
            self._saveTopology()

    def _getTopoCacheName(self):
        """
        Return the cache file name: the ipv<N>topofile option if set,
        otherwise perf/Daemons/<monitor>.ipv<N>.topology.graphml.
        """
        # The graphml library supports automatic compression of the file
        # if the filename ends in .gz or .bz2
        topofile = getattr(self._preferences.options,
                           "ipv%dtopofile" % self.version, None)
        if topofile:
            return topofile
        return zenPath('perf/Daemons/%s.ipv%s.topology.graphml' % (
            self._preferences.options.monitor, self.version))

    def reloadCache(self, topologyCache=None):
        """
        Restore any previously saved topology or create a new one.

        A cache file that cannot be parsed is renamed to '<name>.broken'
        so it is not read again.

        @parameter topologyCache: file name or readable file-like object;
            defaults to _getTopoCacheName()
        @type topologyCache: string or file-like object
        """
        if topologyCache is None:
            topologyCache = self._getTopoCacheName()
        isFileObject = hasattr(topologyCache, 'read')
        if isFileObject or os.path.exists(topologyCache):
            try:
                start = time.time()
                self.topology = read_graphml(topologyCache)
                log.info("Read %s nodes and %s edges from IPv%d topology cache in %0.1f s.",
                         self.topology.number_of_nodes(),
                         self.topology.number_of_edges(),
                         self.version,
                         time.time() - start)
            except (IOError, OSError) as ex:
                log.warning("Unable to read topology file %s because %s",
                            topologyCache, str(ex))
            except (SyntaxError, ValueError, NetworkXError) as ex:
                # SyntaxError: XML parse errors (ElementTree's ParseError
                # subclasses it); ValueError/NetworkXError: malformed or
                # unsupported GraphML content.
                log.warning("Bad topology file %s: %s",
                            topologyCache, str(ex))
                if not isFileObject:
                    try:
                        os.rename(topologyCache, topologyCache + '.broken')
                    except (IOError, OSError) as renameError:
                        # os.rename reports failures as OSError
                        log.warning("Unable to move bad topology file out of the way: %s",
                                    str(renameError))

        if self.topology is None:
            self._newTopology()

        # Reset the save time to avoid saving the cache immediately
        self.topologySaveTime = time.time()

    def _saveTopology(self, topologyCache=None):
        """
        Write the topology to the cache file (only when options.cycle is
        set).  An existing cache file is first renamed to
        '<name>.previous'.  Task references are stripped before writing
        and always restored afterwards, even if the write fails.

        @parameter topologyCache: file name or file-like object;
            defaults to _getTopoCacheName()
        """
        if not self._preferences.options.cycle:
            # Prevent possibly trashing the cache if we're not cycling
            return

        if topologyCache is None:
            topologyCache = self._getTopoCacheName()

        if self.topology is None:
            log.debug("Empty topology -- not saving")
            return

        if not hasattr(topologyCache, 'read') and os.path.exists(topologyCache):
            try:
                os.rename(topologyCache, topologyCache + '.previous')
            except (IOError, OSError) as ex:
                log.warning("Unable to create backup topology file because %s",
                            str(ex))
        try:
            start = time.time()

            # Strip metadata from topology for persisting cache
            savedMetadata = self._stripUncacheableMetaData()
            try:
                write_graphml(self.topology, topologyCache)
            finally:
                # Restore metadata even when the write fails; otherwise
                # the nodes would permanently lose their task references.
                for ipAddress, metadata in savedMetadata.items():
                    self.topology.node[ipAddress].update(metadata)

            log.info("Saved %s nodes and %s edges in IPv%d topology cache in %0.1fs.",
                     self.topology.number_of_nodes(),
                     self.topology.number_of_edges(),
                     self.version,
                     time.time() - start)
        except (IOError, OSError) as ex:
            log.warning("Unable to write topology file %s because %s",
                        topologyCache, str(ex))
        self.topologySaveTime = time.time()

    def _stripUncacheableMetaData(self):
        """
        The topology map carries a direct reference to tasks (ie pointers),
        which can't be persisted.  Remove them from every node.

        @returns: node -> {'task': task} for every node that had one, so
            the caller can restore them
        @rtype: dictionary
        """
        saved = defaultdict(dict)
        for ipAddress in self.topology.nodes_iter():
            nodeData = self.topology.node[ipAddress]
            if 'task' in nodeData:
                saved[ipAddress]['task'] = nodeData.pop('task')
        return saved

    def disconnectedNodes(self):
        """
        Determine the list of devices to traceroute and update
        the topology map: recently added devices if there are any,
        otherwise nodes with no edges.  Devices in traceTimedOut are
        always excluded.

        @rtype: list
        """
        # Devices recently added
        if self.notModeled:
            return list(self.notModeled - self.traceTimedOut)

        # Devices added from cache, but not properly modeled
        return [node for node in self.topology.nodes_iter()
                if self.topology.degree(node) == 0
                and node not in self.traceTimedOut]

    def updateTopology(self, route):
        """
        Canonicalize the route, removing any cloud entries,
        and update the topology (if necessary).

        @parameter route: hops towards the endpoint (the last entry);
            '*' entries are ignored.  The list is not modified.
        @type route: list
        @returns: was the topology updated or not?
        @rtype: boolean
        """
        if not route:
            return False

        existingRoute = self.getRoute(route[-1])
        canonRoute = self._canonicalizeRoute(route)
        if existingRoute == canonRoute:
            # Already known -- nothing to change
            return False

        updated = False
        lastHop = canonRoute[0]
        for hop in canonRoute[1:]:
            if hop not in self.topology:
                self.topology.add_node(hop)
                updated = True

            if lastHop != hop and \
               not self.topology.has_edge(lastHop, hop):
                self.topology.add_edge(lastHop, hop)
                updated = True
            lastHop = hop

        return updated

    def makeSpanningTree(self):
        """
        Clear the existing structure and create a spanning tree.
        This is a dangerous operation.

        Each kept edge retains the direction it had in the original
        directed topology.
        """
        forest = minimum_spanning_tree(self.topology.to_undirected())

        # Note: to_directed() would return a directed graph -- but with
        # edges in *both* directions -- so orient each undirected tree edge
        # the way it already pointed in the topology instead of relying on
        # the arbitrary (u, v) order of the undirected edge list.
        newEdgeList = []
        for u, v in forest.edges():
            if self.topology.has_edge(u, v):
                newEdgeList.append((u, v))
            else:
                newEdgeList.append((v, u))

        # Materialize the old edge list before removing from the graph
        oldEdgeList = list(self.topology.edges())
        self.topology.remove_edges_from(oldEdgeList)
        self.topology.add_edges_from(newEdgeList)

    def _canonicalizeRoute(self, route):
        """
        Given a route, reduce it to a route that can exist in the topology:
        prepend the collector, drop unknown ('*') hops, replace hops that
        belong to a cloud by the cloud name, and collapse consecutive
        duplicates (e.g. several hops inside the same cloud).

        The caller's list is not modified.

        @rtype: list
        """
        canonRoute = [self._preferences.options.name]
        for hop in route:
            if hop == '*':  # Build bogus device?
                continue
            hop = self._cloudify(hop)
            if hop != canonRoute[-1]:
                canonRoute.append(hop)
        return canonRoute

    def _cloudify(self, hop):
        """
        If the hop is a part of the cloud, then return the cloud name.
        Else return the hop.

        If the cloud does not exist in the topology, creates it.  If several
        rules match, the first one in hopToCloudRules iteration order wins.

        @parameter hop: name or IP address
        @type hop: string
        @returns: the hop (if not a part of a cloud) or the cloud
        @rtype: string
        """
        if hop in self.topology:
            # If we've already added this into the topology, don't
            # attempt to mask it out.
            return hop

        for cloudName, regex in self.hopToCloudRules.items():
            if regex.match(hop):
                self._addCloud(cloudName)
                return cloudName
        return hop

    def _addCloud(self, cloud):
        """
        If the cloud already exists, do nothing.
        Else, create the cloud node with ping=False.

        @parameter cloud: name of the cloud
        @type cloud: string
        """
        if cloud in self.topology:
            return
        self.topology.add_node(cloud, ping=False)

    def removeDevice(self, device):
        """
        Cleanly remove the device from the topology.

        The device's neighbors (for a DiGraph, its successors), except the
        hop preceding it on its route, are added to notModeled so they are
        traced again.

        @parameter device: name of the device to remove
        @type device: string
        """
        if device not in self.topology:
            log.debug("Device %s not deleted as it is not in topology",
                      device)
            return
        route = self.getRoute(device)
        neighbors = list(self.topology.neighbors(device))
        if len(route) > 1:
            # route ends with the device itself; the entry before it is
            # the hop leading to it
            previousHop = route[-2]
            if previousHop in neighbors:
                neighbors.remove(previousHop)
        self.notModeled.update(neighbors)
        self.topology.remove_node(device)
        log.debug("Deleted device %s from topology", device)

    def subgraphPingNodes(self):
        """
        Prune out nodes from the topology which we don't monitor.
        This means that if we model the route x -> y -> z
        but we only ping nodes x + z, that if x and z are down
        that we associate the cause of z being down as x, rather than being
        independent.

        A node is kept when its data has a 'task' entry.  Each kept node
        is linked to its nearest kept ancestor along the traversal.

        @returns: new graph containing only the kept nodes
        @rtype: DiGraph
        """
        topology = self.topology

        def isMonitored(node):
            return 'task' in topology.node[node]

        # Visit nodes without predecessors (e.g. the collector) first so
        # that descendants are always reached through their ancestors.
        roots = sorted(topology.nodes_iter(),
                       key=lambda node: topology.in_degree(node) != 0)

        tree = {}
        seen = set()
        for root in roots:
            if root in seen:
                continue
            seen.add(root)
            # Each entry carries the nearest monitored ancestor of its node,
            # so backtracking automatically restores the right parent.
            stack = [(root, None)]
            while stack:
                node, keptAncestor = stack.pop()
                if isMonitored(node):
                    tree.setdefault(node, [])
                    if keptAncestor is not None:
                        tree[keptAncestor].append(node)
                    keptAncestor = node
                for child in topology.neighbors_iter(node):
                    if child not in seen:
                        seen.add(child)
                        stack.append((child, keptAncestor))

        return DiGraph(tree)
434