Skip to content

RabbitMQ Request/Reply

Experimental

RabbitMQ Request Reply Emitter is an experimental feature.

The RabbitMQ Request-Reply pattern allows you to publish a message to a RabbitMQ address and then await for a reply message that responds to the initial request, which can be used to implement RPC over RabbitMQ.

Given the following request/reply outgoing configuration example:

1
2
3
4
mp.messaging.outgoing.my-request.connector=smallrye-rabbitmq
mp.messaging.outgoing.my-request.exchange.name=rpc
mp.messaging.outgoing.my-request.exchange.type=direct
mp.messaging.outgoing.my-request.default-routing-key=rpc-request

The RabbitMQRequestReply emitter implements the requestor (or the client) of the request-reply pattern for RabbitMQ outbound channels:

package rabbitmq.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.rabbitmq.reply.RabbitMQRequestReply;

@ApplicationScoped
public class RabbitMQRequestReplyEmitter {

    @Inject
    @Channel("my-request")
    RabbitMQRequestReply<String, Integer> quoteRequest;

    public Uni<Integer> requestQuote(String request) {
        return quoteRequest.request(request);
    }
}

Note

While in this example we use a direct exchange, it is also possible to use a topic or fanout exchange as well as Local Random Exchanges for request/reply.

The request method publishes the request message to the configured target address of the outgoing channel leveraging RabbitMQ Direct Reply-To, and waits for a reply. When the reply is received the returned Uni is completed with the message payload.

The request send operation generates a correlation id and sets the correlationId property, which it expects to be sent back in the reply message.

The replier (or the server) can be implemented using a Reactive Messaging processor:

1
2
3
4
5
6
7
mp.messaging.incoming.request.connector=smallrye-rabbitmq
mp.messaging.incoming.request.exchange.name=rpc
mp.messaging.incoming.request.exchange.type=direct
mp.messaging.incoming.request.routing-keys=rpc-request

mp.messaging.outgoing.reply.connector=smallrye-rabbitmq
mp.messaging.outgoing.reply.exchange.name=""
package rabbitmq.outbound;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class RabbitMQReplier {

    Random rand = new Random();

    @Incoming("request")
    @Outgoing("reply")
    String handleRequest(String request) {
        return request + "-" + rand.nextInt(100);
    }
}

Requesting with Message types

Like the core Emitter's send methods, request method also can receive a Message type and return a message:

package rabbitmq.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.rabbitmq.reply.RabbitMQRequestReply;

@ApplicationScoped
public class RabbitMQRequestReplyMessageEmitter {

    @Inject
    @Channel("my-request")
    RabbitMQRequestReply<String, Integer> quoteRequest;

    public Uni<Message<Integer>> requestMessage(String request) {
        return quoteRequest.request(Message.of(request));
    }
}

Note

The ingested reply type of the RabbitMQRequestReply is discovered at runtime, in order to configure a MessageConverter to be applied on the incoming message before returning the Uni result.

Requesting multiple replies

You can use the requestMulti method to expect any number of replies represented by the Multi return type.

For example this can be used to aggregate multiple replies to a single request.

package rabbitmq.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.rabbitmq.reply.RabbitMQRequestReply;

@ApplicationScoped
public class RabbitMQRequestReplyMultiEmitter {

    @Inject
    @Channel("my-request")
    RabbitMQRequestReply<String, Integer> quoteRequest;

    public Multi<Integer> requestQuote(String request) {
        return quoteRequest.requestMulti(request).select().first(5);
    }
}
Like the other request you can also request Message types.

Note

The channel attribute reply.timeout will be applied between each message, if reached the returned Multi will fail.

Scaling Request/Reply

If multiple requestor instances are configured on the same outgoing address, and the same reply address, each requestor instance will receive replies of all instances. If an observed correlation id doesn't match the id of any pending replies, the reply is simply discarded. With the additional network traffic this allows scaling requestors, (and repliers) dynamically.

Pending replies and reply timeout

By default, the Uni returned from the request method is configured to fail with timeout exception if no replies is received after 5 seconds. This timeout is configurable with the channel attribute reply.timeout.

A snapshot of the list of pending replies is available through the RabbitMQRequestReply#getPendingReplies method.

Correlation Ids

The RabbitMQ Request/Reply allows configuring the correlation id mechanism completely through a CorrelationIdHandler implementation. The default handler is based on randomly generated UUID strings. The correlation id handler implementation can be configured using the reply.correlation-id.handler attribute. As mentioned the default configuration is uuid.

Custom handlers can be implemented by proposing a CDI-managed bean with @Identifier qualifier.

Reply Error Handling

If the reply server produces an error and can or would like to propagate the error back to the requestor, failing the returned Uni.

If configured using the reply.failure.handler channel attribute, the ReplyFailureHandler implementations are discovered through CDI, matching the @Identifier qualifier.

A sample reply error handler can lookup header values and return the error to be thrown by the reply:

package rabbitmq.outbound;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage;
import io.smallrye.reactive.messaging.rabbitmq.reply.ReplyFailureHandler;

@ApplicationScoped
@Identifier("my-reply-error")
public class MyRabbitMQReplyFailureHandler implements ReplyFailureHandler {

    @Override
    public Throwable handleReply(IncomingRabbitMQMessage<?> replyMessage) {
        var error = (String) replyMessage.getHeaders().get("REPLY_ERROR");
        if (error != null) {
            return (Throwable) new IllegalArgumentException(error);
        }
        return null;
    }
}

null return value indicates that no error has been found in the reply, and it can be delivered to the application.