Skip to content

Commit d1ce340

Browse files
committed
PROTON-2890: [Python examples] Broker: Improve transactional link handling
* Use new link iterator API * Abort any active transactions when coordinator link closed
1 parent 53cc394 commit d1ce340

File tree

1 file changed

+30
-10
lines changed

1 file changed

+30
-10
lines changed

python/examples/broker.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def on_link_opening(self, event):
156156
# requested = link.remote_target.capabilities.get_object()
157157
link.target.type = Terminus.COORDINATOR
158158
link.target.copy(link.remote_target)
159+
link._txns = set()
159160
elif link.remote_target.address:
160161
link.target.address = link.remote_target.address
161162

@@ -228,42 +229,61 @@ def _discharge_txn(self, tid, failed):
228229
def _coordinator_message(self, msg, delivery):
229230
body = msg.body
230231
if isinstance(body, Described):
232+
link = delivery.link
231233
d = body.descriptor
232-
if d == "amqp:declare:list":
234+
if d == "amqp:declare:list" or d == 0x31:
233235
# Allocate transaction id
234236
tid = self._declare_txn()
235237
self._verbose_print(f"{tid=}: Declare")
236238
delivery.local = TransactionalDisposition(tid)
237-
elif d == "amqp:discharge:list":
239+
link._txns.add(tid)
240+
elif d == "amqp:discharge:list" or d == 0x32:
238241
# Always accept commit/abort!
239242
value = body.value
240243
tid = bytes(value[0])
241244
failed = bool(value[1])
242-
if tid in self.txns:
245+
if tid in link._txns:
243246
self._discharge_txn(tid, failed)
244247
delivery.update(Disposition.ACCEPTED)
248+
link._txns.remove(tid)
245249
else:
246250
self._verbose_print(f"{tid=}: Discharge unknown txn-id: {failed=}")
247251
delivery.local.condition = Condition('amqp:transaction:unknown-id')
248252
delivery.update(Disposition.REJECTED)
249253
delivery.settle()
250254

251255
def on_link_closing(self, event):
252-
if event.link.is_sender:
253-
self._unsubscribe(event.link)
256+
link = event.link
257+
if link.is_sender:
258+
self._unsubscribe(link)
259+
elif link.target.type == Terminus.COORDINATOR:
260+
# Abort any remaining active transactions
261+
for tid in link._txns:
262+
self._discharge_txn(tid, failed=True)
263+
link._txns.clear()
254264

255265
def _remove_stale_consumers(self, connection):
256-
link = connection.link_head(Endpoint.REMOTE_ACTIVE)
257-
while link:
266+
for link in connection.links(Endpoint.REMOTE_ACTIVE):
258267
if link.is_sender:
259268
self._unsubscribe(link)
260-
link = link.next(Endpoint.REMOTE_ACTIVE)
269+
270+
def _abort_active_transactions(self, connection):
271+
for link in connection.links(Endpoint.LOCAL_ACTIVE):
272+
if link.target.type == Terminus.COORDINATOR:
273+
# Abort any remaining active transactions
274+
for tid in link._txns:
275+
self._discharge_txn(tid, failed=True)
276+
link._txns.clear()
261277

262278
def on_connection_closing(self, event):
263-
self._remove_stale_consumers(event.connection)
279+
connection = event.connection
280+
self._remove_stale_consumers(connection)
281+
self._abort_active_transactions(connection)
264282

265283
def on_disconnected(self, event):
266-
self._remove_stale_consumers(event.connection)
284+
connection = event.connection
285+
self._remove_stale_consumers(connection)
286+
self._abort_active_transactions(connection)
267287

268288
def on_sendable(self, event):
269289
link: Link = event.link

0 commit comments

Comments
 (0)