Query for latest values

This tutorial describes strategies in Apache Druid for use cases that might be handled by UPSERT in other databases. You can use the LATEST_BY aggregation at query time or “deltas” for numeric dimensions at insert time.

The Update data tutorial demonstrates how to use batch operations to update data according to the timestamp, including UPSERT cases. However, with streaming data, you can potentially use LATEST_BY or deltas to satisfy requirements otherwise handled with updates.

Prerequisites

Before you follow the steps in this tutorial, download Druid as described in the Local quickstart and have it running on your local machine. You don’t need to load any data into the Druid cluster.

You should be familiar with data querying in Druid. If you haven’t already, go through the Query data tutorial first.

Use LATEST_BY to retrieve updated values

Sometimes, you want to read the latest value of one dimension or measure in relation to another dimension. In a transactional database, you might maintain dimensions or measures using UPSERT, but in Druid you can append all updates or changes during ingestion. The LATEST_BY function lets you retrieve the most recent value of the changing dimension or measure with the following type of query:

```sql
SELECT dimension,
  LATEST_BY(changed_dimension, updated_timestamp)
FROM my_table
GROUP BY 1
```

In this example, updated_timestamp is the reference timestamp used to determine the “latest” value. It could be __time or another timestamp.

For example, consider the following table of events that log the total number of points for a user:

|__time|user_id|points|
|---|---|---|
|2024-01-01T01:00:00.000Z|funny_bunny1|10|
|2024-01-01T01:05:00.000Z|funny_bunny1|30|
|2024-01-01T02:00:00.000Z|funny_bunny1|35|
|2024-01-01T02:00:00.000Z|silly_monkey2|30|
|2024-01-01T02:05:00.000Z|silly_monkey2|55|
|2024-01-01T03:00:00.000Z|funny_bunny1|40|

Insert sample data

In the Druid web console, navigate to the Query view and run the following query to insert sample data:

  1. REPLACE INTO "latest_by_tutorial1" OVERWRITE ALL
  2. WITH "ext" AS (
  3. SELECT *
  4. FROM TABLE(
  5. EXTERN(
  6. '{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":30}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":35}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":55}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":40}"}',
  7. '{"type":"json"}'
  8. )
  9. ) EXTEND ("timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
  10. )
  11. SELECT
  12. TIME_PARSE("timestamp") AS "__time",
  13. "user_id",
  14. "points"
  15. FROM "ext"
  16. PARTITIONED BY DAY

Run the following query to retrieve the most recent points value for each user_id:

```sql
SELECT user_id,
  LATEST_BY("points", "__time") AS latest_points
FROM latest_by_tutorial1
GROUP BY 1
```

The results are as follows:

|user_id|latest_points|
|---|---|
|silly_monkey2|55|
|funny_bunny1|40|

In the example, the values increase each time, but this method works even if the values fluctuate.

You can use this query shape as a subquery for additional processing. However, if there are many values for user_id, the query can be expensive.
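
For instance, the following sketch wraps the query above in a subquery and sums each user's latest points into a single total. The alias total_latest_points is only illustrative:

```sql
-- Use the LATEST_BY pattern as a subquery: the inner query returns each
-- user's most recent points value, and the outer query sums those values.
SELECT SUM("latest_points") AS "total_latest_points"
FROM (
  SELECT "user_id",
    LATEST_BY("points", "__time") AS "latest_points"
  FROM "latest_by_tutorial1"
  GROUP BY 1
) AS "per_user_latest"
```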

If you want to track the latest value at several points within a coarser time grain, you need an additional timestamp that records when each update occurred so that Druid can identify the latest version. Consider the following data, which represents points for various users updated within an hour time frame. __time has hour granularity, while updated_timestamp has minute granularity:

|__time|updated_timestamp|user_id|points|
|---|---|---|---|
|2024-01-01T01:00:00.000Z|2024-01-01T01:00:00.000Z|funny_bunny1|10|
|2024-01-01T01:00:00.000Z|2024-01-01T01:05:00.000Z|funny_bunny1|30|
|2024-01-01T02:00:00.000Z|2024-01-01T02:00:00.000Z|funny_bunny1|35|
|2024-01-01T02:00:00.000Z|2024-01-01T02:00:00.000Z|silly_monkey2|30|
|2024-01-01T02:00:00.000Z|2024-01-01T02:05:00.000Z|silly_monkey2|55|
|2024-01-01T03:00:00.000Z|2024-01-01T03:00:00.000Z|funny_bunny1|40|

Insert sample data

Open a new tab in the Query view and run the following query to insert sample data:

  1. REPLACE INTO "latest_by_tutorial2" OVERWRITE ALL
  2. WITH "ext" AS (
  3. SELECT *
  4. FROM TABLE(
  5. EXTERN(
  6. '{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"updated_timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"updated_timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":30}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":35}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":55}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"updated_timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":40}"}',
  7. '{"type":"json"}'
  8. )
  9. ) EXTEND ("timestamp" VARCHAR, "updated_timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
  10. )
  11. SELECT
  12. TIME_PARSE("timestamp") AS "__time",
  13. "updated_timestamp",
  14. "user_id",
  15. "points"
  16. FROM "ext"
  17. PARTITIONED BY DAY

Run the following query to retrieve the latest points value by user for each hour:

  1. SELECT FLOOR("__time" TO HOUR) AS "hour_time",
  2. "user_id",
  3. LATEST_BY("points", TIME_PARSE(updated_timestamp)) AS "latest_points_hour"
  4. FROM latest_by_tutorial2
  5. GROUP BY 1,2

The results are as follows:

|hour_time|user_id|latest_points_hour|
|---|---|---|
|2024-01-01T01:00:00.000Z|funny_bunny1|30|
|2024-01-01T02:00:00.000Z|funny_bunny1|35|
|2024-01-01T02:00:00.000Z|silly_monkey2|55|
|2024-01-01T03:00:00.000Z|funny_bunny1|40|

LATEST_BY is an aggregation function. It is efficient when only a few rows share a given dimension value, such as a single user_id, but it must scan every row that matches that value. For dimension values with numerous updates, for example a user who plays a game a million times, and updates that don't arrive in order, Druid processes all rows matching the user_id to find the row with the maximum timestamp and return the latest data.

For instance, if updates constitute 1-5 percent of your data, you’ll get good query performance. If updates constitute 50 percent or more of your data, your queries will be slow.

To reduce the cost of these queries, you can set up a periodic batch ingestion job that pre-computes the latest values and re-indexes them into a new datasource, which you can then query directly without grouping. Note that your view of the latest data is only as fresh as the most recent re-index.
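
For example, the following is a minimal sketch of such a re-indexing job for the latest_by_tutorial1 datasource from this tutorial. The target datasource name latest_by_tutorial1_current is an assumption for illustration, and a real job would run on a schedule against your own data:

```sql
-- Hypothetical periodic re-indexing job: pre-compute each user's latest
-- points into a compact datasource that can be queried without GROUP BY.
REPLACE INTO "latest_by_tutorial1_current" OVERWRITE ALL
SELECT
  MAX("__time") AS "__time",
  "user_id",
  LATEST_BY("points", "__time") AS "latest_points"
FROM "latest_by_tutorial1"
GROUP BY "user_id"
PARTITIONED BY ALL
```

After the job runs, a plain SELECT against latest_by_tutorial1_current returns one row per user with no aggregation at query time.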

Alternatively, you can perform ingestion-time aggregation using LATEST_BY and append updates to a rolled-up datasource with streaming ingestion. Appending into a time chunk adds new segments and doesn't perfectly roll up the data, so the datasource may contain multiple partially rolled-up rows for the same dimension values. In this case, you still need the GROUP BY query to get correct results from the rolled-up datasource. You can tune automatic compaction to significantly reduce the number of stale rows and improve query performance.

Use delta values and aggregation for updated values

Instead of appending the latest total value in your events, you can log the change in value with each event and aggregate the changes with your usual aggregator. This approach may let you avoid a level of aggregation and grouping in your queries.

For most applications, you can send the event data directly to Druid without pre-processing. For example, when sending impression counts to Druid, don't send the total impression count since yesterday; send only the new impressions. You can then aggregate the total in Druid at query time. Druid is optimized for summing large numbers of rows, so this approach may feel counterintuitive if you are used to batching or pre-aggregating data.

For example, consider a datasource with a measure column y that you aggregate with SUM, grouped by another dimension x. If you want to update the value of y from 3 to 2, insert a row with -1 for y. That way, the aggregation SUM(y) remains correct for any query grouped by x. This can offer a significant performance advantage, but the trade-off is that the aggregation must always be a SUM.
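
The following sketch illustrates the idea with a hypothetical datasource named my_table that has a dimension x and a measure y; the table and values are for illustration only:

```sql
-- Suppose the stored total of "y" for x = 'a' is 3 but should be 2.
-- Rather than rewriting the existing rows, append a correction event
-- with "y" = -1. The usual aggregation query then returns the
-- corrected total:
SELECT
  "x",
  SUM("y") AS "y_total"
FROM "my_table"
GROUP BY "x"
```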

In other cases, the updates may already arrive as deltas to the original data, so appending them requires very little data engineering. The same performance mitigation as in the previous section applies: use rollup at ingestion time combined with ongoing automatic compaction. A sketch of this appears at the end of this section.

For example, consider the following table of events that logs the number of points gained or lost for a user during a period of time:

|__time|user_id|delta|
|---|---|---|
|2024-01-01T01:00:00.000Z|funny_bunny1|10|
|2024-01-01T01:05:00.000Z|funny_bunny1|10|
|2024-01-01T02:00:00.000Z|funny_bunny1|5|
|2024-01-01T02:00:00.000Z|silly_monkey2|30|
|2024-01-01T02:05:00.000Z|silly_monkey2|-5|
|2024-01-01T03:00:00.000Z|funny_bunny1|10|

Insert sample data

Open a new tab in the Query view and run the following query to insert sample data:

  1. REPLACE INTO "delta_tutorial" OVERWRITE ALL
  2. WITH "ext" AS (
  3. SELECT *
  4. FROM TABLE(
  5. EXTERN(
  6. '{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":5}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":-5}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}"}',
  7. '{"type":"json"}'
  8. )
  9. ) EXTEND ("timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
  10. )
  11. SELECT
  12. TIME_PARSE("timestamp") AS "__time",
  13. "user_id",
  14. "points" AS "delta"
  15. FROM "ext"
  16. PARTITIONED BY DAY

The following query aggregates the deltas to return each user's net points per hour, mirroring the shape of the second LATEST_BY example:

  1. SELECT FLOOR("__time" TO HOUR) as "hour_time",
  2. "user_id",
  3. SUM("delta") AS "latest_points_hour"
  4. FROM "delta_tutorial"
  5. GROUP BY 1,2
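
To reduce the number of rows that such queries scan, you can roll up the delta data at ingestion time, as mentioned earlier. The following is a minimal sketch that re-indexes the delta_tutorial datasource at hour grain; the target name delta_tutorial_rollup is an assumption, and in production you would typically enable rollup in your streaming ingestion spec and rely on automatic compaction instead of re-indexing:

```sql
-- Hypothetical rolled-up copy of the delta data: one row per user per hour,
-- with the deltas pre-summed. Queries against it scan far fewer rows.
REPLACE INTO "delta_tutorial_rollup" OVERWRITE ALL
SELECT
  TIME_FLOOR("__time", 'PT1H') AS "__time",
  "user_id",
  SUM("delta") AS "delta"
FROM "delta_tutorial"
GROUP BY 1, 2
PARTITIONED BY DAY
```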

Learn more

See the following topics for more information: