Receiving AWS SQS Messages
The AWS SQS connector allows you to receive messages from an AWS SQS queue.
Receiving messages
Before you start, you need to have an AWS account and a SQS queue created. To receive messages from an SQS queue, you need to create a method that consumes messages from the queue.
Then, your application receives Message<String>
.
You can consume the payload directly:
Or, you can retrieve the Message<String>
:
You also can directly consume the software.amazon.awssdk.services.sqs.model.Message
:
Receive message request customizer
The receive message requests sent to AWS SQS can be customized by providing a CDI bean implementation of
SqsReceiveMessageRequestCustomizer
and configuring it's identifier using the receive.request.customizer
connector attribute.
Receive requests failed with retryable exceptions are retried automatically, by setting the failed request id.
Receive message request pause and resume
The AWS SQS connector fetches messages by continuously sending receive message requests. If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.
The pause resume can be disabled using the receive.request.pause.resume
connector attribute.
Deserialization
The connector converts incoming SQS Messages into Reactive Messaging Message<T>
instances.
The payload type T
is determined based on the value of the SQS message attribute _classname
.
If you send messages with the AWS SQS connector (outbound connector),
the _classname
attribute is automatically added to the message.
The primitive types are transformed from the string representation to the corresponding Java type.
For objects, if one of the JsonMapping
modules is present on the classpath,
the connector used that JSON module to deserialize the message body to objects.
If the _classname
attribute is not present, the payload is deserialized as a String
.
Inbound Metadata
Messages coming from SQS contain an instance of SqsIncomingMetadata in the metadata.
SQS message attributes can be accessed from the metadata either by name or by the MessageAttributeValue
object.
Acknowledgement
The default strategy for acknowledging AWS SQS Message is to delete the message from the queue.
With ack.delete
set to false
, the message is not deleted from the queue.
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
ack.delete | Whether the acknowledgement deletes the message from the queue | boolean | false | true |
credentials-provider | The credential provider to be used in the client | string | false | |
endpoint-override | The endpoint override | string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
max-number-of-messages | The maximum number of messages to receive | int | false | 10 |
queue | The name of the SQS queue, defaults to channel name if not provided | string | false | |
queue.url | The url of the SQS queue | string | false | |
receive.request.customizer | The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided | string | false | |
receive.request.message-attribute-names | The message attribute names to retrieve when receiving messages. | string | false | |
receive.request.pause.resume | Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused. | boolean | false | true |
receive.request.retries | If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. | long | false | 2147483647 |
region | The name of the SQS region | string | false | |
visibility-timeout | The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request | int | false | |
wait-time-seconds | The maximum amount of time in seconds to wait for messages to be received | int | false | 1 |