AMQP 1.0
The AMQP Connector adds support for AMQP 1.0 to Reactive Messaging.
Advanced Message Queuing Protocol 1.0 (AMQP 1.0) is an open standard for passing business messages between applications or organizations.
With this connector, you application can:
-
receive messages from an AMQP Broker or Router.
-
send
Message
to an AMQP address
The AMQP connector is based on the Vert.x AMQP Client.
Using the AMQP connector
To use the AMQP Connector, add the following dependency to your project:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>3.13.0</version>
</dependency>
The connector name is: smallrye-amqp
.
So, to indicate that a channel is managed by this connector you need:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
RabbitMQ
To use RabbitMQ, refer to Using RabbitMQ. |
Receiving messages from AMQP
The AMQP connector lets you retrieve messages from an AMQP broker or router.
The AMQP connector retrieves AMQP Messages and maps each of them into Reactive Messaging Messages
.
Example
Let’s imagine you have an AMQP broker (such as Apache ActiveMQ Artemis) running, and accessible using the amqp:5672
address (by default it would use localhost:5672
).
Configure your application to receive AMQP Messages on the prices
channel as follows:
amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)
mp.messaging.incoming.prices.connector=smallrye-amqp (5)
-
Configures the broker/router host name. You can do it per channel (using the
host
attribute) or globally usingamqp-host
-
Configures the broker/router port. You can do it per channel (using the
port
attribute) or globally usingamqp-port
. The default is5672
. -
Configures the broker/router username if required. You can do it per channel (using the
username
attribute) or globally usingamqp-username
. -
Configures the broker/router password if required. You can do it per channel (using the
password
attribute) or globally usingamqp-password
. -
Instructs the
prices
channel to be managed by the AMQP connector
You don’t need to set the AMQP address. By default, it uses the channel name (prices ). You can configure the address attribute to override it.
|
Then, your application receives Message<Double>
.
You can consumes the payload directly:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class AmqpPriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
Or, you can retrieve the Message<Double>
:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class AmqpPriceMessageConsumer {
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
// Acknowledge the incoming message, marking the AMQP message as `accepted`.
return price.ack();
}
}
Deserialization
The connector converts incoming AMQP Messages into Reactive Messaging Message<T>
instances.
T
depends on the body of the received AMQP Message.
The AMQP Type System defines the supported types.
AMQP Body Type | <T> |
---|---|
AMQP Value containing a AMQP Primitive Type |
the corresponding Java type |
AMQP Value using the |
|
AMQP Sequence |
|
AMQP Data (with binary content) and the |
|
AMQP Data with a different |
|
If you send objects with this AMQP connector (outbound connector), it gets encoded as JSON and sent as binary.
The content-type
is set to application/json
.
You can receive this payload using (Vert.x) JSON Objects, and then map it to the object class you want:
@ApplicationScoped
public static class Generator {
@Outgoing("to-amqp")
public Multi<Price> prices() { (1)
AtomicInteger count = new AtomicInteger();
return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
.map(l -> new Price().setPrice(count.incrementAndGet()))
.onOverflow().drop();
}
}
@ApplicationScoped
public static class Consumer {
List<Price> prices = new CopyOnWriteArrayList<>();
@Incoming("from-amqp")
public void consume(JsonObject p) { (2)
Price price = p.mapTo(Price.class); (3)
prices.add(price);
}
public List<Price> list() {
return prices;
}
}
-
The
Price
instances are automatically encoded to JSON by the connector -
You can receive it using a
JsonObject
-
Then, you can reconstruct the instance using the
mapTo
method
Inbound Metadata
Messages coming from AMQP contains an instance of IncomingAmqpMetadata
in the metadata.
Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
String address = meta.getAddress();
String subject = meta.getSubject();
boolean durable = meta.isDurable();
// Use io.vertx.core.json.JsonObject
JsonObject properties = meta.getProperties();
// ...
});
Acknowledgement
When a Reactive Messaging Message
associated with an AMQP Message is acknowledged, it informs the broker that the message has been accepted.
Failure Management
If a message produced from an AMQP message is nacked, a failure strategy is applied. The AMQP connector supports six strategies:
-
fail
- fail the application; no more AMQP messages will be processed (default). The AMQP message is marked as rejected. -
accept
- this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation. -
release
- this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation. -
reject
- this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation. -
modified-failed
- this strategy marks the AMQP message as modified and indicates that it failed (with thedelivery-failed
attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation -
modified-failed-undeliverable-here
- this strategy marks the AMQP message as modified and indicates that it failed (with thedelivery-failed
attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
address |
The AMQP address. If not set, the channel name is used Type: string |
false |
|
auto-acknowledgement |
Whether the received AMQP messages must be acknowledged when received Type: boolean |
false |
|
broadcast |
Whether the received AMQP messages must be dispatched to multiple subscribers Type: boolean |
false |
|
client-options-name (amqp-client-options-name) |
The name of the AMQP Client Option bean used to customize the AMQP client configuration Type: string |
false |
|
cloud-events |
Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata. Type: boolean |
false |
|
connect-timeout (amqp-connect-timeout) |
The connection timeout in milliseconds Type: int |
false |
|
container-id |
The AMQP container id Type: string |
false |
|
durable |
Whether AMQP subscription is durable Type: boolean |
false |
|
failure-strategy |
Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are Type: string |
false |
|
health-timeout |
The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. Type: int |
false |
|
host (amqp-host) |
The broker hostname Type: string |
false |
|
link-name |
The name of the link. If not set, the channel name is used. Type: string |
false |
|
password (amqp-password) |
The password used to authenticate to the broker Type: string |
false |
|
port (amqp-port) |
The broker port Type: int |
false |
|
reconnect-attempts (amqp-reconnect-attempts) |
The number of reconnection attempts Type: int |
false |
|
reconnect-interval (amqp-reconnect-interval) |
The interval in second between two reconnection attempts Type: int |
false |
|
sni-server-name (amqp-sni-server-name) |
If set, explicitly override the hostname to use for the TLS SNI server name Type: string |
false |
|
tracing-enabled |
Whether tracing is enabled (default) or disabled Type: boolean |
false |
|
use-ssl (amqp-use-ssl) |
Whether the AMQP connection uses SSL/TLS Type: boolean |
false |
|
username (amqp-username) |
The username used to authenticate to the broker Type: string |
false |
|
virtual-host (amqp-virtual-host) |
If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use) Type: string |
false |
You can also pass any property supported by the Vert.x AMQP client as attribute.
To use an existing address or queue, you need to configure the address
, container-id
and, optionally, the link-name
attributes.
For example, if you have an Apache Artemis broker configured with:
<queues>
<queue name="people">
<address>people</address>
<durable>true</durable>
<user>artemis</user>
</queue>
</queues>
You need the following configuration:
mp.messaging.incoming.people.connector=smallrye-amqp
mp.messaging.incoming.people.durable=true
mp.messaging.incoming.people.address=people
mp.messaging.incoming.people.container-id=people
You may need to configure the link-name
attribute, if the queue name is not the channel name:
mp.messaging.incoming.people-in.connector=smallrye-amqp
mp.messaging.incoming.people-in.durable=true
mp.messaging.incoming.people-in.address=people
mp.messaging.incoming.people-in.container-id=people
mp.messaging.incoming.people-in.link-name=people
Receiving Cloud Events
The AMQP connector supports Cloud Events.
When the connector detects a structured or binary Cloud Events, it adds a IncomingCloudEventMetadata<T> into the metadata of the Message
.
IncomingCloudEventMetadata
contains accessors to the mandatory and optional Cloud Event attributes.
If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.
Binary Cloud Events
For binary
Cloud Events, all mandatory Cloud Event attributes must be set in the AMQP application properties, prefixed by cloudEvents:
(as mandated by the protocol binding).
The connector considers headers starting with the cloudEvents:
prefix but not listed in the specification as extensions.
You can access them using the getExtension
method from IncomingCloudEventMetadata
.
The datacontenttype
attribute is mapped to the content-type
header of the record.
Structured Cloud Events
For structured
Cloud Events, the event is encoded in the record’s value.
Only JSON is supported, so your event must be encoded as JSON in the record’s value.
Structured Cloud Event must set the content-type
header of the record to application/cloudevents+json; charset=UTF-8
.
The message body must be a valid JSON object containing at least all the mandatory Cloud Events attributes.
If the record is a structured Cloud Event, the created Message’s payload is the Cloud Event data
.
Sending messages to AMQP
The AMQP connector can write Reactive Messaging Messages
as AMQP Messages.
Example
Let’s imagine you have an AMQP broker (such as Apache ActiveMQ Artemis) running, and accessible using the amqp:5672
address (by default it would use localhost:5672
).
Configure your application to send the messages from the prices
channel as AMQP Message as follows:
amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)
mp.messaging.outgoing.prices.connector=smallrye-amqp (5)
-
Configures the broker/router host name. You can do it per channel (using the
host
attribute) or globally usingamqp-host
-
Configures the broker/router port. You can do it per channel (using the
port
attribute) or globally usingamqp-port
. The default is5672
. -
Configures the broker/router username if required. You can do it per channel (using the
username
attribute) or globally usingamqp-username
. -
Configures the broker/router password if required. You can do it per channel (using the
password
attribute) or globally usingamqp-password
. -
Instructs the
prices
channel to be managed by the AMQP connector
You don’t need to set the address. By default, it uses the channel name (prices ). You can configure the address attribute to override it.
|
Then, your application must send Message<Double>
to the prices
channel.
It can use double
payloads as in the following snippet:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class AmqpPriceProducer {
private Random random = new Random();
@Outgoing("prices")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
Or, you can send Message<Double>
:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class AmqpPriceMessageProducer {
private Random random = new Random();
@Outgoing("prices")
public Multi<Message<Double>> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble()));
}
}
Serialization
When receiving a Message<T>
, the connector convert the message into an AMQP Message.
The payload is converted to the AMQP Message body.
T |
AMQP Message Body |
---|---|
primitive types or |
AMQP Value with the payload |
|
AMQP Value using the corresponding AMQP Type |
AMQP Data using a binary content. The |
|
|
AMQP Data using a binary content. No |
Any other class |
The payload is converted to JSON (using a Json Mapper). The result is wrapped into AMQP Data using a binary content. The |
If the message payload cannot be serialized to JSON, the message is nacked.
Outbound Metadata
When sending Messages
, you can add an instance of OutgoingAmqpMetadata
to influence how the message is going to sent to AMQP.
For example, you can configure the subjects, properties:
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withSubject("my-subject")
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
Dynamic address names
Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the address inside your application configuration file, but instead, use the outbound metadata to set the address.
For example, you can send to a dynamic address based on the incoming message:
String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
.withAddress(addressName)
.withDurable(true)
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
To be able to set the address per message, the connector is using an anonymous sender. |
Acknowledgement
By default, the Reactive Messaging Message
is acknowledged when the broker acknowledged the message.
When using routers, this acknowledgement may not be enabled.
In this case, configure the auto-acknowledgement
attribute to acknowledge the message as soon as it has been sent to the router.
If an AMQP message is rejected/released/modified by the broker (or cannot be sent successfully), the message is nacked.
Back Pressure and Credits
The back-pressure is handled by AMQP credits. The outbound connector only requests the amount of allowed credits. When the amount of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
address |
The AMQP address. If not set, the channel name is used Type: string |
false |
|
client-options-name (amqp-client-options-name) |
The name of the AMQP Client Option bean used to customize the AMQP client configuration Type: string |
false |
|
cloud-events |
Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata. Type: boolean |
false |
|
cloud-events-data-content-type (cloud-events-default-data-content-type) |
Configure the default Type: string |
false |
|
cloud-events-data-schema (cloud-events-default-data-schema) |
Configure the default Type: string |
false |
|
cloud-events-insert-timestamp (cloud-events-default-timestamp) |
Whether or not the connector should insert automatically the Type: boolean |
false |
|
cloud-events-mode |
The Cloud Event mode ( Type: string |
false |
|
cloud-events-source (cloud-events-default-source) |
Configure the default Type: string |
false |
|
cloud-events-subject (cloud-events-default-subject) |
Configure the default Type: string |
false |
|
cloud-events-type (cloud-events-default-type) |
Configure the default Type: string |
false |
|
connect-timeout (amqp-connect-timeout) |
The connection timeout in milliseconds Type: int |
false |
|
container-id |
The AMQP container id Type: string |
false |
|
credit-retrieval-period |
The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits. Type: int |
false |
|
durable |
Whether sent AMQP messages are marked durable Type: boolean |
false |
|
health-timeout |
The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. Type: int |
false |
|
host (amqp-host) |
The broker hostname Type: string |
false |
|
link-name |
The name of the link. If not set, the channel name is used. Type: string |
false |
|
merge |
Whether the connector should allow multiple upstreams Type: boolean |
false |
|
password (amqp-password) |
The password used to authenticate to the broker Type: string |
false |
|
port (amqp-port) |
The broker port Type: int |
false |
|
reconnect-attempts (amqp-reconnect-attempts) |
The number of reconnection attempts Type: int |
false |
|
reconnect-interval (amqp-reconnect-interval) |
The interval in second between two reconnection attempts Type: int |
false |
|
sni-server-name (amqp-sni-server-name) |
If set, explicitly override the hostname to use for the TLS SNI server name Type: string |
false |
|
tracing-enabled |
Whether tracing is enabled (default) or disabled Type: boolean |
false |
|
ttl |
The time-to-live of the send AMQP messages. 0 to disable the TTL Type: long |
false |
|
use-anonymous-sender |
Whether or not the connector should use an anonymous sender. Default value is Type: boolean |
false |
|
use-ssl (amqp-use-ssl) |
Whether the AMQP connection uses SSL/TLS Type: boolean |
false |
|
username (amqp-username) |
The username used to authenticate to the broker Type: string |
false |
|
virtual-host (amqp-virtual-host) |
If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use) Type: string |
false |
You can also pass any property supported by the Vert.x AMQP client as attribute.
Using existing destinations
To use an existing address or queue, you need to configure the address
, container-id
and, optionally, the link-name
attributes.
For example, if you have an Apache Artemis broker configured with:
<queues>
<queue name="people">
<address>people</address>
<durable>true</durable>
<user>artemis</user>
</queue>
</queues>
You need the following configuration:
mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people
You may need to configure the link-name
attribute, if the queue name is not the channel name:
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people
To use a MULTICAST
queue, you need to provide the FQQN (Fully-qualified queue name) instead of just the name of the queue:
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo
mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people
More details about the AMQP Address model can be found on the Artemis documentation.
Sending Cloud Events
The AMQP connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:
-
the message metadata contains an
io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata
instance, -
the channel configuration defines the
cloud-events-type
andcloud-events-source
attributes.
You can create io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata
instances using:
package outbound;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.net.URI;
@ApplicationScoped
public class AmqpCloudEventProcessor {
@Outgoing("cloud-events")
public Message<String> toCloudEvents(Message<String> in) {
return in.addMetadata(OutgoingCloudEventMetadata.builder()
.withId("id-" + in.getPayload())
.withType("greetings")
.withSource(URI.create("http://example.com"))
.withSubject("greeting-message")
.build());
}
}
If the metadata does not contain an id, the connector generates one (random UUID).
The type
and source
can be configured per message or at the channel level using the cloud-events-type
and cloud-events-source
attributes.
Other attributes are also configurable.
The metadata can be contributed by multiple methods, however, you must always retrieve the already existing metadata to avoid overriding the values:
package outbound;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.net.URI;
@ApplicationScoped
public class AmqpCloudEventMultipleProcessors {
@Incoming("source")
@Outgoing("processed")
public Message<String> process(Message<String> in) {
return in.addMetadata(OutgoingCloudEventMetadata.builder()
.withId("id-" + in.getPayload())
.withType("greeting")
.build());
}
@SuppressWarnings("unchecked")
@Incoming("processed")
@Outgoing("cloud-events")
public Message<String> process2(Message<String> in) {
OutgoingCloudEventMetadata<String> metadata = in
.getMetadata(OutgoingCloudEventMetadata.class)
.orElseGet(() -> OutgoingCloudEventMetadata.builder().build());
return in.addMetadata(OutgoingCloudEventMetadata.from(metadata)
.withSource(URI.create("source://me"))
.withSubject("test")
.build());
}
}
By default, the connector sends the Cloud Events using the binary format.
You can write structured Cloud Events by setting the cloud-events-mode
to structured
.
Only JSON is supported, so the created records had its content-type
header set to application/cloudevents+json; charset=UTF-8
you can disable the Cloud Event support by setting the cloud-events attribute to false
|
Customizing the underlying AMQP client
You can customize the underlying AMQP Client configuration by producing an instance of AmqpClientOptions
:
@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
// You can use the produced options to configure the TLS connection
PemKeyCertOptions keycert = new PemKeyCertOptions()
.addCertPath("./tls/tls.crt")
.addKeyPath("./tls/tls.key");
PemTrustOptions trust =
new PemTrustOptions().addCertPath("./tlc/ca.crt");
return new AmqpClientOptions()
.setSsl(true)
.setPemKeyCertOptions(keycert)
.setPemTrustOptions(trust)
.addEnabledSaslMechanism("EXTERNAL")
.setHostnameVerificationAlgorithm("")
.setConnectTimeout(30000)
.setReconnectInterval(5000)
.setContainerId("my-container");
}
This instance is retrieved and used to configure the client used by the connector.
You need to indicate the name of the client using the client-options-name
attribute:
mp.messaging.incoming.prices.client-options-name=my-named-options
Health reporting
The AMQP connector reports the startup, liveness, and readiness of each inbound (Receiving messages) and outbound (sending messages) channel managed by the connector:
- Startup
-
For both inbound and outbound, the startup probe reports OK when the connection with the broker is established, and the AMQP senders and receivers are opened (the links are attached to the broker).
- Liveness
-
For both inbound and outbound, the liveness check verifies that the connection is established. The check still returns OK if the connection got cut, but we are attempting a reconnection.
- Readiness
-
For the inbound, it checks that the connection is established and the receiver is opened. Unlike the liveness check, this probe reports KO until the connection is re-established. For the outbound, it checks that the connection is established and the sender is opened. Unlike the liveness check, this probe reports KO until the connection is re-established.
To disable health reporting, set the health-enabled attribute for the channel to false .
|
Note that a message processing failures nacks the message, which is then handled by the failure-strategy.
It is the responsibility of the failure-strategy
to report the failure and influence the outcome of the checks.
The fail
failure strategy reports the failure, and so the check will report the fault.
Using RabbitMQ
This connector is for AMQP 1.0. RabbitMQ implements AMQP 0.9.1. RabbitMQ does not provide AMQP 1.0 by default, but there is a plugin for it. To use RabbitMQ with this connector, enable and configure the AMQP 1.0 plugin.
Despite the plugin, a few features won’t work with RabbitMQ. Thus, we recommend the following configurations.
To receive messages from RabbitMQ:
-
Set
durable
tofalse
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false
To send messages to RabbitMQ:
-
set the destination
address
(anonymous sender are not supported)
mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
It’s not possible to change the destination dynamically (using message metadata) when using RabbitMQ. The connector automatically detects that the broker does not support anonymous sender (See http://docs.oasis-open.org/amqp/anonterm/v1.0/anonterm-v1.0.html).
Alternatively, you can use the RabbitMQ connector.