Skip to content

Sending messages to MQTT

The MQTT Connector can write Reactive Messaging Messages as MQTT Message.

Example

Let’s imagine you have a MQTT server/broker running, and accessible using the mqtt:1883 address (by default it would use localhost:1883). Configure your application to write the messages from the prices channel into a MQTT Messages as follows:

1
2
3
mp.messaging.outgoing.prices.type=smallrye-mqtt
mp.messaging.outgoing.prices.host=mqtt
mp.messaging.outgoing.prices.port=1883
  1. Sets the connector for the prices channel
  2. Configure the broker/server host name.
  3. Configure the broker/server port. 1883 is the default.

Note

You don’t need to set the MQTT topic. By default, it uses the channel name (prices). You can configure the topic attribute to override it. NOTE: It is generally recommended to set the client-id. By default, the connector is generating a unique client-id.

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package mqtt.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class MqttPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package mqtt.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class MqttPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(random.nextDouble()));
    }

}

Serialization

The Message sent to MQTT can have various payload types:

  • JsonObject: JSON string encoded as byte[]

  • JsonArray: JSON string encoded as byte[]

  • java.lang.String and Java primitive types: toString encoded as byte[]

  • byte[]

  • complex objects: The objects are encoded to JSON and passed as byte[]

Outbound Metadata

The MQTT connector does not provide outbound metadata.

Acknowledgement

MQTT acknowledgement depends on the QoS level. The message is acknowledged when the broker indicated the successful reception of the message (or immediately if the level of QoS does not support acknowledgment).

If a MQTT message cannot be sent to the broker, the message is nacked.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
auto-clean-session Set to start with a clean session (true by default) boolean false true
auto-generated-client-id Set if the MQTT client must generate clientId automatically boolean false true
auto-keep-alive Set if the MQTT client must handle PINGREQ automatically boolean false true
client-id Set the client identifier string false
client-options-name (mqtt-client-options-name) The name of the MQTT Client Option bean (io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions) used to customize the MQTT client configuration string false
connect-timeout-seconds Set the connect timeout (in seconds) int false 60
host Set the MQTT server host name/IP string true
keep-alive-seconds Set the keep alive timeout in seconds int false 30
max-inflight-queue Set max count of unacknowledged messages int false 10
max-message-size Set max MQTT message size in bytes int false 8092
merge Whether the connector should allow multiple upstreams boolean false false
password Set the password to connect to the server string false
port Set the MQTT server port. Default to 8883 if ssl is enabled, or 1883 without ssl int false
qos Set the QoS level when subscribing to the topic or when sending a message int false 0
reconnect-interval-seconds Set the reconnect interval in seconds int false 1
server-name Set the SNI server name string false
ssl Set whether SSL/TLS is enabled boolean false false
ssl.keystore.location Set the keystore location. In case of pem type this is the server ca cert path string false
ssl.keystore.password Set the keystore password. In case of pem type this is the key path string false
ssl.keystore.type Set the keystore type [pkcs12, jks, pem] string false pkcs12
ssl.truststore.location Set the truststore location. In case of pem type this is the client cert path string false
ssl.truststore.password Set the truststore password. In case of pem type this is not necessary string false
ssl.truststore.type Set the truststore type [pkcs12, jks, pem] string false pkcs12
topic Set the MQTT topic. If not set, the channel name is used string false
trust-all Set whether all server certificates should be trusted boolean false false
unsubscribe-on-disconnection This flag restore the old behavior to unsubscribe from the broken on disconnection boolean false false
username Set the username to connect to the server string false
will-flag Set if will information are provided on connection boolean false false
will-qos Set the QoS level for the will message int false 0
will-retain Set if the will message must be retained boolean false false

The MQTT connector is based on the Vert.x MQTT client. So you can pass any attribute supported by this client.

Important

A single instance of MqttClient and a single connection is used for each host / port / server-name / client-id. This client is reused for both the inbound and outbound connectors.