Skip to content

Conversation

@Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Nov 14, 2025

Closes #65

This PR implements the direct reply-to feature for RabbitMQ AMQP, allowing RPC clients to use ephemeral, temporary reply queues created by the broker instead of pre-declaring dedicated reply queues. This optimization reduces setup overhead and improves performance for RPC patterns.

Key changes:

  • Added DirectReplyTo option to RequesterOptions and ConsumerOptions to enable the feature
  • Added GetReplyQueue() and GetRequestQueue() methods to retrieve queue addresses for both requester and responder
  • Implemented dynamic receiver link creation with volatile queue capabilities for direct-reply-to consumers

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio added this to the 0.4.0 milestone Nov 14, 2025
@Gsantomaggio Gsantomaggio added the enhancement New feature or request label Nov 14, 2025
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio self-assigned this Nov 14, 2025
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio requested a review from Copilot November 17, 2025 11:51
Copilot finished reviewing on behalf of Gsantomaggio November 17, 2025 11:54
@Gsantomaggio Gsantomaggio marked this pull request as ready for review November 17, 2025 11:55
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements the direct reply-to feature for RabbitMQ AMQP, allowing RPC clients to use ephemeral, temporary reply queues created by the broker instead of pre-declaring dedicated reply queues. This optimization reduces setup overhead and improves performance for RPC patterns.

Key changes:

  • Added DirectReplyTo option to RequesterOptions and ConsumerOptions to enable the feature
  • Added GetReplyQueue() and GetRequestQueue() methods to retrieve queue addresses for both requester and responder
  • Implemented dynamic receiver link creation with volatile queue capabilities for direct-reply-to consumers

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
pkg/rabbitmqamqp/requester.go Added GetReplyQueue() method and DirectReplyTo option
pkg/rabbitmqamqp/responder.go Added GetRequestQueue() method
pkg/rabbitmqamqp/amqp_consumer.go Added dynamic receiver link support and GetQueue() method
pkg/rabbitmqamqp/amqp_connection.go Modified consumer/requester creation to support direct-reply-to
pkg/rabbitmqamqp/amqp_utils.go Added parseQueueAddress() and related helper functions
pkg/rabbitmqamqp/amqp_types.go Added DirectReplyTo field and validation for RabbitMQ 4.2+
pkg/rabbitmqamqp/address.go Refactored encoding to use standard library url.QueryEscape
pkg/rabbitmqamqp/amqp_consumer_test.go Added tests for direct-reply-to consumer behavior
pkg/rabbitmqamqp/amqp_utils_test.go Added tests for queue address encoding/decoding
docs/examples/rpc_echo_server/main.go Updated example to demonstrate direct-reply-to feature
Comments suppressed due to low confidence (1)

pkg/rabbitmqamqp/amqp_connection.go:321

  • When DirectReplyTo is true, replyQueueName will be an empty string, and this line creates a QueueAddress with an empty queue. This value is later overwritten at line 348, but it creates unnecessary allocation and could be confusing. Consider initializing replyToQueue as nil here and setting it only after getting the actual queue name from the consumer.
		replyToQueue:           &QueueAddress{Queue: replyQueueName},

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

// GetQueue returns the queue the consumer is connected to.
// When the user sets the destination address to a dynamic address, this function will return the dynamic address.
// like direct-reply-to address. In other cases, it will return the queue address.
func (c *Consumer) GetQueue() (string, error) {
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parseQueueAddress function expects addresses to start with '/queues/' but direct-reply-to addresses (like 'amq.rabbitmq.reply-to') don't have this prefix. This will cause an error when calling GetQueue() on a consumer created with DirectReplyTo: true. The function should handle both standard queue addresses and direct-reply-to addresses differently.

Suggested change
func (c *Consumer) GetQueue() (string, error) {
func (c *Consumer) GetQueue() (string, error) {
// Handle direct-reply-to (dynamic) addresses
if c.queue == "amq.rabbitmq.reply-to" {
return c.queue, nil
}

Copilot uses AI. Check for mistakes.
Comment on lines 226 to 229
//q := &QueueAddress{Queue: qName}
//r, e := q.toAddress()
//Expect(e).To(BeNil())
//Expect(r).To(Equal(consumer.GetQueue()))
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code. If this verification is needed, uncomment and fix it; otherwise, delete these lines to keep the codebase clean.

Suggested change
//q := &QueueAddress{Queue: qName}
//r, e := q.toAddress()
//Expect(e).To(BeNil())
//Expect(r).To(Equal(consumer.GetQueue()))

Copilot uses AI. Check for mistakes.
// The id of the consumer
Id string

//
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a documentation comment for the DirectReplyTo field to explain its purpose and usage, similar to other fields in ConsumerOptions.

Suggested change
//
// DirectReplyTo enables the direct reply-to feature for the consumer.
// Mostly used for RPC consumers.
// See https://www.rabbitmq.com/docs/next/direct-reply-to#overview for more details.

Copilot uses AI. Check for mistakes.
Comment on lines 187 to 198
} else {
destination := &QueueAddress{
Queue: queueName,
}
destinationAdd, err := destination.toAddress()
if err != nil {
return nil, err
}
return newConsumer(ctx, a, destinationAdd, options)

}

Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Remove unnecessary blank line at line 197 and simplify the else block by returning directly without else since the if block already returns.

Suggested change
} else {
destination := &QueueAddress{
Queue: queueName,
}
destinationAdd, err := destination.toAddress()
if err != nil {
return nil, err
}
return newConsumer(ctx, a, destinationAdd, options)
}
}
destination := &QueueAddress{
Queue: queueName,
}
destinationAdd, err := destination.toAddress()
if err != nil {
return nil, err
}
return newConsumer(ctx, a, destinationAdd, options)

Copilot uses AI. Check for mistakes.
Comment on lines +58 to 59
reply, _ := srv.server.GetRequestQueue()

Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Error from GetRequestQueue() is silently ignored. While this is example code, it's better practice to handle or at least log errors to demonstrate proper error handling patterns.

Suggested change
reply, _ := srv.server.GetRequestQueue()
reply, err := srv.server.GetRequestQueue()
if err != nil {
panic(err)
}

Copilot uses AI. Check for mistakes.
func (aco *ConsumerOptions) validate(available *featuresAvailable) error {
// direct reply to is supported since RabbitMQ 4.2.0
if aco.DirectReplyTo && !available.is42rMore {
return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2 or later")
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message says 'RabbitMQ 4.2 or later' but the comment says 'RabbitMQ 4.2.0'. Be consistent and use 'RabbitMQ 4.2.0 or later' in both places for clarity.

Suggested change
return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2 or later")
return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2.0 or later")

Copilot uses AI. Check for mistakes.
Comment on lines +73 to +75
func createDynamicReceiverLinkOptions(options IConsumerOptions) *amqp.ReceiverOptions {
prop := make(map[string]any)
prop["paired"] = true
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Properties field is not set in the returned amqp.ReceiverOptions. While the prop map is created with 'paired' set to true, it's never assigned to the Properties field of the returned struct. Add Properties: prop, to the returned struct if this property is intended to be used.

Copilot uses AI. Check for mistakes.
@Gsantomaggio Gsantomaggio merged commit 013ce81 into main Nov 17, 2025
2 checks passed
@Gsantomaggio Gsantomaggio deleted the feat/direct-reply-to branch November 17, 2025 13:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Direct Reply-To

2 participants