Sending messages to AMQP
The AMQP connector can write Reactive Messaging Messages as AMQP
Messages.
Example
Let’s imagine you have an AMQP broker (such as Apache ActiveMQ
Artemis) running, and
accessible using the amqp:5672 address (by default it would use
localhost:5672). Configure your application to send the messages from
the prices channel as AMQP Message as follows:
host attribute) or globally using amqp-host
- 
Configures the broker/router port. You can do it per channel (using the portattribute) or globally usingamqp-port. The default is5672.
- 
Configures the broker/router username if required. You can do it per channel (using the usernameattribute) or globally usingamqp-username.
- 
Configures the broker/router password if required. You can do it per channel (using the passwordattribute) or globally usingamqp-password.
- 
Instructs the priceschannel to be managed by the AMQP connector
Note
You don’t need to set the address. By default, it uses the channel
name (prices). You can configure the address attribute to override
it.
Then, your application must send Message<Double> to the prices
channel. It can use double payloads as in the following snippet:
Or, you can send Message<Double>:
Serialization
When receiving a Message<T>, the connector convert the message into an
AMQP Message. The payload is converted to the AMQP Message body.
| T | AMQP Message Body | 
|---|---|
| primitive types or String | AMQP Value with the payload | 
| InstantorUUID | AMQP Value using the corresponding AMQP Type | 
| JsonObjectorJsonArray | AMQP Data using a binary content. The content-typeis set toapplication/json | 
| io.vertx.mutiny.core.buffer.Buffer | AMQP Data using a binary content. No content-typeset | 
| Any other class | The payload is converted to JSON (using a Json Mapper). The result is wrapped into AMQP Data using a binary content. The content-typeis set toapplication/json | 
If the message payload cannot be serialized to JSON, the message is nacked.
Outbound Metadata
When sending Messages, you can add an instance of
OutgoingAmqpMetadata
to influence how the message is going to be sent to AMQP. For example, you
can configure the subjects, properties:
Dynamic address names
Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the address inside your application configuration file, but instead, use the outbound metadata to set the address.
For example, you can send to a dynamic address based on the incoming message:
Note
To be able to set the address per message, the connector is using an anonymous sender.
Acknowledgement
By default, the Reactive Messaging Message is acknowledged when the
broker acknowledged the message. When using routers, this
acknowledgement may not be enabled. In this case, configure the
auto-acknowledgement attribute to acknowledge the message as soon as
it has been sent to the router.
If an AMQP message is rejected/released/modified by the broker (or cannot be sent successfully), the message is nacked.
Back Pressure and Credits
The back-pressure is handled by the max-inflight-messages attribute and AMQP credits.
The outbound connector requests messages minimum between max-inflight-messages and credits allowed by the broker.
When the amount of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.
When max-inflight-messages is set to 0, only AMQP credits apply to limit the requests.
Note that if an AMQP message send fails, it is retried until reconnect-attempts is reached.
If the client reconnects to the broker during the retry, failing messages are sent again but the message order is not preserved.
To preserve the message order in this case you can set max-inflight-messages to
Configuration Reference
| Attribute (alias) | Description | Type | Mandatory | Default | 
|---|---|---|---|---|
| address | The AMQP address. If not set, the channel name is used | string | false | |
| capabilities | A comma-separated list of capabilities proposed by the sender or receiver client. | string | false | |
| client-options-name (amqp-client-options-name) | The name of the AMQP Client Option bean used to customize the AMQP client configuration | string | false | |
| cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata. | boolean | false | true | 
| cloud-events-data-content-type (cloud-events-default-data-content-type) | Configure the default datacontenttypeattribute of the outgoing Cloud Event. Requirescloud-eventsto be set totrue. This value is used if the message does not configure thedatacontenttypeattribute itself | string | false | |
| cloud-events-data-schema (cloud-events-default-data-schema) | Configure the default dataschemaattribute of the outgoing Cloud Event. Requirescloud-eventsto be set totrue. This value is used if the message does not configure thedataschemaattribute itself | string | false | |
| cloud-events-insert-timestamp (cloud-events-default-timestamp) | Whether or not the connector should insert automatically the timeattribute into the outgoing Cloud Event. Requirescloud-eventsto be set totrue. This value is used if the message does not configure thetimeattribute itself | boolean | false | true | 
| cloud-events-mode | The Cloud Event mode ( structuredorbinary(default)). Indicates how are written the cloud events in the outgoing record | string | false | binary | 
| cloud-events-source (cloud-events-default-source) | Configure the default sourceattribute of the outgoing Cloud Event. Requirescloud-eventsto be set totrue. This value is used if the message does not configure thesourceattribute itself | string | false | |
| cloud-events-subject (cloud-events-default-subject) | Configure the default subjectattribute of the outgoing Cloud Event. Requirescloud-eventsto be set totrue. This value is used if the message does not configure thesubjectattribute itself | string | false | |
| cloud-events-type (cloud-events-default-type) | Configure the default typeattribute of the outgoing Cloud Event. Requirescloud-eventsto be set totrue. This value is used if the message does not configure thetypeattribute itself | string | false | |
| connect-timeout (amqp-connect-timeout) | The connection timeout in milliseconds | int | false | 1000 | 
| container-id | The AMQP container id | string | false | |
| credit-retrieval-period | The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits. | int | false | 2000 | 
| durable | Whether sent AMQP messages are marked durable | boolean | false | false | 
| health-timeout | The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. | int | false | 3 | 
| host (amqp-host) | The broker hostname | string | false | localhost | 
| link-name | The name of the link. If not set, the channel name is used. | string | false | |
| max-inflight-messages | The maximum number of messages to be written to the broker concurrently. The number of sent messages waiting to be acknowledged by the broker are limited by this value and credits granted by the broker. The default value 0means only credits apply. | long | false | 0 | 
| merge | Whether the connector should allow multiple upstreams | boolean | false | false | 
| password (amqp-password) | The password used to authenticate to the broker | string | false | |
| port (amqp-port) | The broker port | int | false | 5672 | 
| reconnect-attempts (amqp-reconnect-attempts) | The number of reconnection attempts | int | false | 100 | 
| reconnect-interval (amqp-reconnect-interval) | The interval in second between two reconnection attempts | int | false | 10 | 
| sni-server-name (amqp-sni-server-name) | If set, explicitly override the hostname to use for the TLS SNI server name | string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true | 
| ttl | The time-to-live of the send AMQP messages. 0 to disable the TTL | long | false | 0 | 
| use-anonymous-sender | Whether or not the connector should use an anonymous sender. Default value is trueif the broker supports it,falseotherwise. If not supported, it is not possible to dynamically change the destination address. | boolean | false | |
| use-ssl (amqp-use-ssl) | Whether the AMQP connection uses SSL/TLS | boolean | false | false | 
| username (amqp-username) | The username used to authenticate to the broker | string | false | |
| virtual-host (amqp-virtual-host) | If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use) | string | false | 
You can also pass any property supported by the Vert.x AMQP client as attribute.
Using existing destinations
To use an existing address or queue, you need to configure the
address, container-id and, optionally, the link-name attributes.
For example, if you have an Apache Artemis broker configured with:
You need the following configuration:
You may need to configure the link-name attribute, if the queue name
is not the channel name:
To use a MULTICAST queue, you need to provide the FQQN
(Fully-qualified queue name) instead of just the name of the queue:
More details about the AMQP Address model can be found on the Artemis documentation.
Sending Cloud Events
The AMQP connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:
- 
the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadatainstance,
- 
the channel configuration defines the cloud-events-typeandcloud-events-sourceattributes.
You can create
io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instances
using:
If the metadata does not contain an id, the connector generates one
(random UUID). The type and source can be configured per message or
at the channel level using the cloud-events-type and
cloud-events-source attributes. Other attributes are also
configurable.
The metadata can be contributed by multiple methods, however, you must always retrieve the already existing metadata to avoid overriding the values:
By default, the connector sends the Cloud Events using the binary
format. You can write structured Cloud Events by setting the
cloud-events-mode to structured. Only JSON is supported, so the
created records had its content-type header set to
application/cloudevents+json; charset=UTF-8
Note
you can disable the Cloud Event support by setting the cloud-events
attribute to false