Package Products :: Package ZenStatus :: Module TracerouteTask
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenStatus.TracerouteTask

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """TracerouteTask 
 15   
  16  Determines the route to an IP address using TTLs (i.e. traceroute). 
  17   
  18  To test by hand, invoke this file with the list of devices to traceroute: 
 19   
 20  TracerouteTask.py host1 host2 
 21   
 22  """ 
 23   
 24  import time 
 25  import logging 
 26  log = logging.getLogger("zen.tracerouteTask") 
 27   
 28  import Globals 
 29  import zope.interface 
 30  import zope.component 
 31   
 32  from twisted.internet import defer, error 
 33   
 34  from Products.ZenCollector.interfaces import IScheduledTask,\ 
 35                                               ICollector,\ 
 36                                               IStatisticsService 
 37  from Products.ZenCollector.tasks import TaskStates 
 38  from Products.ZenUtils.observable import ObservableMixin 
 39  from Products.ZenRRD.zencommand import Cmd, ProcessRunner, TimeoutError 
 40   
 41  MAX_TRACEROUTES = 10 
 42   
 43   
class TracerouteTask(ObservableMixin):
    """
    Scheduled collector task that runs `traceroute` from this collector to
    the nodes returned by the network model's disconnectedNodes() (falling
    back to devices whose previous traceroute failed) and feeds the
    discovered routes back into the network model via updateTopology().
    """
    zope.interface.implements(IScheduledTask)

    STATE_TOPOLOGY_COLLECTION = 'TOPOLOGY_COLLECTION'
    STATE_TOPOLOGY_PROCESS = 'TOPOLOGY_PROCESS_RESULTS'
    STATE_TOPOLOGY_UPDATE = 'TOPOLOGY_UPDATE'

    def __init__(self,
                 taskName,
                 configId=None,
                 scheduleIntervalSeconds=60,
                 taskConfig=None):
        """
        @param taskName: the unique identifier for this task; a name ending
            in 'IPv6' makes the task use the daemon's IPv6 network model,
            anything else uses the IPv4 one
        @type taskName: string
        @param configId: the configuration identifier for this task
        @type configId: string
        @param scheduleIntervalSeconds: the interval at which this task will be
            collected
        @type scheduleIntervalSeconds: int
        @param taskConfig: the configuration for this task; its C{options}
            must provide C{tracechunk} and C{tracetimeoutseconds}
        @raise TypeError: if taskConfig is None
        """
        super(TracerouteTask, self).__init__()

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds

        if taskConfig is None:
            raise TypeError("taskConfig cannot be None")
        self._preferences = taskConfig

        self._daemon = zope.component.getUtility(ICollector)

        if taskName.endswith('IPv6'):
            self.version = 6
            self._network = self._daemon.ipv6network
        else:
            self.version = 4
            self._network = self._daemon.network

        self._notModeled = self._network.notModeled
        self._traceTimedOut = self._network.traceTimedOut
        # IPs whose last traceroute failed; kept unique so the list cannot
        # grow without bound while disconnected nodes keep failing.
        self._errorDevices = []

        # Deferreds of the traceroutes started by the current doTask() run.
        # Initialised here so that cleanup() is safe before the first run.
        self.deferredCmds = []

        # add our collector's custom statistics
        self._statService = zope.component.queryUtility(IStatisticsService)
        self._traceTimeStatName = "%s_traceroute_time" % self.version
        self._statService.addStatistic(self._traceTimeStatName, "GAUGE")

        self._modeledCount = 0
        self._failedModeledCount = 0

        self._maxTraceroutes = MAX_TRACEROUTES

        self._lastErrorMsg = ''

    def _failure(self, reason):
        """
        Twisted errBack to log the exception for a traceroute run.

        Identical consecutive messages are only logged once.

        @parameter reason: explanation of the failure
        @type reason: Twisted failure instance
        @return: None for a timeout (treated as handled), otherwise the
            original failure so that it keeps propagating
        """
        # Decode the exception
        if isinstance(reason.value, error.TimeoutError):
            msg = '%s second timeout running task %s' % (
                self._preferences.options.tracetimeoutseconds, self.name)
            # Indicate that we've handled the error by
            # not returning a result
            reason = None
        else:
            msg = reason.getErrorMessage()
            if not msg:  # Sometimes we get blank error messages
                # Fall back to the exception's class name rather than the
                # Failure wrapper's class, which is always the same.
                msg = reason.value.__class__.__name__
            msg = '%s %s' % (self.name, msg)
            # Leave 'reason' alone to generate a traceback

        if self._lastErrorMsg != msg:
            self._lastErrorMsg = msg
            if msg:
                log.error(msg)

        return reason

    def doTask(self):
        """
        Traceroute from the collector to the endpoint nodes,
        with chunking.

        @return: A task to traceroute devices
        @rtype: Twisted deferred object
        """
        self._network.saveTopology()
        self.deferredCmds = []
        devices = self._chooseDevicesToTrace()
        if not devices:
            return defer.succeed("No devices to trace at this time.")

        log.debug("Devices to trace: %s", devices)
        for devIp in devices:
            d = defer.maybeDeferred(self._modelRoute, devIp)
            self.deferredCmds.append(d)

        # consumeErrors=True: per-device failures are reported to
        # _parseResults as (False, failure) instead of being logged as
        # unhandled errors.
        dl = defer.DeferredList(self.deferredCmds, consumeErrors=True)
        dl.addCallback(self._parseResults)
        dl.addCallback(self._processResults)
        return dl

    def _chooseDevicesToTrace(self):
        """
        Select at most C{options.tracechunk} devices to traceroute.

        Disconnected nodes take priority; if there are none, devices whose
        previous traceroute failed are retried.
        """
        chunkSize = self._preferences.options.tracechunk
        traceDevices = self._network.disconnectedNodes()[:chunkSize]
        if not traceDevices:
            traceDevices = self._reTraceDevices(chunkSize)
        return traceDevices

    def _reTraceDevices(self, chunkSize):
        """
        Pop up to chunkSize previously-failed devices for another attempt.
        """
        traceDevices = self._errorDevices[:chunkSize]
        self._errorDevices = self._errorDevices[chunkSize:]
        # TODO: when there is nothing to retry, do something useful here
        # (eg choose devices at random?)
        return traceDevices

    def _modelRoute(self, ip):
        """
        Given an IP address, perform a traceroute to determine (from the
        physical network devices) the underlying route to the IP from
        this collector device.

        @parameter ip: IP address of the device to search
        @type ip: string
        @returns: a deferred to actually do the modeling
        @rtype: deferred task
        """
        self.state = TracerouteTask.STATE_TOPOLOGY_COLLECTION
        # TODO: use a Python-level library rather than spawning a process
        cmd = Cmd()
        cmd.ds = "TRACEROUTE"
        cmd.ip = ip
        cmd.command = "traceroute -n %s" % ip
        cmd.name = cmd.command

        timeout = self._preferences.options.tracetimeoutseconds

        # Minimal stand-in for a device config: only the command timeout
        # is supplied to the process runner.
        class DevProxy(object):
            zCommandCommandTimeout = timeout
        cmd.deviceConfig = DevProxy()

        runner = ProcessRunner()
        d = runner.start(cmd)
        cmd.lastStart = time.time()
        d.addBoth(cmd.processCompleted)
        return d

    def _parseResults(self, resultList):
        """
        Take the raw results of the traceroute requests and format for
        updates to the topology.
        Note that just because a device cannot be traceroute'd does
        *NOT* mean that it is down.

        @param resultList: (success, value) pairs from a DeferredList, where
            value is the completed command on success or a Failure otherwise
        @return: list of (True, route) pairs for the successful traceroutes
        """
        self.state = TracerouteTask.STATE_TOPOLOGY_PROCESS
        newResultList = []
        for success, value in resultList:
            if success:
                command = value
                self._modeledCount += 1
                newResultList.append((success, self._parseResult(command)))
                self._updateStatistics(command)
                continue

            failure = value
            self._failedModeledCount += 1
            if isinstance(failure.value, defer.CancelledError):
                log.debug("Cancelled a traceroute process during shutdown")
                continue

            # NOTE(review): assumes the process runner's exceptions carry
            # the Cmd as their first argument; tolerate ones that don't.
            args = getattr(failure.value, 'args', None) or ()
            command = args[0] if args else None
            ip = getattr(command, 'ip', None)
            if ip is not None and ip not in self._errorDevices:
                self._errorDevices.append(ip)

            if ip is not None and isinstance(failure.value, TimeoutError):
                msg = "Traceroute of %s timed out" % ip
                log.debug(msg)
                self._traceTimedOut.add(ip)
            else:
                log.warning(failure.getTraceback())
        return newResultList

    def _parseResult(self, command):
        """
        Extract the ordered list of gateway IPs from traceroute output.

        The target IP is always the last element, even when traceroute
        could not reach it.
        """
        # Discard the first line of output ("traceroute to ..." banner)
        output = command.result.output.split('\n')[1:]
        # 1 10.175.211.10 0.228 ms 0.146 ms 0.131 ms
        route = []
        for line in output:
            data = line.split()
            if len(data) < 2:
                continue
            # '*' marks an unanswered probe: use the first probe that did
            # answer, and only record '*' if none of them did.
            gw = next((token for token in data[1:] if token != '*'), data[1])
            route.append(gw)

        # Note: sometimes we can't determine the route
        if not route or route[-1] != command.ip:
            route.append(command.ip)
        log.debug("Route: %s", route)
        return route

    def _updateStatistics(self, command):
        """
        Track our traceroute statistics (wall-clock time of the last run).
        """
        stat = self._statService.getStatistic(self._traceTimeStatName)
        stat.value = command.lastStop - command.lastStart

    def _processResults(self, resultList):
        """
        Given the list of gateways, add any nodes and construct
        any edges required to add the device to the topology.
        """
        self.state = TracerouteTask.STATE_TOPOLOGY_UPDATE
        updates = 0
        for success, route in resultList:
            if success and self._network.updateTopology(route):
                updates += 1

        return "Updated %d routes." % updates

    def displayStatistics(self):
        """
        Called by the collector framework scheduler, and allows us to
        see how each task is doing.
        """
        badDeviceCount = len(self._errorDevices)
        display = "%s modelSuccesses: %d modelFailures: %d errorDevices: %s\n" % (
            self.name, self._modeledCount, self._failedModeledCount,
            self._errorDevices if badDeviceCount < 10 else badDeviceCount)
        if self._lastErrorMsg:
            display += "%s\n" % self._lastErrorMsg
        return display

    def cleanup(self):
        """
        Cancel the traceroutes started by the most recent doTask() run
        (e.g. at shutdown). Safe to call before any run has happened.
        """
        for d in self.deferredCmds:
            d.cancel()
if __name__=='__main__':
    # Manual test harness: traceroute to the hosts given on the command line.
    from twisted.internet import reactor
    from Products.ZenCollector.daemon import CollectorDaemon
    from Products.ZenStatus.zenping import PingCollectionPreferences
    from Products.ZenStatus.NetworkModel import NetworkModel
    from Products.ZenCollector.interfaces import ICollector
    from Products.ZenCollector.tasks import SimpleTaskFactory,\
                                            SimpleTaskSplitter

    # Evil hack to avoid daemon command-line parsing
    def postStartup():
        # Give the daemon fresh IPv4 and IPv6 network models; TracerouteTask
        # reads daemon.network / daemon.ipv6network in its constructor.
        daemon = zope.component.getUtility(ICollector)
        daemon.network = NetworkModel()
        daemon.ipv6network = NetworkModel(version=6)

    myPreferences = PingCollectionPreferences()
    # Replace the preferences' postStartup hook with the one above.
    myPreferences.postStartup = postStartup

    # Daemon setup
    myTaskFactory = SimpleTaskFactory(TracerouteTask)
    myTaskSplitter = SimpleTaskSplitter(myTaskFactory)
    daemon = CollectorDaemon(myPreferences, myTaskSplitter)

    # Now run traceroutes on devices
    # NOTE(review): daemon.network is only assigned inside postStartup();
    # this line relies on CollectorDaemon having invoked that hook by now —
    # confirm against CollectorDaemon.
    daemon.network.notModeled = set(daemon.args)
    # Positional args: taskName, configId, scheduleIntervalSeconds,
    # taskConfig. The name does not end in 'IPv6', so the IPv4 model is used.
    task = TracerouteTask('traceroute', 'traceroute', 300,
                          daemon._prefs)
    daemon._scheduler.addTask(task, daemon._taskCompleteCallback, True)
    log.setLevel(logging.DEBUG)
    reactor.run()