1
2
3
4
5
6
7
8
9
10
11
12
13
14 from Products.ZenEvents.events2.fields import EventField
15 from Products.ZenEvents.interfaces import IEventIdentifierPlugin
16 from Products.ZenModel.Device import Device
17 from Products.ZenModel.DeviceComponent import DeviceComponent
18 from Products.ZenModel.DataRoot import DataRoot
19 from Products.ZenEvents.events2.proxy import ZepRawEventProxy, EventProxy
20 from Products.ZenUtils.guid.interfaces import IGUIDManager, IGlobalIdentifier
21 from Products.ZenUtils.IpUtil import ipToDecimal, IpAddressError
22 from Products.Zuul.interfaces import ICatalogTool
23 from Products.AdvancedQuery import Eq, Or
24 from zope.component import getUtilitiesFor
25 from Acquisition import aq_chain
26 from Products.ZenEvents import ZenEventClasses
27
28 from zenoss.protocols.jsonformat import to_dict
29 from zenoss.protocols.protobufs.model_pb2 import DEVICE, COMPONENT
30 from zenoss.protocols.protobufs.zep_pb2 import (
31 STATUS_NEW,
32 STATUS_CLOSED,
33 STATUS_DROPPED,
34 )
35
36 import logging
37
38 log = logging.getLogger("zen.eventd")
41 - def __init__(self, message, event=None):
44
46 """
47 Raised when an event should be dropped from the queue.
48 """
49 pass
50
52 """
53 A logging adapter that adds the event UUID to the log output.
54 """
55
57 msg = '[{event_uuid}] {msg}'.format(event_uuid=self.extra['event_uuid'],
58 msg=msg)
59 return msg, kwargs
60
62 """
63 Provides lookup access to processing pipes and performs caching.
64 """
65
66 ELEMENT_TYPE_MAP = {
67 DEVICE: Device,
68 COMPONENT: DeviceComponent,
69 }
70
74
76 self._guidManager = IGUIDManager(self.dmd)
77
78 self._devices = self.dmd._getOb('Devices')
79 self._networks = self.dmd._getOb('Networks')
80 self._events = self.dmd._getOb('Events')
81
82 self._catalogs = {
83 DEVICE: self._devices,
84 }
85
88
90 try:
91 return self._events.getOrganizer(eventClassName)
92 except KeyError:
93
94 return None
95
102
104 """
105 Get a Device/Component by UUID
106 """
107 if uuid:
108 return self._guidManager.getObject(uuid)
109
111 """
112 Find element by ID but only cache UUID. This forces us to lookup elements
113 each time by UUID (pretty fast) which gives us a chance to see if the element
114 has been deleted.
115 """
116 cls = self.ELEMENT_TYPE_MAP.get(element_type_id)
117 if cls:
118 catalog = catalog or self._catalogs.get(element_type_id)
119 if catalog:
120 results = ICatalogTool(catalog).search(cls,
121 query=Or(Eq('id', id),
122 Eq('name', id)),
123 filterPermissions=False)
124
125 if results.total:
126 return results.results.next().uuid
127
145
149
151 """
152 This will return the device's
153 @type identifier: string
154 @param identifier: The IP address or id of a device
155 @type ipaddress: string
156 @param ipaddress: The known ipaddress of the device
157 """
158 cat = ICatalogTool(self._devices)
159
160 if ipAddress:
161 try:
162 ipAddress = str(ipToDecimal(ipAddress))
163 except IpAddressError:
164 ipAddress = None
165
166 if identifier and not ipAddress:
167 try:
168 ipAddress = str(ipToDecimal(identifier))
169 except IpAddressError:
170 pass
171
172 querySet = Or(Eq('id', identifier),
173 Eq('name', identifier),
174 Eq('ipAddress', ipAddress))
175
176 results = cat.search(types=Device, query=querySet, limit=1, filterPermissions=False)
177
178 if results.total:
179 return results.results.next().uuid
180 else:
181 querySet = Eq('ipAddress', ipAddress)
182
183
184 results = cat.search(types=DeviceComponent, query=querySet, limit=1, filterPermissions=False)
185 if results.total:
186 return self.getElementUuid(
187 results.results.next().getObject().device())
188 else:
189 return None
190
195
197 """
198 Looks up all the UUIDs in the tree path of an Organizer
199 """
200 uuids = set()
201 acquisition_chain = []
202 for n in aq_chain(node.primaryAq()):
203 if isinstance(n, DataRoot):
204 acquisition_chain.pop()
205 break
206 acquisition_chain.append(n)
207
208 if acquisition_chain:
209 for obj in filter(None, acquisition_chain):
210 try:
211 uuids.add(self.getElementUuid(obj))
212 except TypeError:
213 log.debug("Unable to get a uuid for %s " % obj)
214
215 return filter(None, uuids)
216
217
218 -class EventContext(object):
219 """
220 Maintains the event context while processing.
221 """
222
223 - def __init__(self, log, zepRawEvent):
224 self._zepRawEvent = zepRawEvent
225 self._event = self._zepRawEvent.event
226 self._eventProxy = ZepRawEventProxy(self._zepRawEvent)
227
228
229 self._deviceObject = None
230 self._componentObject = None
231 self.log = EventLoggerAdapter(log, {'event_uuid': self._event.uuid})
232
233 - def setDeviceObject(self, device):
234 self._deviceObject = device
235
237 self._eventProxy._refreshClearClasses()
238
239 @property
240 - def deviceObject(self):
241 return self._deviceObject
242
243 - def setComponentObject(self, component):
244 self._componentObject = component
245
246 @property
247 - def componentObject(self):
248 return self._componentObject
249
250 @property
251 - def zepRawEvent(self):
252 return self._zepRawEvent
253
254 @property
257
258 @property
259 - def eventProxy(self):
260 """
261 A EventProxy that wraps the event protobuf and makes it look like an old style event.
262 """
263 return self._eventProxy
264
266 """
267 An event context handler that is called in a chain.
268 """
269 dependencies = []
270
271 - def __init__(self, manager, name=None):
277
279 """
280 Called in a chain, must return modified eventContext.
281 """
282 raise NotImplementedError()
283
310
317
319 - def _resolveElement(self, evtProcessorManager, catalog, eventContext, type_id_field,
320 identifier_field, uuid_field):
321 """
322 Lookup an element by identifier or uuid and make sure both
323 identifier and uuid are set.
324 """
325 actor = eventContext.event.actor
326 if actor.HasField(type_id_field):
327 if not (actor.HasField(identifier_field) and actor.HasField(uuid_field)):
328 if actor.HasField(uuid_field):
329 uuid = getattr(actor, uuid_field, None)
330 element = evtProcessorManager.getElementByUuid(uuid)
331 if element:
332 eventContext.log.debug('Identified element %s by uuid %s',
333 element, uuid)
334 setattr(actor, identifier_field, element.id)
335 else:
336 eventContext.log.warning('Could not find element by uuid %s'
337 , uuid)
338
339 elif actor.HasField(identifier_field):
340 type_id = getattr(actor, type_id_field, None)
341 identifier = getattr(actor, identifier_field, None)
342 if type_id == DEVICE:
343 element_uuid = evtProcessorManager.findDeviceUuid(identifier,
344 eventContext.eventProxy.ipAddress)
345 else:
346 element_uuid = evtProcessorManager.getElementUuidById(catalog,
347 type_id,
348 identifier)
349
350 if element_uuid:
351 eventContext.log.debug('Identified element %s by id %s',
352 element_uuid, identifier)
353 setattr(actor, uuid_field, element_uuid)
354 else:
355 eventContext.log.debug(
356 'Could not find element type %s with id %s', type_id
357 , identifier)
358 else:
359 if log.isEnabledFor(logging.DEBUG):
360 type_id = getattr(actor, type_id_field, None)
361 identifier = getattr(actor, identifier_field, None)
362 uuid = getattr(actor, uuid_field, None)
363 eventContext.log.debug('Element %s already fully identified by %s/%s', type_id, identifier, uuid)
364
366 """
367 Update eventContext in place, updating/resolving identifiers and respective uuid's
368 """
369 eventContext.log.debug('Identifying event (%s)' % self.__class__.__name__)
370
371
372 self._resolveElement(
373 evtProcessorManager,
374 None,
375 eventContext,
376 EventField.Actor.ELEMENT_TYPE_ID,
377 EventField.Actor.ELEMENT_IDENTIFIER,
378 EventField.Actor.ELEMENT_UUID
379 )
380
381
382 actor = eventContext.event.actor
383 if actor.HasField(EventField.Actor.ELEMENT_UUID):
384 parent = evtProcessorManager.getElementByUuid(actor.element_uuid)
385 else:
386 parent = None
387 self._resolveElement(
388 evtProcessorManager,
389 parent,
390 eventContext,
391 EventField.Actor.ELEMENT_SUB_TYPE_ID,
392 EventField.Actor.ELEMENT_SUB_IDENTIFIER,
393 EventField.Actor.ELEMENT_SUB_UUID
394 )
395
397 """
398 Resolves element uuids and identifiers to make sure both are populated.
399 """
400
401 dependencies = [CheckInputPipe]
402
422
424 """
425 Adds device and component info to the context and event proxy.
426 """
427 dependencies = [IdentifierPipe]
428
429
430 DEVICE_DEVICECLASS_TAG_KEY = EventProxy.DEVICE_CLASS_DETAIL_KEY
431 DEVICE_LOCATION_TAG_KEY = EventProxy.DEVICE_LOCATION_DETAIL_KEY
432 DEVICE_SYSTEMS_TAG_KEY = EventProxy.DEVICE_SYSTEMS_DETAIL_KEY
433 DEVICE_GROUPS_TAG_KEY = EventProxy.DEVICE_GROUPS_DETAIL_KEY
434
435 DEVICE_TAGGERS = {
436 DEVICE_DEVICECLASS_TAG_KEY : (lambda device: device.deviceClass(), 'DeviceClass'),
437 DEVICE_LOCATION_TAG_KEY : (lambda device: device.location(), 'Location'),
438 DEVICE_SYSTEMS_TAG_KEY : (lambda device: device.systems(), 'Systems'),
439 DEVICE_GROUPS_TAG_KEY : (lambda device: device.groups(), 'DeviceGroups'),
440 }
441
443 if orgtypename not in orgs:
444 return
445
446 orgnames = orgs[orgtypename]
447 if orgnames:
448 if asDelimitedList:
449 detailOrgnames = orgnames
450 proxyOrgname = '|' + '|'.join(orgnames)
451 else:
452
453 detailOrgnames = orgnames[0]
454 proxyOrgname = orgnames
455 evtproxy.setReadOnly(orgtypename, proxyOrgname)
456 evtproxy.details[proxydetailkey] = detailOrgnames
457
471
473 evtproxy = eventContext.eventProxy
474 self._addDeviceOrganizerNames(orgs, 'Location', evtproxy, EventProxy.DEVICE_LOCATION_DETAIL_KEY)
475 self._addDeviceOrganizerNames(orgs, 'DeviceClass', evtproxy, EventProxy.DEVICE_CLASS_DETAIL_KEY)
476 self._addDeviceOrganizerNames(orgs, 'DeviceGroups', evtproxy, EventProxy.DEVICE_GROUPS_DETAIL_KEY, asDelimitedList=True)
477 self._addDeviceOrganizerNames(orgs, 'Systems', evtproxy, EventProxy.DEVICE_SYSTEMS_DETAIL_KEY, asDelimitedList=True)
478
494
496 actor = eventContext.event.actor
497
498
499 element_type_id, element = self._findTypeIdAndElement(eventContext, False)
500 if element:
501 actor.element_identifier = element.id
502 elementTitle = element.titleOrId()
503 if elementTitle != actor.element_identifier:
504 actor.element_title = elementTitle
505
506 sub_element_type_id, sub_element = self._findTypeIdAndElement(eventContext, True)
507 if sub_element:
508 actor.element_sub_identifier = sub_element.id
509 subElementTitle = sub_element.titleOrId()
510 if subElementTitle != actor.element_sub_identifier:
511 actor.element_sub_title = subElementTitle
512
513 device = eventContext.deviceObject
514 if device is None:
515 if element_type_id == DEVICE:
516 device = element
517 elif sub_element_type_id == DEVICE:
518 device = sub_element
519
520 if device:
521 eventContext.setDeviceObject(device)
522
523
524
525 deviceOrgs = {}
526 for tagType, orgProcessValues in self.DEVICE_TAGGERS.iteritems():
527 getOrgFunc,orgTypeName = orgProcessValues
528 objList = getOrgFunc(device)
529 if objList:
530 if not isinstance(objList, list):
531 objList = [objList]
532 uuids = set(sum((self._manager.getUuidsOfPath(obj) for obj in objList), []))
533 if uuids:
534 eventContext.eventProxy.tags.addAll(tagType, uuids)
535
536
537
538 deviceOrgs[orgTypeName] = [obj.getOrganizerName() for obj in objList]
539
540 self._addDeviceContext(eventContext, device)
541 self._addDeviceOrganizers(eventContext, deviceOrgs)
542
543 component = eventContext.componentObject
544 if component is None:
545 if element_type_id == COMPONENT:
546 component = element
547 elif sub_element_type_id == COMPONENT:
548 component = sub_element
549
550 if component:
551 eventContext.setComponentObject(component)
552
553 return eventContext
554
591
592 -class SerializeContextPipe(EventProcessorPipe):
593 """
594 Takes fields added to the eventProxy that couldn't directly be mapped out of the
595 proxy and applies them to the event protobuf.
596 """
597 dependencies = [AddDeviceContextAndTagsPipe]
598
599 - def __call__(self, eventContext):
600 eventContext.log.debug('Saving context back to event')
601 return eventContext
602
604 """
605 If the event class has not yet been set by the time this pipe is reached, set
606 it to a default value.
607 """
625
660
694
746
748 - def __init__(self, manager, pluginInterface, name=''):
752
754 for name, plugin in self._eventPlugins:
755 try:
756 plugin.apply(eventContext._eventProxy, self._manager.dmd)
757 except Exception as e:
758 eventContext.log.error(
759 'Event plugin %s encountered an error -- skipping.' % name)
760 eventContext.log.exception(e)
761 continue
762
763 return eventContext
764
769
771
773 self.exceptionClass = exceptionClass
774
776 raise self.exceptionClass('Testing pipe processing failure')
777