Skip to content

Commit 9224d00

Browse files
authored
Rename rpcclient to Requester and rpcServer to Responder (#72)
* Rename rpcclient to Requester and rpcServer to Responder, like .NET and Java AMQP 1.0 clients --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 051cc11 commit 9224d00

File tree

9 files changed

+83
-83
lines changed

9 files changed

+83
-83
lines changed

docs/examples/rpc_echo_server/main.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,39 @@ import (
1111
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
1212
)
1313

14-
type echoRpcServer struct {
14+
type echoResponder struct {
1515
conn *rabbitmqamqp.AmqpConnection
16-
server rabbitmqamqp.RpcServer
16+
server rabbitmqamqp.Responder
1717
}
1818

19-
func (s *echoRpcServer) stop(ctx context.Context) {
19+
func (s *echoResponder) stop(ctx context.Context) {
2020
s.server.Close(ctx)
2121
s.conn.Close(ctx)
2222
}
2323

24-
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
24+
func newEchoResponder(conn *rabbitmqamqp.AmqpConnection) *echoResponder {
2525
_, err := conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
26-
Name: rpcServerQueueName,
26+
Name: requestQueue,
2727
})
2828
if err != nil {
2929
panic(err)
3030
}
31-
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
32-
RequestQueue: rpcServerQueueName,
31+
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
32+
RequestQueue: requestQueue,
3333
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
3434
return request, nil
3535
},
3636
})
3737
if err != nil {
3838
panic(err)
3939
}
40-
return &echoRpcServer{
40+
return &echoResponder{
4141
conn: conn,
4242
server: srv,
4343
}
4444
}
4545

46-
const rpcServerQueueName = "rpc-queue"
46+
const requestQueue = "go-amqp1.0-request-queue"
4747

4848
func main() {
4949
// Dial rabbit for RPC server connection
@@ -52,16 +52,16 @@ func main() {
5252
panic(err)
5353
}
5454

55-
srv := newEchoRpcServer(srvConn)
55+
srv := newEchoResponder(srvConn)
5656

5757
// Dial rabbit for RPC client connection
5858
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
5959
if err != nil {
6060
panic(err)
6161
}
6262

63-
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
64-
RequestQueueName: rpcServerQueueName,
63+
requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
64+
RequestQueueName: requestQueue,
6565
})
6666
if err != nil {
6767
panic(err)
@@ -94,7 +94,7 @@ func main() {
9494
continue
9595
}
9696

97-
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
97+
resp, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
9898
if err != nil {
9999
fmt.Printf("Error calling RPC: %v\n", err)
100100
continue

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -192,10 +192,10 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti
192192
return newConsumer(ctx, a, destinationAdd, options)
193193
}
194194

195-
// NewRpcServer creates a new RPC server that processes requests from the
195+
// NewResponder creates a new RPC server that processes requests from the
196196
// specified queue. The requestQueue in options is mandatory, while other
197197
// fields are optional and will use defaults if not provided.
198-
func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOptions) (RpcServer, error) {
198+
func (a *AmqpConnection) NewResponder(ctx context.Context, options ResponderOptions) (Responder, error) {
199199
if err := options.validate(); err != nil {
200200
return nil, fmt.Errorf("rpc server options validation: %w", err)
201201
}
@@ -231,7 +231,7 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
231231
replyPostProcessor = defaultReplyPostProcessor
232232
}
233233

234-
server := &amqpRpcServer{
234+
server := &amqpResponder{
235235
requestHandler: handler,
236236
requestQueue: options.RequestQueue,
237237
publisher: publisher,
@@ -244,14 +244,14 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
244244
return server, nil
245245
}
246246

247-
// NewRpcClient creates a new RPC client that sends requests to the specified queue
247+
// NewRequester creates a new RPC client that sends requests to the specified queue
248248
// and receives replies on a dynamically created reply queue.
249-
func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOptions) (RpcClient, error) {
249+
func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (Requester, error) {
250250
if options == nil {
251251
return nil, fmt.Errorf("options cannot be nil")
252252
}
253253
if options.RequestQueueName == "" {
254-
return nil, fmt.Errorf("requestQueueName is mandatory")
254+
return nil, fmt.Errorf("request QueueName is mandatory")
255255
}
256256

257257
// Create publisher for sending requests
@@ -305,7 +305,7 @@ func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOpt
305305
correlationIdExtractor = defaultReplyCorrelationIdExtractor
306306
}
307307

308-
client := &amqpRpcClient{
308+
client := &amqpRequester{
309309
requestQueue: requestQueue,
310310
replyToQueue: &QueueAddress{Queue: replyQueueName},
311311
publisher: publisher,

pkg/rabbitmqamqp/example_rpc_custom_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func Example_customCorrelationId() {
3838
panic(err)
3939
}
4040

41-
server, err := srvConn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
41+
server, err := srvConn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
4242
RequestQueue: rpcServerQueueNameCustom,
4343
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
4444
fmt.Printf("Received: %s\n", request.GetData())
@@ -70,7 +70,7 @@ func Example_customCorrelationId() {
7070
}
7171
defer clientConn.Close(context.Background())
7272

73-
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
73+
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
7474
RequestQueueName: rpcServerQueueNameCustom,
7575
CorrelationIdSupplier: &customCorrelationIDSupplier{},
7676
CorrelationIdExtractor: func(message *amqp.Message) any {

pkg/rabbitmqamqp/example_rpc_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
type echoRpcServer struct {
1515
conn *rabbitmqamqp.AmqpConnection
16-
server rabbitmqamqp.RpcServer
16+
server rabbitmqamqp.Responder
1717
}
1818

1919
func (s *echoRpcServer) stop(ctx context.Context) {
@@ -25,7 +25,7 @@ func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
2525
conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
2626
Name: rpcServerQueueName,
2727
})
28-
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
28+
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
2929
RequestQueue: rpcServerQueueName,
3030
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
3131
fmt.Printf("echo: %s\n", request.GetData())
@@ -58,7 +58,7 @@ func Example() {
5858
panic(err)
5959
}
6060

61-
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
61+
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
6262
RequestQueueName: rpcServerQueueName,
6363
})
6464
if err != nil {

pkg/rabbitmqamqp/rpc_client.go renamed to pkg/rabbitmqamqp/requester.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/google/uuid"
1313
)
1414

15-
// RpcClient is an interface for making RPC (Remote Procedure Call) requests over AMQP.
15+
// Requester is an interface for making RPC (Remote Procedure Call) requests over AMQP.
1616
// Implementations of this interface should handle the sending of requests and
1717
// the receiving of corresponding replies, managing correlation IDs and timeouts.
1818
//
@@ -33,7 +33,7 @@ import (
3333
// - `Message` provides a basic AMQP message structure for RPC requests.
3434
// - `Publish` sends the request message and returns a channel that will receive
3535
// the reply message, or be closed if a timeout occurs or the client is closed.
36-
type RpcClient interface {
36+
type Requester interface {
3737
Close(context.Context) error
3838
Message(body []byte) *amqp.Message
3939
Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)
@@ -86,16 +86,16 @@ var defaultReplyCorrelationIdExtractor CorrelationIdExtractor = func(message *am
8686
// address for the reply queue. The function must return the modified message.
8787
//
8888
// The default `RequestPostProcessor` implementation (used when `RequestPostProcessor`
89-
// is not explicitly set in `RpcClientOptions`) performs the following:
89+
// is not explicitly set in `RequesterOptions`) performs the following:
9090
// - Assigns the `correlationID` to the `MessageID` property of the `amqp.Message`.
9191
// - Sets the `ReplyTo` message property to a client-generated exclusive auto-delete queue.
9292
type RequestPostProcessor func(request *amqp.Message, correlationID any) *amqp.Message
9393

9494
var DefaultRpcRequestTimeout = 30 * time.Second
9595

96-
// RpcClientOptions is a struct that contains the options for the RPC client.
96+
// RequesterOptions is a struct that contains the options for the RPC client.
9797
// It is used to configure the RPC client.
98-
type RpcClientOptions struct {
98+
type RequesterOptions struct {
9999
// The name of the queue to send requests to. This queue must exist.
100100
//
101101
// Mandatory.
@@ -130,7 +130,7 @@ type outstandingRequest struct {
130130
// err error
131131
}
132132

133-
type amqpRpcClient struct {
133+
type amqpRequester struct {
134134
requestQueue ITargetAddress
135135
replyToQueue ITargetAddress
136136
publisher *Publisher
@@ -149,7 +149,7 @@ type amqpRpcClient struct {
149149
// Close shuts down the RPC client, closing its underlying publisher and consumer.
150150
// It ensures that all pending requests are cleaned up by closing their respective
151151
// channels. This method is safe to call multiple times.
152-
func (a *amqpRpcClient) Close(ctx context.Context) error {
152+
func (a *amqpRequester) Close(ctx context.Context) error {
153153
var err error
154154
a.closer.Do(func() {
155155
a.mu.Lock()
@@ -173,7 +173,7 @@ func (a *amqpRpcClient) Close(ctx context.Context) error {
173173
return err
174174
}
175175

176-
func (a *amqpRpcClient) Message(body []byte) *amqp.Message {
176+
func (a *amqpRequester) Message(body []byte) *amqp.Message {
177177
return amqp.NewMessage(body)
178178
}
179179

@@ -184,9 +184,9 @@ func (a *amqpRpcClient) Message(body []byte) *amqp.Message {
184184
// an `outstandingRequest` is created and stored, and a channel is returned
185185
// for the reply. The channel will be closed if the request times out or the
186186
// client is closed before a reply is received.
187-
func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-chan *amqp.Message, error) {
187+
func (a *amqpRequester) Publish(ctx context.Context, message *amqp.Message) (<-chan *amqp.Message, error) {
188188
if a.isClosed() {
189-
return nil, fmt.Errorf("rpc client is closed")
189+
return nil, fmt.Errorf("requester is closed")
190190
}
191191
replyTo, err := a.replyToQueue.toAddress()
192192
if err != nil {
@@ -220,7 +220,7 @@ func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-c
220220
return ch, nil
221221
}
222222

223-
func (a *amqpRpcClient) isClosed() bool {
223+
func (a *amqpRequester) isClosed() bool {
224224
a.mu.Lock()
225225
defer a.mu.Unlock()
226226
return a.closed
@@ -231,7 +231,7 @@ func (a *amqpRpcClient) isClosed() bool {
231231
// If a request's `sentAt` timestamp is older than the `requestTimeout`,
232232
// its channel is closed, and the request is removed from `pendingRequests`.
233233
// The goroutine exits when the `done` channel is closed, typically when the client is closed.
234-
func (a *amqpRpcClient) requestTimeoutTask() {
234+
func (a *amqpRequester) requestTimeoutTask() {
235235
t := time.NewTicker(a.requestTimeout)
236236
defer t.Stop()
237237
for {
@@ -260,7 +260,7 @@ func (a *amqpRpcClient) requestTimeoutTask() {
260260
// corresponding request's channel, and the request is removed from `pendingRequests`.
261261
// If no match is found, the message is requeued. The goroutine exits when the `done`
262262
// channel is closed, typically when the client is closed.
263-
func (a *amqpRpcClient) messageReceivedHandler() {
263+
func (a *amqpRequester) messageReceivedHandler() {
264264
for {
265265
select {
266266
case <-a.done:

pkg/rabbitmqamqp/rpc_client_test.go renamed to pkg/rabbitmqamqp/requester_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
. "github.com/onsi/gomega"
1111
)
1212

13-
var _ = Describe("RpcClient", func() {
13+
var _ = Describe("Requester", func() {
1414
var (
1515
conn *AmqpConnection
1616
queueName string
@@ -85,7 +85,7 @@ var _ = Describe("RpcClient", func() {
8585
// Server goroutine to handle incoming requests
8686
go pongRpcServer(ctx, publisher, consumer)
8787

88-
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
88+
client, err := conn.NewRequester(ctx, &RequesterOptions{
8989
RequestQueueName: queueName,
9090
})
9191
Ω(err).ShouldNot(HaveOccurred())
@@ -110,7 +110,7 @@ var _ = Describe("RpcClient", func() {
110110

111111
It("uses a custom correlation id extractor and post processor", func(ctx SpecContext) {
112112
go pongRpcServer(ctx, publisher, consumer)
113-
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
113+
client, err := conn.NewRequester(ctx, &RequesterOptions{
114114
RequestQueueName: queueName,
115115
CorrelationIdExtractor: func(message *amqp.Message) any {
116116
return message.ApplicationProperties["correlationId"]

0 commit comments

Comments
 (0)