Package Products :: Package ZenEvents :: Module zenmail
[hide private]
[frames | no frames]

Source Code for Module Products.ZenEvents.zenmail

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2011 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  # Notes: database wants events in UTC time 
 14  # Events page shows local time, as determined on the server where zenoss runs 
 15   
 16  __doc__ = """zenmail 
 17   
 18  Listen on the SMTP port and convert email messages into events. 
 19   
 20  To test: 
 21   # Test pre-reqs 
 22   yum -y install mailx sendmail 
 23   
 24   # Mail to the local zenmail instance 
 25  mail -s "Hello world" bogo@localhost 
 26  Happy happy, joy, joy 
 27  . 
 28  Cc: 
 29   
 30  """ 
 31   
 32  import logging 
 33  from email.Header import Header 
 34  import email 
 35  import os 
 36  import socket 
 37   
 38  import Globals 
 39  import zope.interface 
 40  import zope.component 
 41  from zope.interface import implements 
 42   
 43  from twisted.mail import smtp 
 44  from twisted.internet import reactor, protocol, defer 
 45   
 46  from Products.ZenCollector.daemon import CollectorDaemon 
 47  from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\ 
 48                                               IEventService, \ 
 49                                               IScheduledTask 
 50  from Products.ZenCollector.tasks import NullTaskSplitter,\ 
 51                                          BaseTask, TaskStates 
 52   
 53  from Products.ZenEvents.MailProcessor import MailProcessor 
 54   
 55   
 56  COLLECTOR_NAME = 'zenmail' 
 57  log = logging.getLogger("zen.%s" % COLLECTOR_NAME) 
 58   
 59   
class MailPreferences(object):
    """
    Preferences object handed to CollectorDaemon for the zenmail daemon.

    Declares ICollectorPreferences and carries the collector name, the
    cycle intervals, the ZenHub configuration service name, the
    command-line options and the startup task.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new MailPreferences instance and
        provides default values for needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        self.cycleInterval = 5 * 60 # seconds

        # The configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.NullConfig'

        # Will be filled in based on buildOptions
        self.options = None

        # This attribute used to be assigned twice (20, then 20*60); only
        # the final, effective value is kept.
        # NOTE(review): the unit expected by the framework is not visible
        # here -- confirm that 20*60 is the intended value.
        self.configCycleInterval = 20*60

    def postStartupTasks(self):
        """
        Yield the single MailListeningTask that sets up the SMTP listener.
        """
        task = MailListeningTask(COLLECTOR_NAME, configId=COLLECTOR_NAME)
        yield task

    def buildOptions(self, parser):
        """
        Command-line options to be supported

        @param parser: option parser to which the zenmail options are added
        """
        # Use the system's registered SMTP port, falling back to 25 when
        # the service lookup fails.
        SMTP_PORT = 25
        try:
            SMTP_PORT = socket.getservbyname('smtp', 'tcp')
        except socket.error:
            pass

        parser.add_option('--useFileDescriptor',
                          dest='useFileDescriptor',
                          default=-1,
                          type="int",
                          help="File descriptor to use for listening")
        parser.add_option('--listenPort',
                          dest='listenPort',
                          default=SMTP_PORT,
                          type="int",
                          help="Alternative listen port to use (default %default)")
        # The default is an int to match type="int"; it used to be the
        # string "2", which relied on the parser converting string defaults.
        parser.add_option('--eventseverity',
                          dest='eventseverity',
                          default=2,
                          type="int",
                          help="Severity for events created")
        parser.add_option('--listenip',
                          dest='listenip',
                          default='0.0.0.0',
                          help='IP address to listen on. Default is 0.0.0.0')

    def postStartup(self):
        """
        Hook required by the preferences interface; nothing to do here.
        """
        pass
118 119
class ZenossEventPoster(object):
    """
    Implementation of interface definition for messages
    that can be sent via SMTP.

    Buffers the lines of one incoming message and hands the complete
    text to the processor once the end-of-message marker arrives.
    """
    implements(smtp.IMessage)

    def __init__(self, processor):
        """
        @param processor: object whose process() method receives the
                          complete message text
        """
        self.lines = []
        self.processor = processor

    def lineReceived(self, line):
        """
        Buffer one line of message data.
        """
        self.lines.append(line)

    def postEvent(self, messageStr):
        """
        Pass the complete message text to the processor.

        The text used to be parsed with email.message_from_string() first,
        but the parsed result was thrown away, so that call was removed.
        """
        self.processor.process(messageStr)

    def eomReceived(self):
        """
        Join the buffered lines, post the message and report success.

        @return: an already-fired Deferred
        """
        log.info('Message data completed %s.', self.lines)
        # The empty final element makes the joined text end with a newline.
        self.lines.append('')
        messageData = '\n'.join(self.lines)

        self.postEvent(messageData)

        return defer.succeed("Received End Of Message marker")

    def connectionLost(self):
        """
        Discard the partially received message.
        """
        log.info('Connection lost unexpectedly')
        # Reset instead of deleting the attribute so that a late call to
        # lineReceived()/eomReceived() cannot fail with AttributeError.
        self.lines = []
150 151
class ZenossDelivery(object):
    """
    Message delivery object handed to the SMTP protocol: it logs the
    envelope data and supplies a ZenossEventPoster for each recipient.
    """
    implements(smtp.IMessageDelivery)

    def __init__(self, processor):
        self.processor = processor

    def receivedHeader(self, helo, unused, ignored):
        """
        Build the 'Received:' header line for an incoming message.

        The second element of helo is remembered as self.clientIP.
        """
        localName, remoteAddr = helo
        self.clientIP = remoteAddr
        timestamp = smtp.rfc822date()

        details = (localName, self.clientIP, timestamp)
        headerValue = 'by %s from %s with ESMTP ; %s' % details

        log.info('Relayed (or sent directly) from: %s', self.clientIP)

        return 'Received: %s' % Header(headerValue)

    def validateTo(self, user):
        """
        Log the destination and return the factory callable that creates
        the message object.
        """
        log.info('to: %s', user.dest)
        return self.makePoster

    def makePoster(self):
        """
        Create a new message object bound to this delivery's processor.
        """
        poster = ZenossEventPoster(self.processor)
        return poster

    def validateFrom(self, unused, originAddress):
        """
        Log the sender address and return it unchanged.
        """
        log.info("from: %s", originAddress)
        return originAddress
180 181
class SMTPFactory(protocol.ServerFactory):
    """
    Server factory that builds one SMTP protocol per connection, each
    with its own ZenossDelivery sharing this factory's processor.
    """

    def __init__(self, processor):
        self.processor = processor

    def buildProtocol(self, unused):
        """
        Return a new smtp.SMTP instance whose factory attribute is this
        object.
        """
        connection = smtp.SMTP(ZenossDelivery(self.processor))
        connection.factory = self
        return connection
191 192
class MailListeningTask(BaseTask):
    """
    Scheduled task whose constructor starts the SMTP listener.

    Incoming messages are handled asynchronously through the reactor, so
    doTask() itself has nothing to do.
    """
    zope.interface.implements(IScheduledTask)

    def __init__(self, taskName, configId,
                 scheduleIntervalSeconds=3600, taskConfig=None):
        """
        Set up the event plumbing and start listening for SMTP traffic.

        @param taskName: name of the task
        @param configId: configuration id of the task
        @param scheduleIntervalSeconds: interval stored as self.interval
        @param taskConfig: passed through to BaseTask.__init__
        """
        BaseTask.__init__(self, taskName, configId,
                          scheduleIntervalSeconds, taskConfig)
        self.log = log

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self.interval = scheduleIntervalSeconds
        self._daemon = zope.component.getUtility(ICollector)
        self._eventService = zope.component.queryUtility(IEventService)
        # _preferences used to be set to taskConfig first and then
        # overwritten; only the effective assignment is kept.
        self._preferences = self._daemon

        self.options = self._daemon.options

        # Allow MailProcessor to work unmodified
        self.sendEvent = self._eventService.sendEvent

        # Any negative value means "no inherited descriptor". The two
        # checks used to differ (< 0 vs != -1), so a value such as -2
        # opened a privileged port and then tried to adopt an invalid
        # descriptor.
        haveDescriptor = self.options.useFileDescriptor >= 0

        if not haveDescriptor and self.options.listenPort < 1024:
            self._daemon.openPrivilegedPort('--listen',
                '--proto=tcp', '--port=%s:%d' % (
                self.options.listenip, self.options.listenPort))

        self._daemon.changeUser()
        self.processor = MailProcessor(self, self.options.eventseverity)

        self.factory = SMTPFactory(self.processor)

        log.info("listening on %s:%d",
                 self.options.listenip, self.options.listenPort)
        if haveDescriptor:
            self.useTcpFileDescriptor(int(self.options.useFileDescriptor),
                                      self.factory)
        else:
            reactor.listenTCP(self.options.listenPort, self.factory,
                              interface=self.options.listenip)

    def doTask(self):
        """
        This is a wait-around task since we really are called
        asynchronously.
        """
        return defer.succeed("Waiting for SMTP messages...")

    def useTcpFileDescriptor(self, fd, factory):
        """
        Make the reactor serve connections arriving on an already-open
        descriptor.

        A reactor port is opened on a scratch port in the range
        19800-19998 and its socket descriptor is then replaced, via
        os.dup2(), by the supplied descriptor.

        @param fd: descriptor of the listening socket to adopt
        @param factory: protocol factory for incoming connections
        @return: the reactor port object
        @raise socket.error: if no scratch port could be used
        """
        # Imported locally: the module only imports reactor, protocol and
        # defer from twisted.internet.
        from twisted.internet.error import CannotListenError

        for i in range(19800, 19999):
            try:
                p = reactor.listenTCP(i, factory)
            except (CannotListenError, socket.error):
                # listenTCP reports a busy port with CannotListenError,
                # which is not a socket.error; without catching it the
                # scan stopped at the first busy port.
                continue
            try:
                os.dup2(fd, p.socket.fileno())
                p.socket.listen(p.backlog)
                p.socket.setblocking(False)
                p.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                os.close(fd)
                return p
            except socket.error:
                pass
        raise socket.error("Unable to find an open socket to listen on")

    def cleanup(self):
        """
        Nothing to clean up.
        """
        pass
if __name__ == '__main__':
    # Script entry point: build the collector daemon from the zenmail
    # preferences and a NullTaskSplitter, then run it.
    daemon = CollectorDaemon(MailPreferences(), NullTaskSplitter())
    daemon.run()