Quarkus - Using Apache Kafka with Reactive Messaging

This guide demonstrates how your Quarkus application can utilize MicroProfile Reactive Messaging to interact with Apache Kafka.

Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.6.2+

  • A running Kafka cluster, or Docker Compose to start a development cluster

  • GraalVM installed if you want to run in native mode.

Architecture

In this guide, we are going to generate (random) prices in one component. These prices are written to a Kafka topic (prices). A second component reads from the prices Kafka topic and applies some magic conversion to the price. The result is sent to an in-memory stream consumed by a JAX-RS resource. The data is sent to a browser using server-sent events.

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone [https://github.com/quarkusio/quarkus-quickstarts.git](https://github.com/quarkusio/quarkus-quickstarts.git), or download an archive.

The solution is located in the kafka-quickstart directory.

Creating the Maven Project

First, we need a new project. Create a new project with the following command:

    mvn io.quarkus:quarkus-maven-plugin:1.7.6.Final:create \
        -DprojectGroupId=org.acme \
        -DprojectArtifactId=kafka-quickstart \
        -Dextensions="smallrye-reactive-messaging-kafka"
    cd kafka-quickstart

This command generates a Maven project, importing the Reactive Messaging and Kafka connector extensions.

If you already have your Quarkus project configured, you can add the smallrye-reactive-messaging-kafka extension to your project by running the following command in your project base directory:

    ./mvnw quarkus:add-extension -Dextensions="smallrye-reactive-messaging-kafka"

This will add the following to your pom.xml:

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>

Starting Kafka

Then, we need a Kafka cluster. You can follow the instructions from the Apache Kafka web site or create a docker-compose.yaml file with the following content:

    version: '2'

    services:

      zookeeper:
        image: strimzi/kafka:0.11.3-kafka-2.1.0
        command: [
          "sh", "-c",
          "bin/zookeeper-server-start.sh config/zookeeper.properties"
        ]
        ports:
          - "2181:2181"
        environment:
          LOG_DIR: /tmp/logs

      kafka:
        image: strimzi/kafka:0.11.3-kafka-2.1.0
        command: [
          "sh", "-c",
          "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
        ]
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          LOG_DIR: "/tmp/logs"
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Once created, run docker-compose up.

This is a development cluster. Do not use it in production.

The price generator

Create the src/main/java/org/acme/kafka/PriceGenerator.java file, with the following content:

    package org.acme.kafka;

    import io.reactivex.Flowable;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;

    import javax.enterprise.context.ApplicationScoped;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;

    /**
     * A bean producing random prices every 5 seconds.
     * The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
     */
    @ApplicationScoped
    public class PriceGenerator {

        private Random random = new Random();

        @Outgoing("generated-price")                 // (1)
        public Flowable<Integer> generate() {        // (2)
            return Flowable.interval(5, TimeUnit.SECONDS)
                    .map(tick -> random.nextInt(100));
        }
    }

(1) Instructs Reactive Messaging to dispatch the items from the returned stream to generated-price.
(2) The method returns an RxJava 2 stream (Flowable) emitting a random price every 5 seconds.

The method returns a Reactive Stream. The generated items are sent to the stream named generated-price. This stream is mapped to Kafka using the application.properties file that we will create soon.

The price converter

The price converter reads the prices from Kafka, and transforms them. Create the src/main/java/org/acme/kafka/PriceConverter.java file with the following content:

    package org.acme.kafka;

    import io.smallrye.reactive.messaging.annotations.Broadcast;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;

    import javax.enterprise.context.ApplicationScoped;

    /**
     * A bean consuming data from the "prices" Kafka topic and applying some conversion.
     * The result is pushed to the "my-data-stream" stream which is an in-memory stream.
     */
    @ApplicationScoped
    public class PriceConverter {

        private static final double CONVERSION_RATE = 0.88;

        @Incoming("prices")                          // (1)
        @Outgoing("my-data-stream")                  // (2)
        @Broadcast                                   // (3)
        public double process(int priceInUsd) {
            return priceInUsd * CONVERSION_RATE;
        }
    }

(1) Indicates that the method consumes the items from the prices topic.
(2) Indicates that the objects returned by the method are sent to the my-data-stream stream.
(3) Indicates that the items are dispatched to all subscribers.

The process method is called for every Kafka record from the prices topic (configured in the application configuration). Every result is sent to the my-data-stream in-memory stream.
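
As a rough illustration of the conversion applied to each record, the following standalone sketch repeats the arithmetic of the process method outside of Quarkus. The class name ConversionSketch and the sample prices are assumptions made for this example. They are not part of the quickstart.

```java
// Standalone sketch, not part of the Quarkus application.
class ConversionSketch {

    // Same rate as CONVERSION_RATE in PriceConverter.
    static final double CONVERSION_RATE = 0.88;

    // Mirrors the body of PriceConverter.process: one incoming price, one converted price.
    static double process(int priceInUsd) {
        return priceInUsd * CONVERSION_RATE;
    }

    public static void main(String[] args) {
        // Sample inputs chosen for illustration; the generator emits values from 0 to 99.
        for (int price : new int[] {0, 50, 99}) {
            System.out.println(price + " -> " + process(price));
        }
    }
}
```

In the application, Reactive Messaging calls process once per Kafka record, so no loop is needed there.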

The price resource

Finally, let’s bind our stream to a JAX-RS resource. Create the src/main/java/org/acme/kafka/PriceResource.java file with the following content:

    package org.acme.kafka;

    import io.smallrye.reactive.messaging.annotations.Channel;
    import org.reactivestreams.Publisher;

    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import org.jboss.resteasy.annotations.SseElementType;

    /**
     * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
     */
    @Path("/prices")
    public class PriceResource {

        @Inject
        @Channel("my-data-stream") Publisher<Double> prices;   // (1)

        @GET
        @Path("/stream")
        @Produces(MediaType.SERVER_SENT_EVENTS)                // (2)
        @SseElementType("text/plain")                          // (3)
        public Publisher<Double> stream() {                    // (4)
            return prices;
        }
    }

(1) Injects the my-data-stream channel using the @Channel qualifier.
(2) Indicates that the content is sent using Server-Sent Events.
(3) Indicates that the data contained within the server-sent events is of type text/plain.
(4) Returns the stream (Reactive Stream).
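
To give an idea of what reaches the browser, the sketch below frames a single price as a server-sent event, following the standard event-stream format of a data field terminated by a blank line. The class SseFrameSketch and its method are hypothetical helpers written for this illustration. RESTEasy performs the actual framing in the application, and its output may differ in details such as additional fields.

```java
// Standalone sketch of the text/event-stream framing, not the RESTEasy implementation.
class SseFrameSketch {

    // Frames one payload as an event: a "data:" field followed by a blank line.
    static String toSseEvent(double price) {
        return "data: " + price + "\n\n";
    }

    public static void main(String[] args) {
        // A browser EventSource would expose the text after "data: " as event.data.
        System.out.print(toSseEvent(44.0));
    }
}
```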

Configuring the Kafka connector

We need to configure the Kafka connector. This is done in the application.properties file. The keys are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

The channel-name segment must match the value set in the @Incoming and @Outgoing annotation:

  • generated-price → sink in which we write the prices

  • prices → source in which we read the prices

    # Configure the SmallRye Kafka connector
    kafka.bootstrap.servers=localhost:9092

    # Configure the Kafka sink (we write to it)
    mp.messaging.outgoing.generated-price.connector=smallrye-kafka
    mp.messaging.outgoing.generated-price.topic=prices
    mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

    # Configure the Kafka source (we read from it)
    mp.messaging.incoming.prices.connector=smallrye-kafka
    mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

More details about this configuration are available in the Producer configuration and Consumer configuration sections of the Kafka documentation. These properties are configured with the prefix kafka.

What about my-data-stream? This is an in-memory stream, not connected to a message broker.
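
The key structure described in this section can be made concrete with a small parser. The sketch below splits a key into its direction, channel name, and attribute. The class ChannelKeySketch is hypothetical, and it assumes channel names contain no dot, which holds for the channels used in this guide. It only illustrates the naming scheme and is not how the connector reads its configuration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of the mp.messaging.[outgoing|incoming].{channel-name}.property scheme.
class ChannelKeySketch {

    // Assumption: the channel name is a single segment without dots.
    private static final Pattern KEY =
            Pattern.compile("^mp\\.messaging\\.(incoming|outgoing)\\.([^.]+)\\.(.+)$");

    // Returns {direction, channel name, attribute} or throws if the key does not follow the scheme.
    static String[] parse(String key) {
        Matcher m = KEY.matcher(key);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a channel key: " + key);
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    public static void main(String[] args) {
        String[] parts = parse("mp.messaging.outgoing.generated-price.topic");
        System.out.println(String.join(" | ", parts));
    }
}
```

The channel name extracted this way is the value that must match the one given to @Incoming or @Outgoing.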

The HTML page

Final touch, the HTML page reading the converted prices using SSE.

Create the src/main/resources/META-INF/resources/prices.html file, with the following content:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Prices</title>

        <link rel="stylesheet" type="text/css"
              href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
        <link rel="stylesheet" type="text/css"
              href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
    </head>
    <body>
    <div class="container">

        <h2>Last price</h2>
        <div class="row">
            <p class="col-md-12">The last price is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
        </div>
    </div>
    </body>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
    <script>
        var source = new EventSource("/prices/stream");
        source.onmessage = function (event) {
            document.getElementById("content").innerHTML = event.data;
        };
    </script>
    </html>

Nothing spectacular here. On each received price, it updates the page.

Get it running

If you followed the instructions, you should have Kafka running. Then, you just need to run the application using:

    ./mvnw quarkus:dev

Open [http://localhost:8080/prices.html](http://localhost:8080/prices.html) in your browser.

If you started the Kafka broker with docker compose, stop it using CTRL+C followed by docker-compose down.

Running Native

You can build the native executable with:

    ./mvnw package -Pnative

Imperative usage

Sometimes, you need to have an imperative way of sending messages.

For example, you may need to send a message to a stream from inside a REST endpoint when it receives a POST request. In this case, you cannot use @Outgoing because your method has parameters.

For this, you can use an Emitter.

    import org.eclipse.microprofile.reactive.messaging.Channel;
    import org.eclipse.microprofile.reactive.messaging.Emitter;

    import javax.inject.Inject;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.core.MediaType;

    @Path("/prices")
    public class PriceResource {

        @Inject @Channel("price-create") Emitter<Double> priceEmitter;

        @POST
        @Consumes(MediaType.TEXT_PLAIN)
        public void addPrice(Double price) {
            priceEmitter.send(price);
        }
    }

The Emitter configuration is done the same way as the other stream configuration used by @Incoming and @Outgoing. In addition, you can use @OnOverflow to configure the back-pressure strategy.

Deprecation

The io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel and io.smallrye.reactive.messaging.annotations.OnOverflow classes are now deprecated and replaced by:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

The new Emitter.send method returns a CompletionStage completed when the produced message is acknowledged.
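
The returned CompletionStage lets the caller react once the message is acknowledged or has failed. The sketch below shows that pattern with a hypothetical stand-in for the emitter, so that it runs without Kafka. The send method here completes immediately and is an assumption made for illustration. It is not the Emitter implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Standalone sketch of reacting to an acknowledgement through a CompletionStage.
class AckSketch {

    // Hypothetical stand-in for Emitter.send: completes at once, as if the broker had acknowledged.
    static CompletionStage<Void> send(double price) {
        return CompletableFuture.completedFuture(null);
    }

    // Describes the outcome once the stage completes, whether normally or exceptionally.
    static CompletionStage<String> sendAndReport(double price) {
        return send(price).handle((ignored, failure) ->
                failure == null ? "acknowledged " + price : "failed: " + failure.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(sendAndReport(42.0).toCompletableFuture().join());
    }
}
```

In a JAX-RS resource, the same handle callback could be attached to the stage returned by priceEmitter.send(price), for instance to log failed sends.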

Kafka Health Check

If you are using the quarkus-smallrye-health extension, quarkus-kafka can add a readiness health check to validate the connection to the broker. This is disabled by default.

If enabled, when you access the /health/ready endpoint of your application you will have information about the connection validation status.

This behavior can be enabled by setting the quarkus.kafka.health.enabled property to true in your application.properties.

JSON serialization

Quarkus has built-in capabilities to deal with JSON Kafka messages.

Imagine we have a Fruit pojo as follows:

    public class Fruit {

        public String name;
        public int price;

        public Fruit() {
        }

        public Fruit(String name, int price) {
            this.name = name;
            this.price = price;
        }
    }

And we want to use it to receive messages from Kafka, make some price transformation, and send messages back to Kafka.

    import io.smallrye.reactive.messaging.annotations.Broadcast;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;

    import javax.enterprise.context.ApplicationScoped;

    /**
     * A bean consuming data from the "fruit-in" Kafka topic and applying some price conversion.
     * The result is pushed to the "fruit-out" stream.
     */
    @ApplicationScoped
    public class FruitProcessor {

        private static final double CONVERSION_RATE = 0.88;

        @Incoming("fruit-in")
        @Outgoing("fruit-out")
        @Broadcast
        public Fruit process(Fruit fruit) {
            // price is an int, so the converted value is truncated back to an int
            fruit.price = (int) (fruit.price * CONVERSION_RATE);
            return fruit;
        }
    }

To do this, we will need to set up JSON serialization with JSON-B or Jackson.

With JSON serialization correctly configured, you can also use Publisher<Fruit> and Emitter<Fruit>.

Serializing via JSON-B

First, you need to include the quarkus-jsonb extension (if you already use the quarkus-resteasy-jsonb extension, this is not needed).

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-jsonb</artifactId>
    </dependency>

There is an existing JsonbSerializer that can be used to serialize all pojos via JSON-B, but the corresponding deserializer is generic, so it needs to be subclassed.

So, let’s create a FruitDeserializer that extends the generic JsonbDeserializer.

    package com.acme.fruit.jsonb;

    import io.quarkus.kafka.client.serialization.JsonbDeserializer;

    public class FruitDeserializer extends JsonbDeserializer<Fruit> {
        public FruitDeserializer(){
            // pass the class to the parent.
            super(Fruit.class);
        }
    }

If you don’t want to create a deserializer for each of your POJOs, you can use the generic io.vertx.kafka.client.serialization.JsonObjectDeserializer that will deserialize to an io.vertx.core.json.JsonObject. The corresponding serializer can also be used: io.vertx.kafka.client.serialization.JsonObjectSerializer.

Finally, configure your streams to use the JSON-B serializer and deserializer.

    # Configure the Kafka source (we read from it)
    mp.messaging.incoming.fruit-in.connector=smallrye-kafka
    mp.messaging.incoming.fruit-in.topic=fruit-in
    mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

    # Configure the Kafka sink (we write to it)
    mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
    mp.messaging.outgoing.fruit-out.topic=fruit-out
    mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Now, your Kafka messages will contain a JSON-B serialized representation of your Fruit pojo.

Serializing via Jackson

First, you need to include the quarkus-jackson extension (if you already use the quarkus-resteasy-jackson extension, this is not needed).

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-jackson</artifactId>
    </dependency>

There is an existing ObjectMapperSerializer that can be used to serialize all pojos via Jackson, but the corresponding deserializer is generic, so it needs to be subclassed.

So, let’s create a FruitDeserializer that extends the ObjectMapperDeserializer.

    package com.acme.fruit.jackson;

    import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

    public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
        public FruitDeserializer(){
            // pass the class to the parent.
            super(Fruit.class);
        }
    }

Finally, configure your streams to use the Jackson serializer and deserializer.

    # Configure the Kafka source (we read from it)
    mp.messaging.incoming.fruit-in.connector=smallrye-kafka
    mp.messaging.incoming.fruit-in.topic=fruit-in
    mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

    # Configure the Kafka sink (we write to it)
    mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
    mp.messaging.outgoing.fruit-out.topic=fruit-out
    mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit pojo.

Sending JSON Server-Sent Events (SSE)

If you want RESTEasy to send JSON Server-Sent Events, you need to use the @SseElementType annotation to define the content type of the events, as the method will be annotated with @Produces(MediaType.SERVER_SENT_EVENTS).

The following example shows how to use SSE from a Kafka topic source.

    import io.smallrye.reactive.messaging.annotations.Channel;
    import org.reactivestreams.Publisher;

    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import org.jboss.resteasy.annotations.SseElementType;

    @Path("/fruits")
    public class PriceResource {

        @Inject
        @Channel("fruit-out") Publisher<Fruit> fruits;

        @GET
        @Path("/stream")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        @SseElementType(MediaType.APPLICATION_JSON)
        public Publisher<Fruit> stream() {
            return fruits;
        }
    }

Blocking processing

You often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and cannot be run on the caller thread.

For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

    package org.acme.panache;

    import io.smallrye.reactive.messaging.annotations.Blocking;
    import org.eclipse.microprofile.reactive.messaging.Incoming;

    import javax.enterprise.context.ApplicationScoped;
    import javax.transaction.Transactional;

    @ApplicationScoped
    public class PriceStorage {

        @Incoming("prices")
        @Blocking
        @Transactional
        public void store(int priceInUsd) {
            Price price = new Price();
            price.value = priceInUsd;
            price.persist();
        }
    }

The complete example is available in the kafka-panache-quickstart directory.
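
The idea behind running blocking work off the caller thread can be sketched in plain Java with an executor. The class BlockingOffloadSketch below is hypothetical and only illustrates the general pattern. It is not how Quarkus or SmallRye implement @Blocking.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Standalone sketch: hand a blocking task to a worker thread instead of running it on the caller.
class BlockingOffloadSketch {

    // Runs the task on a worker thread and returns the name of the thread that executed it.
    static String runOnWorker(Runnable blockingTask) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try {
            Future<String> done = workers.submit(() -> {
                blockingTask.run();
                return Thread.currentThread().getName();
            });
            return done.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Blocking task did not complete", e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        // The empty task stands in for a database call such as price.persist().
        String worker = runOnWorker(() -> { });
        System.out.println("caller=" + caller + ", worker=" + worker);
    }
}
```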

Going further

This guide has shown how you can interact with Kafka using Quarkus. It utilizes MicroProfile Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.