Retrieving data using Camel

Camel provides many components. To keep this documentation focused on the integration with Camel, we use the File component. This component lets us read files from a directory. A connector configured with this component therefore creates a Message for each file located in the directory, and a new Message is created as soon as a file is dropped into it.

Example

Let’s imagine you want to read the files from the orders directory and send them to the files channel. Configuring the Camel connector to get the files from this directory requires only two properties:

mp.messaging.incoming.files.connector=smallrye-camel # <1>
mp.messaging.incoming.files.endpoint-uri=file:orders/?delete=true&charset=utf-8 # <2>

1. Sets the connector for the files channel
2. Configures the endpoint-uri
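
The part of the endpoint-uri after the `?` carries options for the Camel File component. As a hedged variant of the configuration above (an illustration, not part of the original example), the fragment below swaps `delete=true` for the File component's `noop=true` option, which leaves consumed files in place instead of deleting them:

```properties
# Same channel as above, but consumed files stay in the orders directory
mp.messaging.incoming.files.connector=smallrye-camel
mp.messaging.incoming.files.endpoint-uri=file:orders/?noop=true&charset=utf-8
```

Refer to the Camel File component documentation for the full list of options and their exact behavior.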

Then, your application receives Message<GenericFile<File>>.

Note

The Camel File component produces org.apache.camel.component.file.GenericFile instances. You can retrieve the actual File using getFile().

You can consume the payload directly:

package camel.inbound;

import java.io.File;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class CamelFileConsumer {

    @Incoming("files")
    public void consume(GenericFile<File> gf) {
        File file = gf.getFile();
        // process the file

    }

}
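
As a minimal sketch of what "process the file" could look like, the helper below reads the whole file as a UTF-8 string, matching the `charset=utf-8` option used in the endpoint-uri. The `OrderFileReader` class and its `readContent` method are hypothetical names introduced for this illustration; they are not part of the connector. The sketch relies only on the JDK (Java 11 or later) and assumes the file is small enough to fit in memory and still exists while the method runs.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical helper, not part of the Camel connector.
final class OrderFileReader {

    private OrderFileReader() {
        // static helper, no instances
    }

    // Reads the complete file content as UTF-8 text.
    // Wraps IOException so it can be called from a method without a throws clause.
    static String readContent(File file) {
        try {
            return Files.readString(file.toPath(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Unable to read " + file, e);
        }
    }
}
```

Inside the `consume` method above, it could be called as `String content = OrderFileReader.readContent(gf.getFile());`.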

You can also retrieve the Message<GenericFile<File>>:

package camel.inbound;

import java.io.File;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class CamelFileMessageConsumer {

    @Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {
        File file = msg.getPayload().getFile();
        // process the file

        return msg.ack();
    }

}

Deserialization

Each Camel component produces specific objects. As we have seen, the File component produces GenericFile.

Refer to the component documentation to check which type is produced.

Inbound Metadata

Messages coming from Camel contain an instance of IncomingExchangeMetadata in their metadata.

package camel.inbound;

import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.camel.IncomingExchangeMetadata;

@ApplicationScoped
public class IncomingCamelMetadataExample {

    @Incoming("files")
    public CompletionStage<Void> consume(Message<GenericFile<File>> msg) {
        Optional<IncomingExchangeMetadata> metadata = msg.getMetadata(IncomingExchangeMetadata.class);
        if (metadata.isPresent()) {
            // Retrieve the camel exchange:
            Exchange exchange = metadata.get().getExchange();
        }
        return msg.ack();
    }

}

This object lets you retrieve the Camel Exchange.

Failure Management

If a message produced from a Camel exchange is nacked, a failure strategy is applied. The Camel connector supports 2 strategies:

  • fail (default) - fail the application; no more messages will be processed.
  • ignore - the failure is logged, but the processing continues.

In both cases, the exchange is marked as rollback only and the nack reason is attached to the exchange.
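
The strategy is selected per channel with the failure-strategy attribute. A minimal fragment, reusing the files channel from the example earlier on this page, that switches from the default fail to ignore:

```properties
# Log nacked messages and keep processing instead of failing the application
mp.messaging.incoming.files.failure-strategy=ignore
```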

Configuration Reference

Attribute (alias) | Description | Type | Mandatory | Default
endpoint-uri | The URI of the Camel endpoint (read from or written to) | string | true |
failure-strategy | Specify the failure strategy to apply when a message produced from a Camel exchange is nacked. Values can be fail (default) or ignore | string | false | fail