JMS
The JMS connector adds support for Java Message Service to Reactive Messaging. It is designed to integrate with JavaEE/JakartaEE applications that are sending or receiving JMS Messages.
Introduction
The Java Message Service (JMS) API is a Java message-oriented middleware API for sending messages between two or more clients. JMS is a part of the Java Platform, Enterprise Edition (Java EE / JakartaEE) It is a messaging standard that allows application components based on Java EE to create, send, receive, and read messages. It allows the communication between different components of a distributed application to be loosely coupled, reliable, and asynchronous.
Using the JMS connector
To you the JMS Connector, add the following dependency to your project:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-jms</artifactId>
<version>3.3.2</version>
</dependency>
The connector name is: smallrye-jms
.
So, to indicate that a channel is managed by this connector you need:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-jms
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-jms
The JMS Connector requires a javax.jms.ConnectionFactory
to be exposed (as CDI bean).
The connector looks for a javax.jms.ConnectionFactory
and delegate the interaction with the JMS server to this factory.
In other words, it creates the JMS connection and context using this factory.
So, in order to use this connector you would need to expose a javax.jms.ConnectionFactory
:
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.jms.ConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
@ApplicationScoped
public class ConnectionFactoryBean {
@Produces
ConnectionFactory factory() {
return new ActiveMQJMSConnectionFactory(
"tcp://localhost:61616",
null, null);
}
}
The factory class may depends on your JMS connector/server.
The connector uses JSON-B to serialize and deserialize objects. You need to have an implementation of JSON-B available. Check http://json-b.net/ for more details.
Receiving messages from JMS
The JMS Connector retrieves JMS Message and maps each of them into Reactive Messaging Messages
.
Example
Let’s imagine you have a javax.jms.ConnectionFactory
bean exposed and connected to your JMS server.
Don’t forget that it’s required to use the JMS connector.
Configure your application to receive JMS messages on the prices
channel as follows:
mp.messaging.incoming.prices.connector=smallrye-jms (1)
-
Sets the connector for the
prices
channel
You don’t need to set the destination. By default, it uses the channel name (prices ). You can configure the destination attribute to override it.
|
By default the connector uses a queue . You can configure it to use a topic by setting destination-type=topic .
|
Then, your application receives Message<Double>
.
You can consumes the payload directly:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class JmsPriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
Or, you can retrieve the Message<Double>
:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class JmsPriceMessageConsumer {
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
// Acknowledge the incoming message
return price.ack();
}
}
Deserialization
The content of the incoming JMS message is mapped to a Java object.
By default it extracts the JMS Message body as a java.lang.Object
.
This can be changed by setting, in the incoming JMS Message:
-
The
_classname
property -
the
JMSType
The value must be a fully qualified class name. The connector then load the associated class.
The connector loads the associated Class using the TCCL and if not found, the classloader used to load the connector.
|
If the target type is a primitive type ort String
, the resulting message contains the mapped payload.
If the target type is a class, the object is built using JSON-B, from the JMSType
.
If not, the default behavior is used (Java deserialization).
Inbound Metadata
Messages coming from JMS contains an instance of IncomingJmsMessageMetadata
in the metadata.
Optional<IncomingJmsMessageMetadata> metadata = incoming.getMetadata(IncomingJmsMessageMetadata.class);
metadata.ifPresent(meta -> {
long expiration = meta.getExpiration();
Destination destination = meta.getDestination();
String value = meta.getStringProperty("my-property");
});
Acknowledgement
The the Reactive Messaging Message
gets acknowledged, the associated JMS Message is acknowledged.
As JMS acknowledgement is blocking, this acknowledgement is delegated to a worker thread.
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
connection-factory-name |
The name of the JMS connection factory ( Type: String |
false |
|
username |
The username to connect to to the JMS server Type: String |
false |
|
password |
The password to connect to to the JMS server Type: String |
false |
|
session-mode |
The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE Type: String |
false |
|
client-id |
The client id Type: String |
false |
|
destination |
The name of the JMS destination. If not set the name of the channel is used Type: String |
false |
|
destination-type |
The type of destination. It can be either Type: string |
false |
|
selector |
The JMS selector Type: String |
false |
|
no-local |
Enable or disable local delivery Type: boolean |
false |
|
broadcast |
Whether or not the JMS message should be dispatched to multiple consumers Type: boolean |
false |
|
durable |
Set to Type: boolean |
false |
|
Sending messages to JMS
The JMS Connector can send Reactive Messaging Messages
as JMS Message.
Example
Let’s imagine you have a javax.jms.ConnectionFactory
bean exposed and connected to your JMS server.
Don’t forget that it’s required to use the JMS connector.
Configure your application to write the messages from the prices
channel into a JMS Message as follows:
mp.messaging.outgoing.prices.connector=smallrye-jms (1)
-
Sets the connector for the
prices
channel
You don’t need to set the destination. By default, it uses the channel name (prices ). You can configure the destination attribute to override it.
|
By default the connector uses a queue . You can configure it to use a topic by setting destination-type=topic .
|
Then, your application must send Message<Double>
to the prices
channel.
It can use double
payloads as in the following snippet:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class JmsPriceProducer {
private Random random = new Random();
@Outgoing("prices")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
Or, you can send Message<Double>
:
package outbound;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class JmsPriceMessageProducer {
private Random random = new Random();
@Outgoing("prices")
public Multi<Message<Double>> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble()));
}
}
Serialization
The connector serializes the incoming message payload into the body of the outgoing JMS Message.
If the payload is a String
or a primitive type, the payload is encoded as String
and the JMSType
is set to the target class.
The _classname
property is also set.
The JMS Message is a TextMessage
.
If the payload is a byte[]
, it’s passed as byte[]
in a JMS BytesMessage
.
Otherwise, the payload is encoded using JSON-P.
The JMSType
is set to the target class.
The _classname
property is also set.
The JMS Message is a TextMessage
.
For example, the following code serialize the produced Person
using JSON-B.
@Incoming("...")
@Outgoing("my-channel")
public Person sendToJms(...) {
// ...
return new Person("bob", 42);
}
It requires that the Person
class can be serialized to JSON.
The classname is passed in the JMSType
property and _classname
property.
Outbound Metadata
When sending Messages
, you can add an instance of OutgoingJmsMessageMetadata
to influence how the message is going to written to JMS.
OutgoingJmsMessageMetadata metadata = OutgoingJmsMessageMetadata.builder()
.withProperties(JmsProperties.builder().with("some-property", "some-value").build())
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
These metadata allow adding properties but also override the destination.
Acknowledgement
Once the JMS message is sent to the JMS server, the message is acknowledged. Sending a JMS message is a blocking operation. So, sending is done on a worker thread.
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
client-id |
The client id Type: String |
false |
|
connection-factory-name |
The name of the JMS connection factory ( Type: String |
false |
|
correlation-id |
The JMS Message correlation id Type: string |
false |
|
delivery-delay |
The delivery delay Type: long |
false |
|
delivery-mode |
The delivery mode. Either Type: string |
false |
|
destination |
The name of the JMS destination. If not set the name of the channel is used Type: String |
false |
|
destination-type |
The type of destination. It can be either Type: string |
false |
|
disable-message-id |
Omit the message id in the outbound JMS message Type: boolean |
false |
|
disable-message-timestamp |
Omit the message timestamp in the outbound JMS message Type: boolean |
false |
|
merge |
Whether the connector should allow multiple upstreams Type: boolean |
false |
|
password |
The password to connect to to the JMS server Type: String |
false |
|
priority |
The JMS Message priority Type: int |
false |
|
reply-to |
The reply to destination if any Type: string |
false |
|
reply-to-destination-type |
The type of destination for the response. It can be either Type: string |
false |
|
session-mode |
The session mode. Accepted values are AUTO_ACKNOWLEDGE, SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE Type: String |
false |
|
ttl |
The JMS Message time-to-live Type: long |
false |
|
username |
The username to connect to to the JMS server Type: String |
false |
Advanced configuration
Underlying thread pool
Lots of JMS operations are blocking and so not cannot be done on the caller thread. For this reason, these blocking operations are executed on a worker thread.
You can configure the thread pool providing these worker threads using the following MicroProfile Config properties:
-
smallrye.jms.threads.max-pool-size
- the max number of threads (Defaults to 10) -
smallrye.jms.threads.ttl
- the ttl of the created threads (Defaults to 60 seconds)
Selecting the ConnectionFactory
The JMS Connector requires a javax.jms.ConnectionFactory
to be exposed as a CDI bean.
The connector looks for a javax.jms.ConnectionFactory
and delegate the interaction with the JMS server to this factory.
In case you have several connection factories, you can use the @Identifier
qualifier on your factory to specify the name.
Then, in the channel configuration, configure the name as follows:
# Configure the connector globally
mp.messaging.connector.smallrye-jms.connection-factory-name=my-factory-name
# Configure a specific incoming channel
mp.messaging.incoming.my-channel.connection-factory-name=my-factory-name
# Configure a specific outgoing channel
mp.messaging.outgoing.my-channel.connection-factory-name=my-factory-name