Package Products :: Package ZenHub :: Module zenhubworker
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenHub.zenhubworker

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2008, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  import Globals 
 14  from Products.DataCollector.Plugins import loadPlugins 
 15  from Products.ZenHub.zenhub import PB_PORT 
 16  from Products.ZenHub.PBDaemon import translateError, RemoteConflictError 
 17  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 18  from Products.ZenUtils.Utils import unused 
 19  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 20  # required to allow modeling with zenhubworker 
 21  from Products.DataCollector.plugins import DataMaps 
 22  unused(DataMaps) 
 23   
 24  from twisted.cred import credentials 
 25  from twisted.spread import pb 
 26  from twisted.internet import reactor 
 27  from ZODB.POSException import ConflictError 
 28   
 29  import pickle 
 30  import time 
31 32 -class zenhubworker(ZCmdBase, pb.Referenceable):
33 "Execute ZenHub requests in separate process" 34
35 - def __init__(self):
36 ZCmdBase.__init__(self) 37 self.zem = self.dmd.ZenEventManager 38 loadPlugins(self.dmd) 39 self.services = {} 40 factory = ReconnectingPBClientFactory() 41 self.log.debug("Connecting to %s:%d", 42 self.options.hubhost, 43 self.options.hubport) 44 reactor.connectTCP(self.options.hubhost, self.options.hubport, factory) 45 self.log.debug("Logging in as %s", self.options.username) 46 c = credentials.UsernamePassword(self.options.username, 47 self.options.password) 48 factory.gotPerspective = self.gotPerspective 49 def stop(*args): 50 reactor.callLater(0, reactor.stop)
51 factory.clientConnectionLost = stop 52 factory.startLogin(c)
53
54 - def gotPerspective(self, perspective):
55 "Once we are connected to zenhub, register ourselves" 56 d = perspective.callRemote('reportingForWork', self) 57 def reportProblem(why): 58 self.log.error("Unable to report for work: %s", why) 59 reactor.stop()
60 d.addErrback(reportProblem) 61
62 - def _getService(self, name, instance):
63 """Utility method to create the service (like PingConfig) 64 for instance (like localhost) 65 66 @type name: string 67 @param name: the dotted-name of the module to load 68 (uses @L{Products.ZenUtils.Utils.importClass}) 69 @param instance: string 70 @param instance: each service serves only one specific collector instances (like 'localhost'). instance defines the collector's instance name. 71 @return: a service loaded from ZenHub/services or one of the zenpacks. 72 """ 73 try: 74 return self.services[name, instance] 75 except KeyError: 76 from Products.ZenUtils.Utils import importClass 77 try: 78 ctor = importClass(name) 79 except ImportError: 80 ctor = importClass('Products.ZenHub.services.%s' % name, name) 81 svc = ctor(self.dmd, instance) 82 self.services[name, instance] = svc 83 return svc
84 85 @translateError
86 - def remote_execute(self, service, instance, method, args):
87 """Execute requests on behalf of zenhub 88 @type service: string 89 @param service: the name of a service, like PingConfig 90 91 @type instance: string 92 @param instance: each service serves only one specific collector instances (like 'localhost'). instance defines the collector's instance name. 93 94 @type method: string 95 @param method: the name of the called method, like getPingTree 96 97 @type args: tuple 98 @param args: arguments to the method 99 100 @type kw: dictionary 101 @param kw: keyword arguments to the method 102 """ 103 self.log.debug("Servicing %s in %s", method, service) 104 now = time.time() 105 service = self._getService(service, instance) 106 m = getattr(service, 'remote_' + method) 107 # now that the service is loaded, we can unpack the arguments 108 args, kw = pickle.loads(args) 109 def runOnce(): 110 self.syncdb() 111 res = m(*args, **kw) 112 return res
113 try: 114 for i in range(4): 115 try: 116 return runOnce() 117 except RemoteConflictError, ex: 118 pass 119 # one last try, but don't hide the exception 120 return runOnce() 121 finally: 122 secs = time.time() - now 123 self.log.debug("Time in %s: %.2f", method, secs) 124 service.callTime += secs 125
126 - def buildOptions(self):
127 """Options, mostly to find where zenhub lives 128 These options should be passed (by file) from zenhub. 129 """ 130 ZCmdBase.buildOptions(self) 131 self.parser.add_option('--hubhost', 132 dest='hubhost', 133 default='localhost', 134 help="Host to use for connecting to ZenHub") 135 self.parser.add_option('--hubport', 136 dest='hubport', 137 type='int', 138 help="Port to use for connecting to ZenHub", 139 default=PB_PORT) 140 self.parser.add_option('--username', 141 dest='username', 142 help="Login name to use when connecting to ZenHub", 143 default='zenoss') 144 self.parser.add_option('--password', 145 dest='password', 146 help="password to use when connecting to ZenHub", 147 default='zenoss')
148 149 if __name__ == '__main__': 150 zhw = zenhubworker() 151 reactor.run() 152