SQL Query

Just like all other tables, Paimon tables can be queried with the SELECT statement.

Batch Query

Paimon’s batch read returns all the data in a snapshot of the table. By default, batch reads return the latest snapshot.
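A minimal example, assuming a Paimon table named t is registered in the current catalog (the table name is only illustrative):

  -- reads the latest snapshot of the table
  SELECT * FROM t;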

Batch Time Travel

Paimon batch reads with time travel can specify a snapshot or a tag and read the corresponding data.

Requires Spark 3.3+.

You can use VERSION AS OF and TIMESTAMP AS OF in a query to do time travel:

  -- read the snapshot with id 1L (use snapshot id as version)
  SELECT * FROM t VERSION AS OF 1;
  -- read the snapshot from specified timestamp
  SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
  -- read the snapshot from specified timestamp in unix seconds
  SELECT * FROM t TIMESTAMP AS OF 1678883047;
  -- read tag 'my-tag'
  SELECT * FROM t VERSION AS OF 'my-tag';
  -- read the snapshot from specified watermark. will match the first snapshot after the watermark
  SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

If a tag's name is a number and equals a snapshot id, the VERSION AS OF syntax considers the tag first. For example, if you have a tag named '1' based on snapshot 2, the statement SELECT * FROM t VERSION AS OF '1' actually queries snapshot 2 instead of snapshot 1.
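To illustrate, a sketch assuming the sys.create_tag procedure is available in your Paimon version and the table lives in the default database (names and snapshot ids are illustrative):

  -- create a tag named '1' that points at snapshot 2 (procedure arguments assumed)
  CALL sys.create_tag(table => 'default.t', tag => '1', snapshot => 2);
  -- resolves to the tag '1', i.e. snapshot 2, not snapshot 1
  SELECT * FROM t VERSION AS OF '1';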

Batch Incremental

Read incremental changes between the start snapshot (exclusive) and the end snapshot.

For example:

  • ‘5,10’ means changes between snapshot 5 and snapshot 10.
  • ‘TAG1,TAG3’ means changes between TAG1 and TAG3.

By default, Paimon scans changelog files for tables that produce changelog files; otherwise, it scans the newly changed data files. You can also force a specific behavior by setting 'incremental-between-scan-mode'.

Requires Spark 3.2+.

Paimon supports incremental queries in Spark SQL, implemented via a Spark table valued function.

You can use paimon_incremental_query in a query to extract the incremental data:

  -- read the incremental data between snapshot id 12 and snapshot id 20.
  SELECT * FROM paimon_incremental_query('tableName', 12, 20);
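Alternatively, a minimal sketch of an incremental read through the Spark DataFrame reader; this assumes the 'incremental-between' table option can be passed directly as a reader option (verify against your Paimon version):

  // Read the changes between snapshot 5 (exclusive) and snapshot 10.
  // 'incremental-between-scan-mode' could additionally be set here to force the scan behavior.
  val incremental = spark.read
    .format("paimon")
    .option("incremental-between", "5,10")
    .load("/path/to/paimon/source/table")
  incremental.show()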

In batch SQL, DELETE records are not allowed to be returned, so records of kind -D will be dropped. If you want to see DELETE records, you can query the audit_log table.
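For example, a batch query against the audit_log system table; the $audit_log suffix follows Paimon's system-table naming, the rowkind column carries the change type, and backticks keep Spark SQL from misparsing the '$':

  -- only return the DELETE records of table t
  SELECT * FROM `t$audit_log` WHERE rowkind = '-D';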

Streaming Query

Paimon currently supports Spark 3.3+ for streaming read.

Paimon supports a rich set of scan modes for streaming read:

  • latest: For streaming sources, continuously reads the latest changes without producing a snapshot at the beginning.
  • latest-full: For streaming sources, produces the latest snapshot on the table upon first startup, and continues to read the latest changes.
  • from-timestamp: For streaming sources, continuously reads changes starting from the timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning.
  • from-snapshot: For streaming sources, continuously reads changes starting from the snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning.
  • from-snapshot-full: For streaming sources, produces the snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes.
  • default: Equivalent to from-snapshot if "scan.snapshot-id" is specified, to from-timestamp if "scan.timestamp-millis" is specified, and to latest-full otherwise.

A simple example with default scan mode:

  // If no scan-related configs are provided, the latest-full scan mode is used.
  val query = spark.readStream
    .format("paimon")
    .load("/path/to/paimon/source/table")
    .writeStream
    .format("console")
    .start()
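A variation with a non-default scan mode, sketched under the assumption that the keys from the table above ("scan.mode", "scan.snapshot-id") can be passed as reader options:

  // Continuously read changes starting from snapshot 5,
  // without producing a snapshot at the beginning (from-snapshot mode).
  val query = spark.readStream
    .format("paimon")
    .option("scan.mode", "from-snapshot")
    .option("scan.snapshot-id", "5")
    .load("/path/to/paimon/source/table")
    .writeStream
    .format("console")
    .start()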

Paimon Structured Streaming also supports a variety of streaming read behaviors: it works with many triggers and supports several read limits.

These read limits are supported:

  • read.stream.maxFilesPerTrigger (Integer, default: none): The maximum number of files returned in a single batch.
  • read.stream.maxBytesPerTrigger (Long, default: none): The maximum number of bytes returned in a single batch.
  • read.stream.maxRowsPerTrigger (Long, default: none): The maximum number of rows returned in a single batch.
  • read.stream.minRowsPerTrigger (Long, default: none): The minimum number of rows returned in a single batch; used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit.
  • read.stream.maxTriggerDelayMs (Long, default: none): The maximum delay between two adjacent batches; used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit.

Example: One

Use org.apache.spark.sql.streaming.Trigger.AvailableNow() together with the maxBytesPerTrigger limit defined by Paimon.

  // Trigger.AvailableNow() processes all available data at the start
  // of the query in one or multiple batches, then terminates the query.
  // Setting read.stream.maxBytesPerTrigger to 134217728 means that each
  // batch processes a maximum of 128 MB of data.
  val query = spark.readStream
    .format("paimon")
    .option("read.stream.maxBytesPerTrigger", "134217728")
    .load("/path/to/paimon/source/table")
    .writeStream
    .format("console")
    .trigger(Trigger.AvailableNow())
    .start()

Example: Two

Use org.apache.spark.sql.connector.read.streaming.ReadMinRows.

  // A batch is not triggered until more than 5,000 rows are available,
  // unless the interval between two batches exceeds 300 seconds.
  val query = spark.readStream
    .format("paimon")
    .option("read.stream.minRowsPerTrigger", "5000")
    .option("read.stream.maxTriggerDelayMs", "300000")
    .load("/path/to/paimon/source/table")
    .writeStream
    .format("console")
    .start()

Paimon Structured Streaming supports reading rows in the form of a changelog (adding a rowkind column to each row to represent its change type) in two ways:

  • Direct streaming read with the system audit_log table
  • Set read.changelog to true (default is false), then stream read from the table location

Example:

  // Option 1
  val query1 = spark.readStream
    .format("paimon")
    .table("`table_name$audit_log`")
    .writeStream
    .format("console")
    .start()

  // Option 2
  val query2 = spark.readStream
    .format("paimon")
    .option("read.changelog", "true")
    .load("/path/to/paimon/source/table")
    .writeStream
    .format("console")
    .start()

  /*
  +I 1 Hi
  +I 2 Hello
  */

Query Optimization

It is highly recommended to specify partition and primary key filters along with the query, which will speed up the data skipping of the query.

The filter functions that can accelerate data skipping are listed below (a short example follows the list):

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL
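For instance, assuming a table t partitioned by a dt column and with an id primary key column (both column names are only illustrative), a partition filter combined with the predicates above lets Paimon skip whole partitions as well as files within them:

  -- the partition filter prunes partitions; the IN filter narrows files inside them
  SELECT * FROM t WHERE dt = '2023-06-01' AND id IN (1, 2, 3);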

Paimon sorts the data by primary key, which speeds up point queries and range queries. When using a composite primary key, the query filters should ideally form a leftmost prefix of the primary key for good acceleration.

Suppose that a table has the following specification:

  CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
  ) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
  );

A query obtains good acceleration by specifying an equality or range filter on the leftmost prefix of the primary key.

  SELECT * FROM orders WHERE catalog_id=1025;
  SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
  SELECT * FROM orders
    WHERE catalog_id=1025
    AND order_id>2035 AND order_id<6000;

However, the following filters cannot accelerate the query well.

  SELECT * FROM orders WHERE order_id=29495;
  SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;