Receiving messages from MQTT
The MQTT Connector connects to an MQTT broker or router and forwards the
messages to the Reactive Messaging application. It maps each of them
into Reactive Messaging Messages.
Example
Let’s imagine you have an MQTT server/broker running and accessible
at the mqtt:1883 address (by default, the connector uses localhost:1883).
Configure your application to receive MQTT messages on the prices channel
as follows:
1. Configure the connector that manages the prices channel.
2. Configure the broker/server host name.
3. Configure the broker/server port. 1883 is the default.
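The three settings above can be sketched as configuration entries. The snippet below only assembles and prints them. The `mp.messaging.incoming.<channel>.<attribute>` key pattern and the `smallrye-mqtt` connector name are assumptions based on the usual SmallRye Reactive Messaging conventions and are not stated in this section, so check them against the version you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PricesChannelConfig {

    // Assumed key prefix for incoming channels (not stated in this section).
    static final String PREFIX = "mp.messaging.incoming.";

    // Builds the three entries described above for one incoming channel.
    static Map<String, String> forChannel(String channel, String host, int port) {
        Map<String, String> config = new LinkedHashMap<>();
        // 1. connector managing the channel (connector name is an assumption)
        config.put(PREFIX + channel + ".connector", "smallrye-mqtt");
        // 2. broker/server host name
        config.put(PREFIX + channel + ".host", host);
        // 3. broker/server port
        config.put(PREFIX + channel + ".port", Integer.toString(port));
        return config;
    }

    public static void main(String[] args) {
        // Prints the entries in properties-file form, one per line.
        forChannel("prices", "mqtt", 1883)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed lines have the shape you would typically place in the application's configuration file.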
Note
You don’t need to set the MQTT topic. By default, it uses the channel
name (prices). You can configure the topic attribute to override it.
Note
It is generally recommended to set the client-id. By default, the
connector generates a unique client-id.
Important
Messages coming from MQTT have a byte[] payload.
Your application then receives Message<byte[]>. You can either consume the
payload directly or retrieve the whole Message<byte[]>.
The inbound topic can use the MQTT wildcards (+ and #).
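To make the wildcard semantics concrete, here is a minimal sketch of how an MQTT topic filter is matched against a topic name: `+` stands for exactly one level and `#` for any number of trailing levels. The `TopicFilter` class is hypothetical and written only for illustration. The broker performs the real matching, and this simplified version neither validates filters nor handles topics starting with `$`.

```java
final class TopicFilter {

    // Returns true when the topic name is matched by the filter.
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' matches the remaining levels (including none),
                // but is only meaningful as the last filter level.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // filter has more levels than the topic
            }
            boolean singleLevelWildcard = filterLevels[i].equals("+");
            if (!singleLevelWildcard && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level does not match
            }
        }
        // No '#': both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("prices/+/eur", "prices/gold/eur"));
        System.out.println(matches("prices/+", "prices/gold/eur"));
        System.out.println(matches("prices/#", "prices/gold/eur"));
    }
}
```

With such a filter configured as the topic attribute, a single channel can receive messages published on several topics.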
Deserialization
The MQTT Connector does not handle the deserialization and creates a
Message<byte[]>.
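Because the payload arrives as raw bytes, the application has to decode it itself. The sketch below assumes, purely for illustration, that the publisher sends each price as UTF-8 decimal text such as `12.5`. The `PricePayloads` helper is hypothetical, and the actual wire format depends on whoever publishes to the topic.

```java
import java.nio.charset.StandardCharsets;

final class PricePayloads {

    // Decodes a UTF-8 text payload (assumed format) into a price.
    // Throws NumberFormatException if the text is not a valid number.
    static double toPrice(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        return Double.parseDouble(text);
    }

    public static void main(String[] args) {
        // Simulates the byte[] payload the connector would hand over.
        byte[] payload = "12.5".getBytes(StandardCharsets.UTF_8);
        System.out.println(toPrice(payload));
    }
}
```

In an application, the method consuming the prices channel would call a helper like this on the byte[] it receives.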
Inbound Metadata
The MQTT connector does not provide inbound metadata.
Failure Management
If a message produced from an MQTT message is nacked, a failure strategy is applied. The MQTT connector supports 2 strategies:
- fail - fail the application; no more MQTT messages will be processed. (default)
- ignore - the failure is logged, but the processing continues.
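The difference between the two strategies can be illustrated with a small simulation. This is not the connector's code: the `FailureStrategyDemo` class is hypothetical, a thrown exception stands in for a nack, and "failing the application" is reduced to stopping the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FailureStrategyDemo {

    enum Strategy { FAIL, IGNORE }

    // Runs the handler on each payload and returns those processed successfully.
    static List<String> run(List<String> payloads, Consumer<String> handler, Strategy strategy) {
        List<String> processed = new ArrayList<>();
        for (String payload : payloads) {
            try {
                handler.accept(payload);
                processed.add(payload);
            } catch (RuntimeException failure) {
                // The handler threw: this stands in for a nacked message.
                System.err.println("nacked: " + payload + " (" + failure.getMessage() + ")");
                if (strategy == Strategy.FAIL) {
                    break; // fail: no more messages are processed
                }
                // ignore: the failure was logged, processing continues
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> payloads = List.of("1.0", "oops", "2.5");
        Consumer<String> handler = Double::parseDouble; // rejects "oops"
        System.out.println(run(payloads, handler, Strategy.FAIL));
        System.out.println(run(payloads, handler, Strategy.IGNORE));
    }
}
```

With fail, the message after the faulty one is never handled. With ignore, it is.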
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
auto-clean-session | Set to start with a clean session (true by default) | boolean | false | true |
auto-generated-client-id | Set if the MQTT client must generate clientId automatically | boolean | false | true |
auto-keep-alive | Set if the MQTT client must handle PINGREQ automatically | boolean | false | true |
broadcast | Whether or not the messages should be dispatched to multiple consumers | boolean | false | false |
buffer-size | The size of the buffer of incoming messages waiting to be processed | int | false | 128 |
client-id | Set the client identifier | string | false | |
client-options-name (mqtt-client-options-name) | The name of the MQTT Client Option bean (io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions) used to customize the MQTT client configuration | string | false | |
connect-timeout-seconds | Set the connect timeout (in seconds) | int | false | 60 |
failure-strategy | Specify the failure strategy to apply when a message produced from an MQTT message is nacked. Values can be fail (default) or ignore | string | false | fail |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
host | Set the MQTT server host name/IP | string | true | |
keep-alive-seconds | Set the keep alive timeout in seconds | int | false | 30 |
max-inflight-queue | Set max count of unacknowledged messages | int | false | 10 |
max-message-size | Set max MQTT message size in bytes | int | false | 8092 |
password | Set the password to connect to the server | string | false | |
port | Set the MQTT server port. Defaults to 8883 if ssl is enabled, or 1883 without ssl | int | false | |
qos | Set the QoS level when subscribing to the topic or when sending a message | int | false | 0 |
reconnect-interval-seconds | Set the reconnect interval in seconds | int | false | 1 |
server-name | Set the SNI server name | string | false | |
ssl | Set whether SSL/TLS is enabled | boolean | false | false |
ssl.hostname-verification-algorithm | Set the hostname verifier algorithm for the TLS connection. Accepted values are HTTPS, LDAPS, and NONE (default). NONE disables the hostname verification. | string | false | NONE |
ssl.keystore.location | Set the keystore location. In case of pem type this is the server ca cert path | string | false | |
ssl.keystore.password | Set the keystore password. In case of pem type this is the key path | string | false | |
ssl.keystore.type | Set the keystore type [pkcs12, jks, pem] | string | false | pkcs12 |
ssl.truststore.location | Set the truststore location. In case of pem type this is the client cert path | string | false | |
ssl.truststore.password | Set the truststore password. In case of pem type this is not necessary | string | false | |
ssl.truststore.type | Set the truststore type [pkcs12, jks, pem] | string | false | pkcs12 |
topic | Set the MQTT topic. If not set, the channel name is used | string | false | |
trust-all | Set whether all server certificates should be trusted | boolean | false | false |
unsubscribe-on-disconnection | This flag restores the old behavior of unsubscribing from the broker on disconnection | boolean | false | false |
username | Set the username to connect to the server | string | false | |
will-flag | Set if will information is provided on connection | boolean | false | false |
will-qos | Set the QoS level for the will message | int | false | 0 |
will-retain | Set if the will message must be retained | boolean | false | false |
The MQTT connector is based on the Vert.x MQTT client, so you can pass any attribute supported by this client.
Important
A single instance of MqttClient and a single connection are used for
each host / port / server-name / client-id combination. This client is
reused for both the inbound and outbound connectors.
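The sharing rule can be pictured as a cache keyed by those four attributes. The sketch below is an illustration under that assumption, not the connector's actual implementation. `SharedClients` and `FakeClient` are hypothetical names, and `FakeClient` stands in for a real MQTT client.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SharedClients {

    // Stand-in for a real MQTT client and its connection (hypothetical).
    static final class FakeClient {
        final String key;

        FakeClient(String key) {
            this.key = key;
        }
    }

    private final Map<String, FakeClient> clients = new ConcurrentHashMap<>();

    // Returns the client for this combination, creating it only on first use.
    FakeClient get(String host, int port, String serverName, String clientId) {
        String key = host + "|" + port + "|" + serverName + "|" + clientId;
        return clients.computeIfAbsent(key, FakeClient::new);
    }

    public static void main(String[] args) {
        SharedClients registry = new SharedClients();
        FakeClient inbound = registry.get("mqtt", 1883, null, "pricing-app");
        FakeClient outbound = registry.get("mqtt", 1883, null, "pricing-app");
        FakeClient other = registry.get("mqtt", 1883, null, "another-app");
        System.out.println(inbound == outbound); // same combination, same client
        System.out.println(inbound == other);    // different client-id, different client
    }
}
```

A practical consequence is that an incoming and an outgoing channel configured with the same four values share one broker connection.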
Important
When auto-clean-session=false is used, the MQTT Connector sends Subscribe
requests to the broker only if a Persistent Session is not present (as on the
first connection). This means that if a Session is already present (for example,
from a previous run) and you add a new incoming channel, it will not be subscribed.
Always check the subscriptions present on the broker when you use
auto-clean-session=false.