Package Products :: Package ZenStatus :: Module PingService
[hide private]
[frames | no frames]

Source Code for Module Products.ZenStatus.PingService

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2011 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """PingService 
 15  Class that provides a way to asynchronously ping (ICMP packets) IP addresses. 
 16  """ 
 17   
 18  import sys 
 19  import os 
 20  import time 
 21  import socket 
 22  import errno 
 23  import logging 
 24  log = logging.getLogger("zen.PingService") 
 25   
 26  # Zenoss custom ICMP library 
 27  from icmpecho.Ping import Ping4, Ping6 
 28   
 29  from twisted.internet import reactor, defer 
 30  from twisted.python.failure import Failure 
 31   
 32  import Globals 
 33  from Products.ZenStatus.PingJob import PingJob 
 34   
class PermissionError(Exception):
    """
    Signals that access to a resource was not permitted.

    This is a plain Exception subclass local to this module; it is not
    related to any built-in exception of the same name.
    """
37
class IpConflict(Exception):
    """
    Two ping jobs with different hostnames are targeting the same IP
    address at the same time.
    """
40
class PingJobError(Exception):
    """
    Error describing a failed ping job.

    @param error_message: human-readable description, used as the
        exception message
    @param ipaddr: IP address of the job that failed; kept on the
        instance as C{ipaddr} so handlers can identify the target
    """

    def __init__(self, error_message, ipaddr):
        super(PingJobError, self).__init__(error_message)
        self.ipaddr = ipaddr
45
class PingService(object):
    """
    Asynchronous ICMP ping driver.

    Echo requests are sent through the supplied protocol object and each
    PingJob's deferred is fired when enough replies arrive, an unreachable
    message is received, or the job runs out of tries.  The instance adds
    itself to the Twisted reactor as a reader, which is why it exposes
    fileno(), doRead(), logPrefix() and connectionLost().

    Outstanding jobs are tracked in C{self.jobqueue}, keyed by IP address,
    so only one job per address is tracked at any time.
    """

    def __init__(self, protocol, timeout=2, defaultTries=2):
        """
        @param protocol: object used for the actual I/O; this class calls
            its fileno(), send(sockaddr, socket_kwargs, echo_kwargs),
            receive() and close() methods
        @param timeout: seconds to wait for a reply before counting a loss
        @param defaultTries: maxtries given to PingJobs that ping() creates
            from a bare IP address
        """
        self.reconfigure(timeout)
        self.procId = os.getpid()
        self.defaultTries = defaultTries
        self.jobqueue = {}
        # Identifies this host and process; not referenced elsewhere in
        # this class.
        self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId)

        self._protocol = protocol
        reactor.addReader(self)

    def reconfigure(self, timeout=2):
        """
        Change the per-attempt reply timeout (in seconds).
        """
        self.timeout = timeout

    def fileno(self):
        """
        The reactor will do reads only if we support a file-like interface
        """
        return self._protocol.fileno()

    def logPrefix(self):
        """
        Return the log prefix for this reader; this service supplies none.
        """
        return None

    def connectionLost(self, unused):
        """
        Unregister from the reactor and close the underlying protocol.
        """
        reactor.removeReader(self)
        self._protocol.close()

    def ping(self, ip):
        """
        Ping the IP address and return the result in a deferred

        @param ip: either a PingJob, which is used as-is, or an address
            from which a PingJob with maxtries=self.defaultTries is built
        @return: the job's deferred
        """
        if isinstance(ip, PingJob):
            pj = ip
        else:
            pj = PingJob(ip, maxtries=self.defaultTries)
        self._ping(pj)
        return pj.deferred

    def _ping(self, pingJob):
        """
        Take a pingjob and send an ICMP packet for it

        Any exception (bad address, send failure, IP conflict) fails the
        job instead of propagating.
        """
        try:
            _family, sockaddr, echo_kwargs, socket_kwargs = \
                pingJob.pingArgs()

            # Reject a conflicting job before anything is sent, so that no
            # packet goes out and no timeout is scheduled for a job that is
            # about to be failed.
            current = self.jobqueue.get(pingJob.ipaddr, None)
            if current and pingJob.hostname != current.hostname:
                raise IpConflict("Host %s and %s are both using IP %s" %
                                 (pingJob.hostname,
                                  current.hostname,
                                  pingJob.ipaddr))

            pingJob.start = self._protocol.send(sockaddr,
                                                socket_kwargs,
                                                echo_kwargs)
            pingJob.sent += 1

            reactor.callLater(self.timeout, self.checkTimeout, pingJob)
            self.jobqueue[pingJob.ipaddr] = pingJob
        except Exception as e:  # Note: sockets with bad addresses fail
            log.debug("%s sendto error %s", pingJob.ipaddr, e)
            self.pingJobFail(pingJob)

    def _processPacket(self, reply):
        """
        Examine the parsed reply and determine what to do with it.

        @param reply: mapping with at least 'address' and 'alive'; 'alive'
            replies are also read for 'data_size' and 'sequence'
        """
        sourceIp = reply['address']
        pj = self.jobqueue.get(sourceIp)
        if reply['alive'] and pj:
            pj.rcvCount += 1
            pj.rtt = time.time() - pj.start
            pj.results.append(pj.rtt)
            log.debug("%d bytes from %s: icmp_seq=%d time=%0.3f ms",
                      reply['data_size'], sourceIp, reply['sequence'],
                      pj.rtt * 1000)

            # Keep pinging until the requested number of samples is in.
            if pj.rcvCount >= pj.sampleSize:
                self.pingJobSucceed(pj)
            else:
                self._ping(pj)

        elif not reply['alive'] and pj:
            log.debug("ICMP unreachable message for %s", pj.ipaddr)
            self.pingJobFail(pj)

        # Replies that match no queued job are ignored without logging.

    def doRead(self):
        """
        Receive packets from the socket and process them.

        The name is required by the reactor select() functionality
        """
        try:
            for reply, sockaddr in self._protocol.receive():
                if not reactor.running:
                    return
                self._processPacket(reply)
        except socket.error as err:
            # args may hold a single item, so never unpack it blindly.
            errnum = err.args[0] if err.args else None
            if errnum in (errno.EAGAIN, errno.EWOULDBLOCK):
                return
            raise  # bare raise keeps the original traceback
        except Exception as ex:
            log.exception("Error while receiving packet: %s", ex)

    def pingJobSucceed(self, pj):
        """
        PingJob completed successfully.
        """
        pj.message = "IP %s is up" % pj.ipaddr
        pj.severity = 0
        self.dequePingJob(pj)
        if not pj.deferred.called:
            pj.deferred.callback(pj)

    def pingJobFail(self, pj):
        """
        PingJob has failed -- remove from jobqueue.

        The job's deferred, if not already fired, receives a Failure
        wrapping a PingJobError.
        """
        pj.rtt = -1
        pj.message = "IP %s is down" % pj.ipaddr
        self.dequePingJob(pj)
        if not pj.deferred.called:
            pj.deferred.errback(Failure(PingJobError(pj.message, pj.ipaddr)))

    def dequePingJob(self, pj):
        """
        Remove pj from the job queue if it is the job queued for its IP.

        The queue is keyed by IP address, so the identity check keeps a
        rejected or superseded job from evicting a different job that is
        still waiting on the same address.
        """
        if self.jobqueue.get(pj.ipaddr) is pj:
            del self.jobqueue[pj.ipaddr]

    def checkTimeout(self, pj):
        """
        Timer callback: count a loss and retry or fail the job.

        Does nothing when no job is queued for the address.  A timer left
        over from an earlier send is detected by comparing the elapsed time
        since pj.start with the timeout.
        """
        if pj.ipaddr in self.jobqueue:
            runtime = time.time() - pj.start
            if runtime > self.timeout:
                pj.loss += 1
                log.debug("%s pingjob timeout on attempt %d (timeout=%ss, max tries=%s)",
                          pj.ipaddr, pj.loss, self.timeout, pj.maxtries)
                if pj.loss >= pj.maxtries:
                    self.pingJobFail(pj)
                else:
                    self._ping(pj)
            else:
                log.debug("Calling checkTimeout needlessly for %s", pj.ipaddr)

    def jobCount(self):
        """
        Return the number of jobs currently queued.
        """
        return len(self.jobqueue)
199 200
def _printResults(results, start):
    """
    Print a summary of a batch of pings, then stop the reactor.

    @param results: sequence of (success, value) pairs.  For a successful
        entry the value is read for C{rtt} and C{ipaddr}; for a failed
        entry the value is expected to be a Failure whose C{value} carries
        C{ipaddr} (as PingJobError does).
    @param start: time.time() value taken when the batch was started
    """
    good = []
    bad = []
    for success, outcome in results:
        if success:
            if outcome.rtt >= 0:
                good.append(outcome.ipaddr)
            else:
                bad.append(outcome.ipaddr)
        else:
            # Failed pings arrive as failures, not as jobs with rtt < 0;
            # take the address from the wrapped exception when it has one.
            ipaddr = getattr(getattr(outcome, "value", None), "ipaddr", None)
            if ipaddr is not None:
                bad.append(ipaddr)
    if good:
        print("Good IPs: %s" % " ".join(good))
    if bad:
        print("Bad IPs: %s" % " ".join(bad))
    print("Tested %d IPs in %.2f seconds" % (len(results), time.time() - start))
    reactor.stop()
if __name__ == "__main__":
    # Sockets are injected into the main module by pyraw
    # pyraw PingService.py [ip_addresses]

    protocol = Ping4(IPV4_SOCKET)
    ping = PingService(protocol)
    logging.basicConfig()
    # Rebinds the module-level logger to the root logger at DEBUG level.
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    # Ping the addresses given on the command line, or localhost if none.
    targets = sys.argv[1:] or ("127.0.0.1",)
    lst = defer.DeferredList([ping.ping(target) for target in targets],
                             consumeErrors=True)
    lst.addCallback(_printResults, time.time())
    reactor.run()