Package Products :: Package ZenEvents :: Module zeneventd
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenEvents.zeneventd

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2010, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  from twisted.internet import reactor 
 14   
 15  import signal 
 16  import multiprocessing 
 17  import time 
 18  import socket 
 19  from datetime import datetime, timedelta 
 20   
 21  import Globals 
 22  from zope.component import getUtility, getUtilitiesFor 
 23  from zope.interface import implements 
 24   
 25  from amqplib.client_0_8.exceptions import AMQPConnectionException 
 26  from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender 
 27  from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask 
 28  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 29  from Products.ZenUtils.ZenDaemon import ZenDaemon 
 30  from Products.ZenUtils.guid import guid 
 31  from zenoss.protocols.interfaces import IAMQPConnectionInfo, IQueueSchema 
 32  from zenoss.protocols.eventlet.amqp import getProtobufPubSub 
 33  from zenoss.protocols.protobufs.zep_pb2 import ZepRawEvent, Event 
 34  from zenoss.protocols.eventlet.amqp import Publishable 
 35  from zenoss.protocols.jsonformat import from_dict 
 36  from Products.ZenMessaging.queuemessaging.eventlet import BasePubSubMessageTask 
 37  from Products.ZenEvents.events2.processing import * 
 38  from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions 
 39  from Products.ZenEvents.interfaces import IPreEventPlugin, IPostEventPlugin 
 40   
 41  import logging 
 42  log = logging.getLogger("zen.eventd") 
 43   
class ProcessEventMessageTask(BasePubSubMessageTask):
    """
    Queue consumer task for zeneventd workers.

    Receives raw Event protobufs from the queue, runs each one through the
    event-processing pipeline (identification, transforms, tagging, etc.),
    and yields the resulting ZepRawEvent for publishing to the
    $ZepZenEvents exchange.
    """

    implements(IQueueConsumerTask)

    # When True, dmd._p_jar.sync() runs before every event instead of being
    # rate-limited to once per syncInterval (set from --synceveryevent).
    SYNC_EVERY_EVENT = False

    def __init__(self, dmd):
        self.dmd = dmd
        self._queueSchema = getUtility(IQueueSchema)
        self.dest_routing_key_prefix = 'zenoss.zenevent'

        self._dest_exchange = self._queueSchema.getExchange("$ZepZenEvents")
        self._manager = Manager(self.dmd)
        # Ordered processing pipeline; each pipe takes and returns an
        # EventContext.
        self._pipes = (
            EventPluginPipe(self._manager, IPreEventPlugin, 'PreEventPluginPipe'),
            CheckInputPipe(self._manager),
            IdentifierPipe(self._manager),
            AddDeviceContextAndTagsPipe(self._manager),
            TransformAndReidentPipe(self._manager,
                TransformPipe(self._manager),
                [
                IdentifierPipe(self._manager),
                UpdateDeviceContextAndTagsPipe(self._manager),
                ]),
            AssignDefaultEventClassAndTagPipe(self._manager),
            FingerprintPipe(self._manager),
            SerializeContextPipe(self._manager),
            EventPluginPipe(self._manager, IPostEventPlugin, 'PostEventPluginPipe'),
            ClearClassRefreshPipe(self._manager),
        )

        if not self.SYNC_EVERY_EVENT:
            # don't call sync() more often than 1 every 0.5 sec - helps throughput
            # when receiving events in bursts
            self.nextSync = datetime.now()
            self.syncInterval = timedelta(0, 0, 500000)

    def _routing_key(self, event):
        # e.g. event class '/Status/Ping' -> 'zenoss.zenevent.status.ping'
        return (self.dest_routing_key_prefix +
                event.event.event_class.replace('/', '.').lower())

    def processMessage(self, message):
        """
        Handles a queue message, can call "acknowledge" on the Queue Consumer
        class when it is done with the message
        """

        if self.SYNC_EVERY_EVENT:
            doSync = True
        else:
            # sync() db if it has been longer than self.syncInterval since the last time
            currentTime = datetime.now()
            doSync = currentTime > self.nextSync
            self.nextSync = currentTime + self.syncInterval

        if doSync:
            self.dmd._p_jar.sync()

        try:
            retry = True
            processed = False
            while not processed:
                try:
                    # extract event from message body
                    zepevent = ZepRawEvent()
                    zepevent.event.CopyFrom(message)
                    if log.isEnabledFor(logging.DEBUG):
                        log.debug("Received event: %s", to_dict(zepevent.event))

                    eventContext = EventContext(log, zepevent)

                    for pipe in self._pipes:
                        eventContext = pipe(eventContext)
                        if log.isEnabledFor(logging.DEBUG):
                            # lazy %-args: the string is only built when DEBUG
                            # is actually enabled
                            log.debug('After pipe %s, event context is %s',
                                      pipe.name, to_dict(eventContext.zepRawEvent))
                        if eventContext.event.status == STATUS_DROPPED:
                            raise DropEvent('Dropped by %s' % pipe, eventContext.event)

                    processed = True

                except AttributeError:
                    # _manager throws Attribute errors if connection to zope is lost - reset
                    # and retry ONE time
                    if retry:
                        retry = False
                        log.debug("Resetting connection to catalogs")
                        self._manager.reset()
                    else:
                        raise

        except DropEvent:
            # we want these to propagate out
            raise
        except Exception as e:
            log.info("Failed to process event, forward original raw event: %s",
                     to_dict(zepevent.event))
            # Pipes and plugins may raise ProcessingException's for their own reasons - only log unexpected
            # exceptions of other type (will insert stack trace in log)
            if not isinstance(e, ProcessingException):
                log.exception(e)

            # construct wrapper event to report this event processing failure (including content of the
            # original event)
            origzepevent = ZepRawEvent()
            origzepevent.event.CopyFrom(message)
            failReportEvent = dict(
                uuid=guid.generate(),
                created_time=int(time.time() * 1000),
                fingerprint='|'.join(['zeneventd', 'processMessage', repr(e)]),
                # Don't send the *same* event class or we thrash and crash endlessly
                eventClass='/',
                summary='Internal exception processing event: %r' % e,
                message='Internal exception processing event: %r/%s' % (e, to_dict(origzepevent.event)),
                severity=4,
            )
            zepevent = ZepRawEvent()
            zepevent.event.CopyFrom(from_dict(Event, failReportEvent))
            eventContext = EventContext(log, zepevent)
            eventContext.eventProxy.device = 'zeneventd'
            eventContext.eventProxy.component = 'processMessage'

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Publishing event: %s", to_dict(eventContext.zepRawEvent))

        yield Publishable(eventContext.zepRawEvent,
                          exchange=self._dest_exchange,
                          routingKey=self._routing_key(eventContext.zepRawEvent))
171 172
class EventDWorker(ZCmdBase):
    """
    A single event-processing worker process.

    Connects to RabbitMQ, registers a ProcessEventMessageTask as the handler
    for raw events, and pumps messages until shut down (SIGTERM or Ctrl-C).
    """

    def __init__(self):
        super(EventDWorker, self).__init__()
        # Initialize here so shutdown()/_sigterm() are safe even if run()
        # (which normally creates _pubsub via _listen) was never reached.
        self._pubsub = None
        self._shutdown = False
        self._amqpConnectionInfo = getUtility(IAMQPConnectionInfo)
        self._queueSchema = getUtility(IQueueSchema)

    def run(self):
        self._shutdown = False
        signal.signal(signal.SIGTERM, self._sigterm)
        task = ProcessEventMessageTask(self.dmd)
        self._listen(task)

    def shutdown(self):
        self._shutdown = True
        if self._pubsub:
            self._pubsub.shutdown()
            self._pubsub = None

    def _sigterm(self, signum=None, frame=None):
        log.debug("worker sigterm...")
        self.shutdown()

    def _listen(self, task, retry_wait=30):
        """
        Connect to RabbitMQ and consume messages, reconnecting with
        exponential backoff (capped at retry_wait seconds) on connection
        errors; stop on KeyboardInterrupt or after shutdown() is called.
        """
        self._pubsub = None
        keepTrying = True
        sleep = 0
        while keepTrying and not self._shutdown:
            try:
                if sleep:
                    log.info("Waiting %s seconds to reconnect...", sleep)
                    time.sleep(sleep)
                    sleep = min(retry_wait, sleep * 2)
                else:
                    sleep = .1
                log.info("Connecting to RabbitMQ...")
                self._pubsub = getProtobufPubSub(self._amqpConnectionInfo,
                                                 self._queueSchema,
                                                 '$RawZenEvents')
                self._pubsub.registerHandler('$Event', task)
                self._pubsub.registerExchange('$ZepZenEvents')
                # reset sleep time
                sleep = 0
                self._pubsub.run()
            except (socket.error, AMQPConnectionException) as e:
                log.warning("RabbitMQ Connection error %s", e)
            except KeyboardInterrupt:
                keepTrying = False
            finally:
                if self._pubsub:
                    self._pubsub.shutdown()
                    self._pubsub = None

    def buildOptions(self):
        super(EventDWorker, self).buildOptions()
        self.parser.add_option('--workers',
                        type="int",
                        default=2,
                        help="The number of event processing workers to run "
                             "(ignored when running in the foreground)")

    def parseOptions(self):
        """
        Don't ever allow a worker to be a daemon
        """
        super(EventDWorker, self).parseOptions()
        self.options.daemon = False
238 239
def run_worker():
    """
    Entry point for one worker subprocess: create an EventDWorker and run
    it until it exits, logging start and shutdown.
    """
    proc = multiprocessing.current_process()
    # lazy %-args keep log-record construction out of the fast path
    log.info("Starting: %s (pid %s)", proc.name, proc.pid)
    try:
        worker = EventDWorker()
        worker.run()
    finally:
        # reached on normal exit and on any exception out of run()
        log.debug("Shutting down: %s", proc.name)
249 250
class ZenEventD(ZenDaemon):
    """
    The zeneventd daemon.

    When daemonized it forks a pool of EventDWorker processes (via
    ProcessWorkers) and runs a maintenance/heartbeat cycle under the
    Twisted reactor; in the foreground it runs a single worker inline.
    """

    def __init__(self, *args, **kwargs):
        super(ZenEventD, self).__init__(*args, **kwargs)
        # Heartbeat timeout is 3x the maintenance interval so a single
        # missed cycle doesn't mark the daemon down.
        self._heartbeatSender = QueueHeartbeatSender('localhost',
                                                     'zeneventd',
                                                     self.options.maintenancecycle * 3)
        self._workers = ProcessWorkers(self.options.workers, run_worker,
                                       "Event worker")
        self._maintenanceCycle = MaintenanceCycle(self.options.maintenancecycle,
                                                  self._heartbeatSender)

    def _shutdown(self, *ignored):
        log.info("Shutting down...")
        self._maintenanceCycle.stop()
        self._workers.shutdown()

    def run(self):
        # Propagate the --synceveryevent flag to the worker task class.
        ProcessEventMessageTask.SYNC_EVERY_EVENT = self.options.syncEveryEvent

        if self.options.daemon:
            reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
            self._workers.startWorkers()
            self._maintenanceCycle.start()
            reactor.run()
        else:
            # Foreground mode: run a single worker in this process.
            EventDWorker().run()

    def _sigUSR1_called(self, signum, frame):
        log.debug('_sigUSR1_called %s', signum)
        # Forward the signal to the worker processes.
        self._workers.sendSignal(signum)

    def buildOptions(self):
        super(ZenEventD, self).buildOptions()

        workersBuildOptions(self.parser, default=2)
        maintenanceBuildOptions(self.parser)
        # Have to add in all the ZCmdBase options because they get passed
        # through to the workers but will be invalid if not allowed here
        self.parser.add_option('-R', '--dataroot',
                    dest="dataroot",
                    default="/zport/dmd",
                    help="root object for data load (i.e. /zport/dmd)")
        self.parser.add_option('--cachesize',
                    dest="cachesize", default=1000, type='int',
                    help="in memory cachesize default: 1000")
        self.parser.add_option('--host',
                    dest="host", default="localhost",
                    help="hostname of MySQL object store")
        self.parser.add_option('--port',
                    dest="port", type="int", default=3306,
                    help="port of MySQL object store")
        self.parser.add_option('--mysqluser', dest='mysqluser', default='zenoss',
                    help='username for MySQL object store')
        self.parser.add_option('--mysqlpasswd', dest='mysqlpasswd', default='zenoss',
                    help='passwd for MySQL object store')
        self.parser.add_option('--mysqldb', dest='mysqldb', default='zodb',
                    help='Name of database for MySQL object store')
        self.parser.add_option('--mysqlsocket', dest='mysqlsocket', default=None,
                    help='Name of socket file for MySQL server connection')
        self.parser.add_option('--cacheservers', dest='cacheservers', default="",
                    help='memcached servers to use for object cache (eg. 127.0.0.1:11211)')
        self.parser.add_option('--poll-interval', dest='pollinterval', default=None, type='int',
                    help='Defer polling the database for the specified maximum time interval, in seconds.'
                         ' This will default to 60 only if --cacheservers is set.')
        self.parser.add_option('--synceveryevent', dest='syncEveryEvent',
                    action="store_true", default=False,
                    help='Force sync() before processing every event; default is to sync() no more often '
                         'than once every 1/2 second.')


if __name__ == '__main__':
    zed = ZenEventD()
    zed.run()