Receiving AWS SQS Messages

The AWS SQS connector allows you to receive messages from an AWS SQS queue.

Receiving messages

Before you start, you need an AWS account and an SQS queue. To receive messages from the queue, configure an incoming channel that uses the connector, then create a method that consumes messages from that channel.

mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue

Then, your application receives Message<String>. You can consume the payload directly:

package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class SqsStringConsumer {
    @Incoming("data")
    void consume(String messageBody) {
        System.out.println("Received: " + messageBody);
    }
}

Or, you can retrieve the Message<String>:

package sqs.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class SqsMessageStringConsumer {
    @Incoming("data")
    CompletionStage<Void> consume(Message<String> msg) {
        System.out.println("Received: " + msg.getPayload());
        return msg.ack();
    }
}

You can also consume the software.amazon.awssdk.services.sqs.model.Message directly:

package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsSdkMessageConsumer {

    @Incoming("data")
    void consume(Message msg) {
        System.out.println("Received: " + msg.body());
        Map<String, MessageAttributeValue> attributes = msg.messageAttributes();
        // ...
    }
}

Receive message request customizer

The receive message requests sent to AWS SQS can be customized by providing a CDI bean that implements SqsReceiveMessageRequestCustomizer and configuring its identifier using the receive.request.customizer connector attribute.

package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

@Identifier("my-customizer") // or with channel name @Identifier("data")
@ApplicationScoped
public class SqsReceiveMessageRequestCustomizerExample implements SqsReceiveMessageRequestCustomizer {
    @Override
    public void customize(ReceiveMessageRequest.Builder builder) {
        builder.visibilityTimeout(10)
                .messageAttributeNames("my-attribute");
    }
}
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
mp.messaging.incoming.data.receive.request.customizer=my-customizer

Receive requests that fail with a retryable exception are retried automatically, with the failed request id set on the retried request.

Receive message request pause and resume

The AWS SQS connector fetches messages by continuously sending receive message requests. If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.

The pause/resume behavior can be disabled using the receive.request.pause.resume connector attribute.

mp.messaging.incoming.data.receive.request.pause.resume=false
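
The idea behind pause and resume can be sketched in plain Java. This is only an illustration, not the connector's implementation: the PausablePoller class, its maxBuffered threshold, and the fetch supplier (standing in for a receive message request) are all hypothetical names, and the connector itself reacts to downstream demand rather than to a fixed buffer size.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/**
 * Illustrative only: a poller that skips fetching while too many
 * messages are still waiting to be processed.
 */
class PausablePoller {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int maxBuffered;              // hypothetical pause threshold
    private final Supplier<List<String>> fetch; // stands in for a receive request

    PausablePoller(int maxBuffered, Supplier<List<String>> fetch) {
        this.maxBuffered = maxBuffered;
        this.fetch = fetch;
    }

    /** Runs one polling cycle; returns true if a fetch was performed. */
    boolean pollOnce() {
        if (buffer.size() >= maxBuffered) {
            return false; // paused: the application has not caught up yet
        }
        buffer.addAll(fetch.get());
        return true;
    }

    /** Simulates the application processing one message; null if none is buffered. */
    String processOne() {
        return buffer.poll();
    }

    int buffered() {
        return buffer.size();
    }
}
```

With a threshold of 2 and a fetch that returns two messages, the first cycle fetches, the second is skipped, and fetching resumes once a message has been processed.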

Deserialization

The connector converts incoming SQS Messages into Reactive Messaging Message<T> instances.

The payload type T is determined based on the value of the SQS message attribute _classname.

If you send messages with the AWS SQS connector (outbound connector), the _classname attribute is automatically added to the message. Primitive types are converted from their string representation to the corresponding Java type. For objects, if one of the JsonMapping modules is present on the classpath, the connector uses that JSON module to deserialize the message body into an object.

If the _classname attribute is not present, the payload is deserialized as a String.

@ApplicationScoped
public static class Generator {

    @Outgoing("to-sqs")
    public Multi<Price> prices() {
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }

}

@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-sqs")
    public void consume(Price price) {
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}

public static class Price {
    public int price;

    public Price setPrice(int price) {
        this.price = price;
        return this;
    }
}
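
The way a class-name attribute can drive the payload type is sketched below in plain Java. This is an assumption-laden illustration rather than the connector's code: the ClassNamePayloadConverter class and the set of handled types are hypothetical, and where the connector would hand other types to a JSON mapping module, this sketch simply returns the raw body.

```java
/** Illustrative only: converts a message body based on a class-name attribute. */
class ClassNamePayloadConverter {

    /**
     * @param body      the raw message body
     * @param className the value of the _classname attribute, or null if absent
     */
    static Object convert(String body, String className) {
        if (className == null) {
            return body; // no _classname attribute: keep the String payload
        }
        switch (className) {
            case "java.lang.Integer":
                return Integer.valueOf(body);
            case "java.lang.Long":
                return Long.valueOf(body);
            case "java.lang.Double":
                return Double.valueOf(body);
            case "java.lang.Boolean":
                return Boolean.valueOf(body);
            default:
                // A real implementation would delegate to a JSON mapper here.
                return body;
        }
    }
}
```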

Inbound Metadata

Messages coming from SQS contain an instance of SqsIncomingMetadata in the metadata.

SQS message attributes can be accessed from the metadata either by name or by the MessageAttributeValue object.

package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsMetadataExample {

    @Incoming("queue")
    public void metadata(String body, SqsIncomingMetadata metadata) {
        Map<String, MessageAttributeValue> attributes = metadata.getMessage().messageAttributes();
        attributes.forEach((k, v) -> System.out.println(k + " -> " + v.stringValue()));
        System.out.println("Message body: " + body);
    }
}

Acknowledgement Strategies

The default strategy for acknowledging an AWS SQS message is to delete the message from the queue. Set the ack-strategy attribute to ignore if you want acknowledgement to leave the message untouched.

[NOTE] Deprecated: the ack.delete attribute is deprecated and will be removed in a future release. Use ack-strategy instead.

You can implement a custom strategy by implementing the SqsAckHandler interface together with a Factory class, and registering the factory as a CDI bean with an @Identifier.

package sqs.inbound;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import io.vertx.mutiny.core.Vertx;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class SqsCustomAckStrategy implements SqsAckHandler {

    @ApplicationScoped
    @Identifier("custom")
    public static class Factory implements SqsAckHandler.Factory {

        @Override
        public SqsAckHandler create(SqsConnectorIncomingConfiguration conf,
                Vertx vertx,
                SqsAsyncClient client,
                Uni<String> queueUrlUni,
                BiConsumer<Throwable, Boolean> reportFailure) {
            return new SqsCustomAckStrategy();
        }
    }

    @Override
    public Uni<Void> handle(SqsMessage<?> message) {
        return Uni.createFrom().voidItem()
                .emitOn(message::runOnMessageContext);
    }
}
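
To select this strategy for a channel, the ack-strategy attribute would be set to the factory's identifier. A minimal sketch, assuming the data channel and my-queue queue used earlier on this page:

```properties
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
# "custom" matches the @Identifier on the Factory bean
mp.messaging.incoming.data.ack-strategy=custom
```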

Failure Strategies

The default strategy for handling message processing failures is ignore. It lets the visibility timeout of the message expire, after which the message becomes visible again and is consumed again.

Other possible strategies are:

  • fail: the failure is logged and the channel fail-stops.
  • delete: the message is removed from the queue.
  • visibility: the message visibility timeout is reset to 0.
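
A built-in strategy is selected with the failure-strategy connector attribute. A minimal sketch, assuming the data channel used earlier on this page:

```properties
# One of: ignore (default), fail, delete, visibility
mp.messaging.incoming.data.failure-strategy=visibility
```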

You can implement a custom strategy by implementing the SqsFailureHandler interface together with a Factory class, and registering the factory as a CDI bean with an @Identifier.

package sqs.inbound;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsFailureHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class SqsCustomNackStrategy implements SqsFailureHandler {

    @ApplicationScoped
    @Identifier("custom")
    public static class Factory implements SqsFailureHandler.Factory {

        @Override
        public SqsFailureHandler create(String channel, SqsAsyncClient client, Uni<String> queueUrlUni,
                BiConsumer<Throwable, Boolean> reportFailure) {
            return new SqsCustomNackStrategy();
        }
    }

    @Override
    public Uni<Void> handle(SqsMessage<?> message, Metadata metadata, Throwable throwable) {
        return Uni.createFrom().voidItem()
                .emitOn(message::runOnMessageContext);
    }
}
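
As with acknowledgement, the custom failure strategy would be selected by its identifier. A minimal sketch, assuming the data channel used earlier on this page:

```properties
# "custom" matches the @Identifier on the Factory bean
mp.messaging.incoming.data.failure-strategy=custom
```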

Configuration Reference

| Attribute (alias) | Description | Type | Mandatory | Default |
|---|---|---|---|---|
| ack-strategy | The identifier for the bean implementing ack strategy factory. Strategies: 'delete', 'ignore' | string | false | delete |
| ack.delete | deprecated - Whether the acknowledgement deletes the message from the queue. Deprecated, use ack-strategy instead | boolean | false | |
| credentials-provider | The credential provider to be used in the client | string | false | |
| endpoint-override | The endpoint override | string | false | |
| failure-strategy | The identifier for the bean implementing failure strategy factory. Strategies: 'ignore', 'fail', 'visibility', 'delete' | string | false | ignore |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| max-number-of-messages | The maximum number of messages to receive | int | false | 10 |
| queue | The name of the SQS queue, defaults to channel name if not provided | string | false | |
| queue.url | The url of the SQS queue | string | false | |
| receive.request.customizer | The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided | string | false | |
| receive.request.message-attribute-names | The message attribute names to retrieve when receiving messages. | string | false | |
| receive.request.pause.resume | Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused. | boolean | false | true |
| receive.request.retries | If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. | long | false | 2147483647 |
| region | The name of the SQS region | string | false | |
| visibility-timeout | The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request | int | false | |
| wait-time-seconds | The maximum amount of time in seconds to wait for messages to be received | int | false | 1 |
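
Several of these attributes are typically combined to tune polling. The fragment below is a sketch for the data channel used earlier on this page; the values are illustrative assumptions, not recommendations.

```properties
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
# Fetch up to 5 messages per receive request (default 10)
mp.messaging.incoming.data.max-number-of-messages=5
# Wait up to 20 seconds for messages to arrive (default 1)
mp.messaging.incoming.data.wait-time-seconds=20
# Hide received messages from other receive requests for 30 seconds
mp.messaging.incoming.data.visibility-timeout=30
# Retry a failed receive request at most 3 times
mp.messaging.incoming.data.receive.request.retries=3
```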