Sending AWS SQS Messages

The AWS SQS connector allows you to send messages to an AWS SQS queue.

Sending messages

Before you start, you need an AWS account and an existing SQS queue. To send messages to the queue, create a method that produces messages to an outgoing channel mapped to that queue.

Your application can then send messages to an outgoing channel. It can use plain String payloads, as in the following snippet, which writes to the data channel:

package sqs.outbound;

import java.time.Duration;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class SqsStringProducer {

    @Outgoing("data")
    public Multi<String> generate() {
        // It emits a UUID every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> UUID.randomUUID().toString());
    }

}

Alternatively, you can send Message<String>, which lets you explicitly specify metadata on the outgoing message. The following snippet writes to the prices channel:

package sqs.outbound;

import java.time.Duration;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.aws.sqs.SqsOutboundMetadata;

@ApplicationScoped
public class SqsMessageStringProducer {

    @Outgoing("prices")
    public Multi<Message<String>> generate() {
        // It emits a UUID every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(UUID.randomUUID().toString(),
                        Metadata.of(SqsOutboundMetadata.builder()
                                .groupId("group-1")
                                .build())));
    }
}

Sending messages in batch

You can configure the outbound channel to send messages in batches of at most 10 messages, which is the limit imposed by AWS SQS. Batching is enabled with the batch attribute.

You can customize the batch size (default 10) and the maximum delay to wait for new messages to be added to a batch (default 3000 ms):

mp.messaging.outgoing.prices.connector=smallrye-sqs
mp.messaging.outgoing.prices.queue=prices
mp.messaging.outgoing.prices.batch=true
mp.messaging.outgoing.prices.batch-size=5
mp.messaging.outgoing.prices.batch-delay=3000
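
To make the effect of batch-size concrete, here is a minimal, connector-independent sketch of size-based batching. BatchSketch and partition are hypothetical names used only for illustration. This is not the connector's implementation, and it does not model the batch-delay timeout, which may flush a batch before it is full.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the connector: groups items into
// consecutive batches of at most batchSize elements.
final class BatchSketch {

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize < 1 || batchSize > 10) {
            // SQS accepts at most 10 messages per batch request
            throw new IllegalArgumentException("batchSize must be between 1 and 10");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sub-list so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            pending.add("price-" + i);
        }
        // With batch-size=5, 12 pending messages form batches of 5, 5 and 2
        for (List<String> batch : partition(pending, 5)) {
            System.out.println(batch.size() + " messages: " + batch);
        }
    }
}
```

With the configuration above (batch-size=5), 12 messages that arrive within the delay window would be grouped into three send requests rather than 12 individual ones.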

Serialization

When sending a Message<T>, the connector converts the message into an AWS SQS message. How the message is converted depends on the payload type:

  • If the payload is of type SendMessageRequest, it is sent as is.
  • If the payload is of type SendMessageRequest.Builder, the queue URL is set and the request is sent.
  • If the payload is of type software.amazon.awssdk.services.sqs.model.Message, it is used to set the message body and attributes.
  • If the payload is a primitive type, the payload is converted to a String and the message attribute _classname is set to the class name of the payload.
  • If the payload is of any other object type, the payload is serialized (using the JsonMapping implementation discovered) and the message attribute _classname is set to the class name of the payload.

If the message payload cannot be serialized to JSON, the message is nacked.
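
The primitive-type rule above can be illustrated with a small standalone sketch. It only mirrors the described behavior (String body plus a _classname attribute) using plain Java. ClassNameAttributeSketch and its methods are hypothetical, and it assumes the class name is the fully qualified name returned by getClass().getName(), which the text above does not state explicitly.

```java
import java.util.Map;

// Hypothetical illustration, not connector code: shows what the body and
// the _classname attribute could look like for a primitive payload.
final class ClassNameAttributeSketch {

    static final String CLASS_NAME_ATTRIBUTE = "_classname";

    // The payload is converted to its String form for the message body
    static String body(Object payload) {
        return String.valueOf(payload);
    }

    // Assumption: the attribute carries the fully qualified class name
    static Map<String, String> attributes(Object payload) {
        return Map.of(CLASS_NAME_ATTRIBUTE, payload.getClass().getName());
    }

    public static void main(String[] args) {
        Object payload = 42.5; // autoboxed to java.lang.Double
        System.out.println(body(payload));       // prints 42.5
        System.out.println(attributes(payload)); // prints {_classname=java.lang.Double}
    }
}
```

A consumer can use such an attribute to decide which type to deserialize the body into.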

Outbound Metadata

When sending Messages, you can add an instance of SqsOutboundMetadata to influence how the message is handled by AWS SQS. For example, you can set message attributes and the message group ID:

final SqsOutboundMetadata metadata = SqsOutboundMetadata.builder()
        .messageAttributes(Map.of("my-attribute", MessageAttributeValue.builder()
                .dataType("String").stringValue("my-value").build()))
        .groupId("group-1")
        .build();

// Add `metadata` to the metadata of the outgoing message.
return Message.of("Hello", Metadata.of(metadata));

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the send message request is successful. If the message is not sent successfully, the message is nacked.

Configuration Reference

| Attribute (alias) | Description | Type | Mandatory | Default |
|---|---|---|---|---|
| batch | When set, sends messages in batches of maximum 10 messages | boolean | false | false |
| batch-delay | In batch send mode, the maximum delay in milliseconds to wait for messages to be included in the batch | int | false | 3000 |
| batch-size | In batch send mode, the maximum number of messages to include in batch, currently SQS maximum is 10 messages | int | false | 10 |
| credentials-provider | The credential provider to be used in the client | string | false | |
| endpoint-override | The endpoint override | string | false | |
| group.id | When set, sends messages with the specified group id | string | false | |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| queue | The name of the SQS queue, defaults to channel name if not provided | string | false | |
| queue.url | The url of the SQS queue | string | false | |
| region | The name of the SQS region | string | false | |
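
As a hedged illustration of how some of these attributes combine, the following fragment points the prices channel at a local SQS-compatible endpoint. The region value and the endpoint URL are assumptions chosen for the example (a local emulator listening on port 4566), not defaults of the connector.

```properties
mp.messaging.outgoing.prices.connector=smallrye-sqs
# Queue name; would default to the channel name if omitted
mp.messaging.outgoing.prices.queue=prices
# Assumed region for this example
mp.messaging.outgoing.prices.region=us-east-1
# Assumed local endpoint for this example; omit to use the regular AWS endpoint
mp.messaging.outgoing.prices.endpoint-override=http://localhost:4566
```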