1
2
3
4
5
6
7
8
9
10
11
12
13
14 import Globals
15 from traceback import format_exc
16 from email.MIMEText import MIMEText
17 from email.MIMEMultipart import MIMEMultipart
18 from email.Utils import formatdate
19 from twisted.internet import reactor, defer
20
21 from zenoss.protocols.queueschema import SchemaException
22 from zenoss.protocols import hydrateQueueMessage
23 from zenoss.protocols.interfaces import IQueueSchema
24 from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender
25 from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions, exec_worker
26
27 from Products.ZenUtils.ZCmdBase import ZCmdBase
28 from Products.ZenUtils.Utils import getDefaultZopeUrl
29 from Products.ZenUtils.guid.interfaces import IGlobalIdentifier
30 from Products.ZenUtils.guid.guid import GUIDManager
31
32 from Products.ZenModel.NotificationSubscription import NotificationSubscriptionManager
33 from Products.ZenModel.actions import ActionMissingException, TargetableAction, ActionExecutionException
34 from Products.ZenModel.interfaces import IAction
35 from Products.ZenEvents.Event import Event
36 from Products.ZenMessaging.queuemessaging.QueueConsumer import QueueConsumer
37 from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask
38 from Products.ZenEvents.ZenEventClasses import Warning as SEV_WARNING
39 from zope.component import getUtility, getUtilitiesFor
40 from zope.component.interfaces import ComponentLookupError
41 from zope.interface import implements
42
43
44 import logging
45 log = logging.getLogger("zen.zenactiond")
53
55 self.dmd._p_jar.sync()
56 return self.notification_manager.getChildNodes()
57
59 """
60 Given a signal, find which notifications match this signal. In order to
61 match, a notification must be active (enabled and if has maintenance
62 windows, at least one must be active) and must be subscribed to the
63 signal.
64
65 @param signal: The signal for which to get subscribers.
66 @type signal: protobuf zep.Signal
67 """
68 active_matching_notifications = []
69 for notification in self.getNotifications():
70 if notification.isActive():
71 if self.notificationSubscribesToSignal(notification, signal):
72 active_matching_notifications.append(notification)
73 log.debug('Found matching notification: %s' % notification)
74 else:
75 log.debug('Notification "%s" does not subscribe to this signal.' % notification)
76 else:
77 log.debug('Notification "%s" is not active.' % notification)
78
79 return active_matching_notifications
80
82 """
83 Determine if the notification matches the specified signal.
84
85 @param notification: The notification to check
86 @type notification: NotificationSubscription
87 @param signal: The signal to match.
88 @type signal: zenoss.protocols.protbufs.zep_pb2.Signal
89
90 @rtype boolean
91 """
92 return signal.subscriber_uuid == IGlobalIdentifier(notification).getGUID()
93
95 implements(IQueueConsumerTask)
96
98 self.notificationDao = notificationDao
99
100
101 self.queueConsumer = None
102
103 self.schema = getUtility(IQueueSchema)
104 self.queue = self.schema.getQueue("$Signals")
105
111
113 """
114 Handles a queue message, can call "acknowledge" on the Queue Consumer
115 class when it is done with the message
116 """
117 log.debug('processing message.')
118
119 if message.content.body == self.queueConsumer.MARKER:
120 log.info("Received MARKER sentinel, exiting message loop")
121 self.queueConsumer.acknowledge(message)
122 return
123 try:
124 signal = hydrateQueueMessage(message, self.schema)
125 self.processSignal(signal)
126 log.debug('Done processing signal.')
127 except SchemaException:
128 log.error("Unable to hydrate protobuf %s. " % message.content.body)
129 self.queueConsumer.acknowledge(message)
130 except Exception, e:
131 log.exception(e)
132
133 log.error('Acknowledging broken message.')
134 self.queueConsumer.acknowledge(message)
135 else:
136 log.debug('Acknowledging message. (%s)' % signal.message)
137 self.queueConsumer.acknowledge(message)
138
140 matches = self.notificationDao.getSignalNotifications(signal)
141 log.debug('Found these matching notifications: %s' % matches)
142
143 trigger = self.notificationDao.guidManager.getObject(signal.trigger_uuid)
144 audit_event_trigger_info = "Event:'%s' Trigger:%s" % (
145 signal.event.occurrence[0].fingerprint,
146 trigger.id)
147 for notification in matches:
148 if signal.clear and not notification.send_clear:
149 log.debug('Ignoring clearing signal since send_clear is set to False on this subscription %s' % notification.id)
150 continue
151 try:
152 target = signal.subscriber_uuid or '<none>'
153 action = self.getAction(notification.action)
154 action.setupAction(notification.dmd)
155 if isinstance(action, TargetableAction):
156 target = ','.join(action.getTargets(notification))
157 action.execute(notification, signal)
158 except ActionMissingException, e:
159 log.error('Error finding action: {action}'.format(action = notification.action))
160 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
161 audit_event_trigger_info, notification.action, "FAIL", target, "<action not found>")
162 except ActionExecutionException, aee:
163 log.error('Error executing action: {action} on notification {notification}'.format(
164 action = notification.action,
165 notification = notification.id,
166 ))
167 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
168 audit_event_trigger_info, notification.action, "FAIL", target, aee)
169 except Exception, e:
170 msg = 'Error executing action {notification}'.format(
171 notification = notification.id,
172 )
173 log.exception(e)
174 log.error(msg)
175 traceback = format_exc()
176 event = Event(device="localhost",
177 eventClass="/App/Failed",
178 summary=msg,
179 message=traceback,
180 severity=SEV_WARNING, component="zenactiond")
181 self.dmd.ZenEventManager.sendEvent(event)
182 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
183 audit_event_trigger_info, notification.action, "FAIL", target, action.getInfo(notification))
184 else:
185
186 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
187 audit_event_trigger_info, notification.action, "SUCCESS", target, action.getInfo(notification))
188 log.info(audit_msg)
189 log.debug('Done processing signal. (%s)' % signal.message)
190
193 super(ZenActionD, self).__init__()
194 self._consumer = None
195 self._workers = ProcessWorkers(self.options.workers - 1,
196 exec_worker,
197 "zenactiond worker")
198 self._heartbeatSender = QueueHeartbeatSender('localhost',
199 'zenactiond',
200 self.options.maintenancecycle *3)
201
202 self._maintenanceCycle = MaintenanceCycle(self.options.maintenancecycle,
203 self._heartbeatSender)
204
206 super(ZenActionD, self).buildOptions()
207 maintenanceBuildOptions(self.parser)
208 workersBuildOptions(self.parser, 1)
209
210 default_max_commands = 10
211 self.parser.add_option('--maxcommands', dest="maxCommands", type="int", default=default_max_commands,
212 help='Max number of action commands to perform concurrently (default: %d)' % \
213 default_max_commands)
214 default_url = getDefaultZopeUrl()
215 self.parser.add_option('--zopeurl', dest='zopeurl', default=default_url,
216 help="http path to the root of the zope server (default: %s)" % default_url)
217
218
220
221 options_dict = dict(vars(self.options))
222 for name, action in getUtilitiesFor(IAction):
223 action.configure(options_dict)
224
225 task = ProcessSignalTask(NotificationDao(self.dmd))
226
227 if self.options.daemon:
228 self._maintenanceCycle.start()
229 if self.options.daemon and self.options.workers > 1:
230 self._workers.startWorkers()
231
232 self._consumer = QueueConsumer(task, self.dmd)
233 reactor.callWhenRunning(self._start)
234 reactor.run()
235
237 log.info('starting zenactiond consumer.')
238 reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
239 self._consumer.run()
240
241
242 @defer.inlineCallbacks
244 log.info("Shutting down...")
245 self._maintenanceCycle.stop()
246 self._workers.shutdown()
247 if self._consumer:
248 yield self._consumer.shutdown()
249
250
251 if __name__ == '__main__':
252 zad = ZenActionD()
253 zad.run()
254