JMS

The JMS connector adds support for Java Message Service to Reactive Messaging. It is designed to integrate with JavaEE/JakartaEE applications that are sending or receiving JMS Messages.

Introduction

The Java Message Service (JMS) API is a Java message-oriented middleware API for sending messages between two or more clients. JMS is a part of the Java Platform, Enterprise Edition (Java EE / JakartaEE) It is a messaging standard that allows application components based on Java EE to create, send, receive, and read messages. It allows the communication between different components of a distributed application to be loosely coupled, reliable, and asynchronous.

Using the JMS connector

To you the JMS Connector, add the following dependency to your project:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-jms</artifactId>
  <version>3.3.2</version>
</dependency>

The connector name is: smallrye-jms.

So, to indicate that a channel is managed by this connector you need:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-jms

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-jms

The JMS Connector requires a javax.jms.ConnectionFactory to be exposed (as CDI bean). The connector looks for a javax.jms.ConnectionFactory and delegate the interaction with the JMS server to this factory. In other words, it creates the JMS connection and context using this factory.

So, in order to use this connector you would need to expose a javax.jms.ConnectionFactory:

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.jms.ConnectionFactory;

import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;

@ApplicationScoped
public class ConnectionFactoryBean {

    @Produces
    ConnectionFactory factory() {
        return new ActiveMQJMSConnectionFactory(
                "tcp://localhost:61616",
                null, null);
    }

}

The factory class may depends on your JMS connector/server.

The connector uses JSON-B to serialize and deserialize objects. You need to have an implementation of JSON-B available. Check http://json-b.net/ for more details.

Receiving messages from JMS

The JMS Connector retrieves JMS Message and maps each of them into Reactive Messaging Messages.

Example

Let’s imagine you have a javax.jms.ConnectionFactory bean exposed and connected to your JMS server. Don’t forget that it’s required to use the JMS connector.

Configure your application to receive JMS messages on the prices channel as follows:

mp.messaging.incoming.prices.connector=smallrye-jms       (1)
  1. Sets the connector for the prices channel

You don’t need to set the destination. By default, it uses the channel name (prices). You can configure the destination attribute to override it.
By default the connector uses a queue. You can configure it to use a topic by setting destination-type=topic.

Then, your application receives Message<Double>. You can consumes the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class JmsPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class JmsPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message
        return price.ack();
    }

}

Deserialization

The content of the incoming JMS message is mapped to a Java object.

By default it extracts the JMS Message body as a java.lang.Object. This can be changed by setting, in the incoming JMS Message:

  1. The _classname property

  2. the JMSType

The value must be a fully qualified class name. The connector then load the associated class.

The connector loads the associated Class using the TCCL and if not found, the classloader used to load the connector.

If the target type is a primitive type ort String, the resulting message contains the mapped payload.

If the target type is a class, the object is built using JSON-B, from the JMSType. If not, the default behavior is used (Java deserialization).

Inbound Metadata

Messages coming from JMS contains an instance of IncomingJmsMessageMetadata in the metadata.

Optional<IncomingJmsMessageMetadata> metadata = incoming.getMetadata(IncomingJmsMessageMetadata.class);
metadata.ifPresent(meta -> {
    long expiration = meta.getExpiration();
    Destination destination = meta.getDestination();
    String value = meta.getStringProperty("my-property");
});

Acknowledgement

The the Reactive Messaging Message gets acknowledged, the associated JMS Message is acknowledged. As JMS acknowledgement is blocking, this acknowledgement is delegated to a worker thread.

Configuration Reference

Table 1. Incoming Attributes of the 'smallrye-jms' connector
Attribute (alias) Description Mandatory Default

connection-factory-name

The name of the JMS connection factory (javax.jms.ConnectionFactory) to be used. If not set, it uses any exposed JMS connection factory

Type: String

false

username

The username to connect to to the JMS server

Type: String

false

password

The password to connect to to the JMS server

Type: String

false

session-mode

The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE

Type: String

false

AUTO_ACKNOWLEDGE

client-id

The client id

Type: String

false

destination

The name of the JMS destination. If not set the name of the channel is used

Type: String

false

destination-type

The type of destination. It can be either queue or topic

Type: string

false

queue

selector

The JMS selector

Type: String

false

no-local

Enable or disable local delivery

Type: boolean

false

false

broadcast

Whether or not the JMS message should be dispatched to multiple consumers

Type: boolean

false

false

durable

Set to true to use a durable subscription

Type: boolean

false

false

Sending messages to JMS

The JMS Connector can send Reactive Messaging Messages as JMS Message.

Example

Let’s imagine you have a javax.jms.ConnectionFactory bean exposed and connected to your JMS server. Don’t forget that it’s required to use the JMS connector.

Configure your application to write the messages from the prices channel into a JMS Message as follows:

mp.messaging.outgoing.prices.connector=smallrye-jms       (1)
  1. Sets the connector for the prices channel

You don’t need to set the destination. By default, it uses the channel name (prices). You can configure the destination attribute to override it.
By default the connector uses a queue. You can configure it to use a topic by setting destination-type=topic.

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class JmsPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class JmsPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble()));
    }

}

Serialization

The connector serializes the incoming message payload into the body of the outgoing JMS Message.

If the payload is a String or a primitive type, the payload is encoded as String and the JMSType is set to the target class. The _classname property is also set. The JMS Message is a TextMessage.

If the payload is a byte[], it’s passed as byte[] in a JMS BytesMessage.

Otherwise, the payload is encoded using JSON-P. The JMSType is set to the target class. The _classname property is also set. The JMS Message is a TextMessage.

For example, the following code serialize the produced Person using JSON-B.

@Incoming("...")
@Outgoing("my-channel")
public Person sendToJms(...) {
  // ...
  return new Person("bob", 42);
}

It requires that the Person class can be serialized to JSON. The classname is passed in the JMSType property and _classname property.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingJmsMessageMetadata to influence how the message is going to written to JMS.

OutgoingJmsMessageMetadata metadata = OutgoingJmsMessageMetadata.builder()
    .withProperties(JmsProperties.builder().with("some-property", "some-value").build())
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

These metadata allow adding properties but also override the destination.

Acknowledgement

Once the JMS message is sent to the JMS server, the message is acknowledged. Sending a JMS message is a blocking operation. So, sending is done on a worker thread.

Configuration Reference

Table 2. Outgoing Attributes of the 'smallrye-jms' connector
Attribute (alias) Description Mandatory Default

client-id

The client id

Type: String

false

connection-factory-name

The name of the JMS connection factory (javax.jms.ConnectionFactory) to be used. If not set, it uses any exposed JMS connection factory

Type: String

false

correlation-id

The JMS Message correlation id

Type: string

false

delivery-delay

The delivery delay

Type: long

false

delivery-mode

The delivery mode. Either persistent or non_persistent

Type: string

false

destination

The name of the JMS destination. If not set the name of the channel is used

Type: String

false

destination-type

The type of destination. It can be either queue or topic

Type: string

false

queue

disable-message-id

Omit the message id in the outbound JMS message

Type: boolean

false

disable-message-timestamp

Omit the message timestamp in the outbound JMS message

Type: boolean

false

merge

Whether the connector should allow multiple upstreams

Type: boolean

false

false

password

The password to connect to to the JMS server

Type: String

false

priority

The JMS Message priority

Type: int

false

reply-to

The reply to destination if any

Type: string

false

reply-to-destination-type

The type of destination for the response. It can be either queue or topic

Type: string

false

queue

session-mode

The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE

Type: String

false

AUTO_ACKNOWLEDGE

ttl

The JMS Message time-to-live

Type: long

false

username

The username to connect to to the JMS server

Type: String

false

Advanced configuration

Underlying thread pool

Lots of JMS operations are blocking and so not cannot be done on the caller thread. For this reason, these blocking operations are executed on a worker thread.

You can configure the thread pool providing these worker threads using the following MicroProfile Config properties:

  • smallrye.jms.threads.max-pool-size - the max number of threads (Defaults to 10)

  • smallrye.jms.threads.ttl - the ttl of the created threads (Defaults to 60 seconds)

Selecting the ConnectionFactory

The JMS Connector requires a javax.jms.ConnectionFactory to be exposed as a CDI bean. The connector looks for a javax.jms.ConnectionFactory and delegate the interaction with the JMS server to this factory.

In case you have several connection factories, you can use the @Identifier qualifier on your factory to specify the name. Then, in the channel configuration, configure the name as follows:

# Configure the connector globally
mp.messaging.connector.smallrye-jms.connection-factory-name=my-factory-name
# Configure a specific incoming channel
mp.messaging.incoming.my-channel.connection-factory-name=my-factory-name
# Configure a specific outgoing channel
mp.messaging.outgoing.my-channel.connection-factory-name=my-factory-name