Integration with Apache Camel
The Camel connector adds support for Apache Camel to Reactive Messaging.
Camel is an open source integration framework that lets you integrate various systems consuming or producing data. Camel implements the Enterprise Integration Patterns and provides several hundred components used to access databases, message queues, APIs, or basically anything under the sun.
Introduction
Camel is not a messaging broker, but it lets your Reactive Messaging application retrieve data from almost anything and send data to almost anything.
So if you want to send a Reactive Messaging Message to Telegram, or retrieve data from Salesforce or SAP, this is the connector you need.
One of Camel's cornerstones is the endpoint and its URI, which encodes the connection to an external system.
For example, file:orders/?delete=true&charset=utf-8 instructs Camel to read the files from the orders directory.
URI format and parameters are listed on the component documentation, such as the File component.
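To make the structure of an endpoint URI concrete, the following sketch splits the example URI into its component name, path, and options using only the JDK. It is purely illustrative: the class EndpointUriSketch and its methods are hypothetical names, and this is not how Camel parses URIs internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: shows the three visible parts of a Camel-style endpoint URI.
// EndpointUriSketch is a hypothetical helper, not part of Camel or the connector.
final class EndpointUriSketch {

    // The component name is everything before the first ':' (for example "file").
    static String scheme(String uri) {
        int colon = uri.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing component name in: " + uri);
        }
        return uri.substring(0, colon);
    }

    // The path sits between the first ':' and the '?' (for example "orders/").
    static String path(String uri) {
        int colon = uri.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing component name in: " + uri);
        }
        int query = uri.indexOf('?');
        return query < 0 ? uri.substring(colon + 1) : uri.substring(colon + 1, query);
    }

    // The options are the '&'-separated key=value pairs after the '?'.
    static Map<String, String> options(String uri) {
        Map<String, String> result = new LinkedHashMap<>();
        int query = uri.indexOf('?');
        if (query < 0 || query == uri.length() - 1) {
            return result;
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String uri = "file:orders/?delete=true&charset=utf-8";
        System.out.println(scheme(uri));  // file
        System.out.println(path(uri));    // orders/
        System.out.println(options(uri)); // {delete=true, charset=utf-8}
    }
}
```

The meaning of each option (here delete and charset) is defined by the component, which is why the component documentation remains the reference.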
Using the Camel connector
To use the Camel connector, add the following dependency to your project:
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-camel</artifactId>
    <version>3.13.0</version>
</dependency>
You will also need the dependency of the Camel component you are using. For example, if you want to process files, you would need to add the Camel File Component artifact:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-file</artifactId>
    <version>3.13.0</version>
</dependency>
The connector name is: smallrye-camel.
So, to indicate that a channel is managed by this connector, you need:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-camel
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-camel
Retrieving data using Camel
Camel provides many components.
To keep this documentation focused on the integration with Camel, we use the File component.
This component lets you read files from a directory.
So the connector configured with this component creates a Message for each file located in the directory.
As soon as a file is dropped in the directory, a new Message is created.
Example
Let’s imagine you want to read the files from the orders directory and send them to the files channel.
Configuring the Camel connector to get the files from this directory requires only two properties:
mp.messaging.incoming.files.connector=smallrye-camel (1)
mp.messaging.incoming.files.endpoint-uri=file:orders/?delete=true&charset=utf-8 (2)
(1) Sets the connector for the files channel
(2) Configures the endpoint-uri
Then, your application receives Message<GenericFile<File>>.
The Camel File component produces org.apache.camel.component.file.GenericFile instances. You can retrieve the actual File using getFile().
You can consume the payload directly:
package inbound;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;

@ApplicationScoped
public class CamelFileConsumer {

    @Incoming("files")
    public void consume(GenericFile<File> gf) {
        File file = gf.getFile();
        // process the file
    }
}
You can also retrieve the Message<GenericFile<File>>:
package inbound;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class CamelFileMessageConsumer {

    @Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {
        File file = msg.getPayload().getFile();
        // process the file
        return msg.ack();
    }
}
Deserialization
Each Camel component produces specific objects.
As we have seen, the File component produces GenericFile.
Refer to the component documentation to check which type is produced.
Inbound Metadata
Messages coming from Camel contain an instance of IncomingExchangeMetadata in the metadata.
package inbound;

import io.smallrye.reactive.messaging.camel.IncomingExchangeMetadata;
import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class IncomingCamelMetadataExample {

    @Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {
        Optional<IncomingExchangeMetadata> metadata = msg.getMetadata(IncomingExchangeMetadata.class);
        if (metadata.isPresent()) {
            // Retrieve the camel exchange:
            Exchange exchange = metadata.get().getExchange();
        }
        return msg.ack();
    }
}
This object lets you retrieve the Camel Exchange.
Failure Management
If a message produced from a Camel exchange is nacked, a failure strategy is applied. The Camel connector supports the following strategies:
- fail - fail the application; no more messages are processed. (default)
- ignore - the failure is logged, but processing continues.
In both cases, the exchange is marked as rollback only and the nack reason is attached to the exchange.
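As a minimal sketch, the strategy can be selected per channel with the failure-strategy attribute. The fragment below reuses the files channel from the earlier example and switches it to ignore, so a nacked message is logged and processing continues:

```properties
mp.messaging.incoming.files.connector=smallrye-camel
mp.messaging.incoming.files.endpoint-uri=file:orders/?delete=true&charset=utf-8
# Log nacked messages instead of failing the application
mp.messaging.incoming.files.failure-strategy=ignore
```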
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
endpoint-uri | The URI of the Camel endpoint (read from or written to). Type: string | true | |
failure-strategy | Specify the failure strategy to apply when a message produced from a Camel exchange is nacked. Values can be fail (default) or ignore. Type: string | false | fail |
Sending data with Camel
You can use the Camel connector to send data to almost any type of system.
To keep this document focused on the Camel connector, we use the Camel File component. However, the connector can be used with any Camel component.
Example
Let’s imagine you want to write generated prices into files.
Configure your application to write the messages from the prices channel into files as follows:
mp.messaging.outgoing.prices.connector=smallrye-camel (1)
mp.messaging.outgoing.prices.endpoint-uri=file:prices/?fileName=${date:now:yyyyMMddssSS}.txt&charset=utf-8 (2)
(1) Sets the connector for the prices channel
(2) Configures the endpoint-uri to write into files in the prices directory
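To get a feel for the file names this endpoint produces, the sketch below formats a timestamp with the same yyyyMMddssSS pattern using java.text.SimpleDateFormat. It assumes that Camel's date function follows SimpleDateFormat pattern rules; PriceFileNameSketch and fileName are hypothetical names used only for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustrative only: reproduces the file name pattern from the endpoint URI
// with plain JDK classes. Assumes the pattern follows SimpleDateFormat rules.
final class PriceFileNameSketch {

    // Formats the given instant as <year><month><day><seconds><millis>.txt
    static String fileName(Date now, TimeZone zone) {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddssSS");
        format.setTimeZone(zone);
        return format.format(now) + ".txt";
    }

    public static void main(String[] args) {
        // Prints the name a file written right now would get under this assumption
        System.out.println(fileName(new Date(), TimeZone.getDefault()));
    }
}
```

Note that the pattern contains no hour or minute fields, so two messages written on the same day at the same second and millisecond of different minutes would map to the same file name. If that matters for your use case, consider a pattern that includes HH and mm.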
Depending on your implementation of MicroProfile Reactive Messaging, the $ may need to be escaped as follows: $${…}
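For instance, with an implementation that requires escaping, the same endpoint configuration would be written with a doubled dollar sign:

```properties
mp.messaging.outgoing.prices.endpoint-uri=file:prices/?fileName=$${date:now:yyyyMMddssSS}.txt&charset=utf-8
```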
Then, your application must send Message<String> to the prices channel.
It can use String payloads as in the following snippet:
package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class CamelPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<String> generate() {
        // Build an infinite stream of random prices
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onOverflow().drop()
                .map(x -> random.nextDouble())
                .map(p -> Double.toString(p));
    }
}
Or, you can send Message<String>:
package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class CamelPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<String>> generate() {
        // Build an infinite stream of random prices
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble())
                .map(p -> Double.toString(p))
                .map(Message::of);
    }
}
Serialization
The serialization is handled by the Camel component. Refer to the Camel documentation to check which payload type is supported by the component.
Outbound Metadata
When sending Messages, you can add an instance of OutgoingExchangeMetadata to the message metadata.
You can then influence how the outbound Camel Exchange is created, for example by adding properties:
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> random.nextDouble())
        .map(p -> Double.toString(p))
        .map(s -> Message.of(s)
                .addMetadata(new OutgoingExchangeMetadata().putProperty("my-property", "my-value")));
Acknowledgement
The incoming messages are acknowledged when the underlying Camel exchange completes. If the exchange fails, the message is nacked.
Configuration Reference
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
endpoint-uri | The URI of the Camel endpoint (read from or written to). Type: string | true | |
merge | Whether the connector should allow multiple upstreams. Type: boolean | false | |
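As a sketch of how the merge attribute could be used, suppose several methods in your application (a hypothetical setup) all write to the prices channel. Enabling merge on that channel tells the connector to accept these multiple upstreams:

```properties
mp.messaging.outgoing.prices.connector=smallrye-camel
mp.messaging.outgoing.prices.endpoint-uri=file:prices/?fileName=${date:now:yyyyMMddssSS}.txt&charset=utf-8
# Allow more than one producer to feed the prices channel
mp.messaging.outgoing.prices.merge=true
```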
The processor pattern using Camel
Using the processor pattern, you can consume on a channel using a Camel component, and produce on a channel using another Camel component. In that case, the headers present in the incoming metadata will be forwarded in the outgoing metadata.
Example
Let’s imagine you want to read messages from a NATS subject, process them, and produce messages on a Kafka topic.
mp.messaging.incoming.mynatssubject.connector=smallrye-camel (1)
mp.messaging.incoming.mynatssubject.endpoint-uri=nats:mynatssubject (2)
mp.messaging.outgoing.mykafkatopic.connector=smallrye-camel (3)
mp.messaging.outgoing.mykafkatopic.endpoint-uri=kafka:mykafkatopic (4)
camel.component.nats.servers=127.0.0.1:5555 (5)
camel.component.kafka.brokers=127.0.0.1:9092 (6)
(1) Sets the connector for the mynatssubject channel
(2) Configures the endpoint-uri for the NATS subject
(3) Sets the connector for the mykafkatopic channel
(4) Configures the endpoint-uri for the Kafka topic
(5) Sets the URL of the NATS server to use
(6) Sets the URL of a Kafka broker to use
package processor;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class CamelProcessor {

    @Incoming("mynatssubject")
    @Outgoing("mykafkatopic")
    public byte[] process(byte[] message) {
        // do some logic
        return message;
    }
}
Using the Camel API
The Camel connector is based on the Reactive Streams support from Camel.
If you have an application already using the Camel API (routes, from …), you can integrate it with Reactive Messaging.
Getting the CamelReactiveStreamsService
Once you add the Camel connector to your application, you can retrieve the org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService object:
@Inject CamelReactiveStreamsService reactiveStreamsService;
This CamelReactiveStreamsService lets you create Publisher and Subscriber instances from existing routes.
Using Camel Route with @Outgoing
If you have an existing Camel route, you can turn it into a Publisher using the CamelReactiveStreamsService.
Then, you can return this Publisher from a method annotated with @Outgoing:
@Outgoing("camel")
public Publisher<Exchange> retrieveDataFromCamelRoute() {
    return reactiveStreamsService.from("seda:camel");
}
You can also use RouteBuilder:
@ApplicationScoped
static class MyRouteBuilder extends RouteBuilder {

    @Inject CamelReactiveStreamsService reactiveStreamsService;

    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }

    @Override
    public void configure() {
        // Upper-case each incoming body and push it to the "my-stream" reactive stream
        from("seda:camel")
                .process(exchange -> exchange.getMessage()
                        .setBody(exchange.getIn().getBody(String.class).toUpperCase()))
                .to("reactive-streams:my-stream");
    }
}
Using Camel Route with @Incoming
If you have an existing Camel route, you can turn it into a Subscriber using the CamelReactiveStreamsService.
Then, you can return this Subscriber from a method annotated with @Incoming:
@Incoming("to-camel")
public Subscriber<String> sendDataToCamelRoute() {
    return reactiveStreamsService.subscriber("file:./target?fileName=values.txt&fileExist=append",
            String.class);
}
You can also use a producer:
@Inject CamelContext camel;

@Incoming("to-camel")
public CompletionStage<Void> sink(String value) {
    return camel.createProducerTemplate()
            .asyncSendBody("file:./target?fileName=values.txt&fileExist=append", value)
            .thenApply(x -> null);
}