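#
# zeneventd: consumes raw events from the Zenoss event bus ($RawZenEvents),
# runs each one through the event-processing pipeline, and republishes the
# processed event to the $ZepZenEvents exchange for ZEP, using a routing key
# derived from the event class (zenoss.zenevent.<event.class>).
#
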
from twisted.internet import reactor

import signal
import multiprocessing
import time
import socket
from datetime import datetime, timedelta

import Globals
from zope.component import getUtility, getUtilitiesFor
from zope.interface import implements

from amqplib.client_0_8.exceptions import AMQPConnectionException
from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender
from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask
from Products.ZenUtils.ZCmdBase import ZCmdBase
from Products.ZenUtils.ZenDaemon import ZenDaemon
from Products.ZenUtils.guid import guid
from zenoss.protocols.interfaces import IAMQPConnectionInfo, IQueueSchema
from zenoss.protocols.eventlet.amqp import getProtobufPubSub, Publishable
from zenoss.protocols.protobufs.zep_pb2 import ZepRawEvent, Event
from zenoss.protocols.jsonformat import from_dict, to_dict
from Products.ZenMessaging.queuemessaging.eventlet import BasePubSubMessageTask
from Products.ZenEvents.events2.processing import *
from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions
from Products.ZenEvents.interfaces import IPreEventPlugin, IPostEventPlugin

import logging
log = logging.getLogger("zen.eventd")


class ProcessEventMessageTask(BasePubSubMessageTask):

    implements(IQueueConsumerTask)

    # When False, ZODB syncs are batched (at most one per syncInterval);
    # when True, sync() is called before every event.
    SYNC_EVERY_EVENT = False

    def __init__(self, dmd):
        self.dmd = dmd
        self._queueSchema = getUtility(IQueueSchema)
        self.dest_routing_key_prefix = 'zenoss.zenevent'

        self._dest_exchange = self._queueSchema.getExchange("$ZepZenEvents")
        self._manager = Manager(self.dmd)
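        # Ordered pipeline of processing pipes: each pipe receives the
        # EventContext, transforms it, and hands it to the next.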
        self._pipes = (
            EventPluginPipe(self._manager, IPreEventPlugin, 'PreEventPluginPipe'),
            CheckInputPipe(self._manager),
            IdentifierPipe(self._manager),
            AddDeviceContextAndTagsPipe(self._manager),
            TransformAndReidentPipe(self._manager,
                TransformPipe(self._manager),
                [
                    IdentifierPipe(self._manager),
                    UpdateDeviceContextAndTagsPipe(self._manager),
                ]),
            AssignDefaultEventClassAndTagPipe(self._manager),
            FingerprintPipe(self._manager),
            SerializeContextPipe(self._manager),
            EventPluginPipe(self._manager, IPostEventPlugin, 'PostEventPluginPipe'),
            ClearClassRefreshPipe(self._manager),
        )

        if not self.SYNC_EVERY_EVENT:
            # Batch ZODB syncs: sync() at most once every 0.5 seconds to keep
            # throughput up when events arrive in bursts.
            self.nextSync = datetime.now()
            self.syncInterval = timedelta(0, 0, 500000)

    def _routing_key(self, event):
        # e.g. an event with event_class '/Status/Ping' is published with the
        # routing key 'zenoss.zenevent.status.ping'
        return (self.dest_routing_key_prefix +
                event.event.event_class.replace('/', '.').lower())

86 """
87 Handles a queue message, can call "acknowledge" on the Queue Consumer
88 class when it is done with the message
89 """
90
        if self.SYNC_EVERY_EVENT:
            doSync = True
        else:
            # sync() the database only if syncInterval has elapsed since the last sync
            currentTime = datetime.now()
            doSync = currentTime > self.nextSync
            self.nextSync = currentTime + self.syncInterval

        if doSync:
            self.dmd._p_jar.sync()

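        # Run the event through the pipeline; if processing fails, a wrapper
        # error event (class '/') carrying the original payload is built instead.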
        try:
            retry = True
            processed = False
            while not processed:
                try:
                    # extract the raw event from the message body
                    zepevent = ZepRawEvent()
                    zepevent.event.CopyFrom(message)
                    if log.isEnabledFor(logging.DEBUG):
                        log.debug("Received event: %s", to_dict(zepevent.event))

                    eventContext = EventContext(log, zepevent)

                    for pipe in self._pipes:
                        eventContext = pipe(eventContext)
                        if log.isEnabledFor(logging.DEBUG):
                            log.debug("After pipe %s, event context is %s",
                                      pipe.name, to_dict(eventContext.zepRawEvent))
                        if eventContext.event.status == STATUS_DROPPED:
                            raise DropEvent('Dropped by %s' % pipe, eventContext.event)

                    processed = True

                except AttributeError:
                    # The manager raises AttributeError when its connection to
                    # the catalogs is lost; reset it and retry exactly once.
                    if retry:
                        retry = False
                        log.debug("Resetting connection to catalogs")
                        self._manager.reset()
                    else:
                        raise

        except DropEvent:
            # DropEvent is intentional; let it propagate to the caller
            raise
        except Exception as e:
            log.info("Failed to process event, forwarding original raw event: %s",
                     to_dict(zepevent.event))
            # Pipes and plugins may raise ProcessingException for their own
            # reasons; only log a traceback for unexpected exception types.
            if not isinstance(e, ProcessingException):
                log.exception(e)

            # Construct a wrapper event that reports the processing failure and
            # includes the content of the original event.
            origzepevent = ZepRawEvent()
            origzepevent.event.CopyFrom(message)
            failReportEvent = dict(
                uuid=guid.generate(),
                created_time=int(time.time() * 1000),
                fingerprint='|'.join(['zeneventd', 'processMessage', repr(e)]),
                # don't reuse the original event class, or this could loop endlessly
                eventClass='/',
                summary='Internal exception processing event: %r' % e,
                message='Internal exception processing event: %r/%s' % (e, to_dict(origzepevent.event)),
                severity=4,
            )
            zepevent = ZepRawEvent()
            zepevent.event.CopyFrom(from_dict(Event, failReportEvent))
            eventContext = EventContext(log, zepevent)
            eventContext.eventProxy.device = 'zeneventd'
            eventContext.eventProxy.component = 'processMessage'

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Publishing event: %s", to_dict(eventContext.zepRawEvent))

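        # Yielding a Publishable hands the processed event back to
        # BasePubSubMessageTask, which is expected to publish it to the
        # destination exchange with the event-class routing key.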
        yield Publishable(eventContext.zepRawEvent,
                          exchange=self._dest_exchange,
                          routingKey=self._routing_key(eventContext.zepRawEvent))


class EventDWorker(ZCmdBase):

    def __init__(self):
        super(EventDWorker, self).__init__()
        self._amqpConnectionInfo = getUtility(IAMQPConnectionInfo)
        self._queueSchema = getUtility(IQueueSchema)

    def run(self):
        self._shutdown = False
        signal.signal(signal.SIGTERM, self._sigterm)
        task = ProcessEventMessageTask(self.dmd)
        self._listen(task)

    def shutdown(self):
        self._shutdown = True
        if self._pubsub:
            self._pubsub.shutdown()
            self._pubsub = None

    def _sigterm(self, signum=None, frame=None):
        log.debug("Worker received SIGTERM; shutting down")
        self.shutdown()

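    # Consume from the raw events queue, reconnecting with exponential
    # backoff (capped at retry_wait seconds) when the AMQP connection drops.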
    def _listen(self, task, retry_wait=30):
        self._pubsub = None
        keepTrying = True
        sleep = 0
        while keepTrying and not self._shutdown:
            try:
                if sleep:
                    log.info("Waiting %s seconds to reconnect...", sleep)
                    time.sleep(sleep)
                    sleep = min(retry_wait, sleep * 2)
                else:
                    sleep = .1
                log.info("Connecting to RabbitMQ...")
                self._pubsub = getProtobufPubSub(self._amqpConnectionInfo,
                                                 self._queueSchema, '$RawZenEvents')
                self._pubsub.registerHandler('$Event', task)
                self._pubsub.registerExchange('$ZepZenEvents')
                # connected successfully; reset the reconnect backoff
                sleep = 0
                self._pubsub.run()
            except (socket.error, AMQPConnectionException) as e:
                log.warn("RabbitMQ connection error: %s", e)
            except KeyboardInterrupt:
                keepTrying = False
            finally:
                if self._pubsub:
                    self._pubsub.shutdown()
                    self._pubsub = None

    def buildOptions(self):
        super(EventDWorker, self).buildOptions()
        self.parser.add_option('--workers',
                               type="int",
                               default=2,
                               help="The number of event processing workers to run "
                                    "(ignored when running in the foreground)")

    def parseOptions(self):
        """
        Never allow a worker to run as a daemon.
        """
        super(EventDWorker, self).parseOptions()
        self.options.daemon = False


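# Target function for each event-processing subprocess spawned by ProcessWorkers.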
def run_worker():
    name = multiprocessing.current_process().name
    pid = multiprocessing.current_process().pid
    log.info("Starting: %s (pid %s)", name, pid)
    try:
        worker = EventDWorker()
        worker.run()
    finally:
        log.debug("Shutting down: %s", name)


class ZenEventD(ZenDaemon):

    def __init__(self, *args, **kwargs):
        super(ZenEventD, self).__init__(*args, **kwargs)
        self._heartbeatSender = QueueHeartbeatSender('localhost',
                                                     'zeneventd',
                                                     self.options.maintenancecycle * 3)
        self._workers = ProcessWorkers(self.options.workers, run_worker,
                                       "Event worker")
        self._maintenanceCycle = MaintenanceCycle(self.options.maintenancecycle,
                                                  self._heartbeatSender)

    def _shutdown(self, *ignored):
        log.info("Shutting down...")
        self._maintenanceCycle.stop()
        self._workers.shutdown()

    def run(self):
        # Propagate --synceveryevent to the event-processing task class.
        ProcessEventMessageTask.SYNC_EVERY_EVENT = self.options.syncEveryEvent

        if self.options.daemon:
            # Daemon mode: start the forked worker processes and drive the
            # heartbeat maintenance cycle from the Twisted reactor.
            reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
            self._workers.startWorkers()
            self._maintenanceCycle.start()
            reactor.run()
        else:
            # Foreground mode: process events inline with a single worker.
            EventDWorker().run()

    def sighandler_USR1(self, signum, frame):
        # USR1 (the debug-logging toggle) is forwarded to the worker processes.
        log.debug('_sigUSR1_called %s', signum)
        self._workers.sendSignal(signum)

    def buildOptions(self):
        super(ZenEventD, self).buildOptions()

        workersBuildOptions(self.parser, default=2)
        maintenanceBuildOptions(self.parser)

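        # ZODB object-store (MySQL) connection options used to load the data root.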
        self.parser.add_option('-R', '--dataroot',
                               dest="dataroot",
                               default="/zport/dmd",
                               help="root object for data load (i.e. /zport/dmd)")
        self.parser.add_option('--cachesize',
                               dest="cachesize", default=1000, type='int',
                               help="in-memory cache size (default: 1000)")
        self.parser.add_option('--host',
                               dest="host", default="localhost",
                               help="hostname of MySQL object store")
        self.parser.add_option('--port',
                               dest="port", type="int", default=3306,
                               help="port of MySQL object store")
        self.parser.add_option('--mysqluser', dest='mysqluser', default='zenoss',
                               help='username for MySQL object store')
        self.parser.add_option('--mysqlpasswd', dest='mysqlpasswd', default='zenoss',
                               help='password for MySQL object store')
        self.parser.add_option('--mysqldb', dest='mysqldb', default='zodb',
                               help='name of database for MySQL object store')
        self.parser.add_option('--mysqlsocket', dest='mysqlsocket', default=None,
                               help='name of socket file for MySQL server connection')
        self.parser.add_option('--cacheservers', dest='cacheservers', default="",
                               help='memcached servers to use for object cache (e.g. 127.0.0.1:11211)')
        self.parser.add_option('--poll-interval', dest='pollinterval', default=None, type='int',
                               help='Defer polling the database for the specified maximum time interval, in seconds.'
                                    ' This will default to 60 only if --cacheservers is set.')
        self.parser.add_option('--synceveryevent', dest='syncEveryEvent',
                               action="store_true", default=False,
                               help="Force sync() before processing every event; default is to sync() no more often "
                                    "than once every 1/2 second.")


if __name__ == '__main__':
    zed = ZenEventD()
    zed.run()