1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """PBDaemon
15
16 Base for daemons that connect to zenhub
17
18 """
19
20 import sys
21 import time
22 import traceback
23
24 import Globals
25
26 from Products.ZenUtils.ZenDaemon import ZenDaemon
27 from Products.ZenEvents.ZenEventClasses import Heartbeat
28 from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory
29 from Products.ZenUtils.DaemonStats import DaemonStats
30 from Products.ZenUtils.Driver import drive
31 from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \
32 Clear, Warning
33
34 from twisted.cred import credentials
35 from twisted.internet import reactor, defer
36 from twisted.internet.error import ConnectionLost, ReactorNotRunning
37 from twisted.spread import pb
38 from twisted.python.failure import Failure
39 import twisted.python.log
40
41 from ZODB.POSException import ConflictError
44 "Exception that can cross the PB barrier"
46 Exception.__init__(self, msg)
47 self.traceback = tb
49 return Exception.__str__(self) + self.traceback
50
# Register the PB-transportable exception types so that Perspective Broker
# can rebuild them locally when they are received from the remote side.
# Each class is registered as its own unjellyable, in this order.
pb.setUnjellyableForClass(RemoteException, RemoteException)
pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)
59
61 """
62 Decorator function to wrap remote exceptions into something
63 understandable by our daemon.
64
65 @parameter callable: function to wrap
66 @type callable: function
67 @return: function's return or an exception
68 @rtype: various
69 """
70 def inner(*args, **kw):
71 """
72 Interior decorator
73 """
74 try:
75 return callable(*args, **kw)
76 except ConflictError, ex:
77 raise RemoteConflictError(
78 'Remote exception: %s: %s' % (ex.__class__, ex),
79 traceback.format_exc())
80 except Exception, ex:
81 raise RemoteException(
82 'Remote exception: %s: %s' % (ex.__class__, ex),
83 traceback.format_exc())
84 return inner
85
86
# TCP port used as the default for --hubport (the port ZenHub listens on).
PB_PORT = 8789

# Template events for daemon lifecycle notifications.  Start is reported at
# Clear severity; stop is reported at Warning severity.
startEvent = dict(eventClass=App_Start, summary='started', severity=Clear)
stopEvent = dict(eventClass=App_Stop, summary='stopped', severity=Warning)

# Fallback values for the ZenHub connection options
# (--hubhost, --hubport, --hubusername, --hubpassword, --monitor).
# NOTE(review): these are well-known default credentials; deployments are
# expected to override them on the command line or in configuration.
DEFAULT_HUB_HOST = 'localhost'
DEFAULT_HUB_PORT = PB_PORT
DEFAULT_HUB_USERNAME = 'admin'
DEFAULT_HUB_PASSWORD = 'zenoss'
DEFAULT_HUB_MONITOR = 'localhost'
109
114
115 -class PBDaemon(ZenDaemon, pb.Referenceable):
116
117 name = 'pbdaemon'
118 initialServices = ['EventService']
119 heartbeatEvent = {'eventClass':Heartbeat}
120 heartbeatTimeout = 60*3
121 _customexitcode = 0
122 _sendingEvents = False
123
124 - def __init__(self, noopts=0, keeproot=False, name=None):
152
154 """
155 This gets called every time we reconnect.
156
157 @parameter perspective: Twisted perspective object
158 @type perspective: Twisted perspective object
159 """
160 self.log.info("Connected to ZenHub")
161 self.perspective = perspective
162 d2 = self.getInitialServices()
163 if self.initialConnect:
164 self.log.debug('Chaining getInitialServices with d2')
165 self.initialConnect, d = None, self.initialConnect
166 d2.chainDeferred(d)
167
168
183 reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect)
184 return self.initialConnect
185
187 self.log.error('Timeout connecting to zenhub: is it running?')
188 pass
189
192
193
195 if not svcName in self.services:
196 self.log.warning('No service %s named: ZenHub may be disconnected' % svcName)
197 return self.services.get(svcName, None) or FakeRemote()
198
199
def getService(self, serviceName, serviceListeningInterface=None):
    """
    Fetch a remote reference to a ZenHub service, caching it locally.

    If self.services already holds a reference for serviceName, that
    reference is returned right away, wrapped in defer.succeed.
    Otherwise the hub is asked for it via
    self.perspective.callRemote('getService', ...). The call passes the
    configured monitor name and the object that should listen for calls
    from the service: serviceListeningInterface, or this daemon when that
    argument is falsy.

    On success the reference is stored in self.services under
    serviceName, and a disconnect hook is registered that removes it
    again. On failure, any cached entry for serviceName is discarded,
    errors are logged, and the failure is passed on down the chain.

    @parameter serviceName: name of the ZenHub service to fetch
    @type serviceName: string
    @parameter serviceListeningInterface: listener handed to the hub;
        self is used when this is falsy
    @return: Deferred firing with the remote service reference
    @rtype: Deferred
    """
    if serviceName in self.services:
        return defer.succeed(self.services[serviceName])

    def forget(ignored):
        # Registered via notifyOnDisconnect on the fetched reference.
        self.log.debug('Removing service %s' % serviceName)
        self.services.pop(serviceName, None)

    def remember(remote):
        self.log.debug('Loaded service %s from zenhub' % serviceName)
        self.services[serviceName] = remote
        remote.notifyOnDisconnect(forget)
        return remote

    def complain(failure):
        # Attached after remember(), so this also sees errors raised there.
        self.log.debug('errback after getting service %s' % serviceName)
        self.log.error('Could not retrieve service %s' % serviceName)
        self.services.pop(serviceName, None)
        return failure

    listener = serviceListeningInterface or self
    pending = self.perspective.callRemote('getService',
                                          serviceName,
                                          self.options.monitor,
                                          listener)
    pending.addCallback(remember)
    pending.addErrback(complain)
    return pending
236
238 """
239 After connecting to zenhub, gather our initial list of services.
240 """
241 def errback(error):
242 if isinstance(error, Failure):
243 self.log.critical( "Invalid monitor: %s" % self.options.monitor)
244 reactor.stop()
245 return defer.fail(RemoteBadMonitor(
246 "Invalid monitor: %s" % self.options.monitor, ''))
247 return error
248
249 self.log.debug('Setting up initial services: %s' % \
250 ', '.join(self.initialServices))
251 d = defer.DeferredList(
252 [self.getService(name) for name in self.initialServices],
253 fireOnOneErrback=True, consumeErrors=True)
254 d.addErrback(errback)
255 return d
256
257
260
270 d.addCallback(callback)
271 d.addErrback(twisted.python.log.err)
272 reactor.run()
273 if self._customexitcode:
274 sys.exit(self._customexitcode)
275
def sigTerm(self, signum=None, frame=None):
    """
    Handle a termination signal by delegating to ZenDaemon.sigTerm.

    Any SystemExit raised by the base handler is caught and discarded,
    so this handler returns normally instead of exiting the process.

    @parameter signum: signal number, passed through unchanged
    @parameter frame: interrupted stack frame, passed through unchanged
    """
    try:
        ZenDaemon.sigTerm(self, signum, frame)
    except SystemExit:
        # NOTE(review): presumably shutdown is left to the reactor
        # (e.g. via stop()) rather than exiting here; confirm against
        # ZenDaemon.sigTerm.
        return None
281
284
def stop(self, ignored=''):
    """
    Shut the daemon down, sending a final 'stop' event when possible.

    Only the first call made while the reactor is running does any work,
    because self.stopped is set. Later calls, or calls made when the
    reactor is not running, just log a debug message.

    When an 'EventService' reference is cached, the stop event is queued
    (unless options.cycle exists and is false). The reactor stop is then
    chained onto the Deferred returned by drive(self.pushEvents).
    Regardless of that, a fallback stop is scheduled one second later.

    @parameter ignored: unused; allows use as a Deferred callback
    """
    def haltReactor(_result):
        # Called both from the flush chain and from the callLater
        # fallback, so the reactor may already be stopped when this
        # runs. The running check and the ReactorNotRunning handler
        # cover that case.
        if not reactor.running:
            return
        try:
            reactor.stop()
        except ReactorNotRunning:
            self.log.debug("Tried to stop reactor that was stopped")

    if not reactor.running or self.stopped:
        self.log.debug("stop() called when not running")
        return

    self.stopped = True
    if 'EventService' not in self.services:
        self.log.debug("No event sent as no EventService available.")
    else:
        # A missing options.cycle attribute counts as "send the event".
        wantStopEvent = (not hasattr(self.options, 'cycle')
                         or getattr(self.options, 'cycle', True))
        if wantStopEvent:
            self.sendEvent(self.stopEvent)
        # Stop only after the queued events have been pushed.
        drive(self.pushEvents).addBoth(haltReactor)
        self.log.debug("Sent a 'stop' event")

    # Don't wait on the flush indefinitely; the argument is a placeholder
    # for haltReactor's required positional parameter.
    reactor.callLater(1, haltReactor, True)
309
312
314 ''' Add event to queue of events to be sent. If we have an event
315 service then process the queue.
316 '''
317 if not reactor.running: return
318 event = event.copy()
319 event['agent'] = self.name
320 event['monitor'] = self.options.monitor
321 event['manager'] = self.fqdn
322 event.update(kw)
323 if not self.options.allowduplicateclears:
324 statusKey = ( event['device'],
325 event.get('component', ''),
326 event.get('eventKey', ''),
327 event.get('eventClass', '') )
328 severity = event.get('severity', -1)
329 status = self._eventStatus.get(statusKey, -1)
330 self._eventStatus[statusKey] = severity
331 if severity == Clear and status == Clear:
332 self.log.debug("Dropping useless clear event %r", event)
333 return
334 self.eventQueue.append(event)
335 self.log.debug("Queued event (total of %d) %r",
336 len(self.eventQueue),
337 event)
338
340 """Periodially, wake up and flush events to ZenHub.
341 """
342 reactor.callLater(self.options.eventflushseconds, self.pushEventsLoop)
343 drive(self.pushEvents)
344
345
346 now = time.time()
347 if self.rrdStats.name and now >= (self.lastStats + 300):
348 self.lastStats = now
349 self.sendEvents(self.rrdStats.gauge('eventQueueLength',
350 300, len(self.eventQueue)))
351
353 """Flush events to ZenHub.
354 """
355 try:
356
357 queueLen = len(self.eventQueue)
358 if queueLen > self.options.maxqueuelen:
359 self.log.error(
360 'Discarding oldest %d events because maxqueuelen was '
361 'exceeded: %d/%d',
362 queueLen - self.options.maxqueuelen,
363 queueLen, self.options.maxqueuelen)
364 diff = queueLen - self.options.maxqueuelen
365 self.eventQueue = self.eventQueue[diff:]
366
367
368 if not reactor.running:
369 return
370 if self._sendingEvents:
371 return
372
373 self._sendingEvents = True
374 while self.eventQueue:
375
376 evtSvc = self.services.get('EventService', None)
377 if not evtSvc: break
378
379
380 chunkSize = self.options.eventflushchunksize
381 events = self.eventQueue[:chunkSize]
382 self.eventQueue = self.eventQueue[chunkSize:]
383
384 yield evtSvc.callRemote('sendEvents', events)
385 try:
386 driver.next()
387 except ConnectionLost, ex:
388 self.log.error('Error sending event: %s' % ex)
389 self.eventQueue = events + self.eventQueue
390 break
391 self._sendingEvents = False
392 except Exception, ex:
393 self._sendingEvents = False
394 self.log.exception(ex)
395
404
405
408
409
413
414
417
418
419 @translateError
428
429
431 self.parser.add_option('--hubhost',
432 dest='hubhost',
433 default=DEFAULT_HUB_HOST,
434 help='Host of zenhub daemon.'
435 ' Default is %s.' % DEFAULT_HUB_HOST)
436 self.parser.add_option('--hubport',
437 dest='hubport',
438 type='int',
439 default=DEFAULT_HUB_PORT,
440 help='Port zenhub listens on.'
441 'Default is %s.' % DEFAULT_HUB_PORT)
442 self.parser.add_option('--hubusername',
443 dest='hubusername',
444 default=DEFAULT_HUB_USERNAME,
445 help='Username for zenhub login.'
446 ' Default is %s.' % DEFAULT_HUB_USERNAME)
447 self.parser.add_option('--hubpassword',
448 dest='hubpassword',
449 default=DEFAULT_HUB_PASSWORD,
450 help='Password for zenhub login.'
451 ' Default is %s.' % DEFAULT_HUB_PASSWORD)
452 self.parser.add_option('--monitor',
453 dest='monitor',
454 default=DEFAULT_HUB_MONITOR,
455 help='Name of monitor instance to use for'
456 ' configuration. Default is %s.'
457 % DEFAULT_HUB_MONITOR)
458 self.parser.add_option('--initialHubTimeout',
459 dest='hubtimeout',
460 type='int',
461 default=30,
462 help='Initial time to wait for a ZenHub '
463 'connection')
464 self.parser.add_option('--allowduplicateclears',
465 dest='allowduplicateclears',
466 default=False,
467 action='store_true',
468 help='Send clear events even when the most '
469 'recent event was also a clear event.')
470
471 self.parser.add_option('--eventflushseconds',
472 dest='eventflushseconds',
473 default=5.,
474 type='float',
475 help='Seconds between attempts to flush '
476 'events to ZenHub.')
477
478 self.parser.add_option('--eventflushchunksize',
479 dest='eventflushchunksize',
480 default=50,
481 type='int',
482 help='Number of events to send to ZenHub'
483 'at one time')
484
485 self.parser.add_option('--maxqueuelen',
486 dest='maxqueuelen',
487 default=5000,
488 type='int',
489 help='Maximum number of events to queue')
490
491
492 ZenDaemon.buildOptions(self)
493