Receiving messages from RabbitMQ

The RabbitMQ connector lets you retrieve messages from a RabbitMQ broker. It maps each RabbitMQ Message it retrieves to a Reactive Messaging Message.

Note

In this context, the reactive messaging concept of a Channel is realised as a RabbitMQ Queue.

Example

Let’s imagine you have a RabbitMQ broker running, and accessible using the rabbitmq:5672 address (by default it would use localhost:5672). Configure your application to receive RabbitMQ Messages on the prices channel as follows:

rabbitmq-host=rabbitmq  # <1>
rabbitmq-port=5672      # <2>
rabbitmq-username=my-username   # <3>
rabbitmq-password=my-password   # <4>

mp.messaging.incoming.prices.connector=smallrye-rabbitmq # <5>
mp.messaging.incoming.prices.queue.name=my-queue         # <6>
mp.messaging.incoming.prices.routing-keys=urgent         # <7>
  1. Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using rabbitmq-host.

  2. Configures the broker/router port. You can do it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672.

  3. Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using rabbitmq-username.

  4. Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using rabbitmq-password.

  5. Instructs the prices channel to be managed by the RabbitMQ connector.

  6. Configures the RabbitMQ queue to read messages from.

  7. Configures the binding between the RabbitMQ exchange and the RabbitMQ queue using a routing key. The default is # (all messages will be forwarded from the exchange to the queue) but in general this can be a comma-separated list of one or more keys.
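
As a rough illustration of the per-channel versus global settings described above, the following sketch resolves an attribute by checking the channel-level key first, then the global alias, then a default. The class ChannelConfigSketch and its resolve method are hypothetical and not part of the connector, and the lookup order (channel before global) is an assumption rather than something this page states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not connector API: models how a connection attribute
// could be resolved from channel-level and global configuration keys.
final class ChannelConfigSketch {

    static String resolve(Map<String, String> props, String channel, String attribute,
            String globalAlias, String defaultValue) {
        // Assumption: a channel-level value takes precedence over the global alias.
        String channelValue = props.get("mp.messaging.incoming." + channel + "." + attribute);
        if (channelValue != null) {
            return channelValue;
        }
        String globalValue = props.get(globalAlias);
        return globalValue != null ? globalValue : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("rabbitmq-host", "rabbitmq");
        // Host comes from the global alias, port falls back to the default.
        System.out.println(resolve(props, "prices", "host", "rabbitmq-host", "localhost"));
        System.out.println(resolve(props, "prices", "port", "rabbitmq-port", "5672"));
    }
}
```

With only rabbitmq-host set globally, the prices channel picks up that host and falls back to the default port.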

Then, your application receives Message<String>. You can consume the payload directly:

package rabbitmq.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class RabbitMQPriceConsumer {

    @Incoming("prices")
    public void consume(String price) {
        // process your price.
    }

}

Or, you can retrieve the Message<String>:

package rabbitmq.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class RabbitMQPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<String> price) {
        // process your price.

        // Acknowledge the incoming message, marking the RabbitMQ message as `accepted`.
        return price.ack();
    }

}

Note

Whether you need to explicitly acknowledge the message depends on the auto-acknowledgement channel setting; if that is set to true then your message will be automatically acknowledged on receipt.

Deserialization

The connector converts incoming RabbitMQ Messages into Reactive Messaging Message<T> instances. The payload type T depends on the content_type and content_encoding properties of the received RabbitMQ message:

| content_encoding | content_type | Type |
|---|---|---|
| Value present | n/a | byte[] |
| No value | text/plain | String |
| No value | application/json | a JSON element, which can be a JsonArray, JsonObject, String, ... if the buffer contains an array, object, string, ... |
| No value | Anything else | byte[] |
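
The mapping in the table above can be sketched as a small decision function. This is only an illustration: PayloadTypeSketch and PayloadKind are hypothetical names, and the exact string comparison on content_type (no handling of parameters such as a charset) is an assumption, not the connector's actual logic.

```java
import java.util.Optional;

// Hypothetical model of the table above, not the connector's implementation.
final class PayloadTypeSketch {

    enum PayloadKind { BYTES, STRING, JSON }

    static PayloadKind kindFor(Optional<String> contentEncoding, Optional<String> contentType) {
        // Any content_encoding value means the body is handed over as raw bytes.
        if (contentEncoding.isPresent()) {
            return PayloadKind.BYTES;
        }
        String type = contentType.orElse("");
        if (type.equals("text/plain")) {
            return PayloadKind.STRING;
        }
        if (type.equals("application/json")) {
            return PayloadKind.JSON;
        }
        // Everything else, including a missing content_type, stays as bytes.
        return PayloadKind.BYTES;
    }

    public static void main(String[] args) {
        System.out.println(kindFor(Optional.empty(), Optional.of("application/json")));
        System.out.println(kindFor(Optional.of("gzip"), Optional.of("text/plain")));
    }
}
```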

If you send objects with this RabbitMQ connector (outbound connector), they are encoded as JSON and sent with content_type set to application/json. You can receive this payload using (Vert.x) JSON Objects, and then map it to the object class you want:

@ApplicationScoped
public static class Generator {

    @Outgoing("to-rabbitmq")
    public Multi<Price> prices() {             // <1>
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }

}

@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-rabbitmq")
    public void consume(JsonObject p) {      // <2>
        Price price = p.mapTo(Price.class);  // <3>
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
  1. The Price instances are automatically encoded to JSON by the connector
  2. You can receive it using a JsonObject
  3. Then, you can reconstruct the instance using the mapTo method

Inbound Metadata

Messages coming from RabbitMQ contain an instance of IncomingRabbitMQMetadata in the metadata.

RabbitMQ message headers can be accessed from the metadata either by calling getHeader(String header, Class<T> type) to retrieve a single header value, or by calling getHeaders() to get a map of all header values.

final Optional<IncomingRabbitMQMetadata> metadata = incomingMessage.getMetadata(IncomingRabbitMQMetadata.class);
metadata.ifPresent(meta -> {
    final Optional<String> contentEncoding = meta.getContentEncoding();
    final Optional<String> contentType = meta.getContentType();
    final Optional<String> correlationId = meta.getCorrelationId();
    final Optional<ZonedDateTime> timestamp = meta.getTimestamp(ZoneId.systemDefault());
    final Optional<Integer> priority = meta.getPriority();
    final Optional<String> replyTo = meta.getReplyTo();
    final Optional<String> userId = meta.getUserId();

    // Access a single String-valued header
    final Optional<String> stringHeader = meta.getHeader("my-header", String.class);

    // Access all headers
    final Map<String, Object> headers = meta.getHeaders();
    // ...
});

The type <T> of the header value depends on the RabbitMQ type used for the header:

| RabbitMQ Header Type | T |
|---|---|
| String | String |
| Boolean | Boolean |
| Number | Number |
| List | java.util.List |
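
To make the type mapping concrete, here is a minimal stand-in for a typed header lookup over a plain Map<String, Object>. HeaderLookupSketch is a hypothetical class written for this example; it is not IncomingRabbitMQMetadata. Returning an empty Optional when the stored value has a different type is an assumption of the sketch, since this page does not say how the connector behaves in that case.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a typed header lookup; not the connector's metadata class.
final class HeaderLookupSketch {

    static <T> Optional<T> header(Map<String, Object> headers, String name, Class<T> type) {
        Object value = headers.get(name);
        // isInstance is false for null, so a missing header also yields Optional.empty().
        if (type.isInstance(value)) {
            return Optional.of(type.cast(value));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("source", "pos-terminal");
        headers.put("retries", 3);
        headers.put("urgent", Boolean.TRUE);
        headers.put("tags", List.of("a", "b"));

        System.out.println(header(headers, "source", String.class));
        System.out.println(header(headers, "retries", Number.class));
        System.out.println(header(headers, "tags", List.class));
    }
}
```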

Acknowledgement

When a Reactive Messaging Message associated with a RabbitMQ Message is acknowledged, it informs the broker that the message has been accepted.

Whether you need to explicitly acknowledge the message depends on the auto-acknowledgement setting for the channel; if that is set to true then your message will be automatically acknowledged on receipt.

Failure Management

If a message produced from a RabbitMQ message is nacked, a failure strategy is applied. The RabbitMQ connector supports four strategies, controlled by the failure-strategy channel setting:

  • fail - fail the application; no more RabbitMQ messages will be processed. The RabbitMQ message is marked as rejected.

  • accept - this strategy marks the RabbitMQ message as accepted. The processing continues ignoring the failure.

  • reject - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message.

  • requeue - this strategy marks the RabbitMQ message as rejected with the requeue flag set to true. The processing continues with the next message, but the requeued message will be redelivered to the consumer.
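
The strategies listed above can be summarised as a small model. The enum below is purely illustrative: FailureStrategySketch, its fields and fromSetting are hypothetical names, not connector API, and custom strategies referenced by bean name are not covered (fromSetting throws IllegalArgumentException for them).

```java
import java.util.Locale;

// Illustrative model of the built-in strategies; not part of the connector.
enum FailureStrategySketch {
    FAIL(false, false, true),      // message rejected, application fails
    ACCEPT(true, false, false),    // message accepted, failure ignored
    REJECT(false, false, false),   // message rejected, processing continues
    REQUEUE(false, true, false);   // message rejected with requeue, processing continues

    final boolean accepted;
    final boolean requeued;
    final boolean stopsProcessing;

    FailureStrategySketch(boolean accepted, boolean requeued, boolean stopsProcessing) {
        this.accepted = accepted;
        this.requeued = requeued;
        this.stopsProcessing = stopsProcessing;
    }

    // Maps a failure-strategy setting to a strategy; reject is the documented default.
    static FailureStrategySketch fromSetting(String value) {
        if (value == null || value.trim().isEmpty()) {
            return REJECT;
        }
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        for (FailureStrategySketch s : values()) {
            System.out.println(s + ": accepted=" + s.accepted
                    + ", requeued=" + s.requeued
                    + ", stopsProcessing=" + s.stopsProcessing);
        }
    }
}
```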

The requeue flag of a RabbitMQ reject can also be controlled per message, whichever failure strategy is configured, using RabbitMQRejectMetadata. To do that, call the Message.nack(Throwable, Metadata) method and include a RabbitMQRejectMetadata instance with requeue set to true:

@Incoming("in")
public CompletionStage<Void> consume(Message<String> message) {
    return message.nack(new Exception("Failed!"), Metadata.of(
            new RabbitMQRejectMetadata(true)));
}

Experimental

RabbitMQFailureHandler is experimental and its APIs are subject to change in the future.

In addition, you can provide your own failure strategy. To do so, implement a bean exposing the RabbitMQFailureHandler interface, qualified with an @Identifier, and set the failure-strategy channel setting to the name of that bean.

Configuration Reference

| Attribute (alias) | Description | Type | Mandatory | Default |
|---|---|---|---|---|
| addresses (rabbitmq-addresses) | The multiple addresses for cluster mode, when given overrides the host and port | string | false | |
| arguments | A comma-separated list of arguments [key1:value1,key2:value2,...] to bind the queue to the exchange. Relevant only if 'exchange.type' is headers | string | false | |
| auto-acknowledgement | Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement | boolean | false | false |
| auto-bind-dlq | Whether to automatically declare the DLQ and bind it to the binder DLX | boolean | false | false |
| automatic-recovery-enabled | Whether automatic connection recovery is enabled | boolean | false | false |
| automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled | boolean | false | true |
| broadcast | Whether the received RabbitMQ messages must be dispatched to multiple subscribers | boolean | false | false |
| client-options-name (rabbitmq-client-options-name) | The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration | string | false | |
| connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout | int | false | 60000 |
| consumer-arguments | A comma-separated list of arguments [key1:value1,key2:value2,...] for created consumer | string | false | |
| content-type-override | Override the content_type attribute of the incoming message, should be a valid MIME type | string | false | |
| credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client | string | false | |
| dead-letter-dlx | If specified, a DLX to assign to the DLQ. Relevant only if auto-bind-dlq is true | string | false | |
| dead-letter-dlx-routing-key | If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true | string | false | |
| dead-letter-exchange | A DLX to assign to the queue. Relevant only if auto-bind-dlq is true | string | false | DLX |
| dead-letter-exchange-type | The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true | string | false | direct |
| dead-letter-exchange.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for dead-letter-exchange creation | string | false | |
| dead-letter-queue-mode | If automatically declare DLQ, we can choose different modes of DLQ [lazy, default] | string | false | |
| dead-letter-queue-name | The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended | string | false | |
| dead-letter-queue-type | If automatically declare DLQ, we can choose different types of DLQ [quorum, classic, stream] | string | false | |
| dead-letter-queue.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for dead-letter-queue creation | string | false | |
| dead-letter-routing-key | A dead letter routing key to assign to the queue; if not supplied will default to the queue name | string | false | |
| dead-letter-ttl | If specified, the time (ms) for which a message can remain in DLQ undelivered before it is dead. Relevant only if auto-bind-dlq is true | long | false | |
| dlx.declare | Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently | boolean | false | false |
| exchange.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for exchange creation | string | false | rabbitmq-exchange-arguments |
| exchange.auto-delete | Whether the exchange should be deleted after use | boolean | false | false |
| exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently | boolean | false | true |
| exchange.durable | Whether the exchange is durable | boolean | false | true |
| exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. | string | false | |
| exchange.type | The exchange type: direct, fanout, headers or topic (default) | string | false | topic |
| failure-strategy | The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are fail, accept, reject (default), requeue or name of a bean | string | false | reject |
| handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms) | int | false | 10000 |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| health-lazy-subscription | Whether the liveness and readiness checks should report 'ok' when there is no subscription yet. This is useful when injecting the channel with @Inject @Channel("...") Multi<...> multi; | boolean | false | false |
| health-readiness-enabled | Whether readiness health reporting is enabled (default) or disabled | boolean | false | true |
| host (rabbitmq-host) | The broker hostname | string | false | localhost |
| include-properties | Whether to include properties when a broker message is passed on the event bus | boolean | false | false |
| keep-most-recent | Whether to discard old messages instead of recent ones | boolean | false | false |
| max-incoming-internal-queue-size | The maximum size of the incoming internal queue | int | false | 500000 |
| max-outstanding-messages | The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number | int | false | |
| network-recovery-interval | How long (ms) will automatic recovery wait before attempting to reconnect | int | false | 5000 |
| password (rabbitmq-password) | The password used to authenticate to the broker | string | false | |
| port (rabbitmq-port) | The broker port | int | false | 5672 |
| queue.arguments | The identifier of the key-value Map exposed as bean used to provide arguments for queue creation | string | false | rabbitmq-queue-arguments |
| queue.auto-delete | Whether the queue should be deleted after use | boolean | false | false |
| queue.declare | Whether to declare the queue and binding; set to false if these are expected to be set up independently | boolean | false | true |
| queue.durable | Whether the queue is durable | boolean | false | true |
| queue.exclusive | Whether the queue is for exclusive use | boolean | false | false |
| queue.name | The queue from which messages are consumed. If not set, the channel name is used. | string | false | |
| queue.single-active-consumer | If set to true, only one consumer can actively consume messages | boolean | false | |
| queue.ttl | If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead | long | false | |
| queue.x-delivery-limit | If queue.x-queue-type is quorum, when a message has been returned more times than the limit the message will be dropped or dead-lettered | long | false | |
| queue.x-max-priority | Define priority level queue consumer | int | false | |
| queue.x-queue-mode | If automatically declare queue, we can choose different modes of queue [lazy, default] | string | false | |
| queue.x-queue-type | If automatically declare queue, we can choose different types of queue [quorum, classic, stream] | string | false | |
| reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts | int | false | 100 |
| reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts | int | false | 10 |
| requested-channel-max | The initially requested maximum channel number | int | false | 2047 |
| requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none | int | false | 60 |
| routing-keys | A comma-separated list of routing keys to bind the queue to the exchange. Relevant only if 'exchange.type' is topic or direct | string | false | # |
| ssl (rabbitmq-ssl) | Whether or not the connection should use SSL | boolean | false | false |
| tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true | string | false | `` |
| tracing.enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
| trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification | boolean | false | false |
| trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store | string | false | |
| trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store | string | false | |
| use-nio | Whether usage of NIO Sockets is enabled | boolean | false | false |
| user | The user name to use when connecting to the broker | string | false | guest |
| username (rabbitmq-username) | The username used to authenticate to the broker | string | false | |
| virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker | string | false | / |

To use an existing queue, you need to configure the queue.name attribute.

For example, if you have a RabbitMQ broker already configured with a queue called peopleQueue that you wish to read messages from, you need the following configuration:

mp.messaging.incoming.people.connector=smallrye-rabbitmq
mp.messaging.incoming.people.queue.name=peopleQueue

If you want RabbitMQ to create the queue for you but bind it to an existing topic exchange people, you need the following configuration:

mp.messaging.incoming.people.connector=smallrye-rabbitmq
mp.messaging.incoming.people.queue.name=peopleQueue
mp.messaging.incoming.people.queue.declare=true

Note

In the above the channel name people is implicitly assumed to be the name of the exchange; if this is not the case you would need to name the exchange explicitly using the exchange.name property.

Note

The connector supports RabbitMQ's "Server-named Queues" feature to create an exclusive, auto-deleting, non-durable and randomly named queue. To enable this feature, set the queue name to exactly (server.auto). Using this name not only enables the server-named queue feature but also automatically makes the queue exclusive, auto-deleting, and non-durable; any values provided for the exclusive, auto-delete and durable options are therefore ignored.
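
The override described in this note can be pictured with the following sketch. QueueDeclarationSketch is a hypothetical class written for illustration; it only models which flag values end up in effect and does not reflect how the connector declares the queue internally.

```java
// Hypothetical model of the effective queue flags; not connector code.
final class QueueDeclarationSketch {

    static final String SERVER_AUTO = "(server.auto)";

    final String name;
    final boolean serverNamed;
    final boolean durable;
    final boolean exclusive;
    final boolean autoDelete;

    private QueueDeclarationSketch(String name, boolean serverNamed, boolean durable,
            boolean exclusive, boolean autoDelete) {
        this.name = name;
        this.serverNamed = serverNamed;
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
    }

    static QueueDeclarationSketch of(String name, boolean durable, boolean exclusive,
            boolean autoDelete) {
        if (SERVER_AUTO.equals(name)) {
            // Configured flags are ignored: non-durable, exclusive, auto-deleting.
            return new QueueDeclarationSketch(name, true, false, true, true);
        }
        return new QueueDeclarationSketch(name, false, durable, exclusive, autoDelete);
    }

    public static void main(String[] args) {
        QueueDeclarationSketch q = of(SERVER_AUTO, true, false, false);
        System.out.println("serverNamed=" + q.serverNamed + ", durable=" + q.durable
                + ", exclusive=" + q.exclusive + ", autoDelete=" + q.autoDelete);
    }
}
```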

If you want RabbitMQ to create the people exchange, queue and binding, you need the following configuration:

mp.messaging.incoming.people.connector=smallrye-rabbitmq
mp.messaging.incoming.people.exchange.declare=true
mp.messaging.incoming.people.queue.name=peopleQueue
mp.messaging.incoming.people.queue.declare=true
mp.messaging.incoming.people.routing-keys=tall,short

In the above we have used an explicit list of routing keys rather than the default (#). Each component of the list creates a separate binding between the queue and the exchange, so in the case above we would have two bindings; one based on a routing key of tall, the other based on one of short.
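
The expansion of a routing-keys value into one binding per key can be sketched as follows. BindingKeysSketch is a hypothetical helper, and trimming whitespace around each key and skipping empty entries are assumptions of this sketch rather than documented connector behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing one binding key per comma-separated entry.
final class BindingKeysSketch {

    static List<String> bindingKeys(String routingKeys) {
        List<String> keys = new ArrayList<>();
        for (String key : routingKeys.split(",")) {
            String trimmed = key.trim();   // assumption: surrounding whitespace is ignored
            if (!trimmed.isEmpty()) {
                keys.add(trimmed);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Each key would become its own binding between the queue and the exchange.
        for (String key : bindingKeys("tall,short")) {
            System.out.println("bind peopleQueue to people with routing key " + key);
        }
    }
}
```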

Note

The default value of routing-keys is # (indicating a match against all possible routing keys) which is only appropriate for topic Exchanges. If you are using other types of exchange and/or need to declare queue bindings, you’ll need to supply a valid value for the exchange in question.
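
To see why # only works as a catch-all on topic exchanges, the sketch below implements the standard AMQP 0-9-1 matching rules: on a topic exchange, keys are dot-separated words, * matches exactly one word and # matches zero or more words, while a direct exchange requires the binding key to equal the routing key. TopicMatchSketch is a hypothetical class for illustration; the actual matching is performed by the broker, not by the connector.

```java
// Illustration of AMQP 0-9-1 binding-key matching; the broker does this, not the connector.
final class TopicMatchSketch {

    // Topic exchange: '*' matches one word, '#' matches zero or more words.
    static boolean topicMatches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, words, 0);
    }

    // Direct exchange: the binding key must equal the routing key.
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = j; next <= words.length; next++) {
                if (matches(pattern, i + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(topicMatches("#", "people.tall"));
        System.out.println(directMatches("#", "people.tall"));
    }
}
```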

Custom arguments for Queue declaration

When the queue is declared by the Reactive Messaging channel, using the queue.declare=true configuration, custom queue arguments can be specified using the queue.arguments attribute. queue.arguments accepts the identifier (using the @Identifier qualifier) of a Map<String, Object> exposed as a CDI bean. If no arguments have been configured, the default rabbitmq-queue-arguments identifier is looked for.

The following CDI bean produces such a configuration identified with my-arguments:

package rabbitmq.customization;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class ArgumentProducers {
    @Produces
    @Identifier("my-arguments")
    Map<String, Object> customArguments() {
        return Map.of("custom-arg", "value");
    }
}

Then the channel can be configured to use those arguments in queue declaration:

mp.messaging.incoming.data.queue.arguments=my-arguments

Similarly, the dead-letter-queue.arguments allows configuring custom arguments for dead letter queue when one is declared (auto-bind-dlq=true).