Skip to content

Sending data with Camel

You can use the Camel connector to send data to almost any type of system.

To keep this document focused on the Camel connector, we use the Camel File component. However, the connector can be used with any Camel component.

Example

Let’s imagine you want to write generated prices into files. Configure your application to write the messages from the prices channel into a files as follows:

mp.messaging.outgoing.prices.connector=smallrye-camel # <1>
mp.messaging.outgoing.prices.endpoint-uri=file:prices/?fileName=${date:now:yyyyMMddssSS}.txt&charset=utf-8 # <2>
1. Sets the connector for the prices channel 2. Configure the endpoint-uri to write into files in the prices directory

Important

Depending on your implementation of MicroProfile Reactive Messaging, the $ may need to be escaped as follows: $${...}

Then, your application must send Message<String> to the prices channel. It can use String payloads as in the following snippet:

package camel.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class CamelPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<String> generate() {
        // Build an infinite stream of random prices
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onOverflow().drop()
                .map(x -> random.nextDouble())
                .map(p -> Double.toString(p));
    }

}

Or, you can send Message<Double>:

package camel.outbound;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class CamelPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<String>> generate() {
        // Build an infinite stream of random prices
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble())
                .map(p -> Double.toString(p))
                .map(Message::of);
    }

}

Serialization

The serialization is handled by the Camel component. Refer to the Camel documentation to check which payload type is supported by the component.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingExchangeMetadata to the message metadata. You can then influence how the outbound Camel Exchange is created, for example by adding properties:

1
2
3
4
5
6
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> random.nextDouble())
        .map(p -> Double.toString(p))
        .map(s -> Message.of(s)
                .addMetadata(new OutgoingExchangeMetadata()
                        .putProperty("my-property", "my-value")));

Acknowledgement

The incoming messages are acknowledged when the underlying Camel exchange completes. If the exchange fails, the message is nacked.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
endpoint-uri The URI of the Camel endpoint (read from or written to) string true
merge Whether the connector should allow multiple upstreams boolean false false