MQTT
The MQTT connector adds support for MQTT to Reactive Messaging.
It lets you receive messages from an MQTT server or broker as well as send MQTT messages. The MQTT connector is based on the Vert.x MQTT Client.
Introduction
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.
The MQTT Connector allows consuming messages from MQTT as well as sending MQTT messages.
Using the MQTT connector
To use the MQTT Connector, add the following dependency to your project:
<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-mqtt</artifactId>
  <version>3.3.2</version>
</dependency>
The connector name is smallrye-mqtt.
So, to indicate that a channel is managed by this connector, you need:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-mqtt
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-mqtt
Receiving messages from MQTT
The MQTT Connector connects to an MQTT broker or router and forwards the messages to the Reactive Messaging application.
It maps each of them into a Reactive Messaging Message.
Example
Let’s imagine you have an MQTT server/broker running and accessible at the mqtt:1883 address (by default, the connector would use localhost:1883).
Configure your application to receive MQTT messages on the prices channel as follows:
mp.messaging.incoming.prices.connector=smallrye-mqtt (1)
mp.messaging.incoming.prices.host=mqtt (2)
mp.messaging.incoming.prices.port=1883 (3)
(1) Sets the connector for the prices channel.
(2) Configures the broker/server host name.
(3) Configures the broker/server port. 1883 is the default.
You don’t need to set the MQTT topic. By default, the connector uses the channel name (prices). You can configure the topic attribute to override it.
NOTE: It is generally recommended to set the client-id. By default, the connector generates a unique client-id.
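The defaulting rules described here can be sketched in plain Java. This is only an illustration, not the connector's code: the class name MqttChannelDefaults and its methods are hypothetical, and the port rule (8883 with ssl, 1883 without) is taken from the configuration reference.

```java
import java.util.Map;

// Hypothetical helper illustrating the documented defaults; not part of the connector.
final class MqttChannelDefaults {

    // The topic falls back to the channel name when no "topic" attribute is set.
    static String effectiveTopic(String channelName, Map<String, String> attributes) {
        String topic = attributes.get("topic");
        return topic == null ? channelName : topic;
    }

    // The port falls back to 8883 when ssl is enabled, and to 1883 otherwise.
    static int effectivePort(Map<String, String> attributes) {
        String port = attributes.get("port");
        if (port != null) {
            return Integer.parseInt(port.trim());
        }
        boolean ssl = Boolean.parseBoolean(attributes.get("ssl"));
        return ssl ? 8883 : 1883;
    }
}
```

With an empty attribute map, the prices channel would resolve to the prices topic on port 1883.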
Messages coming from MQTT have a byte[] payload.
Then, your application receives Message<byte[]>.
You can consume the payload directly:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.nio.charset.StandardCharsets;

@ApplicationScoped
public class MqttPriceConsumer {

    @Incoming("prices")
    public void consume(byte[] raw) {
        // Decode with an explicit charset rather than the platform default.
        double price = Double.parseDouble(new String(raw, StandardCharsets.UTF_8));
        // process your price.
    }
}
Or, you can retrieve the Message<byte[]>:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class MqttPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<byte[]> price) {
        // process your price.

        // Acknowledge the incoming message
        return price.ack();
    }
}
The inbound topic can use the MQTT wildcards (+ and #).
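To make the wildcard semantics concrete, here is a minimal sketch of MQTT topic filter matching: + matches exactly one topic level, and # (which must be the last level of the filter) matches the parent level and everything below it. In practice the broker performs this matching; the TopicFilterSketch class is hypothetical, and it leaves out special cases such as topics starting with $.

```java
// Hypothetical illustration of MQTT wildcard matching; the broker does this for you.
final class TopicFilterSketch {

    static boolean matches(String filter, String topic) {
        // Keep trailing empty levels, since "a/" and "a" are different topics.
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: only valid as the last level of the filter.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                // The filter has more levels than the topic.
                return false;
            }
            // Single-level wildcard matches any one level; otherwise compare literally.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without a trailing "#", both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }
}
```

For instance, a channel whose topic attribute is prices/+/eur would receive messages published to prices/acme/eur but not to prices/acme/us/eur.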
Deserialization
The MQTT Connector does not handle deserialization; it always creates a Message<byte[]>.
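Since the payload arrives as raw bytes, the application has to decode it itself. The following is a minimal sketch of one way to do that for a textual price, assuming the publisher encodes it as UTF-8 text (an assumption, not something the connector guarantees). The PricePayloads class is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalDouble;

// Hypothetical helper: decodes a byte[] payload that is assumed to hold a UTF-8 decimal number.
final class PricePayloads {

    static OptionalDouble parse(byte[] raw) {
        if (raw == null) {
            return OptionalDouble.empty();
        }
        String text = new String(raw, StandardCharsets.UTF_8).trim();
        try {
            return OptionalDouble.of(Double.parseDouble(text));
        } catch (NumberFormatException e) {
            // Malformed payload: let the caller decide whether to skip or nack.
            return OptionalDouble.empty();
        }
    }
}
```

Returning an OptionalDouble keeps the decision about malformed payloads in the consumer method instead of throwing from the decoding step.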
Failure Management
If a message produced from an MQTT message is nacked, a failure strategy is applied. The MQTT connector supports 2 strategies:
- fail: fails the application; no more MQTT messages will be processed (default).
- ignore: the failure is logged, but the processing continues.
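The difference between the two strategies can be modeled with a small, self-contained simulation. It is only a sketch of the observable behavior described above, not the connector's implementation: FailureStrategySketch, its Strategy enum, and the run method are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the "fail" and "ignore" strategies; not the connector's code.
final class FailureStrategySketch {

    enum Strategy { FAIL, IGNORE }

    // Returns the payloads that were processed successfully.
    static List<String> run(List<String> payloads, Consumer<String> processor, Strategy strategy) {
        List<String> processed = new ArrayList<>();
        for (String payload : payloads) {
            try {
                processor.accept(payload);
                processed.add(payload);
            } catch (RuntimeException failure) {
                if (strategy == Strategy.FAIL) {
                    // "fail": stop here, no more messages are processed.
                    break;
                }
                // "ignore": log the failure and continue with the next message.
                System.err.println("Ignoring failed message: " + failure.getMessage());
            }
        }
        return processed;
    }
}
```

With the payloads "1.0", "oops", "2.0" and a processor that parses doubles, FAIL stops after the first payload while IGNORE processes the first and the third.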
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
client-id | Set the client identifier Type: string | false | |
auto-generated-client-id | Set if the MQTT client must generate clientId automatically Type: boolean | false | |
auto-keep-alive | Set if the MQTT client must handle Type: boolean | false | |
ssl | Set whether SSL/TLS is enabled Type: boolean | false | |
ssl.keystore.type | Set the keystore type [ Type: string | false | |
ssl.keystore.location | Set the keystore location. In case of Type: string | false | |
ssl.keystore.password | Set the keystore password. In case of Type: string | false | |
ssl.truststore.type | Set the truststore type Type: string | false | |
ssl.truststore.location | Set the truststore location. In case of Type: string | false | |
ssl.truststore.password | Set the truststore password. In case of Type: string | false | |
keep-alive-seconds | Set the keep alive timeout in seconds Type: int | false | |
max-inflight-queue | Set max count of unacknowledged messages Type: int | false | |
auto-clean-session | Set to start with a clean session ( Type: boolean | false | |
will-flag | Set if will information are provided on connection Type: boolean | false | |
will-retain | Set if the will message must be retained Type: boolean | false | |
will-qos | Set the QoS level for the will message Type: int | false | |
max-message-size | Set max MQTT message size in bytes Type: int | false | |
reconnect-attempts | Set the max reconnect attempts Type: int | false | |
reconnect-interval-seconds | Set the reconnect interval in seconds Type: int | false | |
username | Set the username to connect to the server Type: string | false | |
password | Set the password to connect to the server Type: string | false | |
connect-timeout-seconds | Set the connect timeout (in seconds) Type: int | false | |
trust-all | Set whether all server certificates should be trusted Type: boolean | false | |
host | Set the MQTT server host name/IP Type: string | true | |
port | Set the MQTT server port. Default to 8883 if ssl is enabled, or 1883 without ssl Type: int | false | |
server-name | Set the SNI server name Type: string | false | |
topic | Set the MQTT topic. If not set, the channel name is used Type: string | false | |
qos | Set the QoS level when subscribing to the topic or when sending a message Type: int | false | |
broadcast | Whether or not the messages should be dispatched to multiple consumers Type: boolean | false | |
failure-strategy | Specify the failure strategy to apply when a message produced from a MQTT message is nacked. Values can be Type: string | false | |
The MQTT connector is based on the Vert.x MQTT client, so you can pass any attribute supported by this client.
A single instance of MqttClient and a single connection are used for each host / port / server-name / client-id combination.
This client is reused for both the inbound and outbound connectors.
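The client sharing described above can be pictured as a cache keyed by the four connection attributes. The sketch below shows that idea only; SharedClientRegistry is a hypothetical class and says nothing about how the connector actually stores its clients.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry: one client per host / port / server-name / client-id combination.
final class SharedClientRegistry<C> {

    private final Map<List<Object>, C> clients = new ConcurrentHashMap<>();

    C getOrCreate(String host, int port, String serverName, String clientId, Supplier<C> factory) {
        // Arrays.asList tolerates a null server-name and compares element by element.
        List<Object> key = Arrays.asList(host, port, serverName, clientId);
        // The factory only runs when no client exists yet for this key.
        return clients.computeIfAbsent(key, k -> factory.get());
    }

    int size() {
        return clients.size();
    }
}
```

Under this model, an incoming and an outgoing channel configured with the same host, port, server-name, and client-id end up with the same client, while changing any one of the four yields a separate client and connection.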
Sending messages to MQTT
The MQTT Connector can write Reactive Messaging Messages as MQTT messages.
Example
Let’s imagine you have an MQTT server/broker running and accessible at the mqtt:1883 address (by default, the connector would use localhost:1883).
Configure your application to write the messages from the prices channel into MQTT messages as follows:
mp.messaging.outgoing.prices.connector=smallrye-mqtt (1)
mp.messaging.outgoing.prices.host=mqtt (2)
mp.messaging.outgoing.prices.port=1883 (3)
(1) Sets the connector for the prices channel.
(2) Configures the broker/server host name.
(3) Configures the broker/server port. 1883 is the default.
You don’t need to set the MQTT topic. By default, the connector uses the channel name (prices). You can configure the topic attribute to override it.
NOTE: It is generally recommended to set the client-id. By default, the connector generates a unique client-id.
Then, your application must send Message<Double> to the prices channel.
It can use double payloads as in the following snippet:
package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class MqttPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }
}
Or, you can send Message<Double>:
package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class MqttPriceMessageProducer {

    private final Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(random.nextDouble()));
    }
}
Serialization
The Message sent to MQTT can have various payload types:
- JsonObject: JSON string encoded as byte[]
- JsonArray: JSON string encoded as byte[]
- java.lang.String and Java primitive types: toString encoded as byte[]
- byte[]
- complex objects: the objects are encoded to JSON and passed as byte[]
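The non-JSON rules in the list above can be sketched as follows. This is an illustration under assumptions rather than the connector's serializer: PayloadEncodingSketch is a hypothetical class, UTF-8 is an assumed charset, and the JSON cases (JsonObject, JsonArray, complex objects) are deliberately left out because they need a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical sketch of the byte[], String, and primitive cases; JSON cases are not covered.
final class PayloadEncodingSketch {

    static byte[] encode(Object payload) {
        Objects.requireNonNull(payload, "payload");
        if (payload instanceof byte[]) {
            // byte[] payloads are passed through unchanged.
            return (byte[]) payload;
        }
        if (payload instanceof String || payload instanceof Number
                || payload instanceof Boolean || payload instanceof Character) {
            // Strings and boxed primitives: toString() encoded as bytes (UTF-8 assumed here).
            return payload.toString().getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException(
                "JSON encoding is not covered by this sketch: " + payload.getClass().getName());
    }
}
```

Under these assumptions, a Double payload of 12.5 would be published as the bytes of the text 12.5, which is why a consumer can decode it by parsing the text back into a number.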
Acknowledgement
MQTT acknowledgement depends on the QoS level. The message is acknowledged when the broker indicates the successful reception of the message (or immediately if the QoS level does not support acknowledgement).
If an MQTT message cannot be sent to the broker, the message is nacked.
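The relation between QoS and acknowledgement can be sketched with CompletionStage. The example assumes that QoS 0 is the level without broker acknowledgement, and that QoS 1 and 2 complete once the broker confirms reception. QosAckSketch and its brokerConfirmation parameter are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical model of when an outgoing message is acknowledged; not the connector's code.
final class QosAckSketch {

    // brokerConfirmation completes when the broker confirms reception,
    // or completes exceptionally when the message could not be sent.
    static CompletionStage<Void> ackWhen(int qos, CompletionStage<Void> brokerConfirmation) {
        if (qos == 0) {
            // QoS 0 has no broker acknowledgement: acknowledge immediately.
            return CompletableFuture.completedFuture(null);
        }
        // QoS 1 and 2: acknowledgement follows the broker's confirmation.
        return brokerConfirmation;
    }
}
```

In this model, a brokerConfirmation that completes exceptionally corresponds to the nack case described above.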
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
auto-clean-session | Set to start with a clean session ( Type: boolean | false | |
auto-generated-client-id | Set if the MQTT client must generate clientId automatically Type: boolean | false | |
auto-keep-alive | Set if the MQTT client must handle Type: boolean | false | |
client-id | Set the client identifier Type: string | false | |
connect-timeout-seconds | Set the connect timeout (in seconds) Type: int | false | |
host | Set the MQTT server host name/IP Type: string | true | |
keep-alive-seconds | Set the keep alive timeout in seconds Type: int | false | |
max-inflight-queue | Set max count of unacknowledged messages Type: int | false | |
max-message-size | Set max MQTT message size in bytes Type: int | false | |
merge | Whether the connector should allow multiple upstreams Type: boolean | false | |
password | Set the password to connect to the server Type: string | false | |
port | Set the MQTT server port. Default to 8883 if ssl is enabled, or 1883 without ssl Type: int | false | |
qos | Set the QoS level when subscribing to the topic or when sending a message Type: int | false | |
reconnect-attempts | Set the max reconnect attempts Type: int | false | |
reconnect-interval-seconds | Set the reconnect interval in seconds Type: int | false | |
server-name | Set the SNI server name Type: string | false | |
ssl | Set whether SSL/TLS is enabled Type: boolean | false | |
ssl.keystore.location | Set the keystore location. In case of Type: string | false | |
ssl.keystore.password | Set the keystore password. In case of Type: string | false | |
ssl.keystore.type | Set the keystore type [ Type: string | false | |
ssl.truststore.location | Set the truststore location. In case of Type: string | false | |
ssl.truststore.password | Set the truststore password. In case of Type: string | false | |
ssl.truststore.type | Set the truststore type Type: string | false | |
topic | Set the MQTT topic. If not set, the channel name is used Type: string | false | |
trust-all | Set whether all server certificates should be trusted Type: boolean | false | |
username | Set the username to connect to the server Type: string | false | |
will-flag | Set if will information are provided on connection Type: boolean | false | |
will-qos | Set the QoS level for the will message Type: int | false | |
will-retain | Set if the will message must be retained Type: boolean | false | |
The MQTT connector is based on the Vert.x MQTT client, so you can pass any attribute supported by this client.
A single instance of MqttClient and a single connection are used for each host / port / server-name / client-id combination.
This client is reused for both the inbound and outbound connectors.