Package Products :: Package ZenEvents :: Package events2 :: Module processing
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.events2.processing

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  from Products.ZenEvents.events2.fields import EventField 
 15  from Products.ZenEvents.interfaces import IEventIdentifierPlugin 
 16  from Products.ZenModel.Device import Device 
 17  from Products.ZenModel.DeviceComponent import DeviceComponent 
 18  from Products.ZenModel.DataRoot import DataRoot 
 19  from Products.ZenEvents.events2.proxy import ZepRawEventProxy, EventProxy 
 20  from Products.ZenUtils.guid.interfaces import IGUIDManager, IGlobalIdentifier 
 21  from Products.ZenUtils.IpUtil import ipToDecimal, IpAddressError 
 22  from Products.Zuul.interfaces import ICatalogTool 
 23  from Products.AdvancedQuery import Eq, Or 
 24  from zope.component import getUtilitiesFor 
 25  from Acquisition import aq_chain 
 26  from Products.ZenEvents import ZenEventClasses 
 27   
 28  from zenoss.protocols.jsonformat import to_dict 
 29  from zenoss.protocols.protobufs.model_pb2 import DEVICE, COMPONENT 
 30  from zenoss.protocols.protobufs.zep_pb2 import ( 
 31      STATUS_NEW, 
 32      STATUS_CLOSED, 
 33      STATUS_DROPPED, 
 34      ) 
 35   
 36  import logging 
 37   
 38  log = logging.getLogger("zen.eventd") 
39 40 -class ProcessingException(Exception):
41 - def __init__(self, message, event=None):
42 super(ProcessingException, self).__init__(message) 43 self.event = event
44
45 -class DropEvent(ProcessingException):
46 """ 47 Raised when an event should be dropped from the queue. 48 """ 49 pass
50
51 -class EventLoggerAdapter(logging.LoggerAdapter):
52 """ 53 A logging adapter that adds the event UUID to the log output. 54 """ 55
56 - def process(self, msg, kwargs):
57 msg = '[{event_uuid}] {msg}'.format(event_uuid=self.extra['event_uuid'], 58 msg=msg) 59 return msg, kwargs
60
61 -class Manager(object):
62 """ 63 Provides lookup access to processing pipes and performs caching. 64 """ 65 66 ELEMENT_TYPE_MAP = { 67 DEVICE: Device, 68 COMPONENT: DeviceComponent, 69 } 70
71 - def __init__(self, dmd):
72 self.dmd = dmd 73 self._initCatalogs()
74
75 - def _initCatalogs(self):
76 self._guidManager = IGUIDManager(self.dmd) 77 78 self._devices = self.dmd._getOb('Devices') 79 self._networks = self.dmd._getOb('Networks') 80 self._events = self.dmd._getOb('Events') 81 82 self._catalogs = { 83 DEVICE: self._devices, 84 }
85
86 - def reset(self):
87 self._initCatalogs()
88
89 - def getEventClassOrganizer(self, eventClassName):
90 try: 91 return self._events.getOrganizer(eventClassName) 92 except KeyError: 93 # Unknown organizer 94 return None
95
96 - def lookupEventClass(self, eventContext):
97 """ 98 Find a Device's EventClass 99 """ 100 return self._events.lookup(eventContext.eventProxy, 101 eventContext.deviceObject)
102
103 - def getElementByUuid(self, uuid):
104 """ 105 Get a Device/Component by UUID 106 """ 107 if uuid: 108 return self._guidManager.getObject(uuid)
109
110 - def getElementUuidById(self, catalog, element_type_id, id):
111 """ 112 Find element by ID but only cache UUID. This forces us to lookup elements 113 each time by UUID (pretty fast) which gives us a chance to see if the element 114 has been deleted. 115 """ 116 cls = self.ELEMENT_TYPE_MAP.get(element_type_id) 117 if cls: 118 catalog = catalog or self._catalogs.get(element_type_id) 119 if catalog: 120 results = ICatalogTool(catalog).search(cls, 121 query=Or(Eq('id', id), 122 Eq('name', id)), 123 filterPermissions=False) 124 125 if results.total: 126 return results.results.next().uuid
127
128 - def getElementById(self, catalog, element_type_id, id):
129 """ 130 Find element by ID, first checking a cache for UUIDs then using that UUID 131 to load the element. If the element can't be found by UUID, the UUID 132 cache is cleared and lookup tried again. 133 """ 134 uuid = self.getElementUuidById(catalog, element_type_id, id) 135 if uuid: 136 element = self.getElementByUuid(uuid) 137 if not element: 138 # Lookup cache must be invalid, try looking up again 139 self.getElementUuidById.clear() 140 log.warning( 141 'Clearing ElementUuidById cache becase we could not find %s' % uuid) 142 uuid = self.getElementUuidById(catalog, element_type_id, id) 143 element = self.getElementByUuid(uuid) 144 return element
145
146 - def getElementUuid(self, obj):
147 if obj: 148 return IGlobalIdentifier(obj).getGUID()
149
150 - def findDeviceUuid(self, identifier, ipAddress):
151 """ 152 This will return the device's 153 @type identifier: string 154 @param identifier: The IP address or id of a device 155 @type ipaddress: string 156 @param ipaddress: The known ipaddress of the device 157 """ 158 cat = ICatalogTool(self._devices) 159 160 if ipAddress: 161 try: 162 ipAddress = str(ipToDecimal(ipAddress)) 163 except IpAddressError: 164 ipAddress = None 165 166 if identifier and not ipAddress: 167 try: 168 ipAddress = str(ipToDecimal(identifier)) 169 except IpAddressError: 170 pass 171 172 querySet = Or(Eq('id', identifier), 173 Eq('name', identifier), 174 Eq('ipAddress', ipAddress)) 175 176 results = cat.search(types=Device, query=querySet, limit=1, filterPermissions=False) 177 178 if results.total: 179 return results.results.next().uuid 180 else: 181 querySet = Eq('ipAddress', ipAddress) 182 183 # search the components 184 results = cat.search(types=DeviceComponent, query=querySet, limit=1, filterPermissions=False) 185 if results.total: 186 return self.getElementUuid( 187 results.results.next().getObject().device()) 188 else: 189 return None
190
191 - def findDevice(self, identifier, ipAddress):
192 uuid = self.findDeviceUuid(identifier, ipAddress) 193 if uuid: 194 return self.getElementByUuid(uuid)
195
196 - def getUuidsOfPath(self, node):
197 """ 198 Looks up all the UUIDs in the tree path of an Organizer 199 """ 200 uuids = set() 201 acquisition_chain = [] 202 for n in aq_chain(node.primaryAq()): 203 if isinstance(n, DataRoot): 204 acquisition_chain.pop() 205 break 206 acquisition_chain.append(n) 207 208 if acquisition_chain: 209 for obj in filter(None, acquisition_chain): 210 try: 211 uuids.add(self.getElementUuid(obj)) 212 except TypeError: 213 log.debug("Unable to get a uuid for %s " % obj) 214 215 return filter(None, uuids)
216
217 218 -class EventContext(object):
219 """ 220 Maintains the event context while processing. 221 """ 222
223 - def __init__(self, log, zepRawEvent):
224 self._zepRawEvent = zepRawEvent 225 self._event = self._zepRawEvent.event 226 self._eventProxy = ZepRawEventProxy(self._zepRawEvent) 227 228 # If this event is for a device, it will be attached here 229 self._deviceObject = None 230 self._componentObject = None 231 self.log = EventLoggerAdapter(log, {'event_uuid': self._event.uuid})
232
233 - def setDeviceObject(self, device):
234 self._deviceObject = device
235
236 - def refreshClearClasses(self):
237 self._eventProxy._refreshClearClasses()
238 239 @property
240 - def deviceObject(self):
241 return self._deviceObject
242
243 - def setComponentObject(self, component):
244 self._componentObject = component
245 246 @property
247 - def componentObject(self):
248 return self._componentObject
249 250 @property
251 - def zepRawEvent(self):
252 return self._zepRawEvent
253 254 @property
255 - def event(self):
256 return self._event
257 258 @property
259 - def eventProxy(self):
260 """ 261 A EventProxy that wraps the event protobuf and makes it look like an old style event. 262 """ 263 return self._eventProxy
264
265 -class EventProcessorPipe(object):
266 """ 267 An event context handler that is called in a chain. 268 """ 269 dependencies = [] 270
271 - def __init__(self, manager, name=None):
272 self._manager = manager 273 if name: 274 self.name = name 275 else: 276 self.name = self.__class__.__name__
277
278 - def __call__(self, eventContext):
279 """ 280 Called in a chain, must return modified eventContext. 281 """ 282 raise NotImplementedError()
283
284 -class CheckInputPipe(EventProcessorPipe):
285 """ 286 Validates that the event has required fields. 287 """ 288 REQUIRED_EVENT_FIELDS = ( 289 EventField.ACTOR, EventField.SUMMARY, EventField.SEVERITY) 290
291 - def __call__(self, eventContext):
292 missingFields = [] 293 for field in self.REQUIRED_EVENT_FIELDS: 294 if not eventContext.event.HasField(field): 295 missingFields.append(field) 296 297 if missingFields: 298 raise DropEvent('Required event fields %s not found' % ','.join( 299 missingFields), eventContext.event) 300 301 # Make sure summary and message are populated 302 if not eventContext.event.HasField( 303 'message') and eventContext.event.HasField('summary'): 304 eventContext.event.message = eventContext.event.summary 305 elif not eventContext.event.HasField( 306 'summary') and eventContext.event.HasField('message'): 307 eventContext.event.summary = eventContext.event.message[:255] 308 309 return eventContext
310
311 -class EventIdentifierPluginException(ProcessingException):
312 pass
313 -class EventIdentifierPluginFailure(EventIdentifierPluginException):
314 pass
315 -class EventIdentifierPluginAbort(EventIdentifierPluginException):
316 pass
317
318 -class BaseEventIdentifierPlugin(object):
319 - def _resolveElement(self, evtProcessorManager, catalog, eventContext, type_id_field, 320 identifier_field, uuid_field):
321 """ 322 Lookup an element by identifier or uuid and make sure both 323 identifier and uuid are set. 324 """ 325 actor = eventContext.event.actor 326 if actor.HasField(type_id_field): 327 if not (actor.HasField(identifier_field) and actor.HasField(uuid_field)): 328 if actor.HasField(uuid_field): 329 uuid = getattr(actor, uuid_field, None) 330 element = evtProcessorManager.getElementByUuid(uuid) 331 if element: 332 eventContext.log.debug('Identified element %s by uuid %s', 333 element, uuid) 334 setattr(actor, identifier_field, element.id) 335 else: 336 eventContext.log.warning('Could not find element by uuid %s' 337 , uuid) 338 339 elif actor.HasField(identifier_field): 340 type_id = getattr(actor, type_id_field, None) 341 identifier = getattr(actor, identifier_field, None) 342 if type_id == DEVICE: 343 element_uuid = evtProcessorManager.findDeviceUuid(identifier, 344 eventContext.eventProxy.ipAddress) 345 else: 346 element_uuid = evtProcessorManager.getElementUuidById(catalog, 347 type_id, 348 identifier) 349 350 if element_uuid: 351 eventContext.log.debug('Identified element %s by id %s', 352 element_uuid, identifier) 353 setattr(actor, uuid_field, element_uuid) 354 else: 355 eventContext.log.debug( 356 'Could not find element type %s with id %s', type_id 357 , identifier) 358 else: 359 if log.isEnabledFor(logging.DEBUG): 360 type_id = getattr(actor, type_id_field, None) 361 identifier = getattr(actor, identifier_field, None) 362 uuid = getattr(actor, uuid_field, None) 363 eventContext.log.debug('Element %s already fully identified by %s/%s', type_id, identifier, uuid)
364
365 - def resolveIdentifiers(self, eventContext, evtProcessorManager):
366 """ 367 Update eventContext in place, updating/resolving identifiers and respective uuid's 368 """ 369 eventContext.log.debug('Identifying event (%s)' % self.__class__.__name__) 370 371 # Get element, most likely a Device 372 self._resolveElement( 373 evtProcessorManager, 374 None, 375 eventContext, 376 EventField.Actor.ELEMENT_TYPE_ID, 377 EventField.Actor.ELEMENT_IDENTIFIER, 378 EventField.Actor.ELEMENT_UUID 379 ) 380 381 # Get element, most likely a Component 382 actor = eventContext.event.actor 383 if actor.HasField(EventField.Actor.ELEMENT_UUID): 384 parent = evtProcessorManager.getElementByUuid(actor.element_uuid) 385 else: 386 parent = None 387 self._resolveElement( 388 evtProcessorManager, 389 parent, 390 eventContext, 391 EventField.Actor.ELEMENT_SUB_TYPE_ID, 392 EventField.Actor.ELEMENT_SUB_IDENTIFIER, 393 EventField.Actor.ELEMENT_SUB_UUID 394 )
395
396 -class IdentifierPipe(EventProcessorPipe):
397 """ 398 Resolves element uuids and identifiers to make sure both are populated. 399 """ 400 401 dependencies = [CheckInputPipe] 402
403 - def __call__(self, eventContext):
404 eventContext.log.debug('Identifying event') 405 406 # get list of defined IEventIdentifierPlugins (add default identifier to the end) 407 evtIdentifierPlugins = list(getUtilitiesFor(IEventIdentifierPlugin)) 408 evtIdentifierPlugins.append(('default',BaseEventIdentifierPlugin())) 409 410 # iterate over all event identifier plugins 411 for name, plugin in evtIdentifierPlugins: 412 try: 413 eventContext.log.debug("running identifier plugin %s" % name) 414 plugin.resolveIdentifiers(eventContext, self._manager) 415 except EventIdentifierPluginAbort as e: 416 eventContext.log.debug(e) 417 raise 418 except EventIdentifierPluginException as e: 419 eventContext.log.debug(e) 420 421 return eventContext
422
423 -class AddDeviceContextAndTagsPipe(EventProcessorPipe):
424 """ 425 Adds device and component info to the context and event proxy. 426 """ 427 dependencies = [IdentifierPipe] 428 429 # use defined detail keys for consistent tag names 430 DEVICE_DEVICECLASS_TAG_KEY = EventProxy.DEVICE_CLASS_DETAIL_KEY 431 DEVICE_LOCATION_TAG_KEY = EventProxy.DEVICE_LOCATION_DETAIL_KEY 432 DEVICE_SYSTEMS_TAG_KEY = EventProxy.DEVICE_SYSTEMS_DETAIL_KEY 433 DEVICE_GROUPS_TAG_KEY = EventProxy.DEVICE_GROUPS_DETAIL_KEY 434 435 DEVICE_TAGGERS = { 436 DEVICE_DEVICECLASS_TAG_KEY : (lambda device: device.deviceClass(), 'DeviceClass'), 437 DEVICE_LOCATION_TAG_KEY : (lambda device: device.location(), 'Location'), 438 DEVICE_SYSTEMS_TAG_KEY : (lambda device: device.systems(), 'Systems'), 439 DEVICE_GROUPS_TAG_KEY : (lambda device: device.groups(), 'DeviceGroups'), 440 } 441
442 - def _addDeviceOrganizerNames(self, orgs, orgtypename, evtproxy, proxydetailkey, asDelimitedList=False):
443 if orgtypename not in orgs: 444 return 445 446 orgnames = orgs[orgtypename] 447 if orgnames: 448 if asDelimitedList: 449 detailOrgnames = orgnames 450 proxyOrgname = '|' + '|'.join(orgnames) 451 else: 452 # just use 0'th element 453 detailOrgnames = orgnames[0] 454 proxyOrgname = orgnames 455 evtproxy.setReadOnly(orgtypename, proxyOrgname) 456 evtproxy.details[proxydetailkey] = detailOrgnames
457
458 - def _addDeviceContext(self, eventContext, device):
459 evtproxy = eventContext.eventProxy 460 ipAddress = evtproxy.ipAddress or device.manageIp 461 if ipAddress: 462 evtproxy.ipAddress = ipAddress 463 464 prodState = device.productionState 465 if prodState: 466 evtproxy.prodState = prodState 467 468 devicePriority = device.getPriority() 469 if devicePriority: 470 evtproxy.DevicePriority = devicePriority
471
472 - def _addDeviceOrganizers(self, eventContext, orgs):
473 evtproxy = eventContext.eventProxy 474 self._addDeviceOrganizerNames(orgs, 'Location', evtproxy, EventProxy.DEVICE_LOCATION_DETAIL_KEY) 475 self._addDeviceOrganizerNames(orgs, 'DeviceClass', evtproxy, EventProxy.DEVICE_CLASS_DETAIL_KEY) 476 self._addDeviceOrganizerNames(orgs, 'DeviceGroups', evtproxy, EventProxy.DEVICE_GROUPS_DETAIL_KEY, asDelimitedList=True) 477 self._addDeviceOrganizerNames(orgs, 'Systems', evtproxy, EventProxy.DEVICE_SYSTEMS_DETAIL_KEY, asDelimitedList=True)
478
479 - def _findTypeIdAndElement(self, eventContext, sub_element):
480 actor = eventContext.event.actor 481 if sub_element: 482 type_id_field = EventField.Actor.ELEMENT_SUB_TYPE_ID 483 uuid_field = EventField.Actor.ELEMENT_SUB_UUID 484 else: 485 type_id_field = EventField.Actor.ELEMENT_TYPE_ID 486 uuid_field = EventField.Actor.ELEMENT_UUID 487 type_id = None 488 element = None 489 if actor.HasField(type_id_field): 490 type_id = getattr(actor, type_id_field) 491 if actor.HasField(uuid_field): 492 element = self._manager.getElementByUuid(getattr(actor, uuid_field)) 493 return type_id, element
494
495 - def __call__(self, eventContext):
496 actor = eventContext.event.actor 497 498 # Set identifier and title based on resolved object 499 element_type_id, element = self._findTypeIdAndElement(eventContext, False) 500 if element: 501 actor.element_identifier = element.id 502 elementTitle = element.titleOrId() 503 if elementTitle != actor.element_identifier: 504 actor.element_title = elementTitle 505 506 sub_element_type_id, sub_element = self._findTypeIdAndElement(eventContext, True) 507 if sub_element: 508 actor.element_sub_identifier = sub_element.id 509 subElementTitle = sub_element.titleOrId() 510 if subElementTitle != actor.element_sub_identifier: 511 actor.element_sub_title = subElementTitle 512 513 device = eventContext.deviceObject 514 if device is None: 515 if element_type_id == DEVICE: 516 device = element 517 elif sub_element_type_id == DEVICE: 518 device = sub_element 519 520 if device: 521 eventContext.setDeviceObject(device) 522 523 # find all organizers for this device, and add their uuids to 524 # the appropriate event tags 525 deviceOrgs = {} 526 for tagType, orgProcessValues in self.DEVICE_TAGGERS.iteritems(): 527 getOrgFunc,orgTypeName = orgProcessValues 528 objList = getOrgFunc(device) 529 if objList: 530 if not isinstance(objList, list): 531 objList = [objList] 532 uuids = set(sum((self._manager.getUuidsOfPath(obj) for obj in objList), [])) 533 if uuids: 534 eventContext.eventProxy.tags.addAll(tagType, uuids) 535 536 # save this list of organizers names of this type, to add their names 537 # to the device event context 538 deviceOrgs[orgTypeName] = [obj.getOrganizerName() for obj in objList] 539 540 self._addDeviceContext(eventContext, device) 541 self._addDeviceOrganizers(eventContext, deviceOrgs) 542 543 component = eventContext.componentObject 544 if component is None: 545 if element_type_id == COMPONENT: 546 component = element 547 elif sub_element_type_id == COMPONENT: 548 component = sub_element 549 550 if component: 551 eventContext.setComponentObject(component) 552 553 return eventContext
554
555 -class UpdateDeviceContextAndTagsPipe(AddDeviceContextAndTagsPipe):
556
557 - def __call__(self, eventContext):
558 evtproxy = eventContext.eventProxy 559 560 if eventContext.deviceObject is None: 561 # Clear title fields 562 actor = eventContext.event.actor 563 actor.ClearField(EventField.Actor.ELEMENT_TITLE) 564 actor.ClearField(EventField.Actor.ELEMENT_SUB_TITLE) 565 566 eventContext.log.debug("device was cleared, must purge references in current event: %s" % to_dict(eventContext._zepRawEvent)) 567 # clear out device-specific tags and details 568 deviceOrganizerTypeNames = list(type for function,type in self.DEVICE_TAGGERS.values()) 569 deviceDetailNames = set(deviceOrganizerTypeNames + 570 self.DEVICE_TAGGERS.keys() + 571 [ 572 EventProxy.DEVICE_IP_ADDRESS_DETAIL_KEY, 573 EventProxy.DEVICE_PRIORITY_DETAIL_KEY, 574 EventProxy.PRODUCTION_STATE_DETAIL_KEY, 575 ]) 576 577 # clear device context details 578 for detail in deviceDetailNames: 579 evtproxy.resetReadOnly(detail) 580 if detail in evtproxy.details: 581 del evtproxy.details[detail] 582 583 # clear device-dependent tags 584 evtproxy.tags.clearType(self.DEVICE_TAGGERS.keys()) 585 eventContext.log.debug("reset device values in event before reidentifying: %s" % to_dict(eventContext._zepRawEvent)) 586 587 return super(UpdateDeviceContextAndTagsPipe, self).__call__(eventContext) 588 589 else: 590 return eventContext
591
592 -class SerializeContextPipe(EventProcessorPipe):
593 """ 594 Takes fields added to the eventProxy that couldn't directly be mapped out of the 595 proxy and applies them to the event protobuf. 596 """ 597 dependencies = [AddDeviceContextAndTagsPipe] 598
599 - def __call__(self, eventContext):
600 eventContext.log.debug('Saving context back to event') 601 return eventContext
602
603 -class AssignDefaultEventClassAndTagPipe(EventProcessorPipe):
604 """ 605 If the event class has not yet been set by the time this pipe is reached, set 606 it to a default value. 607 """
608 - def __call__(self, eventContext):
609 eventClassName = eventContext.eventProxy.eventClass 610 # Set event class to Unknown if not specified 611 if not eventClassName: 612 eventContext.eventProxy.eventClass = eventClassName = ZenEventClasses.Unknown 613 614 # Define tags for this event class 615 eventClass = self._manager.getEventClassOrganizer(eventClassName) 616 if eventClass and not eventContext.eventProxy.tags.getByType(TransformPipe.EVENT_CLASS_TAG): 617 try: 618 eventClassUuids = self._manager.getUuidsOfPath(eventClass) 619 if eventClassUuids: 620 eventContext.eventProxy.tags.addAll(TransformPipe.EVENT_CLASS_TAG, eventClassUuids) 621 except (KeyError, AttributeError): 622 log.info("Event has nonexistent event class %s." % eventClass) 623 624 return eventContext
625
626 -class FingerprintPipe(EventProcessorPipe):
627 """ 628 Calculates event's fingerprint/dedupid. 629 """ 630 631 DEFAULT_FINGERPRINT_FIELDS = ( 632 'device', 'component', 'eventClass', 'eventKey', 'severity') 633 NO_EVENT_KEY_FINGERPRINT_FIELDS = ( 634 'device', 'component', 'eventClass', 'severity', 'summary') 635 636 dependencies = [AddDeviceContextAndTagsPipe] 637
638 - def __call__(self, eventContext):
639 event = eventContext.event 640 641 if event.HasField(EventField.FINGERPRINT): 642 fp = event.fingerprint 643 eventContext.eventProxy.dedupid = fp 644 eventContext.log.debug("incoming event has a preset fingerprint %s" % fp) 645 else: 646 dedupFields = self.DEFAULT_FINGERPRINT_FIELDS 647 if not (event.HasField(EventField.EVENT_KEY) and 648 getattr(event, EventField.EVENT_KEY, None)): 649 dedupFields = self.NO_EVENT_KEY_FINGERPRINT_FIELDS 650 651 dedupIdList = [str(getattr(eventContext.eventProxy, field, '')) for 652 field in dedupFields] 653 654 eventContext.eventProxy.dedupid = '|'.join(dedupIdList) 655 656 eventContext.log.debug('Created dedupid of %s from %s', 657 eventContext.eventProxy.dedupid, dedupIdList) 658 659 return eventContext
660
661 -class TransformAndReidentPipe(EventProcessorPipe):
662 dependencies = [AddDeviceContextAndTagsPipe] 663
664 - def __init__(self, manager, transformpipe, reidentpipes):
665 super(TransformAndReidentPipe, self).__init__(manager) 666 self.transformPipe = transformpipe 667 self.reidentpipes = reidentpipes
668
669 - def __call__(self, eventContext):
670 # save original values of device and component, to see if they get modified in the transform 671 original_device = eventContext.eventProxy.device 672 original_component = eventContext.eventProxy.component 673 674 # perform transform 675 eventContext = self.transformPipe(eventContext) 676 677 # see if we need to rerun indent/context pipes 678 if (eventContext.eventProxy.device != original_device or 679 eventContext.eventProxy.component != original_component): 680 681 # clear object references if device/components change 682 if eventContext.eventProxy.device != original_device: 683 eventContext.setDeviceObject(None) 684 eventContext.setComponentObject(None) 685 686 if eventContext.eventProxy.component != original_component: 687 eventContext.setComponentObject(None) 688 689 # rerun any pipes necessary to reidentify event 690 for pipe in self.reidentpipes: 691 eventContext = pipe(eventContext) 692 693 return eventContext
694
695 -class TransformPipe(EventProcessorPipe):
696 697 EVENT_CLASS_TAG = 'zenoss.event.event_class' 698 699 ACTION_HISTORY = 'history' 700 ACTION_DROP = 'drop' 701 ACTION_STATUS = 'status' 702 ACTION_HEARTBEAT = 'heartbeat' 703 ACTION_LOG = 'log' 704 ACTION_ALERT_STATE = 'alert_state' 705 ACTION_DETAIL = 'detail' 706 707 ACTION_STATUS_MAP = { 708 ACTION_HISTORY: STATUS_CLOSED, 709 ACTION_STATUS: STATUS_NEW, 710 ACTION_DROP: STATUS_DROPPED, 711 } 712
713 - def _tagEventClasses(self, eventContext, eventClass):
714 """ 715 Adds a set of tags for the hierarchy of event classes for this event 716 NOTE: We must tag the event classes at this part of the pipeline 717 before a mapping has been applied otherwise the mapping instance 718 won't be tagged, just the Event Class that was mapped. 719 """ 720 try: 721 eventClassUuids = self._manager.getUuidsOfPath(eventClass) 722 if eventClassUuids: 723 eventContext.eventProxy.tags.addAll(self.EVENT_CLASS_TAG, eventClassUuids) 724 except (KeyError, AttributeError): 725 log.info("Event has nonexistent event class %s." % eventClass)
726
727 - def __call__(self, eventContext):
728 eventContext.log.debug('Mapping and Transforming event') 729 apply_transforms = getattr(eventContext.event, 'apply_transforms', True) 730 if not apply_transforms: 731 eventContext.log.debug('Not applying transforms, regexes or zProperties because apply_transforms was false') 732 evtclass = self._manager.lookupEventClass(eventContext) 733 if evtclass: 734 self._tagEventClasses(eventContext, evtclass) 735 736 if apply_transforms: 737 evtclass.applyExtraction(eventContext.eventProxy) 738 evtclass.applyValues(eventContext.eventProxy) 739 if eventContext.eventProxy.eventClassMapping: 740 eventContext.event.event_class_mapping_uuid = IGlobalIdentifier(evtclass).getGUID() 741 if apply_transforms: 742 evtclass.applyTransform(eventContext.eventProxy, 743 eventContext.deviceObject, 744 eventContext.componentObject) 745 return eventContext
746
747 -class EventPluginPipe(EventProcessorPipe):
748 - def __init__(self, manager, pluginInterface, name=''):
749 super(EventPluginPipe, self).__init__(manager, name) 750 751 self._eventPlugins = tuple(getUtilitiesFor(pluginInterface))
752
753 - def __call__(self, eventContext):
754 for name, plugin in self._eventPlugins: 755 try: 756 plugin.apply(eventContext._eventProxy, self._manager.dmd) 757 except Exception as e: 758 eventContext.log.error( 759 'Event plugin %s encountered an error -- skipping.' % name) 760 eventContext.log.exception(e) 761 continue 762 763 return eventContext
764
765 -class ClearClassRefreshPipe(EventProcessorPipe):
766 - def __call__(self, eventContext):
767 eventContext.refreshClearClasses() 768 return eventContext
769
770 -class TestPipeExceptionPipe(EventProcessorPipe):
771 # pipe used for testing exception handling in event processor
772 - def __init__(self, exceptionClass=ProcessingException):
773 self.exceptionClass = exceptionClass
774
775 - def __call__(self, eventContext):
776 raise self.exceptionClass('Testing pipe processing failure')
777