Lookup Joins

A lookup join is a type of join in streaming queries. It is used to enrich a table with data queried from Paimon. The join requires one table to have a processing-time attribute and the other table to be backed by a lookup source connector.

In Flink, Paimon supports lookup joins on both primary key tables and append tables. The following example illustrates this feature.

Prepare

First, let’s create a Paimon table and update it in real time.

    -- Create a paimon catalog
    CREATE CATALOG my_catalog WITH (
        'type'='paimon',
        'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file:///tmp/foo/bar'
    );
    USE CATALOG my_catalog;

    -- Create a table in paimon catalog
    CREATE TABLE customers (
        id INT PRIMARY KEY NOT ENFORCED,
        name STRING,
        country STRING,
        zip STRING
    );

    -- Launch a streaming job to update customers table
    INSERT INTO customers ...

    -- Create a temporary left table, like from kafka
    CREATE TEMPORARY TABLE orders (
        order_id INT,
        total INT,
        customer_id INT,
        proc_time AS PROCTIME()
    ) WITH (
        'connector' = 'kafka',
        'topic' = '...',
        'properties.bootstrap.servers' = '...',
        'format' = 'csv'
        ...
    );

Normal Lookup

You can now use customers in a lookup join query.

    -- enrich each order with customer information
    SELECT o.order_id, o.total, c.country, c.zip
    FROM orders AS o
    JOIN customers
    FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
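
The query above is an inner join, so an order whose customer_id has no match in customers at lookup time is dropped from the result. A minimal sketch of the outer variant, using standard Flink SQL LEFT JOIN syntax with the same tables, keeps such orders and leaves the customer columns NULL:

```sql
-- Keep every order; country and zip are NULL when no customer row is found
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
LEFT JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

Which form fits depends on whether downstream consumers can tolerate unenriched rows.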

Retry Lookup

If records from orders (the main table) fail to join because the corresponding data in customers (the lookup table) is not yet ready, consider using Flink’s Delayed Retry Strategy For Lookup. This is available only in Flink 1.16+.

    -- enrich each order with customer information
    SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
        o.order_id, o.total, c.country, c.zip
    FROM orders AS o
    JOIN customers
    FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Async Retry Lookup

The problem with synchronous retry is that one record blocks all subsequent records, which can stall the entire job. Consider using async together with allow_unordered to avoid this: records that miss the join no longer block other records.

    -- enrich each order with customer information
    SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
        o.order_id, o.total, c.country, c.zip
    FROM orders AS o
    JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
    FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

If the main table (orders) is a CDC stream, Flink SQL ignores allow_unordered, because it is supported only for append streams, and your streaming job may still be blocked. As a workaround, you can use Paimon’s audit_log system table to convert the CDC stream into an append stream.
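
As a rough sketch of that workaround, assume the main table is itself a Paimon table receiving CDC data. The table name orders_cdc and the view name orders_append below are hypothetical and not part of the example above. Reading the table through its audit_log system table should yield an append-only stream with the change type exposed as a rowkind column, and a view can add the processing-time attribute that the lookup join needs:

```sql
-- Hypothetical: orders_cdc is a Paimon table with columns order_id, total, customer_id
CREATE TEMPORARY VIEW orders_append AS
SELECT rowkind, order_id, total, customer_id, PROCTIME() AS proc_time
FROM `orders_cdc$audit_log`;

-- Same async retry lookup as above, now driven by the append-only view
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
    o.rowkind, o.order_id, o.total, c.country, c.zip
FROM orders_append AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

Because updates and deletes now arrive as ordinary rows, downstream logic has to decide how to interpret the rowkind value.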

Dynamic Partition

In traditional data warehouses, each partition often holds a full snapshot of the latest data, so a lookup join against such a partitioned table only needs to read the latest partition. Paimon provides the max_pt feature for this scenario.

Create Paimon Partitioned Table

    CREATE TABLE customers (
        id INT,
        name STRING,
        country STRING,
        zip STRING,
        dt STRING,
        PRIMARY KEY (id, dt) NOT ENFORCED
    ) PARTITIONED BY (dt);

Lookup Join

    SELECT o.order_id, o.total, c.country, c.zip
    FROM orders AS o
    JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
    FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

The Lookup node automatically refreshes to the latest partition at the configured interval and queries data only from that partition.
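
To make the effect concrete, here is a small illustration with made-up rows, assuming max_pt() selects the partition with the greatest dt value. The values are illustrative only and are not taken from any real data set:

```sql
-- Illustrative data: two daily full snapshots containing the same customer
INSERT INTO customers VALUES
    (1, 'Alice', 'US', '10001', '2024-01-01'),
    (1, 'Alice', 'CA', 'M5V', '2024-01-02');

-- Under the assumption above, an order with customer_id = 1 would be
-- enriched from partition dt = '2024-01-02' (country 'CA'); the row in
-- dt = '2024-01-01' is not consulted by the lookup.
```

If a newer partition is written later, the lookup should pick it up after the next refresh interval elapses.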

Query Service

You can run a Flink streaming job to start a query service for the table. When a query service exists, Flink lookup joins fetch data from it in preference to reading the table directly, which can substantially improve lookup performance.

Flink SQL

    CALL sys.query_service('database_name.table_name', parallelism);
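
As a filled-in instance of the call above, the following sketch assumes the customers table lives in a database named default and picks a parallelism of 4. Both values are assumptions for illustration and should be adapted to your environment:

```sql
-- Assumed values: database 'default', table 'customers', parallelism 4
CALL sys.query_service('default.customers', 4);
```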

Flink Action

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-0.9.0.jar \
        query_service \
        --warehouse <warehouse-path> \
        --database <database-name> \
        --table <table-name> \
        [--parallelism <parallelism>] \
        [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]