Skip to content

Sending messages to AMQP

The AMQP connector can write Reactive Messaging Messages as AMQP Messages.

Example

Let’s imagine you have an AMQP broker (such as Apache ActiveMQ Artemis) running, and accessible using the amqp:5672 address (by default it would use localhost:5672). Configure your application to send the messages from the prices channel as AMQP Message as follows:

1
2
3
4
5
6
amqp-host=amqp  # <1>
amqp-port=5672  # <2>
amqp-username=my-username # <3>
amqp-password=my-password # <4>

mp.messaging.outgoing.prices.connector=smallrye-amqp # <5>
1. Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using amqp-host

  1. Configures the broker/router port. You can do it per channel (using the port attribute) or globally using amqp-port. The default is 5672.

  2. Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using amqp-username.

  3. Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using amqp-password.

  4. Instructs the prices channel to be managed by the AMQP connector

Note

You don’t need to set the address. By default, it uses the channel name (prices). You can configure the address attribute to override it.

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package amqp.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class AmqpPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package amqp.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class AmqpPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(random.nextDouble()));
    }

}

Serialization

When receiving a Message<T>, the connector convert the message into an AMQP Message. The payload is converted to the AMQP Message body.

T AMQP Message Body
primitive types or String AMQP Value with the payload
Instant or UUID AMQP Value using the corresponding AMQP Type
JsonObject or JsonArray AMQP Data using a binary content. The content-type is set to application/json
io.vertx.mutiny.core.buffer.Buffer AMQP Data using a binary content. No content-type set
Any other class The payload is converted to JSON (using a Json Mapper). The result is wrapped into AMQP Data using a binary content. The content-type is set to application/json

If the message payload cannot be serialized to JSON, the message is nacked.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingAmqpMetadata to influence how the message is going to be sent to AMQP. For example, you can configure the subjects, properties:

1
2
3
4
5
6
7
8
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
        .withDurable(true)
        .withSubject("my-subject")
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Dynamic address names

Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the address inside your application configuration file, but instead, use the outbound metadata to set the address.

For example, you can send to a dynamic address based on the incoming message:

1
2
3
4
5
6
7
8
9
String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
        .withAddress(addressName)
        .withDurable(true)
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Note

To be able to set the address per message, the connector is using an anonymous sender.

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the broker acknowledged the message. When using routers, this acknowledgement may not be enabled. In this case, configure the auto-acknowledgement attribute to acknowledge the message as soon as it has been sent to the router.

If an AMQP message is rejected/released/modified by the broker (or cannot be sent successfully), the message is nacked.

Back Pressure and Credits

The back-pressure is handled by AMQP credits. The outbound connector only requests the amount of allowed credits. When the amount of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
address The AMQP address. If not set, the channel name is used string false
capabilities A comma-separated list of capabilities proposed by the sender or receiver client. string false
client-options-name (amqp-client-options-name) The name of the AMQP Client Option bean used to customize the AMQP client configuration string false
cloud-events Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata. boolean false true
cloud-events-data-content-type (cloud-events-default-data-content-type) Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself string false
cloud-events-data-schema (cloud-events-default-data-schema) Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself string false
cloud-events-insert-timestamp (cloud-events-default-timestamp) Whether or not the connector should insert automatically the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself boolean false true
cloud-events-mode The Cloud Event mode (structured or binary (default)). Indicates how are written the cloud events in the outgoing record string false binary
cloud-events-source (cloud-events-default-source) Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself string false
cloud-events-subject (cloud-events-default-subject) Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself string false
cloud-events-type (cloud-events-default-type) Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself string false
connect-timeout (amqp-connect-timeout) The connection timeout in milliseconds int false 1000
container-id The AMQP container id string false
credit-retrieval-period The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits. int false 2000
durable Whether sent AMQP messages are marked durable boolean false false
health-timeout The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. int false 3
host (amqp-host) The broker hostname string false localhost
link-name The name of the link. If not set, the channel name is used. string false
merge Whether the connector should allow multiple upstreams boolean false false
password (amqp-password) The password used to authenticate to the broker string false
port (amqp-port) The broker port int false 5672
reconnect-attempts (amqp-reconnect-attempts) The number of reconnection attempts int false 100
reconnect-interval (amqp-reconnect-interval) The interval in second between two reconnection attempts int false 10
sni-server-name (amqp-sni-server-name) If set, explicitly override the hostname to use for the TLS SNI server name string false
tracing-enabled Whether tracing is enabled (default) or disabled boolean false true
ttl The time-to-live of the send AMQP messages. 0 to disable the TTL long false 0
use-anonymous-sender Whether or not the connector should use an anonymous sender. Default value is true if the broker supports it, false otherwise. If not supported, it is not possible to dynamically change the destination address. boolean false
use-ssl (amqp-use-ssl) Whether the AMQP connection uses SSL/TLS boolean false false
username (amqp-username) The username used to authenticate to the broker string false
virtual-host (amqp-virtual-host) If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use) string false

You can also pass any property supported by the Vert.x AMQP client as attribute.

Using existing destinations

To use an existing address or queue, you need to configure the address, container-id and, optionally, the link-name attributes. For example, if you have an Apache Artemis broker configured with:

1
2
3
4
5
6
7
<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

You need the following configuration:

1
2
3
4
mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

You may need to configure the link-name attribute, if the queue name is not the channel name:

1
2
3
4
5
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

To use a MULTICAST queue, you need to provide the FQQN (Fully-qualified queue name) instead of just the name of the queue:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

More details about the AMQP Address model can be found on the Artemis documentation.

Sending Cloud Events

The AMQP connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:

  • the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,

  • the channel configuration defines the cloud-events-type and cloud-events-source attributes.

You can create io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instances using:

package amqp.outbound;

import java.net.URI;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;

@ApplicationScoped
public class AmqpCloudEventProcessor {

    @Outgoing("cloud-events")
    public Message<String> toCloudEvents(Message<String> in) {
        return in.addMetadata(OutgoingCloudEventMetadata.builder()
                .withId("id-" + in.getPayload())
                .withType("greetings")
                .withSource(URI.create("http://example.com"))
                .withSubject("greeting-message")
                .build());
    }

}

If the metadata does not contain an id, the connector generates one (random UUID). The type and source can be configured per message or at the channel level using the cloud-events-type and cloud-events-source attributes. Other attributes are also configurable.

The metadata can be contributed by multiple methods, however, you must always retrieve the already existing metadata to avoid overriding the values:

package amqp.outbound;

import java.net.URI;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;

@ApplicationScoped
public class AmqpCloudEventMultipleProcessors {

    @Incoming("source")
    @Outgoing("processed")
    public Message<String> process(Message<String> in) {
        return in.addMetadata(OutgoingCloudEventMetadata.builder()
                .withId("id-" + in.getPayload())
                .withType("greeting")
                .build());
    }

    @SuppressWarnings("unchecked")
    @Incoming("processed")
    @Outgoing("cloud-events")
    public Message<String> process2(Message<String> in) {
        OutgoingCloudEventMetadata<String> metadata = in
                .getMetadata(OutgoingCloudEventMetadata.class)
                .orElseGet(() -> OutgoingCloudEventMetadata.builder().build());

        return in.addMetadata(OutgoingCloudEventMetadata.from(metadata)
                .withSource(URI.create("source://me"))
                .withSubject("test")
                .build());
    }

}

By default, the connector sends the Cloud Events using the binary format. You can write structured Cloud Events by setting the cloud-events-mode to structured. Only JSON is supported, so the created records had its content-type header set to application/cloudevents+json; charset=UTF-8

Note

you can disable the Cloud Event support by setting the cloud-events attribute to false