Skip to content

Commit 81417af

Browse files
committed
PROTON-2873: Add a disposition for transaction declaration
The original work adding in the transactional state disposition did not include the declared disposition used by the transaction coordinator to allocate a transaction id to a new transaction. Subsequent code used the tranasaction state disposition instead of the declared disposition which is incorrect.
1 parent 36b6ec5 commit 81417af

File tree

13 files changed

+190
-14
lines changed

13 files changed

+190
-14
lines changed

c/include/proton/disposition.h

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ typedef struct pn_disposition_t pn_disposition_t;
9090
*/
9191
#define PN_MODIFIED (0x0000000000000027)
9292

93+
/**
94+
* The PN_DECLARED delivery state is a terminal state
95+
* indicating that a transaction has been declared and indicating its
96+
* transaction identifier.
97+
*/
98+
#define PN_DECLARED (0x0000000000000033)
99+
93100
/**
94101
* The PN_TRANSACTIONAL_STATE delivery state is a non terminal state
95102
* indicating the transactional state of a delivery.
@@ -254,6 +261,13 @@ typedef struct pn_rejected_disposition_t pn_rejected_disposition_t;
254261
*/
255262
typedef struct pn_modified_disposition_t pn_modified_disposition_t;
256263

264+
/**
265+
* A transaction declared delivery disposition
266+
*
267+
* This represents a transaction declared disposition.
268+
*/
269+
typedef struct pn_declared_disposition_t pn_declared_disposition_t;
270+
257271
/**
258272
* A transactional delivery disposition
259273
*
@@ -304,6 +318,15 @@ PN_EXTERN pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *d
304318
*/
305319
PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition);
306320

321+
/**
322+
* Convert a delivery disposition to a transaction declared disposition
323+
*
324+
* @param[in] disposition delivery disposition object
325+
* @return a pointer to the transaction declared disposition or NULL
326+
* if the disposition is not that type
327+
*/
328+
PN_EXTERN pn_declared_disposition_t *pn_declared_disposition(pn_disposition_t *disposition);
329+
307330
/**
308331
* Convert a delivery disposition to a transactional disposition
309332
*
@@ -441,9 +464,24 @@ PN_EXTERN void pn_modified_disposition_set_undeliverable(pn_modified_disposition
441464
*/
442465
PN_EXTERN pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition);
443466

467+
/**
468+
* Get the transaction id for a transaction declared disposition
469+
*
470+
* @param[in] disposition a transaction declared disposition object
471+
* @return the transaction id
472+
*/
473+
PN_EXTERN pn_bytes_t pn_declared_disposition_get_id(pn_declared_disposition_t *disposition);
474+
475+
/**
476+
* Set the transaction id for a transaction declared disposition
477+
*
478+
* @param[in] disposition a transactional disposition object
479+
* @param[in] id the transaction id
480+
*/
481+
PN_EXTERN void pn_declared_disposition_set_id(pn_declared_disposition_t *disposition, pn_bytes_t id);
444482

445483
/**
446-
* Get the transaction id for a transactional disposition
484+
* Get the transaction id for a transaction declared disposition
447485
*
448486
* @param[in] disposition a transactional disposition object
449487
* @return the transaction id

c/src/core/emitters.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,15 @@ static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compoun
647647
}
648648
}
649649

650-
static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition);
650+
static inline void emit_declared_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_declared_disposition_t *disposition){
651+
for (bool small_encoding = true; ; small_encoding = false) {
652+
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
653+
pni_compound_context compound = c;
654+
emit_binary_bytes(emitter, &compound, disposition->id);
655+
emit_end_list(emitter, &compound, small_encoding);
656+
if (encode_succeeded(emitter, &compound)) break;
657+
}
658+
}
651659

652660
static inline void emit_transactional_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_transactional_disposition_t *disposition){
653661
for (bool small_encoding = true; ; small_encoding = false) {
@@ -700,6 +708,10 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context
700708
emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED);
701709
emit_modified_disposition(emitter, compound0, &disposition->u.s_modified);
702710
return;
711+
case PN_DISP_DECLARED:
712+
emit_descriptor(emitter, compound0, AMQP_DESC_DECLARED);
713+
emit_declared_disposition(emitter, compound0, &disposition->u.s_declared);
714+
return;
703715
case PN_DISP_TRANSACTIONAL:
704716
emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE);
705717
emit_transactional_disposition(emitter, compound0, &disposition->u.s_transactional);

c/src/core/engine-internal.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t {
341341
PN_DISP_REJECTED = PN_REJECTED,
342342
PN_DISP_RELEASED = PN_RELEASED,
343343
PN_DISP_MODIFIED = PN_MODIFIED,
344+
PN_DISP_DECLARED = PN_DECLARED,
344345
PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE,
345346
} pn_disposition_type_t;
346347

@@ -360,6 +361,10 @@ struct pn_modified_disposition_t {
360361
bool undeliverable;
361362
};
362363

364+
struct pn_declared_disposition_t {
365+
pn_bytes_t id;
366+
};
367+
363368
struct pn_transactional_disposition_t {
364369
pn_bytes_t id;
365370
pn_bytes_t outcome_raw;
@@ -376,6 +381,7 @@ struct pn_disposition_t {
376381
struct pn_received_disposition_t s_received;
377382
struct pn_rejected_disposition_t s_rejected;
378383
struct pn_modified_disposition_t s_modified;
384+
struct pn_declared_disposition_t s_declared;
379385
struct pn_transactional_disposition_t s_transactional;
380386
struct pn_custom_disposition_t s_custom;
381387
} u;

c/src/core/engine.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2013,6 +2013,13 @@ pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition
20132013
return &disposition->u.s_modified;
20142014
}
20152015

2016+
pn_declared_disposition_t *pn_declared_disposition(pn_disposition_t *disposition)
2017+
{
2018+
if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_DECLARED;
2019+
else if (disposition->type!=PN_DISP_DECLARED) return NULL;
2020+
return &disposition->u.s_declared;
2021+
}
2022+
20162023
pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition)
20172024
{
20182025
if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_TRANSACTIONAL;
@@ -2099,6 +2106,19 @@ pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *dispos
20992106
return disposition->annotations;
21002107
}
21012108

2109+
pn_bytes_t pn_declared_disposition_get_id(pn_declared_disposition_t *disposition)
2110+
{
2111+
assert(disposition);
2112+
return disposition->id;
2113+
}
2114+
2115+
void pn_declared_disposition_set_id(pn_declared_disposition_t *disposition, pn_bytes_t id)
2116+
{
2117+
assert(disposition);
2118+
pn_bytes_free(disposition->id);
2119+
disposition->id = pn_bytes_dup(id);
2120+
}
2121+
21022122
pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition)
21032123
{
21042124
assert(disposition);
@@ -2473,6 +2493,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
24732493
case PN_RECEIVED:
24742494
case PN_MODIFIED:
24752495
case PN_RELEASED:
2496+
case PN_DECLARED:
24762497
case PN_TRANSACTIONAL_STATE:
24772498
break;
24782499
default:
@@ -2488,6 +2509,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
24882509
case PN_RECEIVED:
24892510
case PN_MODIFIED:
24902511
case PN_RELEASED:
2512+
case PN_DECLARED:
24912513
case PN_TRANSACTIONAL_STATE:
24922514
delivery->local.type = state;
24932515
break;
@@ -2859,6 +2881,7 @@ const char *pn_disposition_type_name(uint64_t d) {
28592881
case PN_REJECTED: return "rejected";
28602882
case PN_RELEASED: return "released";
28612883
case PN_MODIFIED: return "modified";
2884+
case PN_DECLARED: return "transaction_declared";
28622885
case PN_TRANSACTIONAL_STATE: return "transactional_state";
28632886
default: return "unknown";
28642887
}

c/src/core/transport.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,14 @@ static void pni_amqp_decode_disposition (uint64_t type, pn_bytes_t disp_data, pn
16051605
}
16061606
break;
16071607
}
1608+
case AMQP_DESC_DECLARED: {
1609+
pn_bytes_t id;
1610+
pn_amqp_decode_Eze(disp_data, &id);
1611+
disp->type = PN_DISP_DECLARED;
1612+
pn_bytes_free(disp->u.s_declared.id);
1613+
disp->u.s_declared.id = pn_bytes_dup(id);
1614+
break;
1615+
}
16081616
case AMQP_DESC_TRANSACTIONAL_STATE: {
16091617
pn_bytes_t id;
16101618
bool qoutcome;

c/tools/codec-generator/specs.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"[D.[sSR]]",
2828
"[?o?oR]",
2929
"[z?R]",
30+
"[z]",
3031
"D.R",
3132
"D.[.....D..D.[.....RR]]",
3233
"D.[.....D..D.[R]...]",

python/cproton.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,12 +410,14 @@ typedef struct pn_custom_disposition_t pn_custom_disposition_t;
410410
typedef struct pn_received_disposition_t pn_received_disposition_t;
411411
typedef struct pn_rejected_disposition_t pn_rejected_disposition_t;
412412
typedef struct pn_modified_disposition_t pn_modified_disposition_t;
413+
typedef struct pn_declared_disposition_t pn_declared_disposition_t;
413414
typedef struct pn_transactional_disposition_t pn_transactional_disposition_t;
414415

415416
pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *disposition);
416417
pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition);
417418
pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition);
418419
pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition);
420+
pn_declared_disposition_t *pn_declared_disposition(pn_disposition_t *disposition);
419421
pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition);
420422

421423
void pn_custom_disposition_set_type(pn_custom_disposition_t *disposition, uint64_t type);
@@ -431,6 +433,8 @@ void pn_modified_disposition_set_failed(pn_modified_disposition_t *disposition,
431433
_Bool pn_modified_disposition_is_undeliverable(pn_modified_disposition_t *disposition);
432434
void pn_modified_disposition_set_undeliverable(pn_modified_disposition_t *disposition, _Bool undeliverable);
433435
pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition);
436+
pn_bytes_t pn_declared_disposition_get_id(pn_declared_disposition_t *disposition);
437+
void pn_declared_disposition_set_id(pn_declared_disposition_t *disposition, pn_bytes_t id);
434438
pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition);
435439
void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id);
436440
uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition);
@@ -664,6 +668,7 @@ int pn_transport_unbind(pn_transport_t *transport);
664668
#define PN_REJECTED ...
665669
#define PN_RELEASED ...
666670
#define PN_MODIFIED ...
671+
#define PN_DECLARED ...
667672
#define PN_TRANSACTIONAL_STATE ...
668673

669674
// Default message priority

python/cproton.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
PN_SSL_RESUME_UNKNOWN, PN_SSL_SHA1, PN_SSL_SHA256, PN_SSL_SHA512,
5555
PN_SSL_VERIFY_PEER, PN_SSL_VERIFY_PEER_NAME, PN_STRING, PN_SYMBOL,
5656
PN_TARGET, PN_TIMEOUT, PN_TIMER_TASK, PN_TIMESTAMP, PN_TRACE_DRV,
57-
PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSACTIONAL_STATE, PN_TRANSPORT,
57+
PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_DECLARED,
58+
PN_TRANSACTIONAL_STATE, PN_TRANSPORT,
5859
PN_TRANSPORT_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED,
5960
PN_TRANSPORT_TAIL_CLOSED, PN_UBYTE, PN_UINT, PN_ULONG, PN_UNSPECIFIED,
6061
PN_USHORT, PN_UUID, PN_VERSION_MAJOR, PN_VERSION_MINOR,
@@ -178,7 +179,8 @@
178179
pn_modified_disposition_set_failed,
179180
pn_modified_disposition_is_undeliverable,
180181
pn_modified_disposition_set_undeliverable,
181-
pn_modified_disposition_annotations)
182+
pn_modified_disposition_annotations,
183+
pn_declared_disposition)
182184

183185

184186
def isnull(obj):
@@ -774,3 +776,11 @@ def pn_transactional_disposition_get_id(disp):
774776

775777
def pn_transactional_disposition_set_id(disp, id):
776778
return lib.pn_transactional_disposition_set_id(disp, py2bytes(id))
779+
780+
781+
def pn_declared_disposition_get_id(disp):
782+
return bytes2pybytes(lib.pn_declared_disposition_get_id(disp))
783+
784+
785+
def pn_declared_disposition_set_id(disp, id):
786+
return lib.pn_declared_disposition_set_id(disp, py2bytes(id))

python/examples/broker.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from typing import Optional, Union
2828

2929
from proton import (Condition, Delivery, Described, Disposition, DispositionType,
30-
Endpoint, Link, Sender, Message, Terminus, TransactionalDisposition)
30+
Endpoint, Link, Sender, Message, Terminus, DeclaredDisposition)
3131
from proton.handlers import MessagingHandler
3232
from proton.reactor import Container
3333

@@ -231,13 +231,13 @@ def _coordinator_message(self, msg, delivery):
231231
if isinstance(body, Described):
232232
link = delivery.link
233233
d = body.descriptor
234-
if d == "amqp:declare:list" or d == 0x31:
234+
if d == "amqp:declare:list":
235235
# Allocate transaction id
236236
tid = self._declare_txn()
237237
self._verbose_print(f"{tid=}: Declare")
238-
delivery.local = TransactionalDisposition(tid)
238+
delivery.local = DeclaredDisposition(tid)
239239
link._txns.add(tid)
240-
elif d == "amqp:discharge:list" or d == 0x32:
240+
elif d == "amqp:discharge:list":
241241
# Always accept commit/abort!
242242
value = body.value
243243
tid = bytes(value[0])

python/proton/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \
3838
byte, short, int32, float32, decimal32, decimal64, decimal128, AnnotationDict, PropertyDict, SymbolList
3939
from ._delivery import Delivery, Disposition, DispositionType, CustomDisposition, RejectedDisposition, \
40-
ModifiedDisposition, ReceivedDisposition, TransactionalDisposition
40+
ModifiedDisposition, ReceivedDisposition, DeclaredDisposition, TransactionalDisposition
4141
from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus
4242
from ._events import Collector, Event, EventType
4343
from ._exceptions import ProtonException, MessageException, DataException, TransportException, \
@@ -58,6 +58,7 @@
5858
"CustomDisposition",
5959
"Data",
6060
"DataException",
61+
"DeclaredDisposition",
6162
"Delivery",
6263
"Disposition",
6364
"DispositionType",

0 commit comments

Comments
 (0)