Receiving AWS SQS Messages
The AWS SQS connector allows you to receive messages from an AWS SQS queue.
Receiving messages
Before you start, you need an AWS account and an existing SQS queue. To receive messages from an SQS queue, you need to create a method that consumes messages from the queue.
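As a minimal configuration sketch, an incoming channel can be bound to the connector with MicroProfile Config properties. The channel name data and the queue name my-queue are placeholders chosen for illustration; the queue attribute comes from the configuration reference at the end of this page and defaults to the channel name when omitted.

```properties
# "data" is a hypothetical channel name, "my-queue" a hypothetical queue name
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
```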
Then, your application receives Message<String>.
The consuming method can take the payload directly, retrieve the Message<String>, or directly consume the software.amazon.awssdk.services.sqs.model.Message.
Receive message request customizer
The receive message requests sent to AWS SQS can be customized by providing a CDI bean implementation of SqsReceiveMessageRequestCustomizer and configuring its identifier using the receive.request.customizer connector attribute.
Receive requests that fail with a retryable exception are retried automatically, and the retried request carries the id of the failed request.
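The following configuration sketch selects a customizer bean and caps the number of retries. The channel name data and the identifier my-customizer are hypothetical; the identifier must match the @Identifier value on your customizer bean. Both attribute names are taken from the configuration reference.

```properties
# "data" and "my-customizer" are placeholders for illustration
mp.messaging.incoming.data.receive.request.customizer=my-customizer
# Retry a failed receive request at most 5 times (0 disables retries)
mp.messaging.incoming.data.receive.request.retries=5
```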
Receive message request pause and resume
The AWS SQS connector fetches messages by continuously sending receive message requests. If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.
The pause/resume behavior is enabled by default and can be disabled by setting the receive.request.pause.resume connector attribute to false.
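For example, assuming a channel named data (a placeholder), pause and resume could be switched off like this:

```properties
# "data" is a hypothetical channel name
mp.messaging.incoming.data.receive.request.pause.resume=false
```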
Deserialization
The connector converts incoming SQS messages into Reactive Messaging Message<T> instances.
The payload type T is determined by the value of the SQS message attribute _classname.
If you send messages with the AWS SQS connector (outbound connector), the _classname attribute is automatically added to the message.
Primitive types are converted from their string representation to the corresponding Java type.
For objects, if one of the JsonMapping modules is present on the classpath, the connector uses that JSON module to deserialize the message body.
If the _classname attribute is not present, the payload is deserialized as a String.
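The rules above can be illustrated with a small, self-contained sketch. This is not the connector's code: the class ClassnameDeserializationSketch and its deserialize method are hypothetical, and the sketch assumes that _classname holds a fully qualified Java class name. JSON mapping of object types is deliberately left out.

```java
// Illustrative sketch only: not the connector's actual implementation.
class ClassnameDeserializationSketch {

    /**
     * Converts a message body to a Java value based on the value of the
     * _classname attribute, which may be absent (null).
     */
    static Object deserialize(String body, String classname) {
        if (classname == null) {
            // No _classname attribute: the payload stays a String.
            return body;
        }
        // Assumption: _classname carries a fully qualified class name.
        switch (classname) {
            case "java.lang.Integer":
                return Integer.valueOf(body);
            case "java.lang.Long":
                return Long.valueOf(body);
            case "java.lang.Double":
                return Double.valueOf(body);
            case "java.lang.Boolean":
                return Boolean.valueOf(body);
            case "java.lang.String":
                return body;
            default:
                // Object types would go through a JSON mapping module,
                // which this sketch does not model.
                throw new UnsupportedOperationException(
                        "JSON mapping not modeled for " + classname);
        }
    }

    public static void main(String[] args) {
        Object number = deserialize("42", "java.lang.Integer");
        Object text = deserialize("hello", null);
        System.out.println(number.getClass().getName() + " " + number);
        System.out.println(text.getClass().getName() + " " + text);
    }
}
```

Running main prints the runtime type and value of each result, which shows that the same string body yields a different Java type depending on the attribute.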
Inbound Metadata
Messages coming from SQS contain an instance of SqsIncomingMetadata in the metadata.
SQS message attributes can be accessed from the metadata either by name or by the MessageAttributeValue object.
Acknowledgement Strategies
The default strategy for acknowledging an AWS SQS message is to delete the message from the queue.
You can set the ack-strategy attribute to ignore if you want acknowledgement to leave the message untouched instead of deleting it.
[NOTE] Deprecated
The ack.delete attribute is deprecated and will be removed in a future release. Use ack-strategy instead.
You can implement a custom strategy by implementing the SqsAckHandler interface with a Factory class and registering it as a CDI bean with an @Identifier.
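As a configuration sketch, the strategy is selected per channel through the ack-strategy attribute. The channel name data is a placeholder; for a custom strategy, the value would be the @Identifier of your factory bean rather than one of the built-in names.

```properties
# "data" is a hypothetical channel name; built-in values are delete (default) and ignore
mp.messaging.incoming.data.ack-strategy=ignore
```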
Failure Strategies
The default strategy for handling message processing failures is ignore.
It lets the visibility timeout of the message expire, after which the message is consumed again.
Other possible strategies are:
- fail: the failure is logged and the channel fail-stops.
- delete: the message is removed from the queue.
- visibility: the message visibility timeout is reset to 0.
You can implement a custom strategy by implementing the SqsFailureHandler interface with a Factory class and registering it as a CDI bean with an @Identifier.
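A configuration sketch for choosing a failure strategy follows. The channel name data is a placeholder; the value can be one of the built-in strategies listed above or, for a custom strategy, the @Identifier of your factory bean.

```properties
# "data" is a hypothetical channel name; built-in values are ignore (default), fail, visibility, delete
mp.messaging.incoming.data.failure-strategy=visibility
```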
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
ack-strategy | The identifier for the bean implementing ack strategy factory. Strategies: 'delete', 'ignore' | string | false | delete |
ack.delete | Deprecated, use ack-strategy instead. Whether the acknowledgement deletes the message from the queue. | boolean | false | |
credentials-provider | The credential provider to be used in the client | string | false | |
endpoint-override | The endpoint override | string | false | |
failure-strategy | The identifier for the bean implementing failure strategy factory. Strategies: 'ignore', 'fail', 'visibility', 'delete' | string | false | ignore |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
max-number-of-messages | The maximum number of messages to receive | int | false | 10 |
queue | The name of the SQS queue, defaults to channel name if not provided | string | false | |
queue.url | The URL of the SQS queue | string | false | |
receive.request.customizer | The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided | string | false | |
receive.request.message-attribute-names | The message attribute names to retrieve when receiving messages. | string | false | |
receive.request.pause.resume | Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused. | boolean | false | true |
receive.request.retries | If set to a positive number, the connector retries a request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. | long | false | 2147483647 |
region | The name of the SQS region | string | false | |
visibility-timeout | The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request | int | false | |
wait-time-seconds | The maximum amount of time in seconds to wait for messages to be received | int | false | 1 |