1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 __doc__ = """zenmail
17
18 Listen on the SMTP port and convert email messages into events.
19
20 To test:
21 # Test pre-reqs
22 yum -y install mailx sendmail
23
24 # Mail to the local zenmail instance
25 mail -s "Hello world" bogo@localhost
26 Happy happy, joy, joy
27 .
28 Cc:
29
30 """
31
32 import logging
33 from email.Header import Header
34 import email
35 import os
36 import socket
37
38 import Globals
39 import zope.interface
40 import zope.component
41 from zope.interface import implements
42
43 from twisted.mail import smtp
44 from twisted.internet import reactor, protocol, defer
45
46 from Products.ZenCollector.daemon import CollectorDaemon
47 from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\
48 IEventService, \
49 IScheduledTask
50 from Products.ZenCollector.tasks import NullTaskSplitter,\
51 BaseTask, TaskStates
52
53 from Products.ZenEvents.MailProcessor import MailProcessor
54
55
56 COLLECTOR_NAME = 'zenmail'
57 log = logging.getLogger("zen.%s" % COLLECTOR_NAME)
58
59
61 zope.interface.implements(ICollectorPreferences)
62
81
85
87 """
88 Command-line options to be supported
89 """
90 SMTP_PORT = 25
91 try:
92 SMTP_PORT = socket.getservbyname('smtp', 'tcp')
93 except socket.error:
94 pass
95
96 parser.add_option('--useFileDescriptor',
97 dest='useFileDescriptor',
98 default=-1,
99 type="int",
100 help="File descriptor to use for listening")
101 parser.add_option('--listenPort',
102 dest='listenPort',
103 default=SMTP_PORT,
104 type="int",
105 help="Alternative listen port to use (default %default)")
106 parser.add_option('--eventseverity',
107 dest='eventseverity',
108 default="2",
109 type="int",
110 help="Severity for events created")
111 parser.add_option('--listenip',
112 dest='listenip',
113 default='0.0.0.0',
114 help='IP address to listen on. Default is 0.0.0.0')
115
116 - def postStartup(self):
118
119
120 -class ZenossEventPoster(object):
121 """
122 Implementation of interface definition for messages
123 that can be sent via SMTP.
124 """
125 implements(smtp.IMessage)
126
127 - def __init__(self, processor):
128 self.lines = []
129 self.processor = processor
130
131 - def lineReceived(self, line):
132 self.lines.append(line)
133
134 - def postEvent(self, messageStr):
135 email.message_from_string(messageStr)
136 self.processor.process(messageStr)
137
138 - def eomReceived(self):
139 log.info('Message data completed %s.', self.lines)
140 self.lines.append('')
141 messageData = '\n'.join(self.lines)
142
143 self.postEvent(messageData)
144
145 return defer.succeed("Received End Of Message marker")
146
147 - def connectionLost(self):
148 log.info('Connection lost unexpectedly')
149 del(self.lines)
150
151
153 implements(smtp.IMessageDelivery)
154
156 self.processor = processor
157
159 myHostname, self.clientIP = helo
160 date = smtp.rfc822date()
161
162 headerValue = 'by %s from %s with ESMTP ; %s' % (
163 myHostname, self.clientIP, date)
164
165 log.info('Relayed (or sent directly) from: %s', self.clientIP)
166
167 header = 'Received: %s' % Header(headerValue)
168 return header
169
173
174 - def makePoster(self):
175 return ZenossEventPoster(self.processor)
176
178 log.info("from: %s", originAddress)
179 return originAddress
180
181
184 self.processor = processor
185
187 delivery = ZenossDelivery(self.processor)
188 smtpProtocol = smtp.SMTP(delivery)
189 smtpProtocol.factory = self
190 return smtpProtocol
191
192
194 zope.interface.implements(IScheduledTask)
195
196 - def __init__(self, taskName, configId,
197 scheduleIntervalSeconds=3600, taskConfig=None):
198 BaseTask.__init__(self, taskName, configId,
199 scheduleIntervalSeconds, taskConfig)
200 self.log = log
201
202
203 self.name = taskName
204 self.configId = configId
205 self.state = TaskStates.STATE_IDLE
206 self.interval = scheduleIntervalSeconds
207 self._preferences = taskConfig
208 self._daemon = zope.component.getUtility(ICollector)
209 self._eventService = zope.component.queryUtility(IEventService)
210 self._preferences = self._daemon
211
212 self.options = self._daemon.options
213
214
215 self.sendEvent = self._eventService.sendEvent
216
217 if (self.options.useFileDescriptor < 0 and \
218 self.options.listenPort < 1024):
219 self._daemon.openPrivilegedPort('--listen',
220 '--proto=tcp', '--port=%s:%d' % (
221 self.options.listenip, self.options.listenPort))
222
223 self._daemon.changeUser()
224 self.processor = MailProcessor(self, self.options.eventseverity)
225
226 self.factory = SMTPFactory(self.processor)
227
228 log.info("listening on %s:%d" % (
229 self.options.listenip, self.options.listenPort))
230 if self.options.useFileDescriptor != -1:
231 self.useTcpFileDescriptor(int(self.options.useFileDescriptor),
232 self.factory)
233 else:
234 reactor.listenTCP(self.options.listenPort, self.factory,
235 interface=self.options.listenip)
236
238 """
239 This is a wait-around task since we really are called
240 asynchronously.
241 """
242 return defer.succeed("Waiting for SMTP messages...")
243
257
260
261 if __name__=='__main__':
262 myPreferences = MailPreferences()
263 myTaskSplitter = NullTaskSplitter()
264 daemon = CollectorDaemon(myPreferences, myTaskSplitter)
265 daemon.run()
266