Package Products :: Package ZenHub :: Module zenhub
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenHub.zenhub

  1  #! /usr/bin/env python 
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 or (at your 
  9  # option) any later version as published by the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14  __doc__ = """zenhub 
 15   
 16  Provide remote, authenticated, and possibly encrypted two-way 
 17  communications with the Model and Event databases. 
 18   
 19  """ 
 20  import Globals 
 21   
 22  from XmlRpcService import XmlRpcService 
 23   
 24  import time 
 25  import pickle 
 26   
 27  from twisted.cred import portal, checkers, credentials 
 28  from twisted.spread import pb, banana 
 29  banana.SIZE_LIMIT = 1024 * 1024 * 10 
 30   
 31  from twisted.internet import reactor, protocol, defer 
 32  from twisted.web import server, xmlrpc 
 33  from zope.interface import implements 
 34  from zope.component import getUtility 
 35   
 36  from Products.DataCollector.Plugins import loadPlugins 
 37  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 38  from Products.ZenUtils.Utils import zenPath, getExitMessage, unused, load_config_override 
 39  from Products.ZenUtils.DaemonStats import DaemonStats 
 40  from Products.ZenEvents.Event import Event, EventHeartbeat 
 41  from Products.ZenEvents.ZenEventClasses import App_Start 
 42  from Products.ZenMessaging.queuemessaging.interfaces import IEventPublisher 
 43  from Products.ZenHub.services.RenderConfig import RenderConfig 
 44   
 45   
 46  from Products.ZenHub.PBDaemon import RemoteBadMonitor 
 47  pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor) 
 48   
 49  from BTrees.IIBTree import IITreeSet 
 50  from Products.ZenHub.zodb import processInvalidations 
 51   
 52  # Due to the manipulation of sys.path during the loading of plugins, 
 53  # we can get ObjectMap imported both as DataMaps.ObjectMap and the 
 54  # full-path from Products.  The following gets the class registered 
 55  # with the jelly serialization engine under both names: 
 56  # 
 57  #  1st: get Products.DataCollector.plugins.DataMaps.ObjectMap 
 58  from Products.DataCollector.plugins.DataMaps import ObjectMap 
 59  #  2nd: get DataMaps.ObjectMap 
 60  import sys 
 61  sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins')) 
 62  import DataMaps 
 63  unused(DataMaps, ObjectMap) 
 64   
 65   
 66  XML_RPC_PORT = 8081 
 67  PB_PORT = 8789 
 68  ZENHUB_ZENRENDER = 'zenhubrender' 
 69   
class AuthXmlRpcService(XmlRpcService):
    "Provide some level of authentication for XML/RPC calls"

    def __init__(self, dmd, checker):
        """
        @param dmd: the data root handed to XmlRpcService
        @param checker: an ICredentialsChecker used to validate the
            HTTP Basic credentials of each request
        """
        XmlRpcService.__init__(self, dmd)
        self.checker = checker


    def doRender(self, unused, request):
        """
        Call the inherited render engine after authentication succeeds.
        See @L{XmlRpcService.XmlRpcService.render}.
        """
        return XmlRpcService.render(self, request)


    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)


    def render(self, request):
        """
        Unpack the authorization header and check the credentials.

        Only the HTTP Basic scheme is accepted. A missing or malformed
        header, or credentials the checker rejects, produce an
        "Unauthorized" XML-RPC fault.

        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET
        """
        auth = request.received_headers.get('authorization', None)
        if not auth:
            self.unauthorized(request)
        else:
            try:
                scheme, encoded = auth.split()
                # Authentication scheme names are case-insensitive.
                if scheme.lower() != 'basic':
                    self.unauthorized(request)
                else:
                    # Split on the first ':' only. The user-id cannot contain
                    # a colon, but the password may.
                    user, passwd = encoded.decode('base64').split(':', 1)
                    c = credentials.UsernamePassword(user, passwd)
                    d = self.checker.requestAvatarId(c)
                    d.addCallback(self.doRender, request)
                    def error(unused, request):
                        self.unauthorized(request)
                    d.addErrback(error, request)
            except Exception:
                # malformed header, bad base64, missing ':' ...
                self.unauthorized(request)
        return server.NOT_DONE_YET
class HubAvitar(pb.Avatar):
    """
    Avatar given to authenticated PB clients. It lets collectors look up
    their configuration services and lets zenhubworker processes enlist
    for work.
    """

    def __init__(self, hub):
        # the owning ZenHub
        self.hub = hub

    def perspective_getService(self,
                               serviceName,
                               instance = None,
                               listener = None):
        """
        Look up a Hub service by name on behalf of a collector.

        When a listener is supplied, it is registered with the service
        so that configuration changes can be pushed back to the collector.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @return a remote reference to a service
        """
        found = self.hub.getService(serviceName, instance)
        if found is None or not listener:
            return found
        found.addListener(listener)
        return found

    def perspective_reportingForWork(self, worker):
        """
        Let a worker register itself for work.

        The worker is marked idle and added to the hub's pool. When it
        disconnects, it is dropped from the pool and a replacement is
        started one second later.

        @type worker: a pb.RemoteReference
        @param worker: a reference to zenhubworker
        @return None
        """
        worker.busy = False
        self.hub.workers.append(worker)

        def onDisconnect(lostWorker):
            pool = self.hub.workers
            if lostWorker not in pool:
                return
            pool.remove(lostWorker)
            reactor.callLater(1, self.hub.createWorker)

        worker.notifyOnDisconnect(onDisconnect)
class HubRealm(object):
    """
    Twisted cred realm that hands one shared HubAvitar to every
    successful Perspective Broker login.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        # a single avatar instance is shared by all clients
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        """
        @raise NotImplementedError: unless pb.IPerspective is requested
        @return: (pb.IPerspective, the shared avatar, a no-op logout)
        """
        if pb.IPerspective in interfaces:
            def logout():
                return None
            return pb.IPerspective, self.hubAvitar, logout
        raise NotImplementedError
class WorkerInterceptor(pb.Referenceable):
    """Redirect service requests to one of the worker processes. Note
    that everything else (like change notifications) go through
    locally hosted services."""

    # Summed by ZenHub.heartbeat(). Class-level, so it shadows the wrapped
    # service's own callTime; it is never updated here.
    callTime = 0.

    def __init__(self, zenhub, service):
        """
        @param zenhub: the ZenHub owning the worker pool
        @param service: the locally hosted service being fronted
        """
        self.zenhub = zenhub
        self.service = service

    def remoteMessageReceived(self, broker, message, args, kw):
        "Intercept requests and send them down to workers"
        # Dotted module path of the service; ZenHub.getService re-imports
        # the service from it. Use __module__ rather than parsing
        # str(cls): for new-style classes str() returns
        # "<class 'pkg.mod.Cls'>", which cannot be split into a module path.
        svc = self.service.__class__.__module__
        instance = self.service.instance
        args = broker.unserialize(args)
        kw = broker.unserialize(kw)
        # hide the types in the args: subverting the jelly protection mechanism,
        # but the types just passed through and the worker may not have loaded
        # the required service before we try passing types for that service
        # SECURITY NOTE: pickle is unsafe for untrusted data. These args come
        # from PB clients that passed the hub's checker; the worker presumably
        # unpickles them, so keep this path behind authentication.
        args = pickle.dumps( (args, kw) )
        result = self.zenhub.deferToWorker( (svc, instance, message, args) )
        return broker.serialize(result, self.perspective)

    def __getattr__(self, attr):
        "Implement the HubService interface by forwarding to the local service"
        # If 'service' itself is missing (e.g. __init__ never ran), looking
        # it up below would re-enter __getattr__ without end.
        if attr == 'service':
            raise AttributeError(attr)
        return getattr(self.service, attr)
class ZenHub(ZCmdBase):
    """
    Listen for changes to objects in the Zeo database and update the
    collectors' configuration.

    The remote collectors connect the ZenHub and request configuration
    information and stay connected.  When changes are detected in the
    Zeo database, configuration updates are sent out to collectors
    asynchronously.  In this way, changes made in the web GUI can
    affect collection immediately, instead of waiting for a
    configuration cycle.

    Each collector uses a different, pluggable service within ZenHub
    to translate objects into configuration and data.  ZenPacks can
    add services for their collectors.  Collectors communicate using
    Twisted's Perspective Broker, which provides authenticated,
    asynchronous, bidirectional method invocation.

    ZenHub also provides an XmlRPC interface to some common services
    to support collectors written in other languages.
    """

    totalTime = 0.
    totalEvents = 0
    totalCallTime = 0.
    name = 'zenhub'

    def __init__(self):
        """
        Hook ourselves up to the Zeo database and wait for collectors
        to connect.
        """
        self.workers = []            # connected worker RemoteReferences
        self.workList = []           # queued (deferred, priority, args) jobs
        self.worker_processes = set()  # spawned worker process transports

        ZCmdBase.__init__(self)
        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        self.services = {}

        er = HubRealm(self)
        checker = self.loadChecker()
        pt = portal.Portal(er, [checker])
        reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))

        xmlsvc = AuthXmlRpcService(self.dmd, checker)
        reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))

        #start listening for zenrender requests
        self.renderConfig = RenderConfig(self.dmd, ZENHUB_ZENRENDER )

        # responsible for sending messages to the queues
        import Products.ZenMessaging.queuemessaging
        load_config_override('twistedpublisher.zcml', Products.ZenMessaging.queuemessaging)
        self.sendEvent(eventClass=App_Start,
                       summary="%s started" % self.name,
                       severity=0)

        self._invalidation_queue = IITreeSet()
        reactor.callLater(5, self.processQueue)

        self.rrdStats = self.getRRDStats()
        for _ in range(int(self.options.workers)):
            self.createWorker()


    def _getConf(self):
        """
        @return: the performance monitor named by --monitor, or None
        """
        return self.dmd.Monitors.Performance._getOb(self.options.monitor, None)

    def getRRDStats(self):
        """
        Return the most recent RRD statistic information.
        """
        rrdStats = DaemonStats()
        perfConf = self._getConf()

        from Products.ZenModel.BuiltInDS import BuiltInDS
        threshs = perfConf.getThresholdInstances(BuiltInDS.sourcetype)
        createCommand = getattr(perfConf, 'defaultRRDCreateCommand', None)
        rrdStats.config(perfConf.id, 'zenhub', threshs, createCommand)

        return rrdStats

    def processQueue(self):
        """
        Periodically (once a second) process database changes

        Always reschedules itself, even when reading or processing the
        invalidations fails. This keeps one failure from stopping change
        propagation for good.

        @return: None
        """
        now = time.time()
        try:
            self.syncdb() # reads the object invalidations
            self.doProcessQueue()
        except Exception as ex:
            self.log.exception(ex)
        reactor.callLater(1, self.processQueue)
        self.totalEvents += 1
        self.totalTime += time.time() - now


    def doProcessQueue(self):
        """
        Perform one cycle of update notifications.

        @return: None
        """
        changes_dict = self.storage.poll_invalidations()
        if changes_dict is not None:
            d = processInvalidations(self.dmd, self._invalidation_queue, changes_dict)
            def done(n):
                if n:
                    self.log.debug('Processed %s oids', n)
            d.addCallback(done)


    def sendEvent(self, **kw):
        """
        Useful method for posting events to the EventManager.

        Failures are logged rather than raised.

        @type kw: keywords (dict)
        @param kw: the values for an event: device, summary, etc.
            'device' defaults to the monitor name, 'component' to 'zenhub'.
        @return: None
        """
        if 'device' not in kw:
            kw['device'] = self.options.monitor
        if 'component' not in kw:
            kw['component'] = self.name
        try:
            self.zem.sendEvent(Event(**kw))
        except Exception:
            self.log.exception("Unable to send an event")

    def loadChecker(self):
        """
        Load the password file

        As a side effect, the first credential pair in the file is stored
        as self.workerUsername / self.workerPassword for worker logins.

        @return: an object satisfying the ICredentialsChecker
        interface using a password file or an empty list if the file
        is not available.  Uses the file specified in the --passwd
        command line option.
        """
        # NOTE(review): __init__ passes the result to portal.Portal inside a
        # list and createWorker reads self.workerUsername, so the [] fallback
        # most likely fails later at startup. Confirm whether this should
        # raise instead.
        try:
            checker = checkers.FilePasswordDB(self.options.passwordfile)
            # grab credentials for the workers to login
            u, p = checker._loadCredentials().next()
            self.workerUsername, self.workerPassword = u, p
            return checker
        except Exception:
            self.log.exception("Unable to load %s", self.options.passwordfile)
        return []


    def getService(self, name, instance):
        """
        Helper method to load services dynamically for a collector.
        Returned instances are cached: reconnecting collectors will
        get the same service object.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses @L{Products.ZenUtils.Utils.importClass})
        @param instance: string
        @param instance: each service serves only one specific collector
        instances (like 'localhost').  instance defines the collector's
        instance name.
        @raise RemoteBadMonitor: if instance is not a known performance monitor
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        """
        # Sanity check the names given to us
        if not self.dmd.Monitors.Performance._getOb(instance, False):
            # report the monitor that was actually requested and rejected
            raise RemoteBadMonitor("The provided performance monitor '%s'"
                                   " is not in the current list" % instance)

        try:
            return self.services[name, instance]

        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            if self.options.workers:
                svc = WorkerInterceptor(self, svc)
            self.services[name, instance] = svc
            return svc

    def deferToWorker(self, args):
        """Take a remote request and queue it for worker processes.

        @type args: tuple
        @param args: the arguments to the remote_execute() method in the worker
        @return: a Deferred for the eventual results of the method call

        """
        d = defer.Deferred()
        svcName, instance, method = args[:3]
        service = self.getService(svcName, instance).service
        priority = service.getMethodPriority(method)

        if self.options.prioritize:
            # Insert job into workList so that it stays sorted by priority
            # (smaller values first); equal priorities keep arrival order.
            for i, job in enumerate(self.workList):
                if priority < job[1]:
                    self.workList.insert(i, (d, priority, args) )
                    break
            else:
                self.workList.append( (d, priority, args) )
        else:
            # Run jobs on a first come, first serve basis.
            self.workList.append( (d, priority, args) )

        self.giveWorkToWorkers()
        return d


    def giveWorkToWorkers(self):
        """Parcel out a method invocation to an available worker process
        """
        self.log.debug("worklist has %d items", len(self.workList))
        while self.workList:
            for i, worker in enumerate(self.workers):
                # linear search is not ideal, but simple enough
                if not worker.busy:
                    job = self.getJobForWorker(i)
                    if job is None: continue
                    worker.busy = True
                    def finished(result, finishedWorker):
                        # free the worker and try to hand it the next job
                        finishedWorker.busy = False
                        self.giveWorkToWorkers()
                        return result
                    self.log.debug("Giving %s to worker %d", job[2][2], i)
                    d2 = worker.callRemote('execute', *job[2])
                    d2.addBoth(finished, worker)
                    d2.chainDeferred(job[0])
                    break
            else:
                self.log.debug("all workers are busy")
                break

    def getJobForWorker(self, workerId):
        """
        Remove and return the next job that worker number workerId may run.

        Unless --anyworker is set, a job with priority p is only eligible
        for worker i when p < (i + 1) / len(self.workers). Jobs with larger
        priority values are therefore confined to the higher-numbered
        workers. NOTE(review): jobs with priority >= 1.0 would never be
        dispatched in that mode; confirm the priority range.

        @return: a (deferred, priority, args) tuple, or None if no queued
            job is eligible for this worker
        """
        if self.options.anyworker:
            return self.workList.pop(0)
        # Restrict lower priority jobs to a subset of the workers.
        threshold = (workerId + 1) / float(len(self.workers))
        for i, job in enumerate(self.workList):
            if job[1] < threshold:
                return self.workList.pop(i)
        return None

    def createWorker(self):
        """Start a worker subprocess

        Credentials and connection settings are passed to the worker in a
        temporary config file, which is removed when the worker exits.

        @return: None
        """
        # this probably can't happen, but let's make sure
        if len(self.workers) >= self.options.workers:
            return
        # create a config file for the slave to pass credentials
        import os, tempfile
        fd, tmp = tempfile.mkstemp()
        try:
            os.write(fd, "hubport %s\n" % self.options.pbport)
            os.write(fd, "username %s\n" % self.workerUsername)
            os.write(fd, "password %s\n" % self.workerPassword)
            os.write(fd, "host %s\n" % self.options.host)
            os.write(fd, "logseverity %s\n" % self.options.logseverity)
            os.write(fd, "cachesize %s\n" % self.options.cachesize)
        finally:
            os.close(fd)
        # start the worker
        exe = zenPath('bin', 'zenhubworker')

        # watch for output, and generally just take notice
        class WorkerRunningProtocol(protocol.ProcessProtocol):

            def outReceived(s, data):
                self.log.debug("Worker reports %s" % (data,))

            def errReceived(s, data):
                self.log.info("Worker reports %s" % (data,))

            def processEnded(s, reason):
                os.unlink(tmp)
                # worker_processes holds the process transport returned by
                # spawnProcess, not this protocol, so discard s.transport.
                self.worker_processes.discard(s.transport)
                # exitCode is None when the worker died from a signal, so
                # use %s rather than %d.
                self.log.warning("Worker exited with status: %s (%s)",
                                 reason.value.exitCode,
                                 getExitMessage(reason.value.exitCode))
        args = (exe, 'run', '-C', tmp)
        self.log.debug("Starting %s", ' '.join(args))
        proc = reactor.spawnProcess(WorkerRunningProtocol(), exe, args, os.environ)
        self.worker_processes.add(proc)

    def heartbeat(self):
        """
        Since we don't do anything on a regular basis, just
        push heartbeats regularly.

        Also sends the daemon's counters/gauges every cycle.

        @return: None
        """
        seconds = 30
        evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
        self.zem.sendEvent(evt)
        self.niceDoggie(seconds)
        reactor.callLater(seconds, self.heartbeat)
        r = self.rrdStats
        callTime = sum(s.callTime for s in self.services.values())
        self.zem.sendEvents(
            r.counter('totalTime', seconds, int(self.totalTime * 1000)) +
            r.counter('totalEvents', seconds, self.totalEvents) +
            r.gauge('services', seconds, len(self.services)) +
            r.counter('totalCallTime', seconds, callTime) +
            r.gauge('workListLength', seconds, len(self.workList))
            )


    def main(self):
        """
        Start the main event loop.

        After the reactor stops, kill any remaining worker processes and
        close the event publisher.
        """
        if self.options.cycle:
            self.heartbeat()
        reactor.run()
        from twisted.internet.error import ProcessExitedAlready
        for proc in self.worker_processes:
            try:
                proc.signalProcess('KILL')
            except ProcessExitedAlready:
                # already gone; keep shutting down the remaining workers
                pass
        getUtility(IEventPublisher).close()

    def buildOptions(self):
        """
        Adds our command line options to ZCmdBase command line options.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
            type='int', default=XML_RPC_PORT,
            help='Port to use for XML-based Remote Procedure Calls (RPC)')
        self.parser.add_option('--pbport', dest='pbport',
            type='int', default=PB_PORT,
            help="Port to use for Twisted's pb service")
        self.parser.add_option('--passwd', dest='passwordfile',
            type='string', default=zenPath('etc','hubpasswd'),
            help='File where passwords are stored')
        self.parser.add_option('--monitor', dest='monitor',
            default='localhost',
            help='Name of the distributed monitor this hub runs on')
        self.parser.add_option('--workers', dest='workers',
            type='int', default=0,
            help="Number of worker instances to handle requests")
        self.parser.add_option('--prioritize', dest='prioritize',
            action='store_true', default=False,
            help="Run higher priority jobs before lower priority ones")
        self.parser.add_option('--anyworker', dest='anyworker',
            action='store_true', default=False,
            help='Allow any priority job to run on any worker')
if __name__ == '__main__':
    # run the hub daemon when executed as a script
    ZenHub().main()