Package Products :: Package ZenEvents :: Module zenactiond
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.zenactiond

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import Globals 
 15  from traceback import format_exc 
 16  from email.MIMEText import MIMEText 
 17  from email.MIMEMultipart import MIMEMultipart 
 18  from email.Utils import formatdate 
 19  from twisted.internet import reactor, defer 
 20   
 21  from zenoss.protocols.queueschema import SchemaException 
 22  from zenoss.protocols import hydrateQueueMessage 
 23  from zenoss.protocols.interfaces import IQueueSchema 
 24  from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender 
 25  from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions, exec_worker 
 26   
 27  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 28  from Products.ZenUtils.Utils import getDefaultZopeUrl 
 29  from Products.ZenUtils.guid.interfaces import IGlobalIdentifier 
 30  from Products.ZenUtils.guid.guid import GUIDManager 
 31   
 32  from Products.ZenModel.NotificationSubscription import NotificationSubscriptionManager 
 33  from Products.ZenModel.actions import ActionMissingException, TargetableAction, ActionExecutionException 
 34  from Products.ZenModel.interfaces import IAction 
 35  from Products.ZenEvents.Event import Event 
 36  from Products.ZenMessaging.queuemessaging.QueueConsumer import QueueConsumer 
 37  from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask 
 38  from Products.ZenEvents.ZenEventClasses import Warning as SEV_WARNING 
 39  from zope.component import getUtility, getUtilitiesFor 
 40  from zope.component.interfaces import ComponentLookupError 
 41  from zope.interface import implements 
 42   
 43   
 44  import logging 
 45  log = logging.getLogger("zen.zenactiond") 
46 47 48 -class NotificationDao(object):
49 - def __init__(self, dmd):
50 self.dmd = dmd 51 self.notification_manager = self.dmd.getDmdRoot(NotificationSubscriptionManager.root) 52 self.guidManager = GUIDManager(dmd)
53
54 - def getNotifications(self):
55 self.dmd._p_jar.sync() 56 return self.notification_manager.getChildNodes()
57
58 - def getSignalNotifications(self, signal):
59 """ 60 Given a signal, find which notifications match this signal. In order to 61 match, a notification must be active (enabled and if has maintenance 62 windows, at least one must be active) and must be subscribed to the 63 signal. 64 65 @param signal: The signal for which to get subscribers. 66 @type signal: protobuf zep.Signal 67 """ 68 active_matching_notifications = [] 69 for notification in self.getNotifications(): 70 if notification.isActive(): 71 if self.notificationSubscribesToSignal(notification, signal): 72 active_matching_notifications.append(notification) 73 log.debug('Found matching notification: %s' % notification) 74 else: 75 log.debug('Notification "%s" does not subscribe to this signal.' % notification) 76 else: 77 log.debug('Notification "%s" is not active.' % notification) 78 79 return active_matching_notifications
80
81 - def notificationSubscribesToSignal(self, notification, signal):
82 """ 83 Determine if the notification matches the specified signal. 84 85 @param notification: The notification to check 86 @type notification: NotificationSubscription 87 @param signal: The signal to match. 88 @type signal: zenoss.protocols.protbufs.zep_pb2.Signal 89 90 @rtype boolean 91 """ 92 return signal.subscriber_uuid == IGlobalIdentifier(notification).getGUID()
93
94 -class ProcessSignalTask(object):
95 implements(IQueueConsumerTask) 96
97 - def __init__(self, notificationDao):
98 self.notificationDao = notificationDao 99 100 # set by the constructor of queueConsumer 101 self.queueConsumer = None 102 103 self.schema = getUtility(IQueueSchema) 104 self.queue = self.schema.getQueue("$Signals")
105
106 - def getAction(self, action):
107 try: 108 return getUtility(IAction, action) 109 except ComponentLookupError, e: 110 raise ActionMissingException(action)
111
112 - def processMessage(self, message):
113 """ 114 Handles a queue message, can call "acknowledge" on the Queue Consumer 115 class when it is done with the message 116 """ 117 log.debug('processing message.') 118 119 if message.content.body == self.queueConsumer.MARKER: 120 log.info("Received MARKER sentinel, exiting message loop") 121 self.queueConsumer.acknowledge(message) 122 return 123 try: 124 signal = hydrateQueueMessage(message, self.schema) 125 self.processSignal(signal) 126 log.debug('Done processing signal.') 127 except SchemaException: 128 log.error("Unable to hydrate protobuf %s. " % message.content.body) 129 self.queueConsumer.acknowledge(message) 130 except Exception, e: 131 log.exception(e) 132 # FIXME: Send to an error queue instead of acknowledge. 133 log.error('Acknowledging broken message.') 134 self.queueConsumer.acknowledge(message) 135 else: 136 log.debug('Acknowledging message. (%s)' % signal.message) 137 self.queueConsumer.acknowledge(message)
138
139 - def processSignal(self, signal):
140 matches = self.notificationDao.getSignalNotifications(signal) 141 log.debug('Found these matching notifications: %s' % matches) 142 143 trigger = self.notificationDao.guidManager.getObject(signal.trigger_uuid) 144 audit_event_trigger_info = "Event:'%s' Trigger:%s" % ( 145 signal.event.occurrence[0].fingerprint, 146 trigger.id) 147 for notification in matches: 148 if signal.clear and not notification.send_clear: 149 log.debug('Ignoring clearing signal since send_clear is set to False on this subscription %s' % notification.id) 150 continue 151 try: 152 target = signal.subscriber_uuid or '<none>' 153 action = self.getAction(notification.action) 154 action.setupAction(notification.dmd) 155 if isinstance(action, TargetableAction): 156 target = ','.join(action.getTargets(notification)) 157 action.execute(notification, signal) 158 except ActionMissingException, e: 159 log.error('Error finding action: {action}'.format(action = notification.action)) 160 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % ( 161 audit_event_trigger_info, notification.action, "FAIL", target, "<action not found>") 162 except ActionExecutionException, aee: 163 log.error('Error executing action: {action} on notification {notification}'.format( 164 action = notification.action, 165 notification = notification.id, 166 )) 167 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % ( 168 audit_event_trigger_info, notification.action, "FAIL", target, aee) 169 except Exception, e: 170 msg = 'Error executing action {notification}'.format( 171 notification = notification.id, 172 ) 173 log.exception(e) 174 log.error(msg) 175 traceback = format_exc() 176 event = Event(device="localhost", 177 eventClass="/App/Failed", 178 summary=msg, 179 message=traceback, 180 severity=SEV_WARNING, component="zenactiond") 181 self.dmd.ZenEventManager.sendEvent(event) 182 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % ( 183 audit_event_trigger_info, notification.action, "FAIL", target, action.getInfo(notification)) 184 else: 185 # audit trail of performed actions 186 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % ( 187 audit_event_trigger_info, notification.action, "SUCCESS", target, action.getInfo(notification)) 188 log.info(audit_msg) 189 log.debug('Done processing signal. (%s)' % signal.message)
190
191 -class ZenActionD(ZCmdBase):
192 - def __init__(self):
193 super(ZenActionD, self).__init__() 194 self._consumer = None 195 self._workers = ProcessWorkers(self.options.workers - 1, 196 exec_worker, 197 "zenactiond worker") 198 self._heartbeatSender = QueueHeartbeatSender('localhost', 199 'zenactiond', 200 self.options.maintenancecycle *3) 201 202 self._maintenanceCycle = MaintenanceCycle(self.options.maintenancecycle, 203 self._heartbeatSender)
204
205 - def buildOptions(self):
206 super(ZenActionD, self).buildOptions() 207 maintenanceBuildOptions(self.parser) 208 workersBuildOptions(self.parser, 1) 209 210 default_max_commands = 10 211 self.parser.add_option('--maxcommands', dest="maxCommands", type="int", default=default_max_commands, 212 help='Max number of action commands to perform concurrently (default: %d)' % \ 213 default_max_commands) 214 default_url = getDefaultZopeUrl() 215 self.parser.add_option('--zopeurl', dest='zopeurl', default=default_url, 216 help="http path to the root of the zope server (default: %s)" % default_url)
217 218
219 - def run(self):
220 # Configure all actions with the command-line options 221 options_dict = dict(vars(self.options)) 222 for name, action in getUtilitiesFor(IAction): 223 action.configure(options_dict) 224 225 task = ProcessSignalTask(NotificationDao(self.dmd)) 226 227 if self.options.daemon: 228 self._maintenanceCycle.start() 229 if self.options.daemon and self.options.workers > 1: 230 self._workers.startWorkers() 231 232 self._consumer = QueueConsumer(task, self.dmd) 233 reactor.callWhenRunning(self._start) 234 reactor.run()
235
236 - def _start(self):
237 log.info('starting zenactiond consumer.') 238 reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) 239 self._consumer.run()
240 241 242 @defer.inlineCallbacks
243 - def _shutdown(self, *ignored):
244 log.info("Shutting down...") 245 self._maintenanceCycle.stop() 246 self._workers.shutdown() 247 if self._consumer: 248 yield self._consumer.shutdown()
249 250 251 if __name__ == '__main__': 252 zad = ZenActionD() 253 zad.run() 254