
Receiving messages from MQTT

The MQTT connector connects to an MQTT broker or router and forwards the messages it receives to the Reactive Messaging application. It maps each MQTT message to a Reactive Messaging Message.

Example

Let’s imagine you have an MQTT server/broker running and accessible at the mqtt:1883 address (by default, the connector would use localhost:1883). Configure your application to receive MQTT messages on the prices channel as follows:

mp.messaging.incoming.prices.connector=smallrye-mqtt # <1>
mp.messaging.incoming.prices.host=mqtt # <2>
mp.messaging.incoming.prices.port=1883 # <3>
1. Sets the connector for the prices channel.
2. Configures the broker/server host name.
3. Configures the broker/server port. 1883 is the default.

Note

You don’t need to set the MQTT topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.

Note

It is generally recommended to set the client-id. By default, the connector generates a unique client-id.
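
As a sketch of the two notes above, the fragment below overrides the topic and sets an explicit client identifier. The topic name market/prices and the identifier price-consumer-1 are hypothetical values chosen for illustration. Whether auto-generated-client-id must also be set to false is not stated here, so check the behavior against your connector version.

```properties
mp.messaging.incoming.prices.connector=smallrye-mqtt
mp.messaging.incoming.prices.host=mqtt
# Subscribe to a topic that differs from the channel name (hypothetical topic)
mp.messaging.incoming.prices.topic=market/prices
# Use a fixed client identifier instead of a generated one (hypothetical value)
mp.messaging.incoming.prices.client-id=price-consumer-1
```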

Important

Messages coming from MQTT have a byte[] payload.

Then, your application receives Message<byte[]>. You can consume the payload directly:

package mqtt.inbound;

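import java.nio.charset.StandardCharsets;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class MqttPriceConsumer {

    @Incoming("prices")
    public void consume(byte[] raw) {
        // Decode explicitly as UTF-8 rather than relying on the platform default charset
        double price = Double.parseDouble(new String(raw, StandardCharsets.UTF_8));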

        // process your price.
    }

}

Or, you can retrieve the Message<byte[]>:

package mqtt.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class MqttPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<byte[]> price) {
        // process your price.

        // Acknowledge the incoming message
        return price.ack();
    }

}

The inbound topic can use the MQTT wildcards (+ and #).
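
To make the wildcard semantics concrete, here is a minimal, standalone sketch of how an MQTT topic filter is matched against a topic name: + matches exactly one level, and # matches any number of remaining levels and must be the last level of the filter. The class MqttTopicFilterSketch is hypothetical and is not part of the connector, since the matching itself is done by the broker. The sketch also ignores the special handling of topics starting with $. In the connector, you would typically set such a filter through the topic attribute, for example prices/+.

```java
final class MqttTopicFilterSketch {

    // Returns true if the topic name matches the subscription filter.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: valid only as the last level,
                // matches the parent level and everything below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                // The filter has more levels than the topic.
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                // Neither a single-level wildcard nor a literal match.
                return false;
            }
        }
        // Without a trailing '#', both must have the same number of levels.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("prices/+", "prices/eur"));      // true
        System.out.println(matches("prices/+", "prices/eur/spot")); // false
        System.out.println(matches("prices/#", "prices/eur/spot")); // true
    }
}
```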

Deserialization

The MQTT connector does not deserialize payloads. It creates a Message<byte[]> and leaves decoding to the application.
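
Since decoding is left to the application, one option is to isolate it in a small helper. The sketch below assumes the publisher sends the price as UTF-8 text such as 42.5. The class PricePayloadDecoder is a hypothetical name, and the payload format is an assumption about your publisher, not something the connector defines.

```java
import java.nio.charset.StandardCharsets;

final class PricePayloadDecoder {

    // Decodes a UTF-8 text payload into a double.
    // Throws NumberFormatException if the text is not a valid number.
    static double decode(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        return Double.parseDouble(text);
    }

    public static void main(String[] args) {
        byte[] payload = "42.5".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload)); // 42.5
    }
}
```

A method annotated with @Incoming("prices") could call such a helper on the byte[] it receives and decide how to handle a NumberFormatException for malformed payloads.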

Inbound Metadata

The MQTT connector does not provide inbound metadata.

Failure Management

If a message produced from an MQTT message is nacked, a failure strategy is applied. The MQTT connector supports 2 strategies:

  • fail - fail the application; no more MQTT messages will be processed. (default)

  • ignore - the failure is logged, but the processing continues.
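
As a brief illustration, the strategy is selected per channel with the failure-strategy attribute. The fragment below switches the prices channel from the default fail to ignore:

```properties
# Log nacked messages and keep processing instead of failing the application
mp.messaging.incoming.prices.failure-strategy=ignore
```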

Configuration Reference

| Attribute (alias) | Description | Type | Mandatory | Default |
| auto-clean-session | Set to start with a clean session (true by default) | boolean | false | true |
| auto-generated-client-id | Set if the MQTT client must generate clientId automatically | boolean | false | true |
| auto-keep-alive | Set if the MQTT client must handle PINGREQ automatically | boolean | false | true |
| broadcast | Whether or not the messages should be dispatched to multiple consumers | boolean | false | false |
| buffer-size | The size of the buffer of incoming messages waiting to be processed | int | false | 128 |
| client-id | Set the client identifier | string | false | |
| client-options-name (mqtt-client-options-name) | The name of the MQTT Client Option bean (io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions) used to customize the MQTT client configuration | string | false | |
| connect-timeout-seconds | Set the connect timeout (in seconds) | int | false | 60 |
| failure-strategy | Specify the failure strategy to apply when a message produced from an MQTT message is nacked. Values can be fail (default) or ignore | string | false | fail |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| host | Set the MQTT server host name/IP | string | true | |
| keep-alive-seconds | Set the keep alive timeout in seconds | int | false | 30 |
| max-inflight-queue | Set max count of unacknowledged messages | int | false | 10 |
| max-message-size | Set max MQTT message size in bytes | int | false | 8092 |
| password | Set the password to connect to the server | string | false | |
| port | Set the MQTT server port. Defaults to 8883 if ssl is enabled, or 1883 without ssl | int | false | |
| qos | Set the QoS level when subscribing to the topic or when sending a message | int | false | 0 |
| reconnect-interval-seconds | Set the reconnect interval in seconds | int | false | 1 |
| server-name | Set the SNI server name | string | false | |
| ssl | Set whether SSL/TLS is enabled | boolean | false | false |
| ssl.keystore.location | Set the keystore location. In case of pem type this is the server ca cert path | string | false | |
| ssl.keystore.password | Set the keystore password. In case of pem type this is the key path | string | false | |
| ssl.keystore.type | Set the keystore type [pkcs12, jks, pem] | string | false | pkcs12 |
| ssl.truststore.location | Set the truststore location. In case of pem type this is the client cert path | string | false | |
| ssl.truststore.password | Set the truststore password. In case of pem type this is not necessary | string | false | |
| ssl.truststore.type | Set the truststore type [pkcs12, jks, pem] | string | false | pkcs12 |
| topic | Set the MQTT topic. If not set, the channel name is used | string | false | |
| trust-all | Set whether all server certificates should be trusted | boolean | false | false |
| unsubscribe-on-disconnection | This flag restores the old behavior of unsubscribing from the broker on disconnection | boolean | false | false |
| username | Set the username to connect to the server | string | false | |
| will-flag | Set if will information is provided on connection | boolean | false | false |
| will-qos | Set the QoS level for the will message | int | false | 0 |
| will-retain | Set if the will message must be retained | boolean | false | false |
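
To show how several of these attributes combine, here is a sketch of a TLS-enabled inbound channel with credentials. The truststore path, passwords, and username are hypothetical placeholders. Only the attribute names come from the reference above.

```properties
mp.messaging.incoming.prices.connector=smallrye-mqtt
mp.messaging.incoming.prices.host=mqtt
# Enable SSL/TLS; the port then defaults to 8883, set here only for clarity
mp.messaging.incoming.prices.ssl=true
mp.messaging.incoming.prices.port=8883
# Truststore used for the connection (hypothetical path and password)
mp.messaging.incoming.prices.ssl.truststore.location=/path/to/truststore.p12
mp.messaging.incoming.prices.ssl.truststore.password=changeit
mp.messaging.incoming.prices.ssl.truststore.type=pkcs12
# Broker credentials (hypothetical values)
mp.messaging.incoming.prices.username=price-reader
mp.messaging.incoming.prices.password=secret
```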

The MQTT connector is based on the Vert.x MQTT client, so you can pass any attribute supported by this client.

Important

A single instance of MqttClient and a single connection are used for each host / port / server-name / client-id combination. This client is shared by the inbound and outbound connectors.

Important

With auto-clean-session=false, the MQTT connector sends Subscribe requests to the broker only if no persistent session is present (as on the first connection). This means that if a session already exists (for example, from a previous run) and you add a new incoming channel, that channel will not be subscribed. When using auto-clean-session=false, always check the subscriptions present on the broker.
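
A persistent-session setup might look like the fragment below. It assumes a stable client-id, because in MQTT the broker associates session state with the client identifier, and a QoS of 1, because only QoS 1 and 2 messages are queued by the broker for a disconnected session. The identifier price-consumer-1 is a hypothetical value.

```properties
# Keep the session (and its subscriptions) across reconnections
mp.messaging.incoming.prices.auto-clean-session=false
# Stable identifier so the broker can find the existing session (hypothetical value)
mp.messaging.incoming.prices.client-id=price-consumer-1
# Subscribe with QoS 1 so messages can be queued while disconnected
mp.messaging.incoming.prices.qos=1
```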