Skip to content
13 changes: 12 additions & 1 deletion docs/examples/rpc_echo_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func newEchoResponder(conn *rabbitmqamqp.AmqpConnection) *echoResponder {
}
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
RequestQueue: requestQueue,

Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
return request, nil
},
Expand All @@ -51,9 +52,12 @@ func main() {
if err != nil {
panic(err)
}
_ = srvConn.Management().DeleteQueue(context.TODO(), requestQueue)

srv := newEchoResponder(srvConn)
reply, _ := srv.server.GetRequestQueue()

Comment on lines +58 to 59
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Error from GetRequestQueue() is silently ignored. While this is example code, it's better practice to handle or at least log errors to demonstrate proper error handling patterns.

Suggested change
reply, _ := srv.server.GetRequestQueue()
reply, err := srv.server.GetRequestQueue()
if err != nil {
panic(err)
}

Copilot uses AI. Check for mistakes.
fmt.Printf("request queue %s \n", reply)
// Dial rabbit for RPC client connection
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
if err != nil {
Expand All @@ -62,10 +66,18 @@ func main() {

requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: requestQueue,
// Enable Direct Reply To feature
// see: https://www.rabbitmq.com/direct-reply-to.html
DirectReplyTo: true,
})
if err != nil {
panic(err)
}
reply, err = requester.GetReplyQueue()
if err != nil {
panic(fmt.Errorf("failed to get reply queue: %w", err))
}
fmt.Printf("replyTo to %s \n", reply)

// Set up a channel to listen for OS signals
sigs := make(chan os.Signal, 1)
Expand Down Expand Up @@ -93,7 +105,6 @@ func main() {
if message == "" {
continue
}

resp, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
if err != nil {
fmt.Printf("Error calling RPC: %v\n", err)
Expand Down
48 changes: 26 additions & 22 deletions pkg/rabbitmqamqp/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmqamqp
import (
"errors"
"fmt"
"net/url"
"strings"
)

Expand Down Expand Up @@ -97,20 +98,23 @@ func purgeQueueAddress(queue *string) (string, error) {

// encodePathSegments takes a string and returns its percent-encoded representation.
func encodePathSegments(input string) string {
var encoded strings.Builder

// Iterate over each character in the input string
for _, char := range input {
// Check if the character is an unreserved character (i.e., it doesn't need encoding)
if isUnreserved(char) {
encoded.WriteRune(char) // Append as is
} else {
// Encode character To %HH format
encoded.WriteString(fmt.Sprintf("%%%02X", char))
}
}

return encoded.String()
encoded := url.QueryEscape(input)
return strings.ReplaceAll(encoded, "+", "%20")

//var encoded strings.Builder
//
//// Iterate over each character in the input string
//for _, char := range input {
// // Check if the character is an unreserved character (i.e., it doesn't need encoding)
// if isUnreserved(char) {
// encoded.WriteRune(char) // Append as is
// } else {
// // Encode character To %HH format
// encoded.WriteString(fmt.Sprintf("%%%02X", char))
// }
//}
//
//return encoded.String()
Comment on lines +104 to +117
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code from lines 104-117. The refactoring to use url.QueryEscape is complete, so the old implementation should be deleted.

Suggested change
//var encoded strings.Builder
//
//// Iterate over each character in the input string
//for _, char := range input {
// // Check if the character is an unreserved character (i.e., it doesn't need encoding)
// if isUnreserved(char) {
// encoded.WriteRune(char) // Append as is
// } else {
// // Encode character To %HH format
// encoded.WriteString(fmt.Sprintf("%%%02X", char))
// }
//}
//
//return encoded.String()

Copilot uses AI. Check for mistakes.
}

//// Decode takes a percent-encoded string and returns its decoded representation.
Expand All @@ -124,14 +128,14 @@ func encodePathSegments(input string) string {
// return decoded, nil
//}

// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
func isUnreserved(char rune) bool {
return (char >= 'A' && char <= 'Z') ||
(char >= 'a' && char <= 'z') ||
(char >= '0' && char <= '9') ||
char == '-' || char == '.' || char == '_' || char == '~'
}
//// isUnreserved checks if a character is an unreserved character in percent encoding
//// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
//func isUnreserved(char rune) bool {
// return (char >= 'A' && char <= 'Z') ||
// (char >= 'a' && char <= 'z') ||
// (char >= '0' && char <= '9') ||
// char == '-' || char == '.' || char == '_' || char == '~'
//}

Comment on lines +131 to 139
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out isUnreserved function from lines 131-138 since it's no longer used after the refactoring.

Suggested change
//// isUnreserved checks if a character is an unreserved character in percent encoding
//// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
//func isUnreserved(char rune) bool {
// return (char >= 'A' && char <= 'Z') ||
// (char >= 'a' && char <= 'z') ||
// (char >= '0' && char <= '9') ||
// char == '-' || char == '.' || char == '_' || char == '~'
//}

Copilot uses AI. Check for mistakes.
func bindingPath() string {
return "/" + bindings
Expand Down
51 changes: 35 additions & 16 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,25 @@ func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAd

// NewConsumer creates a new Consumer that listens to the provided Queue
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error) {
destination := &QueueAddress{
Queue: queueName,
}

if options != nil {
err := options.validate(a.featuresAvailable)
if err != nil {
return nil, err
}
}

if options != nil && options.isDirectReplyToEnable() {
return newConsumer(ctx, a, "", options)
}

destination := &QueueAddress{
Queue: queueName,
}
destinationAdd, err := destination.toAddress()
if err != nil {
return nil, err
}

return newConsumer(ctx, a, destinationAdd, options)
}

Expand Down Expand Up @@ -264,18 +268,23 @@ func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOpt
}

replyQueueName := options.ReplyToQueueName
if len(replyQueueName) == 0 {
replyQueueName = generateNameWithDefaultPrefix()
}
queueName := ""
if !options.DirectReplyTo {

// Declare reply queue as exclusive, auto-delete classic queue
q, err := a.management.DeclareQueue(ctx, &ClassicQueueSpecification{
Name: replyQueueName,
IsExclusive: true,
IsAutoDelete: true,
})
if err != nil {
return nil, fmt.Errorf("failed to declare reply queue: %w", err)
if len(replyQueueName) == 0 {
replyQueueName = generateNameWithDefaultPrefix()
}

// Declare reply queue as exclusive, auto-delete classic queue
q, err := a.management.DeclareQueue(ctx, &ClassicQueueSpecification{
Name: replyQueueName,
IsExclusive: true,
IsAutoDelete: true,
})
if err != nil {
return nil, fmt.Errorf("failed to declare reply queue: %w", err)
}
queueName = q.Name()
}

// Set defaults for optional fields
Expand Down Expand Up @@ -318,13 +327,23 @@ func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOpt
}

// Create consumer for receiving replies
consumer, err := a.NewConsumer(ctx, q.Name(), nil)
consumer, err := a.NewConsumer(ctx, queueName, &ConsumerOptions{
DirectReplyTo: options.DirectReplyTo,
})
if err != nil {
_ = publisher.Close(ctx) // cleanup publisher on failure
return nil, fmt.Errorf("failed to create consumer: %w", err)
}

client.consumer = consumer
reply, err := consumer.GetQueue()
if err != nil {
_ = publisher.Close(ctx) // cleanup publisher on failure
_ = consumer.Close(ctx)
return nil, fmt.Errorf("failed to get reply queue: %w", err)
}

client.replyToQueue = &QueueAddress{Queue: reply}

go client.messageReceivedHandler()
go client.requestTimeoutTask()
Expand Down
25 changes: 22 additions & 3 deletions pkg/rabbitmqamqp/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Consumer struct {
currentOffset int64

state consumerState

// see GetQueue method for more details.
queue string
}

func (c *Consumer) Id() string {
Expand All @@ -98,7 +101,7 @@ func (c *Consumer) Id() string {

func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options IConsumerOptions) (*Consumer, error) {
id := fmt.Sprintf("consumer-%s", uuid.New().String())
if options != nil && options.id() != "" {
if options != nil && len(options.id()) > 0 {
id = options.id()
}

Expand Down Expand Up @@ -130,13 +133,22 @@ func (c *Consumer) createReceiver(ctx context.Context) error {
}
}
}
// define a variable *amqp.ReceiverOptions type
var receiverOptions *amqp.ReceiverOptions

if c.options != nil && c.options.isDirectReplyToEnable() {
receiverOptions = createDynamicReceiverLinkOptions(c.options)
} else {
receiverOptions = createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce)
}

receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd,
createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce))
receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, receiverOptions)
if err != nil {
return err
}

c.queue = receiver.Address()

c.receiver.Swap(receiver)
return nil
}
Expand All @@ -160,6 +172,13 @@ func (c *Consumer) Close(ctx context.Context) error {
return c.receiver.Load().Close(ctx)
}

// GetQueue returns the queue the consumer is connected to.
// When the user sets the destination address to a dynamic address, this function will return the dynamic address.
// like direct-reply-to address. In other cases, it will return the queue address.
func (c *Consumer) GetQueue() (string, error) {
Copy link

Copilot AI Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parseQueueAddress function expects addresses to start with '/queues/' but direct-reply-to addresses (like 'amq.rabbitmq.reply-to') don't have this prefix. This will cause an error when calling GetQueue() on a consumer created with DirectReplyTo: true. The function should handle both standard queue addresses and direct-reply-to addresses differently.

Suggested change
func (c *Consumer) GetQueue() (string, error) {
func (c *Consumer) GetQueue() (string, error) {
// Handle direct-reply-to (dynamic) addresses
if c.queue == "amq.rabbitmq.reply-to" {
return c.queue, nil
}

Copilot uses AI. Check for mistakes.
return parseQueueAddress(c.queue)
}

// pause drains the credits of the receiver and stops issuing new credits.
func (c *Consumer) pause(ctx context.Context) error {
if c.state == consumerStatePaused || c.state == consumerStatePausing {
Expand Down
89 changes: 89 additions & 0 deletions pkg/rabbitmqamqp/amqp_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,92 @@ var _ = Describe("Consumer pause and unpause", func() {
}
}, SpecTimeout(time.Second*10))
})

var _ = Describe("Consumer direct reply to", func() {
It("Queue address should be the same passed by the user", func() {
qName := generateNameWithDateTime("Queue address should be the same passed by the user")
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())

consumer, err := connection.NewConsumer(context.Background(), qName, &ConsumerOptions{})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
q := &QueueAddress{Queue: qName}
r, e := q.toAddress()
Expect(e).To(BeNil())
qc, err := consumer.GetQueue()
Expect(err).To(BeNil())
Expect(qc).To(Equal(qName))

qResult := &QueueAddress{Queue: qName}
rqResultName, e := qResult.toAddress()
Expect(e).To(BeNil())

Expect(r).To(Equal(rqResultName))
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})

It("Queue address should be the dynamic name containing amq.rabbitmq.reply-to", func() {

connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())

consumer, err := connection.NewConsumer(context.Background(), "", &ConsumerOptions{
DirectReplyTo: true,
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
addr, err := consumer.GetQueue()
Expect(err).To(BeNil())
Expect(addr).NotTo(ContainSubstring("/queues/"))
Expect(addr).To(ContainSubstring("amq.rabbitmq.reply-to"))
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})

It("Validate consumer queues with special characters", func() {
type ExpectedQueuesAndDestination struct {
Queue string
Address string
}
e := []ExpectedQueuesAndDestination{
{"queue with spaces", "/queues/queue%20with%20spaces"},
{"queue+with+plus", "/queues/queue%2Bwith%2Bplus"},
{"特殊字符", "/queues/%E7%89%B9%E6%AE%8A%E5%AD%97%E7%AC%A6"},
{"myQueue", "/queues/myQueue"},
{"queue/with/slash", "/queues/queue%2Fwith%2Fslash"},
{"queue?with?question", "/queues/queue%3Fwith%3Fquestion"},
{"emoji😊queue", "/queues/emoji%F0%9F%98%8Aqueue"},
{"!@#$%^&*()", "/queues/%21%40%23%24%25%5E%26%2A%28%29"},
}
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
for i := range e {
queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
Name: e[i].Queue,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())

consumer, err := connection.NewConsumer(context.Background(), e[i].Queue, nil)
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
qc, err := consumer.GetQueue()
Expect(err).To(BeNil())
Expect(qc).To(Equal(e[i].Queue))
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(consumer.destinationAdd).To(Equal(e[i].Address))
Expect(connection.Management().DeleteQueue(context.Background(), e[i].Queue)).To(BeNil())
}
Expect(connection.Close(context.Background())).To(BeNil())

})

})
Loading