-
Notifications
You must be signed in to change notification settings - Fork 5
Implement direct reply to feature #73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements the direct reply-to feature for RabbitMQ AMQP, allowing RPC clients to use ephemeral, temporary reply queues created by the broker instead of pre-declaring dedicated reply queues. This optimization reduces setup overhead and improves performance for RPC patterns.
Key changes:
- Added
DirectReplyTooption toRequesterOptionsandConsumerOptionsto enable the feature - Added
GetReplyQueue()andGetRequestQueue()methods to retrieve queue addresses for both requester and responder - Implemented dynamic receiver link creation with volatile queue capabilities for direct-reply-to consumers
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/rabbitmqamqp/requester.go | Added GetReplyQueue() method and DirectReplyTo option |
| pkg/rabbitmqamqp/responder.go | Added GetRequestQueue() method |
| pkg/rabbitmqamqp/amqp_consumer.go | Added dynamic receiver link support and GetQueue() method |
| pkg/rabbitmqamqp/amqp_connection.go | Modified consumer/requester creation to support direct-reply-to |
| pkg/rabbitmqamqp/amqp_utils.go | Added parseQueueAddress() and related helper functions |
| pkg/rabbitmqamqp/amqp_types.go | Added DirectReplyTo field and validation for RabbitMQ 4.2+ |
| pkg/rabbitmqamqp/address.go | Refactored encoding to use standard library url.QueryEscape |
| pkg/rabbitmqamqp/amqp_consumer_test.go | Added tests for direct-reply-to consumer behavior |
| pkg/rabbitmqamqp/amqp_utils_test.go | Added tests for queue address encoding/decoding |
| docs/examples/rpc_echo_server/main.go | Updated example to demonstrate direct-reply-to feature |
Comments suppressed due to low confidence (1)
pkg/rabbitmqamqp/amqp_connection.go:321
- When
DirectReplyTois true,replyQueueNamewill be an empty string, and this line creates a QueueAddress with an empty queue. This value is later overwritten at line 348, but it creates unnecessary allocation and could be confusing. Consider initializingreplyToQueueas nil here and setting it only after getting the actual queue name from the consumer.
replyToQueue: &QueueAddress{Queue: replyQueueName},
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // GetQueue returns the queue the consumer is connected to. | ||
| // When the user sets the destination address to a dynamic address, this function will return the dynamic address. | ||
| // like direct-reply-to address. In other cases, it will return the queue address. | ||
| func (c *Consumer) GetQueue() (string, error) { |
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parseQueueAddress function expects addresses to start with '/queues/' but direct-reply-to addresses (like 'amq.rabbitmq.reply-to') don't have this prefix. This will cause an error when calling GetQueue() on a consumer created with DirectReplyTo: true. The function should handle both standard queue addresses and direct-reply-to addresses differently.
| func (c *Consumer) GetQueue() (string, error) { | |
| func (c *Consumer) GetQueue() (string, error) { | |
| // Handle direct-reply-to (dynamic) addresses | |
| if c.queue == "amq.rabbitmq.reply-to" { | |
| return c.queue, nil | |
| } |
| //q := &QueueAddress{Queue: qName} | ||
| //r, e := q.toAddress() | ||
| //Expect(e).To(BeNil()) | ||
| //Expect(r).To(Equal(consumer.GetQueue())) |
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented-out code. If this verification is needed, uncomment and fix it; otherwise, delete these lines to keep the codebase clean.
| //q := &QueueAddress{Queue: qName} | |
| //r, e := q.toAddress() | |
| //Expect(e).To(BeNil()) | |
| //Expect(r).To(Equal(consumer.GetQueue())) |
| // The id of the consumer | ||
| Id string | ||
|
|
||
| // |
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a documentation comment for the DirectReplyTo field to explain its purpose and usage, similar to other fields in ConsumerOptions.
| // | |
| // DirectReplyTo enables the direct reply-to feature for the consumer. | |
| // Mostly used for RPC consumers. | |
| // See https://www.rabbitmq.com/docs/next/direct-reply-to#overview for more details. |
pkg/rabbitmqamqp/amqp_connection.go
Outdated
| } else { | ||
| destination := &QueueAddress{ | ||
| Queue: queueName, | ||
| } | ||
| destinationAdd, err := destination.toAddress() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return newConsumer(ctx, a, destinationAdd, options) | ||
|
|
||
| } | ||
|
|
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Remove unnecessary blank line at line 197 and simplify the else block by returning directly without else since the if block already returns.
| } else { | |
| destination := &QueueAddress{ | |
| Queue: queueName, | |
| } | |
| destinationAdd, err := destination.toAddress() | |
| if err != nil { | |
| return nil, err | |
| } | |
| return newConsumer(ctx, a, destinationAdd, options) | |
| } | |
| } | |
| destination := &QueueAddress{ | |
| Queue: queueName, | |
| } | |
| destinationAdd, err := destination.toAddress() | |
| if err != nil { | |
| return nil, err | |
| } | |
| return newConsumer(ctx, a, destinationAdd, options) |
| reply, _ := srv.server.GetRequestQueue() | ||
|
|
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Error from GetRequestQueue() is silently ignored. While this is example code, it's better practice to handle or at least log errors to demonstrate proper error handling patterns.
| reply, _ := srv.server.GetRequestQueue() | |
| reply, err := srv.server.GetRequestQueue() | |
| if err != nil { | |
| panic(err) | |
| } |
| func (aco *ConsumerOptions) validate(available *featuresAvailable) error { | ||
| // direct reply to is supported since RabbitMQ 4.2.0 | ||
| if aco.DirectReplyTo && !available.is42rMore { | ||
| return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2 or later") |
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message says 'RabbitMQ 4.2 or later' but the comment says 'RabbitMQ 4.2.0'. Be consistent and use 'RabbitMQ 4.2.0 or later' in both places for clarity.
| return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2 or later") | |
| return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2.0 or later") |
| func createDynamicReceiverLinkOptions(options IConsumerOptions) *amqp.ReceiverOptions { | ||
| prop := make(map[string]any) | ||
| prop["paired"] = true |
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Properties field is not set in the returned amqp.ReceiverOptions. While the prop map is created with 'paired' set to true, it's never assigned to the Properties field of the returned struct. Add Properties: prop, to the returned struct if this property is intended to be used.
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Closes #65
This PR implements the direct reply-to feature for RabbitMQ AMQP, allowing RPC clients to use ephemeral, temporary reply queues created by the broker instead of pre-declaring dedicated reply queues. This optimization reduces setup overhead and improves performance for RPC patterns.
Key changes:
DirectReplyTooption toRequesterOptionsandConsumerOptionsto enable the featureGetReplyQueue()andGetRequestQueue()methods to retrieve queue addresses for both requester and responder