Receiving messages from RabbitMQ
The RabbitMQ connector lets you retrieve messages from a RabbitMQ
broker. The RabbitMQ connector retrieves
RabbitMQ Messages and maps each of them into Reactive Messaging
Messages
.
Note
In this context, the reactive messaging concept of a Channel is realised as a RabbitMQ Queue.
Example
Let’s imagine you have a RabbitMQ broker running, and accessible using
the rabbitmq:5672
address (by default it would use localhost:5672
).
Configure your application to receive RabbitMQ Messages on the prices
channel as follows:
-
Configures the broker/router host name. You can do it per channel (using the
host
attribute) or globally usingrabbitmq-host
. -
Configures the broker/router port. You can do it per channel (using the
port
attribute) or globally usingrabbitmq-port
. The default is 5672. -
Configures the broker/router username if required. You can do it per channel (using the
username
attribute) or globally usingrabbitmq-username
. -
Configures the broker/router password if required. You can do it per channel (using the
password
attribute) or globally usingrabbitmq-password
. -
Instructs the
prices
channel to be managed by the RabbitMQ connector. -
Configures the RabbitMQ queue to read messages from.
-
Configures the binding between the RabbitMQ exchange and the RabbitMQ queue using a routing key. The default is
#
(all messages will be forwarded from the exchange to the queue) but in general this can be a comma-separated list of one or more keys.
Then, your application receives Message<String>
. You can consume the
payload directly:
Or, you can retrieve the Message<String>
:
Note
Whether you need to explicitly acknowledge the message depends on the
auto-acknowledgement
channel setting; if that is set to true
then
your message will be automatically acknowledged on receipt.
Deserialization
The connector converts incoming RabbitMQ Messages into Reactive
Messaging Message<T>
instances. The payload type T
depends on the
value of the RabbitMQ received message Envelope content_type
and
content_encoding
properties.
content_encoding | content_type | Type |
---|---|---|
Value present | n/a | byte[] |
No value | text/plain |
String |
No value | application/json |
a JSON element which can be a JsonArray , JsonObject , String , ... if the buffer contains an array, object, string,... |
No value | Anything else | byte[] |
If you send objects with this RabbitMQ connector (outbound connector),
they are encoded as JSON and sent with content_type
set to
application/json
. You can receive this payload using (Vert.x) JSON
Objects, and then map it to the object class you want:
- The
Price
instances are automatically encoded to JSON by the connector - You can receive it using a
JsonObject
- Then, you can reconstruct the instance using the
mapTo
method
Inbound Metadata
Messages coming from RabbitMQ contain an instance of IncomingRabbitMQMetadata in the metadata.
RabbitMQ message headers can be accessed from the metadata either by
calling getHeader(String header, Class<T> type)
to retrieve a single
header value. or getHeaders()
to get a map of all header values.
The type <T>
of the header value depends on the RabbitMQ type used for
the header:
RabbitMQ Header Type | T |
---|---|
String | String |
Boolean | Boolean |
Number | Number |
List | java.util.List |
Acknowledgement
When a Reactive Messaging Message associated with a RabbitMQ Message is acknowledged, it informs the broker that the message has been accepted.
Whether you need to explicitly acknowledge the message depends on the
auto-acknowledgement
setting for the channel; if that is set to true
then your message will be automatically acknowledged on receipt.
Failure Management
If a message produced from a RabbitMQ message is nacked, a failure
strategy is applied. The RabbitMQ connector supports three strategies,
controlled by the failure-strategy
channel setting:
-
fail
- fail the application; no more RabbitMQ messages will be processed. The RabbitMQ message is marked as rejected. -
accept
- this strategy marks the RabbitMQ message as accepted. The processing continues ignoring the failure. -
reject
- this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message. -
requeue
- this strategy marks the RabbitMQ message as rejected with requeue flag to true. The processing continues with the next message, but the requeued message will be redelivered to the consumer.
When using dead-letter-queue
, it is also possible to change some
metadata of the record that is sent to the dead letter topic. To do
that, use the Message.nack(Throwable, Metadata)
method:
The RabbitMQ reject requeue
flag can be controlled on different failure strategies
using the RabbitMQRejectMetadata.
To do that, use the Message.nack(Throwable, Metadata)
method by including the
RabbitMQRejectMetadata
metadata with requeue
to true
.
Experimental
RabbitMQFailureHandler
is experimental and APIs are subject to change in the future
In addition, you can also provide your own failure strategy.
To provide a failure strategy implement a bean exposing the interface
RabbitMQFailureHandler,
qualified with a @Identifier
.
Set the name of the bean as the failure-strategy
channel setting.
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
addresses (rabbitmq-addresses) | The multiple addresses for cluster mode, when given overrides the host and port | string | false | |
arguments | A comma-separated list of arguments [key1:value1,key2:value2,...] to bind the queue to the exchange. Relevant only if 'exchange.type' is headers | string | false | |
auto-acknowledgement | Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement | boolean | false | false |
auto-bind-dlq | Whether to automatically declare the DLQ and bind it to the binder DLX | boolean | false | false |
automatic-recovery-enabled | Whether automatic connection recovery is enabled | boolean | false | false |
automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled | boolean | false | true |
broadcast | Whether the received RabbitMQ messages must be dispatched to multiple subscribers | boolean | false | false |
client-options-name (rabbitmq-client-options-name) | The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration | string | false | |
connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout | int | false | 60000 |
consumer-arguments | A comma-separated list of arguments [key1:value1,key2:value2,...] for created consumer | string | false | |
content-type-override | Override the content_type attribute of the incoming message, should be a valid MINE type | string | false | |
credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client | string | false | |
dead-letter-dlx | If specified, a DLX to assign to the DLQ. Relevant only if auto-bind-dlq is true | string | false | |
dead-letter-dlx-routing-key | If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true | string | false | |
dead-letter-exchange | A DLX to assign to the queue. Relevant only if auto-bind-dlq is true | string | false | DLX |
dead-letter-exchange-type | The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true | string | false | direct |
dead-letter-exchange.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for dead-letter-exchange creation | string | false | |
dead-letter-queue-mode | If automatically declare DLQ, we can choose different modes of DLQ [lazy, default] | string | false | |
dead-letter-queue-name | The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended | string | false | |
dead-letter-queue-type | If automatically declare DLQ, we can choose different types of DLQ [quorum, classic, stream] | string | false | |
dead-letter-queue.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for dead-letter-queue creation | string | false | |
dead-letter-routing-key | A dead letter routing key to assign to the queue; if not supplied will default to the queue name | string | false | |
dead-letter-ttl | If specified, the time (ms) for which a message can remain in DLQ undelivered before it is dead. Relevant only if auto-bind-dlq is true | long | false | |
dlx.declare | Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently | boolean | false | false |
exchange.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for exchange creation | string | false | rabbitmq-exchange-arguments |
exchange.auto-delete | Whether the exchange should be deleted after use | boolean | false | false |
exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently | boolean | false | true |
exchange.durable | Whether the exchange is durable | boolean | false | true |
exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. | string | false | |
exchange.type | The exchange type: direct, fanout, headers or topic (default) | string | false | topic |
failure-strategy | The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are fail , accept , reject (default), requeue or name of a bean |
string | false | reject |
handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms) | int | false | 10000 |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
health-lazy-subscription | Whether the liveness and readiness checks should report 'ok' when there is no subscription yet. This is useful when injecting the channel with @Inject @Channel("...") Multi<...> multi; |
boolean | false | false |
health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled | boolean | false | true |
host (rabbitmq-host) | The broker hostname | string | false | localhost |
include-properties | Whether to include properties when a broker message is passed on the event bus | boolean | false | false |
keep-most-recent | Whether to discard old messages instead of recent ones | boolean | false | false |
max-incoming-internal-queue-size | The maximum size of the incoming internal queue | int | false | 500000 |
max-outstanding-messages | The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number | int | false | |
network-recovery-interval | How long (ms) will automatic recovery wait before attempting to reconnect | int | false | 5000 |
password (rabbitmq-password) | The password used to authenticate to the broker | string | false | |
port (rabbitmq-port) | The broker port | int | false | 5672 |
queue.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for queue creation | string | false | rabbitmq-queue-arguments |
queue.auto-delete | Whether the queue should be deleted after use | boolean | false | false |
queue.declare | Whether to declare the queue and binding; set to false if these are expected to be set up independently | boolean | false | true |
queue.durable | Whether the queue is durable | boolean | false | true |
queue.exclusive | Whether the queue is for exclusive use | boolean | false | false |
queue.name | The queue from which messages are consumed. If not set, the channel name is used. | string | false | |
queue.single-active-consumer | If set to true, only one consumer can actively consume messages | boolean | false | |
queue.ttl | If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead | long | false | |
queue.x-delivery-limit | If queue.x-queue-type is quorum, when a message has been returned more times than the limit the message will be dropped or dead-lettered | long | false | |
queue.x-max-priority | Define priority level queue consumer | int | false | |
queue.x-queue-mode | If automatically declare queue, we can choose different modes of queue [lazy, default] | string | false | |
queue.x-queue-type | If automatically declare queue, we can choose different types of queue [quorum, classic, stream] | string | false | |
reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts | int | false | 100 |
reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts | int | false | 10 |
requested-channel-max | The initially requested maximum channel number | int | false | 2047 |
requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none | int | false | 60 |
routing-keys | A comma-separated list of routing keys to bind the queue to the exchange. Relevant only if 'exchange.type' is topic or direct | string | false | # |
ssl (rabbitmq-ssl) | Whether or not the connection should use SSL | boolean | false | false |
ssl.hostname-verification-algorithm | Set the hostname verifier algorithm for the TLS connection. Accepted values are HTTPS , and NONE (defaults). NONE disables the hostname verification. |
string | false | NONE |
tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true | string | false | `` |
tracing.enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification | boolean | false | false |
trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store | string | false | |
trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store | string | false | |
use-nio | Whether usage of NIO Sockets is enabled | boolean | false | false |
user | The user name to use when connecting to the broker | string | false | guest |
username (rabbitmq-username) | The username used to authenticate to the broker | string | false | |
virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker | string | false | / |
To use an existing queue, you need to configure the queue.name
attribute.
For example, if you have a RabbitMQ broker already configured with a
queue called peopleQueue
that you wish to read messages from, you need
the following configuration:
If you want RabbitMQ to create the queue for you but bind it to an
existing topic exchange people
, you need the following configuration:
Note
In the above the channel name people
is implicitly assumed to be the
name of the exchange; if this is not the case you would need to name the
exchange explicitly using the exchange.name
property.
Note
The connector supports RabbitMQ's "Server-named Queues" feature to create
an exclusive, auto-deleting, non-durable and randomly named queue. To
enable this feature you set the queue name to exactly (server.auto)
.
Using this name not only enables the server named queue feature but also
automatically makes ths queue exclusive, auto-deleting, and non-durable;
therefore ignoring any values provided for the exclusive
, auto-delete
and durable
options.
If you want RabbitMQ to create the people
exchange, queue and binding,
you need the following configuration:
In the above we have used an explicit list of routing keys rather than
the default (#
). Each component of the list creates a separate binding
between the queue and the exchange, so in the case above we would have
two bindings; one based on a routing key of tall
, the other based on
one of short
.
Note
The default value of routing-keys
is #
(indicating a match against
all possible routing keys) which is only appropriate for topic
Exchanges. If you are using other types of exchange and/or need to
declare queue bindings, you’ll need to supply a valid value for the
exchange in question.
Custom arguments for Queue declaration
When queue declaration is made by the Reactive Messaging channel, using the queue.declare=true
configuration,
custom queue arguments can be specified using the queue.arguments
attribute.
queue.arguments
accepts the identifier (using the @Identifier
qualifier) of a Map<String, Object>
exposed as a CDI bean.
If no arguments has been configured, the default rabbitmq-queue-arguments identifier is looked for.
The following CDI bean produces such a configuration identified with my-arguments:
Then the channel can be configured to use those arguments in exchange declaration:
Similarly, the dead-letter-queue.arguments
allows configuring custom arguments for dead letter queue when one is declared (auto-bind-dlq=true
).