Integration with Apache Camel

The Camel connector adds support for Apache Camel to Reactive Messaging.

Camel is an open source integration framework that lets you integrate various systems consuming or producing data. Camel implements the Enterprise Integration Patterns and provides several hundred components used to access databases, message queues, APIs, or basically anything under the sun.

Introduction

Camel is not a messaging broker, but it allows your Reactive Messaging application to retrieve data from almost anything and send data to almost anything.

So if you want to send a Reactive Messaging Message to Telegram, or retrieve data from Salesforce or SAP, this is the connector you need.

One of Camel's cornerstones is the endpoint and its URI, which encodes the connection to an external system. For example, file:orders/?delete=true&charset=utf-8 instructs Camel to read the files from the orders directory. The URI format and parameters are listed in the documentation of each component, such as the File component.
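To make the anatomy of such a URI concrete, here is a minimal sketch that splits the example URI into its component scheme, path, and options using only the JDK. It is purely illustrative: Camel parses endpoint URIs with its own code, and the class and method names below (EndpointUriSketch, scheme, path, options) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: Camel has its own endpoint URI parser. This hypothetical
// class just shows which part of the URI plays which role.
final class EndpointUriSketch {

    // The component scheme is the text before the first ':' (assumes one is present).
    static String scheme(String uri) {
        return uri.substring(0, uri.indexOf(':'));
    }

    // The component-specific path sits between the first ':' and the '?'.
    static String path(String uri) {
        int start = uri.indexOf(':') + 1;
        int end = uri.indexOf('?');
        return end < 0 ? uri.substring(start) : uri.substring(start, end);
    }

    // The options follow the '?', separated by '&', and are kept in declaration order.
    static Map<String, String> options(String uri) {
        Map<String, String> result = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return result;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String uri = "file:orders/?delete=true&charset=utf-8";
        System.out.println(scheme(uri));  // file
        System.out.println(path(uri));    // orders/
        System.out.println(options(uri)); // {delete=true, charset=utf-8}
    }
}
```

Here, file selects the Camel component, orders/ is the directory handed to that component, and delete and charset are component options.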

Using the Camel connector

To use the Camel connector, add the following dependency to your project:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-camel</artifactId>
  <version>3.10.1</version>
</dependency>

You will also need the dependency of the Camel component you are using. For example, if you want to process files, you would need to add the Camel File Component artifact:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-file</artifactId>
  <version>3.11.2</version>
</dependency>

The connector name is: smallrye-camel.

So, to indicate that a channel is managed by this connector you need:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-camel

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-camel

Retrieving data using Camel

Camel provides many components. To keep this documentation focused on the integration with Camel, we use the File component. This component lets you read files from a directory. A connector configured with this component creates a Message for each file located in the directory. As soon as a file is dropped in the directory, a new Message is created.

Example

Let’s imagine you want to read the files from the orders directory and send them to the files channel. Configuring the Camel connector to get the files from this directory requires only two properties:

mp.messaging.incoming.files.connector=smallrye-camel       (1)
mp.messaging.incoming.files.endpoint-uri=file:orders/?delete=true&charset=utf-8 (2)
  1. Sets the connector for the files channel

  2. Configures the endpoint-uri

Then, your application receives Message<GenericFile<File>>.

The Camel File component produces org.apache.camel.component.file.GenericFile instances. You can retrieve the actual File using getFile().

You can consume the payload directly:

package inbound;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;

@ApplicationScoped
public class CamelFileConsumer {

    @Incoming("files")
    public void consume(GenericFile<File> gf) {
        File file = gf.getFile();
        // process the file

    }

}
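The "process the file" placeholder above could, for instance, read the file line by line. The following is a minimal JDK-only sketch of such a helper. OrderFileReader, readOrderLines, and readSample are hypothetical names, and UTF-8 is assumed in order to match the charset option of the endpoint URI.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the connector: reads the non-blank lines of a file.
final class OrderFileReader {

    // Reads the file as UTF-8 (an assumption), trims each line and drops empty ones.
    static List<String> readOrderLines(File file) throws IOException {
        return Files.readAllLines(file.toPath(), StandardCharsets.UTF_8).stream()
            .map(String::trim)
            .filter(line -> !line.isEmpty())
            .collect(Collectors.toList());
    }

    // Writes a small temporary file and reads it back, to show the helper in isolation.
    static List<String> readSample() {
        try {
            Path tmp = Files.createTempFile("orders", ".txt");
            try {
                Files.write(tmp, "order-1\n\n  order-2  \n".getBytes(StandardCharsets.UTF_8));
                return readOrderLines(tmp.toFile());
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readSample()); // [order-1, order-2]
    }
}
```

In a consumer, such a helper would be called with the File returned by gf.getFile().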

You can also retrieve the Message<GenericFile<File>>:

package inbound;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class CamelFileMessageConsumer {

    @Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {
        File file = msg.getPayload().getFile();
        // process the file

        return msg.ack();
    }


}

Deserialization

Each Camel component produces specific objects. As we have seen, the File component produces GenericFile.

Refer to the component documentation to check which type is produced.

Inbound Metadata

Messages coming from Camel contain an instance of IncomingExchangeMetadata in the metadata.

package inbound;

import io.smallrye.reactive.messaging.camel.IncomingExchangeMetadata;
import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class IncomingCamelMetadataExample {

    @Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {
        Optional<IncomingExchangeMetadata> metadata = msg.getMetadata(IncomingExchangeMetadata.class);
        if (metadata.isPresent()) {
            // Retrieve the camel exchange:
            Exchange exchange = metadata.get().getExchange();
        }
        return msg.ack();
    }


}

This object lets you retrieve the Camel Exchange.

Failure Management

If a message produced from a Camel exchange is nacked, a failure strategy is applied. The Camel connector supports 2 strategies:

  • fail - fail the application; no more messages will be processed (default).

  • ignore - the failure is logged, but the processing continues.

In both cases, the exchange is marked as rollback only and the nack reason is attached to the exchange.

Configuration Reference

Table 1. Incoming Attributes of the 'smallrye-camel' connector

Attribute (alias) | Description | Mandatory | Default
endpoint-uri | The URI of the Camel endpoint (read from or written to). Type: string | true |
failure-strategy | Specify the failure strategy to apply when a message produced from a Camel exchange is nacked. Values can be fail (default) or ignore. Type: string | false | fail

Sending data with Camel

You can use the Camel connector to send data to almost any type of system.

To keep this document focused on the Camel connector, we use the Camel File component. However, the connector can be used with any Camel component.

Example

Let’s imagine you want to write generated prices into files. Configure your application to write the messages from the prices channel into files as follows:

mp.messaging.outgoing.prices.connector=smallrye-camel (1)
mp.messaging.outgoing.prices.endpoint-uri=file:prices/?fileName=${date:now:yyyyMMddssSS}.txt&charset=utf-8 (2)
  1. Sets the connector for the prices channel

  2. Configures the endpoint-uri to write into files in the prices directory

Depending on your implementation of MicroProfile Reactive Messaging, the $ may need to be escaped as follows: $${…​}
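To see what kind of file names the date pattern in this URI yields, the following sketch formats a fixed instant with the same pattern. It assumes that the Camel date:now expression accepts java.text.SimpleDateFormat patterns, as described in the Camel Simple language documentation. PriceFileNameSketch and fileName are hypothetical names, and the time zone and locale are pinned only to make the output reproducible.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Illustrative only: reproduces the pattern from the endpoint URI with the JDK.
final class PriceFileNameSketch {

    // yyyyMMdd = date, ss = seconds, SS = milliseconds (at least two digits).
    static String fileName(Date now, TimeZone zone) {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddssSS", Locale.ROOT);
        format.setTimeZone(zone);
        return format.format(now) + ".txt";
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        Date first = Date.from(Instant.parse("2021-09-15T10:20:30.456Z"));
        Date oneMinuteLater = Date.from(Instant.parse("2021-09-15T10:21:30.456Z"));
        System.out.println(fileName(first, utc));          // 2021091530456.txt
        System.out.println(fileName(oneMinuteLater, utc)); // 2021091530456.txt
    }
}
```

The pattern contains neither hours nor minutes, so two messages sent on the same day with identical seconds and milliseconds map to the same file name. If that matters for your application, a pattern such as yyyyMMddHHmmssSSS avoids it.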

Then, your application must send Message<String> to the prices channel. It can use String payloads as in the following snippet:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class CamelPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<String> generate() {
        // Build an infinite stream of random prices
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .onOverflow().drop()
            .map(x -> random.nextDouble())
            .map(p -> Double.toString(p));
    }

}

Or, you can send Message<String>:

package outbound;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class CamelPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<String>> generate() {
        // Build an infinite stream of random prices
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble())
            .map(p -> Double.toString(p))
            .map(Message::of);
    }


}

Serialization

The serialization is handled by the Camel component. Refer to the Camel documentation to check which payload type is supported by the component.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingExchangeMetadata to the message metadata. You can then influence how the outbound Camel Exchange is created, for example by adding properties:

        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble())
            .map(p -> Double.toString(p))
            .map(s ->
                Message.of(s)
                    .addMetadata(new OutgoingExchangeMetadata().putProperty("my-property", "my-value"))
            );

Acknowledgement

Messages handed to the connector are acknowledged when the underlying Camel exchange completes. If the exchange fails, the message is nacked.

Configuration Reference

Table 2. Outgoing Attributes of the 'smallrye-camel' connector

Attribute (alias) | Description | Mandatory | Default
endpoint-uri | The URI of the Camel endpoint (read from or written to). Type: string | true |
merge | Whether the connector should allow multiple upstreams. Type: boolean | false | false

Using the Camel API

The Camel connector is based on the Reactive Streams support from Camel. If you have an application already using the Camel API (routes, from…​), you can integrate it with Reactive Messaging.

Getting the CamelReactiveStreamsService

Once you add the Camel connector to your application, you can retrieve the org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService object:

@Inject CamelReactiveStreamsService reactiveStreamsService;

This CamelReactiveStreamsService lets you create Publisher and Subscriber instances from existing routes.

Using Camel Route with @Outgoing

If you have an existing Camel route, you can transform it into a Publisher using the CamelReactiveStreamsService. Then, you can return this Publisher from a method annotated with @Outgoing:

@Outgoing("camel")
public Publisher<Exchange> retrieveDataFromCamelRoute() {
    return reactiveStreamsService.from("seda:camel");
}

You can also use RouteBuilder:

@ApplicationScoped
static class MyRouteBuilder extends RouteBuilder {
    @Inject CamelReactiveStreamsService reactiveStreamsService;

    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }

    @Override
    public void configure() throws Exception {
        from("seda:camel").process(
            exchange -> exchange.getMessage().setBody(exchange.getIn().getBody(String.class).toUpperCase()))
            .to("reactive-streams:my-stream");
    }
}

Using Camel Route with @Incoming

If you have an existing Camel route, you can transform it into a Subscriber using the CamelReactiveStreamsService. Then, you can return this Subscriber from a method annotated with @Incoming:

    @Incoming("to-camel")
    public Subscriber<String> sendDataToCamelRoute() {
        return reactiveStreamsService.subscriber("file:./target?fileName=values.txt&fileExist=append",
            String.class);
    }

You can also use a producer:

    @Inject CamelContext camel;

    @Incoming("to-camel")
    public CompletionStage<Void> sink(String value) {
        return camel.createProducerTemplate()
            .asyncSendBody("file:./target?fileName=values.txt&fileExist=append", value).thenApply(x -> null);
    }
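The trailing thenApply(x -> null) in the producer snippet adapts the future returned by the asynchronous send to the CompletionStage<Void> that the method declares. The following JDK-only sketch shows that adaptation in isolation. AckStageSketch and toVoid are hypothetical names, and the futures built in main stand in for the result of a real send.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Illustrative only: shows how a future carrying a result is narrowed to a
// CompletionStage<Void> while keeping its success or failure outcome.
final class AckStageSketch {

    // Discards the result value; a failure of the source future is propagated.
    static CompletionStage<Void> toVoid(CompletableFuture<Object> sendResult) {
        return sendResult.thenApply(ignored -> null);
    }

    public static void main(String[] args) {
        // Stand-in for a successful send.
        CompletableFuture<Object> ok = CompletableFuture.completedFuture("sent");
        System.out.println(toVoid(ok).toCompletableFuture().join()); // null

        // Stand-in for a failed send.
        CompletableFuture<Object> ko = new CompletableFuture<>();
        ko.completeExceptionally(new IllegalStateException("send failed"));
        System.out.println(toVoid(ko).toCompletableFuture().isCompletedExceptionally()); // true
    }
}
```

Because the outcome is preserved, the returned stage completes normally when the send succeeds and exceptionally when it fails.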