Quarkus - Using Apache Kafka Streams

This guide demonstrates how your Quarkus application can utilize the Apache Kafka Streams API to implement stream processing applications based on Apache Kafka.

Prerequisites

To complete this guide, you need:

  • less than 30 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.6.2+

  • Docker Compose to start an Apache Kafka development cluster

  • GraalVM installed if you want to run in native mode.

It is recommended, that you have read the Kafka quickstart before.

The Quarkus extension for Kafka Streams allows for very fast turnaround times during development by supporting the Quarkus Dev Mode (e.g. via ./mvnw compile quarkus:dev). After changing the code of your Kafka Streams topology, the application will automatically be reloaded when the next input message arrives.

A recommended development set-up is to have some producer which creates test messages on the processed topic(s) in fixed intervals, e.g. every second and observe the streaming application’s output topic(s) using a tool such as kafkacat. Using the dev mode, you’ll instantly see messages on the output topic(s) as produced by the latest version of your streaming application when saving.

For the best development experience, we recommend applying the following configuration settings to your Kafka broker:

  1. group.min.session.timeout.ms=250

Also specify the following settings in your Quarkus application.properties:

  1. kafka-streams.consumer.session.timeout.ms=250
  2. kafka-streams.consumer.heartbeat.interval.ms=200

Together, these settings will ensure that the application can very quickly reconnect to the broker after being restarted in dev mode.

Architecture

In this guide, we are going to generate (random) temperature values in one component (named generator). These values are associated to given weather stations and are written in a Kafka topic (temperature-values). Another topic (weather-stations) contains just the master data about the weather stations themselves (id and name).

A second component (aggregator) reads from the two Kafka topics and processes them in a streaming pipeline:

  • the two topics are joined on weather station id

  • per weather station the min, max and average temperature is determined

  • this aggregated data is written out to a third topic (temperatures-aggregated)

The data can be examined by inspecting the output topic. By exposing a Kafka Streams interactive query, the latest result for each weather station can alternatively be obtained via a simple REST query.

The overall architecture looks like so:

Architecture

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone [https://github.com/quarkusio/quarkus-quickstarts.git](https://github.com/quarkusio/quarkus-quickstarts.git), or download an archive.

The solution is located in the kafka-streams-quickstart directory.

Creating the Producer Maven Project

First, we need a new project with the temperature value producer. Create a new project with the following command:

  1. mvn io.quarkus:quarkus-maven-plugin:1.7.6.Final:create \
  2. -DprojectGroupId=org.acme \
  3. -DprojectArtifactId=kafka-streams-quickstart-producer \
  4. -Dextensions="kafka" \
  5. && mv kafka-streams-quickstart-producer producer

This command generates a Maven project, importing the Reactive Messaging and Kafka connector extensions.

If you already have your Quarkus project configured, you can add the smallrye-reactive-messaging-kafka extension to your project by running the following command in your project base directory:

  1. ./mvnw quarkus:add-extension -Dextensions="kafka"

This will add the following to your pom.xml:

  1. <dependency>
  2. <groupId>io.quarkus</groupId>
  3. <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
  4. </dependency>

The Temperature Value Producer

Create the producer/src/main/java/org/acme/kafka/streams/producer/generator/ValuesGenerator.java file, with the following content:

  1. package org.acme.kafka.streams.producer.generator;
  2. import java.math.BigDecimal;
  3. import java.math.RoundingMode;
  4. import java.time.Instant;
  5. import java.util.Arrays;
  6. import java.util.Collections;
  7. import java.util.List;
  8. import java.util.Random;
  9. import java.util.concurrent.TimeUnit;
  10. import java.util.stream.Collectors;
  11. import javax.enterprise.context.ApplicationScoped;
  12. import org.eclipse.microprofile.reactive.messaging.Outgoing;
  13. import org.jboss.logging.Logger;
  14. import io.reactivex.Flowable;
  15. import io.smallrye.reactive.messaging.kafka.KafkaRecord;
  16. /**
  17. * A bean producing random temperature data every second.
  18. * The values are written to a Kafka topic (temperature-values).
  19. * Another topic contains the name of weather stations (weather-stations).
  20. * The Kafka configuration is specified in the application configuration.
  21. */
  22. @ApplicationScoped
  23. public class ValuesGenerator {
  24. private static final Logger LOG = Logger.getLogger(ValuesGenerator.class);
  25. private Random random = new Random();
  26. private List<WeatherStation> stations = Collections.unmodifiableList(
  27. Arrays.asList(
  28. new WeatherStation(1, "Hamburg", 13),
  29. new WeatherStation(2, "Snowdonia", 5),
  30. new WeatherStation(3, "Boston", 11),
  31. new WeatherStation(4, "Tokio", 16),
  32. new WeatherStation(5, "Cusco", 12),
  33. new WeatherStation(6, "Svalbard", -7),
  34. new WeatherStation(7, "Porthsmouth", 11),
  35. new WeatherStation(8, "Oslo", 7),
  36. new WeatherStation(9, "Marrakesh", 20)
  37. ));
  38. @Outgoing("temperature-values") (1)
  39. public Flowable<KafkaRecord<Integer, String>> generate() {
  40. return Flowable.interval(500, TimeUnit.MILLISECONDS) (2)
  41. .onBackpressureDrop()
  42. .map(tick -> {
  43. WeatherStation station = stations.get(random.nextInt(stations.size()));
  44. double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
  45. .setScale(1, RoundingMode.HALF_UP)
  46. .doubleValue();
  47. LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
  48. return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
  49. });
  50. }
  51. @Outgoing("weather-stations") (3)
  52. public Flowable<KafkaRecord<Integer, String>> weatherStations() {
  53. List<KafkaRecord<Integer, String>> stationsAsJson = stations.stream()
  54. .map(s -> KafkaRecord.of(
  55. s.id,
  56. "{ \"id\" : " + s.id +
  57. ", \"name\" : \"" + s.name + "\" }"))
  58. .collect(Collectors.toList());
  59. return Flowable.fromIterable(stationsAsJson);
  60. };
  61. private static class WeatherStation {
  62. int id;
  63. String name;
  64. int averageTemperature;
  65. public WeatherStation(int id, String name, int averageTemperature) {
  66. this.id = id;
  67. this.name = name;
  68. this.averageTemperature = averageTemperature;
  69. }
  70. }
  71. }
1Instruct Reactive Messaging to dispatch the items from the returned Flowable to temperature-values.
2The method returns a RX Java 2 stream (Flowable) emitting a random temperature value every 0.5 seconds.
3Instruct Reactive Messaging to dispatch the items from the returned Flowable (static list of weather stations) to weather-stations.

The two methods each return a reactive stream whose items are sent to the streams named temperature-values and weather-stations, respectively.

Topic Configuration

The two channels are mapped to Kafka topics using the Quarkus configuration file application.properties. For that, add the following to the file producer/src/main/resources/application.properties:

  1. # Configure the Kafka broker location
  2. kafka.bootstrap.servers=localhost:9092
  3. mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
  4. mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
  5. mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer
  6. mp.messaging.outgoing.weather-stations.connector=smallrye-kafka
  7. mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
  8. mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer

This configures the Kafka bootstrap server, the two topics and the corresponding (de-)serializers. More details about the different configuration options are available on the Producer configuration and Consumer configuration section from the Kafka documentation.

Creating the Aggregator Maven Project

With the producer application in place, it’s time to implement the actual aggregator application, which will run the Kafka Streams pipeline. Create another project like so:

  1. mvn io.quarkus:quarkus-maven-plugin:1.7.6.Final:create \
  2. -DprojectGroupId=org.acme \
  3. -DprojectArtifactId=kafka-streams-quickstart-aggregator \
  4. -Dextensions="kafka-streams,resteasy-jsonb" \
  5. && mv kafka-streams-quickstart-aggregator aggregator

This creates the aggregator project with the Quarkus extension for Kafka Streams and with RESTEasy support for JSON-B.

If you already have your Quarkus project configured, you can add the kafka-streams extension to your project by running the following command in your project base directory:

  1. ./mvnw quarkus:add-extension -Dextensions="kafka-streams"

This will add the following to your pom.xml:

  1. <dependency>
  2. <groupId>io.quarkus</groupId>
  3. <artifactId>quarkus-kafka-streams</artifactId>
  4. </dependency>

The Pipeline Implementation

Let’s begin the implementation of the stream processing application by creating a few value objects for representing temperature measurements, weather stations and for keeping track of aggregated values.

First, create the file aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStation.java, representing a weather station, with the following content:

  1. package org.acme.kafka.streams.aggregator.model;
  2. import io.quarkus.runtime.annotations.RegisterForReflection;
  3. @RegisterForReflection (1)
  4. public class WeatherStation {
  5. public int id;
  6. public String name;
  7. }
1By adding the @RegisterForReflection annotation, it is ensured that this type can be instantiated reflectively when running the application in native mode.

Then the file aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/TemperatureMeasurement.java, representing temperature measurements for a given station:

  1. package org.acme.kafka.streams.aggregator.model;
  2. import java.time.Instant;
  3. public class TemperatureMeasurement {
  4. public int stationId;
  5. public String stationName;
  6. public Instant timestamp;
  7. public double value;
  8. public TemperatureMeasurement(int stationId, String stationName, Instant timestamp,
  9. double value) {
  10. this.stationId = stationId;
  11. this.stationName = stationName;
  12. this.timestamp = timestamp;
  13. this.value = value;
  14. }
  15. }

And finally aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/Aggregation.java, which will be used to keep track of the aggregated values while the events are processed in the streaming pipeline:

  1. package org.acme.kafka.streams.aggregator.model;
  2. import java.math.BigDecimal;
  3. import java.math.RoundingMode;
  4. import io.quarkus.runtime.annotations.RegisterForReflection;
  5. @RegisterForReflection
  6. public class Aggregation {
  7. public int stationId;
  8. public String stationName;
  9. public double min = Double.MAX_VALUE;
  10. public double max = Double.MIN_VALUE;
  11. public int count;
  12. public double sum;
  13. public double avg;
  14. public Aggregation updateFrom(TemperatureMeasurement measurement) {
  15. stationId = measurement.stationId;
  16. stationName = measurement.stationName;
  17. count++;
  18. sum += measurement.value;
  19. avg = BigDecimal.valueOf(sum / count)
  20. .setScale(1, RoundingMode.HALF_UP).doubleValue();
  21. min = Math.min(min, measurement.value);
  22. max = Math.max(max, measurement.value);
  23. return this;
  24. }
  25. }

Next, let’s create the actual streaming query implementation itself in the aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/TopologyProducer.java file. All we need to do for that is to declare a CDI producer method which returns the Kafka Streams Topology; the Quarkus extension will take care of configuring, starting and stopping the actual Kafka Streams engine.

  1. package org.acme.kafka.streams.aggregator.streams;
  2. import java.time.Instant;
  3. import javax.enterprise.context.ApplicationScoped;
  4. import javax.enterprise.inject.Produces;
  5. import org.acme.kafka.streams.aggregator.model.Aggregation;
  6. import org.acme.kafka.streams.aggregator.model.TemperatureMeasurement;
  7. import org.acme.kafka.streams.aggregator.model.WeatherStation;
  8. import org.apache.kafka.common.serialization.Serdes;
  9. import org.apache.kafka.streams.StreamsBuilder;
  10. import org.apache.kafka.streams.Topology;
  11. import org.apache.kafka.streams.kstream.Consumed;
  12. import org.apache.kafka.streams.kstream.GlobalKTable;
  13. import org.apache.kafka.streams.kstream.Materialized;
  14. import org.apache.kafka.streams.kstream.Produced;
  15. import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
  16. import org.apache.kafka.streams.state.Stores;
  17. import io.quarkus.kafka.client.serialization.JsonbSerde;
  18. @ApplicationScoped
  19. public class TopologyProducer {
  20. static final String WEATHER_STATIONS_STORE = "weather-stations-store";
  21. private static final String WEATHER_STATIONS_TOPIC = "weather-stations";
  22. private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values";
  23. private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated";
  24. @Produces
  25. public Topology buildTopology() {
  26. StreamsBuilder builder = new StreamsBuilder();
  27. JsonbSerde<WeatherStation> weatherStationSerde = new JsonbSerde<>(
  28. WeatherStation.class);
  29. JsonbSerde<Aggregation> aggregationSerde = new JsonbSerde<>(Aggregation.class);
  30. KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
  31. WEATHER_STATIONS_STORE);
  32. GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( (1)
  33. WEATHER_STATIONS_TOPIC,
  34. Consumed.with(Serdes.Integer(), weatherStationSerde));
  35. builder.stream( (2)
  36. TEMPERATURE_VALUES_TOPIC,
  37. Consumed.with(Serdes.Integer(), Serdes.String())
  38. )
  39. .join( (3)
  40. stations,
  41. (stationId, timestampAndValue) -> stationId,
  42. (timestampAndValue, station) -> {
  43. String[] parts = timestampAndValue.split(";");
  44. return new TemperatureMeasurement(station.id, station.name,
  45. Instant.parse(parts[0]), Double.valueOf(parts[1]));
  46. }
  47. )
  48. .groupByKey() (4)
  49. .aggregate( (5)
  50. Aggregation::new,
  51. (stationId, value, aggregation) -> aggregation.updateFrom(value),
  52. Materialized.<Integer, Aggregation> as(storeSupplier)
  53. .withKeySerde(Serdes.Integer())
  54. .withValueSerde(aggregationSerde)
  55. )
  56. .toStream()
  57. .to( (6)
  58. TEMPERATURES_AGGREGATED_TOPIC,
  59. Produced.with(Serdes.Integer(), aggregationSerde)
  60. );
  61. return builder.build();
  62. }
  63. }
1The weather-stations table is read into a GlobalKTable, representing the current state of each weather station
2The temperature-values topic is read into a KStream; whenever a new message arrives to this topic, the pipeline will be processed for this measurement
3The message from the temperature-values topic is joined with the corresponding weather station, using the topic’s key (weather station id); the join result contains the data from the measurement and associated weather station message
4The values are grouped by message key (the weather station id)
5Within each group, all the measurements of that station are aggregated, by keeping track of minimum and maximum values and calculating the average value of all measurements of that station (see the Aggregation type)
6The results of the pipeline are written out to the temperatures-aggregated topic

The Kafka Streams extension is configured via the Quarkus configuration file application.properties. Create the file aggregator/src/main/resources/application.properties with the following contents:

  1. quarkus.kafka-streams.bootstrap-servers=localhost:9092
  2. quarkus.kafka-streams.application-server=${hostname}:8080
  3. quarkus.kafka-streams.topics=weather-stations,temperature-values
  4. # pass-through options
  5. kafka-streams.cache.max.bytes.buffering=10240
  6. kafka-streams.commit.interval.ms=1000
  7. kafka-streams.metadata.max.age.ms=500
  8. kafka-streams.auto.offset.reset=earliest
  9. kafka-streams.metrics.recording.level=DEBUG

The options with the quarkus.kafka-streams prefix can be changed dynamically at application startup, e.g. via environment variables or system properties. bootstrap-servers and application-server are mapped to the Kafka Streams properties bootstrap.servers and application.server, respectively. topics is specific to Quarkus: the application will wait for all the given topics to exist before launching the Kafka Streams engine. This is to done to gracefully await the creation of topics that don’t yet exist at application startup time.

All the properties within the kafka-streams namespace are passed through as-is to the Kafka Streams engine. Changing their values requires a rebuild of the application.

Building and Running the Applications

We now can build the producer and aggregator applications:

  1. ./mvnw clean package -f producer/pom.xml
  2. ./mvnw clean package -f aggregator/pom.xml

Instead of running them directly on the host machine using the Quarkus dev mode, we’re going to package them into container images and launch them via Docker Compose. This is done in order to demonstrate scaling the aggregator aggregation to multiple nodes later on.

The Dockerfile created by Quarkus by default needs one adjustment for the aggregator application in order to run the Kafka Streams pipeline. To do so, edit the file aggregator/src/main/docker/Dockerfile.jvm and replace the line FROM fabric8/java-alpine-openjdk8-jre with FROM fabric8/java-centos-openjdk8-jdk.

Next create a Docker Compose file (docker-compose.yaml) for spinning up the two applications as well as Apache Kafka and ZooKeeper like so:

  1. version: '3.5'
  2. services:
  3. zookeeper:
  4. image: strimzi/kafka:0.11.3-kafka-2.1.0
  5. command: [
  6. "sh", "-c",
  7. "bin/zookeeper-server-start.sh config/zookeeper.properties"
  8. ]
  9. ports:
  10. - "2181:2181"
  11. environment:
  12. LOG_DIR: /tmp/logs
  13. networks:
  14. - kafkastreams-network
  15. kafka:
  16. image: strimzi/kafka:0.11.3-kafka-2.1.0
  17. command: [
  18. "sh", "-c",
  19. "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}"
  20. ]
  21. depends_on:
  22. - zookeeper
  23. ports:
  24. - "9092:9092"
  25. environment:
  26. LOG_DIR: "/tmp/logs"
  27. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
  28. KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
  29. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  30. KAFKA_NUM_PARTITIONS: 3
  31. networks:
  32. - kafkastreams-network
  33. producer:
  34. image: quarkus-quickstarts/kafka-streams-producer:1.0
  35. build:
  36. context: producer
  37. dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
  38. environment:
  39. KAFKA_BOOTSTRAP_SERVERS: kafka:9092
  40. networks:
  41. - kafkastreams-network
  42. aggregator:
  43. image: quarkus-quickstarts/kafka-streams-aggregator:1.0
  44. build:
  45. context: aggregator
  46. dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
  47. environment:
  48. QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
  49. networks:
  50. - kafkastreams-network
  51. networks:
  52. kafkastreams-network:
  53. name: ks

To launch all the containers, building the producer and aggregator container images, run docker-compose up --build.

You should see log statements from the producer application about messages being sent to the “temperature-values” topic.

Now run an instance of the debezium/tooling image, attaching to the same network all the other containers run in. This image provides several useful tools such as kafkacat and httpie:

  1. docker run --tty --rm -i --network ks debezium/tooling:1.0

Within the tooling container, run kafkacat to examine the results of the streaming pipeline:

  1. kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated
  2. {"avg":34.7,"count":4,"max":49.4,"min":16.8,"stationId":9,"stationName":"Marrakesh","sum":138.8}
  3. {"avg":15.7,"count":1,"max":15.7,"min":15.7,"stationId":2,"stationName":"Snowdonia","sum":15.7}
  4. {"avg":12.8,"count":7,"max":25.5,"min":-13.8,"stationId":7,"stationName":"Porthsmouth","sum":89.7}
  5. ...

You should see new values arrive as the producer continues to emit temperature measurements, each value on the outbound topic showing the minimum, maximum and average temperature values of the represented weather station.

Interactive Queries

Subscribing to the temperatures-aggregated topic is a great way to react to any new temperature values. It’s a bit wasteful though if you’re just interested in the latest aggregated value for a given weather station. This is where Kafka Streams interactive queries shine: they let you directly query the underlying state store of the pipeline for the value associated to a given key. By exposing a simple REST endpoint which queries the state store, the latest aggregation result can be retrieved without having to subscribe to any Kafka topic.

Let’s begin by creating a new class InteractiveQueries in the file aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/InteractiveQueries.java:

one more method to the KafkaStreamsPipeline class which obtains the current state for a given key:

  1. package org.acme.kafka.streams.aggregator.streams;
  2. import javax.enterprise.context.ApplicationScoped;
  3. import javax.inject.Inject;
  4. import org.acme.kafka.streams.aggregator.model.Aggregation;
  5. import org.acme.kafka.streams.aggregator.model.WeatherStationData;
  6. import org.apache.kafka.streams.KafkaStreams;
  7. import org.apache.kafka.streams.errors.InvalidStateStoreException;
  8. import org.apache.kafka.streams.state.QueryableStoreTypes;
  9. import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  10. @ApplicationScoped
  11. public class InteractiveQueries {
  12. @Inject
  13. KafkaStreams streams;
  14. public GetWeatherStationDataResult getWeatherStationData(int id) {
  15. Aggregation result = getWeatherStationStore().get(id);
  16. if (result != null) {
  17. return GetWeatherStationDataResult.found(WeatherStationData.from(result)); (1)
  18. }
  19. else {
  20. return GetWeatherStationDataResult.notFound(); (2)
  21. }
  22. }
  23. private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() {
  24. while (true) {
  25. try {
  26. return streams.store(TopologyProducer.WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore());
  27. } catch (InvalidStateStoreException e) {
  28. // ignore, store not ready yet
  29. }
  30. }
  31. }
  32. }
1A value for the given station id was found, so that value will be returned
2No value was found, either because a non-existing station was queried or no measurement exists yet for the given station

Also create the method’s return type in the file aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/GetWeatherStationDataResult.java:

  1. package org.acme.kafka.streams.aggregator.streams;
  2. import java.util.Optional;
  3. import java.util.OptionalInt;
  4. import org.acme.kafka.streams.aggregator.model.WeatherStationData;
  5. public class GetWeatherStationDataResult {
  6. private static GetWeatherStationDataResult NOT_FOUND =
  7. new GetWeatherStationDataResult(null);
  8. private final WeatherStationData result;
  9. private GetWeatherStationDataResult(WeatherStationData result) {
  10. this.result = result;
  11. }
  12. public static GetWeatherStationDataResult found(WeatherStationData data) {
  13. return new GetWeatherStationDataResult(data);
  14. }
  15. public static GetWeatherStationDataResult notFound() {
  16. return NOT_FOUND;
  17. }
  18. public Optional<WeatherStationData> getResult() {
  19. return Optional.ofNullable(result);
  20. }
  21. }

Also create aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStationData.java, which represents the actual aggregation result for a weather station:

  1. package org.acme.kafka.streams.aggregator.model;
  2. import io.quarkus.runtime.annotations.RegisterForReflection;
  3. @RegisterForReflection
  4. public class WeatherStationData {
  5. public int stationId;
  6. public String stationName;
  7. public double min = Double.MAX_VALUE;
  8. public double max = Double.MIN_VALUE;
  9. public int count;
  10. public double avg;
  11. private WeatherStationData(int stationId, String stationName, double min, double max,
  12. int count, double avg) {
  13. this.stationId = stationId;
  14. this.stationName = stationName;
  15. this.min = min;
  16. this.max = max;
  17. this.count = count;
  18. this.avg = avg;
  19. }
  20. public static WeatherStationData from(Aggregation aggregation) {
  21. return new WeatherStationData(
  22. aggregation.stationId,
  23. aggregation.stationName,
  24. aggregation.min,
  25. aggregation.max,
  26. aggregation.count,
  27. aggregation.avg);
  28. }
  29. }

We now can add a simple REST endpoint (aggregator/src/main/java/org/acme/kafka/streams/aggregator/rest/WeatherStationEndpoint.java), which invokes getWeatherStationData() and returns the data to the client:

  1. package org.acme.kafka.streams.aggregator.rest;
  2. import java.net.URI;
  3. import java.net.URISyntaxException;
  4. import java.util.List;
  5. import javax.enterprise.context.ApplicationScoped;
  6. import javax.inject.Inject;
  7. import javax.ws.rs.Consumes;
  8. import javax.ws.rs.GET;
  9. import javax.ws.rs.Path;
  10. import javax.ws.rs.PathParam;
  11. import javax.ws.rs.Produces;
  12. import javax.ws.rs.core.MediaType;
  13. import javax.ws.rs.core.Response;
  14. import javax.ws.rs.core.Response.Status;
  15. import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
  16. import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
  17. @ApplicationScoped
  18. @Path("/weather-stations")
  19. public class WeatherStationEndpoint {
  20. @Inject
  21. InteractiveQueries interactiveQueries;
  22. @GET
  23. @Path("/data/{id}")
  24. @Consumes(MediaType.APPLICATION_JSON)
  25. @Produces(MediaType.APPLICATION_JSON)
  26. public Response getWeatherStationData(@PathParam("id") int id) {
  27. GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);
  28. if (result.getResult().isPresent()) { (1)
  29. return Response.ok(result.getResult().get()).build();
  30. }
  31. else {
  32. return Response.status(Status.NOT_FOUND.getStatusCode(),
  33. "No data found for weather station " + id).build();
  34. }
  35. }
  36. }
1Depending on whether a value was obtained, either return that value or a 404 response

With this code in place, it’s time to rebuild the application and the aggregator service in Docker Compose:

  1. ./mvnw clean package -f aggregator/pom.xml
  2. docker-compose stop aggregator
  3. docker-compose up --build -d

This will rebuild the aggregator container and restart its service. Once that’s done, you can invoke the service’s REST API to obtain the temperature data for one of the existing stations. To do so, you can use httpie in the tooling container launched before:

  1. http aggregator:8080/weather-stations/data/1
  2. HTTP/1.1 200 OK
  3. Connection: keep-alive
  4. Content-Length: 85
  5. Content-Type: application/json
  6. Date: Tue, 18 Jun 2019 19:29:16 GMT
  7. {
  8. "avg": 12.9,
  9. "count": 146,
  10. "max": 41.0,
  11. "min": -25.6,
  12. "stationId": 1,
  13. "stationName": "Hamburg"
  14. }

Scaling Out

A very interesting trait of Kafka Streams applications is that they can be scaled out, i.e. the load and state can be distributed amongst multiple application instances running the same pipeline. Each node will then contain a subset of the aggregation results, but Kafka Streams provides you with an API to obtain the information which node is hosting a given key. The application can then either fetch the data directly from the other instance, or simply point the client to the location of that other node.

Launching multiple instances of the aggregator application will make look the overall architecture like so:

Architecture with multiple aggregator nodes

The InteractiveQueries class must be adjusted slightly for this distributed architecture:

  1. public GetWeatherStationDataResult getWeatherStationData(int id) {
  2. StreamsMetadata metadata = streams.metadataForKey( (1)
  3. TopologyProducer.WEATHER_STATIONS_STORE,
  4. id,
  5. Serdes.Integer().serializer()
  6. );
  7. if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) {
  8. LOG.warn("Found no metadata for key {}", id);
  9. return GetWeatherStationDataResult.notFound();
  10. }
  11. else if (metadata.host().equals(host)) { (2)
  12. LOG.info("Found data for key {} locally", id);
  13. Aggregation result = getWeatherStationStore().get(id);
  14. if (result != null) {
  15. return GetWeatherStationDataResult.found(WeatherStationData.from(result));
  16. }
  17. else {
  18. return GetWeatherStationDataResult.notFound();
  19. }
  20. }
  21. else { (3)
  22. LOG.info(
  23. "Found data for key {} on remote host {}:{}",
  24. id,
  25. metadata.host(),
  26. metadata.port()
  27. );
  28. return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port());
  29. }
  30. }
  31. public List<PipelineMetadata> getMetaData() { (4)
  32. return streams.allMetadataForStore(TopologyProducer.WEATHER_STATIONS_STORE)
  33. .stream()
  34. .map(m -> new PipelineMetadata(
  35. m.hostInfo().host() + ":" + m.hostInfo().port(),
  36. m.topicPartitions()
  37. .stream()
  38. .map(TopicPartition::toString)
  39. .collect(Collectors.toSet()))
  40. )
  41. .collect(Collectors.toList());
  42. }
1The streams metadata for the given weather station id is obtained
2The given key (weather station id) is maintained by the local application node, i.e. it can answer the query itself
3The given key is maintained by another application node; in this case the information about that node (host and port) will be returned
4The getMetaData() method is added to provide callers with a list of all the nodes in the application cluster.

The GetWeatherStationDataResult type must be adjusted accordingly:

  1. package org.acme.kafka.streams.aggregator.streams;
  2. import java.util.Optional;
  3. import java.util.OptionalInt;
  4. import org.acme.kafka.streams.aggregator.model.WeatherStationData;
  5. public class GetWeatherStationDataResult {
  6. private static GetWeatherStationDataResult NOT_FOUND =
  7. new GetWeatherStationDataResult(null, null, null);
  8. private final WeatherStationData result;
  9. private final String host;
  10. private final Integer port;
  11. private GetWeatherStationDataResult(WeatherStationData result, String host,
  12. Integer port) {
  13. this.result = result;
  14. this.host = host;
  15. this.port = port;
  16. }
  17. public static GetWeatherStationDataResult found(WeatherStationData data) {
  18. return new GetWeatherStationDataResult(data, null, null);
  19. }
  20. public static GetWeatherStationDataResult foundRemotely(String host, int port) {
  21. return new GetWeatherStationDataResult(null, host, port);
  22. }
  23. public static GetWeatherStationDataResult notFound() {
  24. return NOT_FOUND;
  25. }
  26. public Optional<WeatherStationData> getResult() {
  27. return Optional.ofNullable(result);
  28. }
  29. public Optional<String> getHost() {
  30. return Optional.ofNullable(host);
  31. }
  32. public OptionalInt getPort() {
  33. return port != null ? OptionalInt.of(port) : OptionalInt.empty();
  34. }
  35. }

Also the return type for getMetaData() must be defined (aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/PipelineMetadata.java):

  1. package org.acme.kafka.streams.aggregator.streams;
  2. import java.util.Set;
  3. public class PipelineMetadata {
  4. public String host;
  5. public Set<String> partitions;
  6. public PipelineMetadata(String host, Set<String> partitions) {
  7. this.host = host;
  8. this.partitions = partitions;
  9. }
  10. }

Lastly, the REST endpoint class must be updated:

  1. package org.acme.kafka.streams.aggregator.rest;
  2. import java.net.URI;
  3. import java.net.URISyntaxException;
  4. import java.util.List;
  5. import javax.enterprise.context.ApplicationScoped;
  6. import javax.inject.Inject;
  7. import javax.ws.rs.Consumes;
  8. import javax.ws.rs.GET;
  9. import javax.ws.rs.Path;
  10. import javax.ws.rs.PathParam;
  11. import javax.ws.rs.Produces;
  12. import javax.ws.rs.core.MediaType;
  13. import javax.ws.rs.core.Response;
  14. import javax.ws.rs.core.Response.Status;
  15. import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
  16. import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
  17. import org.acme.kafka.streams.aggregator.streams.PipelineMetadata;
  18. @ApplicationScoped
  19. @Path("/weather-stations")
  20. public class WeatherStationEndpoint {
  21. @Inject
  22. InteractiveQueries interactiveQueries;
  23. @GET
  24. @Path("/data/{id}")
  25. @Consumes(MediaType.APPLICATION_JSON)
  26. @Produces(MediaType.APPLICATION_JSON)
  27. public Response getWeatherStationData(@PathParam("id") int id) {
  28. GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);
  29. if (result.getResult().isPresent()) { (1)
  30. return Response.ok(result.getResult().get()).build();
  31. }
  32. else if (result.getHost().isPresent()) { (2)
  33. URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(),
  34. id);
  35. return Response.seeOther(otherUri).build();
  36. }
  37. else { (3)
  38. return Response.status(Status.NOT_FOUND.getStatusCode(),
  39. "No data found for weather station " + id).build();
  40. }
  41. }
  42. @GET
  43. @Path("/meta-data")
  44. @Produces(MediaType.APPLICATION_JSON)
  45. public List<PipelineMetadata> getMetaData() { (4)
  46. return interactiveQueries.getMetaData();
  47. }
  48. private URI getOtherUri(String host, int port, int id) {
  49. try {
  50. return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id);
  51. }
  52. catch (URISyntaxException e) {
  53. throw new RuntimeException(e);
  54. }
  55. }
  56. }
1The data was found locally, so return it
2The data is maintained by another node, so reply with a redirect (HTTP status code 303) if the data for the given key is stored on one of the other nodes.
3No data was found for the given weather station id
4Exposes information about all the hosts forming the application cluster

Now stop the aggregator service again and rebuild it. Then let’s spin up three instances of it:

  1. ./mvnw clean package -f aggregator/pom.xml
  2. docker-compose stop aggregator
  3. docker-compose up --build -d --scale aggregator=3

When invoking the REST API on any of the three instances, it might either be that the aggregation for the requested weather station id is stored locally on the node receiving the query, or it could be stored on one of the other two nodes.

As the load balancer of Docker Compose will distribute requests to the aggregator service in a round-robin fashion, we’ll invoke the actual nodes directly. The application exposes information about all the host names via REST:

  1. http aggregator:8080/weather-stations/meta-data
  2. HTTP/1.1 200 OK
  3. Connection: keep-alive
  4. Content-Length: 202
  5. Content-Type: application/json
  6. Date: Tue, 18 Jun 2019 20:00:23 GMT
  7. [
  8. {
  9. "host": "2af13fe516a9:8080",
  10. "partitions": [
  11. "temperature-values-2"
  12. ]
  13. },
  14. {
  15. "host": "32cc8309611b:8080",
  16. "partitions": [
  17. "temperature-values-1"
  18. ]
  19. },
  20. {
  21. "host": "1eb39af8d587:8080",
  22. "partitions": [
  23. "temperature-values-0"
  24. ]
  25. }
  26. ]

Retrieve the data from one of the three hosts shown in the response (your actual host names will differ):

  1. http 2af13fe516a9:8080/weather-stations/data/1

If that node holds the data for key “1”, you’ll get a response like this:

  1. HTTP/1.1 200 OK
  2. Connection: keep-alive
  3. Content-Length: 74
  4. Content-Type: application/json
  5. Date: Tue, 11 Jun 2019 19:16:31 GMT
  6. {
  7. "avg": 11.9,
  8. "count": 259,
  9. "max": 50.0,
  10. "min": -30.1,
  11. "stationId": 1,
  12. "stationName": "Hamburg"
  13. }

Otherwise, the service will send a redirect:

  1. HTTP/1.1 303 See Other
  2. Connection: keep-alive
  3. Content-Length: 0
  4. Date: Tue, 18 Jun 2019 20:01:03 GMT
  5. Location: http://1eb39af8d587:8080/weather-stations/data/1

You can also have httpie automatically follow the redirect by passing the --follow option:

  1. http --follow 2af13fe516a9:8080/weather-stations/data/1

Running Natively

The Quarkus extension for Kafka Streams enables the execution of stream processing applications natively via GraalVM without further configuration.

To run both the producer and aggregator applications in native mode, the Maven builds can be executed using the native profile:

  1. ./mvnw clean package -f producer/pom.xml -Pnative -Dnative-image.container-runtime=docker
  2. ./mvnw clean package -f aggregator/pom.xml -Pnative -Dnative-image.container-runtime=docker

Now create an environment variable named QUARKUS_MODE and with value set to “native”:

  1. export QUARKUS_MODE=native

This is used by the Docker Compose file to use the correct Dockerfile when building the producer and aggregator images. The Kafka Streams application can work with less than 50 MB RSS in native mode. To do so, add the Xmx option to the program invocation in aggregator/src/main/docker/Dockerfile.native:

  1. CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"]

Now start Docker Compose as described above (don’t forget to rebuild the container images).

Kafka Streams Health Checks

If you are using the quarkus-smallrye-health extension, quarkus-kafka-streams will automatically add:

  • a readiness health check to validate that all topics declared in the quarkus.kafka-streams.topics property are created,

  • a liveness health check based on the Kafka Streams state.

So when you access the /health endpoint of your application you will have information about the state of the Kafka Streams and the available and/or missing topics.

This is an example of when the status is DOWN:

  1. curl -i http://aggregator:8080/health
  2. HTTP/1.1 503 Service Unavailable
  3. content-type: application/json; charset=UTF-8
  4. content-length: 454
  5. {
  6. "status": "DOWN",
  7. "checks": [
  8. {
  9. "name": "Kafka Streams state health check", (1)
  10. "status": "DOWN",
  11. "data": {
  12. "state": "CREATED"
  13. }
  14. },
  15. {
  16. "name": "Kafka Streams topics health check", (2)
  17. "status": "DOWN",
  18. "data": {
  19. "available_topics": "weather-stations,temperature-values",
  20. "missing_topics": "hygrometry-values"
  21. }
  22. }
  23. ]
  24. }
1Liveness health check. Also available at /health/live endpoint.
2Readiness health check. Also available at /health/ready endpoint.

So as you can see, the status is DOWN as soon as one of the quarkus.kafka-streams.topics is missing or the Kafka Streams state is not RUNNING.

If no topics are available, the available_topics key will not be present in the data field of the Kafka Streams topics health check. As well as if no topics are missing, the missing_topics key will not be present in the data field of the Kafka Streams topics health check.

You can of course disable the health check of the quarkus-kafka-streams extension by setting the quarkus.kafka-streams.health.enabled property to false in your application.properties.

Obviously you can create your liveness and readiness probes based on the respective endpoints /health/live and /health/ready.

Liveness health check

Here is an example of the liveness check:

  1. curl -i http://aggregator:8080/health/live
  2. HTTP/1.1 503 Service Unavailable
  3. content-type: application/json; charset=UTF-8
  4. content-length: 225
  5. {
  6. "status": "DOWN",
  7. "checks": [
  8. {
  9. "name": "Kafka Streams state health check",
  10. "status": "DOWN",
  11. "data": {
  12. "state": "CREATED"
  13. }
  14. }
  15. ]
  16. }

The state is coming from the KafkaStreams.State enum.

Readiness health check

Here is an example of the readiness check:

  1. curl -i http://aggregator:8080/health/ready
  2. HTTP/1.1 503 Service Unavailable
  3. content-type: application/json; charset=UTF-8
  4. content-length: 265
  5. {
  6. "status": "DOWN",
  7. "checks": [
  8. {
  9. "name": "Kafka Streams topics health check",
  10. "status": "DOWN",
  11. "data": {
  12. "missing_topics": "weather-stations,temperature-values"
  13. }
  14. }
  15. ]
  16. }

Going Further

This guide has shown how you can build stream processing applications using Quarkus and the Kafka Streams APIs, both in JVM and native modes. For running your KStreams application in production, you could also add health checks and metrics for the data pipeline. Refer to the Quarkus guides on metrics and health checks to learn more.

Configuration Reference

About the Duration format

The format for durations uses the standard java.time.Duration format. You can learn more about it in the Duration#parse() javadoc.

You can also provide duration values starting with a number. In this case, if the value consists only of a number, the converter treats the value as seconds. Otherwise, PT is implicitly prepended to the value to obtain a standard java.time.Duration format.