Tutorial: Load streaming data from Apache Kafka

Getting started

This tutorial demonstrates how to load data into Apache Druid from a Kafka stream, using Druid’s Kafka indexing service.

For this tutorial, we’ll assume you’ve already downloaded Druid as described in the quickstart using the micro-quickstart single-machine configuration and have it running on your local machine. You don’t need to have loaded any data yet.

Download and start Kafka

Apache Kafka is a high throughput message bus that works well with Druid. For this tutorial, we will use Kafka 2.7.0. To download Kafka, issue the following commands in your terminal:

  1. curl -O https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
  2. tar -xzf kafka_2.13-2.7.0.tgz
  3. cd kafka_2.13-2.7.0

Start a Kafka broker by running the following command in a new terminal:

  1. ./bin/kafka-server-start.sh config/server.properties

Run this command to create a Kafka topic called wikipedia, to which we’ll send data:

  1. ./bin/kafka-topics.sh --create --topic wikipedia --bootstrap-server localhost:9092

Load data into Kafka

Let’s launch a producer for our topic and send some data!

In your Druid directory, run the following command:

  1. cd quickstart/tutorial
  2. gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

In your Kafka directory, run the following command, where {PATH_TO_DRUID} is replaced by the path to the Druid directory:

  1. export KAFKA_OPTS="-Dfile.encoding=UTF-8"
  2. ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

The previous command posted sample events to the wikipedia Kafka topic. Now we will use Druid’s Kafka indexing service to ingest messages from our newly created topic.

Loading data with the data loader

Navigate to localhost:8888 and click Load data in the console header.

Data loader init

Select Apache Kafka and click Connect data.

Data loader sample

Enter localhost:9092 as the bootstrap server and wikipedia as the topic.

Click Apply and make sure that the data you are seeing is correct.

Once the data is located, you can click “Next: Parse data” to go to the next step.

Data loader parse data

The data loader will try to automatically determine the correct parser for the data. In this case it will successfully determine json. Feel free to play around with different parser options to get a preview of how Druid will parse your data.

With the json parser selected, click Next: Parse time to get to the step centered around determining your primary timestamp column.

Data loader parse time

Druid’s architecture requires a primary timestamp column (internally stored in a column called __time). If you do not have a timestamp in your data, select Constant value. In our example, the data loader will determine that the time column in our raw data is the only candidate that can be used as the primary time column.

Click Next: ... twice to go past the Transform and Filter steps. You do not need to enter anything in these steps as applying ingestion time transforms and filters are out of scope for this tutorial.

Data loader schema

In the Configure schema step, you can configure which dimensions and metrics will be ingested into Druid. This is exactly what the data will appear like in Druid once it is ingested. Since our dataset is very small, go ahead and turn off Rollup by clicking on the switch and confirming the change.

Once you are satisfied with the schema, click Next to go to the Partition step where you can fine tune how the data will be partitioned into segments.

Data loader partition

Here, you can adjust how the data will be split up into segments in Druid. Since this is a small dataset, there are no adjustments that need to be made in this step.

Click Next: Tune to go to the tuning step.

Data loader tune

In the Tune step is it very important to set Use earliest offset to True since we want to consume the data from the start of the stream. There are no other changes that need to be made here, so click Next: Publish to go to the Publish step.

Data loader publish

Let’s name this datasource wikipedia-kafka.

Finally, click Next to review your spec.

Data loader spec

This is the spec you have constructed. Feel free to go back and make changes in previous steps to see how changes will update the spec. Similarly, you can also edit the spec directly and see it reflected in the previous steps.

Once you are satisfied with the spec, click Submit and an ingestion task will be created.

Tasks view

You will be taken to the task view with the focus on the newly created supervisor.

The task view is set to auto refresh, wait until your supervisor launches a task.

When a tasks starts running, it will also start serving the data that it is ingesting.

Navigate to the Datasources view from the header.

Datasource view

When the wikipedia-kafka datasource appears here it can be queried.

Note: if the datasource does not appear after a minute you might have not set the supervisor to read from the start of the stream (in the Tune step).

At this point, you can go to the Query view to run SQL queries against the datasource.

Since this is a small dataset, you can simply run a SELECT * FROM "wikipedia-kafka" query to see your results.

Query view

Check out the query tutorial to run some example queries on the newly loaded data.

Submit a supervisor via the console

In the console, click Submit supervisor to open the submit supervisor dialog.

Submit supervisor

Paste in this spec and click Submit.

  1. {
  2. "type": "kafka",
  3. "spec" : {
  4. "dataSchema": {
  5. "dataSource": "wikipedia",
  6. "timestampSpec": {
  7. "column": "time",
  8. "format": "auto"
  9. },
  10. "dimensionsSpec": {
  11. "dimensions": [
  12. "channel",
  13. "cityName",
  14. "comment",
  15. "countryIsoCode",
  16. "countryName",
  17. "isAnonymous",
  18. "isMinor",
  19. "isNew",
  20. "isRobot",
  21. "isUnpatrolled",
  22. "metroCode",
  23. "namespace",
  24. "page",
  25. "regionIsoCode",
  26. "regionName",
  27. "user",
  28. { "name": "added", "type": "long" },
  29. { "name": "deleted", "type": "long" },
  30. { "name": "delta", "type": "long" }
  31. ]
  32. },
  33. "metricsSpec" : [],
  34. "granularitySpec": {
  35. "type": "uniform",
  36. "segmentGranularity": "DAY",
  37. "queryGranularity": "NONE",
  38. "rollup": false
  39. }
  40. },
  41. "tuningConfig": {
  42. "type": "kafka",
  43. "reportParseExceptions": false
  44. },
  45. "ioConfig": {
  46. "topic": "wikipedia",
  47. "inputFormat": {
  48. "type": "json"
  49. },
  50. "replicas": 2,
  51. "taskDuration": "PT10M",
  52. "completionTimeout": "PT20M",
  53. "consumerProperties": {
  54. "bootstrap.servers": "localhost:9092"
  55. }
  56. }
  57. }
  58. }

This will start the supervisor that will in turn spawn some tasks that will start listening for incoming data.

Submit a supervisor directly

To start the service directly, we will need to submit a supervisor spec to the Druid overlord by running the following from the Druid package root:

  1. curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

If the supervisor was successfully created, you will get a response containing the ID of the supervisor; in our case we should see {"id":"wikipedia"}.

For more details about what’s going on here, check out the Druid Kafka indexing service documentation.

You can view the current supervisors and tasks in the Druid Console: http://localhost:8888/unified-console.md#tasks.

Querying your data

After data is sent to the Kafka stream, it is immediately available for querying.

Please follow the query tutorial to run some example queries on the newly loaded data.

Cleanup

To go through any of the other ingestion tutorials, you will need to shut down the cluster and reset the cluster state by removing the contents of the var directory in the Druid home, as the other tutorials will write to the same “wikipedia” datasource.

You should additionally clear out any Kafka state. Do so by shutting down the Kafka broker with CTRL-C before stopping ZooKeeper and the Druid services, and then deleting the Kafka log directory at /tmp/kafka-logs:

  1. rm -rf /tmp/kafka-logs

Further reading

For more information on loading data from Kafka streams, please see the Druid Kafka indexing service documentation.