Observability API
Important
The Observability API is an experimental, SmallRye-only feature.
SmallRye Reactive Messaging provides an observability API that lets you observe messages received and sent through inbound and outbound channels.
For any observation to happen, you need to provide an implementation of MessageObservationCollector, discovered as a CDI-managed bean.
At wiring time, the initObservation method of the discovered MessageObservationCollector implementation is called once per channel to initialize the ObservationContext.
The default initObservation implementation returns a default ObservationContext object, but the collector implementation can provide a custom per-channel ObservationContext object that holds the information necessary for the observation.
The ObservationContext#complete method is called each time a message observation is completed, that is, when the message is acked or nacked.
The collector implementation can decide at initialization time to disable observation for a given channel by returning a null observation context from initObservation.
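The per-channel opt-out can be modeled in plain Java, without any SmallRye dependency. The sketch below is only an illustration: ObservedChannels, ChannelContext, and the allow-list of channel names are hypothetical and not part of the library. In a real collector, the same check would sit in initObservation and return null for channels that should not be observed.

```java
import java.util.Set;

// Hypothetical helper, not part of SmallRye Reactive Messaging.
// It models the rule described above: a null context means "do not observe this channel".
final class ObservedChannels {

    // Stand-in for a per-channel context; a real one would implement ObservationContext.
    static final class ChannelContext {
        final String channel;
        final boolean incoming;

        ChannelContext(String channel, boolean incoming) {
            this.channel = channel;
            this.incoming = incoming;
        }
    }

    private final Set<String> observed;

    ObservedChannels(Set<String> observed) {
        // Defensive, immutable copy of the allow-list
        this.observed = Set.copyOf(observed);
    }

    // The same decision a collector would make in initObservation.
    ChannelContext contextFor(String channel, boolean incoming) {
        if (!observed.contains(channel)) {
            return null; // observation disabled for this channel
        }
        return new ChannelContext(channel, incoming);
    }
}
```

An allow-list keeps the default conservative: a channel added later is not observed until it is listed explicitly. A deny-list would work the same way with the condition inverted.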
For each new message, the collector's onNewMessage method is called with the channel name, the Message, and the ObservationContext object initialized beforehand.
This method can react to the creation of a new message, and it is also responsible for instantiating and returning a MessageObservation.
Custom MessageObservation implementations can extend what is observed, but SmallRye Reactive Messaging also provides a default implementation, DefaultMessageObservation.
A simple observability collector can be implemented as follows:
| package observability;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class SimpleMessageObservationCollector implements MessageObservationCollector<ObservationContext> {

    @Override
    public MessageObservation onNewMessage(String channel, Message<?> message, ObservationContext ctx) {
        // Called after message has been created
        return new DefaultMessageObservation(channel);
    }

}
|
A collector with a custom ObservationContext can be implemented as follows:
| package observability;

import java.time.Duration;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class ContextMessageObservationCollector
        implements MessageObservationCollector<ContextMessageObservationCollector.MyContext> {

    @Override
    public MyContext initObservation(String channel, boolean incoming, boolean emitter) {
        // Called on observation setup, once per channel.
        // Returning null disables the observation for that channel.
        return new MyContext(channel, incoming, emitter);
    }

    @Override
    public MessageObservation onNewMessage(String channel, Message<?> message, MyContext ctx) {
        // Called after message has been created
        return new DefaultMessageObservation(channel);
    }

    public static class MyContext implements ObservationContext {

        private final String channel;
        private final boolean incoming;
        private final boolean emitter;

        public MyContext(String channel, boolean incoming, boolean emitter) {
            this.channel = channel;
            this.incoming = incoming;
            this.emitter = emitter;
        }

        @Override
        public void complete(MessageObservation observation) {
            // Called after message processing has completed and the observation is done.
            // Register the duration here.
            Duration duration = observation.getCompletionDuration();
            Throwable reason = observation.getReason();
            if (reason != null) {
                // message was nacked
            } else {
                // message was acked successfully
            }
        }

    }

}
|
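The complete callback above leaves the "register duration" step open. One way to fill it is sketched below in plain Java, with no metrics library: a hypothetical ChannelStats accumulator (not part of SmallRye Reactive Messaging) that a context such as MyContext could hold as a field and update from complete, passing in the values read from getCompletionDuration() and getReason(). The null check on the duration is a defensive assumption, not a statement about what the library returns.

```java
import java.time.Duration;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical accumulator, not part of SmallRye Reactive Messaging.
// LongAdder keeps updates cheap when complete() is called from several threads.
final class ChannelStats {

    private final LongAdder acked = new LongAdder();
    private final LongAdder nacked = new LongAdder();
    private final LongAdder timed = new LongAdder();
    private final LongAdder totalNanos = new LongAdder();

    // Intended to be called from ObservationContext#complete.
    // reason is null when the message was acked, non-null when it was nacked.
    void record(Duration duration, Throwable reason) {
        if (reason != null) {
            nacked.increment();
        } else {
            acked.increment();
        }
        if (duration != null) { // defensive: skip observations without a duration
            timed.increment();
            totalNanos.add(duration.toNanos());
        }
    }

    long ackedCount() {
        return acked.sum();
    }

    long nackedCount() {
        return nacked.sum();
    }

    // Mean completion duration over all observations that carried a duration.
    Duration meanDuration() {
        long count = timed.sum();
        return count == 0 ? Duration.ZERO : Duration.ofNanos(totalNanos.sum() / count);
    }
}
```

Inside complete, the call would then be a single line such as stats.record(duration, reason), where stats is a ChannelStats field created in the context's constructor.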