Sending messages to RabbitMQ

The RabbitMQ connector can write Reactive Messaging Messages as RabbitMQ Messages.

Note

In this context, the reactive messaging concept of a Channel is realised as a RabbitMQ Exchange.

Example

Let’s imagine you have a RabbitMQ broker running, and accessible using the rabbitmq:5672 address (by default it would use localhost:5672). Configure your application to send the messages from the prices channel as a RabbitMQ Message as follows:

rabbitmq-host=rabbitmq   # <1>
rabbitmq-port=5672       # <2>
rabbitmq-username=my-username # <3>
rabbitmq-password=my-password  # <4>

mp.messaging.outgoing.prices.connector=smallrye-rabbitmq # <5>
mp.messaging.outgoing.prices.default-routing-key=normal    # <6>
  1. Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using rabbitmq-host

  2. Configures the broker/router port. You can do it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672.

  3. Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using rabbitmq-username.

  4. Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using rabbitmq-password.

  5. Instructs the prices channel to be managed by the RabbitMQ connector

  6. Supplies the default routing key for outbound messages; it applies when the "raw payload" form of message sending is used (see below).
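
The per-channel alternative mentioned in the callouts above could look like the following fragment. It is a minimal sketch that assumes the same prices channel and broker; the attribute names (host, port, username, password) are the ones listed in the configuration reference, and the values are placeholders.

```properties
# Connection settings scoped to the prices channel only (values are placeholders)
mp.messaging.outgoing.prices.connector=smallrye-rabbitmq
mp.messaging.outgoing.prices.host=rabbitmq
mp.messaging.outgoing.prices.port=5672
mp.messaging.outgoing.prices.username=my-username
mp.messaging.outgoing.prices.password=my-password
```

Scoping the settings to a channel is mainly useful when different channels talk to different brokers; with a single broker, the global rabbitmq-* properties avoid repetition.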

Note

You don’t need to set the RabbitMQ exchange name. By default, it uses the channel name (prices) as the name of the exchange to send messages to. You can configure the exchange.name attribute to override it.

Then, your application can send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package rabbitmq.outbound;

import java.time.Duration;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class RabbitMQPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>, which affords the opportunity to explicitly specify metadata on the outgoing message:

package rabbitmq.outbound;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.rabbitmq.OutgoingRabbitMQMetadata;

@ApplicationScoped
public class RabbitMQPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(random.nextDouble(),
                        Metadata.of(new OutgoingRabbitMQMetadata.Builder()
                                .withRoutingKey("normal")
                                .withTimestamp(ZonedDateTime.now())
                                .build())));
    }

}

Serialization

When sending a Message<T>, the connector converts the message into a RabbitMQ Message. The payload is converted to the RabbitMQ Message body.

| T | RabbitMQ Message Body |
| --- | --- |
| primitive types or UUID/String | String value with content_type set to text/plain |
| JsonObject or JsonArray | Serialized String payload with content_type set to application/json |
| io.vertx.mutiny.core.buffer.Buffer | Binary content, with content_type set to application/octet-stream |
| byte[] | Binary content, with content_type set to application/octet-stream |
| Any other class | The payload is converted to JSON (using a Json Mapper) then serialized with content_type set to application/json |

If the message payload cannot be serialized to JSON, the message is nacked.
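
To make the mapping in the table above easier to check against your own payload types, here is a small standalone model of it. This is only an illustrative sketch, not the connector's implementation: the class ContentTypeSketch and the method contentTypeFor are hypothetical names, and the Vert.x types (JsonObject, JsonArray, Buffer) are left out so the code runs with the JDK alone.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper that mirrors the serialization table; not part of the connector.
class ContentTypeSketch {

    // Wrapper classes of the Java primitive types (payloads arrive boxed).
    private static final Set<Class<?>> PRIMITIVE_WRAPPERS = Set.of(
            Boolean.class, Byte.class, Character.class, Short.class,
            Integer.class, Long.class, Float.class, Double.class);

    // Returns the content_type the table associates with the payload's Java type.
    public static String contentTypeFor(Object payload) {
        Objects.requireNonNull(payload, "payload");
        if (payload instanceof String || payload instanceof UUID
                || PRIMITIVE_WRAPPERS.contains(payload.getClass())) {
            return "text/plain";
        }
        if (payload instanceof byte[]) {
            // Per the table, a Vert.x Buffer would map to the same content type (omitted here).
            return "application/octet-stream";
        }
        // Per the table, JsonObject, JsonArray and any other class are sent as JSON.
        return "application/json";
    }

    public static void main(String[] args) {
        System.out.println(contentTypeFor(42.5));               // text/plain
        System.out.println(contentTypeFor(UUID.randomUUID()));  // text/plain
        System.out.println(contentTypeFor(new byte[] {1, 2}));  // application/octet-stream
        System.out.println(contentTypeFor(List.of("a", "b")));  // application/json
    }
}
```

The sketch only classifies payloads; the actual JSON conversion, and the nack when that conversion fails, happen inside the connector.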

Outbound Metadata

When sending Messages, you can add an instance of OutgoingRabbitMQMetadata to influence how the message is handled by RabbitMQ. For example, you can configure the routing key, timestamp and headers:

final OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder()
        .withHeader("my-header", "xyzzy")
        .withRoutingKey("urgent")
        .withTimestamp(ZonedDateTime.now())
        .build();

// Add `metadata` to the metadata of the outgoing message.
return Message.of("Hello", Metadata.of(metadata));

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the broker acknowledges the message.
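
The configuration reference lists a publish-confirms attribute that ties acknowledgement to the broker's publish confirm. A minimal sketch of enabling it, assuming the prices channel from the example above:

```properties
mp.messaging.outgoing.prices.connector=smallrye-rabbitmq
# Acknowledge each message once the publish confirm is received from the broker
mp.messaging.outgoing.prices.publish-confirms=true
```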

Configuration Reference

| Attribute (alias) | Description | Type | Mandatory | Default |
| --- | --- | --- | --- | --- |
| addresses (rabbitmq-addresses) | The broker addresses for cluster mode; when given, they override host and port | string | false | |
| automatic-recovery-enabled | Whether automatic connection recovery is enabled | boolean | false | false |
| automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled | boolean | false | true |
| client-options-name (rabbitmq-client-options-name) | The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration | string | false | |
| connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout | int | false | 60000 |
| credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client | string | false | |
| default-routing-key | The default routing key to use when sending messages to the exchange | string | false | (empty string) |
| default-ttl | If specified, the time (ms) sent messages can remain in queues undelivered before they are considered dead | long | false | |
| exchange.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for exchange creation | string | false | rabbitmq-exchange-arguments |
| exchange.auto-delete | Whether the exchange should be deleted after use | boolean | false | false |
| exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently | boolean | false | true |
| exchange.durable | Whether the exchange is durable | boolean | false | true |
| exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. | string | false | |
| exchange.type | The exchange type: direct, fanout, headers or topic (default) | string | false | topic |
| handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms) | int | false | 10000 |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled | boolean | false | true |
| host (rabbitmq-host) | The broker hostname | string | false | localhost |
| include-properties | Whether to include properties when a broker message is passed on the event bus | boolean | false | false |
| max-inflight-messages | The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number | long | false | 1024 |
| max-outgoing-internal-queue-size | The maximum size of the outgoing internal queue | int | false | |
| network-recovery-interval | How long (ms) automatic recovery waits before attempting to reconnect | int | false | 5000 |
| password (rabbitmq-password) | The password used to authenticate to the broker | string | false | |
| port (rabbitmq-port) | The broker port | int | false | 5672 |
| publish-confirms | If set to true, published messages are acknowledged when the publish confirm is received from the broker | boolean | false | false |
| reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts | int | false | 100 |
| reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts | int | false | 10 |
| requested-channel-max | The initially requested maximum channel number | int | false | 2047 |
| requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none | int | false | 60 |
| retry-on-fail-attempts | The number of retry attempts on failure | int | false | 6 |
| retry-on-fail-interval | The interval (in seconds) between two sending attempts | int | false | 5 |
| ssl (rabbitmq-ssl) | Whether or not the connection should use SSL | boolean | false | false |
| ssl.hostname-verification-algorithm | The hostname verification algorithm for the TLS connection. Accepted values are HTTPS and NONE (default). NONE disables hostname verification. | string | false | NONE |
| tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true | string | false | (empty string) |
| tracing.enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
| trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification | boolean | false | false |
| trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store | string | false | |
| trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store | string | false | |
| use-nio | Whether usage of NIO Sockets is enabled | boolean | false | false |
| user | The user name to use when connecting to the broker | string | false | guest |
| username (rabbitmq-username) | The username used to authenticate to the broker | string | false | |
| virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker | string | false | / |

Using existing destinations

To use an existing exchange, you may need to configure the exchange.name attribute.

For example, if you have a RabbitMQ broker already configured with an exchange called people that you wish to send messages to, you need the following configuration:

mp.messaging.outgoing.people.connector=smallrye-rabbitmq

You would need to configure the exchange.name attribute, if the exchange name were not the channel name:

mp.messaging.outgoing.people-out.connector=smallrye-rabbitmq
mp.messaging.outgoing.people-out.exchange.name=people

If you want RabbitMQ to create the people exchange, you need the following configuration:

mp.messaging.outgoing.people-out.connector=smallrye-rabbitmq
mp.messaging.outgoing.people-out.exchange.name=people
mp.messaging.outgoing.people-out.exchange.declare=true

Note

The above example will create a topic exchange and use an empty default routing key (unless overridden programmatically using outgoing metadata for the message). If you want to create a different type of exchange or use a different default routing key, you need to set the exchange.type and default-routing-key properties explicitly.
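
As a sketch of such an explicit setup, the following fragment declares a direct exchange and sets a default routing key. The people-out channel and people exchange are the ones used above; the routing key normal is only a placeholder value.

```properties
mp.messaging.outgoing.people-out.connector=smallrye-rabbitmq
mp.messaging.outgoing.people-out.exchange.name=people
mp.messaging.outgoing.people-out.exchange.declare=true
# Override the default exchange type (topic) and the empty default routing key
mp.messaging.outgoing.people-out.exchange.type=direct
mp.messaging.outgoing.people-out.default-routing-key=normal
```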

Sending to specific queues via the default exchange

To send a message to a specific queue (usually a reply queue), configure the default exchange as an outgoing channel and set the name of the queue as the routing key in the message metadata. The name of the exchange must be set to "".

mp.messaging.outgoing.channel-name-for-default-exchange.connector=smallrye-rabbitmq
mp.messaging.outgoing.channel-name-for-default-exchange.exchange.name=""
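
The exchange naming rules described in this document (channel name by default, exchange.name to override it, "" for the default exchange) can be summarised in a few lines of plain Java. This is a hypothetical model for illustration only, not the connector's code: ExchangeNameSketch and resolveExchangeName are made-up names, and the sketch assumes that a properties file delivers the value "" as two literal quote characters.

```java
import java.util.Optional;

// Hypothetical model of the exchange naming rules; not part of the connector.
class ExchangeNameSketch {

    // channelName: the reactive messaging channel, e.g. "prices"
    // configuredName: the value of the exchange.name attribute, if any
    public static String resolveExchangeName(String channelName, Optional<String> configuredName) {
        return configuredName
                // Assumption: the literal two-character value "" selects the default exchange
                .map(name -> "\"\"".equals(name) ? "" : name)
                // No exchange.name configured: fall back to the channel name
                .orElse(channelName);
    }

    public static void main(String[] args) {
        // Channel name doubles as exchange name
        System.out.println(resolveExchangeName("people", Optional.empty()));
        // exchange.name overrides the channel name
        System.out.println(resolveExchangeName("people-out", Optional.of("people")));
        // exchange.name="" selects the default (nameless) exchange
        System.out.println(resolveExchangeName("channel-name-for-default-exchange", Optional.of("\"\"")).isEmpty());
    }
}
```

With the default exchange selected, the routing key set in the outgoing metadata is what identifies the target queue.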

Custom arguments for Exchange declaration

When the exchange is declared by the Reactive Messaging channel (exchange.declare=true), custom exchange arguments can be specified using the exchange.arguments attribute. exchange.arguments accepts the identifier (using the @Identifier qualifier) of a Map<String, Object> exposed as a CDI bean. If no arguments have been configured, the connector looks for the default rabbitmq-exchange-arguments identifier.

The following CDI bean produces such a configuration identified with my-arguments:

package rabbitmq.customization;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class ArgumentProducers {
    @Produces
    @Identifier("my-arguments")
    Map<String, Object> customArguments() {
        return Map.of("custom-arg", "value");
    }
}

Then the channel can be configured to use those arguments in exchange declaration:

mp.messaging.outgoing.data.exchange.arguments=my-arguments

Similarly, the dead-letter-exchange.arguments attribute allows configuring custom arguments for the dead letter exchange when one is declared (dlx.declare=true).