1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """Provides a batch notifier to break up the expensive, blocking IO
15 involved with calls to DeviceOrganizer.getSubDevices which can call getObject
16 on the brains of every device in the system. Processes batches of 10 devices
17 giving way to the event loop between each batch. See ticket #26626. zenhub
18 calls update_all_services as the entry point into this module, everything else
19 in this module is private."""
20
21 import logging
22 import collections
23 from twisted.internet import reactor, defer
24 from Products.ZenModel.DeviceClass import DeviceClass
25 from Products.ZenModel.Device import Device
26 from Products.ZenModel.DataRoot import DataRoot
27
# Module-level logger used by the notifier code in this file.
LOG = logging.getLogger("zen.hub.notify")
29
class NotifyItem(object):
    """Context needed to process the subdevices of one device class.

    These items are held in the BatchNotifier's queue. The context includes:

    device_class_uid: the device class UID,
        e.g. /zport/dmd/Devices/Server/Linux
    subdevices: an iterator over the device class's subdevices
    notify_functions: a dictionary mapping Service UID to notifyAll
        function. An example Service UID is
        ('CommandPerformanceConfig', 'localhost')
    d: the deferred for this item. Always has the following callback
        chain:
            Slot  Callback                  Errback
            1     BatchNotifier._callback   None
            2     None                      BatchNotifier._errback
    """

    def __init__(self, device_class_uid, subdevices):
        """Store the device class UID and its subdevice iterator.

        notify_functions starts empty and d starts as None; both are
        filled in by the code that creates and queues the item.
        """
        self.device_class_uid = device_class_uid
        self.subdevices = subdevices

        # Service UID -> notify function; entries are added after creation.
        self.notify_functions = {}
        # Deferred driving the processing of this item; assigned by the
        # creator of the item.
        self.d = None

    def __repr__(self):
        """Return a short description: the UID and the registered service UIDs."""
        # list(...) renders the keys as a plain list on both Python 2 and 3
        # (dict.keys() would print as dict_keys([...]) on Python 3).
        args = (self.device_class_uid, list(self.notify_functions))
        return "<NotifyItem(device_class_uid=%s, notify_functions=%s)>" % args
57
# NOTE(review): the `class BatchNotifier` header line is not present in this
# listing (the class name is taken from the instantiation near the end of the
# file and from the log message below). Several `def` lines and whole method
# bodies are also absent; the notes below mark where. Confirm every note
# against the original file before relying on it.
59 """Processes the expensive getSubDevices call in batches. A singleton
60 instance is registered as a utility in zcml. The queue contains NotifyItem
61 instances. If notify_subdevices is called and an item exists in the queue
62 for the same device class, then the new service UID and notify function
63 are appended to the existing item. Once an item is moved from the queue to
64 _current_item member, it is being processed and further notify_subdevices
65 calls for the same device class will append a new item to the queue.
66 """
67
# Class-level tuning constants. Nothing visible in this listing reads them;
# presumably _BATCH_SIZE is the number of devices handled per batch and
# _DELAY the pause in seconds between batches — TODO confirm.
68 _BATCH_SIZE = 10
69 _DELAY = 0.05
70
# NOTE(review): constructor body; its `def` line is not present here.
# Starts with no item being processed and an empty deque as the work queue.
72 self._current_item = None
73 self._queue = collections.deque()
74
# NOTE(review): body of notify_subdevices (name taken from the log message);
# its `def` line is not present, so the parameter order of device_class,
# service_uid and notify_function cannot be confirmed from here.
# Registers notify_function under service_uid on the queued item for
# device_class, creating and queueing that item when none exists yet.
76 LOG.debug("BatchNotifier.notify_subdevices: %r, %s" % (device_class, service_uid))
77 item = self._find_or_create_item(device_class)
78 item.notify_functions[service_uid] = notify_function
79
# NOTE(review): body of _find_or_create_item(device_class) (name taken from
# the call above); its `def` line is not present.
# Returns the queued item whose device_class_uid equals the primary id of
# device_class, or builds, queues and returns a new NotifyItem.
81 device_class_uid = device_class.getPrimaryId()
82 for item in self._queue:
83 if item.device_class_uid == device_class_uid:
84 retval = item
85 break
# Presumably the `else` of the `for` loop (indentation is lost in this
# listing): it would then run only when no queued item matched.
86 else:
87 subdevices = device_class.getSubDevicesGen()
88 retval = NotifyItem(device_class_uid, subdevices)
89 retval.d = self._create_deferred()
# The deferred is scheduled only when the queue is empty at this point;
# presumably later items are scheduled by the processing chain itself —
# TODO confirm against the missing callback code.
90 if not self._queue:
91 self._call_later(retval.d)
# New items enter on the left and the next item is popped from the right
# (see the pop below), so the deque is consumed first-in, first-out.
92 self._queue.appendleft(retval)
93 return retval
94
# NOTE(review): original lines 95-104 are mostly absent from this listing;
# _create_deferred and _call_later, which are called above, are presumably
# defined there.
100
103
# NOTE(review): single-statement method body; its `def` line is not present.
# Makes the oldest queued item the current one, or None when the queue is
# empty.
105 self._current_item = self._queue.pop() if self._queue else None
106
# NOTE(review): method body whose `def` line is not present; `device` is
# presumably its parameter.
# Calls every notify function registered on the current item with `device`.
# A failing notify function is logged and does not stop the remaining ones.
108 for service_uid, notify_function in self._current_item.notify_functions.items():
109 try:
110 notify_function(device)
# `except Exception, e` is Python 2-only syntax; `except Exception as e`
# is accepted by Python 2.6+ and Python 3.
111 except Exception, e:
112 args = (service_uid, device.getPrimaryId(), type(e).__name__, e)
113 LOG.error("%s failed to notify %s: %s: %s" % args)
114
135
139
# Module-level singleton notifier, created once at import time.
BATCH_NOTIFIER = BatchNotifier()
141