Package Products :: Package ZenUtils :: Module AmqpDataManager
[hide private]
[frames | no frames]

Source Code for Module Products.ZenUtils.AmqpDataManager

  1  ############################################################################# 
  2  # This program is part of Zenoss Core, an open source monitoring platform. 
  3  # Copyright (C) 2010, Zenoss Inc. 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify it 
  6  # under the terms of the GNU General Public License version 2 or (at your 
  7  # option) any later version as published by the Free Software Foundation. 
  8  # 
  9  # For complete information please visit: http://www.zenoss.com/oss/ 
 10  ############################################################################# 
 11  import sys 
 12  import time 
 13  import logging 
 14  log = logging.getLogger('zen.AmqpDataManager') 
 15   
 16  # DataManager class for adding msg queueing to zope and other XA 
 17  # transaction cohorts.  Usage with nested_transaction: 
 18  # 
 19  #  with nested_transaction(AmqpDataManager(publisher.channel)) as txn: 
 20  #      # perform zope db commands 
 21  #      # perform SQLAlchemy db commands 
 22  #      publisher.publish(msg) 
 23  # 
class AmqpDataManager(object):
    """Data manager that ties an AMQP channel to a two-phase-commit transaction.

    The methods follow the data-manager protocol driven by a transaction
    object: ``abort`` outside of a two-phase commit, and
    ``tpc_begin`` / ``commit`` / ``tpc_vote`` followed by ``tpc_finish`` or
    ``tpc_abort`` during one.  Every ``transaction`` argument is accepted for
    protocol compatibility only; no method here reads it.

    The channel only needs to provide ``tx_select()``, ``tx_commit()`` and
    ``tx_rollback()`` (presumably the AMQP ``tx.*`` operations -- confirm
    against the channel library in use).
    """

    def __init__(self, channel, txnmgr=None):
        """Store the channel and call ``tx_select()`` on it.

        channel -- object exposing tx_select / tx_commit / tx_rollback.
        txnmgr  -- optional transaction manager, stored unchanged on
                   ``transaction_manager``.
        """
        self.channel = channel
        channel.tx_select()
        # Public attribute intended for read-only use; typically an
        # ITransactionManager supplied by whoever constructs this object.
        self.transaction_manager = txnmgr

    def abort(self, transaction):
        """Roll back the channel outside of a two-phase commit.

        Any exception raised by ``tx_rollback()`` propagates to the caller.
        """
        log.debug("abort'ed")
        # Discard any messages that have been buffered on the channel.
        self.channel.tx_rollback()

    # Two-phase commit protocol.  These methods are called by the transaction
    # object being committed.  The sequence of calls normally follows this
    # regular expression:
    #
    #     tpc_begin commit tpc_vote (tpc_finish | tpc_abort)

    def tpc_begin(self, transaction):
        """Start of the two-phase commit; only emits a debug log line."""
        log.debug("tpc_begin'ed")

    def commit(self, transaction):
        """Commit step of the protocol; only emits a debug log line.

        The channel itself is not committed until ``tpc_finish``.
        """
        log.debug("commit'ed")

    def tpc_finish(self, transaction):
        """Commit the channel with ``tx_commit()``.

        A failing ``tx_commit()`` is logged with its traceback and then
        swallowed: this method never raises an ``Exception`` subclass to
        its caller.
        """
        log.debug("tpc_finish'ed")
        try:
            self.channel.tx_commit()
        except Exception:
            log.exception("tpc_finish completed FAIL")
            return
        log.debug("tpc_finish completed OK")

    def tpc_vote(self, transaction):
        """Voting step of the protocol; never votes 'no', only logs."""
        log.debug("tpc_voted")

    def tpc_abort(self, transaction):
        """Roll back the channel to end a two-phase commit.

        A failing ``tx_rollback()`` is logged and swallowed, so this method
        does not raise an ``Exception`` subclass to its caller.
        """
        log.debug("tpc_abort'ed")
        try:
            self.channel.tx_rollback()
        except Exception as err:
            log.exception(err)
            log.debug("tpc_abort failed with exception")
            return
        log.debug("tpc_abort completed")

    def sortKey(self):
        """Return the key used to order registered data managers."""
        # This data manager must always go last; "~" compares greater than
        # any ASCII letter or digit.
        return "~~~~~~~"
#
# Usage outside of a Zope transaction:
#
#  with AmqpTransaction(publisher.channel) as txn:
#      publisher.publish(msg)
#      publisher.publish(msg2)
#
class AmqpTransaction(object):
    """Context manager that runs a ``with`` body as one channel transaction.

    Wraps an ``AmqpDataManager`` built on the given channel.  On a clean exit
    from the ``with`` body the data manager is driven through
    ``tpc_begin``, ``commit``, ``tpc_vote`` and ``tpc_finish``; when the body
    raises, ``abort`` is called instead and the body's exception propagates.

    Attributes:
        datamgr -- the AmqpDataManager wrapping the channel.
        txnid   -- opaque integer passed as the ``transaction`` argument to
                   every data-manager call.
    """

    def __init__(self, channel):
        """Create the data manager for ``channel`` and pick a transaction id."""
        self.datamgr = AmqpDataManager(channel)
        self.txnid = self._new_txnid()

    @staticmethod
    def _new_txnid():
        """Return a non-negative, clock-derived integer used as an opaque id."""
        # sys.maxint exists only on Python 2; sys.maxsize is the portable
        # fallback (available since Python 2.6).
        upper = getattr(sys, 'maxint', sys.maxsize)
        # time.clock() was removed in Python 3.8, so derive the id from
        # wall-clock microseconds, which works on every supported version.
        return int(time.time() * 1e6) % upper

    def __enter__(self):
        """Return this object so it can be bound with ``as``."""
        return self

    def __exit__(self, type, value, traceback):
        """Commit on a clean exit, roll back when the body raised.

        Returns None in every case, so an exception raised inside the
        ``with`` body is never suppressed.

        Raises whatever a commit step raises, after ``tpc_abort`` has been
        called on the data manager.
        """
        if type is None:
            try:
                self.datamgr.tpc_begin(self.txnid)
                self.datamgr.commit(self.txnid)
                self.datamgr.tpc_vote(self.txnid)
                self.datamgr.tpc_finish(self.txnid)
            except Exception:
                # Undo the partial commit, then let the failure surface.
                self.datamgr.tpc_abort(self.txnid)
                raise
        else:
            try:
                self.datamgr.abort(self.txnid)
            except Exception:
                # Best effort: the exception from the ``with`` body is the
                # one the caller needs to see, so a rollback failure is
                # logged rather than raised in its place.  This is the same
                # logger the module binds to ``log``.
                logging.getLogger('zen.AmqpDataManager').exception(
                    "abort failed while rolling back AMQP transaction")
181