Aggregate data with rollup

Apache Druid can summarize raw data at ingestion time using a process we refer to as “rollup”. Rollup is a first-level aggregation operation over a selected set of columns that reduces the size of stored data.

This tutorial will demonstrate the effects of rollup on an example dataset.

For this tutorial, we’ll assume you’ve already downloaded Druid as described in the single-machine quickstart and have it running on your local machine.

It also helps to have completed the Load a file and Query data tutorials.

Example data

For this tutorial, we’ll use a small sample of network flow event data, representing packet and byte counts for traffic from a source to a destination IP address that occurred within a particular second.

    {"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
    {"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
    {"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}
    {"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
    {"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}
    {"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}
    {"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":38,"bytes":6289}
    {"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":123,"bytes":93999}
    {"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":12,"bytes":2818}

A file containing this sample input data is located at quickstart/tutorial/rollup-data.json.

We’ll ingest this data using the following ingestion task spec, located at quickstart/tutorial/rollup-index.json.

    {
      "type" : "index_parallel",
      "spec" : {
        "dataSchema" : {
          "dataSource" : "rollup-tutorial",
          "dimensionsSpec" : {
            "dimensions" : [
              "srcIP",
              "dstIP"
            ]
          },
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "metricsSpec" : [
            { "type" : "count", "name" : "count" },
            { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
            { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" }
          ],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "week",
            "queryGranularity" : "minute",
            "intervals" : ["2018-01-01/2018-01-03"],
            "rollup" : true
          }
        },
        "ioConfig" : {
          "type" : "index_parallel",
          "inputSource" : {
            "type" : "local",
            "baseDir" : "quickstart/tutorial",
            "filter" : "rollup-data.json"
          },
          "inputFormat" : {
            "type" : "json"
          },
          "appendToExisting" : false
        },
        "tuningConfig" : {
          "type" : "index_parallel",
          "partitionsSpec": {
            "type": "dynamic"
          },
          "maxRowsInMemory" : 25000
        }
      }
    }

Rollup has been enabled by setting "rollup" : true in the granularitySpec.

Note that srcIP and dstIP are defined as dimensions, a longSum metric is defined for each of the packets and bytes columns, and the queryGranularity is set to minute.

We will see how these definitions are used after we load this data.

Load the example data

From the apache-druid-30.0.1 package root, run the following command:

    bin/post-index-task --file quickstart/tutorial/rollup-index.json --url http://localhost:8081

After the script completes, we will query the data.

Query the example data

Let’s run bin/dsql and issue a select * from "rollup-tutorial"; query to see what data was ingested.

    $ bin/dsql
    Welcome to dsql, the command-line client for Druid SQL.
    Type "\h" for help.
    dsql> select * from "rollup-tutorial";
    ┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
    │ __time                   │ bytes  │ count │ dstIP   │ packets │ srcIP   │
    ├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
    │ 2018-01-01T01:01:00.000Z │  35937 │     3 │ 2.2.2.2 │     286 │ 1.1.1.1 │
    │ 2018-01-01T01:02:00.000Z │ 366260 │     2 │ 2.2.2.2 │     415 │ 1.1.1.1 │
    │ 2018-01-01T01:03:00.000Z │  10204 │     1 │ 2.2.2.2 │      49 │ 1.1.1.1 │
    │ 2018-01-02T21:33:00.000Z │ 100288 │     2 │ 8.8.8.8 │     161 │ 7.7.7.7 │
    │ 2018-01-02T21:35:00.000Z │   2818 │     1 │ 8.8.8.8 │      12 │ 7.7.7.7 │
    └──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘
    Retrieved 5 rows in 1.18s.
    dsql>
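
As an alternative to the interactive client, the same query can be sent to Druid's SQL HTTP endpoint. The following is a minimal sketch, not part of the tutorial's tooling: it assumes the quickstart Router is reachable at http://localhost:8888 (an assumption about your local setup), and the helper names build_sql_request and run_sql are hypothetical.

```python
import json
import urllib.request

# Assumption: the single-machine quickstart exposes the Router on port 8888.
DEFAULT_BASE_URL = "http://localhost:8888"


def build_sql_request(sql, base_url=DEFAULT_BASE_URL):
    """Build a POST request for Druid's SQL endpoint (/druid/v2/sql)."""
    payload = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/druid/v2/sql",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_sql(sql, base_url=DEFAULT_BASE_URL):
    """Send the query and return the decoded JSON response.

    Requires a running Druid cluster; nothing is sent until this is called.
    """
    with urllib.request.urlopen(build_sql_request(sql, base_url)) as resp:
        return json.loads(resp.read())
```

With the cluster running, calling run_sql('SELECT * FROM "rollup-tutorial"') should return one JSON object per rolled-up row, matching the five rows in the table above.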

Let’s look at the three events in the original input data that occurred during 2018-01-01T01:01:

    {"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
    {"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
    {"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}

These three rows have been “rolled up” into the following row:

    ┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
    │ __time                   │ bytes  │ count │ dstIP   │ packets │ srcIP   │
    ├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
    │ 2018-01-01T01:01:00.000Z │  35937 │     3 │ 2.2.2.2 │     286 │ 1.1.1.1 │
    └──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘

The input rows have been grouped by the timestamp and dimension columns {timestamp, srcIP, dstIP} with sum aggregations on the metric columns packets and bytes.

Before the grouping occurs, the timestamps of the original input data are bucketed/floored by minute, due to the "queryGranularity":"minute" setting in the ingestion spec.
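
The floor-then-group behavior described above can be emulated outside Druid in a few lines of Python. This is an illustrative sketch only, not Druid's implementation: the helper names floor_to_minute and rollup are hypothetical, and the events are copied from the sample data.

```python
import json
from collections import defaultdict
from datetime import datetime

RAW_EVENTS = """\
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
{"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}
{"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}
{"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":38,"bytes":6289}
{"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":123,"bytes":93999}
{"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":12,"bytes":2818}
"""


def floor_to_minute(iso_ts):
    """Truncate an ISO-8601 UTC timestamp to the start of its minute."""
    ts = datetime.strptime(iso_ts, "%Y-%m-%dT%H:%M:%SZ")
    return ts.replace(second=0).strftime("%Y-%m-%dT%H:%M:%S.000Z")


def rollup(lines):
    """Group events by (floored time, srcIP, dstIP); count rows and sum metrics."""
    groups = defaultdict(lambda: {"count": 0, "packets": 0, "bytes": 0})
    for line in lines:
        event = json.loads(line)
        key = (floor_to_minute(event["timestamp"]), event["srcIP"], event["dstIP"])
        agg = groups[key]
        agg["count"] += 1                   # mirrors the "count" metric
        agg["packets"] += event["packets"]  # mirrors longSum on packets
        agg["bytes"] += event["bytes"]      # mirrors longSum on bytes
    return [
        {"__time": t, "srcIP": src, "dstIP": dst, **agg}
        for (t, src, dst), agg in sorted(groups.items())
    ]


rolled = rollup(RAW_EVENTS.splitlines())
for row in rolled:
    print(row)
```

Running the sketch yields five rows whose count, packets, and bytes values match the query result shown earlier, for example count 3, packets 286, and bytes 35937 for the 2018-01-01T01:01 bucket.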

Likewise, these two events that occurred during 2018-01-01T01:02 have been rolled up:

    {"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
    {"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}

    ┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
    │ __time                   │ bytes  │ count │ dstIP   │ packets │ srcIP   │
    ├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
    │ 2018-01-01T01:02:00.000Z │ 366260 │     2 │ 2.2.2.2 │     415 │ 1.1.1.1 │
    └──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘

For the last event recording traffic between 1.1.1.1 and 2.2.2.2, no rollup took place, because this was the only event that occurred during 2018-01-01T01:03:

    {"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}

    ┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
    │ __time                   │ bytes  │ count │ dstIP   │ packets │ srcIP   │
    ├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
    │ 2018-01-01T01:03:00.000Z │  10204 │     1 │ 2.2.2.2 │      49 │ 1.1.1.1 │
    └──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘

Note that the count metric shows how many rows in the original input data contributed to the final “rolled up” row.
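
One practical consequence is that the number of stored rows no longer equals the number of ingested events. The sketch below illustrates the difference using the count values from the query result above; the variable names are hypothetical. In Druid SQL terms, counting rows with COUNT(*) corresponds to the first number, while summing the count metric with SUM("count") corresponds to the second.

```python
# The "count" values of the five rolled-up rows, copied from the query result.
ROLLED_UP_COUNTS = {
    "2018-01-01T01:01:00.000Z": 3,
    "2018-01-01T01:02:00.000Z": 2,
    "2018-01-01T01:03:00.000Z": 1,
    "2018-01-02T21:33:00.000Z": 2,
    "2018-01-02T21:35:00.000Z": 1,
}

# Rows that Druid stores after rollup.
stored_rows = len(ROLLED_UP_COUNTS)

# Original input events, recovered by summing the count metric.
ingested_events = sum(ROLLED_UP_COUNTS.values())

# Average number of input events folded into each stored row.
rollup_ratio = ingested_events / stored_rows

print(f"stored rows: {stored_rows}")
print(f"ingested events: {ingested_events}")
print(f"rollup ratio: {rollup_ratio:.1f}")
```

For this dataset the sketch reports 5 stored rows for 9 ingested events, which is the size reduction that rollup provides.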