Package Products :: Package ZenEvents :: Module zenpop3
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.zenpop3

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, 2011 Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 or (at your 
  8  # option) any later version as published by the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  #! /usr/bin/env python  
 14  # Notes: database wants events in UTC time 
 15  # Events page shows local time, as determined on the server where zenoss runs 
 16   
 17  __doc__ = """zenpop3 
 18   
 19  Turn email messages obtained from POP3 accounts into events. 
 20   
 21  """ 
 22   
 23  import logging 
 24  import socket 
 25   
 26  import Globals 
 27  import zope.interface 
 28   
 29  from twisted.mail.pop3client import POP3Client 
 30  from twisted.internet.ssl import ClientContextFactory 
 31  from twisted.internet import reactor, protocol, defer, error 
 32   
 33  from Products.ZenCollector.daemon import CollectorDaemon 
 34  from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\ 
 35                                               IEventService, \ 
 36                                               IScheduledTask 
 37  from Products.ZenCollector.tasks import NullTaskSplitter,\ 
 38                                          BaseTask, TaskStates 
 39   
 40  from Products.ZenEvents.MailProcessor import POPProcessor 
 41   
 42   
 43  COLLECTOR_NAME = 'zenpop3' 
 44  log = logging.getLogger("zen.%s" % COLLECTOR_NAME) 
 45   
 46   
class MailPreferences(object):
    """
    Collector preferences for the zenpop3 daemon.

    Supplies the default attribute values, the command-line options and
    the single startup task handed to CollectorDaemon (see the
    ``__main__`` block of this module).
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Construct a new MailPreferences instance and provide default
        values for the needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        self.cycleInterval = 5 * 60  # seconds

        # NOTE(review): this attribute used to be assigned twice -- first
        # 20 (commented "minutes"), then 20*60 -- so 20*60 was always the
        # effective value.  That value is kept here; confirm which unit
        # the collector framework expects.
        self.configCycleInterval = 20 * 60

        # The configurationService attribute is the fully qualified
        # class-name of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.NullConfig'

        # Will be filled in based on buildOptions
        self.options = None

    def postStartupTasks(self):
        """
        Yield the one MailCollectingTask this daemon runs.
        """
        task = MailCollectingTask(COLLECTOR_NAME, configId=COLLECTOR_NAME)
        yield task

    def buildOptions(self, parser):
        """
        Command-line options to be supported

        @param parser: option parser exposing an optparse-style
                       add_option() method
        """
        # Prefer the port registered for 'pop3' in the local services
        # database; fall back to the well-known port when it is missing.
        pop3_port = 110
        try:
            pop3_port = socket.getservbyname('pop3', 'tcp')
        except socket.error:
            pass

        parser.add_option('--usessl',
                          dest='usessl',
                          default=False,
                          action="store_true",
                          help="Use SSL when connecting to POP server")
        parser.add_option('--nodelete',
                          dest='nodelete',
                          default=False,
                          action="store_true",
                          help="Leave messages on POP server")
        parser.add_option('--pophost',
                          dest='pophost',
                          default="pop.zenoss.com",
                          help="POP server from which emails are to be read")
        parser.add_option('--popport',
                          dest='popport',
                          default=pop3_port,
                          type="int",
                          help="POP port from which emails are to be read")
        parser.add_option('--popuser',
                          dest='popuser',
                          default="zenoss",
                          help="POP user")
        parser.add_option('--poppass',
                          dest='poppass',
                          default="zenoss",
                          help="POP password")
        parser.add_option('--cycletime',
                          dest='cycletime',
                          type="int",
                          default=60,
                          help="Frequency (in seconds) to poll the POP server")
        # The option is declared type="int", so give it an int default
        # rather than relying on the parser to convert the string "2".
        parser.add_option('--eventseverity',
                          dest='eventseverity',
                          default=2,
                          type="int",
                          help="Severity for events created")

    def postStartup(self):
        """
        Nothing to do after startup.
        """
        pass


class POPProtocol(POP3Client):
    """
    Protocol that is responsible for conversing with a POP server
    after a connection has been established.  Downloads messages (and
    deletes them by default), and passes the messages back up to the
    factory to process and turn into events.

    A message is only deleted from the server when its retrieval and
    its hand-off to the factory both succeeded; a message that failed
    is logged and left in place, so it will be seen again on the next
    poll.
    """

    allowInsecureLogin = True
    timeout = 15
    totalMessages = 0

    def serverGreeting(self, unused):
        """
        Log in with the factory's credentials once the server has
        greeted us.  Any failure in the login chain is forwarded to the
        factory's deferred.
        """
        log.debug('Server greeting received: Logging in...')

        login = self.login(self.factory.user, self.factory.passwd)
        login.addCallback(self._loggedIn)
        login.addErrback(self.factory.deferred.errback)

    def _loggedIn(self, unused):
        """
        Start fetching messages after a successful login.
        """
        log.debug('Logged in')
        return self.retrieveAndParse()

    def retrieveAndParse(self):
        """
        Ask the server for the message sizes and continue from there.

        @return: the Deferred returned by listSize(), with
                 _gotMessageSizes chained onto it
        """
        d = self.listSize()
        d.addCallback(self._gotMessageSizes)

        return d

    def _gotMessageSizes(self, sizes):
        """
        Retrieve every listed message, then delete and finish the scan.

        @param sizes: list with one entry per message on the server
        @return: Deferred that fires after scanComplete has run
        """
        self.totalMessages = len(sizes)
        log.info('Messages to retrieve: %d', self.totalMessages)

        self.sizes = sizes

        retrievers = []
        for index in range(self.totalMessages):
            log.debug('Retrieving message #%d...', index)
            d = self.retrieve(index)
            d.addCallback(self._gotMessageLines)
            retrievers.append(d)

        # consumeErrors=True: per-message failures are reported by
        # _delete below instead of surfacing as unhandled Deferred errors.
        deferreds = defer.DeferredList(retrievers, consumeErrors=True)
        deferreds.addCallback(self._delete)
        return deferreds.addCallback(self.scanComplete)

    def _gotMessageLines(self, messageLines):
        """
        Join the retrieved lines and hand the message to the factory.
        """
        log.debug('Passing message up to factory')
        self.factory.handleMessage("\r\n".join(messageLines))

    def _delete(self, results):
        """
        Delete the successfully handled messages unless the factory's
        nodelete flag is set.

        @param results: the DeferredList result -- one (success, value)
                        pair per message, in retrieval order
        @return: DeferredList over the issued delete requests (empty
                 when nothing is deleted)
        """
        deleters = []
        for index, (success, outcome) in enumerate(results):
            if not success:
                # Deleting here would lose a message that never became
                # an event, so keep it on the server.
                log.warning('Message #%d was not retrieved/processed; '
                            'leaving it on the server: %s',
                            index, outcome.getErrorMessage())
                continue
            if not self.factory.nodelete:
                log.info('Deleting message #%d...', index)
                deleters.append(self.delete(index))

        return defer.DeferredList(deleters)

    def scanComplete(self, unused):
        """
        End the POP session once all messages have been dealt with.
        """
        log.debug("Scan complete")
        self.quit()


class POPFactory(protocol.ClientFactory):
    """
    Client factory carrying the settings POPProtocol reads while it
    talks to the server: login credentials, the message processor and
    the keep-messages flag.
    """
    protocol = POPProtocol

    def __init__(self, user, passwd, processor, nodelete):
        """
        @param user: POP account name used by the protocol to log in
        @param passwd: password for that account
        @param processor: object whose process() receives each message
        @param nodelete: true to leave messages on the server
        """
        # Errbacked on connection failure (and by the protocol when the
        # login chain fails).
        self.deferred = defer.Deferred()
        self.user, self.passwd = user, passwd
        self.processor = processor
        self.nodelete = nodelete

    def handleMessage(self, messageData):
        """
        Pass one complete message on to the processor.
        """
        self.processor.process(messageData)

    def clientConnectionFailed(self, unused, reason):
        """
        Report a failed connection attempt through the factory deferred.
        """
        self.deferred.errback(reason)


class MailCollectingTask(BaseTask):
    """
    Scheduled task that connects to the configured POP3 server on each
    run and lets POPFactory/POPProtocol feed the retrieved messages to
    a POPProcessor.
    """
    zope.interface.implements(IScheduledTask)

    STATE_COLLECTING = 'COLLECTING'

    def __init__(self, taskName, configId,
                 scheduleIntervalSeconds=60, taskConfig=None):
        """
        @param taskName: name of this task
        @param configId: configuration id of this task
        @param scheduleIntervalSeconds: passed through to BaseTask
        @param taskConfig: passed through to BaseTask
        """
        # This module only imports zope.interface at file level; import
        # zope.component explicitly instead of relying on some other
        # module having loaded it already.
        import zope.component

        BaseTask.__init__(self, taskName, configId,
                          scheduleIntervalSeconds, taskConfig)
        self.log = log

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self._daemon = zope.component.getUtility(ICollector)
        self._eventService = zope.component.queryUtility(IEventService)
        self._preferences = self._daemon

        self.options = self._daemon.options

        # This will take a bit to catch up, but....
        self.interval = self.options.cycletime

        # Allow MailProcessor to work unmodified
        self.sendEvent = self._eventService.sendEvent

        self._daemon.changeUser()
        self.processor = POPProcessor(self, self.options.eventseverity)

        # Both are created on each run; define them up front so that
        # cleanup() is safe even before the first doTask().
        self.factory = None
        self._connection = None

    def doTask(self):
        """
        Run one polling cycle.

        @return: Deferred wrapping checkForMessages()
        """
        d = defer.maybeDeferred(self.checkForMessages)
        return d

    def makeFactory(self):
        """
        Build a fresh POPFactory from the options and route its
        failures to handleError.
        """
        self.factory = POPFactory(self.options.popuser, self.options.poppass,
                                  self.processor, self.options.nodelete)
        self.factory.deferred.addErrback(self.handleError)

    def checkForMessages(self):
        """
        Start a connection (SSL or plaintext, per the usessl option) to
        the POP server.

        @return: already-fired Deferred carrying a status string; it
                 does not wait for the mail exchange to finish
        """
        self.state = MailCollectingTask.STATE_COLLECTING

        self.makeFactory()
        if self.options.usessl:
            log.debug("Connecting to server %s:%s using SSL as %s",
                      self.options.pophost, self.options.popport,
                      self.options.popuser)
            self._connection = reactor.connectSSL(
                self.options.pophost, self.options.popport,
                self.factory, ClientContextFactory())
        else:
            log.debug("Connecting to server %s:%s using plaintext as %s",
                      self.options.pophost, self.options.popport,
                      self.options.popuser)
            self._connection = reactor.connectTCP(
                self.options.pophost, self.options.popport,
                self.factory)
        return defer.succeed("Connected to server %s:%s" % (
            self.options.pophost, self.options.popport))

    def _finished(self, result=None):
        """
        Disconnect any open connection and build a completion message.

        @return: already-fired Deferred carrying the message
        """
        if self._connection:
            self._connection.disconnect()
        if self.factory:
            # NOTE(review): factory.protocol is the POPProtocol class,
            # and the per-connection count is stored on the protocol
            # instance, so this reads the class-level default.  Confirm
            # whether the last instance's count should be used instead.
            message = "Last retrieved %d messages" % \
                self.factory.protocol.totalMessages
        else:
            message = "Completed"
        return defer.succeed(message)

    def handleError(self, err):
        """
        Turn a failure from the factory into a log entry and, for
        anything other than the known connection errors, an event.

        @param err: the Failure delivered by the factory's deferred
        @return: already-fired Deferred carrying the error message
        """
        # NOTE(review): block nesting below was reconstructed from a
        # flattened listing -- confirm that the event and the state
        # change belong to the final else-branch only.
        if err.type == error.TimeoutError:
            message = "Timed out connecting to %s:%d" % (
                self.options.pophost, self.options.popport)

        elif err.type == error.ConnectionRefusedError:
            message = "Connection refused by %s:%d" % (
                self.options.pophost, self.options.popport)

        elif err.type == error.ConnectError:
            message = "Connection failed to %s:%d" % (
                self.options.pophost, self.options.popport)
        else:
            message = err.getErrorMessage()
            self.sendEvent(dict(
                device=socket.getfqdn(),
                component=COLLECTOR_NAME,
                severity=5,
                summary="Fatal error in %s" % COLLECTOR_NAME,
                message=message,
            ))

            # Force the task to quit
            self.state = TaskStates.STATE_COMPLETED

        log.error(message)
        return defer.succeed(message)

    def cleanup(self):
        """
        Release the connection when the task is torn down.
        """
        self._finished()


if __name__ == '__main__':
    myPreferences = MailPreferences()
    myTaskSplitter = NullTaskSplitter()
    daemon = CollectorDaemon(myPreferences, myTaskSplitter)
    daemon.run()