Package Products :: Package DataCollector :: Module zenmodeler
[hide private]
[frames] | [no frames]

Source Code for Module Products.DataCollector.zenmodeler

   1  ########################################################################## 
   2  # 
   3  # This program is part of Zenoss Core, an open source monitoring platform. 
   4  # Copyright (C) 2007, Zenoss Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify it 
   7  # under the terms of the GNU General Public License version 2 or (at your 
   8  # option) any later version as published by the Free Software Foundation. 
   9  # 
  10  # For complete information please visit: http://www.zenoss.com/oss/ 
  11  # 
  12  ########################################################################### 
  13   
  14  __doc__= """Discover (aka model) a device and its components. 
  15  For instance, find out what Ethernet interfaces and hard disks a server 
  16  has available. 
  17  This information should change much less frequently than performance metrics. 
  18  """ 
  19   
  20  # IMPORTANT! The import of the pysamba.twisted.reactor module should come before 
  21  # any other libraries that might possibly use twisted. This will ensure that 
  22  # the proper WmiReactor is installed before anyone else grabs a reference to 
  23  # the wrong reactor. 
  24  import pysamba.twisted.reactor 
  25   
  26  import Globals 
  27  from Products.ZenWin.WMIClient import WMIClient 
  28  from Products.ZenWin.utils import addNTLMv2Option, setNTLMv2Auth 
  29  from Products.ZenHub.PBDaemon import FakeRemote, PBDaemon 
  30  from Products.ZenUtils.DaemonStats import DaemonStats 
  31  from Products.ZenUtils.Driver import drive, driveLater 
  32  from Products.ZenUtils.Utils import unused 
  33  from Products.ZenEvents.ZenEventClasses import Heartbeat, Error 
  34   
  35  from PythonClient   import PythonClient 
  36  from SshClient      import SshClient 
  37  from TelnetClient   import TelnetClient, buildOptions as TCbuildOptions 
  38  from SnmpClient     import SnmpClient 
  39  from PortscanClient import PortscanClient 
  40   
  41  from Products.DataCollector import Classifier 
  42   
  43  from twisted.internet import reactor 
  44  from twisted.internet.defer import succeed 
  45   
  46  import time 
  47  import re 
  48  import DateTime 
  49   
  50  import os 
  51  import os.path 
  52  import sys 
  53  import traceback 
  54  from random import randint 
  55   
  56  defaultPortScanTimeout = 5 
  57  defaultParallel = 1 
  58  defaultProtocol = "ssh" 
  59  defaultPort = 22 
  60   
  61  # needed for Twisted's PB (Perspective Broker) to work 
  62  from Products.DataCollector import DeviceProxy 
  63  from Products.DataCollector import Plugins 
  64  unused(DeviceProxy, Plugins) 
  65   
66 -class ZenModeler(PBDaemon):
67 """ 68 Daemon class to attach to zenhub and pass along 69 device configuration information. 70 """ 71 72 name = 'zenmodeler' 73 initialServices = PBDaemon.initialServices + ['ModelerService'] 74 75 generateEvents = True 76 configCycleInterval = 360 77 78 classCollectorPlugins = () 79
80 - def __init__(self, single=False ):
81 """ 82 Initalizer 83 84 @param single: collect from a single device? 85 @type single: boolean 86 """ 87 PBDaemon.__init__(self) 88 # FIXME: cleanup --force option #2660 89 self.options.force = True 90 self.start = None 91 self.rrdStats = DaemonStats() 92 self.single = single 93 if self.options.device: 94 self.single = True 95 self.modelerCycleInterval = self.options.cycletime 96 self.collage = float( self.options.collage ) / 1440.0 97 self.pendingNewClients = False 98 self.clients = [] 99 self.finished = [] 100 self.devicegen = None 101 102 # Delay start for between 10 and 60 minutes when run as a daemon. 103 self.started = False 104 self.startDelay = 0 105 if self.options.daemon: 106 if self.options.now: 107 self.log.debug("Run as a daemon, starting immediately.") 108 else: 109 # self.startDelay = randint(10, 60) * 60 110 self.startDelay = randint(10, 60) * 1 111 self.log.info("Run as a daemon, waiting %s seconds to start." % 112 self.startDelay) 113 else: 114 self.log.debug("Run in foreground, starting immediately.")
115 116
117 - def reportError(self, error):
118 """ 119 Log errors that have occurred 120 121 @param error: error message 122 @type error: string 123 """ 124 self.log.error("Error occured: %s", error)
125 126
127 - def connected(self):
128 """ 129 Called after connected to the zenhub service 130 """ 131 d = self.configure() 132 d.addCallback(self.heartbeat) 133 d.addErrback(self.reportError)
134 135
136 - def configure(self):
137 """ 138 Get our configuration from zenhub 139 """ 140 # add in the code to fetch cycle time, etc. 141 def inner(driver): 142 """ 143 Generator function to gather our configuration 144 145 @param driver: driver object 146 @type driver: driver object 147 """ 148 self.log.debug('fetching monitor properties') 149 yield self.config().callRemote('propertyItems') 150 items = dict(driver.next()) 151 if self.options.cycletime == 0: 152 self.modelerCycleInterval = items.get('modelerCycleInterval', 153 self.modelerCycleInterval) 154 self.configCycleInterval = items.get('configCycleInterval', 155 self.configCycleInterval) 156 reactor.callLater(self.configCycleInterval * 60, self.configure) 157 158 self.log.debug("Getting threshold classes...") 159 yield self.config().callRemote('getThresholdClasses') 160 self.remote_updateThresholdClasses(driver.next()) 161 162 self.log.debug("Fetching default RRDCreateCommand...") 163 yield self.config().callRemote('getDefaultRRDCreateCommand') 164 createCommand = driver.next() 165 166 self.log.debug("Getting collector thresholds...") 167 yield self.config().callRemote('getCollectorThresholds') 168 self.rrdStats.config(self.options.monitor, 169 self.name, 170 driver.next(), 171 createCommand) 172 173 self.log.debug("Getting collector plugins for each DeviceClass") 174 yield self.config().callRemote('getClassCollectorPlugins') 175 self.classCollectorPlugins = driver.next()
176 177 return drive(inner)
178 179
180 - def config(self):
181 """ 182 Get the ModelerService 183 """ 184 return self.services.get('ModelerService', FakeRemote())
185 186
187 - def selectPlugins(self, device, transport):
188 """ 189 Build a list of active plugins for a device, based on: 190 191 * the --collect command-line option which is a regex 192 * the --ignore command-line option which is a regex 193 * transport which is a string describing the type of plugin 194 195 @param device: device to collect against 196 @type device: string 197 @param transport: python, ssh, snmp, telnet, cmd 198 @type transport: string 199 @return: results of the plugin 200 @type: string 201 @todo: determine if an event for the collector AND the device should be sent 202 """ 203 plugins = [] 204 valid_loaders = [] 205 for loader in device.plugins: 206 try: 207 plugin= loader.create() 208 self.log.debug( "Loaded plugin %s" % plugin.name() ) 209 plugins.append( plugin ) 210 valid_loaders.append( loader ) 211 212 except (SystemExit, KeyboardInterrupt), ex: 213 self.log.info( "Interrupted by external signal (%s)" % str(ex) ) 214 raise 215 216 except Plugins.PluginImportError, import_error: 217 import socket 218 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 219 collector_host= socket.gethostname() 220 # NB: an import errror affects all devices, 221 # so report the issue against the collector 222 # TOOD: determine if an event for the collector AND the device should be sent 223 evt= { "eventClass":"/Status/Update", "component":component, 224 "agent":collector_host, "device":collector_host, 225 "severity":Error } 226 227 info= "Problem loading plugin %s" % import_error.plugin 228 self.log.error( info ) 229 evt[ 'summary' ]= info 230 231 info= import_error.traceback 232 self.log.error( info ) 233 evt[ 'message' ]= info 234 235 info= ("Due to import errors, removing the %s plugin" 236 " from this collection cycle.") % import_error.plugin 237 self.log.error( info ) 238 evt[ 'message' ] += "%s\n" % info 239 self.sendEvent( evt ) 240 241 # Make sure that we don't generate messages for bad loaders again 242 # NB: doesn't update the device's zProperties 243 if len( device.plugins ) != len( 
valid_loaders ): 244 device.plugins= valid_loaders 245 246 # Create functions to search for what plugins we will and 247 # won't supply to the device 248 collectTest = lambda x: False 249 ignoreTest = lambda x: False 250 if self.options.collectPlugins: 251 collectTest = re.compile(self.options.collectPlugins).search 252 elif self.options.ignorePlugins: 253 ignoreTest = re.compile(self.options.ignorePlugins).search 254 255 result = [] 256 for plugin in plugins: 257 if plugin.transport != transport: 258 continue 259 name = plugin.name() 260 if ignoreTest(name): 261 self.log.debug("Ignoring %s on %s because of --ignore flag", 262 name, device.id) 263 elif collectTest(name): 264 self.log.debug("Using %s on %s because of --collect flag", 265 name, device.id) 266 result.append(plugin) 267 elif not self.options.collectPlugins: 268 self.log.debug("Using %s on %s", name, device.id) 269 result.append(plugin) 270 return result
271 272 273
274 - def collectDevice(self, device):
275 """ 276 Collect data from a single device. 277 278 @param device: device to collect against 279 @type device: string 280 """ 281 clientTimeout = getattr(device, 'zCollectorClientTimeout', 180) 282 ip = device.manageIp 283 timeout = clientTimeout + time.time() 284 self.wmiCollect(device, ip, timeout) 285 self.pythonCollect(device, ip, timeout) 286 self.cmdCollect(device, ip, timeout) 287 self.snmpCollect(device, ip, timeout) 288 self.portscanCollect(device, ip, timeout)
289 290 291
292 - def wmiCollect(self, device, ip, timeout):
293 """ 294 Start the Windows Management Instrumentation (WMI) collector 295 296 @param device: device to collect against 297 @type device: string 298 @param ip: IP address of device to collect against 299 @type ip: string 300 @param timeout: timeout before failing the connection 301 @type timeout: integer 302 """ 303 if self.options.nowmi: 304 return 305 306 client = None 307 try: 308 plugins = self.selectPlugins(device, 'wmi') 309 if not plugins: 310 self.log.info("No WMI plugins found for %s" % device.id) 311 return 312 if self.checkCollection(device): 313 self.log.info('WMI collector method for device %s' % device.id) 314 self.log.info("plugins: %s", 315 ", ".join(map(lambda p: p.name(), plugins))) 316 client = WMIClient(device, self, plugins) 317 if not client or not plugins: 318 self.log.warn("WMI collector creation failed") 319 return 320 except (SystemExit, KeyboardInterrupt): 321 raise 322 except Exception: 323 self.log.exception("Error opening WMI collector") 324 self.addClient(client, timeout, 'WMI', device.id)
325 326 327
328 - def pythonCollect(self, device, ip, timeout):
329 """ 330 Start local Python collection client. 331 332 @param device: device to collect against 333 @type device: string 334 @param ip: IP address of device to collect against 335 @type ip: string 336 @param timeout: timeout before failing the connection 337 @type timeout: integer 338 """ 339 client = None 340 try: 341 plugins = self.selectPlugins(device, "python") 342 if not plugins: 343 self.log.info("No Python plugins found for %s" % device.id) 344 return 345 if self.checkCollection(device): 346 self.log.info('Python collection device %s' % device.id) 347 self.log.info("plugins: %s", 348 ", ".join(map(lambda p: p.name(), plugins))) 349 client = PythonClient(device, self, plugins) 350 if not client or not plugins: 351 self.log.warn("Python client creation failed") 352 return 353 except (SystemExit, KeyboardInterrupt): raise 354 except: 355 self.log.exception("Error opening pythonclient") 356 self.addClient(client, timeout, 'python', device.id)
357 358
359 - def cmdCollect(self, device, ip, timeout):
360 """ 361 Start shell command collection client. 362 363 @param device: device to collect against 364 @type device: string 365 @param ip: IP address of device to collect against 366 @type ip: string 367 @param timeout: timeout before failing the connection 368 @type timeout: integer 369 """ 370 client = None 371 clientType = 'snmp' # default to SNMP if we can't figure out a protocol 372 373 hostname = device.id 374 try: 375 plugins = self.selectPlugins(device,"command") 376 if not plugins: 377 self.log.info("No command plugins found for %s" % hostname) 378 return 379 380 protocol = getattr(device, 'zCommandProtocol', defaultProtocol) 381 commandPort = getattr(device, 'zCommandPort', defaultPort) 382 383 if protocol == "ssh": 384 client = SshClient(hostname, ip, commandPort, 385 options=self.options, 386 plugins=plugins, device=device, 387 datacollector=self, isLoseConnection=True) 388 clientType = 'ssh' 389 self.log.info('Using SSH collection method for device %s' 390 % hostname) 391 392 elif protocol == 'telnet': 393 if commandPort == 22: commandPort = 23 #set default telnet 394 client = TelnetClient(hostname, ip, commandPort, 395 options=self.options, 396 plugins=plugins, device=device, 397 datacollector=self) 398 clientType = 'telnet' 399 self.log.info('Using telnet collection method for device %s' 400 % hostname) 401 402 else: 403 info = ("Unknown protocol %s for device %s -- " 404 "defaulting to %s collection method" % 405 (protocol, hostname, clientType )) 406 self.log.warn( info ) 407 import socket 408 component, _ = os.path.splitext( os.path.basename( sys.argv[0] ) ) 409 collector_host= socket.gethostname() 410 evt= { "eventClass":"/Status/Update", "agent":collector_host, 411 "device":hostname, "severity":Error } 412 evt[ 'summary' ]= info 413 self.sendEvent( evt ) 414 return 415 416 if not client: 417 self.log.warn("Shell command collector creation failed") 418 else: 419 self.log.info("plugins: %s", 420 ", ".join(map(lambda p: p.name(), plugins))) 421 
except (SystemExit, KeyboardInterrupt): raise 422 except: 423 self.log.exception("Error opening command collector") 424 self.addClient(client, timeout, clientType, device.id)
425 426 427
428 - def snmpCollect(self, device, ip, timeout):
429 """ 430 Start SNMP collection client. 431 432 @param device: device to collect against 433 @type device: string 434 @param ip: IP address of device to collect against 435 @type ip: string 436 @param timeout: timeout before failing the connection 437 @type timeout: integer 438 """ 439 client = None 440 try: 441 hostname = device.id 442 if getattr( device, "zSnmpMonitorIgnore", True ): 443 self.log.info("SNMP monitoring off for %s" % hostname) 444 return 445 446 if not ip: 447 self.log.info("No manage IP for %s" % hostname) 448 return 449 450 plugins = [] 451 plugins = self.selectPlugins(device,"snmp") 452 if not plugins: 453 self.log.info("No SNMP plugins found for %s" % hostname) 454 return 455 456 if self.checkCollection(device): 457 self.log.info('SNMP collection device %s' % hostname) 458 self.log.info("plugins: %s", 459 ", ".join(map(lambda p: p.name(), plugins))) 460 client = SnmpClient(device.id, ip, self.options, 461 device, self, plugins) 462 if not client or not plugins: 463 self.log.warn("SNMP collector creation failed") 464 return 465 except (SystemExit, KeyboardInterrupt): raise 466 except: 467 self.log.exception("Error opening the SNMP collector") 468 self.addClient(client, timeout, 'SNMP', device.id)
469 470 471 ######## need to make async test for snmp work at some point -EAD ######### 472 # def checkSnmpConnection(self, device): 473 # """ 474 # Check to see if our current community string is still valid 475 # 476 # @param device: the device against which we will check 477 # @type device: a Device instance 478 # @return: result is None or a tuple containing 479 # (community, port, version, snmp name) 480 # @rtype: deferred: Twisted deferred 481 # """ 482 # from pynetsnmp.twistedsnmp import AgentProxy 483 # 484 # def inner(driver): 485 # self.log.debug("Checking SNMP community %s on %s", 486 # device.zSnmpCommunity, device.id) 487 # 488 # oid = ".1.3.6.1.2.1.1.5.0" 489 # proxy = AgentProxy(device.id, 490 # device.zSnmpPort, 491 # timeout=device.zSnmpTimeout, 492 # community=device.zSnmpCommunity, 493 # snmpVersion=device.zSnmpVer, 494 # tries=2) 495 # proxy.open() 496 # yield proxy.get([oid]) 497 # devname = driver.next().values()[0] 498 # if devname: 499 # yield succeed(True) 500 # yield succeed(False) 501 # 502 # return drive(inner) 503 504
505 - def addClient(self, device, timeout, clientType, name):
506 """ 507 If device is not None, schedule the device to be collected. 508 Otherwise log an error. 509 510 @param device: device to collect against 511 @type device: string 512 @param timeout: timeout before failing the connection 513 @type timeout: integer 514 @param clientType: description of the plugin type 515 @type clientType: string 516 @param name: plugin name 517 @type name: string 518 """ 519 if device: 520 device.timeout = timeout 521 device.timedOut = False 522 self.clients.append(device) 523 device.run() 524 else: 525 self.log.warn('Unable to create a %s collector for %s', 526 clientType, name)
527 528 529 # XXX double-check this, once the implementation is in place
530 - def portscanCollect(self, device, ip, timeout):
531 """ 532 Start portscan collection client. 533 534 @param device: device to collect against 535 @type device: string 536 @param ip: IP address of device to collect against 537 @type ip: string 538 @param timeout: timeout before failing the connection 539 @type timeout: integer 540 """ 541 client = None 542 try: 543 hostname = device.id 544 plugins = self.selectPlugins(device, "portscan") 545 if not plugins: 546 self.log.info("No portscan plugins found for %s" % hostname) 547 return 548 if self.checkCollection(device): 549 self.log.info('Portscan collector method for device %s' 550 % hostname) 551 self.log.info("plugins: %s", 552 ", ".join(map(lambda p: p.name(), plugins))) 553 client = PortscanClient(device.id, ip, self.options, 554 device, self, plugins) 555 if not client or not plugins: 556 self.log.warn("Portscan collector creation failed") 557 return 558 except (SystemExit, KeyboardInterrupt): raise 559 except: 560 self.log.exception("Error opening portscan collector") 561 self.addClient(client, timeout, 'portscan', device.id)
562 563
564 - def checkCollection(self, device):
565 """ 566 See how old the data is that we've collected 567 568 @param device: device to collect against 569 @type device: string 570 @return: is the SNMP status number > 0 and is the last collection time + collage older than now? 571 @type: boolean 572 """ 573 age = device.getSnmpLastCollection() + self.collage 574 if device.getSnmpStatusNumber() > 0 and age >= DateTime.DateTime(): 575 self.log.info("Skipped collection of %s" % device.id) 576 return False 577 return True
578
579 - def clientFinished(self, collectorClient):
580 """ 581 Callback that processes the return values from a device. 582 Python iterable. 583 @param collectorClient: collector instance 584 @type collectorClient: collector class 585 @return: Twisted deferred object 586 @type: Twisted deferred object 587 """ 588 device = collectorClient.device 589 self.log.debug("Client for %s finished collecting", device.id) 590 def processClient(driver): 591 try: 592 if (isinstance(collectorClient, SnmpClient) 593 and collectorClient.connInfo.changed == True): 594 self.log.info( 595 "SNMP connection info for %s changed. Updating...", 596 device.id) 597 yield self.config().callRemote('setSnmpConnectionInfo', 598 device.id, 599 collectorClient.connInfo.zSnmpVer, 600 collectorClient.connInfo.zSnmpPort, 601 collectorClient.connInfo.zSnmpCommunity 602 ) 603 driver.next() 604 605 pluginStats = {} 606 self.log.debug("Processing data for device %s", device.id) 607 devchanged = False 608 maps = [] 609 for plugin, results in collectorClient.getResults(): 610 if plugin is None: continue 611 self.log.debug("Processing plugin %s on device %s ...", 612 plugin.name(), device.id) 613 if not results: 614 self.log.warn("The plugin %s returned no results.", 615 plugin.name()) 616 continue 617 self.log.debug("Plugin %s results = %s", plugin.name(), results) 618 datamaps = [] 619 try: 620 results = plugin.preprocess(results, self.log) 621 if results: 622 datamaps = plugin.process(device, results, self.log) 623 if datamaps: 624 pluginStats.setdefault(plugin.name(), plugin.weight) 625 626 except (SystemExit, KeyboardInterrupt), ex: 627 self.log.info( "Plugin %s terminated due to external" 628 " signal (%s)" % (plugin.name(), str(ex) ) 629 ) 630 continue 631 632 except Exception, ex: 633 # NB: don't discard the plugin, as it might be a 634 # temporary issue 635 # Also, report it against the device, rather than at 636 # a collector as it might be just for this device. 
637 import socket 638 component= os.path.splitext( 639 os.path.basename( sys.argv[0] ) 640 )[0] 641 collector_host= socket.gethostname() 642 evt= { "eventClass":"/Status/Update", 643 "agent":collector_host, "device":device.id, 644 "severity":Error } 645 646 info= "Problem while executing plugin %s" %plugin.name() 647 self.log.error( info ) 648 evt[ 'summary' ]= info 649 650 info= traceback.format_exc() 651 self.log.error( info ) 652 evt[ 'message' ]= info 653 self.sendEvent( evt ) 654 continue 655 656 # allow multiple maps to be returned from one plugin 657 if not isinstance(datamaps, (list, tuple)): 658 datamaps = [datamaps,] 659 if datamaps: 660 maps += [m for m in datamaps if m] 661 if maps: 662 deviceClass = Classifier.classifyDevice(pluginStats, 663 self.classCollectorPlugins) 664 yield self.config().callRemote( 665 'applyDataMaps', device.id, 666 maps, deviceClass) 667 668 if driver.next(): 669 devchanged = True 670 if devchanged: 671 self.log.info("Changes in configuration applied") 672 else: 673 self.log.info("No change in configuration detected") 674 675 if maps: 676 yield self.config().callRemote('setSnmpLastCollection', 677 device.id) 678 driver.next() 679 680 except Exception, ex: 681 self.log.exception(ex) 682 raise
683 684 def processClientFinished(result): 685 """ 686 Called after the client collection finishes 687 688 @param result: object (unused) 689 @type result: object 690 """ 691 if not result: 692 self.log.debug("Client %s finished" % device.id) 693 else: 694 self.log.error("Client %s finished with message: %s" % 695 (device.id, result)) 696 try: 697 self.clients.remove(collectorClient) 698 self.finished.append(collectorClient) 699 except ValueError: 700 self.log.debug("Client %s not found in in the list" 701 " of active clients", 702 device.id) 703 d = drive(self.fillCollectionSlots) 704 d.addErrback(self.fillError) 705 706 d = drive(processClient) 707 d.addBoth(processClientFinished) 708 709 710
711 - def fillError(self, reason):
712 """ 713 Twisted errback routine to log an error when 714 unable to collect some data 715 716 @param reason: error message 717 @type reason: string 718 """ 719 self.log.error("Unable to fill collection slots: %s" % reason)
720 721
722 - def cycleTime(self):
723 """ 724 Return our cycle time (in minutes) 725 726 @return: cycle time 727 @rtype: integer 728 """ 729 return self.modelerCycleInterval * 60
730 731
732 - def heartbeat(self, ignored=None):
733 """ 734 Twisted keep-alive mechanism to ensure that 735 we're still connected to zenhub 736 737 @param ignored: object (unused) 738 @type ignored: object 739 """ 740 ARBITRARY_BEAT = 30 741 reactor.callLater(ARBITRARY_BEAT, self.heartbeat) 742 if self.options.cycle: 743 evt = dict(eventClass=Heartbeat, 744 component='zenmodeler', 745 device=self.options.monitor, 746 timeout=3*ARBITRARY_BEAT) 747 self.sendEvent(evt) 748 self.niceDoggie(self.cycleTime()) 749 750 # We start modeling from here to accomodate the startup delay. 751 if not self.started: 752 self.started = True 753 reactor.callLater(self.startDelay, self.main)
754 755
756 - def checkStop(self, unused = None):
757 """ 758 Check to see if there's anything to do. 759 If there isn't, report our statistics and exit. 760 761 @param unused: unused (unused) 762 @type unused: string 763 """ 764 if self.clients: return 765 if self.devicegen: return 766 767 if self.start: 768 runTime = time.time() - self.start 769 self.start = None 770 self.log.info("Scan time: %0.2f seconds", runTime) 771 devices = len(self.finished) 772 timedOut = len([c for c in self.finished if c.timedOut]) 773 self.sendEvents( 774 self.rrdStats.gauge('cycleTime', self.cycleTime(), runTime) + 775 self.rrdStats.gauge('devices', self.cycleTime(), devices) + 776 self.rrdStats.gauge('timedOut', self.cycleTime(), timedOut) 777 ) 778 if not self.options.cycle: 779 self.stop() 780 self.finished = []
781
782 - def fillCollectionSlots(self, driver):
783 """ 784 An iterator which either returns a device to collect or 785 calls checkStop() 786 @param driver: driver object 787 @type driver: driver object 788 """ 789 count = len(self.clients) 790 while count < self.options.parallel and self.devicegen \ 791 and not self.pendingNewClients: 792 self.pendingNewClients = True 793 try: 794 device = self.devicegen.next() 795 yield self.config().callRemote('getDeviceConfig', [device], 796 self.options.checkStatus) 797 # just collect one device, and let the timer add more 798 devices = driver.next() 799 if devices: 800 self.collectDevice(devices[0]) 801 else: 802 self.log.info("Device %s not returned is it down?", device) 803 except StopIteration: 804 self.devicegen = None 805 self.pendingNewClients = False 806 break 807 update = len(self.clients) 808 if update != count and update != 1: 809 self.log.info('Running %d clients', update) 810 else: 811 self.log.debug('Running %d clients', update) 812 self.checkStop()
813
814 - def buildOptions(self):
815 """ 816 Build our list of command-line options 817 """ 818 PBDaemon.buildOptions(self) 819 self.parser.add_option('--debug', 820 dest='debug', action="store_true", default=False, 821 help="Don't fork threads for processing") 822 self.parser.add_option('--nowmi', 823 dest='nowmi', action="store_true", default=False, 824 help="Do not execute WMI plugins") 825 self.parser.add_option('--parallel', dest='parallel', 826 type='int', default=defaultParallel, 827 help="Number of devices to collect from in parallel") 828 self.parser.add_option('--cycletime', 829 dest='cycletime',default=720,type='int', 830 help="Run collection every x minutes") 831 self.parser.add_option('--ignore', 832 dest='ignorePlugins',default="", 833 help="Modeler plugins to ignore. Takes a regular expression") 834 self.parser.add_option('--collect', 835 dest='collectPlugins',default="", 836 help="Modeler plugins to use. Takes a regular expression") 837 self.parser.add_option('-p', '--path', dest='path', 838 help="Start class path for collection ie /NetworkDevices") 839 self.parser.add_option('-d', '--device', dest='device', 840 help="Fully qualified device name ie www.confmon.com") 841 self.parser.add_option('-a', '--collage', 842 dest='collage', default=0, type='float', 843 help="Do not collect from devices whose collect date " + 844 "is within this many minutes") 845 self.parser.add_option('--writetries', 846 dest='writetries',default=2,type='int', 847 help="Number of times to try to write if a " 848 "read conflict is found") 849 # FIXME: cleanup --force option #2660 850 self.parser.add_option("-F", "--force", 851 dest="force", action='store_true', default=True, 852 help="Force collection of config data (deprecated)") 853 self.parser.add_option('--portscantimeout', dest='portscantimeout', 854 type='int', default=defaultPortScanTimeout, 855 help="Time to wait for connection failures when port scanning") 856 self.parser.add_option('--now', 857 dest='now', action="store_true", default=False, 858 
help="Start daemon now, do not sleep before starting") 859 self.parser.add_option('--communities', 860 dest='discoverCommunity', action="store_true", default=False, 861 help="If an snmp connection fails try and rediscover it's connection info") 862 self.parser.add_option('--checkstatus', 863 dest='checkStatus', action="store_true", default=False, 864 help="Don't model if the device is ping or snmp down") 865 TCbuildOptions(self.parser, self.usage) 866 addNTLMv2Option(self.parser)
867
868 - def processOptions(self):
869 """ 870 Check what the user gave us vs what we'll accept 871 for command-line options 872 """ 873 if not self.options.path and not self.options.device: 874 self.options.path = "/Devices" 875 if self.options.ignorePlugins and self.options.collectPlugins: 876 raise SystemExit( "Only one of --ignore or --collect" 877 " can be used at a time") 878 setNTLMv2Auth(self.options)
879
880 - def _timeoutClients(self):
881 """ 882 The guts of the timeoutClients method (minus the twisted reactor 883 stuff). Breaking this part out as a separate method facilitates unit 884 testing. 885 """ 886 active = [] 887 for client in self.clients: 888 if client.timeout < time.time(): 889 self.log.warn("Client %s timeout", client.hostname) 890 self.finished.append(client) 891 client.timedOut = True 892 try: 893 client.stop() 894 except AssertionError, ex: 895 pass # session closed twice http://dev.zenoss.org/trac/ticket/6354 896 else: 897 active.append(client) 898 self.clients = active
899 900 901
902 - def timeoutClients(self, unused=None):
903 """ 904 Check to see which clients have timed out and which ones haven't. 905 Stop processing anything that's timed out. 906 907 @param unused: unused (unused) 908 @type unused: string 909 """ 910 reactor.callLater(1, self.timeoutClients) 911 self._timeoutClients() 912 d = drive(self.fillCollectionSlots) 913 d.addCallback(self.checkStop) 914 d.addErrback(self.fillError)
915 916 917
918 - def reactorLoop(self):
919 """ 920 Twisted main loop 921 """ 922 reactor.startRunning() 923 while reactor.running: 924 try: 925 while reactor.running: 926 reactor.runUntilCurrent() 927 timeout = reactor.timeout() 928 reactor.doIteration(timeout) 929 except: 930 if reactor.running: 931 self.log.exception("Unexpected error in main loop.")
932 933 934
935 - def getDeviceList(self):
936 """ 937 Get the list of devices for which we are collecting: 938 * if -d devicename was used, use the devicename 939 * if a class path flag was supplied, gather the devices 940 along that organizer 941 * otherwise get all of the devices associated with our collector 942 943 @return: list of devices 944 @rtype: list 945 """ 946 if self.options.device: 947 self.log.info("Collecting for device %s", self.options.device) 948 return succeed([self.options.device]) 949 950 self.log.info("Collecting for path %s", self.options.path) 951 return self.config().callRemote('getDeviceListByOrganizer', 952 self.options.path, 953 self.options.monitor)
954 955
956 - def mainLoop(self, driver):
957 """ 958 Main collection loop, a Python iterable 959 960 @param driver: driver object 961 @type driver: driver object 962 @return: Twisted deferred object 963 @rtype: Twisted deferred object 964 """ 965 if self.options.cycle: 966 driveLater(self.cycleTime(), self.mainLoop) 967 968 if self.clients: 969 self.log.error("Modeling cycle taking too long") 970 return 971 972 self.start = time.time() 973 974 self.log.debug("Starting collector loop...") 975 yield self.getDeviceList() 976 self.devicegen = iter(driver.next()) 977 d = drive(self.fillCollectionSlots) 978 d.addErrback(self.fillError) 979 yield d 980 driver.next() 981 self.log.debug("Collection slots filled")
982 983 984
985 - def main(self, unused=None):
986 """ 987 Wrapper around the mainLoop 988 989 @param unused: unused (unused) 990 @type unused: string 991 @return: Twisted deferred object 992 @rtype: Twisted deferred object 993 """ 994 self.finished = [] 995 d = drive(self.mainLoop) 996 d.addCallback(self.timeoutClients) 997 return d
998 999 1000
1001 - def remote_deleteDevice(self, device):
1002 """ 1003 Stub function 1004 1005 @param device: device name (unused) 1006 @type device: string 1007 @todo: implement 1008 """ 1009 # we fetch the device list before every scan 1010 self.log.debug("Asynch deleteDevice %s" % device)
if __name__ == '__main__':
    # Script entry point: build the daemon and validate its options.
    modeler = ZenModeler()
    modeler.processOptions()
    # Swap in our own loop so unexpected errors are logged rather than
    # ending the reactor, then hand control to the daemon.
    reactor.run = modeler.reactorLoop
    modeler.run()