Implement your own service discovery mechanism
Stork is extensible, and you can implement your own service discovery mechanism.
Dependencies
To implement your Service Discovery Provider, make sure your project depends on Core and Configuration Generator.
The former brings classes necessary to implement custom discovery, the latter contains an annotation processor that generates classes needed by Stork.
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-core</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-configuration-generator</artifactId>
<scope>provided</scope>
<!-- provided scope is sufficient for the annotation processor -->
<version>2.7.1</version>
</dependency>
Implementing a service discovery provider
Service discovery implementation consists of three elements:
- ServiceDiscovery, which is responsible for locating service instances for a single Stork service.
- ServiceDiscoveryProvider, which creates instances of ServiceDiscovery for a given service discovery type.
- $typeConfiguration, which is a configuration for the discovery. This class is automatically generated during the compilation (using an annotation processor).
A type, for example, acme, identifies each provider.
This type is used in the configuration to reference the provider.
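As an illustration, assuming a service named my-service and Stork's usual stork.<service-name>.service-discovery.* property naming, a reference to the acme provider might look like the fragment below. The service name is only an example.

```properties
# Select the provider whose @ServiceDiscoveryType value is "acme"
stork.my-service.service-discovery.type=acme
```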
A ServiceDiscoveryProvider implementation needs to be annotated with @ServiceDiscoveryType, which defines the type.
Any configuration properties that the provider expects should be declared with @ServiceDiscoveryAttribute annotations placed on the provider.
Optionally, you can also add the @ApplicationScoped annotation to expose the service discovery implementation as a CDI bean.
A service discovery provider class should look as follows:
| package examples;
import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.config.ServiceConfig;
import io.smallrye.stork.api.config.ServiceDiscoveryAttribute;
import io.smallrye.stork.api.config.ServiceDiscoveryType;
import io.smallrye.stork.spi.StorkInfrastructure;
import io.smallrye.stork.spi.ServiceDiscoveryProvider;
import jakarta.enterprise.context.ApplicationScoped;
@ServiceDiscoveryType("acme")
@ServiceDiscoveryAttribute(name = "host",
description = "Host name of the service discovery server.", required = true)
@ServiceDiscoveryAttribute(name = "port",
description = "Port of the service discovery server.", required = false)
@ApplicationScoped
public class AcmeServiceDiscoveryProvider
implements ServiceDiscoveryProvider<AcmeConfiguration> {
@Override
public ServiceDiscovery createServiceDiscovery(
AcmeConfiguration config,
String serviceName,
ServiceConfig serviceConfig,
StorkInfrastructure storkInfrastructure) {
return new AcmeServiceDiscovery(config);
}
}
|
Note that the ServiceDiscoveryProvider interface takes a configuration class as a type parameter. This configuration class is generated automatically by the Configuration Generator.
Its name is created by appending Configuration to the capitalized service discovery type, such as AcmeConfiguration for the acme type.
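The naming rule can be sketched in plain Java. This is not the generator's code: the ConfigurationNames helper is hypothetical, and it assumes that each hyphen-separated segment of the type is capitalized and concatenated, which matches the two names used on this page (AcmeConfiguration for acme and CachedAcmeConfiguration for cached-acme).

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Illustrative helper only; it is not part of Stork. */
final class ConfigurationNames {

    private ConfigurationNames() {
    }

    // Assumption: every hyphen-separated segment of the type is capitalized,
    // the segments are concatenated, and "Configuration" is appended.
    static String configurationClassName(String type) {
        String camelCased = Arrays.stream(type.split("-"))
                .filter(segment -> !segment.isEmpty())
                .map(segment -> Character.toUpperCase(segment.charAt(0)) + segment.substring(1))
                .collect(Collectors.joining());
        return camelCased + "Configuration";
    }
}
```

If the generated class name is not the one you expect, check the classes produced by the annotation processor in your build output rather than relying on this sketch.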
The next step is to implement the ServiceDiscovery interface:
| package examples;
import java.util.Collections;
import java.util.List;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.impl.DefaultServiceInstance;
import io.smallrye.stork.utils.ServiceInstanceIds;
public class AcmeServiceDiscovery implements ServiceDiscovery {
private final String host;
private final int port;
public AcmeServiceDiscovery(AcmeConfiguration configuration) {
this.host = configuration.getHost();
this.port = Integer.parseInt(configuration.getPort());
}
@Override
public Uni<List<ServiceInstance>> getServiceInstances() {
// Proceed to the lookup...
// Here, we just return a DefaultServiceInstance with the configured host and port
// The last parameter specifies whether the communication with the instance should
// happen over a secure connection
DefaultServiceInstance instance =
new DefaultServiceInstance(ServiceInstanceIds.next(), host, port, false);
return Uni.createFrom().item(() -> Collections.singletonList(instance));
}
}
|
This implementation is simplistic.
Typically, instead of creating a service instance with values from the configuration, you would connect to a service discovery backend, look for the service, and build the list of service instances accordingly.
Most of the time, this lookup is a remote operation, which is why the method returns a Uni.
As you can see, the AcmeConfiguration class gives access to the configuration attributes.
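One detail worth guarding against: the port attribute is declared with required = false and no default value. Assuming the generated getPort() returns null when the attribute is not set, Integer.parseInt would then throw a NumberFormatException. A minimal sketch of a more defensive conversion follows; the Ports helper and its DEFAULT_PORT value are hypothetical and not part of Stork.

```java
/** Illustrative helper only; it is not part of Stork. */
final class Ports {

    // Hypothetical fallback used when the optional "port" attribute is not set.
    static final int DEFAULT_PORT = 80;

    private Ports() {
    }

    static int parsePort(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_PORT;
        }
        int port;
        try {
            port = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port: '" + raw + "'", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }
}
```

In the constructor, this.port = Ports.parsePort(configuration.getPort()) would replace the direct Integer.parseInt call. Alternatively, declaring a defaultValue on the port attribute avoids the missing value in the first place.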
Using your service discovery
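In the project using it, don’t forget to add the dependency on the module providing your implementation.
Then, in the configuration, set the service discovery type of your service to the type of your provider (acme in this example) and provide the attributes it expects.
Stork will then use your implementation to locate the my-service service.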
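A configuration along these lines would select the acme provider for my-service and pass it the host and port attributes. It assumes Stork's usual stork.<service-name>.service-discovery.* property naming; the host and port values are placeholders.

```properties
# Provider type, as declared with @ServiceDiscoveryType("acme")
stork.my-service.service-discovery.type=acme
# Attributes declared with @ServiceDiscoveryAttribute (placeholder values)
stork.my-service.service-discovery.host=localhost
stork.my-service.service-discovery.port=1234
```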
Using your service discovery with the programmatic API
When building your service discovery project, the configuration generator creates a configuration class.
This class can be used to configure your service discovery using the Stork programmatic API.
| package examples;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceDefinition;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.api.StorkServiceRegistry;
public class AcmeDiscoveryApiUsage {
public void example(StorkServiceRegistry stork) {
stork.defineIfAbsent("my-service", ServiceDefinition.of(
new AcmeConfiguration().withHost("my-host"))
);
Uni<ServiceInstance> uni = stork.getService("my-service").selectInstance();
}
}
|
Remember that attributes, like host, are declared using the @ServiceDiscoveryAttribute annotation on the ServiceDiscoveryProvider implementation.
Caching the service instances
Your ServiceDiscovery implementation can extend io.smallrye.stork.impl.CachingServiceDiscovery to automatically cache the service instances.
In this case, the retrieved list of ServiceInstance objects is cached and only refreshed after a configurable period.
This period is an additional configuration attribute.
For consistency, we recommend declaring it as follows:
@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.",
defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL)
The following snippet extends the acme service discovery with the refresh-period attribute:
| package examples;
import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.config.ServiceConfig;
import io.smallrye.stork.api.config.ServiceDiscoveryAttribute;
import io.smallrye.stork.api.config.ServiceDiscoveryType;
import io.smallrye.stork.impl.CachingServiceDiscovery;
import io.smallrye.stork.spi.ServiceDiscoveryProvider;
import io.smallrye.stork.spi.StorkInfrastructure;
import jakarta.enterprise.context.ApplicationScoped;
@ServiceDiscoveryType("cached-acme")
@ServiceDiscoveryAttribute(name = "host",
description = "Host name of the service discovery server.", required = true)
@ServiceDiscoveryAttribute(name = "port",
description = "Port of the service discovery server.", required = false)
@ServiceDiscoveryAttribute(name = "refresh-period",
description = "Service discovery cache refresh period.",
defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL)
@ApplicationScoped
public class CachedAcmeServiceDiscoveryProvider
implements ServiceDiscoveryProvider<CachedAcmeConfiguration> {
@Override
public ServiceDiscovery createServiceDiscovery(
CachedAcmeConfiguration config,
String serviceName,
ServiceConfig serviceConfig,
StorkInfrastructure storkInfrastructure) {
return new CachedAcmeServiceDiscovery(config);
}
}
|
Extending io.smallrye.stork.impl.CachingServiceDiscovery changes the structure of the service discovery implementation:
| package examples;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.impl.CachingServiceDiscovery;
import io.smallrye.stork.impl.DefaultServiceInstance;
import io.smallrye.stork.utils.ServiceInstanceIds;
import java.util.Collections;
import java.util.List;
public class CachedAcmeServiceDiscovery extends CachingServiceDiscovery {
private final String host;
private final int port;
public CachedAcmeServiceDiscovery(CachedAcmeConfiguration configuration) {
super(configuration.getRefreshPeriod()); // (1)
this.host = configuration.getHost();
this.port = Integer.parseInt(configuration.getPort());
}
@Override // (2)
public Uni<List<ServiceInstance>> fetchNewServiceInstances(List<ServiceInstance> previousInstances) {
// Retrieve services...
DefaultServiceInstance instance =
new DefaultServiceInstance(ServiceInstanceIds.next(), host, port, false);
return Uni.createFrom().item(() -> Collections.singletonList(instance));
}
}
|
- (1) Call the super constructor with the refresh-period value.
- (2) Implement fetchNewServiceInstances instead of getServiceInstances. The method is called periodically, and the retrieved instances are cached.
This implementation is simplistic.
If the retrieval fails, the error is reported, and Stork keeps the previously retrieved list of instances.
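To make the behaviour described above concrete, here is a simplified, synchronous stand-in written with the JDK only. It is not the CachingServiceDiscovery implementation: the RefreshingCache class, its injectable clock, and the logging to standard error are all assumptions made for illustration. It shows the two rules stated above: the list is refetched only once the refresh period has elapsed, and a failed fetch keeps the previous list.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Simplified stand-in for the caching behaviour; it is not Stork code. */
final class RefreshingCache<T> {
    private final Supplier<List<T>> fetcher; // plays the role of fetchNewServiceInstances
    private final long refreshNanos;
    private final LongSupplier clock;        // nanosecond clock, injectable for tests
    private List<T> cached = List.of();
    private long lastAttempt;
    private boolean attempted;

    RefreshingCache(Supplier<List<T>> fetcher, Duration refreshPeriod, LongSupplier clock) {
        this.fetcher = fetcher;
        this.refreshNanos = refreshPeriod.toNanos();
        this.clock = clock;
    }

    synchronized List<T> get() {
        long now = clock.getAsLong();
        if (!attempted || now - lastAttempt >= refreshNanos) {
            attempted = true;
            lastAttempt = now;
            try {
                cached = List.copyOf(fetcher.get());
            } catch (RuntimeException e) {
                // Report the error and keep the previously retrieved list.
                System.err.println("Refresh failed, keeping previous instances: " + e.getMessage());
            }
        }
        return cached;
    }
}
```

For production use, new RefreshingCache<>(fetcher, Duration.ofMinutes(5), System::nanoTime) would wire in the real clock; the five-minute period is only an example value.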
Customizing the caching strategy
Sometimes it can be useful to change this behaviour and customize the cache expiration strategy.
For example, imagine you are using a service discovery backend where service instances change very frequently.
Moreover, contacting that backend can be computationally expensive, so finding a good value for the refresh period can be very difficult.
For these situations, Stork lets you implement a more suitable expiration strategy for the cache.
To customize the expiration strategy, you need to:
1. Implement the cache method, where the expiration strategy is defined.
2. Invalidate the cache when the expiration condition evaluates to true.
Look at the example below:
| package examples;
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.impl.CachingServiceDiscovery;
import io.smallrye.stork.impl.DefaultServiceInstance;
import io.smallrye.stork.utils.ServiceInstanceIds;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class CustomExpirationCachedAcmeServiceDiscovery extends CachingServiceDiscovery {
private final String host;
private final int port;
private final AtomicBoolean invalidated = new AtomicBoolean();
public CustomExpirationCachedAcmeServiceDiscovery(CachedAcmeConfiguration configuration) {
super(configuration.getRefreshPeriod());
this.host = configuration.getHost();
this.port = Integer.parseInt(configuration.getPort());
}
@Override
public Uni<List<ServiceInstance>> fetchNewServiceInstances(List<ServiceInstance> previousInstances) {
// Retrieve services...
DefaultServiceInstance instance =
new DefaultServiceInstance(ServiceInstanceIds.next(), host, port, false);
return Uni.createFrom().item(() -> Collections.singletonList(instance));
}
@Override
public Uni<List<ServiceInstance>> cache(Uni<List<ServiceInstance>> uni) {
return uni.memoize().until(() -> invalidated.get());
}
// Command-based cache invalidation: the user triggers the action that invalidates the cache.
public void invalidate() {
invalidated.set(true);
}
}
|
For an event-based invalidation example, see the Kubernetes Service Discovery.
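One point to watch in the command-based example above: the invalidated flag is never reset. Assuming the guard passed to until is evaluated for each new subscription, every lookup after the first invalidate() call would refetch. The JDK-only sketch below shows one possible way to re-arm the cache after a refetch. InvalidatableCache is a hypothetical, synchronous stand-in and uses neither Stork nor Mutiny.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Simplified stand-in for command-based invalidation; it is not Stork or Mutiny code. */
final class InvalidatableCache<T> {
    private final Supplier<List<T>> fetcher;
    // Starts invalidated so that the first get() triggers a fetch.
    private final AtomicBoolean invalidated = new AtomicBoolean(true);
    private List<T> cached = List.of();

    InvalidatableCache(Supplier<List<T>> fetcher) {
        this.fetcher = fetcher;
    }

    synchronized List<T> get() {
        // Clearing the flag before fetching re-arms the cache; an invalidate()
        // arriving during the fetch sets it again and is therefore not lost.
        if (invalidated.compareAndSet(true, false)) {
            try {
                cached = List.copyOf(fetcher.get());
            } catch (RuntimeException e) {
                invalidated.set(true); // retry on the next call
                throw e;
            }
        }
        return cached;
    }

    // Command-based invalidation: the next get() refetches.
    void invalidate() {
        invalidated.set(true);
    }
}
```

In a CachingServiceDiscovery subclass, the comparable step would be clearing the flag once fetchNewServiceInstances has produced a new list; treat that as a suggestion to verify against your Stork version rather than as documented behaviour.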