MQTT

The MQTT connector adds support for MQTT to Reactive Messaging.

It lets you receive messages from an MQTT server or broker as well as send MQTT messages. The MQTT connector is based on the Vert.x MQTT Client.

Introduction

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.

The MQTT Connector allows consuming messages from MQTT as well as sending MQTT messages.

Using the MQTT connector

To use the MQTT connector, add the following dependency to your project:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-mqtt</artifactId>
  <version>3.3.2</version>
</dependency>

The connector name is: smallrye-mqtt.

So, to indicate that a channel is managed by this connector you need:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-mqtt

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-mqtt

Receiving messages from MQTT

The MQTT connector connects to an MQTT broker or router and forwards the messages to the Reactive Messaging application. It maps each MQTT message to a Reactive Messaging Message.

Example

Let’s imagine you have an MQTT server/broker running and accessible using the mqtt:1883 address (by default it would use localhost:1883). Configure your application to receive MQTT messages on the prices channel as follows:

mp.messaging.incoming.prices.connector=smallrye-mqtt       (1)
mp.messaging.incoming.prices.host=mqtt                     (2)
mp.messaging.incoming.prices.port=1883                     (3)
  1. Sets the connector for the prices channel.
  2. Sets the broker/server host name.
  3. Sets the broker/server port. 1883 is the default.

You don’t need to set the MQTT topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.

NOTE: It is generally recommended to set the client-id. By default, the connector generates a unique client-id.

Messages coming from MQTT have a byte[] payload.
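
As an illustration of the topic and client-id attributes mentioned above, the following sketch subscribes the prices channel to a different topic and sets an explicit client identifier. The topic name and the client-id value are made up for this example.

```properties
mp.messaging.incoming.prices.connector=smallrye-mqtt
mp.messaging.incoming.prices.host=mqtt
# Hypothetical topic name, overriding the default (the channel name)
mp.messaging.incoming.prices.topic=market/prices
# Hypothetical client identifier
mp.messaging.incoming.prices.client-id=price-consumer-1
```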

Then, your application receives Message<byte[]>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

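import javax.enterprise.context.ApplicationScoped;
import java.nio.charset.StandardCharsets;

@ApplicationScoped
public class MqttPriceConsumer {

    @Incoming("prices")
    public void consume(byte[] raw) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        double price = Double.parseDouble(new String(raw, StandardCharsets.UTF_8));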

        // process your price.
    }

}

Or, you can retrieve the Message<byte[]>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class MqttPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<byte[]> price) {
        // process your price.

        // Acknowledge the incoming message
        return price.ack();
    }

}

The inbound topic can use the MQTT wildcards (+ and #).
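
To make the wildcard semantics concrete, here is a minimal sketch of MQTT topic-filter matching in plain Java. The matching itself is performed by the broker, not by your application; the TopicFilter class and its matches method are hypothetical names used only for illustration, and the sketch ignores the special handling of topics that start with $.

```java
final class TopicFilter {

    private TopicFilter() {
    }

    /** Returns true if the topic name matches the topic filter. */
    static boolean matches(String filter, String topic) {
        // Keep trailing empty levels: "a/" has two levels, "a" and ""
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent
                // level and any number of child levels
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                // The filter has more levels than the topic
                return false;
            }
            // '+' matches exactly one level, whatever its content
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without a trailing '#', both must have the same number of levels
        return filterLevels.length == topicLevels.length;
    }
}
```

With this sketch, the filter prices/+/eur matches prices/acme/eur but not prices/acme/usd/spot, while prices/# matches both as well as prices itself.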

Deserialization

The MQTT Connector does not handle the deserialization and creates a Message<byte[]>.
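
Since decoding is left to the application, you may want to isolate it in a small helper. The following is a minimal sketch under two assumptions that are not stated by the connector: the publisher sends the price as UTF-8 text, and malformed payloads should be skipped rather than fail the consumer. The PricePayloads class is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalDouble;

final class PricePayloads {

    private PricePayloads() {
    }

    /** Decodes a raw MQTT payload into a price, or empty if it is not a number. */
    static OptionalDouble decode(byte[] raw) {
        if (raw == null || raw.length == 0) {
            return OptionalDouble.empty();
        }
        // Assumption: the payload is UTF-8 encoded text
        String text = new String(raw, StandardCharsets.UTF_8).trim();
        try {
            return OptionalDouble.of(Double.parseDouble(text));
        } catch (NumberFormatException e) {
            // Malformed payload: let the caller decide what to do
            return OptionalDouble.empty();
        }
    }
}
```

A consumer method receiving byte[] could call PricePayloads.decode(raw) and ignore empty results.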

Inbound Metadata

The MQTT connector does not provide inbound metadata.

Failure Management

If a message produced from an MQTT message is nacked, a failure strategy is applied. The MQTT connector supports 2 strategies:

  • fail - fail the application; no more MQTT messages will be processed. (default)

  • ignore - the failure is logged, but the processing continues.
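
For example, to keep processing after a nacked message, the strategy can be selected per channel with the failure-strategy attribute. This fragment reuses the prices channel from the earlier example.

```properties
mp.messaging.incoming.prices.connector=smallrye-mqtt
mp.messaging.incoming.prices.host=mqtt
# Log the failure and continue instead of failing the application
mp.messaging.incoming.prices.failure-strategy=ignore
```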

Configuration Reference

Table 1. Incoming Attributes of the 'smallrye-mqtt' connector
| Attribute (alias) | Description | Type | Mandatory | Default |
| --- | --- | --- | --- | --- |
| client-id | Set the client identifier | string | false | |
| auto-generated-client-id | Set if the MQTT client must generate clientId automatically | boolean | false | true |
| auto-keep-alive | Set if the MQTT client must handle PINGREQ automatically | boolean | false | true |
| ssl | Set whether SSL/TLS is enabled | boolean | false | false |
| ssl.keystore.type | Set the keystore type [pkcs12, jks, pem] | string | false | pkcs12 |
| ssl.keystore.location | Set the keystore location. In case of pem type this is the cert path | string | false | |
| ssl.keystore.password | Set the keystore password. In case of pem type this is the key path | string | false | |
| ssl.truststore.type | Set the truststore type | string | false | pkcs12 |
| ssl.truststore.location | Set the truststore location. In case of pem type this is the cert path | string | false | |
| ssl.truststore.password | Set the truststore password. In case of pem type this is not necessary | string | false | |
| keep-alive-seconds | Set the keep alive timeout in seconds | int | false | 30 |
| max-inflight-queue | Set max count of unacknowledged messages | int | false | 10 |
| auto-clean-session | Set to start with a clean session (true by default) | boolean | false | true |
| will-flag | Set if will information are provided on connection | boolean | false | false |
| will-retain | Set if the will message must be retained | boolean | false | false |
| will-qos | Set the QoS level for the will message | int | false | 0 |
| max-message-size | Set max MQTT message size in bytes | int | false | 8092 |
| reconnect-attempts | Set the max reconnect attempts | int | false | 5 |
| reconnect-interval-seconds | Set the reconnect interval in seconds | int | false | 1 |
| username | Set the username to connect to the server | string | false | |
| password | Set the password to connect to the server | string | false | |
| connect-timeout-seconds | Set the connect timeout (in seconds) | int | false | 60 |
| trust-all | Set whether all server certificates should be trusted | boolean | false | false |
| host | Set the MQTT server host name/IP | string | true | |
| port | Set the MQTT server port. Default to 8883 if ssl is enabled, or 1883 without ssl | int | false | |
| server-name | Set the SNI server name | string | false | |
| topic | Set the MQTT topic. If not set, the channel name is used | string | false | |
| qos | Set the QoS level when subscribing to the topic or when sending a message | int | false | 0 |
| broadcast | Whether or not the messages should be dispatched to multiple consumers | boolean | false | false |
| failure-strategy | Specify the failure strategy to apply when a message produced from a MQTT message is nacked. Values can be fail (default), or ignore | string | false | fail |

The MQTT connector is based on the Vert.x MQTT client. So you can pass any attribute supported by this client.

A single MqttClient instance and a single connection are used for each host / port / server-name / client-id combination. This client is reused for both the inbound and outbound connectors.
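
As a sketch of how the ssl attributes from Table 1 fit together, the fragment below enables TLS on the prices channel with a PKCS12 truststore. The truststore path and password are placeholders, not values defined by the connector.

```properties
mp.messaging.incoming.prices.connector=smallrye-mqtt
mp.messaging.incoming.prices.host=mqtt
# 8883 is the documented default port when ssl is enabled
mp.messaging.incoming.prices.port=8883
mp.messaging.incoming.prices.ssl=true
mp.messaging.incoming.prices.ssl.truststore.type=pkcs12
# Placeholder location and password
mp.messaging.incoming.prices.ssl.truststore.location=/path/to/truststore.p12
mp.messaging.incoming.prices.ssl.truststore.password=changeit
```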

Sending messages to MQTT

The MQTT connector can write Reactive Messaging Messages as MQTT messages.

Example

Let’s imagine you have an MQTT server/broker running and accessible using the mqtt:1883 address (by default it would use localhost:1883). Configure your application to write the messages from the prices channel into MQTT messages as follows:

mp.messaging.outgoing.prices.connector=smallrye-mqtt  (1)
mp.messaging.outgoing.prices.host=mqtt                (2)
mp.messaging.outgoing.prices.port=1883                (3)
  1. Sets the connector for the prices channel.
  2. Sets the broker/server host name.
  3. Sets the broker/server port. 1883 is the default.

You don’t need to set the MQTT topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it.

NOTE: It is generally recommended to set the client-id. By default, the connector generates a unique client-id.

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class MqttPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class MqttPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble()));
    }

}

Serialization

The Message sent to MQTT can have various payload types:

  • JsonObject: JSON string encoded as byte[]

  • JsonArray: JSON string encoded as byte[]

  • java.lang.String and Java primitive types: toString encoded as byte[]

  • byte[]

  • complex objects: The objects are encoded to JSON and passed as byte[]
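
The rules above for the simpler payload types can be approximated in plain Java as follows. This is only a sketch, not the connector's code: the PayloadEncoder class is a hypothetical name, UTF-8 is an assumed charset, and the JSON cases are left out because they require a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

final class PayloadEncoder {

    private PayloadEncoder() {
    }

    /** Converts a payload to the bytes that would be sent, for the non-JSON cases. */
    static byte[] toBytes(Object payload) {
        Objects.requireNonNull(payload, "payload");
        if (payload instanceof byte[]) {
            // byte[] payloads are passed through unchanged
            return (byte[]) payload;
        }
        if (payload instanceof String || payload instanceof Number
                || payload instanceof Boolean || payload instanceof Character) {
            // Strings and boxed primitives: toString(), then encode (UTF-8 assumed)
            return payload.toString().getBytes(StandardCharsets.UTF_8);
        }
        // JsonObject, JsonArray and complex objects need a JSON encoder
        throw new IllegalArgumentException(
                "JSON encoding is not covered by this sketch: " + payload.getClass().getName());
    }
}
```

For instance, a Double payload of 12.5 is sent as the UTF-8 bytes of the string "12.5", which matches what the inbound example parses with Double.parseDouble.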

Outbound Metadata

The MQTT connector does not provide outbound metadata.

Acknowledgement

MQTT acknowledgement depends on the QoS level. The message is acknowledged when the broker indicates the successful reception of the message (or immediately if the level of QoS does not support acknowledgment).

If an MQTT message cannot be sent to the broker, the message is nacked.
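
As an example, the fragment below raises the QoS level of the prices channel from the default 0 to 1. With QoS 1 (at least once delivery), the broker confirms reception with a PUBACK packet, so the outgoing message would be acknowledged only after that confirmation.

```properties
mp.messaging.outgoing.prices.connector=smallrye-mqtt
mp.messaging.outgoing.prices.host=mqtt
# QoS 1: the broker confirms reception of each message
mp.messaging.outgoing.prices.qos=1
```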

Configuration Reference

Table 2. Outgoing Attributes of the 'smallrye-mqtt' connector
| Attribute (alias) | Description | Type | Mandatory | Default |
| --- | --- | --- | --- | --- |
| auto-clean-session | Set to start with a clean session (true by default) | boolean | false | true |
| auto-generated-client-id | Set if the MQTT client must generate clientId automatically | boolean | false | true |
| auto-keep-alive | Set if the MQTT client must handle PINGREQ automatically | boolean | false | true |
| client-id | Set the client identifier | string | false | |
| connect-timeout-seconds | Set the connect timeout (in seconds) | int | false | 60 |
| host | Set the MQTT server host name/IP | string | true | |
| keep-alive-seconds | Set the keep alive timeout in seconds | int | false | 30 |
| max-inflight-queue | Set max count of unacknowledged messages | int | false | 10 |
| max-message-size | Set max MQTT message size in bytes | int | false | 8092 |
| merge | Whether the connector should allow multiple upstreams | boolean | false | false |
| password | Set the password to connect to the server | string | false | |
| port | Set the MQTT server port. Default to 8883 if ssl is enabled, or 1883 without ssl | int | false | |
| qos | Set the QoS level when subscribing to the topic or when sending a message | int | false | 0 |
| reconnect-attempts | Set the max reconnect attempts | int | false | 5 |
| reconnect-interval-seconds | Set the reconnect interval in seconds | int | false | 1 |
| server-name | Set the SNI server name | string | false | |
| ssl | Set whether SSL/TLS is enabled | boolean | false | false |
| ssl.keystore.location | Set the keystore location. In case of pem type this is the cert path | string | false | |
| ssl.keystore.password | Set the keystore password. In case of pem type this is the key path | string | false | |
| ssl.keystore.type | Set the keystore type [pkcs12, jks, pem] | string | false | pkcs12 |
| ssl.truststore.location | Set the truststore location. In case of pem type this is the cert path | string | false | |
| ssl.truststore.password | Set the truststore password. In case of pem type this is not necessary | string | false | |
| ssl.truststore.type | Set the truststore type | string | false | pkcs12 |
| topic | Set the MQTT topic. If not set, the channel name is used | string | false | |
| trust-all | Set whether all server certificates should be trusted | boolean | false | false |
| username | Set the username to connect to the server | string | false | |
| will-flag | Set if will information are provided on connection | boolean | false | false |
| will-qos | Set the QoS level for the will message | int | false | 0 |
| will-retain | Set if the will message must be retained | boolean | false | false |

The MQTT connector is based on the Vert.x MQTT client. So you can pass any attribute supported by this client.

A single MqttClient instance and a single connection are used for each host / port / server-name / client-id combination. This client is reused for both the inbound and outbound connectors.
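
The sharing rule can be pictured as a cache keyed by those four values. The sketch below is an illustration in plain Java, not the connector's actual implementation; SharedClients and its methods are hypothetical names, and the client type is left generic so that the example does not depend on the Vert.x API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class SharedClients<C> {

    // One client per (host, port, server-name, client-id) combination
    private final Map<List<Object>, C> clients = new ConcurrentHashMap<>();

    /** Returns the client for the given key, creating it on first use. */
    C get(String host, int port, String serverName, String clientId, Supplier<C> factory) {
        // Arrays.asList gives value-based equals/hashCode and tolerates null elements
        List<Object> key = Arrays.asList(host, port, serverName, clientId);
        return clients.computeIfAbsent(key, k -> factory.get());
    }

    /** Number of distinct clients created so far. */
    int size() {
        return clients.size();
    }
}
```

Under this model, an incoming and an outgoing channel configured with the same host, port, server-name and client-id end up sharing one client, while changing any of the four values yields a separate client and connection.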