Skip to content

Implement your own service discovery mechanism#

Stork is extensible, and you can implement your own service discovery mechanism.

Dependencies#

To implement your Service Discovery Provider, make sure your project depends on Core and Configuration Generator. The former brings classes necessary to implement custom discovery, the latter contains an annotation processor that generates classes needed by Stork.

<dependency>
    <groupId>io.smallrye.stork</groupId>
    <artifactId>stork-core</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>io.smallrye.stork</groupId>
    <artifactId>stork-configuration-generator</artifactId>
    <scope>provided</scope>
    <!-- provided scope is sufficient for the annotation processor -->
    <version>2.7.1</version>
</dependency>

Implementing a service discovery provider#

Service discovery implementation consists of three elements:

  • ServiceDiscovery which is responsible for locating service instances for a single Stork service.
  • ServiceDiscoveryProvider which creates instances of ServiceDiscovery for a given service discovery type.
  • $typeConfiguration which is a configuration for the discovery. This class is automatically generated during the compilation (using an annotation processor).

A type, for example, acme, identifies each provider. This type is used in the configuration to reference the provider:

stork.my-service.service-discovery.type=acme
quarkus.stork.my-service.service-discovery.type=acme

A ServiceDiscoveryProvider implementation needs to be annotated with @ServiceDiscoveryType that defines the type. Any configuration properties that the provider expects should be defined with @ServiceDiscoveryAttribute annotations placed on the provider. Optionally, you can also add @ApplicationScoped annotation in order to provide the service discovery implementation as CDI bean.

A service discovery provider class should look as follows:

package examples;

import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.config.ServiceConfig;
import io.smallrye.stork.api.config.ServiceDiscoveryAttribute;
import io.smallrye.stork.api.config.ServiceDiscoveryType;
import io.smallrye.stork.spi.StorkInfrastructure;
import io.smallrye.stork.spi.ServiceDiscoveryProvider;
import jakarta.enterprise.context.ApplicationScoped;

@ServiceDiscoveryType("acme")
@ServiceDiscoveryAttribute(name = "host",
        description = "Host name of the service discovery server.", required = true)
@ServiceDiscoveryAttribute(name = "port",
        description = "Hort of the service discovery server.", required = false)
@ApplicationScoped
public class AcmeServiceDiscoveryProvider
        implements ServiceDiscoveryProvider<AcmeConfiguration> {

    @Override
    public ServiceDiscovery createServiceDiscovery(
            AcmeConfiguration config,
            String serviceName,
            ServiceConfig serviceConfig,
            StorkInfrastructure storkInfrastructure) {
        return new AcmeServiceDiscovery(config);
    }
}

Note, that the ServiceDiscoveryProvider interface takes a configuration class as a parameter. This configuration class is generated automatically by the Configuration Generator. Its name is created by appending Configuration to the service discovery type, such as AcmeConfiguration.

The next step is to implement the ServiceDiscovery interface:

package examples;

import java.util.Collections;
import java.util.List;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.impl.DefaultServiceInstance;
import io.smallrye.stork.utils.ServiceInstanceIds;

public class AcmeServiceDiscovery implements ServiceDiscovery {

    private final String host;
    private final int port;

    public AcmeServiceDiscovery(AcmeConfiguration configuration) {
        this.host = configuration.getHost();
        this.port = Integer.parseInt(configuration.getPort());
    }

    @Override
    public Uni<List<ServiceInstance>> getServiceInstances() {
        // Proceed to the lookup...
        // Here, we just return a DefaultServiceInstance with the configured host and port
        // The last parameter specifies whether the communication with the instance should
        // happen over a secure connection
        DefaultServiceInstance instance =
                new DefaultServiceInstance(ServiceInstanceIds.next(), host, port, false);
        return Uni.createFrom().item(() -> Collections.singletonList(instance));
    }
}

This implementation is simplistic. Typically, instead of creating a service instance with values from the configuration, you would connect to a service discovery backend, look for the service and build the list of service instance accordingly. That’s why the method returns a Uni. Most of the time, the lookup is a remote operation.

As you can see, the AcmeConfiguration class gives access to the configuration attribute.

Using your service discovery#

In the project using it, don’t forget to add the dependency on the module providing your implementation. Then, in the configuration, just add:

stork.my-service.service-discovery.type=acme
stork.my-service.service-discovery.host=localhost
stork.my-service.service-discovery.port=1234
quarkus.stork.my-service.service-discovery.type=acme
quarkus.stork.my-service.service-discovery.host=localhost
quarkus.stork.my-service.service-discovery.port=1234

Then, Stork will use your implementation to locate the my-service service.

Using your service discovery using the programmatic API#

When building your service discovery project, the configuration generator creates a configuration class. This class can be used to configure your service discovery using the Stork programmatic API.

package examples;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceDefinition;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.api.StorkServiceRegistry;

public class AcmeDiscoveryApiUsage {

    public void example(StorkServiceRegistry stork) {
        stork.defineIfAbsent("my-service", ServiceDefinition.of(
                new AcmeConfiguration().withHost("my-host"))
        );

        Uni<ServiceInstance> uni = stork.getService("my-service").selectInstance();
    }

}

Remember that attributes, like host, are declared using the @ServiceDiscoveryAttribute annotation on the ServiceDiscoveryProvider implementation.

Caching the service instances#

Your ServiceDiscovery implementation can extend io.smallrye.stork.impl.CachingServiceDiscovery to automatically cache the service instances. In this case, the retrieved set of ServiceInstance is cached and only updated after some time. This duration is an additional configuration attribute. For homogeneity, we recommend the following attribute:

@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", 
        defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL)

The following snippet extends the acme service discovery with the refresh-period attribute:

package examples;

import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.config.ServiceConfig;
import io.smallrye.stork.api.config.ServiceDiscoveryAttribute;
import io.smallrye.stork.api.config.ServiceDiscoveryType;
import io.smallrye.stork.impl.CachingServiceDiscovery;
import io.smallrye.stork.spi.ServiceDiscoveryProvider;
import io.smallrye.stork.spi.StorkInfrastructure;
import jakarta.enterprise.context.ApplicationScoped;

@ServiceDiscoveryType("cached-acme")
@ServiceDiscoveryAttribute(name = "host",
        description = "Host name of the service discovery server.", required = true)
@ServiceDiscoveryAttribute(name = "port",
        description = "Hort of the service discovery server.", required = false)
@ServiceDiscoveryAttribute(name = "refresh-period",
        description = "Service discovery cache refresh period.",
        defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL)
@ApplicationScoped
public class CachedAcmeServiceDiscoveryProvider
        implements ServiceDiscoveryProvider<CachedAcmeConfiguration> {

    @Override
    public ServiceDiscovery createServiceDiscovery(
            CachedAcmeConfiguration config,
            String serviceName,
            ServiceConfig serviceConfig,
            StorkInfrastructure storkInfrastructure) {
        return new CachedAcmeServiceDiscovery(config);
    }
}

Extending io.smallrye.stork.impl.CachingServiceDiscovery changes the structure of the service discovery implementation:

package examples;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.impl.CachingServiceDiscovery;
import io.smallrye.stork.impl.DefaultServiceInstance;
import io.smallrye.stork.utils.ServiceInstanceIds;

import java.util.Collections;
import java.util.List;

public class CachedAcmeServiceDiscovery extends CachingServiceDiscovery {

    private final String host;
    private final int port;

    public CachedAcmeServiceDiscovery(CachedAcmeConfiguration configuration) {
        super(configuration.getRefreshPeriod()); // (1)
        this.host = configuration.getHost();
        this.port = Integer.parseInt(configuration.getPort());
    }

    @Override  // (2)
    public Uni<List<ServiceInstance>> fetchNewServiceInstances(List<ServiceInstance> previousInstances) {
        // Retrieve services...
        DefaultServiceInstance instance =
                new DefaultServiceInstance(ServiceInstanceIds.next(), host, port, false);
        return Uni.createFrom().item(() -> Collections.singletonList(instance));
    }
}
  1. Call the super constructor with the refresh-period value
  2. Implement fetchNewServiceInstances instead of getServiceInstances. The method is called periodically, and the retrieved instances are cached. This implementation is simplistic.

If the retrieval fails, the error is reported, and Stork keeps the previously retrieved list of instances.

Customizing the caching strategy#

Sometimes it can be useful to change this behaviour and customize the cache expiration strategy.

For example, imagine you are using a backend service discovery where service instances can change very frequently.

Moreover, contacting the backend service discovery can be expensive in terms of computing, thus finding a good value for the refreshing time can be mission impossible.

For these situations, Stork allows to implement a better expiration strategy for the cache.

If you want to customize the expiration strategy, you need: 1. Implement the cache method where the expiration strategy should be defined. 2. Invalidate the cache when expiration condition evaluates to true.

Look at the example bellow:

package examples;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.impl.CachingServiceDiscovery;
import io.smallrye.stork.impl.DefaultServiceInstance;
import io.smallrye.stork.utils.ServiceInstanceIds;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CustomExpirationCachedAcmeServiceDiscovery extends CachingServiceDiscovery {

    private final String host;
    private final int port;

    private AtomicBoolean invalidated = new AtomicBoolean();

    public CustomExpirationCachedAcmeServiceDiscovery(CachedAcmeConfiguration configuration) {
        super(configuration.getRefreshPeriod());
        this.host = configuration.getHost();
        this.port = Integer.parseInt(configuration.getPort());
    }

    @Override
    public Uni<List<ServiceInstance>> fetchNewServiceInstances(List<ServiceInstance> previousInstances) {
        // Retrieve services...
        DefaultServiceInstance instance =
                new DefaultServiceInstance(ServiceInstanceIds.next(), host, port, false);
        return Uni.createFrom().item(() -> Collections.singletonList(instance));
    }

    @Override
    public Uni<List<ServiceInstance>> cache(Uni<List<ServiceInstance>> uni) {
        return uni.memoize().until(() -> invalidated.get());
    }

    //command-based cache invalidation: user triggers the action to invalidate the cache.
    public void invalidate() {
        invalidated.set(true);
    }

}

Additionally, you can check the Kubernetes Service Discovery for further details about an event-based invalidation example.