Package Products :: Package ZenEvents :: Module zensyslog
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.zensyslog

  1  #! /usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3  # ########################################################################## 
  4  # 
  5  # This program is part of Zenoss Core, an open source monitoring platform. 
  6  # Copyright (C) 2008, 2011 Zenoss Inc. 
  7  # 
  8  # This program is free software; you can redistribute it and/or modify it 
  9  # under the terms of the GNU General Public License version 2 or (at your 
 10  # option) any later version as published by the Free Software Foundation. 
 11  # 
 12  # For complete information please visit: http://www.zenoss.com/oss/ 
 13  # 
 14  # ########################################################################## 
 15   
 16  __doc__ = """zensyslog 
 17   
 18  Turn syslog messages into events. 
 19   
 20  """ 
 21   
 22  import time 
 23  import socket 
 24  import os 
 25  import logging 
 26   
 27  from twisted.internet.protocol import DatagramProtocol 
 28  from twisted.internet import reactor, defer, udp 
 29  from twisted.python import failure 
 30   
 31  import Globals 
 32  import zope.interface 
 33  import zope.component 
 34   
 35   
 36  from Products.ZenCollector.daemon import CollectorDaemon 
 37  from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\ 
 38                                               IEventService, \ 
 39                                               IScheduledTask 
 40  from Products.ZenCollector.tasks import SimpleTaskFactory,\ 
 41                                          SimpleTaskSplitter,\ 
 42                                          BaseTask, TaskStates 
 43  from Products.ZenUtils.observable import ObservableMixin 
 44   
 45  from Products.ZenEvents.SyslogProcessing import SyslogProcessor 
 46   
 47  from Products.ZenUtils.Utils import zenPath 
 48  from Products.ZenUtils.IpUtil import asyncNameLookup 
 49   
 50  from Products.ZenEvents.EventServer import Stats 
 51  from Products.ZenUtils.Utils import unused 
 52  from Products.ZenCollector.services.config import DeviceProxy 
 53  unused(DeviceProxy) 
 54   
 55  COLLECTOR_NAME = 'zensyslog' 
 56  log = logging.getLogger("zen.%s" % COLLECTOR_NAME) 
 57   
 58   
class SyslogPreferences(object):
    """
    Collector preferences for the zensyslog daemon: default attribute
    values, the command-line options, and the start-up hooks.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new SyslogPreferences instance and
        provides default values for needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        self.cycleInterval = 5 * 60 # seconds

        # NOTE(review): this attribute used to be assigned twice -- first
        # "20  # minutes", then 20*60.  Only the second assignment ever took
        # effect, so that value is kept here.  Confirm which unit the
        # collector framework expects for this setting.
        self.configCycleInterval = 20 * 60

        # The configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.SyslogConfig'

        # Will be filled in based on buildOptions
        self.options = None

    def postStartupTasks(self):
        """
        Generate the tasks to run after start-up: a single SyslogTask
        named after the collector.
        """
        task = SyslogTask(COLLECTOR_NAME, configId=COLLECTOR_NAME)
        yield task

    def buildOptions(self, parser):
        """
        Command-line options to be supported

        @param parser: option parser to which the zensyslog options are added
        @type parser: object providing add_option()
        """
        # Prefer the port registered for the syslog service on this host and
        # fall back to the well-known port when the lookup fails.
        try:
            syslogPort = socket.getservbyname('syslog', 'udp')
        except socket.error:
            syslogPort = 514

        parser.add_option('--parsehost', dest='parsehost',
                          action='store_true', default=False,
                          help='Try to parse the hostname part of a syslog HEADER'
                          )
        parser.add_option('--stats', dest='stats',
                          action='store_true', default=False,
                          help='Print statistics to log every 2 secs')
        parser.add_option('--logorig', dest='logorig',
                          action='store_true', default=False,
                          help='Log the original message')
        parser.add_option('--logformat', dest='logformat',
                          default='human',
                          help='Human-readable (/var/log/messages) or raw (wire)'
                          )
        parser.add_option('--minpriority', dest='minpriority',
                          default=6, type='int',
                          help='Minimum priority message that zensyslog will accept'
                          )
        parser.add_option('--syslogport', dest='syslogport',
                          default=syslogPort, type='int',
                          help='Port number to use for syslog events'
                          )
        parser.add_option('--listenip', dest='listenip',
                          default='0.0.0.0',
                          help='IP address to listen on. Default is %default'
                          )
        parser.add_option('--useFileDescriptor',
                          dest='useFileDescriptor', type='int',
                          help='Read from an existing connection rather opening a new port.'
                          , default=None)
        parser.add_option('--noreverseLookup', dest='noreverseLookup',
                          action='store_true', default=False,
                          help="Don't convert the remote device's IP address to a hostname."
                          )

    def postStartup(self):
        """
        Give the collector daemon an initial defaultPriority of 1.
        """
        daemon = zope.component.getUtility(ICollector)
        daemon.defaultPriority = 1
133 134
class SyslogTask(BaseTask, DatagramProtocol):
    """
    Listen for syslog messages arriving as UDP datagrams and hand them to
    a SyslogProcessor, which turns them into events.
    """
    zope.interface.implements(IScheduledTask)

    # Timestamp layout of a syslog HEADER, plus a sample used only to know
    # how many characters such a timestamp occupies.
    SYSLOG_DATE_FORMAT = '%b %d %H:%M:%S'
    SAMPLE_DATE = 'Apr 10 15:19:22'

    def __init__(self, taskName, configId,
                 scheduleIntervalSeconds=3600, taskConfig=None):
        """
        Set up the task, open (or adopt) the UDP socket and create the
        syslog processor.

        @param taskName: name of this task
        @type taskName: string
        @param configId: configuration id of this task
        @type configId: string
        @param scheduleIntervalSeconds: scheduling interval
        @type scheduleIntervalSeconds: number
        @param taskConfig: passed through to BaseTask; not otherwise used here
        @type taskConfig: object
        """
        BaseTask.__init__(self, taskName, configId,
                          scheduleIntervalSeconds, taskConfig)
        self.log = log

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds
        self._daemon = zope.component.getUtility(ICollector)
        self._eventService = zope.component.queryUtility(IEventService)
        # The daemon itself serves as the preferences object for this task.
        self._preferences = self._daemon

        self.options = self._daemon.options

        self.stats = Stats()

        # Use the same "was a descriptor supplied?" test for both decisions
        # below, so that descriptor 0 is treated as a supplied descriptor.
        inheritedFd = self.options.useFileDescriptor
        if inheritedFd is None and self.options.syslogport < 1024:
            self._daemon.openPrivilegedPort('--listen', '--proto=udp',
                                            '--port=%s:%d'
                                            % (self.options.listenip,
                                               self.options.syslogport))
        self._daemon.changeUser()
        self.minpriority = self.options.minpriority
        # gotHostname() ignores datagrams while no processor exists yet.
        self.processor = None

        if self.options.logorig:
            # Dedicated logger that writes the messages to their own file
            # without passing them on to the parent loggers.
            self.olog = logging.getLogger('origsyslog')
            self.olog.setLevel(logging.INFO)
            self.olog.propagate = False
            lname = zenPath('log/origsyslog.log')
            hdlr = logging.FileHandler(lname)
            hdlr.setFormatter(logging.Formatter('%(message)s'))
            self.olog.addHandler(hdlr)

        if inheritedFd is not None:
            self.useUdpFileDescriptor(int(inheritedFd))
        else:
            reactor.listenUDP(self.options.syslogport, self,
                              interface=self.options.listenip)

        self.processor = SyslogProcessor(self._eventService.sendEvent,
            self.options.minpriority, self.options.parsehost,
            self.options.monitor, self._daemon.defaultPriority)

    def doTask(self):
        """
        This is a wait-around task since we really are called
        asynchronously.

        @return: an already-fired deferred carrying a status string
        @rtype: Twisted deferred
        """
        return defer.succeed("Waiting for syslog messages...")

    def useUdpFileDescriptor(self, fd):
        """
        Receive datagrams from an already-open UDP socket instead of
        opening a new port.

        @param fd: file descriptor of the existing UDP socket
        @type fd: integer
        """
        # fromfd() duplicates the descriptor, so the original can be closed.
        s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM)
        os.close(fd)
        port = s.getsockname()[1]
        transport = udp.Port(port, self)
        s.setblocking(0)
        # Populate by hand the attributes of the port object that are
        # normally filled in when it starts listening.
        transport.socket = s
        transport.fileno = s.fileno
        transport.connected = 1
        transport._realPortNumber = port
        self.transport = transport
        # hack around startListening not being called
        self.numPorts = 1
        transport.startReading()

    def expand(self, msg, client_address):
        """
        Expands a syslog message into a string format suitable for writing
        to the filesystem such that it appears the same as it would
        had the message been logged by the syslog daemon.

        Messages that are empty, or that contain no space after the
        header, are handled without raising: the sender's address is used
        as the hostname and the remaining text as the body.

        @param msg: syslog message
        @type msg: string
        @param client_address: IP info of the remote device (ipaddr, port)
        @type client_address: tuple of (string, number)
        @return: message
        @rtype: string
        """
        # Skip the leading "<PRI>" part (PRI = facility * 8 + severity).
        # find() yields -1 when there is no '>', which makes start 0.
        start = msg.find('>') + 1

        # check for a datestamp. default to right now if date not present
        stop = start + len(SyslogTask.SAMPLE_DATE)
        try:
            parsed = time.strptime(msg[start:stop],
                                   SyslogTask.SYSLOG_DATE_FORMAT)
        except ValueError:
            # date not present, so use today's date
            date = time.localtime()
        else:
            # The timestamp carries no year, so assume the current one.
            date = (time.localtime()[0], ) + tuple(parsed[1:])
            start = stop + 1

        # check for a hostname. default to the sender's address if not present
        stop = msg.find(' ', start)
        if stop == -1 or msg[stop - 1] == ':':
            # Either there is no further token at all, or the next token
            # ends with ':' (a tag rather than a hostname).
            hostname = client_address[0]
        else:
            hostname = msg[start:stop]
            start = stop + 1

        # the message content
        body = msg[start:]

        # assemble the message
        prettyTime = time.strftime(SyslogTask.SYSLOG_DATE_FORMAT, date)
        return '%s %s %s' % (prettyTime, hostname, body)

    def datagramReceived(self, msg, client_address):
        """
        Consume the network packet

        @param msg: syslog message
        @type msg: string
        @param client_address: IP info of the remote device (ipaddr, port)
        @type client_address: tuple of (string, number)
        """
        ipaddr = client_address[0]
        if self.options.logorig:
            if self.options.logformat == 'human':
                message = self.expand(msg, client_address)
            else:
                message = msg
            self.olog.info(message)

        if self.options.noreverseLookup:
            d = defer.succeed(ipaddr)
        else:
            d = asyncNameLookup(ipaddr)
        # addBoth: gotHostname copes with a failed lookup as well.
        d.addBoth(self.gotHostname, (msg, ipaddr, time.time()))

    def gotHostname(self, response, data):
        """
        Hand the message to the processor together with the resolved host
        name, falling back to the IP address when the lookup failed.

        @param response: Twisted response
        @type response: Twisted response
        @param data: (msg, ipaddr, rtime)
        @type data: tuple of (string, string, number as given by time.time())
        """
        (msg, ipaddr, rtime) = data
        if isinstance(response, failure.Failure):
            host = ipaddr
        else:
            host = response
        if self.processor:
            self.processor.process(msg, ipaddr, host, rtime)

    def displayStatistics(self):
        """
        Build a human-readable summary of the processing statistics.

        @return: summary text
        @rtype: string
        """
        totalTime, totalEvents, maxTime = self.stats.report()
        display = "%d events processed in %.2f seconds" % (
            totalEvents,
            totalTime)
        if totalEvents > 0:
            # Averages are only meaningful once something was processed.
            display += (
                "\n%.5f average seconds per event"
                "\nMaximum processing time for one event was %.5f"
            ) % ((totalTime / totalEvents), maxTime)
        return display

    def cleanup(self):
        """
        Write the processing statistics to the log.
        """
        status = self.displayStatistics()
        self.log.info(status)
319 320
class SyslogConfigTask(ObservableMixin):
    """
    Task built from a configuration object; it copies the configured
    default priority onto the collector daemon.
    """
    zope.interface.implements(IScheduledTask)

    def __init__(self, taskName, configId,
                 scheduleIntervalSeconds=3600, taskConfig=None):
        """
        Record the task attributes and push the default priority found in
        taskConfig to the collector daemon.

        @param taskName: name of this task
        @type taskName: string
        @param configId: configuration id of this task
        @type configId: string
        @param scheduleIntervalSeconds: scheduling interval
        @type scheduleIntervalSeconds: number
        @param taskConfig: configuration carrying a defaultPriority attribute
        @type taskConfig: object
        """
        super(SyslogConfigTask, self).__init__()

        # Attributes required by the scheduled-task interface
        self.name, self.configId = taskName, configId
        self.interval = scheduleIntervalSeconds
        self.state = TaskStates.STATE_IDLE
        self._preferences = taskConfig

        collector = zope.component.getUtility(ICollector)
        self._daemon = collector
        collector.defaultPriority = taskConfig.defaultPriority

    def doTask(self):
        """
        Nothing left to do: the priority was applied at construction time.

        @return: an already-fired deferred carrying a status string
        @rtype: Twisted deferred
        """
        return defer.succeed("Already updated default syslog priority...")

    def cleanup(self):
        """
        No resources to release.
        """
        pass
if __name__ == '__main__':
    # Script entry point: wire preferences, task factory and splitter into
    # a collector daemon, then run it.
    preferences = SyslogPreferences()
    taskFactory = SimpleTaskFactory(SyslogConfigTask)
    taskSplitter = SimpleTaskSplitter(taskFactory)
    daemon = CollectorDaemon(preferences, taskSplitter)
    daemon.run()