Receiving AWS SQS Messages
The AWS SQS connector allows you to receive messages from an AWS SQS queue.
Receiving messages
Before you start, you need an AWS account and an SQS queue.
To receive messages from an SQS queue, configure an incoming channel that uses the connector, then create a method that consumes messages from that channel.
| mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
|
Then, your application receives Message<String>.
You can consume the payload directly:
| package sqs.inbound;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
@ApplicationScoped
public class SqsStringConsumer {
    @Incoming("data")
    void consume(String messageBody) {
        System.out.println("Received: " + messageBody);
    }
}
|
Or, you can retrieve the Message<String>:
| package sqs.inbound;
import java.util.concurrent.CompletionStage;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
@ApplicationScoped
public class SqsMessageStringConsumer {
    @Incoming("data")
    CompletionStage<Void> consume(Message<String> msg) {
        System.out.println("Received: " + msg.getPayload());
        return msg.ack();
    }
}
|
You can also consume the software.amazon.awssdk.services.sqs.model.Message directly:
| package sqs.inbound;
import java.util.Map;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
@ApplicationScoped
public class SqsSdkMessageConsumer {
    @Incoming("data")
    void consume(Message msg) {
        System.out.println("Received: " + msg.body());
        Map<String, MessageAttributeValue> attributes = msg.messageAttributes();
        // ...
    }
}
|
Receive message request customizer
The receive message requests sent to AWS SQS can be customized by providing a CDI bean that implements
SqsReceiveMessageRequestCustomizer
and configuring its identifier using the receive.request.customizer connector attribute.
| package sqs.inbound;
import jakarta.enterprise.context.ApplicationScoped;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
@Identifier("my-customizer") // or with channel name @Identifier("data")
@ApplicationScoped
public class SqsReceiveMessageRequestCustomizerExample implements SqsReceiveMessageRequestCustomizer {
    @Override
    public void customize(ReceiveMessageRequest.Builder builder) {
        builder.visibilityTimeout(10)
                .messageAttributeNames("my-attribute");
    }
}
|
| mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
mp.messaging.incoming.data.receive.request.customizer=my-customizer
|
Receive requests that fail with a retryable exception are retried automatically, with the ID of the failed request set on the retried request.
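As a rough mental model of this retry behavior, the following stand-alone sketch retries a failing call until a retry budget is exhausted, in the spirit of the receive.request.retries attribute. It is not the connector's implementation: ReceiveRetrySketch, callWithRetries, and the retryable predicate are hypothetical names, and the sketch ignores the request id and any delay between attempts.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of the SmallRye SQS connector.
final class ReceiveRetrySketch {

    // Calls `request`, retrying up to `maxRetries` times when the failure is retryable.
    // With maxRetries = 0 the first failure is rethrown immediately.
    static <T> T callWithRetries(Supplier<T> request, long maxRetries,
            Predicate<RuntimeException> retryable) {
        long retries = 0;
        while (true) {
            try {
                return request.get();
            } catch (RuntimeException e) {
                if (!retryable.test(e) || retries >= maxRetries) {
                    throw e; // non-retryable failure, or retry budget exhausted
                }
                retries++;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = { 0 };
        // The simulated request fails twice before succeeding.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "batch";
        }, 5, e -> e instanceof IllegalStateException);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "batch after 3 calls"
    }
}
```

Setting the budget to 0 in this sketch makes the first failure propagate, which mirrors the documented meaning of receive.request.retries=0.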
Receive message request pause and resume
The AWS SQS connector fetches messages by continuously sending receive message requests.
If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.
The pause/resume behavior can be disabled by setting the receive.request.pause.resume connector attribute to false.
| mp.messaging.incoming.data.receive.request.pause.resume=false
|
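The idea behind pause and resume can be sketched in plain Java: the polling loop keeps ticking, but it only issues a receive request while the application has outstanding demand. This is a simplified model under assumptions, not the connector's code; PauseResumePollerSketch, request, and poll are hypothetical names, and the supplier stands in for a real receive message request.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical illustration of demand-driven polling; not the connector's code.
final class PauseResumePollerSketch {

    private final AtomicLong demand = new AtomicLong();
    private final Supplier<List<String>> receiveRequest; // stands in for a receive message request
    private final boolean pauseResume;

    PauseResumePollerSketch(Supplier<List<String>> receiveRequest, boolean pauseResume) {
        this.receiveRequest = receiveRequest;
        this.pauseResume = pauseResume;
    }

    // The application signals that it can process `n` more messages.
    void request(long n) {
        demand.addAndGet(n);
    }

    // One polling tick: the loop keeps running, but fetches nothing while paused.
    List<String> poll() {
        if (pauseResume && demand.get() <= 0) {
            return Collections.emptyList();
        }
        List<String> batch = receiveRequest.get();
        demand.addAndGet(-batch.size());
        return batch;
    }

    public static void main(String[] args) {
        PauseResumePollerSketch poller =
                new PauseResumePollerSketch(() -> Arrays.asList("m1", "m2"), true);
        System.out.println(poller.poll()); // [] : no demand yet, polling is paused
        poller.request(2);
        System.out.println(poller.poll()); // [m1, m2] : demand present, fetching resumes
        System.out.println(poller.poll()); // [] : demand consumed, paused again
    }
}
```

Constructing the sketch with pauseResume set to false makes every tick fetch regardless of demand, which corresponds to disabling the attribute.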
Deserialization
The connector converts incoming SQS Messages into Reactive Messaging Message<T> instances.
The payload type T is determined based on the value of the SQS message attribute _classname.
If you send messages with the AWS SQS connector (outbound connector),
the _classname attribute is automatically added to the message.
The primitive types are transformed from the string representation to the corresponding Java type.
For objects, if one of the JsonMapping modules is present on the classpath,
the connector uses that JSON module to deserialize the message body to objects.
If the _classname attribute is not present, the payload is deserialized as a String.
| @ApplicationScoped
public static class Generator {
    @Outgoing("to-sqs")
    public Multi<Price> prices() {
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }
}
@ApplicationScoped
public static class Consumer {
    List<Price> prices = new CopyOnWriteArrayList<>();
    @Incoming("from-sqs")
    public void consume(Price price) {
        prices.add(price);
    }
    public List<Price> list() {
        return prices;
    }
}
public static class Price {
    public int price;
    public Price setPrice(int price) {
        this.price = price;
        return this;
    }
}
|
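To make the type-selection rule concrete, here is a minimal, self-contained sketch of how a _classname value could drive the conversion of the string body. It is an illustration only, not the connector's actual deserialization code: ClassnameDecodingSketch and decode are hypothetical names, only a few primitive wrapper types are covered, and the JSON path for objects is left out.

```java
// Hypothetical illustration only; not the connector's actual deserialization code.
final class ClassnameDecodingSketch {

    // Picks the payload type from the `_classname` attribute value (null when absent).
    static Object decode(String body, String classname) {
        if (classname == null) {
            return body; // no attribute: the payload stays a String
        }
        switch (classname) {
            case "java.lang.Integer":
                return Integer.valueOf(body);
            case "java.lang.Long":
                return Long.valueOf(body);
            case "java.lang.Double":
                return Double.valueOf(body);
            case "java.lang.Boolean":
                return Boolean.valueOf(body);
            default:
                // Per the text above, objects go through a JSON mapping module when one
                // is present; this sketch simply returns the raw body instead.
                return body;
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("42", "java.lang.Integer").getClass().getSimpleName()); // Integer
        System.out.println(decode("42", null).getClass().getSimpleName()); // String
    }
}
```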
Messages coming from SQS contain an instance of SqsIncomingMetadata in the metadata.
SQS message attributes can be accessed from the metadata either by name or by the MessageAttributeValue object.
| package sqs.inbound;
import java.util.Map;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
@ApplicationScoped
public class SqsMetadataExample {
    @Incoming("queue")
    public void metadata(String body, SqsIncomingMetadata metadata) {
        Map<String, MessageAttributeValue> attributes = metadata.getMessage().messageAttributes();
        attributes.forEach((k, v) -> System.out.println(k + " -> " + v.stringValue()));
        System.out.println("Message body: " + body);
    }
}
|
Acknowledgement Strategies
The default strategy for acknowledging an AWS SQS message is to delete the message from the queue.
Set the ack-strategy attribute to ignore to skip the deletion and leave the message in the queue when it is acknowledged.
[NOTE] Deprecated
The ack.delete attribute is deprecated and will be removed in a future release. Use ack-strategy instead.
You can implement a custom strategy by implementing the SqsAckHandler interface together with a Factory class,
registering the factory as a CDI bean with an @Identifier, and setting ack-strategy to that identifier.
| package sqs.inbound;
import java.util.function.BiConsumer;
import jakarta.enterprise.context.ApplicationScoped;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import io.vertx.mutiny.core.Vertx;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
public class SqsCustomAckStrategy implements SqsAckHandler {
    @ApplicationScoped
    @Identifier("custom")
    public static class Factory implements SqsAckHandler.Factory {
        @Override
        public SqsAckHandler create(SqsConnectorIncomingConfiguration conf,
                Vertx vertx,
                SqsAsyncClient client,
                Uni<String> queueUrlUni,
                BiConsumer<Throwable, Boolean> reportFailure) {
            return new SqsCustomAckStrategy();
        }
    }
    @Override
    public Uni<Void> handle(SqsMessage<?> message) {
        return Uni.createFrom().voidItem()
                .emitOn(message::runOnMessageContext);
    }
}
|
Failure Strategies
The default strategy for handling message processing failures is ignore.
It takes no action on the failed message: the visibility timeout expires, and the message is then consumed again.
Other possible strategies are:
fail: the failure is logged and the channel fail-stops.
delete: the message is removed from the queue.
visibility: the message visibility timeout is reset to 0.
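The practical difference between these strategies is when, if ever, a failed message can be received again. The sketch below models that timing in plain Java, with all times in seconds. It is a hypothetical model rather than connector code: FailureStrategySketch and nextVisibleAt are made-up names, and modeling fail as an exception is an assumption used only to represent the channel stopping.

```java
import java.util.OptionalLong;

// Hypothetical model of the built-in strategies' effect on redelivery; not connector code.
final class FailureStrategySketch {

    // Returns the time (in seconds) at which the failed message can be received again,
    // or an empty value when it will not be redelivered.
    static OptionalLong nextVisibleAt(String strategy, long receivedAt,
            long visibilityTimeout, long failedAt) {
        switch (strategy) {
            case "ignore":
                // Nothing is done: the message reappears once its visibility timeout expires.
                return OptionalLong.of(receivedAt + visibilityTimeout);
            case "visibility":
                // The visibility timeout is reset to 0: the message is visible right away.
                return OptionalLong.of(failedAt);
            case "delete":
                // The message is removed from the queue.
                return OptionalLong.empty();
            case "fail":
                // Assumption: the channel stopping is modeled as an exception.
                throw new IllegalStateException("channel stopped after failure");
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // Message received at t=0 with a 30 s visibility timeout; processing fails at t=5.
        System.out.println(nextVisibleAt("ignore", 0, 30, 5).getAsLong()); // 30
        System.out.println(nextVisibleAt("visibility", 0, 30, 5).getAsLong()); // 5
        System.out.println(nextVisibleAt("delete", 0, 30, 5).isPresent()); // false
    }
}
```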
You can implement a custom strategy by implementing the SqsFailureHandler interface together with a Factory class,
registering the factory as a CDI bean with an @Identifier, and setting failure-strategy to that identifier.
| package sqs.inbound;
import java.util.function.BiConsumer;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsFailureHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
public class SqsCustomNackStrategy implements SqsFailureHandler {
    @ApplicationScoped
    @Identifier("custom")
    public static class Factory implements SqsFailureHandler.Factory {
        @Override
        public SqsFailureHandler create(String channel, SqsAsyncClient client, Uni<String> queueUrlUni,
                BiConsumer<Throwable, Boolean> reportFailure) {
            return new SqsCustomNackStrategy();
        }
    }
    @Override
    public Uni<Void> handle(SqsMessage<?> message, Metadata metadata, Throwable throwable) {
        return Uni.createFrom().voidItem()
                .emitOn(message::runOnMessageContext);
    }
}
|
Configuration Reference
| Attribute (alias) | Description | Type | Mandatory | Default |
| ack-strategy | The identifier for the bean implementing ack strategy factory. Strategies: 'delete', 'ignore' | string | false | delete |
| ack.delete | deprecated - Whether the acknowledgement deletes the message from the queue. Deprecated, use ack-strategy instead | boolean | false | |
| credentials-provider | The credential provider to be used in the client | string | false | |
| endpoint-override | The endpoint override | string | false | |
| failure-strategy | The identifier for the bean implementing failure strategy factory. Strategies: 'ignore', 'fail', 'visibility', 'delete' | string | false | ignore |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| max-number-of-messages | The maximum number of messages to receive | int | false | 10 |
| queue | The name of the SQS queue, defaults to channel name if not provided | string | false | |
| queue.url | The url of the SQS queue | string | false | |
| receive.request.customizer | The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided | string | false | |
| receive.request.message-attribute-names | The message attribute names to retrieve when receiving messages. | string | false | |
| receive.request.pause.resume | Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused. | boolean | false | true |
| receive.request.retries | If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. | long | false | 2147483647 |
| region | The name of the SQS region | string | false | |
| visibility-timeout | The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request | int | false | |
| wait-time-seconds | The maximum amount of time in seconds to wait for messages to be received | int | false | 1 |