|
26 | 26 |
|
27 | 27 | class Client(MessagingHandler): |
28 | 28 | def __init__(self, url, requests): |
29 | | - super(Client, self).__init__() |
| 29 | + super(Client, self).__init__(auto_accept=False) |
30 | 30 | self.url = url |
31 | 31 | self.requests = requests |
| 32 | + self.outstanding = {} |
| 33 | + self.msgs = 0 |
32 | 34 |
|
33 | 35 | def on_start(self, event): |
34 | 36 | self.sender = event.container.create_sender(self.url) |
35 | 37 | self.receiver = event.container.create_receiver(self.sender.connection, None, dynamic=True) |
36 | 38 |
|
37 | 39 | def next_request(self): |
38 | 40 | if self.receiver.remote_source.address: |
39 | | - req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0]) |
| 41 | + request = self.requests.pop(0) |
| 42 | + req = Message(reply_to=self.receiver.remote_source.address, correlation_id=self.msgs, body=request) |
| 43 | + self.outstanding[req.correlation_id] = request |
| 44 | + self.msgs += 1 |
40 | 45 | self.sender.send(req) |
41 | 46 |
|
42 | 47 | def on_link_opened(self, event): |
43 | 48 | if event.receiver == self.receiver: |
44 | 49 | self.next_request() |
45 | 50 |
|
46 | 51 | def on_message(self, event): |
47 | | - print("%s => %s" % (self.requests.pop(0), event.message.body)) |
48 | 52 | if self.requests: |
49 | 53 | self.next_request() |
| 54 | + |
| 55 | + message = event.message |
| 56 | + delivery = event.delivery |
| 57 | + correlation_id = message.correlation_id |
| 58 | + request = self.outstanding.pop(correlation_id, None) |
| 59 | + |
| 60 | + if request: |
| 61 | + print(f"{request}({correlation_id}) => {message.body}") |
| 62 | + self.accept(delivery) |
50 | 63 | else: |
| 64 | + print(f"Unexpected response - unknown correlation_id({correlation_id}): {message.body}") |
| 65 | + self.reject(delivery) |
| 66 | + if not self.outstanding: |
51 | 67 | event.connection.close() |
52 | 68 |
|
53 | 69 |
|
|
0 commit comments