Using the Camel API
The Camel connector is based on the Reactive Streams support from Camel.
If your application already uses the Camel API (routes, from...), you
can integrate it with Reactive Messaging.
Getting the CamelReactiveStreamsService
Once you add the Camel connector to your application, you can inject the
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService
object:
| @Inject
CamelReactiveStreamsService reactiveStreamsService;
|
This CamelReactiveStreamsService lets you create Publisher and
Subscriber instances from existing routes.
Using a Camel Route with @Outgoing
If you have an existing Camel route, you can turn it into a
Publisher using the CamelReactiveStreamsService. Then, you can
return this Publisher from a method annotated with @Outgoing:
| @Outgoing("camel")
public Publisher<Exchange> retrieveDataFromCamelRoute() {
    return reactiveStreamsService.from("seda:camel");
}
|
You can also use RouteBuilder:
| @ApplicationScoped
static class MyRouteBuilder extends RouteBuilder {
    @Inject
    CamelReactiveStreamsService reactiveStreamsService;
    // Expose the Camel stream named "my-stream" as the "sink" channel.
    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }
    // Upper-case each body received on seda:camel and forward it to "my-stream".
    @Override
    public void configure() {
        from("seda:camel")
                .process(exchange -> exchange.getMessage()
                        .setBody(exchange.getIn().getBody(String.class).toUpperCase()))
                .to("reactive-streams:my-stream");
    }
}
|
Using a Camel Route with @Incoming
If you have an existing Camel route, you can turn it into a
Subscriber using the CamelReactiveStreamsService. Then, you can
return this Subscriber from a method annotated with @Incoming:
| @Incoming("to-camel")
public Subscriber<String> sendDataToCamelRoute() {
    return reactiveStreamsService.subscriber(
            "file:./target?fileName=values.txt&fileExist=append", String.class);
}
|
You can also use a Camel ProducerTemplate:
| @Inject
CamelContext camel;
@Incoming("to-camel")
public CompletionStage<Void> sink(String value) {
    return camel.createProducerTemplate()
            .asyncSendBody("file:./target?fileName=values.txt&fileExist=append", value)
            .thenApply(x -> null);
}
|
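The thenApply(x -> null) call in the last example only adapts the result type: the asynchronous send completes with some result object, while the method is declared to return CompletionStage<Void>. The JDK-only sketch below reproduces that pattern without Camel. The fakeAsyncSend method and the written list are hypothetical stand-ins for the producer template and the file endpoint, so treat this as an illustration of the adaptation, not as Camel code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

class VoidStageSketch {
    // Hypothetical stand-in for the endpoint the producer template writes to.
    static final List<String> written = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for an asynchronous send: it performs the write
    // on another thread and completes with some result object.
    static CompletableFuture<Object> fakeAsyncSend(String value) {
        return CompletableFuture.supplyAsync(() -> {
            written.add(value);
            return value;
        });
    }

    // Same shape as the sink method above: the result is discarded and only
    // the completion (or failure) signal is propagated to the caller.
    static CompletionStage<Void> sink(String value) {
        return fakeAsyncSend(value).thenApply(x -> null);
    }

    public static void main(String[] args) {
        // join() blocks until the send has completed; the stage carries no value.
        Void result = sink("hello").toCompletableFuture().join();
        System.out.println(result + " " + written);
    }
}
```

Because the returned stage completes only after the send has finished, a failure in the send also fails the stage instead of being silently dropped.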