Sending data with Camel
You can use the Camel connector to send data to almost any type of
system.
To keep this document focused on the Camel connector, we use the Camel
File component. However, the connector can be used with any Camel
component.
Example
Let’s imagine you want to write generated prices into files. Configure
your application to write the messages from the prices channel into a
files as follows:
| mp.messaging.outgoing.prices.connector=smallrye-camel # <1>
mp.messaging.outgoing.prices.endpoint-uri=file:prices/?fileName=${date:now:yyyyMMddssSS}.txt&charset=utf-8 # <2>
|
1. Sets the connector for the prices channel
2. Configure the endpoint-uri to write into files in the prices
directory
Important
Depending on your implementation of MicroProfile Reactive Messaging, the
$ may need to be escaped as follows: $${...}
Then, your application must send Message<String> to the prices
channel. It can use String payloads as in the following snippet:
| package camel.outbound;
import java.time.Duration;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
public class CamelPriceProducer {
private Random random = new Random();
@Outgoing("prices")
public Multi<String> generate() {
// Build an infinite stream of random prices
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop()
.map(x -> random.nextDouble())
.map(p -> Double.toString(p));
}
}
|
Or, you can send Message<Double>:
| package camel.outbound;
import java.time.Duration;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
public class CamelPriceMessageProducer {
private Random random = new Random();
@Outgoing("prices")
public Multi<Message<String>> generate() {
// Build an infinite stream of random prices
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble())
.map(p -> Double.toString(p))
.map(Message::of);
}
}
|
Serialization
The serialization is handled by the Camel component. Refer to the Camel
documentation to check which payload type is supported by the component.
When sending Messages, you can add an instance
of OutgoingExchangeMetadata
to the message metadata. You can then influence how the outbound Camel
Exchange is created, for example by adding properties:
| return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble())
.map(p -> Double.toString(p))
.map(s -> Message.of(s)
.addMetadata(new OutgoingExchangeMetadata()
.putProperty("my-property", "my-value")));
|
Acknowledgement
The incoming messages are acknowledged when the underlying Camel
exchange completes. If the exchange fails, the message is nacked.
Configuration Reference
| Attribute (alias) |
Description |
Type |
Mandatory |
Default |
| endpoint-uri |
The URI of the Camel endpoint (read from or written to) |
string |
true |
|
| merge |
Whether the connector should allow multiple upstreams |
boolean |
false |
false |