-
Notifications
You must be signed in to change notification settings - Fork 5
Implement direct reply to feature #73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
700c525
9b547b6
33ccaa4
44d91d1
84cd2ae
4598ad5
ca2f78a
0e5a8e6
9202744
3f02324
ccc1b66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ package rabbitmqamqp | |||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||
| "net/url" | ||||||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
@@ -97,20 +98,23 @@ func purgeQueueAddress(queue *string) (string, error) { | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // encodePathSegments takes a string and returns its percent-encoded representation. | ||||||||||||||||||||||||||||||
| func encodePathSegments(input string) string { | ||||||||||||||||||||||||||||||
| var encoded strings.Builder | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Iterate over each character in the input string | ||||||||||||||||||||||||||||||
| for _, char := range input { | ||||||||||||||||||||||||||||||
| // Check if the character is an unreserved character (i.e., it doesn't need encoding) | ||||||||||||||||||||||||||||||
| if isUnreserved(char) { | ||||||||||||||||||||||||||||||
| encoded.WriteRune(char) // Append as is | ||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||
| // Encode character To %HH format | ||||||||||||||||||||||||||||||
| encoded.WriteString(fmt.Sprintf("%%%02X", char)) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| return encoded.String() | ||||||||||||||||||||||||||||||
| encoded := url.QueryEscape(input) | ||||||||||||||||||||||||||||||
| return strings.ReplaceAll(encoded, "+", "%20") | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| //var encoded strings.Builder | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| //// Iterate over each character in the input string | ||||||||||||||||||||||||||||||
| //for _, char := range input { | ||||||||||||||||||||||||||||||
| // // Check if the character is an unreserved character (i.e., it doesn't need encoding) | ||||||||||||||||||||||||||||||
| // if isUnreserved(char) { | ||||||||||||||||||||||||||||||
| // encoded.WriteRune(char) // Append as is | ||||||||||||||||||||||||||||||
| // } else { | ||||||||||||||||||||||||||||||
| // // Encode character To %HH format | ||||||||||||||||||||||||||||||
| // encoded.WriteString(fmt.Sprintf("%%%02X", char)) | ||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||
| //} | ||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||
| //return encoded.String() | ||||||||||||||||||||||||||||||
|
Comment on lines
+104
to
+117
|
||||||||||||||||||||||||||||||
| //var encoded strings.Builder | |
| // | |
| //// Iterate over each character in the input string | |
| //for _, char := range input { | |
| // // Check if the character is an unreserved character (i.e., it doesn't need encoding) | |
| // if isUnreserved(char) { | |
| // encoded.WriteRune(char) // Append as is | |
| // } else { | |
| // // Encode character To %HH format | |
| // encoded.WriteString(fmt.Sprintf("%%%02X", char)) | |
| // } | |
| //} | |
| // | |
| //return encoded.String() |
Copilot
AI
Nov 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented-out isUnreserved function from lines 131-138 since it's no longer used after the refactoring.
| //// isUnreserved checks if a character is an unreserved character in percent encoding | |
| //// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ | |
| //func isUnreserved(char rune) bool { | |
| // return (char >= 'A' && char <= 'Z') || | |
| // (char >= 'a' && char <= 'z') || | |
| // (char >= '0' && char <= '9') || | |
| // char == '-' || char == '.' || char == '_' || char == '~' | |
| //} |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -90,6 +90,9 @@ type Consumer struct { | |||||||||||||
| currentOffset int64 | ||||||||||||||
|
|
||||||||||||||
| state consumerState | ||||||||||||||
|
|
||||||||||||||
| // see GetQueue method for more details. | ||||||||||||||
| queue string | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (c *Consumer) Id() string { | ||||||||||||||
|
|
@@ -98,7 +101,7 @@ func (c *Consumer) Id() string { | |||||||||||||
|
|
||||||||||||||
| func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options IConsumerOptions) (*Consumer, error) { | ||||||||||||||
| id := fmt.Sprintf("consumer-%s", uuid.New().String()) | ||||||||||||||
| if options != nil && options.id() != "" { | ||||||||||||||
| if options != nil && len(options.id()) > 0 { | ||||||||||||||
| id = options.id() | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -130,13 +133,22 @@ func (c *Consumer) createReceiver(ctx context.Context) error { | |||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| // define a variable *amqp.ReceiverOptions type | ||||||||||||||
| var receiverOptions *amqp.ReceiverOptions | ||||||||||||||
|
|
||||||||||||||
| if c.options != nil && c.options.isDirectReplyToEnable() { | ||||||||||||||
| receiverOptions = createDynamicReceiverLinkOptions(c.options) | ||||||||||||||
| } else { | ||||||||||||||
| receiverOptions = createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, | ||||||||||||||
| createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce)) | ||||||||||||||
| receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, receiverOptions) | ||||||||||||||
| if err != nil { | ||||||||||||||
| return err | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| c.queue = receiver.Address() | ||||||||||||||
|
|
||||||||||||||
| c.receiver.Swap(receiver) | ||||||||||||||
| return nil | ||||||||||||||
| } | ||||||||||||||
|
|
@@ -160,6 +172,13 @@ func (c *Consumer) Close(ctx context.Context) error { | |||||||||||||
| return c.receiver.Load().Close(ctx) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // GetQueue returns the queue the consumer is connected to. | ||||||||||||||
| // When the user sets the destination address to a dynamic address, this function will return the dynamic address. | ||||||||||||||
| // like direct-reply-to address. In other cases, it will return the queue address. | ||||||||||||||
| func (c *Consumer) GetQueue() (string, error) { | ||||||||||||||
|
||||||||||||||
| func (c *Consumer) GetQueue() (string, error) { | |
| func (c *Consumer) GetQueue() (string, error) { | |
| // Handle direct-reply-to (dynamic) addresses | |
| if c.queue == "amq.rabbitmq.reply-to" { | |
| return c.queue, nil | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Error from
GetRequestQueue()is silently ignored. While this is example code, it's better practice to handle or at least log errors to demonstrate proper error handling patterns.