1
2
3
4
5
6
7
8
9
10
11 import sys
12 import time
13 import logging
14 log = logging.getLogger('zen.AmqpDataManager')
15
16
17
18
19
20
21
22
23
25 """Objects that manage transactional storage.
26
27 These objects may manage data for other objects, or they may manage
28 non-object storages, such as relational databases. For example,
29 a ZODB.Connection.
30
31 Note that when some data is modified, that data's data manager should
32 join a transaction so that data can be committed when the user commits
33 the transaction.
34 """
35
def __init__(self, channel, txnmgr=None):
    """Attach this data manager to a channel.

    channel is stored on the instance and its tx_select() method is
    called right away (presumably AMQP tx.select, which puts the
    channel into transactional mode -- confirm against the channel
    implementation).

    txnmgr is stored as transaction_manager and defaults to None; it
    is not otherwise used during construction.
    """
    self.channel = channel
    self.channel.tx_select()
    self.transaction_manager = txnmgr
40
41
42
43
44
45
46
47
def abort(self, transaction):
    """Roll back the channel for a transaction that is being abandoned.

    The transaction manager calls this for transactions that have not
    yet entered a two-phase commit; it must not be called once a
    two-phase commit is in progress.

    transaction is accepted to satisfy the data-manager interface and
    is not used.  Any exception raised by the channel's tx_rollback()
    propagates to the caller.
    """
    log.debug("abort'ed")
    self.channel.tx_rollback()
59
60
61
62
63
64
66 """Begin commit of a transaction, starting the two-phase commit.
67
68 transaction is the ITransaction instance associated with the
69 transaction being committed.
70 """
71
72 log.debug("tpc_begin'ed")
73
def commit(self, transaction):
    """Take part in the commit step of a two-phase commit.

    Under the data-manager contract this is where changes are staged,
    conflicts included, so that they persist if tpc_finish is called
    later and are dropped if tpc_abort is called instead.

    This implementation only emits a debug log entry and does not touch
    the channel.  transaction is accepted but not used.
    """
    log.debug("commit'ed")
87
88
90 """Indicate confirmation that the transaction is done.
91
92 Make all changes to objects modified by this transaction persist.
93
94 transaction is the ITransaction instance associated with the
95 transaction being committed.
96
97 This should never fail. If this raises an exception, the
98 database is not expected to maintain consistency; it's a
99 serious error.
100 """
101 log.debug("tpc_finish'ed")
102 try:
103 self.channel.tx_commit()
104 except Exception as e:
105 log.exception("tpc_finish completed FAIL")
106 else:
107 log.debug("tpc_finish completed OK")
108
109
111 """Verify that a data manager can commit the transaction.
112
113 This is the last chance for a data manager to vote 'no'. A
114 data manager votes 'no' by raising an exception.
115
116 transaction is the ITransaction instance associated with the
117 transaction being committed.
118 """
119
120 log.debug("tpc_voted")
121
122
124 """Abort a transaction.
125
126 This is called by a transaction manager to end a two-phase commit on
127 the data manager. Abandon all changes to objects modified by this
128 transaction.
129
130 transaction is the ITransaction instance associated with the
131 transaction being committed.
132
133 This should never fail.
134 """
135 log.debug("tpc_abort'ed")
136 try:
137 self.channel.tx_rollback()
138 except Exception as e:
139 log.exception(e)
140 log.debug("tpc_abort failed with exception")
141 else:
142 log.debug("tpc_abort completed")
143
144
146 """Return a key to use for ordering registered DataManagers.
147 """
148
149
150 return "~~~~~~~"
151
152
153
154
155
156
157
160 self.datamgr = AmqpDataManager(channel)
161 self.txnid = int(time.clock()*1e6) % sys.maxint
162
165
def __exit__(self, type, value, traceback):
    """Commit or roll back when the managed block ends.

    type, value and traceback are the standard context-manager
    arguments; only type is inspected.

    If the block finished without an exception (type is None), the data
    manager is driven through tpc_begin, commit, tpc_vote and
    tpc_finish, each receiving self.txnid.  If any of those steps
    raises, tpc_abort is called and the exception is re-raised.

    If the block raised, the data manager's abort() is called.  A
    failure of that abort is logged instead of raised so that it cannot
    replace the exception that is already propagating.

    Nothing is returned, so an exception raised inside the block is
    never suppressed.
    """
    if type is None:
        try:
            self.datamgr.tpc_begin(self.txnid)
            self.datamgr.commit(self.txnid)
            self.datamgr.tpc_vote(self.txnid)
            self.datamgr.tpc_finish(self.txnid)
        except Exception:
            self.datamgr.tpc_abort(self.txnid)
            raise
    else:
        try:
            self.datamgr.abort(self.txnid)
        except Exception:
            # Best effort: the block's own exception is the one the
            # caller must see, so record the rollback failure and move on
            # instead of silently discarding it.
            log.exception("abort failed while handling an exception "
                          "from the managed block")
181