1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 __doc__ = """zensyslog
17
18 Turn syslog messages into events.
19
20 """
21
22 import time
23 import socket
24 import os
25 import logging
26
27 from twisted.internet.protocol import DatagramProtocol
28 from twisted.internet import reactor, defer, udp
29 from twisted.python import failure
30
31 import Globals
32 import zope.interface
33 import zope.component
34
35
36 from Products.ZenCollector.daemon import CollectorDaemon
37 from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\
38 IEventService, \
39 IScheduledTask
40 from Products.ZenCollector.tasks import SimpleTaskFactory,\
41 SimpleTaskSplitter,\
42 BaseTask, TaskStates
43 from Products.ZenUtils.observable import ObservableMixin
44
45 from Products.ZenEvents.SyslogProcessing import SyslogProcessor
46
47 from Products.ZenUtils.Utils import zenPath
48 from Products.ZenUtils.IpUtil import asyncNameLookup
49
50 from Products.ZenEvents.EventServer import Stats
51 from Products.ZenUtils.Utils import unused
52 from Products.ZenCollector.services.config import DeviceProxy
53 unused(DeviceProxy)
54
# Name of this collector; the module logger's name is derived from it.
COLLECTOR_NAME = 'zensyslog'

# Module-wide logger, named "zen.<collector name>".
log = logging.getLogger('.'.join(('zen', COLLECTOR_NAME)))
57
58
60 zope.interface.implements(ICollectorPreferences)
61
80
84
86 """
87 Command-line options to be supported
88 """
89 SYSLOG_PORT = 514
90 try:
91 SYSLOG_PORT = socket.getservbyname('syslog', 'udp')
92 except socket.error:
93 pass
94
95 parser.add_option('--parsehost', dest='parsehost',
96 action='store_true', default=False,
97 help='Try to parse the hostname part of a syslog HEADER'
98 )
99 parser.add_option('--stats', dest='stats',
100 action='store_true', default=False,
101 help='Print statistics to log every 2 secs')
102 parser.add_option('--logorig', dest='logorig',
103 action='store_true', default=False,
104 help='Log the original message')
105 parser.add_option('--logformat', dest='logformat',
106 default='human',
107 help='Human-readable (/var/log/messages) or raw (wire)'
108 )
109 parser.add_option('--minpriority', dest='minpriority',
110 default=6, type='int',
111 help='Minimum priority message that zensyslog will accept'
112 )
113 parser.add_option('--syslogport', dest='syslogport',
114 default=SYSLOG_PORT, type='int',
115 help='Port number to use for syslog events'
116 )
117 parser.add_option('--listenip', dest='listenip',
118 default='0.0.0.0',
119 help='IP address to listen on. Default is %default'
120 )
121 parser.add_option('--useFileDescriptor',
122 dest='useFileDescriptor', type='int',
123 help='Read from an existing connection rather opening a new port.'
124 , default=None)
125 parser.add_option('--noreverseLookup', dest='noreverseLookup',
126 action='store_true', default=False,
127 help="Don't convert the remote device's IP address to a hostname."
128 )
129
def postStartup(self):
    """
    Set the default priority on the registered collector

    Looks up the utility registered for ICollector and sets its
    defaultPriority attribute to 1.
    """
    zope.component.getUtility(ICollector).defaultPriority = 1
133
134
136 """
137 Listen for syslog messages and turn them into events
138 Connects to the TrapService service in zenhub.
139 """
140 zope.interface.implements(IScheduledTask)
141
142 SYSLOG_DATE_FORMAT = '%b %d %H:%M:%S'
143 SAMPLE_DATE = 'Apr 10 15:19:22'
144
def __init__(self, taskName, configId,
             scheduleIntervalSeconds=3600, taskConfig=None):
    """
    Set up the syslog listener

    Acquires the UDP socket (either by binding a new port or by adopting
    an already-open file descriptor), optionally sets up logging of the
    original messages, and creates the SyslogProcessor that receives the
    messages.

    @param taskName: stored as the task's name
    @param configId: stored as the task's configId
    @param scheduleIntervalSeconds: stored as the task's interval
    @param taskConfig: forwarded to BaseTask.__init__; not otherwise
                       used here
    """
    BaseTask.__init__(self, taskName, configId,
                      scheduleIntervalSeconds, taskConfig)
    self.log = log

    self.name = taskName
    self.configId = configId
    self.state = TaskStates.STATE_IDLE
    self.interval = scheduleIntervalSeconds
    self._daemon = zope.component.getUtility(ICollector)
    self._eventService = zope.component.queryUtility(IEventService)
    # The daemon itself serves as this task's preferences object;
    # taskConfig is only handed to BaseTask above.
    self._preferences = self._daemon

    self.options = self._daemon.options

    self.stats = Stats()

    # Test against None (not truthiness) so that file descriptor 0 counts
    # as "a descriptor was supplied", consistent with the check further
    # down that decides whether to adopt the descriptor.
    if self.options.useFileDescriptor is None \
            and self.options.syslogport < 1024:
        self._daemon.openPrivilegedPort(
            '--listen', '--proto=udp',
            '--port=%s:%d' % (self.options.listenip,
                              self.options.syslogport))
    # NOTE(review): changeUser() is placed outside the privileged-port
    # branch so it always runs -- confirm against the original layout.
    self._daemon.changeUser()
    self.minpriority = self.options.minpriority
    self.processor = None

    if self.options.logorig:
        # Dedicated, non-propagating logger that writes only the message
        # text to its own file.
        self.olog = logging.getLogger('origsyslog')
        self.olog.setLevel(logging.INFO)
        self.olog.propagate = False
        lname = zenPath('log/origsyslog.log')
        hdlr = logging.FileHandler(lname)
        hdlr.setFormatter(logging.Formatter('%(message)s'))
        self.olog.addHandler(hdlr)

    if self.options.useFileDescriptor is not None:
        self.useUdpFileDescriptor(int(self.options.useFileDescriptor))
    else:
        reactor.listenUDP(self.options.syslogport, self,
                          interface=self.options.listenip)

    self.processor = SyslogProcessor(
        self._eventService.sendEvent,
        self.options.minpriority, self.options.parsehost,
        self.options.monitor, self._daemon.defaultPriority)
194
196 """
197 This is a wait-around task since we really are called
198 asynchronously.
199 """
200 return defer.succeed("Waiting for syslog messages...")
201
216
217 - def expand(self, msg, client_address):
263
265 """
266 Consume the network packet
267
268 @param msg: syslog message
269 @type msg: string
270 @param client_address: IP info of the remote device (ipaddr, port)
271 @type client_address: tuple of (string, number)
272 """
273 (ipaddr, port) = client_address
274 if self.options.logorig:
275 if self.options.logformat == 'human':
276 message = self.expand(msg, client_address)
277 else:
278 message = msg
279 self.olog.info(message)
280
281 if self.options.noreverseLookup:
282 d = defer.succeed(ipaddr)
283 else:
284 d = asyncNameLookup(ipaddr)
285 d.addBoth(self.gotHostname, (msg, ipaddr, time.time()))
286
288 """
289 Send the resolved address, if possible, and the event via the thread
290
291 @param response: Twisted response
292 @type response: Twisted response
293 @param data: (msg, ipaddr, rtime)
@type data: tuple of (string, string, float as returned by time.time())
295 """
296 (msg, ipaddr, rtime) = data
297 if isinstance(response, failure.Failure):
298 host = ipaddr
299 else:
300 host = response
301 if self.processor:
302 self.processor.process(msg, ipaddr, host, rtime)
303
315
319
320
322 """
323 Receive a configuration object containing the default priority
324 """
325 zope.interface.implements(IScheduledTask)
326
327 - def __init__(self, taskName, configId,
328 scheduleIntervalSeconds=3600, taskConfig=None):
340
342 return defer.succeed("Already updated default syslog priority...")
343
346
347
348
if __name__ == '__main__':
    # Script entry point: build the preferences and the task splitter
    # (wrapping a task factory for SyslogConfigTask), hand both to the
    # collector daemon, and run it.
    preferences = SyslogPreferences()
    splitter = SimpleTaskSplitter(SimpleTaskFactory(SyslogConfigTask))
    CollectorDaemon(preferences, splitter).run()
355