Package Products :: Package ZenHub :: Module notify
[hide private]
[frames | no frames]

Source Code for Module Products.ZenHub.notify

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2011, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """Provides a batch notifier to break up the expensive, blocking IO 
 15  involved with calls to DeviceOrganizer.getSubDevices which can call getObject 
 16  on the brains of every device in the system. Processes batches of 10 devices 
 17  giving way to the event loop between each batch. See ticket #26626. zenhub 
 18  calls update_all_services as the entry point into this module; everything else 
 19  in this module is private.""" 
 20   
 21  import logging 
 22  import collections 
 23  from twisted.internet import reactor, defer 
 24  from Products.ZenModel.DeviceClass import DeviceClass 
 25  from Products.ZenModel.Device import Device 
 26  from Products.ZenModel.DataRoot import DataRoot 
 27   
 28  LOG = logging.getLogger("zen.hub.notify") 
 29   
class NotifyItem(object):
    """Queue entry holding the context for one device class's notifications.

    These items are held in the BatchNotifier's queue. They contain all the
    context needed to process the subdevices of a specific device class:

    device_class_uid: e.g. /zport/dmd/Devices/Server/Linux
    subdevices: an iterator over the device class's subdevices
    notify_functions: a dictionary mapping Service UID to notifyAll
        function. An example Service UID is
        ('CommandPerformanceConfig', 'localhost')
    d: the deferred for this item; None until the BatchNotifier assigns
        one. Once assigned it always has the following callback chain:

            Slot  Callback                  Errback
            1     BatchNotifier._callback   None
            2     None                      BatchNotifier._errback
    """

    def __init__(self, device_class_uid, subdevices):
        """Store the device class UID and its subdevice iterator.

        device_class_uid -- identifier of the device class this item covers
        subdevices -- iterator yielding the devices to notify about
        """
        self.device_class_uid = device_class_uid
        self.subdevices = subdevices
        # keys are service_uids eg ('CommandPerformanceConfig', 'localhost')
        self.notify_functions = {}
        self.d = None

    def __repr__(self):
        """Return a debug string naming the device class and service UIDs."""
        # list(...) rather than .keys() so the service UIDs render as a plain
        # list whether or not the interpreter returns a view from keys().
        args = (self.device_class_uid, list(self.notify_functions))
        return "<NotifyItem(device_class_uid=%s, notify_functions=%s)>" % args
57
class BatchNotifier(object):
    """Processes the expensive getSubDevices call in batches.

    The queue contains NotifyItem instances. If notify_subdevices is called
    and an item exists in the queue for the same device class, then the new
    service UID and notify function are added to the existing item. Once an
    item is moved from the queue to the _current_item member, it is being
    processed and further notify_subdevices calls for the same device class
    will append a new item to the queue.

    The module-level BATCH_NOTIFIER singleton is the instance meant for use
    (the original notes say it is registered as a utility in zcml).
    """

    # Number of devices handled per callback before yielding to the reactor.
    _BATCH_SIZE = 10
    # Seconds to wait between batches.
    _DELAY = 0.05

    def __init__(self):
        """Start with no item in progress and an empty queue."""
        self._current_item = None
        self._queue = collections.deque()

    def notify_subdevices(self, device_class, service_uid, notify_function):
        """Register notify_function to be called for each subdevice.

        device_class -- object providing getPrimaryId() and getSubDevicesGen()
        service_uid -- key identifying the service, e.g.
            ('CommandPerformanceConfig', 'localhost'); a later registration
            with the same key on the same queued item replaces the earlier one
        notify_function -- callable invoked with each device
        """
        LOG.debug("BatchNotifier.notify_subdevices: %r, %s",
                  device_class, service_uid)
        item = self._find_or_create_item(device_class)
        item.notify_functions[service_uid] = notify_function

    def _find_or_create_item(self, device_class):
        """Return the queued item for device_class, creating one if needed."""
        device_class_uid = device_class.getPrimaryId()
        for item in self._queue:
            if item.device_class_uid == device_class_uid:
                return item

        subdevices = device_class.getSubDevicesGen()
        item = NotifyItem(device_class_uid, subdevices)
        item.d = self._create_deferred()
        # Only kick off processing when nothing is in flight. If an item is
        # currently being processed, its own callback chain will reach this
        # one; scheduling here as well would start a second chain and later
        # fire this deferred a second time.
        if not self._queue and self._current_item is None:
            self._call_later(item.d)
        self._queue.appendleft(item)
        return item

    def _create_deferred(self):
        """Build a deferred wired to _callback followed by _errback."""
        d = defer.Deferred()
        d.addCallback(self._callback)
        d.addErrback(self._errback)
        return d

    def _call_later(self, d):
        """Schedule d to fire with None after this notifier's delay."""
        reactor.callLater(self._DELAY, d.callback, None)

    def _switch_to_next_item(self):
        """Make the oldest queued item current, or None if the queue is empty."""
        # Items are added with appendleft, so pop() yields them in FIFO order.
        self._current_item = self._queue.pop() if self._queue else None

    def _call_notify_functions(self, device):
        """Call every notify function of the current item with device.

        A failing notify function is logged and does not stop the others.
        """
        # Snapshot the items so a registration made while notifying cannot
        # disturb the iteration.
        for service_uid, notify_function in list(
                self._current_item.notify_functions.items()):
            try:
                notify_function(device)
            except Exception as e:
                LOG.error("%s failed to notify %s: %s: %s",
                          service_uid, device.getPrimaryId(),
                          type(e).__name__, e)

    def _callback(self, result):
        """Process one batch of the current item's subdevices.

        result is the value the deferred was fired with and is not used.
        Reschedules itself through a fresh deferred while work remains.
        """
        if self._current_item is None:
            self._current_item = self._queue.pop()
        batch_count = 0
        try:
            for device in self._current_item.subdevices:
                self._call_notify_functions(device)
                batch_count += 1
                if batch_count >= self._BATCH_SIZE:
                    # Batch is full: the old deferred has fired, so the next
                    # batch needs a new one.
                    self._current_item.d = self._create_deferred()
                    break
            else:
                LOG.debug("BatchNotifier._callback: no more devices, %s in queue",
                          len(self._queue))
                self._switch_to_next_item()
        except Exception as e:
            # The subdevice iterator itself failed; give up on this item.
            LOG.warning("Failed to get subdevice of %s: %s: %s",
                        self._current_item.device_class_uid,
                        type(e).__name__, e)
            self._switch_to_next_item()
        if self._current_item is not None:
            self._call_later(self._current_item.d)

    def _errback(self, failure):
        """Log a failure that escaped _callback."""
        LOG.error("Failure in batch notifier: %s: %s",
                  failure.type.__name__, failure.value)
        LOG.debug("BatchNotifier._errback: failure=%s", failure)


# Shared module-level instance.
BATCH_NOTIFIER = BatchNotifier()