JMS
The JMS connector adds support for Jakarta Messaging to Reactive Messaging. It is designed to integrate with JakartaEE applications that are sending or receiving Jakarta Messaging Messages.
Introduction
Jakarta Messaging is a Java Message Oriented Middleware API for sending messages between two or more clients. It is a programming model to handle the producer-consumer messaging problem. It is a messaging standard that allows application components based on Jakarta EE to create, send, receive, and read messages. It allows the communication between different components of a distributed application to be loosely coupled, reliable, and asynchronous.
Using the JMS connector
To you the JMS Connector, add the following dependency to your project:
<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-jms</artifactId>
  <version>3.12.1</version>
</dependency>The connector name is: smallrye-jms.
So, to indicate that a channel is managed by this connector you need:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-jms
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-jmsThe JMS Connector requires a javax.jms.ConnectionFactory to be exposed (as CDI bean).
The connector looks for a javax.jms.ConnectionFactory and delegate the interaction with the JMS server to this factory.
In other words, it creates the JMS connection and context using this factory.
So, in order to use this connector you would need to expose a javax.jms.ConnectionFactory:
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.jms.ConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
@ApplicationScoped
public class ConnectionFactoryBean {
    @Produces
    ConnectionFactory factory() {
        return new ActiveMQJMSConnectionFactory(
                "tcp://localhost:61616",
                null, null);
    }
}The factory class may depends on your JMS connector/server.
Receiving messages from JMS
The JMS Connector retrieves JMS Message and maps each of them into Reactive Messaging Messages.
Example
Let’s imagine you have a javax.jms.ConnectionFactory bean exposed and connected to your JMS server.
Don’t forget that it’s required to use the JMS connector.
Configure your application to receive JMS messages on the prices channel as follows:
mp.messaging.incoming.prices.connector=smallrye-jms       (1)- 
Sets the connector for the priceschannel
| You don’t need to set the destination. By default, it uses the channel name ( prices). You can configure thedestinationattribute to override it. | 
| By default the connector uses a queue. You can configure it to use atopicby settingdestination-type=topic. | 
Then, your application receives Message<Double>.
You can consumes the payload directly:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class JmsPriceConsumer {
    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }
}Or, you can retrieve the Message<Double>:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class JmsPriceMessageConsumer {
    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.
        // Acknowledge the incoming message
        return price.ack();
    }
}Deserialization
The content of the incoming JMS message is mapped to a Java object.
By default it extracts the JMS Message body as a java.lang.Object.
This can be changed by setting, in the incoming JMS Message:
- 
The _classnameproperty
- 
the JMSType
The value must be a fully qualified class name. The connector then load the associated class.
| The connector loads the associated Classusing theTCCLand if not found, the classloader used to load the connector. | 
If the target type is a primitive type ort String, the resulting message contains the mapped payload.
If the target type is a class, the object is built using included JSON deserializer (JSON-B and Jackson provided OOB, for more details see Serde]), from the JMSType.
If not, the default behavior is used (Java deserialization).
Inbound Metadata
Messages coming from JMS contains an instance of IncomingJmsMessageMetadata in the metadata.
Optional<IncomingJmsMessageMetadata> metadata = incoming.getMetadata(IncomingJmsMessageMetadata.class);
metadata.ifPresent(meta -> {
    long expiration = meta.getExpiration();
    Destination destination = meta.getDestination();
    String value = meta.getStringProperty("my-property");
});Acknowledgement
The the Reactive Messaging Message gets acknowledged, the associated JMS Message is acknowledged.
As JMS acknowledgement is blocking, this acknowledgement is delegated to a worker thread.
Configuration Reference
| Attribute (alias) | Description | Mandatory | Default | 
|---|---|---|---|
| broadcast | Whether or not the JMS message should be dispatched to multiple consumers Type: boolean | false | 
 | 
| client-id | The client id Type: String | false | |
| connection-factory-name | The name of the JMS connection factory  ( Type: String | false | |
| destination | The name of the JMS destination. If not set the name of the channel is used Type: String | false | |
| destination-type | The type of destination. It can be either  Type: string | false | 
 | 
| durable | Set to  Type: boolean | false | 
 | 
| no-local | Enable or disable local delivery Type: boolean | false | 
 | 
| password | The password to connect to to the JMS server Type: String | false | |
| selector | The JMS selector Type: String | false | |
| session-mode | The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE Type: String | false | 
 | 
| username | The username to connect to to the JMS server Type: String | false | 
Sending messages to JMS
The JMS Connector can send Reactive Messaging Messages as JMS Message.
Example
Let’s imagine you have a javax.jms.ConnectionFactory bean exposed and connected to your JMS server.
Don’t forget that it’s required to use the JMS connector.
Configure your application to write the messages from the prices channel into a JMS Message as follows:
mp.messaging.outgoing.prices.connector=smallrye-jms       (1)- 
Sets the connector for the priceschannel
| You don’t need to set the destination. By default, it uses the channel name ( prices). You can configure thedestinationattribute to override it. | 
| By default the connector uses a queue. You can configure it to use atopicby settingdestination-type=topic. | 
Then, your application must send Message<Double> to the prices channel.
It can use double payloads as in the following snippet:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class JmsPriceProducer {
    private Random random = new Random();
    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }
}Or, you can send Message<Double>:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class JmsPriceMessageProducer {
    private Random random = new Random();
    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble()));
    }
}Serialization
The connector serializes the incoming message payload into the body of the outgoing JMS Message.
If the payload is a String or a primitive type, the payload is encoded as String and the JMSType is set to the target class.
The _classname property is also set.
The JMS Message is a TextMessage.
If the payload is a byte[], it’s passed as byte[] in a JMS BytesMessage.
Otherwise, the payload is encoded using included JSON serializer (JSON-B and Jackson provided OOB, for more details see Serde]).
The JMSType is set to the target class.
The _classname property is also set.
The JMS Message is a TextMessage.
For example, the following code serialize the produced Person using JSON-B.
@Incoming("...")
@Outgoing("my-channel")
public Person sendToJms(...) {
  // ...
  return new Person("bob", 42);
}It requires that the Person class can be serialized to JSON.
The classname is passed in the JMSType property and _classname property.
Outbound Metadata
When sending Messages, you can add an instance of OutgoingJmsMessageMetadata to influence how the message is going to written to JMS.
OutgoingJmsMessageMetadata metadata = OutgoingJmsMessageMetadata.builder()
    .withProperties(JmsProperties.builder().with("some-property", "some-value").build())
    .build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);These metadata allow adding properties but also override the destination.
Acknowledgement
Once the JMS message is sent to the JMS server, the message is acknowledged. Sending a JMS message is a blocking operation. So, sending is done on a worker thread.
Configuration Reference
| Attribute (alias) | Description | Mandatory | Default | 
|---|---|---|---|
| client-id | The client id Type: String | false | |
| connection-factory-name | The name of the JMS connection factory  ( Type: String | false | |
| correlation-id | The JMS Message correlation id Type: string | false | |
| delivery-delay | The delivery delay Type: long | false | |
| delivery-mode | The delivery mode. Either  Type: string | false | |
| destination | The name of the JMS destination. If not set the name of the channel is used Type: String | false | |
| destination-type | The type of destination. It can be either  Type: string | false | 
 | 
| disable-message-id | Omit the message id in the outbound JMS message Type: boolean | false | |
| disable-message-timestamp | Omit the message timestamp in the outbound JMS message Type: boolean | false | |
| merge | Whether the connector should allow multiple upstreams Type: boolean | false | 
 | 
| password | The password to connect to to the JMS server Type: String | false | |
| priority | The JMS Message priority Type: int | false | |
| reply-to | The reply to destination if any Type: string | false | |
| reply-to-destination-type | The type of destination for the response. It can be either  Type: string | false | 
 | 
| session-mode | The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE Type: String | false | 
 | 
| ttl | The JMS Message time-to-live Type: long | false | |
| username | The username to connect to to the JMS server Type: String | false | 
Advanced configuration
Underlying thread pool
Lots of JMS operations are blocking and so not cannot be done on the caller thread. For this reason, these blocking operations are executed on a worker thread.
You can configure the thread pool providing these worker threads using the following MicroProfile Config properties:
- 
smallrye.jms.threads.max-pool-size- the max number of threads (Defaults to 10)
- 
smallrye.jms.threads.ttl- the ttl of the created threads (Defaults to 60 seconds)
Selecting the ConnectionFactory
The JMS Connector requires a javax.jms.ConnectionFactory to be exposed as a CDI bean.
The connector looks for a javax.jms.ConnectionFactory and delegate the interaction with the JMS server to this factory.
In case you have several connection factories, you can use the @Identifier qualifier on your factory to specify the name.
Then, in the channel configuration, configure the name as follows:
# Configure the connector globally
mp.messaging.connector.smallrye-jms.connection-factory-name=my-factory-name
# Configure a specific incoming channel
mp.messaging.incoming.my-channel.connection-factory-name=my-factory-name
# Configure a specific outgoing channel
mp.messaging.outgoing.my-channel.connection-factory-name=my-factory-name