1
2
3
4
5
6
7
8
9
10
11
12
13
14 import sys
15 from collections import defaultdict
16 import threading
17 import Queue
18 import logging
19 log = logging.getLogger("zen.ApplyDataMap")
20
21 import transaction
22
23 from ZODB.transact import transact
24 from zope.event import notify
25 from zope.container.contained import ObjectRemovedEvent, ObjectMovedEvent
26 from zope.container.contained import ObjectAddedEvent
27 from Acquisition import aq_base
28
29 from Products.ZenUtils.Utils import importClass, getObjByPath
30 from Products.Zuul.catalog.events import IndexingEvent
31 from Exceptions import ObjectCreationError
32 from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked
33 from Products.ZenModel.Lockable import Lockable
34 from Products.ZenEvents import Event
35 from zExceptions import NotFound
36
37 zenmarker = "__ZENMARKER__"
38
39 CLASSIFIER_CLASS = '/Classifier'
40
41 _notAscii = dict.fromkeys(range(128,256), u'?')
45 """
46 A more comprehensive check to see if existing model data is the same as
47 newly modeled data. The primary focus is comparing unsorted lists of
48 dictionaries.
49 """
50 if isinstance(x, (tuple, list)) and isinstance(y, (tuple, list)):
51 if len(x) > 0 and len(y) > 0 \
52 and isinstance(x[0], dict) and isinstance(y[0], dict):
53
54 x = set( tuple(sorted(d.items())) for d in x )
55 y = set( tuple(sorted(d.items())) for d in y )
56 else:
57 return sorted(x) == sorted(y)
58
59 return x == y
60
63
65 self.datacollector = datacollector
66
67
68 - def logChange(self, device, compname, eventClass, msg):
73
74
75 - def logEvent(self, device, component, eventClass, msg, severity):
76 ''' Used to report a change to a device model. Logs the given msg
77 to log.info and creates an event.
78 '''
79 device = device.device()
80 compname = ""
81 try:
82 compname = getattr(component, 'id', component)
83 if device.id == compname:
84 compname = ""
85 except: pass
86 log.debug(msg)
87 devname = device.device().id
88 if (self.datacollector
89
90
91 and getattr(self.datacollector, 'dmd', None)):
92 eventDict = {
93 'eventClass': eventClass,
94 'device': devname,
95 'component': compname,
96 'summary': msg,
97 'severity': severity,
98 'agent': 'ApplyDataMap',
99 'explanation': "Event sent as zCollectorLogChanges is True",
100 }
101 self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)
102
103
105 """
106 A modeler plugin specifies the protocol (eg SNMP, WMI) and
107 the specific data to retrieve from the device (eg an OID).
108 This data is then processed by the modeler plugin and then
109 passed to this method to apply the results to the ZODB.
110
111 @parameter device: DMD device object
112 @type device: DMD device object
113 @parameter collectorClient: results of modeling
114 @type collectorClient: DMD object
115 """
116 log.debug("Processing data for device %s", device.id)
117 devchanged = False
118 try:
119 for pname, results in collectorClient.getResults():
120 log.debug("Processing plugin %s on device %s", pname, device.id)
121 if not results:
122 log.warn("Plugin %s did not return any results", pname)
123 continue
124 plugin = self.datacollector.collectorPlugins.get(pname, None)
125 if not plugin:
126 log.warn("Unable to get plugin %s from %s", pname,
127 self.datacollector.collectorPlugins)
128 continue
129
130 results = plugin.preprocess(results, log)
131 datamaps = plugin.process(device, results, log)
132
133 if not isinstance(datamaps, (list, tuple, set)):
134 datamaps = [datamaps,]
135 for datamap in datamaps:
136 changed = self._applyDataMap(device, datamap)
137 if changed: devchanged=True
138 if devchanged:
139 device.setLastChange()
140 log.info("Changes applied")
141 else:
142 log.info("No change detected")
143 device.setSnmpLastCollection()
144 trans = transaction.get()
145 trans.setUser("datacoll")
146 trans.note("data applied from automated collection")
147 trans.commit()
148 except (SystemExit, KeyboardInterrupt):
149 raise
150 except:
151 transaction.abort()
152 log.exception("Plugin %s device %s", pname, device.getId())
153
154
155 - def applyDataMap(self, device, datamap, relname="", compname="", modname=""):
165
166
174
175
176 @transact
178 """Apply a datamap to a device.
179 """
180 persist = True
181 try:
182 device.dmd._p_jar.sync()
183 except AttributeError:
184
185 persist = False
186
187 if hasattr(datamap, "compname"):
188 if datamap.compname:
189 try:
190 tobj = device.getObjByPath(datamap.compname)
191 except NotFound:
192 log.warn("Unable to find compname '%s'" % datamap.compname)
193 return False
194 else:
195 tobj = device
196 if hasattr(datamap, "relname"):
197 changed = self._updateRelationship(tobj, datamap)
198 elif hasattr(datamap, 'modname'):
199 changed = self._updateObject(tobj, datamap)
200 else:
201 changed = False
202 log.warn("plugin returned unknown map skipping")
203 else:
204 changed = False
205 if not (changed and persist):
206 transaction.abort()
207 else:
208 device.setLastChange()
209 trans = transaction.get()
210 trans.setUser("datacoll")
211 trans.note("data applied from automated collection")
212 return changed
213
214
216 """Add/Update/Remote objects to the target relationship.
217 """
218 changed = False
219 rname = relmap.relname
220 rel = getattr(device, rname, None)
221 if not rel:
222 log.warn("no relationship:%s found on:%s (%s %s)",
223 relmap.relname, device.id, device.__class__, device.zPythonClass)
224 return changed
225 relids = rel.objectIdsAll()
226 seenids = defaultdict(int)
227 for objmap in relmap:
228 from Products.ZenModel.ZenModelRM import ZenModelRM
229 if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
230 objmap_id = objmap.id
231 seenids[objmap_id] += 1
232 if seenids[objmap_id] > 1:
233 objmap_id = objmap.id = "%s_%s" % (objmap_id, seenids[objmap_id])
234 if objmap_id in relids:
235 obj = rel._getOb(objmap_id)
236
237
238
239 existing_modname = ''
240 existing_classname = ''
241 try:
242 import inspect
243 existing_modname = inspect.getmodule(obj).__name__
244 existing_classname = obj.__class__.__name__
245 except:
246 pass
247
248 if objmap.modname == existing_modname and \
249 objmap.classname in ('', existing_classname):
250
251 objchange = self._updateObject(obj, objmap)
252 if not changed: changed = objchange
253 else:
254 rel._delObject(objmap_id)
255 objchange, obj = self._createRelObject(device, objmap, rname)
256 if not changed: changed = objchange
257
258 if objmap_id in relids: relids.remove(objmap_id)
259 else:
260 objchange, obj = self._createRelObject(device, objmap, rname)
261 if objchange: changed = True
262 if obj and obj.id in relids: relids.remove(obj.id)
263 elif isinstance(objmap, ZenModelRM):
264 self.logChange(device, objmap.id, Change_Add,
265 "linking object %s to device %s relation %s" % (
266 objmap.id, device.id, rname))
267 device.addRelation(rname, objmap)
268 changed = True
269 else:
270 objchange, obj = self._createRelObject(device, objmap, rname)
271 if objchange: changed = True
272 if obj and obj.id in relids: relids.remove(obj.id)
273
274 for id in relids:
275 obj = rel._getOb(id)
276 if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
277 objname = obj.id
278 try: objname = obj.name()
279 except: pass
280 msg = "Deletion Blocked: %s '%s' on %s" % (
281 obj.meta_type, objname,obj.device().id)
282 log.warn(msg)
283 if obj.sendEventWhenBlocked():
284 self.logEvent(device, obj, Change_Remove_Blocked,
285 msg, Event.Warning)
286 continue
287 self.logChange(device, obj, Change_Remove,
288 "removing object %s from rel %s on device %s" % (
289 id, rname, device.id))
290 rel._delObject(id)
291 if relids: changed=True
292 return changed
293
294
296 """Update an object using a objmap.
297 """
298 changed = False
299 device = obj.device()
300
301 if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
302 if device.id == obj.id:
303 msg = 'Update Blocked: %s' % device.id
304 else:
305 objname = obj.id
306 try: objname = obj.name()
307 except: pass
308 msg = "Update Blocked: %s '%s' on %s" % (
309 obj.meta_type, objname ,device.id)
310 log.warn(msg)
311 if obj.sendEventWhenBlocked():
312 self.logEvent(device, obj,Change_Set_Blocked,msg,Event.Warning)
313 return changed
314 for attname, value in objmap.items():
315 if attname.startswith('_'):
316 continue
317 if isinstance(value, basestring):
318 try:
319
320
321
322
323
324
325
326
327
328
329
330 codec = obj.zCollectorDecoding or sys.getdefaultencoding()
331 value = value.decode(codec)
332 value = value.encode(sys.getdefaultencoding())
333 except UnicodeDecodeError:
334
335
336 continue
337 att = getattr(aq_base(obj), attname, zenmarker)
338 if att == zenmarker:
339 log.warn('The attribute %s was not found on object %s from device %s',
340 attname, obj.id, device.id)
341 continue
342 if callable(att):
343 setter = getattr(obj, attname)
344 gettername = attname.replace("set","get")
345 getter = getattr(obj, gettername, None)
346
347 if not getter:
348
349 log.warn("getter '%s' not found on obj '%s', "
350 "skipping", gettername, obj.id)
351
352 else:
353
354 from plugins.DataMaps import MultiArgs
355 if isinstance(value, MultiArgs):
356
357 args = value.args
358 change = not isSameData(value.args, getter())
359
360 else:
361
362 args = (value,)
363 try:
364 change = not isSameData(value, getter())
365 except UnicodeDecodeError:
366 change = True
367
368 if change:
369 setter(*args)
370 self.logChange(device, obj, Change_Set,
371 "calling function '%s' with '%s' on "
372 "object %s" % (attname, value, obj.id))
373 changed = True
374
375 else:
376 try:
377 change = not isSameData(att, value)
378 except UnicodeDecodeError:
379 change = True
380 if change:
381 setattr(aq_base(obj), attname, value)
382 self.logChange(device, obj, Change_Set,
383 "set attribute '%s' "
384 "to '%s' on object '%s'" %
385 (attname, value, obj.id))
386 changed = True
387 if not changed:
388 try: changed = obj._p_changed
389 except: pass
390 if changed:
391 if getattr(aq_base(obj), "index_object", False):
392 log.debug("indexing object %s", obj.id)
393 obj.index_object()
394 notify(IndexingEvent(obj))
395 else:
396 obj._p_deactivate()
397 return changed
398
399
401 """Create an object on a relationship using its objmap.
402 """
403 constructor = importClass(objmap.modname, objmap.classname)
404 if hasattr(objmap, 'id'):
405 remoteObj = constructor(objmap.id)
406 else:
407 remoteObj = constructor(device, objmap)
408 if remoteObj is None:
409 log.debug("Constructor returned None")
410 return False, None
411 id = remoteObj.id
412 if not remoteObj:
413 raise ObjectCreationError(
414 "failed to create object %s in relation %s" % (id, relname))
415
416 realdevice = device.device()
417 if realdevice.isLockedFromUpdates():
418 objtype = ""
419 try: objtype = objmap.modname.split(".")[-1]
420 except: pass
421 msg = "Add Blocked: %s '%s' on %s" % (
422 objtype, id, realdevice.id)
423 log.warn(msg)
424 if realdevice.sendEventWhenBlocked():
425 self.logEvent(realdevice, id, Change_Add_Blocked,
426 msg, Event.Warning)
427 return False, None
428 rel = device._getOb(relname, None)
429 if not rel:
430 raise ObjectCreationError(
431 "No relation %s found on device %s (%s)" % (relname, device.id, device.__class__ ))
432
433 changed = False
434 try:
435 remoteObj = rel._getOb(remoteObj.id)
436 except AttributeError:
437 self.logChange(realdevice, remoteObj, Change_Add,
438 "adding object %s to relationship %s" %
439 (remoteObj.id, relname))
440 rel._setObject(remoteObj.id, remoteObj)
441 remoteObj = rel._getOb(remoteObj.id)
442 changed = True
443 notify(ObjectMovedEvent(remoteObj, rel, remoteObj.id, rel, remoteObj.id))
444 return self._updateObject(remoteObj, objmap) or changed, remoteObj
445
446
447 - def stop(self): pass
448
451 """
452 Thread that applies datamaps to a device. It reads from a queue that
453 should have tuples of (devid, datamaps) where devid is the primaryId to
454 the device and datamps is a list of datamaps to apply. Cache is synced at
455 the start of each transaction and there is one transaction per device.
456 """
457
458 - def __init__(self, datacollector, app):
459 threading.Thread.__init__(self)
460 ApplyDataMap.__init__(self, datacollector)
461 self.setName("ApplyDataMapThread")
462 self.setDaemon(1)
463 self.app = app
464 log.debug("Thread conn:%s", self.app._p_jar)
465 self.inputqueue = Queue.Queue()
466 self.done = False
467
468
470 """Apply datamps to device.
471 """
472 devpath = device.getPrimaryPath()
473 self.inputqueue.put((devpath, collectorClient))
474
475
477 """Process collectorClients as they are passed in from a data collector.
478 """
479 log.info("starting applyDataMap thread")
480 while not self.done or not self.inputqueue.empty():
481 devpath = ()
482 try:
483 devpath, collectorClient = self.inputqueue.get(True,1)
484 self.app._p_jar.sync()
485 device = getObjByPath(self.app, devpath)
486 ApplyDataMap.processClient(self, device, collectorClient)
487 except Queue.Empty: pass
488 except (SystemExit, KeyboardInterrupt): raise
489 except:
490 transaction.abort()
491 log.exception("processing device %s", "/".join(devpath))
492 log.info("stopping applyDataMap thread")
493
494
496 """Stop the thread once all devices are processed.
497 """
498 self.done = True
499 self.join()
500