1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """TracerouteTask
15
16 Determines the route to an IP addresses using TTLs (eg traceroute).
17
18 Invoke this file with the list of devices to traceroute to test by hand.
19
20 TracerouteTask.py host1 host2
21
22 """
23
24 import time
25 import logging
26 log = logging.getLogger("zen.tracerouteTask")
27
28 import Globals
29 import zope.interface
30 import zope.component
31
32 from twisted.internet import defer, error
33
34 from Products.ZenCollector.interfaces import IScheduledTask,\
35 ICollector,\
36 IStatisticsService
37 from Products.ZenCollector.tasks import TaskStates
38 from Products.ZenUtils.observable import ObservableMixin
39 from Products.ZenRRD.zencommand import Cmd, ProcessRunner, TimeoutError
40
41 MAX_TRACEROUTES = 10
42
43
45 zope.interface.implements(IScheduledTask)
46
47 STATE_TOPOLOGY_COLLECTION = 'TOPOLOGY_COLLECTION'
48 STATE_TOPOLOGY_PROCESS = 'TOPOLOGY_PROCESS_RESULTS'
49 STATE_TOPOLOGY_UPDATE = 'TOPOLOGY_UPDATE'
50
51 - def __init__(self,
52 taskName,
53 configId=None,
54 scheduleIntervalSeconds=60,
55 taskConfig=None):
56 """
57 @param deviceId: the Zenoss deviceId to watch
58 @type deviceId: string
59 @param taskName: the unique identifier for this task
60 @type taskName: string
61 @param scheduleIntervalSeconds: the interval at which this task will be
62 collected
63 @type scheduleIntervalSeconds: int
64 @param taskConfig: the configuration for this task
65 """
66 super(TracerouteTask, self).__init__()
67
68
69 self.name = taskName
70 self.configId = configId
71 self.state = TaskStates.STATE_IDLE
72 self.interval = scheduleIntervalSeconds
73
74 if taskConfig is None:
75 raise TypeError("taskConfig cannot be None")
76 self._preferences = taskConfig
77
78 self._daemon = zope.component.getUtility(ICollector)
79
80 if taskName.endswith('IPv6'):
81 self.version = 6
82 self._network = self._daemon.ipv6network
83 else:
84 self.version = 4
85 self._network = self._daemon.network
86
87 self._notModeled = self._network.notModeled
88 self._traceTimedOut = self._network.traceTimedOut
89 self._errorDevices = []
90
91
92 self._statService = zope.component.queryUtility(IStatisticsService)
93 self._traceTimeStatName = "%s_traceroute_time" % self.version
94 self._statService.addStatistic(self._traceTimeStatName, "GAUGE")
95
96 self._modeledCount = 0
97 self._failedModeledCount = 0
98
99 self._maxTraceroutes = MAX_TRACEROUTES
100
101 self._lastErrorMsg = ''
102
104 """
105 Twisted errBack to log the exception for a single device.
106
107 @parameter reason: explanation of the failure
108 @type reason: Twisted error instance
109 """
110
111 if isinstance(reason.value, error.TimeoutError):
112 msg = '%s second timeout connecting to device %s' % (
113 'timeout', self._devId)
114
115
116 reason = None
117
118 else:
119 msg = reason.getErrorMessage()
120 if not msg:
121 msg = reason.__class__
122 msg = '%s %s' % (self._devId, msg)
123
124
125
126 if self._lastErrorMsg != msg:
127 self._lastErrorMsg = msg
128 if msg:
129 log.error(msg)
130
131 return reason
132
134 """
135 Traceroute from the collector to the endpoint nodes,
136 with chunking.
137
138 @return: A task to traceroute devices
139 @rtype: Twisted deferred object
140 """
141 self._network.saveTopology()
142 self.deferredCmds = []
143 devices = self._chooseDevicesToTrace()
144 if not devices:
145 return defer.succeed("No devices to trace at this time.")
146
147 log.debug("Devices to trace: %s", devices)
148 for devIp in devices:
149 d = defer.maybeDeferred(self._modelRoute, devIp)
150 self.deferredCmds.append(d)
151
152 dl = defer.DeferredList(self.deferredCmds, consumeErrors=True)
153 dl.addCallback(self._parseResults)
154 dl.addCallback(self._processResults)
155 return dl
156
159
161 """
162 Select the devices to traceroute
163 """
164
165 chunkSize = self._preferences.options.tracechunk
166 traceDevices = self._network.disconnectedNodes()[:chunkSize]
167 if not traceDevices:
168 traceDevices = self._reTraceDevices(chunkSize)
169 return traceDevices
170
172 """
173 Re-traceroute devices based on reasonable criteria.
174 """
175 traceDevices = self._errorDevices[:chunkSize]
176 self._errorDevices = self._errorDevices[chunkSize:]
177 if not traceDevices:
178 pass
179
180 return traceDevices
181
183 """
184 Given an IP address, perform a traceroute to determine (from the
185 physical network devices) the underlying route to the IP from
186 this collector device.
187
188 @parameter ip: IP address of the device to search
189 @type ip: string
190 @returns: a deferred to actually do the modeling
191 @rtype: deferred task
192 """
193 self.state = TracerouteTask.STATE_TOPOLOGY_COLLECTION
194
195 cmd = Cmd()
196 cmd.ds = "TRACEROUTE"
197 cmd.ip = ip
198 cmd.command = "traceroute -n %s" % ip
199 cmd.name = cmd.command
200 class DevProxy(object):
201 zCommandCommandTimeout = self._preferences.options.tracetimeoutseconds
202 cmd.deviceConfig = DevProxy()
203
204 runner = ProcessRunner()
205 d = runner.start(cmd)
206 cmd.lastStart = time.time()
207 d.addBoth(cmd.processCompleted)
208 return d
209
242
244
245 output = command.result.output.split('\n')[1:]
246
247 route = []
248 for line in output:
249 data = line.split()
250 if not data:
251 continue
252 gw = data[1]
253 route.append(gw)
254
255
256 if not route or route[-1] != command.ip:
257 route.append(command.ip)
258 log.debug("Route: %s", route)
259 return route
260
267
269 """
270 Given the list of gateways, add any nodes and construct
271 any edges required to add the device to the topology.
272 """
273 self.state = TracerouteTask.STATE_TOPOLOGY_UPDATE
274 updates = 0
275 for success, route in resultList:
276 if success:
277 if self._network.updateTopology(route):
278 updates += 1
279
280 return "Updated %d routes." % updates
281
283 """
284 Called by the collector framework scheduler, and allows us to
285 see how each task is doing.
286 """
287 badDeviceCount = len(self._errorDevices)
288 display = "%s modelSuccesses: %d modelFailures: %s errorDevices: %s\n" % (
289 self.name, self._modeledCount, self._failedModeledCount,
290 self._errorDevices if badDeviceCount < 10 else badDeviceCount)
291 if self._lastErrorMsg:
292 display += "%s\n" % self._lastErrorMsg
293 return display
294
296 for pr in self.deferredCmds:
297 pr.cancel()
298
299
300 if __name__=='__main__':
301 from twisted.internet import reactor
302 from Products.ZenCollector.daemon import CollectorDaemon
303 from Products.ZenStatus.zenping import PingCollectionPreferences
304 from Products.ZenStatus.NetworkModel import NetworkModel
305 from Products.ZenCollector.interfaces import ICollector
306 from Products.ZenCollector.tasks import SimpleTaskFactory,\
307 SimpleTaskSplitter
308
309
311 daemon = zope.component.getUtility(ICollector)
312 daemon.network = NetworkModel()
313 daemon.ipv6network = NetworkModel(version=6)
314
315 myPreferences = PingCollectionPreferences()
316 myPreferences.postStartup = postStartup
317
318
319 myTaskFactory = SimpleTaskFactory(TracerouteTask)
320 myTaskSplitter = SimpleTaskSplitter(myTaskFactory)
321 daemon = CollectorDaemon(myPreferences, myTaskSplitter)
322
323
324 daemon.network.notModeled = set(daemon.args)
325 task = TracerouteTask('traceroute', 'traceroute', 300,
326 daemon._prefs)
327 daemon._scheduler.addTask(task, daemon._taskCompleteCallback, True)
328 log.setLevel(logging.DEBUG)
329 reactor.run()
330