Sending messages to RabbitMQ
The RabbitMQ connector can write Reactive Messaging Messages as RabbitMQ Messages.
Note
In this context, the reactive messaging concept of a Channel is realised as a RabbitMQ Exchange.
Example
Let’s imagine you have a RabbitMQ broker running and accessible at the rabbitmq:5672 address (by default it would use localhost:5672).
Configure your application to send the messages from the prices
channel as a RabbitMQ Message as follows:
- Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using rabbitmq-host.
- Configures the broker/router port. You can do it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672.
- Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using rabbitmq-username.
- Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using rabbitmq-password.
- Instructs the prices channel to be managed by the RabbitMQ connector.
- Supplies the default routing key to be included in outbound messages; this key is used if the "raw payload" form of message sending is used (see below).
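The items above could be expressed as the following configuration. This is a minimal sketch, assuming MicroProfile Config properties with the usual mp.messaging.outgoing.<channel> prefix; the credentials and the routing key value (normal) are placeholders chosen for illustration, not values taken from this page.

```properties
# Broker host and port, set globally through the aliases
rabbitmq-host=rabbitmq
rabbitmq-port=5672
# Placeholder credentials (assumption); only needed if the broker requires them
rabbitmq-username=my-username
rabbitmq-password=my-password
# The prices channel is managed by the RabbitMQ connector
mp.messaging.outgoing.prices.connector=smallrye-rabbitmq
# Hypothetical default routing key for outbound messages
mp.messaging.outgoing.prices.default-routing-key=normal
```

The same host, port, username and password attributes can instead be set per channel, for example mp.messaging.outgoing.prices.host.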
Note
You don’t need to set the RabbitMQ exchange name. By default, it uses the channel name (prices) as the name of the exchange to send messages to. You can configure the exchange.name attribute to override it.
Then, your application can send Message<Double> to the prices channel. It can use double payloads as in the following snippet:
Or, you can send Message<Double>, which affords the opportunity to explicitly specify metadata on the outgoing message:
Serialization
When sending a Message<T>, the connector converts the message into a RabbitMQ Message. The payload is converted to the RabbitMQ Message body.
T | RabbitMQ Message Body |
---|---|
primitive types or UUID/String | String value with content_type set to text/plain |
JsonObject or JsonArray | Serialized String payload with content_type set to application/json |
io.vertx.mutiny.core.buffer.Buffer | Binary content, with content_type set to application/octet-stream |
byte[] | Binary content, with content_type set to application/octet-stream |
Any other class | The payload is converted to JSON (using a Json Mapper) then serialized with content_type set to application/json |
If the message payload cannot be serialized to JSON, the message is nacked.
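The mapping in the table can be sketched in plain Java. This is illustrative only and is not the connector's implementation: the class and method names (ContentTypeSketch, contentTypeFor) are hypothetical, and the Vert.x types (JsonObject, JsonArray, Buffer) are left out so the example stays dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;

// Illustrative only: mirrors the serialization table, not the connector's code.
final class ContentTypeSketch {

    // Returns the content_type that the table associates with a payload type.
    static String contentTypeFor(Object payload) {
        if (payload instanceof byte[]) {
            // Binary content
            return "application/octet-stream";
        }
        if (payload instanceof String || payload instanceof UUID
                || payload instanceof Number || payload instanceof Boolean
                || payload instanceof Character) {
            // Boxed primitives, UUID and String are sent as their String value
            return "text/plain";
        }
        // Any other class is converted to JSON.
        // (A Vert.x Buffer, omitted here, would be binary like byte[].)
        return "application/json";
    }

    public static void main(String[] args) {
        System.out.println(contentTypeFor(42.0));
        System.out.println(contentTypeFor("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(contentTypeFor(List.of(1, 2)));
    }
}
```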
Outbound Metadata
When sending Messages, you can add an instance of OutgoingRabbitMQMetadata to influence how the message is handled by RabbitMQ. For example, you can configure the routing key, timestamp and headers:
Acknowledgement
By default, the Reactive Messaging Message is acknowledged when the broker acknowledges the message.
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
addresses (rabbitmq-addresses) | The multiple addresses for cluster mode, when given overrides the host and port | string | false | |
automatic-recovery-enabled | Whether automatic connection recovery is enabled | boolean | false | false |
automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled | boolean | false | true |
client-options-name (rabbitmq-client-options-name) | The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration | string | false | |
connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout | int | false | 60000 |
credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client | string | false | |
default-routing-key | The default routing key to use when sending messages to the exchange | string | false | `` |
default-ttl | If specified, the time (ms) sent messages can remain in queues undelivered before they are considered dead | long | false | |
exchange.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for exchange creation | string | false | rabbitmq-exchange-arguments |
exchange.auto-delete | Whether the exchange should be deleted after use | boolean | false | false |
exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently | boolean | false | true |
exchange.durable | Whether the exchange is durable | boolean | false | true |
exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. | string | false | |
exchange.type | The exchange type: direct, fanout, headers or topic (default) | string | false | topic |
handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms) | int | false | 10000 |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled | boolean | false | true |
host (rabbitmq-host) | The broker hostname | string | false | localhost |
include-properties | Whether to include properties when a broker message is passed on the event bus | boolean | false | false |
max-inflight-messages | The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number | long | false | 1024 |
max-outgoing-internal-queue-size | The maximum size of the outgoing internal queue | int | false | |
network-recovery-interval | How long (ms) will automatic recovery wait before attempting to reconnect | int | false | 5000 |
password (rabbitmq-password) | The password used to authenticate to the broker | string | false | |
port (rabbitmq-port) | The broker port | int | false | 5672 |
publish-confirms | If set to true, published messages are acknowledged when the publish confirm is received from the broker | boolean | false | false |
reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts | int | false | 100 |
reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts | int | false | 10 |
requested-channel-max | The initially requested maximum channel number | int | false | 2047 |
requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none | int | false | 60 |
retry-on-fail-attempts | The number of retry attempts on failure | int | false | 6 |
retry-on-fail-interval | The interval (in seconds) between two sending attempts | int | false | 5 |
ssl (rabbitmq-ssl) | Whether or not the connection should use SSL | boolean | false | false |
ssl.hostname-verification-algorithm | Set the hostname verifier algorithm for the TLS connection. Accepted values are HTTPS and NONE (default). NONE disables the hostname verification. | string | false | NONE |
tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true | string | false | `` |
tracing.enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification | boolean | false | false |
trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store | string | false | |
trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store | string | false | |
use-nio | Whether usage of NIO Sockets is enabled | boolean | false | false |
user | The user name to use when connecting to the broker | string | false | guest |
username (rabbitmq-username) | The username used to authenticate to the broker | string | false | |
virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker | string | false | / |
Using existing destinations
To use an existing exchange, you may need to configure the exchange.name attribute.
For example, if you have a RabbitMQ broker already configured with an exchange called people that you wish to send messages to, you need the following configuration:
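A minimal sketch of this case, assuming MicroProfile Config properties and a channel that is itself named people, so the channel name doubles as the exchange name. Setting exchange.declare to false is optional; it tells the connector that the exchange is set up independently.

```properties
mp.messaging.outgoing.people.connector=smallrye-rabbitmq
# Optional: the exchange already exists, so do not declare it
mp.messaging.outgoing.people.exchange.declare=false
```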
You would need to configure the exchange.name attribute if the exchange name were not the channel name:
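A sketch of that variant, assuming a hypothetical channel named people-out that publishes to the existing people exchange:

```properties
mp.messaging.outgoing.people-out.connector=smallrye-rabbitmq
# The exchange name differs from the channel name, so set it explicitly
mp.messaging.outgoing.people-out.exchange.name=people
```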
If you want RabbitMQ to create the people exchange, you need the following configuration:
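A sketch of a configuration that lets the connector declare the exchange, again assuming a hypothetical channel named people-out. According to the configuration reference, exchange.declare already defaults to true, so the last line only makes the intent explicit.

```properties
mp.messaging.outgoing.people-out.connector=smallrye-rabbitmq
mp.messaging.outgoing.people-out.exchange.name=people
# Default value, shown for clarity: declare the exchange if it does not exist
mp.messaging.outgoing.people-out.exchange.declare=true
```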
Note
The above example will create a topic exchange and use an empty default routing key (unless overridden programmatically using outgoing metadata for the message). If you want to create a different type of exchange or have a different default routing key, then the exchange.type and default-routing-key properties need to be explicitly specified.
Sending to specific queues via the default exchange
To send a message to a specific queue (usually a reply queue),
you have to configure the default exchange as an outgoing channel and set the name of the queue as routing key in the message metadata.
The name of the exchange needs to be set to "".
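A sketch of the channel configuration for this case, assuming MicroProfile Config properties and a hypothetical channel named replies. The value is the literal two-character string "", which the configuration reference describes as selecting the default exchange.

```properties
mp.messaging.outgoing.replies.connector=smallrye-rabbitmq
# "" selects the default exchange
mp.messaging.outgoing.replies.exchange.name=""
```

The target queue name is then supplied per message as the routing key in the outgoing metadata.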
Custom arguments for Exchange declaration
When exchange declaration is made by the Reactive Messaging channel, using the exchange.declare=true configuration, custom exchange arguments can be specified using the exchange.arguments attribute.
exchange.arguments accepts the identifier (using the @Identifier qualifier) of a Map<String, Object> exposed as a CDI bean.
If no arguments have been configured, the default rabbitmq-exchange-arguments identifier is looked for.
The following CDI bean produces such a configuration identified with my-arguments:
Then the channel can be configured to use those arguments in exchange declaration:
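A sketch of that channel configuration, assuming MicroProfile Config properties and a hypothetical outgoing channel named data. The my-arguments value must match the identifier of the Map<String, Object> bean.

```properties
mp.messaging.outgoing.data.connector=smallrye-rabbitmq
# Identifier of the CDI bean holding the exchange arguments
mp.messaging.outgoing.data.exchange.arguments=my-arguments
```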
Similarly, the dead-letter-exchange.arguments attribute allows configuring custom arguments for the dead letter exchange when one is declared (dlx.declare=true).