Aggregate data with rollup

Apache Druid® can summarize raw data at ingestion time using a process known as “rollup.” Rollup is a first-level aggregation operation over a selected set of columns that reduces the size of stored data.

This tutorial demonstrates how to apply rollup during ingestion and highlights its effects during query execution. The examples in the tutorial use the multi-stage query (MSQ) task engine to execute SQL statements.

Prerequisites

Before proceeding, download Druid as described in Quickstart (local) and have it running on your local machine. You don’t need to load any data into the Druid cluster.

You should be familiar with data querying in Druid. If you haven’t already, go through the Query data tutorial first.

Load the example data

For this tutorial, you use a small sample of network flow event data representing IP traffic. The data contains packet and byte counts from a source IP address to a destination IP address.

  1. {"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
  2. {"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
  3. {"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}
  4. {"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
  5. {"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}
  6. {"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}
  7. {"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":38,"bytes":6289}
  8. {"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":123,"bytes":93999}
  9. {"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":12,"bytes":2818}

Load the sample dataset using the INSERT INTO statement and the EXTERN function to ingest the data inline. In the Druid web console, go to the Query view and run the following query:

  1. INSERT INTO "rollup_tutorial"
  2. WITH "inline_data" AS (
  3. SELECT *
  4. FROM TABLE(EXTERN('{
  5. "type":"inline",
  6. "data":"{\"timestamp\":\"2018-01-01T01:01:35Z\",\"srcIP\":\"1.1.1.1\",\"dstIP\":\"2.2.2.2\",\"packets\":20,\"bytes\":9024}\n{\"timestamp\":\"2018-01-01T01:02:14Z\",\"srcIP\":\"1.1.1.1\",\"dstIP\":\"2.2.2.2\",\"packets\":38,\"bytes\":6289}\n{\"timestamp\":\"2018-01-01T01:01:59Z\",\"srcIP\":\"1.1.1.1\",\"dstIP\":\"2.2.2.2\",\"packets\":11,\"bytes\":5780}\n{\"timestamp\":\"2018-01-01T01:01:51Z\",\"srcIP\":\"1.1.1.1\",\"dstIP\":\"2.2.2.2\",\"packets\":255,\"bytes\":21133}\n{\"timestamp\":\"2018-01-01T01:02:29Z\",\"srcIP\":\"1.1.1.1\",\"dstIP\":\"2.2.2.2\",\"packets\":377,\"bytes\":359971}\n{\"timestamp\":\"2018-01-01T01:03:29Z\",\"srcIP\":\"1.1.1.1\",\"dstIP\":\"2.2.2.2\",\"packets\":49,\"bytes\":10204}\n{\"timestamp\":\"2018-01-02T21:33:14Z\",\"srcIP\":\"7.7.7.7\",\"dstIP\":\"8.8.8.8\",\"packets\":38,\"bytes\":6289}\n{\"timestamp\":\"2018-01-02T21:33:45Z\",\"srcIP\":\"7.7.7.7\",\"dstIP\":\"8.8.8.8\",\"packets\":123,\"bytes\":93999}\n{\"timestamp\":\"2018-01-02T21:35:45Z\",\"srcIP\":\"7.7.7.7\",\"dstIP\":\"8.8.8.8\",\"packets\":12,\"bytes\":2818}"}',
  7. '{"type":"json"}'))
  8. EXTEND ("timestamp" VARCHAR, "srcIP" VARCHAR, "dstIP" VARCHAR, "packets" BIGINT, "bytes" BIGINT)
  9. )
  10. SELECT
  11. FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time,
  12. "srcIP",
  13. "dstIP",
  14. SUM("bytes") AS "bytes",
  15. SUM("packets") AS "packets",
  16. COUNT(*) AS "count"
  17. FROM "inline_data"
  18. GROUP BY 1, 2, 3
  19. PARTITIONED BY DAY

Note the following aspects of the ingestion statement:

  • You transform the timestamp field using the FLOOR function to round timestamps down to the minute.
  • You group by the dimensions timestamp, srcIP, and dstIP.
  • You create the bytes and packets metrics, which are summed from their respective input fields.
  • You also create the count metric that records the number of rows that get rolled-up per each row in the datasource.

With rollup, Druid combines rows with identical timestamp and dimension values after the timestamp truncation. Druid computes and stores the metric values using the specified aggregation function over each set of rolled-up rows.

After the ingestion completes, you can query the data.

Query the example data

In the web console, open a new tab in the Query view. Run the following query to view the ingested data:

  1. SELECT * FROM "rollup_tutorial"

Returns the following:

__timesrcIPdstIPbytescountpackets
2018-01-01T01:01:00.000Z1.1.1.12.2.2.235,9373286
2018-01-01T01:02:00.000Z1.1.1.12.2.2.2366,2602415
2018-01-01T01:03:00.000Z1.1.1.12.2.2.210,204149
2018-01-02T21:33:00.000Z7.7.7.78.8.8.8100,2882161
2018-01-02T21:35:00.000Z7.7.7.78.8.8.82,818112

Notice there are only five rows as opposed to the nine rows in the example data. In the next section, you explore the components of the rolled-up rows.

View rollup in action

Consider the three events in the original input data that occur over the course of minute 2018-01-01T01:01:

  1. {"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
  2. {"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
  3. {"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}

Druid combines the three rows into one during rollup:

__timesrcIPdstIPbytescountpackets
2018-01-01T01:01:00.000Z1.1.1.12.2.2.235,9373286

Before the grouping occurs, the FLOOR(TIME_PARSE("timestamp") TO MINUTE) expression buckets (floors) the timestamp column of the original input by minute.

The input rows are grouped because they have the same values for their dimension columns {timestamp, srcIP, dstIP}. The metric columns calculate the sum aggregation of the grouped rows for packets and bytes. The count metric shows how many rows from the original input data contributed to the final rolled-up row.

Now, consider the two events in the original input data that occur over the course of minute 2018-01-01T01:02:

  1. {"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
  2. {"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}

The rows are grouped into the following during rollup:

__timesrcIPdstIPbytescountpackets
2018-01-01T01:02:00.000Z1.1.1.12.2.2.2366,2602415

In the original input data, only one event occurs over the course of minute 2018-01-01T01:03:

  1. {"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}

Therefore, no rollup takes place:

__timesrcIPdstIPbytescountpackets
2018-01-01T01:03:00.000Z1.1.1.12.2.2.210,204149

Learn more

See the following topics for more information: