Package Products :: Package ZenHub :: Module PBDaemon
[hide private]
[frames | no frames]

Source Code for Module Products.ZenHub.PBDaemon

  1  ########################################################################## 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """PBDaemon 
 15   
 16  Base for daemons that connect to zenhub 
 17   
 18  """ 
 19   
 20  import sys 
 21  import time 
 22  import traceback 
 23   
 24  import Globals 
 25   
 26  from Products.ZenUtils.ZenDaemon import ZenDaemon 
 27  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 28  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 29  from Products.ZenUtils.DaemonStats import DaemonStats 
 30  from Products.ZenUtils.Driver import drive 
 31  from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \ 
 32                                                  Clear, Warning 
 33   
 34  from twisted.cred import credentials 
 35  from twisted.internet import reactor, defer 
 36  from twisted.internet.error import ConnectionLost, ReactorNotRunning 
 37  from twisted.spread import pb 
 38  from twisted.python.failure import Failure 
 39  import twisted.python.log 
 40   
 41  from ZODB.POSException import ConflictError 
class RemoteException(Exception, pb.Copyable, pb.RemoteCopy):
    """
    Exception that can cross the PB barrier.

    Besides the message it carries the formatted traceback of the original
    error, which is appended when the exception is rendered as a string.

    @parameter msg: human-readable error message
    @type msg: string
    @parameter tb: formatted traceback text; may be empty ('' is passed by
        callers in this module), and None is tolerated
    @type tb: string
    """
    def __init__(self, msg, tb):
        Exception.__init__(self, msg)
        self.traceback = tb

    def __str__(self):
        # Never let rendering the error fail with a TypeError just because no
        # traceback text was supplied.
        return Exception.__str__(self) + (self.traceback or '')

# Allow instances of this class to be rebuilt on the receiving side of PB.
pb.setUnjellyableForClass(RemoteException, RemoteException)
# ZODB conflicts
class RemoteConflictError(RemoteException):
    """
    RemoteException variant raised by translateError in place of a ZODB
    ConflictError, so the remote side can distinguish conflicts.
    """

pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)
# Invalid monitor specified
class RemoteBadMonitor(RemoteException):
    """
    Signals an invalid monitor name. PBDaemon.getInitialServices fails with
    this error when the initial services cannot be obtained.
    """

# Register like the sibling remote exceptions so an instance can also be
# rebuilt if it is ever sent across PB.
pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)
59
def translateError(callable):
    """
    Decorator function to wrap remote exceptions into something
    understandable by our daemon.

    A ZODB ConflictError raised by the wrapped function is re-raised as
    RemoteConflictError; any other Exception is re-raised as RemoteException.
    Both carry the formatted traceback of the original error.

    @parameter callable: function to wrap
    @type callable: function
    @return: wrapper with the same name and docstring as C{callable}
    @rtype: function
    """
    def inner(*args, **kw):
        """
        Interior decorator
        """
        try:
            return callable(*args, **kw)
        except ConflictError:
            ex = sys.exc_info()[1]
            raise RemoteConflictError(
                'Remote exception: %s: %s' % (ex.__class__, ex),
                traceback.format_exc())
        except Exception:
            ex = sys.exc_info()[1]
            raise RemoteException(
                'Remote exception: %s: %s' % (ex.__class__, ex),
                traceback.format_exc())

    # Keep the wrapped function's identity so logs, tracebacks and
    # introspection show the real method rather than "inner".
    for attr in ('__module__', '__name__', '__doc__'):
        if hasattr(callable, attr):
            setattr(inner, attr, getattr(callable, attr))
    inner.__dict__.update(getattr(callable, '__dict__', {}))
    return inner


PB_PORT = 8789

startEvent = {
    'eventClass': App_Start,
    'summary': 'started',
    'severity': Clear,
    }

stopEvent = {
    'eventClass':App_Stop,
    'summary': 'stopped',
    'severity': Warning,
    }


# Defaults for the --hub* / --monitor command-line options (see
# PBDaemon.buildOptions). The username/password are stock credentials;
# override them with --hubusername / --hubpassword.
DEFAULT_HUB_HOST = 'localhost'
DEFAULT_HUB_PORT = PB_PORT
DEFAULT_HUB_USERNAME = 'admin'
DEFAULT_HUB_PASSWORD = 'zenoss'
DEFAULT_HUB_MONITOR = 'localhost'
class HubDown(Exception):
    """
    Signals that ZenHub is unavailable; FakeRemote fails every remote call
    with this error.
    """
109
class FakeRemote:
    """
    Placeholder for a remote service reference when the real service has not
    been loaded (see PBDaemon.getServiceNow).
    """

    def callRemote(self, *unused):
        """
        Ignore the requested method and arguments and return an already
        failed Deferred carrying HubDown("ZenHub is down").
        """
        return defer.fail(HubDown("ZenHub is down"))
114
class PBDaemon(ZenDaemon, pb.Referenceable):
    """
    Base for daemons that connect to zenhub.

    Logs in to ZenHub over Perspective Broker, loads the services named in
    C{initialServices}, queues outgoing events and periodically flushes them
    to the EventService. This object is handed to ZenHub as the default
    service listener, so its C{remote_*} methods can be called by ZenHub.
    """

    name = 'pbdaemon'
    # services (re)loaded every time we connect to ZenHub
    initialServices = ['EventService']
    # class-level template; __init__ gives each instance its own copy
    heartbeatEvent = {'eventClass':Heartbeat}
    heartbeatTimeout = 60*3
    _customexitcode = 0
    # set while pushEvents() is flushing, so flushes never overlap
    _sendingEvents = False

    def __init__(self, noopts=0, keeproot=False, name=None):
        """
        Initialize daemon state and the start/stop/heartbeat events.

        @parameter noopts: passed through to ZenDaemon.__init__
        @type noopts: integer
        @parameter keeproot: passed through to ZenDaemon.__init__
        @type keeproot: boolean
        @parameter name: collector name overriding the class-level C{name}
        @type name: string or None
        """
        # if we were provided our collector name via the constructor instead of
        # via code, be sure to store it correctly.
        if name is not None:
            self.name = name
            self.mname = name

        try:
            ZenDaemon.__init__(self, noopts, keeproot)
        except IOError:
            self.log.critical( traceback.format_exc( 0 ) )
            sys.exit(1)

        self.rrdStats = DaemonStats()
        self.lastStats = 0
        self.perspective = None
        self.services = {}
        self.eventQueue = []
        self.startEvent = startEvent.copy()
        self.stopEvent = stopEvent.copy()
        # heartbeatEvent is a class attribute: copy it before update() so we
        # do not mutate the dict shared by every instance and subclass.
        self.heartbeatEvent = self.heartbeatEvent.copy()
        details = dict(component=self.name, device=self.options.monitor)
        for evt in self.startEvent, self.stopEvent, self.heartbeatEvent:
            evt.update(details)
        # fired (through gotPerspective) once the initial services are loaded
        self.initialConnect = defer.Deferred()
        self.stopped = False
        # last severity seen per event key; used to drop duplicate clears
        self._eventStatus = {}

    def gotPerspective(self, perspective):
        """
        This gets called every time we reconnect.

        Stores the perspective and reloads the initial services. On the first
        connection, that result is chained into C{self.initialConnect}.

        @parameter perspective: Twisted perspective object
        @type perspective: Twisted perspective object
        """
        self.log.info("Connected to ZenHub")
        self.perspective = perspective
        d2 = self.getInitialServices()
        if self.initialConnect:
            self.log.debug('Chaining getInitialServices with d2')
            # hand initialConnect off exactly once; reconnects skip this
            self.initialConnect, d = None, self.initialConnect
            d2.chainDeferred(d)

    def connect(self):
        """
        Start connecting and logging in to ZenHub.

        @return: C{self.initialConnect}, which fires once the initial services
            have been retrieved
        @rtype: Deferred
        """
        factory = ReconnectingPBClientFactory()
        self.log.info("Connecting to %s:%d" % (self.options.hubhost,
                                                self.options.hubport))
        reactor.connectTCP(self.options.hubhost, self.options.hubport, factory)
        username = self.options.hubusername
        password = self.options.hubpassword
        self.log.debug("Logging in as %s" % username)
        c = credentials.UsernamePassword(username, password)
        factory.gotPerspective = self.gotPerspective
        factory.startLogin(c)
        def timeout(d):
            # complain only if the initial connection has not completed yet
            if not d.called:
                self.connectTimeout()
        reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect)
        return self.initialConnect

    def connectTimeout(self):
        """
        Called when the initial ZenHub connection has not completed within
        --initialHubTimeout seconds. Only logs an error.
        """
        self.log.error('Timeout connecting to zenhub: is it running?')

    def eventService(self):
        """
        @return: the loaded EventService reference, or a FakeRemote if it is
            not loaded
        """
        return self.getServiceNow('EventService')

    def getServiceNow(self, svcName):
        """
        Return an already-loaded service without waiting.

        @parameter svcName: name of the ZenHub service
        @type svcName: string
        @return: the cached remote reference, or a FakeRemote whose calls fail
            with HubDown when the service is not loaded
        """
        if svcName not in self.services:
            self.log.warning('No service %s named: ZenHub may be disconnected' % svcName)
        return self.services.get(svcName, None) or FakeRemote()

    def getService(self, serviceName, serviceListeningInterface=None):
        """
        Attempt to get a service from zenhub. Returns a deferred.
        When service is retrieved it is stashed in self.services with
        serviceName as the key. When getService is called it will first
        check self.services and if serviceName is already there it will return
        the entry from self.services wrapped in a defer.succeed

        @parameter serviceName: name of the ZenHub service to load
        @type serviceName: string
        @parameter serviceListeningInterface: object passed to ZenHub as the
            service's listener; defaults to this daemon
        @return: Deferred firing with the remote service reference
        @rtype: Deferred
        """
        if serviceName in self.services:
            return defer.succeed(self.services[serviceName])

        def removeService(ignored):
            # forget the cached reference once its PB connection goes away
            self.log.debug('Removing service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]

        def callback(result, serviceName):
            self.log.debug('Loaded service %s from zenhub' % serviceName)
            self.services[serviceName] = result
            result.notifyOnDisconnect(removeService)
            return result

        def errback(error, serviceName):
            self.log.debug('errback after getting service %s' % serviceName)
            self.log.error('Could not retrieve service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]
            return error

        d = self.perspective.callRemote('getService',
                                        serviceName,
                                        self.options.monitor,
                                        serviceListeningInterface or self)
        d.addCallback(callback, serviceName)
        d.addErrback(errback, serviceName)
        return d

    def getInitialServices(self):
        """
        After connecting to zenhub, gather our initial list of services.

        @return: DeferredList over getService() for every name in
            C{initialServices}; it fails on the first error
        @rtype: Deferred
        """
        def errback(error):
            # NOTE(review): Twisted hands errbacks a Failure, so any error
            # here (not only a bad monitor) is reported as an invalid
            # monitor and stops the reactor.
            if isinstance(error, Failure):
                self.log.critical( "Invalid monitor: %s" % self.options.monitor)
                reactor.stop()
                return defer.fail(RemoteBadMonitor(
                           "Invalid monitor: %s" % self.options.monitor, ''))
            return error

        self.log.debug('Setting up initial services: %s' % \
                       ', '.join(self.initialServices))
        d = defer.DeferredList(
            [self.getService(name) for name in self.initialServices],
            fireOnOneErrback=True, consumeErrors=True)
        d.addErrback(errback)
        return d

    def connected(self):
        """
        Hook called by run() once the initial connection and services are
        ready. Does nothing here.
        """
        pass

    def run(self):
        """
        Connect to ZenHub and run the reactor until it stops.

        Once connected, this sends the start event, starts the periodic event
        flush loop and calls connected(). After the reactor stops, it exits
        with the code set via setExitCode(), if any.
        """
        self.log.debug('Starting PBDaemon initialization')
        d = self.connect()
        def callback(result):
            self.sendEvent(self.startEvent)
            self.pushEventsLoop()
            self.log.debug('Calling connected.')
            self.connected()
            return result
        d.addCallback(callback)
        d.addErrback(twisted.python.log.err)
        reactor.run()
        if self._customexitcode:
            sys.exit(self._customexitcode)

    def sigTerm(self, signum=None, frame=None):
        """
        Delegate to ZenDaemon.sigTerm, swallowing any SystemExit it raises.
        """
        try:
            ZenDaemon.sigTerm(self, signum, frame)
        except SystemExit:
            pass

    def setExitCode(self, exitcode):
        """
        Set the code run() passes to sys.exit() after the reactor stops.
        A false value means no explicit exit.
        """
        self._customexitcode = exitcode

    def stop(self, ignored=''):
        """
        Shut the daemon down at most once.

        If the EventService is loaded, the stop event is queued (unless
        cycling was explicitly disabled) and the queue is flushed before the
        reactor is stopped. In every case the reactor is stopped after at most
        one second.

        @parameter ignored: unused; lets stop() be used as a callback
        """
        def stopNow(ignored):
            if reactor.running:
                try:
                    reactor.stop()
                except ReactorNotRunning:
                    self.log.debug("Tried to stop reactor that was stopped")
        if reactor.running and not self.stopped:
            self.stopped = True
            if 'EventService' in self.services:
                # send stop event if we don't have an implied --cycle,
                # or if --cycle has been specified
                if not hasattr(self.options, 'cycle') or \
                   getattr(self.options, 'cycle', True):
                    self.sendEvent(self.stopEvent)
                # give the reactor some time to send the shutdown event
                drive(self.pushEvents).addBoth(stopNow)
                self.log.debug( "Sent a 'stop' event" )
            else:
                self.log.debug( "No event sent as no EventService available." )
            # but not too much time
            reactor.callLater(1, stopNow, True) # requires bogus arg
        else:
            self.log.debug( "stop() called when not running" )

    def sendEvents(self, events):
        """
        Queue each event in C{events} via sendEvent().

        @parameter events: events to queue
        @type events: iterable of dict
        """
        for event in events:
            self.sendEvent(event)

    def sendEvent(self, event, **kw):
        """
        Add event to queue of events to be sent; pushEvents() delivers it.

        The event is copied and stamped with agent, monitor and manager, then
        C{kw} is merged in. Unless --allowduplicateclears is set, a Clear
        event is dropped when the previous event with the same device,
        component, eventKey and eventClass was also a Clear. Nothing is
        queued when the reactor is not running.

        @parameter event: event fields
        @type event: dict
        """
        if not reactor.running: return
        event = event.copy()
        event['agent'] = self.name
        event['monitor'] = self.options.monitor
        event['manager'] = self.fqdn
        event.update(kw)
        if not self.options.allowduplicateclears:
            # Use .get for 'device' like the other key fields: an event with
            # no device must not raise KeyError only in this mode.
            statusKey = ( event.get('device', ''),
                          event.get('component', ''),
                          event.get('eventKey', ''),
                          event.get('eventClass', '') )
            severity = event.get('severity', -1)
            status = self._eventStatus.get(statusKey, -1)
            self._eventStatus[statusKey] = severity
            if severity == Clear and status == Clear:
                self.log.debug("Dropping useless clear event %r", event)
                return
        self.eventQueue.append(event)
        self.log.debug("Queued event (total of %d) %r",
                       len(self.eventQueue),
                       event)

    def pushEventsLoop(self):
        """Periodically wake up and flush events to ZenHub.

        Reschedules itself every --eventflushseconds. If rrdStats has a name,
        it also queues the eventQueueLength gauge at most every 300 seconds.
        """
        reactor.callLater(self.options.eventflushseconds, self.pushEventsLoop)
        drive(self.pushEvents)

        # Record the number of events in the queue every 5 minutes.
        now = time.time()
        if self.rrdStats.name and now >= (self.lastStats + 300):
            self.lastStats = now
            self.sendEvents(self.rrdStats.gauge('eventQueueLength',
                                                300, len(self.eventQueue)))

    def pushEvents(self, driver):
        """Flush events to ZenHub.

        Generator run by drive(). Each yielded Deferred's result is read back
        with driver.next(). The queue is first trimmed to --maxqueuelen,
        dropping the oldest events. Events are then sent in chunks of
        --eventflushchunksize. A chunk that fails because the connection to
        ZenHub was lost is put back at the front of the queue.

        @parameter driver: Driver supplied by drive()
        """
        try:
            # Set a maximum size on the eventQueue to avoid consuming all RAM.
            queueLen = len(self.eventQueue)
            if queueLen > self.options.maxqueuelen:
                self.log.error(
                    'Discarding oldest %d events because maxqueuelen was '
                    'exceeded: %d/%d',
                    queueLen - self.options.maxqueuelen,
                    queueLen, self.options.maxqueuelen)
                diff = queueLen - self.options.maxqueuelen
                self.eventQueue = self.eventQueue[diff:]

            # are we already shutting down?
            if not reactor.running:
                return
            if self._sendingEvents:
                return
            # try to send everything we have, serially
            self._sendingEvents = True
            while self.eventQueue:
                # are still connected to ZenHub?
                evtSvc = self.services.get('EventService', None)
                if not evtSvc: break
                # send the events in large bundles, carefully reducing
                # the eventQueue in case we get in here more than once
                chunkSize = self.options.eventflushchunksize
                events = self.eventQueue[:chunkSize]
                self.eventQueue = self.eventQueue[chunkSize:]
                # send the events and wait for the response; maybeDeferred
                # turns a synchronous error from callRemote (e.g. a stale
                # broker) into a failed Deferred so it is handled below
                yield defer.maybeDeferred(evtSvc.callRemote,
                                          'sendEvents', events)
                try:
                    driver.next()
                except (ConnectionLost, pb.PBConnectionLost,
                        pb.DeadReferenceError):
                    # PB reports a dropped connection as PBConnectionLost or
                    # DeadReferenceError, not only ConnectionLost: keep the
                    # chunk so it is retried after reconnecting.
                    ex = sys.exc_info()[1]
                    self.log.error('Error sending event: %s' % ex)
                    self.eventQueue = events + self.eventQueue
                    break
            self._sendingEvents = False
        except Exception:
            self._sendingEvents = False
            self.log.exception(sys.exc_info()[1])

    def heartbeat(self):
        """
        If cycling, send a heartbeat; otherwise shut down.

        The heartbeat event carries C{heartbeatTimeout} as its timeout, and
        niceDoggie (inherited from ZenDaemon) is called with a third of it.
        """
        if not self.options.cycle:
            self.stop()
            return
        self.sendEvent(self.heartbeatEvent, timeout=self.heartbeatTimeout)
        # heartbeat is normally 3x cycle time
        self.niceDoggie(self.heartbeatTimeout / 3)

    def remote_getName(self):
        """Return this daemon's name to ZenHub."""
        return self.name

    def remote_shutdown(self, unused):
        """Stop the daemon when ZenHub asks it to shut down."""
        self.stop()
        self.sigTerm()

    def remote_setPropertyItems(self, items):
        """Accept property updates from ZenHub; ignored in this base class."""
        pass

    @translateError
    def remote_updateThresholdClasses(self, classes):
        """
        Import each threshold class named in C{classes}.

        An ImportError is logged and skipped. Other errors are converted by
        translateError into RemoteException / RemoteConflictError.

        @parameter classes: importable class names
        @type classes: list of strings
        """
        from Products.ZenUtils.Utils import importClass
        self.log.debug("Loading classes %s", classes)
        for c in classes:
            try:
                importClass(c)
            except ImportError:
                self.log.error("Unable to import class %s", c)

    def buildOptions(self):
        """
        Register the ZenHub connection and event-queue options, then the
        ZenDaemon options.
        """
        self.parser.add_option('--hubhost',
                               dest='hubhost',
                               default=DEFAULT_HUB_HOST,
                               help='Host of zenhub daemon.'
                               ' Default is %s.' % DEFAULT_HUB_HOST)
        self.parser.add_option('--hubport',
                               dest='hubport',
                               type='int',
                               default=DEFAULT_HUB_PORT,
                               help='Port zenhub listens on.'
                               ' Default is %s.' % DEFAULT_HUB_PORT)
        self.parser.add_option('--hubusername',
                               dest='hubusername',
                               default=DEFAULT_HUB_USERNAME,
                               help='Username for zenhub login.'
                               ' Default is %s.' % DEFAULT_HUB_USERNAME)
        self.parser.add_option('--hubpassword',
                               dest='hubpassword',
                               default=DEFAULT_HUB_PASSWORD,
                               help='Password for zenhub login.'
                               ' Default is %s.' % DEFAULT_HUB_PASSWORD)
        self.parser.add_option('--monitor',
                               dest='monitor',
                               default=DEFAULT_HUB_MONITOR,
                               help='Name of monitor instance to use for'
                               ' configuration. Default is %s.'
                               % DEFAULT_HUB_MONITOR)
        self.parser.add_option('--initialHubTimeout',
                               dest='hubtimeout',
                               type='int',
                               default=30,
                               help='Initial time to wait for a ZenHub '
                                    'connection')
        self.parser.add_option('--allowduplicateclears',
                               dest='allowduplicateclears',
                               default=False,
                               action='store_true',
                               help='Send clear events even when the most '
                               'recent event was also a clear event.')

        self.parser.add_option('--eventflushseconds',
                               dest='eventflushseconds',
                               default=5.,
                               type='float',
                               help='Seconds between attempts to flush '
                               'events to ZenHub.')

        self.parser.add_option('--eventflushchunksize',
                               dest='eventflushchunksize',
                               default=50,
                               type='int',
                               help='Number of events to send to ZenHub'
                               ' at one time')

        self.parser.add_option('--maxqueuelen',
                               dest='maxqueuelen',
                               default=5000,
                               type='int',
                               help='Maximum number of events to queue')


        ZenDaemon.buildOptions(self)