AMQP Request/Reply
Experimental
The AMQP Request/Reply emitter is an experimental feature.
The AMQP Request-Reply pattern allows you to publish a message to an AMQP address and then await a reply message that responds to the initial request.
The AmqpRequestReply emitter implements the requestor (or the client) of the request-reply pattern for AMQP 1.0 outbound channels:
The request method publishes the request message to the configured target address of the outgoing channel,
and listens on a reply address (by default, the channel name with -reply suffix) for a reply message.
When the reply is received, the returned Uni is completed with the message payload.
The request send operation generates a correlation id and sets the AMQP message-id property,
which it expects to be sent back in the reply message's correlation-id property.
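The correlation contract described above can be sketched with plain JDK types. This is only an illustration, not the connector's implementation: SketchMessage and CorrelationSketch are hypothetical names, and the message class models nothing beyond the properties mentioned in the text.

```java
import java.util.UUID;

// Hypothetical stand-in for an AMQP message; only the properties discussed above are modelled.
final class SketchMessage {
    final String messageId;     // AMQP message-id (left null on replies in this sketch)
    final String correlationId; // AMQP correlation-id (null on requests)
    final String address;       // address the message is published to
    final String body;

    SketchMessage(String messageId, String correlationId, String address, String body) {
        this.messageId = messageId;
        this.correlationId = correlationId;
        this.address = address;
        this.body = body;
    }
}

final class CorrelationSketch {
    // Default reply address per the text: the channel name with a "-reply" suffix.
    static String defaultReplyAddress(String channelName) {
        return channelName + "-reply";
    }

    // Requestor side: generate a fresh correlation id and set it as the message-id.
    static SketchMessage newRequest(String targetAddress, String body) {
        return new SketchMessage(UUID.randomUUID().toString(), null, targetAddress, body);
    }

    // Replier side: send the request's message-id back as the reply's correlation-id.
    static SketchMessage newReply(SketchMessage request, String replyAddress, String body) {
        return new SketchMessage(null, request.messageId, replyAddress, body);
    }

    // Requestor side: a reply answers a request when its correlation-id equals the message-id.
    static boolean answers(SketchMessage request, SketchMessage reply) {
        return request.messageId.equals(reply.correlationId);
    }
}
```

With these helpers, a reply built from one request answers that request only; a second request gets a different random id and therefore does not match.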
The replier (or the server) can be implemented using a Reactive Messaging processor:
When you need more control over the reply message, such as setting the reply-to address and correlation id explicitly, you can use the Message type:
Given the following configuration example:
Requesting with Message types
Like the core Emitter's send methods, the request method can also accept a Message and return a Message:
Note
The reply type of the AmqpRequestReply is discovered at runtime,
so that a MessageConverter can be configured and applied to the incoming message before the Uni result is returned.
Requesting multiple replies
You can use the requestMulti method to expect any number of replies represented by the Multi return type.
For example, this can be used to aggregate multiple replies to a single request.
As with request, you can also request Message types.
Note
The channel attribute reply.timeout is applied between consecutive messages; if it is reached, the returned Multi fails.
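The per-message timeout can be pictured with a JDK-only sketch in which a BlockingQueue stands in for the reply stream. This is an assumption-laden illustration, not how Multi or the connector work internally: MultiReplySketch and consume are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

final class MultiReplySketch {
    // Hands every reply to the callback; the timeout window restarts after each reply.
    // Fails with TimeoutException as soon as one window elapses without a reply.
    static void consume(BlockingQueue<String> replies, long timeoutMillis, Consumer<String> onReply)
            throws InterruptedException, TimeoutException {
        while (true) {
            String reply = replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new TimeoutException("no reply within " + timeoutMillis + " ms");
            }
            onReply.accept(reply);
        }
    }
}
```

Replies that arrived before the gap are still delivered to the callback; only the stream as a whole ends in failure once a gap exceeds the timeout.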
Pending replies and reply timeout
By default, the Uni returned from the request method is configured to fail with a timeout exception if no reply is received after 5 seconds.
This timeout is configurable with the channel attribute reply.timeout (in milliseconds).
A snapshot of the list of pending replies is available through the AmqpRequestReply#getPendingReplies method.
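How a reply timeout and a pending-replies snapshot fit together can be sketched with CompletableFuture from the JDK. The class below is a hypothetical illustration (TimedRequests, onReply and pendingSnapshot are made-up names), not the AmqpRequestReply implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

final class TimedRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    TimedRequests(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Registers a pending reply that fails with a TimeoutException if nothing arrives in time.
    CompletableFuture<String> request(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // Whether answered or timed out, the entry is no longer pending.
                .whenComplete((payload, failure) -> pending.remove(correlationId));
    }

    // Completes the matching pending reply, if it is still waiting.
    void onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.get(correlationId);
        if (reply != null) {
            reply.complete(payload);
        }
    }

    // A copy of the ids awaiting a reply; later changes do not affect the returned list.
    List<String> pendingSnapshot() {
        return new ArrayList<>(pending.keySet());
    }
}
```

In this sketch a timed-out request surfaces as a CompletionException whose cause is a TimeoutException when the caller joins the future.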
Scaling Request/Reply
If multiple requestor instances are configured with the same outgoing address and the same reply address, each requestor instance receives the replies to all instances. If an observed correlation id doesn't match the id of any pending reply, the reply is simply discarded. At the cost of additional network traffic, this allows requestors (and repliers) to be scaled dynamically.
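The discard rule can be illustrated with two in-memory requestor instances that both observe every reply. This is a JDK-only sketch under that assumption; RequestorInstance is a hypothetical class, not part of the connector.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class RequestorInstance {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Remembers the correlation id of a request sent by this instance.
    CompletableFuture<String> request(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Called for every reply seen on the shared reply address.
    // Returns false when the reply belongs to another instance and is discarded.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(payload);
        return true;
    }
}
```

Delivering the same reply to both instances completes the request only on the instance that issued it; the other instance ignores it and keeps waiting for its own reply.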
Correlation Ids
AMQP Request/Reply lets you configure the correlation id mechanism entirely through a CorrelationIdHandler implementation.
The default handler is based on randomly generated UUID strings, set as the AMQP message-id on the request message.
The reply message is expected to carry the same value in the correlation-id property.
The correlation id handler implementation can be configured using the reply.correlation-id.handler attribute.
The default configuration is uuid, which uses randomly generated UUID strings as correlation ids.
An alternative bytes implementation can be used to generate random binary correlation ids.
The bytes id handler generates 12 random bytes,
but the length can be configured with the smallrye.amqp.request-reply.correlation-id.bytes.length attribute.
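The two id styles can be sketched with the JDK alone. The helper below is hypothetical (CorrelationIds is not the connector's CorrelationIdHandler); it only shows what a random UUID string and a random 12-byte id with a configurable length look like.

```java
import java.security.SecureRandom;
import java.util.UUID;

final class CorrelationIds {
    // Default length mentioned in the text for the bytes handler.
    static final int DEFAULT_BYTES_LENGTH = 12;
    private static final SecureRandom RANDOM = new SecureRandom();

    // "uuid" style: a randomly generated UUID rendered as a string.
    static String uuid() {
        return UUID.randomUUID().toString();
    }

    // "bytes" style: random binary id of the default length.
    static byte[] bytes() {
        return bytes(DEFAULT_BYTES_LENGTH);
    }

    // "bytes" style with an explicit length, mirroring the configurable attribute.
    static byte[] bytes(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length must be positive");
        }
        byte[] id = new byte[length];
        RANDOM.nextBytes(id);
        return id;
    }
}
```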
Custom handlers can be implemented by providing a CDI-managed bean with the @Identifier qualifier.
Reply Error Handling
If the reply server produces an error, it can propagate the error back to the requestor, failing the returned Uni.
When the reply.failure.handler channel attribute is set,
the ReplyFailureHandler implementation is looked up through CDI by matching its @Identifier qualifier.
A sample reply error handler can look up application properties and return the error to be thrown by the reply:
A null return value indicates that no error has been found in the reply message, and it can be delivered to the application.
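The null-means-no-error convention can be sketched without the connector's types. Everything here is an assumption made for illustration: the REPLY_ERROR property name, the use of a plain Map for application properties, and the ReplyErrorSketch class are hypothetical and do not reflect the real ReplyFailureHandler signature.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ReplyErrorSketch {
    // Hypothetical application property in which the replier reports an error.
    static final String ERROR_PROPERTY = "REPLY_ERROR";

    // Returns the error to raise, or null when the reply carries no error.
    static final Function<Map<String, ?>, Throwable> HANDLER = properties -> {
        Object error = properties.get(ERROR_PROPERTY);
        return error == null ? null : new IllegalArgumentException(String.valueOf(error));
    };

    // Fails the result when the handler finds an error, otherwise delivers the payload.
    static CompletableFuture<String> toResult(Map<String, ?> applicationProperties, String payload) {
        Throwable failure = HANDLER.apply(applicationProperties);
        return failure == null
                ? CompletableFuture.completedFuture(payload)
                : CompletableFuture.failedFuture(failure);
    }
}
```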
Connection Sharing
Multiple AMQP channels can share the same underlying connection when configured with the same container-id.
This reduces resource consumption and is particularly useful for request-reply patterns
where the sender and reply receiver can share a single connection.
Both channels above will share the same AMQP connection because they use the same container-id.
Note
Connection sharing requires all channels with the same container-id to have compatible connection settings (host, port, credentials, etc.).
If the settings differ, the connector will detect the conflict and raise an error.
Using with RabbitMQ
RabbitMQ 4.0+ with native AMQP 1.0 support is compatible with the request-reply pattern.
When using RabbitMQ, remember to use /queues/ prefixed addresses (v2 address format)
and set use-anonymous-sender=false as anonymous senders are not supported.
See Using RabbitMQ for more details on RabbitMQ-specific configuration.
Limitations
The following features are not yet supported:
- Direct reply-to (amq.rabbitmq.reply-to): RabbitMQ's direct reply-to mechanism requires link pairing, which is not yet implemented.
- Link pairing: Pairing sender and receiver links on the same connection for request-reply routing is not yet available.