Skip to content

Receiving AWS SQS Messages

The AWS SQS connector allows you to receive messages from an AWS SQS queue.

Receiving messages

Before you start, you need to have an AWS account and a SQS queue created. To receive messages from an SQS queue, you need to create a method that consumes messages from the queue.

mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue

Then, your application receives Message<String>. You can consume the payload directly:

package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class SqsStringConsumer {
    @Incoming("data")
    void consume(String messageBody) {
        System.out.println("Received: " + messageBody);
    }
}

Or, you can retrieve the Message<String>:

package sqs.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class SqsMessageStringConsumer {
    @Incoming("data")
    CompletionStage<Void> consume(Message<String> msg) {
        System.out.println("Received: " + msg.getPayload());
        return msg.ack();
    }
}

You also can directly consume the software.amazon.awssdk.services.sqs.model.Message:

package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsSdkMessageConsumer {

    @Incoming("data")
    void consume(Message msg) {
        System.out.println("Received: " + msg.body());
        Map<String, MessageAttributeValue> attributes = msg.messageAttributes();
        // ...
    }
}

Receive message request customizer

The receive message requests sent to AWS SQS can be customized by providing a CDI bean implementation of SqsReceiveMessageRequestCustomizer and configuring it's identifier using the receive.request.customizer connector attribute.

package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

@Identifier("my-customizer") // or with channel name @Identifier("data")
@ApplicationScoped
public class SqsReceiveMessageRequestCustomizerExample implements SqsReceiveMessageRequestCustomizer {
    @Override
    public void customize(ReceiveMessageRequest.Builder builder) {
        builder.visibilityTimeout(10)
                .messageAttributeNames("my-attribute");
    }
}
1
2
3
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
mp.messaging.incoming.data.receive.request.customizer=my-customizer

Receive requests failed with retryable exceptions are retried automatically, by setting the failed request id.

Receive message request pause and resume

The AWS SQS connector fetches messages by continuously sending receive message requests. If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.

The pause resume can be disabled using the receive.request.pause.resume connector attribute.

mp.messaging.incoming.data.receive.request.pause.resume=false

Deserialization

The connector converts incoming SQS Messages into Reactive Messaging Message<T> instances.

The payload type T is determined based on the value of the SQS message attribute _classname.

If you send messages with the AWS SQS connector (outbound connector), the _classname attribute is automatically added to the message. The primitive types are transformed from the string representation to the corresponding Java type. For objects, if one of the JsonMapping modules is present on the classpath, the connector used that JSON module to deserialize the message body to objects.

If the _classname attribute is not present, the payload is deserialized as a String.

@ApplicationScoped
public static class Generator {

    @Outgoing("to-rabbitmq")
    public Multi<Price> prices() {
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }

}

@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-rabbitmq")
    public void consume(Price price) {
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}

public static class Price {
    public int price;

    public Price setPrice(int price) {
        this.price = price;
        return this;
    }
}

Inbound Metadata

Messages coming from SQS contain an instance of SqsIncomingMetadata in the metadata.

SQS message attributes can be accessed from the metadata either by name or by the MessageAttributeValue object.

package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsMetadataExample {

    @Incoming("queue")
    public void metadata(String body, SqsIncomingMetadata metadata) {
        Map<String, MessageAttributeValue> attributes = metadata.getMessage().messageAttributes();
        attributes.forEach((k, v) -> System.out.println(k + " -> " + v.stringValue()));
        System.out.println("Message body: " + body);
    }
}

Acknowledgement

The default strategy for acknowledging AWS SQS Message is to delete the message from the queue. With ack.delete set to false, the message is not deleted from the queue.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
ack.delete Whether the acknowledgement deletes the message from the queue boolean false true
credentials-provider The credential provider to be used in the client string false
endpoint-override The endpoint override string false
health-enabled Whether health reporting is enabled (default) or disabled boolean false true
max-number-of-messages The maximum number of messages to receive int false 10
queue The name of the SQS queue, defaults to channel name if not provided string false
queue.url The url of the SQS queue string false
receive.request.customizer The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided string false
receive.request.message-attribute-names The message attribute names to retrieve when receiving messages. string false
receive.request.pause.resume Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused. boolean false true
receive.request.retries If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. long false 2147483647
region The name of the SQS region string false
visibility-timeout The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request int false
wait-time-seconds The maximum amount of time in seconds to wait for messages to be received int false 1