Skip to content

Sending AWS SQS Messages

The AWS SQS connector allows you to send messages to an AWS SQS queue.

Sending messages

Before you start, you need to have an AWS account and an SQS queue created. To send messages to an SQS queue, you need to create a method that produces messages to the queue.

Then, your application can send Message<String> to the prices channel. It can use String payloads as in the following snippet:

package sqs.outbound;

import java.time.Duration;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class SqsStringProducer {

    @Outgoing("data")
    public Multi<String> generate() {
        // It emits a UUID every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> UUID.randomUUID().toString());
    }

}

Or, you can send Message<Double>, which affords the opportunity to explicitly specify metadata on the outgoing message:

package sqs.outbound;

import java.time.Duration;
import java.util.UUID;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.aws.sqs.SqsOutboundMetadata;

@ApplicationScoped
public class SqsMessageStringProducer {

    @Outgoing("prices")
    public Multi<Message<String>> generate() {
        // It emits a UUID every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(UUID.randomUUID().toString(),
                        Metadata.of(SqsOutboundMetadata.builder()
                                .groupId("group-1")
                                .build())));
    }
}

Serialization

When sending a Message<T>, the connector converts the message into a AWS SQS Message. How the message is converted depends on the payload type:

  • If the payload is of type SendMessageRequest it is sent as is.
  • If the payload is of type SendMessageRequest.Builder, the queue url is set and sent.
  • If the payload is of type software.amazon.awssdk.services.sqs.model.Message it is usd to set the message body and attributes.
  • If the payload is of primitive types the paylaod is converted to String and the message attribute _classname is set to the class name of the payload.
  • If the payload is of any other object type, the payload is serialized (using the JsonMapping implementation discovered) and the message attribute _classname is set to the class name of the payload.

If the message payload cannot be serialized to JSON, the message is nacked.

Outbound Metadata

When sending Messages, you can add an instance of SqsOutboundMetadata to influence how the message is handled by AWS SQS. For example, you can configure the routing key, timestamp and headers:

1
2
3
4
5
6
7
8
final SqsOutboundMetadata metadata = SqsOutboundMetadata.builder()
        .messageAttributes(Map.of("my-attribute", MessageAttributeValue.builder()
                .dataType("String").stringValue("my-value").build()))
        .groupId("group-1")
        .build();

// Add `metadata` to the metadata of the outgoing message.
return Message.of("Hello", Metadata.of(metadata));

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the send message request is successful. If the message is not sent successfully, the message is nacked.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
credentials-provider The credential provider to be used in the client string false
endpoint-override The endpoint override string false
group.id When set, sends messages with the specified group id string false
health-enabled Whether health reporting is enabled (default) or disabled boolean false true
queue The name of the SQS queue, defaults to channel name if not provided string false
queue.url The url of the SQS queue string false
region The name of the SQS region string false