1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """zenhub
15
16 Provide remote, authenticated, and possibly encrypted two-way
17 communications with the Model and Event databases.
18
19 """
20 import Globals
21
22 from XmlRpcService import XmlRpcService
23
24 import time
25 import pickle
26
27 from twisted.cred import portal, checkers, credentials
28 from twisted.spread import pb, banana
29 banana.SIZE_LIMIT = 1024 * 1024 * 10
30
31 from twisted.internet import reactor, protocol, defer
32 from twisted.web import server, xmlrpc
33 from zope.interface import implements
34 from zope.component import getUtility
35
36 from Products.DataCollector.Plugins import loadPlugins
37 from Products.ZenUtils.ZCmdBase import ZCmdBase
38 from Products.ZenUtils.Utils import zenPath, getExitMessage, unused, load_config_override
39 from Products.ZenUtils.DaemonStats import DaemonStats
40 from Products.ZenEvents.Event import Event, EventHeartbeat
41 from Products.ZenEvents.ZenEventClasses import App_Start
42 from Products.ZenMessaging.queuemessaging.interfaces import IEventPublisher
43 from Products.ZenHub.services.RenderConfig import RenderConfig
44
45
46 from Products.ZenHub.PBDaemon import RemoteBadMonitor
47 pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)
48
49 from BTrees.IIBTree import IITreeSet
50 from Products.ZenHub.zodb import processInvalidations
51
52
53
54
55
56
57
58 from Products.DataCollector.plugins.DataMaps import ObjectMap
59
60 import sys
61 sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
62 import DataMaps
63 unused(DataMaps, ObjectMap)
64
65
# Default TCP port for the XML-RPC listener (default of --xmlrpcport).
XML_RPC_PORT = 8081
# Default TCP port for the Perspective Broker listener (default of --pbport).
PB_PORT = 8789
# Name handed to RenderConfig when the hub builds its render configuration.
ZENHUB_ZENRENDER = 'zenhubrender'
69
71 "Provide some level of authentication for XML/RPC calls"
72
76
77
# NOTE(review): the def line for this method is missing from the text under
# review; the name and the (result, request) argument order are reconstructed
# from the d.addCallback(self.doRender, request) call -- confirm.
def doRender(self, avatarId, request):
    """
    Hand the request to the inherited XML-RPC renderer once the
    credentials check has succeeded.

    @param avatarId: result delivered by the credentials check; not used
    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: whatever XmlRpcService.render returns for the request
    """
    inheritedRender = XmlRpcService.render
    return inheritedRender(self, request)
84
85
# NOTE(review): the def line for this method is missing from the text under
# review; name and signature are reconstructed from the
# self.unauthorized(request) call sites -- confirm.
def unauthorized(self, request):
    """
    Answer the request with an XMLRPC fault that reports an
    authentication failure.

    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: None
    """
    fault = xmlrpc.Fault(self.FAILURE, "Unauthorized")
    self._cbRender(fault, request)
94
95
# NOTE(review): the def line for this method is missing from the text under
# review; "render(self, request)" is inferred from the delegation to
# XmlRpcService.render(self, request) and the NOT_DONE_YET return -- confirm.
def render(self, request):
    """
    Unpack the authorization header and check the credentials.

    Only the 'Basic' scheme is accepted.  The credentials are handed to
    self.checker; when that check succeeds the request goes on to
    doRender(), and on any failure an "Unauthorized" fault is rendered.

    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: NOT_DONE_YET
    """
    auth = request.received_headers.get('authorization', None)
    if not auth:
        self.unauthorized(request)
        return server.NOT_DONE_YET

    try:
        scheme, encoded = auth.split()
        if scheme not in ('Basic',):
            self.unauthorized(request)
        else:
            # Split on the first colon only: the password half of a Basic
            # credential may itself contain ':' characters.
            user, passwd = encoded.decode('base64').split(':', 1)
            creds = credentials.UsernamePassword(user, passwd)
            d = self.checker.requestAvatarId(creds)
            d.addCallback(self.doRender, request)

            def error(failure, request):
                self.unauthorized(request)
            d.addErrback(error, request)
    except Exception:
        # Malformed header or undecodable credentials
        self.unauthorized(request)
    return server.NOT_DONE_YET
122
123
125 """
126 Connect collectors to their configuration Services
127 """
128
131
# NOTE(review): the def line for this method is missing from the text under
# review.  The parameter names come from the docstring; the "perspective_"
# method name and the None defaults are assumptions -- confirm.
def perspective_getService(self, serviceName, instance=None, listener=None):
    """
    Look up a Hub service by name on behalf of a collector and, when a
    listener is supplied, register it with the service so that changes
    can be pushed back out to the collector.

    @type serviceName: string
    @param serviceName: a name, like 'EventService'
    @type instance: string
    @param instance: the collector's instance name, like 'localhost'
    @type listener: a remote reference to the collector
    @param listener: the callback interface to the collector
    @return a remote reference to a service
    """
    found = self.hub.getService(serviceName, instance)
    if found is None or not listener:
        return found
    found.addListener(listener)
    return found
153
# NOTE(review): the def line for this method is missing from the text under
# review; the "perspective_reportingForWork" name is an assumption, the
# single "worker" parameter comes from the docstring and body -- confirm.
def perspective_reportingForWork(self, worker):
    """
    Let a worker register itself as available for work.

    The worker is marked idle and added to the hub's worker list.  When
    it disconnects it is dropped from that list again and the hub is
    asked, one second later, to create a worker.

    @type worker: a pb.RemoteReference
    @param worker: a reference to zenhubworker
    @return None
    """
    def forgetWorker(gone):
        if gone not in self.hub.workers:
            return
        self.hub.workers.remove(gone)
        reactor.callLater(1, self.hub.createWorker)

    worker.busy = False
    self.hub.workers.append(worker)
    worker.notifyOnDisconnect(forgetWorker)
169
170
172 """
173 Following the Twisted authentication framework.
174 See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
175 """
176 implements(portal.IRealm)
177
180
# NOTE(review): the def line for this method is missing from the text under
# review; the signature follows the portal.IRealm interface this class
# declares via implements() -- confirm parameter names.
def requestAvatar(self, avatarId, mind, *interfaces):
    """
    Hand out the shared hub avatar to any client asking for a
    pb.IPerspective.

    @return: (pb.IPerspective, the hub avatar, a no-op logout callable)
    @raise NotImplementedError: if pb.IPerspective was not requested
    """
    if pb.IPerspective in interfaces:
        return pb.IPerspective, self.hubAvitar, lambda:None
    raise NotImplementedError
185
186
188 """Redirect service requests to one of the worker processes. Note
189 that everything else (like change notifications) goes through
190 locally hosted services."""
191
192 callTime = 0.
193
197
# NOTE(review): the def line for this method is missing from the text under
# review; the name is assumed from Twisted's pb message hook and the
# parameters from the names the body uses -- confirm.
def remoteMessageReceived(self, broker, message, args, kw):
    "Intercept requests and send them down to workers"
    # Everything up to the last dot of the service class's string form
    svcName = str(self.service.__class__).rsplit('.', 1)[0]
    collector = self.service.instance
    positional = broker.unserialize(args)
    keywords = broker.unserialize(kw)

    # The call arguments travel to the worker as a single pickled pair
    payload = pickle.dumps( (positional, keywords) )
    job = (svcName, collector, message, payload)
    outcome = self.zenhub.deferToWorker(job)
    return broker.serialize(outcome, self.perspective)
210
# NOTE(review): the def line for this method is missing from the text under
# review; "__getattr__(self, attr)" is inferred from the docstring and the
# getattr(self.service, attr) forwarding -- confirm.
def __getattr__(self, attr):
    "Implement the HubService interface by forwarding to the local service"
    wrapped = self.service
    return getattr(wrapped, attr)
214
215
217 """
218 Listen for changes to objects in the Zeo database and update the
219 collectors' configuration.
220
221 The remote collectors connect to ZenHub, request configuration
222 information, and stay connected. When changes are detected in the
223 Zeo database, configuration updates are sent out to collectors
224 asynchronously. In this way, changes made in the web GUI can
225 affect collection immediately, instead of waiting for a
226 configuration cycle.
227
228 Each collector uses a different, pluggable service within ZenHub
229 to translate objects into configuration and data. ZenPacks can
230 add services for their collectors. Collectors communicate using
231 Twisted's Perspective Broker, which provides authenticated,
232 asynchronous, bidirectional method invocation.
233
234 ZenHub also provides an XmlRPC interface to some common services
235 to support collectors written in other languages.
236 """
237
238 totalTime = 0.
239 totalEvents = 0
240 totalCallTime = 0.
241 name = 'zenhub'
242
# NOTE(review): the def line for this method is missing from the text under
# review; "__init__(self)" is inferred from ZCmdBase.__init__(self) below and
# the argument-less ZenHub() call in the script guard -- confirm.
def __init__(self):
    """
    Hook ourselves up to the Zeo database and wait for collectors
    to connect.

    Opens the Perspective Broker and XML-RPC listeners, announces the
    start with an App_Start event, schedules the first processQueue()
    call and starts the configured number of workers.
    """
    # Worker bookkeeping
    self.workers = []
    self.workList = []
    self.worker_processes = set()

    ZCmdBase.__init__(self)
    self.zem = self.dmd.ZenEventManager
    loadPlugins(self.dmd)
    self.services = {}

    # Both listeners authenticate against the same credentials checker
    realm = HubRealm(self)
    checker = self.loadChecker()
    hubPortal = portal.Portal(realm, [checker])
    reactor.listenTCP(self.options.pbport, pb.PBServerFactory(hubPortal))

    xmlService = AuthXmlRpcService(self.dmd, checker)
    reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlService))

    self.renderConfig = RenderConfig(self.dmd, ZENHUB_ZENRENDER )

    import Products.ZenMessaging.queuemessaging
    load_config_override('twistedpublisher.zcml',
                         Products.ZenMessaging.queuemessaging)
    startSummary = "%s started" % self.name
    self.sendEvent(eventClass=App_Start, summary=startSummary, severity=0)

    self._invalidation_queue = IITreeSet()
    reactor.callLater(5, self.processQueue)

    self.rrdStats = self.getRRDStats()
    workerCount = int(self.options.workers)
    for _ in range(workerCount):
        self.createWorker()
281
282
284 return self.dmd.Monitors.Performance._getOb(self.options.monitor, None)
285
299
315
316
# NOTE(review): the def line for this method is missing from the text under
# review and nothing visible calls it by name; "doProcessQueue" is a guess --
# confirm the real method name before merging.
def doProcessQueue(self):
    """
    Perform one cycle of update notifications.

    Polls the storage for invalidations; unless the poll returns None,
    the result is handed to processInvalidations() together with the
    hub's invalidation queue, and the number of processed oids is
    logged at debug level when it is non-zero.

    @return: None
    """
    changes = self.storage.poll_invalidations()
    if changes is None:
        return

    def report(count):
        if count:
            self.log.debug('Processed %s oids' % count)

    processing = processInvalidations(self.dmd, self._invalidation_queue,
                                      changes)
    processing.addCallback(report)
331
332
# NOTE(review): the def line for this method is missing from the text under
# review; "sendEvent(self, **kw)" is reconstructed from the keyword-only
# self.sendEvent(...) call in the constructor and the use of kw -- confirm.
def sendEvent(self, **kw):
    """
    Useful method for posting events to the EventManager.

    'device' defaults to the hub's monitor name and 'component' to the
    daemon name when the caller does not supply them.  Failures to send
    are logged, not raised.

    @type kw: keywords (dict)
    @param kw: the values for an event: device, summary, etc.
    @return: None
    """
    kw.setdefault('device', self.options.monitor)
    kw.setdefault('component', self.name)
    try:
        self.zem.sendEvent(Event(**kw))
    except Exception:
        # Best effort only, but let KeyboardInterrupt/SystemExit through
        # (a bare except would swallow those as well).
        self.log.exception("Unable to send an event")
349
# NOTE(review): the def line for this method is missing from the text under
# review; "loadChecker(self)" is reconstructed from the self.loadChecker()
# call in the constructor -- confirm.
def loadChecker(self):
    """
    Load the password file

    The first username/password pair found in the file is also
    remembered as the credentials for worker processes.

    @return: an object satisfying the ICredentialsChecker
    interface using a password file or an empty list if the file
    is not available.  Uses the file specified in the --passwd
    command line option.
    """
    passwordFile = self.options.passwordfile
    try:
        checker = checkers.FilePasswordDB(passwordFile)
        firstUser, firstPassword = next(checker._loadCredentials())
        self.workerUsername = firstUser
        self.workerPassword = firstPassword
    except Exception:
        self.log.exception("Unable to load %s", self.options.passwordfile)
        return []
    return checker
368
369
# NOTE(review): the def line for this method is missing from the text under
# review; "getService(self, name, instance)" is reconstructed from the
# getService(serviceName, instance) call sites and the docstring -- confirm.
def getService(self, name, instance):
    """
    Helper method to load services dynamically for a collector.
    Returned instances are cached: reconnecting collectors will
    get the same service object.

    @type name: string
    @param name: the dotted-name of the module to load
    (uses @L{Products.ZenUtils.Utils.importClass})
    @type instance: string
    @param instance: each service serves only one specific collector
    instances (like 'localhost').  instance defines the collector's
    instance name.
    @return: a service loaded from ZenHub/services or one of the zenpacks.
    @raise RemoteBadMonitor: if instance is not a known performance monitor
    """
    if not self.dmd.Monitors.Performance._getOb(instance, False):
        # Name the monitor that was actually requested, not the hub's own
        raise RemoteBadMonitor( "The provided performance monitor '%s'" %
                 instance + " is not in the current list" )

    try:
        return self.services[name, instance]
    except KeyError:
        pass

    from Products.ZenUtils.Utils import importClass
    try:
        ctor = importClass(name)
    except ImportError:
        # Fall back to the services shipped with ZenHub itself
        ctor = importClass('Products.ZenHub.services.%s' % name, name)
    svc = ctor(self.dmd, instance)
    if self.options.workers:
        # With workers configured, calls are routed through the interceptor
        svc = WorkerInterceptor(self, svc)
    self.services[name, instance] = svc
    return svc
404
# NOTE(review): the def line for this method is missing from the text under
# review; "deferToWorker(self, args)" is reconstructed from the
# self.zenhub.deferToWorker(...) call and the docstring -- confirm.
def deferToWorker(self, args):
    """Take a remote request and queue it for worker processes.

    With --prioritize the job is placed ahead of the first queued job
    whose priority value is larger; otherwise it goes to the end of
    the queue.

    @type args: tuple
    @param args: the arguments to the remote_execute() method in the worker
    @return: a Deferred for the eventual results of the method call

    """
    d = defer.Deferred()
    svcName, instance, method = args[:3]
    service = self.getService(svcName, instance).service
    priority = service.getMethodPriority(method)
    entry = (d, priority, args)

    # Default slot is the tail of the queue
    slot = len(self.workList)
    if self.options.prioritize:
        for position, queued in enumerate(self.workList):
            if priority < queued[1]:
                slot = position
                break
    self.workList.insert(slot, entry)

    self.giveWorkToWorkers()
    return d
432
433
# NOTE(review): the def line for this method is missing from the text under
# review; "giveWorkToWorkers(self)" is reconstructed from the
# self.giveWorkToWorkers() calls -- confirm.
def giveWorkToWorkers(self):
    """Parcel out a method invocation to an available worker process

    Keeps handing queued jobs to idle workers until the queue is empty
    or no worker accepts a job.
    """
    def jobDone(result, finishedWorker):
        # Free the worker, look for more work, then pass the result along
        finishedWorker.busy = False
        self.giveWorkToWorkers()
        return result

    self.log.debug("worklist has %d items", len(self.workList))
    while self.workList:
        for index, worker in enumerate(self.workers):
            if worker.busy:
                continue
            job = self.getJobForWorker(index)
            if job is None:
                continue
            worker.busy = True
            self.log.debug("Giving %s to worker %d", job[2][2], index)
            pending = worker.callRemote('execute', *job[2])
            pending.addBoth(jobDone, worker)
            # Feed the worker's answer into the Deferred given to the caller
            pending.chainDeferred(job[0])
            break
        else:
            self.log.debug("all workers are busy")
            break
457
# NOTE(review): the def line for this method is missing from the text under
# review; "getJobForWorker(self, workerId)" is reconstructed from the
# self.getJobForWorker(i) call and the workerId name used below -- confirm.
def getJobForWorker(self, workerId):
    """
    Remove and return the next queued job the given worker may run.

    With --anyworker the head of the queue is handed out.  Otherwise
    worker number workerId (0-based) of N workers only takes jobs whose
    priority value is below (workerId + 1) / N, and gets the first such
    job in queue order.

    @type workerId: int
    @param workerId: index of the worker in self.workers
    @return: the (deferred, priority, args) job tuple, or None when
    there is nothing this worker may run
    """
    if not self.workList:
        return None
    if self.options.anyworker:
        return self.workList.pop(0)
    if not self.workers:
        # No workers: nothing to divide the priority range between
        return None

    # The limit is the same for every queued job, so compute it once
    limit = (workerId + 1) / float(len(self.workers))
    for position, job in enumerate(self.workList):
        if job[1] < limit:
            return self.workList.pop(position)
    return None
468
# NOTE(review): the def line for this method is missing from the text under
# review; "createWorker(self)" is reconstructed from the self.createWorker()
# and self.hub.createWorker references -- confirm.
def createWorker(self):
    """Start a worker subprocess

    Does nothing when self.workers already holds as many workers as
    requested with --workers.  Otherwise a temporary configuration
    file is written for the worker and bin/zenhubworker is spawned with
    it; the file is removed again when the process ends.

    @return: None
    """
    if len(self.workers) >= self.options.workers:
        return

    import os
    import tempfile

    # Gather the settings first so a missing attribute cannot leave a
    # half-written temporary file behind.
    settings = (
        ("hubport", self.options.pbport),
        ("username", self.workerUsername),
        ("password", self.workerPassword),
        ("host", self.options.host),
        ("logseverity", self.options.logseverity),
        ("cachesize", self.options.cachesize),
    )
    config = ''.join("%s %s\n" % pair for pair in settings)

    fd, tmp = tempfile.mkstemp()
    try:
        os.write(fd, config)
    finally:
        os.close(fd)

    exe = zenPath('bin', 'zenhubworker')

    # "self" inside the protocol methods is the hub; the protocol
    # instance itself is called "proto".
    class WorkerRunningProtocol(protocol.ProcessProtocol):

        def outReceived(proto, data):
            self.log.debug("Worker reports %s" % (data,))

        def errReceived(proto, data):
            self.log.info("Worker reports %s" % (data,))

        def processEnded(proto, reason):
            os.unlink(tmp)
            # worker_processes holds the process transports returned by
            # spawnProcess(), so drop the transport, not the protocol.
            self.worker_processes.discard(proto.transport)
            # exitCode is None when the worker was ended by a signal
            # (main() sends KILL), so it cannot be formatted with %d.
            self.log.warning("Worker exited with status: %s (%s)",
                             reason.value.exitCode,
                             getExitMessage(reason.value.exitCode))

    args = (exe, 'run', '-C', tmp)
    self.log.debug("Starting %s", ' '.join(args))
    proc = reactor.spawnProcess(WorkerRunningProtocol(), exe, args, os.environ)
    self.worker_processes.add(proc)

# NOTE(review): the def line for this method is missing from the text under
# review; "heartbeat(self)" is reconstructed from the self.heartbeat
# references -- confirm.
def heartbeat(self):
    """
    Since we don't do anything on a regular basis, just
    push heartbeats regularly.

    Every 30 seconds a heartbeat event is sent, the next heartbeat is
    scheduled and the hub's own statistics are posted as events.

    @return: None
    """
    interval = 30
    beat = EventHeartbeat(self.options.monitor, self.name, 3*interval)
    self.zem.sendEvent(beat)
    self.niceDoggie(interval)
    reactor.callLater(interval, self.heartbeat)

    stats = self.rrdStats
    serviceCallTime = sum(svc.callTime for svc in self.services.values())
    events = stats.counter('totalTime', interval, int(self.totalTime * 1000))
    events = events + stats.counter('totalEvents', interval, self.totalEvents)
    events = events + stats.gauge('services', interval, len(self.services))
    events = events + stats.counter('totalCallTime', interval, serviceCallTime)
    events = events + stats.gauge('workListLength', interval, len(self.workList))
    self.zem.sendEvents(events)
532
533
# NOTE(review): the def line for this method is missing from the text under
# review; "main(self)" is reconstructed from the z.main() call in the script
# guard -- confirm.
def main(self):
    """
    Start the main event loop.

    When the reactor stops, every worker process the hub still tracks
    is killed and the event publisher is closed.
    """
    from twisted.internet.error import ProcessExitedAlready

    if self.options.cycle:
        self.heartbeat()
    reactor.run()
    try:
        for proc in list(self.worker_processes):
            try:
                proc.signalProcess('KILL')
            except ProcessExitedAlready:
                # A worker that is already gone needs no signal
                pass
    finally:
        # Close the publisher even if killing a worker failed
        getUtility(IEventPublisher).close()
544
# NOTE(review): the def line for this method is missing from the text under
# review; "buildOptions(self)" is reconstructed from the
# ZCmdBase.buildOptions(self) call below -- confirm.
def buildOptions(self):
    """
    Adds our command line options to ZCmdBase command line options.
    """
    ZCmdBase.buildOptions(self)

    # (option flags, add_option keyword settings), in registration order
    optionSpecs = (
        (('--xmlrpcport', '-x'),
         dict(dest='xmlrpcport', type='int', default=XML_RPC_PORT,
              help='Port to use for XML-based Remote Procedure Calls (RPC)')),
        (('--pbport',),
         dict(dest='pbport', type='int', default=PB_PORT,
              help="Port to use for Twisted's pb service")),
        (('--passwd',),
         dict(dest='passwordfile', type='string',
              default=zenPath('etc','hubpasswd'),
              help='File where passwords are stored')),
        (('--monitor',),
         dict(dest='monitor', default='localhost',
              help='Name of the distributed monitor this hub runs on')),
        (('--workers',),
         dict(dest='workers', type='int', default=0,
              help="Number of worker instances to handle requests")),
        (('--prioritize',),
         dict(dest='prioritize', action='store_true', default=False,
              help="Run higher priority jobs before lower priority ones")),
        (('--anyworker',),
         dict(dest='anyworker', action='store_true', default=False,
              help='Allow any priority job to run on any worker')),
    )
    for flags, settings in optionSpecs:
        self.parser.add_option(*flags, **settings)

571
# Script entry point: build the hub daemon and run its main loop.
if __name__ == '__main__':
    hub = ZenHub()
    hub.main()
575