Receiving messages from AMQP
The AMQP connector lets you retrieve messages from an AMQP broker or
router. It retrieves AMQP Messages and maps each of them into a Reactive
Messaging Message.
Example
Let’s imagine you have an AMQP broker (such as Apache ActiveMQ
Artemis) running, and accessible using the amqp:5672 address (by default
it would use localhost:5672). Configure your application to receive AMQP
Messages on the prices channel as follows:
- Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using amqp-host.
- Configures the broker/router port. You can do it per channel (using the port attribute) or globally using amqp-port. The default is 5672.
- Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using amqp-username.
- Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using amqp-password.
- Instructs the prices channel to be managed by the AMQP connector.
Note
You don’t need to set the AMQP address. By default, it uses the
channel name (prices). You can configure the address attribute to
override it.
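The settings described above might be written as in the fragment below. This is a minimal sketch, not a definitive configuration: it assumes the connector is registered under the name smallrye-amqp (its name in SmallRye Reactive Messaging) and the MicroProfile key format mp.messaging.incoming.<channel>.<attribute>. The credentials and the overridden address are placeholders.

```properties
# Global broker/router host name and port (5672 is the default port)
amqp-host=amqp
amqp-port=5672

# Placeholder credentials, only needed if the broker requires them
amqp-username=my-username
amqp-password=my-password

# Let the AMQP connector manage the prices channel
# (connector name assumed to be smallrye-amqp)
mp.messaging.incoming.prices.connector=smallrye-amqp

# Optional: override the AMQP address, which defaults to the channel name
# (my-prices-address is a hypothetical address)
# mp.messaging.incoming.prices.address=my-prices-address
```

Comments are kept on their own lines because a trailing # in a properties file becomes part of the value.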
Then, your application receives Message<Double>. You can consume the
payload directly:
Or, you can retrieve the Message<Double>:
Deserialization
The connector converts incoming AMQP Messages into Reactive Messaging
Message<T> instances. T depends on the body of the received AMQP
Message.
The AMQP Type System defines the supported types.
AMQP Body Type | <T> |
---|---|
AMQP Value containing an AMQP Primitive Type | the corresponding Java type |
AMQP Value using the Binary type | byte[] |
AMQP Sequence | List |
AMQP Data (with binary content) and the content-type is set to application/json | JsonObject |
AMQP Data with a different content-type | byte[] |
If you send objects with this AMQP connector (outbound connector), they
are encoded as JSON and sent as binary. The content-type is set to
application/json. You can receive this payload using (Vert.x) JSON
Objects, and then map it to the object class you want:
- The Price instances are automatically encoded to JSON by the connector.
- You can receive it using a JsonObject.
- Then, you can reconstruct the instance using the mapTo method.
Inbound Metadata
Messages coming from AMQP contain an instance of IncomingAmqpMetadata in their metadata.
Acknowledgement
When a Reactive Messaging Message associated with an AMQP Message is
acknowledged, it informs the broker that the message has been
accepted.
Failure Management
If a message produced from an AMQP message is nacked, a failure strategy is applied. The AMQP connector supports six strategies:
- fail - fail the application; no more AMQP messages will be processed (default). The AMQP message is marked as rejected.
- accept - this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation.
- release - this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation.
- reject - this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation.
- modified-failed - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation.
- modified-failed-undeliverable-here - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation.
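A strategy is selected per channel with the failure-strategy attribute. The fragment below is a minimal sketch that reuses the prices channel from the earlier example and assumes the connector name smallrye-amqp and the mp.messaging.incoming.<channel>.<attribute> key format; the choice of release is only an illustration.

```properties
# Channel managed by the AMQP connector (connector name assumed)
mp.messaging.incoming.prices.connector=smallrye-amqp

# On nack, mark the AMQP message as released so the broker can redeliver it
# instead of failing the application (the default, fail)
mp.messaging.incoming.prices.failure-strategy=release
```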
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
address | The AMQP address. If not set, the channel name is used | string | false | |
auto-acknowledgement | Whether the received AMQP messages must be acknowledged when received | boolean | false | false |
broadcast | Whether the received AMQP messages must be dispatched to multiple subscribers | boolean | false | false |
capabilities | A comma-separated list of capabilities proposed by the sender or receiver client. | string | false | |
client-options-name (amqp-client-options-name) | The name of the AMQP Client Option bean used to customize the AMQP client configuration | string | false | |
cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event Metadata. | boolean | false | true |
connect-timeout (amqp-connect-timeout) | The connection timeout in milliseconds | int | false | 1000 |
container-id | The AMQP container id | string | false | |
durable | Whether AMQP subscription is durable | boolean | false | false |
failure-strategy | Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are fail (default), accept, release, reject, modified-failed, modified-failed-undeliverable-here | string | false | fail |
health-timeout | The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. | int | false | 3 |
host (amqp-host) | The broker hostname | string | false | localhost |
link-name | The name of the link. If not set, the channel name is used. | string | false | |
password (amqp-password) | The password used to authenticate to the broker | string | false | |
port (amqp-port) | The broker port | int | false | 5672 |
reconnect-attempts (amqp-reconnect-attempts) | The number of reconnection attempts | int | false | 100 |
reconnect-interval (amqp-reconnect-interval) | The interval in seconds between two reconnection attempts | int | false | 10 |
sni-server-name (amqp-sni-server-name) | If set, explicitly override the hostname to use for the TLS SNI server name | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
use-ssl (amqp-use-ssl) | Whether the AMQP connection uses SSL/TLS | boolean | false | false |
username (amqp-username) | The username used to authenticate to the broker | string | false | |
virtual-host (amqp-virtual-host) | If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use) | string | false | |
You can also pass any property supported by the Vert.x AMQP client as an attribute.
To use an existing address or queue, you need to configure the
address, container-id and, optionally, the link-name attributes.
For example, if you have an Apache Artemis broker configured with:
You need the following configuration:
You may need to configure the link-name attribute, if the queue name
is not the channel name:
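As a hedged sketch of both cases, assume a hypothetical durable queue named people exists on the broker. The queue name, the channel names, and the connector name smallrye-amqp are assumptions made for illustration. The two groups below are alternatives, not one combined configuration.

```properties
# Alternative 1: the channel is named after the queue (people)
mp.messaging.incoming.people.connector=smallrye-amqp
mp.messaging.incoming.people.durable=true
mp.messaging.incoming.people.address=people
mp.messaging.incoming.people.container-id=people

# Alternative 2: the channel (people-in) is not named after the queue,
# so link-name is set explicitly instead of defaulting to the channel name
mp.messaging.incoming.people-in.connector=smallrye-amqp
mp.messaging.incoming.people-in.durable=true
mp.messaging.incoming.people-in.address=people
mp.messaging.incoming.people-in.container-id=people
mp.messaging.incoming.people-in.link-name=people
```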
Receiving Cloud Events
The AMQP connector supports Cloud Events.
When the connector detects a structured or binary Cloud Event, it
adds an IncomingCloudEventMetadata to the metadata of the Message.
IncomingCloudEventMetadata contains accessors to the mandatory and
optional Cloud Event attributes.
If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.
Binary Cloud Events
For binary Cloud Events, all mandatory Cloud Event attributes must
be set in the AMQP application properties, prefixed by cloudEvents:
(as mandated by the protocol binding).
The connector considers headers starting with the cloudEvents: prefix
but not listed in the specification as extensions. You can access them
using the getExtension method from IncomingCloudEventMetadata.
The datacontenttype attribute is mapped to the content-type header
of the record.
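The prefix rule described above can be illustrated with plain JDK code. This is a sketch under stated assumptions and not the connector's implementation. The class BinaryCloudEventSketch and its methods are hypothetical names, and the application properties are modelled as a simple Map. The attribute names are those of the CloudEvents 1.0 specification.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative only: hypothetical helper, not part of the AMQP connector.
final class BinaryCloudEventSketch {

    // Prefix used for Cloud Event attributes in AMQP application properties.
    static final String PREFIX = "cloudEvents:";

    // Mandatory context attributes in CloudEvents 1.0.
    static final Set<String> MANDATORY = Set.of("id", "source", "specversion", "type");

    // Optional context attributes in CloudEvents 1.0.
    static final Set<String> OPTIONAL = Set.of("datacontenttype", "dataschema", "subject", "time");

    // True when every mandatory attribute is present under the prefix.
    static boolean hasMandatoryAttributes(Map<String, Object> applicationProperties) {
        for (String name : MANDATORY) {
            if (!applicationProperties.containsKey(PREFIX + name)) {
                return false;
            }
        }
        return true;
    }

    // Collects prefixed properties that are not specification attributes,
    // keyed by their name without the prefix. These are treated as extensions.
    static Map<String, Object> extensions(Map<String, Object> applicationProperties) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : applicationProperties.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(PREFIX)) {
                continue; // not a Cloud Event property at all
            }
            String name = key.substring(PREFIX.length());
            if (!MANDATORY.contains(name) && !OPTIONAL.contains(name)) {
                result.put(name, entry.getValue());
            }
        }
        return result;
    }
}
```

With properties such as cloudEvents:id, cloudEvents:source, cloudEvents:specversion, cloudEvents:type and cloudEvents:region, the sketch reports the mandatory set as complete and returns region as the only extension. In the connector itself, extensions are read through getExtension on IncomingCloudEventMetadata.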
Structured Cloud Events
For structured Cloud Events, the event is encoded in the record’s
value. Only JSON is supported, so your event must be encoded as JSON in
the record’s value.
A structured Cloud Event must set the content-type header of the record
to application/cloudevents+json; charset=UTF-8. The message body must
be a valid JSON object containing at least all the mandatory Cloud
Events attributes.
If the record is a structured Cloud Event, the created Message’s payload
is the Cloud Event data.