Observability API

Important

The Observability API is an experimental, SmallRye-only feature.

SmallRye Reactive Messaging provides an observability API that lets you observe messages received and sent through inbound and outbound channels.

For any observation to happen, you need to provide an implementation of MessageObservationCollector, which is discovered as a CDI-managed bean.

At wiring time, the initObservation method of the discovered MessageObservationCollector implementation is called once per channel to initialize the ObservationContext. The default initObservation implementation returns a default ObservationContext object, but a collector implementation can provide a custom per-channel ObservationContext object that holds the information needed for the observation. The ObservationContext#complete method is called each time a message observation completes, that is, when the message is acked or nacked. A collector implementation can also disable observation for a given channel at initialization time by returning a null observation context.

For each new message, the collector's onNewMessage method is called with the channel name, the Message, and the ObservationContext object initialized beforehand. This method can react to the creation of a new message, and it is also responsible for instantiating and returning a MessageObservation. Custom implementations can extend the observability capabilities, but SmallRye Reactive Messaging provides a default implementation, DefaultMessageObservation.
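The call order described above can be pictured with a small, library-free simulation. This is only a sketch: SketchCollector, SketchContext, SketchObservation, and the LifecycleSketch driver are hypothetical stand-ins that keep just the method names mentioned in the text, not the actual SmallRye types, and the rule that disables channels prefixed with "internal-" is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins (assumptions), NOT the SmallRye interfaces.
interface SketchObservation {
    String channel();
}

interface SketchContext {
    void complete(SketchObservation observation);
}

interface SketchCollector {
    SketchContext initObservation(String channel, boolean incoming, boolean emitter);

    SketchObservation onNewMessage(String channel, Object message, SketchContext ctx);
}

// Hypothetical driver that imitates the order of calls described above.
final class LifecycleSketch {

    final List<String> log = new ArrayList<>();

    final SketchCollector collector = new SketchCollector() {
        @Override
        public SketchContext initObservation(String channel, boolean incoming, boolean emitter) {
            if (channel.startsWith("internal-")) {
                log.add("init " + channel + " -> disabled");
                return null; // a null context disables observation for this channel
            }
            log.add("init " + channel);
            return observation -> log.add("complete " + observation.channel());
        }

        @Override
        public SketchObservation onNewMessage(String channel, Object message, SketchContext ctx) {
            log.add("new " + channel + " " + message);
            return () -> channel;
        }
    };

    void run(String channel, List<?> messages) {
        // Wiring time: one context per channel.
        SketchContext ctx = collector.initObservation(channel, true, false);
        if (ctx == null) {
            return; // observation disabled: no further collector calls
        }
        for (Object message : messages) {
            SketchObservation observation = collector.onNewMessage(channel, message, ctx);
            // ... the message is processed, then acked or nacked ...
            ctx.complete(observation);
        }
    }
}
```

Running the driver for an observed channel with two messages logs one init entry followed by a new/complete pair per message, while a disabled channel logs only its init entry.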

A simple observability collector can be implemented as follows:

package observability;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class SimpleMessageObservationCollector implements MessageObservationCollector<ObservationContext> {

    @Override
    public MessageObservation onNewMessage(String channel, Message<?> message, ObservationContext ctx) {
        // Called after message has been created
        return new DefaultMessageObservation(channel);
    }

}

A collector with a custom ObservationContext can be implemented as follows:

package observability;

import java.time.Duration;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class ContextMessageObservationCollector
        implements MessageObservationCollector<ContextMessageObservationCollector.MyContext> {

    @Override
    public MyContext initObservation(String channel, boolean incoming, boolean emitter) {
        // Called once per channel when the observation is set up
        // Returning null disables the observation for that channel
        return new MyContext(channel, incoming, emitter);
    }

    @Override
    public MessageObservation onNewMessage(String channel, Message<?> message, MyContext ctx) {
        // Called after message has been created
        return new DefaultMessageObservation(channel);
    }

    public static class MyContext implements ObservationContext {

        private final String channel;
        private final boolean incoming;
        private final boolean emitter;

        public MyContext(String channel, boolean incoming, boolean emitter) {
            this.channel = channel;
            this.incoming = incoming;
            this.emitter = emitter;
        }

        @Override
        public void complete(MessageObservation observation) {
            // called after message processing has completed and observation is done
            // register duration
            Duration duration = observation.getCompletionDuration();
            Throwable reason = observation.getReason();
            if (reason != null) {
                // message was nacked
            } else {
                // message was acked successfully
            }
        }
    }

}
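One possible way to fill in the empty branches of complete above is to delegate to a small thread-safe accumulator. The ChannelStats class below is a hypothetical helper, not part of SmallRye Reactive Messaging. It only assumes that it receives the values returned by getCompletionDuration() and getReason(), and it defensively tolerates a missing duration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper (assumption): accumulates per-channel ack/nack counts and durations.
final class ChannelStats {

    private final LongAdder acked = new LongAdder();
    private final LongAdder nacked = new LongAdder();
    private final LongAdder timed = new LongAdder();
    private final LongAdder totalNanos = new LongAdder();

    // Intended to be called from a custom ObservationContext#complete, e.g.
    // stats.record(observation.getCompletionDuration(), observation.getReason());
    void record(Duration duration, Throwable reason) {
        if (reason != null) {
            nacked.increment(); // a non-null reason means the message was nacked
        } else {
            acked.increment();
        }
        if (duration != null) {
            timed.increment();
            totalNanos.add(duration.toNanos());
        }
    }

    long acked() {
        return acked.sum();
    }

    long nacked() {
        return nacked.sum();
    }

    // Mean completion duration over all observations that reported one.
    Duration averageDuration() {
        long count = timed.sum();
        return count == 0 ? Duration.ZERO : Duration.ofNanos(totalNanos.sum() / count);
    }
}
```

A MyContext instance could hold one ChannelStats field per channel and expose it to whatever metrics system the application uses. LongAdder is chosen here because completions may arrive concurrently from several threads.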