Package Products :: Package DataCollector :: Module ApplyDataMap
[hide private]
[frames | no frames]

Source Code for Module Products.DataCollector.ApplyDataMap

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import sys 
 15  from collections import defaultdict 
 16  import threading 
 17  import Queue 
 18  import logging 
 19  log = logging.getLogger("zen.ApplyDataMap") 
 20   
 21  import transaction 
 22   
 23  from ZODB.transact import transact 
 24  from zope.event import notify 
 25  from zope.container.contained import ObjectRemovedEvent, ObjectMovedEvent 
 26  from zope.container.contained import ObjectAddedEvent 
 27  from Acquisition import aq_base 
 28   
 29  from Products.ZenUtils.Utils import importClass, getObjByPath 
 30  from Products.Zuul.catalog.events import IndexingEvent 
 31  from Exceptions import ObjectCreationError 
 32  from Products.ZenEvents.ZenEventClasses import Change_Add,Change_Remove,Change_Set,Change_Add_Blocked,Change_Remove_Blocked,Change_Set_Blocked 
 33  from Products.ZenModel.Lockable import Lockable 
 34  from Products.ZenEvents import Event 
 35  from zExceptions import NotFound 
 36   
 37  zenmarker = "__ZENMARKER__" 
 38   
 39  CLASSIFIER_CLASS = '/Classifier' 
 40   
 41  _notAscii = dict.fromkeys(range(128,256), u'?') 
def isSameData(x, y):
    """
    A more comprehensive check to see if existing model data is the same as
    newly modeled data. The primary focus is comparing unsorted lists of
    dictionaries.

    Rules, in order:
      - If x and y are not both a list or tuple, they are compared with ==.
      - Two non-empty sequences whose first elements are dicts are compared
        as sets of sorted item tuples: order is ignored and duplicate
        dictionaries collapse into one.
      - Any other pair of sequences is compared after sorting, so order is
        ignored.
      - If the elements cannot be hashed or sorted (for example a dictionary
        value that is a list), or a later element is not a dictionary, the
        sequences are compared element by element in their given order
        instead of raising.

    @parameter x: existing model data
    @parameter y: newly modeled data
    @return: True if the two values are considered the same
    @rtype: boolean
    """
    sequences = (tuple, list)
    if not (isinstance(x, sequences) and isinstance(y, sequences)):
        return x == y

    try:
        if x and y and isinstance(x[0], dict) and isinstance(y[0], dict):
            # Local names are used so that x and y are still intact for the
            # fallback below if the second conversion fails.
            xitems = set(tuple(sorted(d.items())) for d in x)
            yitems = set(tuple(sorted(d.items())) for d in y)
            return xitems == yitems
        return sorted(x) == sorted(y)
    except (TypeError, AttributeError):
        # TypeError: unhashable or unorderable elements.
        # AttributeError: a non-dict element in a list that started with a
        # dict.  list() keeps the comparison insensitive to tuple vs. list.
        return list(x) == list(y)
60
class ApplyDataMap(object):
    """
    Apply the datamaps produced by modeler plugins to device objects.

    A datamap with a C{relname} attribute is applied to a relationship
    (_updateRelationship); one with only a C{modname} attribute is applied
    to a single object (_updateObject).  Changes are reported through
    logChange() / logEvent().
    """

    def __init__(self, datacollector=None):
        """
        @parameter datacollector: object whose C{collectorPlugins} and
            C{dmd} attributes are used by processClient() and logEvent().
            May be None, in which case logEvent() sends no events.
        """
        self.datacollector = datacollector

    def logChange(self, device, compname, eventClass, msg):
        """
        Report a model change as an Info-severity event.

        Nothing is reported when the device's zCollectorLogChanges is
        false; a device without that attribute is treated as True.

        @parameter device: object the change applies to
        @parameter compname: component (object or id) the change applies to
        @parameter eventClass: event class to report under
        @parameter msg: description of the change
        """
        if not getattr(device, 'zCollectorLogChanges', True):
            return
        if isinstance(msg, unicode):
            # Replace code points 128-255 with '?' (see _notAscii).
            msg = msg.translate(_notAscii)
        self.logEvent(device, compname, eventClass, msg, Event.Info)

    def logEvent(self, device, component, eventClass, msg, severity):
        """
        Used to report a change to a device model.  Logs the given msg
        to log.debug and, when the data collector has a dmd, sends an event
        through its ZenEventManager.

        @parameter device: object whose device() is the event's device
        @parameter component: component object, or its id
        @parameter eventClass: event class of the event
        @parameter msg: event summary
        @parameter severity: event severity
        """
        device = device.device()
        compname = ""
        try:
            compname = getattr(component, 'id', component)
            # A change to the device itself carries no component name.
            if device.id == compname:
                compname = ""
        except Exception:
            # Best effort: keep whatever component name was worked out.
            pass
        log.debug(msg)
        devname = device.device().id
        # An additional check of the collector's 'generateEvents' attribute
        # used to be made here; it was disabled because it blocked events
        # when modeling from within Zope.
        if self.datacollector and getattr(self.datacollector, 'dmd', None):
            eventDict = {
                'eventClass': eventClass,
                'device': devname,
                'component': compname,
                'summary': msg,
                'severity': severity,
                'agent': 'ApplyDataMap',
                'explanation': "Event sent as zCollectorLogChanges is True",
            }
            self.datacollector.dmd.ZenEventManager.sendEvent(eventDict)

    def processClient(self, device, collectorClient):
        """
        A modeler plugin specifies the protocol (eg SNMP, WMI) and
        the specific data to retrieve from the device (eg an OID).
        This data is then processed by the modeler plugin and then
        passed to this method to apply the results to the ZODB.

        Any exception other than SystemExit / KeyboardInterrupt aborts the
        current transaction and is logged rather than propagated.

        @parameter device: DMD device object
        @type device: DMD device object
        @parameter collectorClient: results of modeling
        @type collectorClient: DMD object
        """
        log.debug("Processing data for device %s", device.id)
        devchanged = False
        # Bound up front: the exception handler below logs pname, and
        # getResults() can fail before the loop assigns it.
        pname = None
        try:
            for pname, results in collectorClient.getResults():
                log.debug("Processing plugin %s on device %s",
                          pname, device.id)
                if not results:
                    log.warning("Plugin %s did not return any results",
                                pname)
                    continue
                plugin = self.datacollector.collectorPlugins.get(pname, None)
                if not plugin:
                    log.warning("Unable to get plugin %s from %s", pname,
                                self.datacollector.collectorPlugins)
                    continue

                results = plugin.preprocess(results, log)
                datamaps = plugin.process(device, results, log)
                # allow multiple maps to be returned from one plugin
                if not isinstance(datamaps, (list, tuple, set)):
                    datamaps = [datamaps]
                for datamap in datamaps:
                    if self._applyDataMap(device, datamap):
                        devchanged = True
            if devchanged:
                device.setLastChange()
                log.info("Changes applied")
            else:
                log.info("No change detected")
            device.setSnmpLastCollection()
            trans = transaction.get()
            trans.setUser("datacoll")
            trans.note("data applied from automated collection")
            trans.commit()
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            transaction.abort()
            log.exception("Plugin %s device %s", pname, device.getId())

    def applyDataMap(self, device, datamap, relname="", compname="",
                     modname=""):
        """
        Apply a datamap passed as a list of dicts through XML-RPC.

        The raw data is wrapped in a RelationshipMap when relname is given
        and in an ObjectMap otherwise, then applied to the device.

        @parameter device: device to apply the data to
        @parameter datamap: raw map data
        @parameter relname: relationship name; selects a RelationshipMap
        @parameter compname: passed through to the map
        @parameter modname: passed through to the map
        """
        from plugins.DataMaps import RelationshipMap, ObjectMap
        if relname:
            datamap = RelationshipMap(relname=relname, compname=compname,
                                      modname=modname, objmaps=datamap)
        else:
            datamap = ObjectMap(datamap, compname=compname, modname=modname)
        self._applyDataMap(device, datamap)

    def setDeviceClass(self, device, deviceClass=None):
        """
        If a device class has been passed and the device is currently
        under /Classifier then move the device to the newly classified
        device class.

        @parameter device: device to move
        @parameter deviceClass: destination device class; nothing is done
            when it is empty
        """
        if not deviceClass:
            return
        if device.getDeviceClassPath().startswith(CLASSIFIER_CLASS):
            device.changeDeviceClass(deviceClass)

    @transact
    def _applyDataMap(self, device, datamap):
        """
        Apply a datamap to a device.

        The method is wrapped by ZODB.transact.transact.  The current
        transaction is aborted when nothing changed or when the device is
        not persisted; otherwise the device's last-change time is set and
        the transaction is annotated.

        @parameter device: device the datamap belongs to
        @parameter datamap: map with a compname attribute plus either a
            relname or a modname attribute
        @return: true value if the datamap changed the model
        """
        persist = True
        try:
            device.dmd._p_jar.sync()
        except AttributeError:
            # This can occur in unit testing when the device is not
            # persisted.
            persist = False

        changed = False
        if hasattr(datamap, "compname"):
            if datamap.compname:
                try:
                    tobj = device.getObjByPath(datamap.compname)
                except NotFound:
                    log.warning("Unable to find compname '%s'",
                                datamap.compname)
                    return False
            else:
                tobj = device
            if hasattr(datamap, "relname"):
                changed = self._updateRelationship(tobj, datamap)
            elif hasattr(datamap, 'modname'):
                changed = self._updateObject(tobj, datamap)
            else:
                log.warning("plugin returned unknown map skipping")

        if changed and persist:
            device.setLastChange()
            trans = transaction.get()
            trans.setUser("datacoll")
            trans.note("data applied from automated collection")
        else:
            transaction.abort()
        return changed

    def _updateRelationship(self, device, relmap):
        """
        Add/Update/Remove objects on the target relationship.

        Every object map in relmap is created or updated.  Objects that
        are on the relationship but were not seen in relmap are removed,
        unless they are locked from deletion.

        @parameter device: object that owns the relationship
        @parameter relmap: iterable of object maps with a relname attribute
        @return: true value if anything was added, updated or removed
        """
        changed = False
        rname = relmap.relname
        rel = getattr(device, rname, None)
        if not rel:
            log.warning("no relationship:%s found on:%s (%s %s)",
                        relmap.relname, device.id, device.__class__,
                        device.zPythonClass)
            return changed

        # Imported here rather than at module level, as in the original
        # code, but only once instead of on every loop iteration.
        import inspect
        from Products.ZenModel.ZenModelRM import ZenModelRM

        # Ids still in relids after the loop were not modeled this time.
        relids = rel.objectIdsAll()
        seenids = defaultdict(int)
        for objmap in relmap:
            if hasattr(objmap, 'modname') and hasattr(objmap, 'id'):
                objmap_id = objmap.id
                seenids[objmap_id] += 1
                if seenids[objmap_id] > 1:
                    # Duplicate id in the same relmap: suffix with a count.
                    objmap_id = objmap.id = "%s_%s" % (
                        objmap_id, seenids[objmap_id])
                if objmap_id in relids:
                    obj = rel._getOb(objmap_id)

                    # Handle the possibility of objects changing class by
                    # recreating them. Ticket #5598.
                    existing_modname = ''
                    existing_classname = ''
                    try:
                        existing_modname = inspect.getmodule(obj).__name__
                        existing_classname = obj.__class__.__name__
                    except Exception:
                        pass

                    if objmap.modname == existing_modname and \
                            objmap.classname in ('', existing_classname):
                        objchange = self._updateObject(obj, objmap)
                    else:
                        rel._delObject(objmap_id)
                        objchange, obj = self._createRelObject(
                            device, objmap, rname)
                    changed = changed or objchange

                    if objmap_id in relids:
                        relids.remove(objmap_id)
                else:
                    objchange, obj = self._createRelObject(
                        device, objmap, rname)
                    if objchange:
                        changed = True
                    if obj and obj.id in relids:
                        relids.remove(obj.id)
            elif isinstance(objmap, ZenModelRM):
                self.logChange(device, objmap.id, Change_Add,
                               "linking object %s to device %s relation %s"
                               % (objmap.id, device.id, rname))
                device.addRelation(rname, objmap)
                changed = True
            else:
                objchange, obj = self._createRelObject(device, objmap, rname)
                if objchange:
                    changed = True
                if obj and obj.id in relids:
                    relids.remove(obj.id)

        for relid in relids:
            obj = rel._getOb(relid)
            if isinstance(obj, Lockable) and obj.isLockedFromDeletion():
                objname = obj.id
                try:
                    objname = obj.name()
                except Exception:
                    pass
                msg = "Deletion Blocked: %s '%s' on %s" % (
                    obj.meta_type, objname, obj.device().id)
                log.warning(msg)
                if obj.sendEventWhenBlocked():
                    self.logEvent(device, obj, Change_Remove_Blocked,
                                  msg, Event.Warning)
                continue
            self.logChange(device, obj, Change_Remove,
                           "removing object %s from rel %s on device %s" % (
                               relid, rname, device.id))
            rel._delObject(relid)
            # Only an actual removal counts as a change; a deletion that
            # was blocked by a lock leaves the model untouched.
            changed = True
        return changed

    def _updateObject(self, obj, objmap):
        """
        Update an object using a objmap.

        Keys of objmap starting with '_' are ignored.  A key naming a
        callable attribute is treated as a setter and is called when the
        matching getter returns different data; any other key is set as a
        plain attribute when its value differs.  Nothing is done when the
        object is locked from updates.

        @parameter obj: object to update
        @parameter objmap: mapping of attribute / setter names to values
        @return: true value if the object changed
        """
        changed = False
        device = obj.device()

        if isinstance(obj, Lockable) and obj.isLockedFromUpdates():
            if device.id == obj.id:
                msg = 'Update Blocked: %s' % device.id
            else:
                objname = obj.id
                try:
                    objname = obj.name()
                except Exception:
                    pass
                msg = "Update Blocked: %s '%s' on %s" % (
                    obj.meta_type, objname, device.id)
            log.warning(msg)
            if obj.sendEventWhenBlocked():
                self.logEvent(device, obj, Change_Set_Blocked, msg,
                              Event.Warning)
            return changed

        for attname, value in objmap.items():
            if attname.startswith('_'):
                continue
            if isinstance(value, basestring):
                try:
                    # This looks confusing, and it is. The scenario is:
                    # A collector gathers some data as a raw byte stream,
                    # but really it has a specific encoding specified by
                    # the zCollectorDecoding zProperty. Say, latin-1 or
                    # utf-16, etc. We need to decode that byte stream to
                    # get back a unicode object. But, this version of Zope
                    # doesn't like unicode objects for a variety of fields,
                    # such as object ids, so we then need to convert that
                    # unicode object back into a regular string of bytes,
                    # and for that we use the system default encoding.
                    codec = obj.zCollectorDecoding or sys.getdefaultencoding()
                    value = value.decode(codec)
                    value = value.encode(sys.getdefaultencoding())
                except UnicodeError:
                    # Covers decode and encode failures alike.  We don't
                    # know what to do with this, so don't set the value.
                    log.debug("Unable to convert the value of %s on %s; "
                              "not setting it", attname, obj.id)
                    continue
            att = getattr(aq_base(obj), attname, zenmarker)
            if att is zenmarker:
                log.warning('The attribute %s was not found on object %s '
                            'from device %s', attname, obj.id, device.id)
                continue

            if callable(att):
                setter = getattr(obj, attname)
                # Swap only the first "set": setOffset -> getOffset.
                gettername = attname.replace("set", "get", 1)
                getter = getattr(obj, gettername, None)
                if not getter:
                    log.warning("getter '%s' not found on obj '%s', "
                                "skipping", gettername, obj.id)
                    continue

                # Imported lazily, where the original code imports it.
                from plugins.DataMaps import MultiArgs
                if isinstance(value, MultiArgs):
                    args = value.args
                    change = not isSameData(value.args, getter())
                else:
                    args = (value,)
                    try:
                        change = not isSameData(value, getter())
                    except UnicodeDecodeError:
                        change = True

                if change:
                    setter(*args)
                    self.logChange(device, obj, Change_Set,
                                   "calling function '%s' with '%s' on "
                                   "object %s" % (attname, value, obj.id))
                    changed = True
            else:
                try:
                    change = not isSameData(att, value)
                except UnicodeDecodeError:
                    change = True
                if change:
                    setattr(aq_base(obj), attname, value)
                    self.logChange(device, obj, Change_Set,
                                   "set attribute '%s' "
                                   "to '%s' on object '%s'" %
                                   (attname, value, obj.id))
                    changed = True

        if not changed:
            try:
                changed = obj._p_changed
            except Exception:
                pass
        # NOTE(review): nesting reconstructed from a flattened listing --
        # notify() is taken to run for every changed object and the else
        # to pair with "if changed"; confirm against the original file.
        if changed:
            if getattr(aq_base(obj), "index_object", False):
                log.debug("indexing object %s", obj.id)
                obj.index_object()
            notify(IndexingEvent(obj))
        else:
            obj._p_deactivate()
        return changed

    def _createRelObject(self, device, objmap, relname):
        """
        Create an object on a relationship using its objmap.

        @parameter device: object that owns the relationship
        @parameter objmap: object map with modname and classname
            attributes, and optionally an id
        @parameter relname: name of the relationship on device
        @return: tuple (changed, object); (False, None) when the
            constructor returned None or the device is locked from updates
        @raise ObjectCreationError: when the constructed object is false
            or the relationship is not found on device
        """
        constructor = importClass(objmap.modname, objmap.classname)
        if hasattr(objmap, 'id'):
            remoteObj = constructor(objmap.id)
        else:
            remoteObj = constructor(device, objmap)
        if remoteObj is None:
            log.debug("Constructor returned None")
            return False, None
        objid = remoteObj.id
        if not remoteObj:
            raise ObjectCreationError(
                "failed to create object %s in relation %s" % (
                    objid, relname))

        realdevice = device.device()
        if realdevice.isLockedFromUpdates():
            objtype = ""
            try:
                objtype = objmap.modname.split(".")[-1]
            except Exception:
                pass
            msg = "Add Blocked: %s '%s' on %s" % (
                objtype, objid, realdevice.id)
            log.warning(msg)
            if realdevice.sendEventWhenBlocked():
                self.logEvent(realdevice, objid, Change_Add_Blocked,
                              msg, Event.Warning)
            return False, None

        rel = device._getOb(relname, None)
        if not rel:
            raise ObjectCreationError(
                "No relation %s found on device %s (%s)" % (
                    relname, device.id, device.__class__))

        changed = False
        try:
            # Reuse the object if one with this id is already there.
            remoteObj = rel._getOb(remoteObj.id)
        except AttributeError:
            self.logChange(realdevice, remoteObj, Change_Add,
                           "adding object %s to relationship %s" %
                           (remoteObj.id, relname))
            rel._setObject(remoteObj.id, remoteObj)
            remoteObj = rel._getOb(remoteObj.id)
            changed = True
            # NOTE(review): nesting reconstructed from a flattened listing;
            # the event is assumed to fire only for newly added objects.
            notify(ObjectMovedEvent(remoteObj, rel, remoteObj.id,
                                    rel, remoteObj.id))
        return self._updateObject(remoteObj, objmap) or changed, remoteObj

    def stop(self):
        """Do nothing; ApplyDataMapThread overrides this."""
        pass
448
class ApplyDataMapThread(threading.Thread, ApplyDataMap):
    """
    Daemon thread that applies modeling results to devices.

    processClient() puts (device primary path, collectorClient) tuples on
    an internal queue.  run() takes them off one at a time, syncs the
    application's ZODB connection, looks the device up by its path and
    hands both to ApplyDataMap.processClient().
    """

    def __init__(self, datacollector, app):
        """
        @parameter datacollector: passed on to ApplyDataMap.__init__
        @parameter app: application root; its _p_jar is synced before each
            device is processed and devices are resolved relative to it
        """
        threading.Thread.__init__(self)
        ApplyDataMap.__init__(self, datacollector)
        self.setName("ApplyDataMapThread")
        self.setDaemon(1)
        self.app = app
        log.debug("Thread conn:%s", self.app._p_jar)
        self.inputqueue = Queue.Queue()
        # Set by stop(); run() exits once this is true and the queue is
        # empty.
        self.done = False

    def processClient(self, device, collectorClient):
        """
        Queue the results for a device; run() applies them.
        """
        work = (device.getPrimaryPath(), collectorClient)
        self.inputqueue.put(work)

    def run(self):
        """
        Process collectorClients as they are passed in from a data
        collector, until stop() has been called and the queue is empty.
        """
        log.info("starting applyDataMap thread")
        while not (self.done and self.inputqueue.empty()):
            # Reset so a failure before unpacking logs an empty path.
            devpath = ()
            try:
                # Wait at most one second so self.done is re-checked.
                devpath, client = self.inputqueue.get(True, 1)
                self.app._p_jar.sync()
                target = getObjByPath(self.app, devpath)
                ApplyDataMap.processClient(self, target, client)
            except Queue.Empty:
                continue
            except (SystemExit, KeyboardInterrupt):
                raise
            except:
                transaction.abort()
                log.exception("processing device %s", "/".join(devpath))
        log.info("stopping applyDataMap thread")

    def stop(self):
        """
        Stop the thread once all devices are processed.
        """
        self.done = True
        self.join()
500