Legacy Features

As Flink SQL has matured there are some features that have been replaced with more modern and better functioning substitutes. These legacy features remain documented here for those users that have not yet or are unable to, upgrade to the more modern variant.

Temporal Table Function

The temporal table function is the legacy way of defining something akin to a versioned table that can be used in a temporal table join. Please define temporal joins using versioned tables in new queries.

Unlike a versioned table, temporal table functions can only be defined on top of append-only streams — it does not support changelog inputs. Additionally, a temporal table function cannot be defined in pure SQL DDL.

Defining a Temporal Table Function

Temporal table functions can be defined on top of append-only streams using the Table API. The table is registered with one or more key columns, and a time attribute used for versioning.

Suppose we have an append-only table of currency rates that we would like to register as a temporal table function.

  1. SELECT * FROM currency_rates;
  2. update_time currency rate
  3. ============= ========= ====
  4. 09:00:00 Yen 102
  5. 09:00:00 Euro 114
  6. 09:00:00 USD 1
  7. 11:15:00 Euro 119
  8. 11:49:00 Pounds 108

Using the Table API, we can register this stream using currency for the key and update_time as the versioning time attribute.

  1. TemporalTableFunction rates = tEnv
  2. .from("currency_rates").
  3. .createTemporalTableFunction("update_time", "currency");
  4. tEnv.registerFunction("rates", rates);
  1. rates = tEnv
  2. .from("currency_rates").
  3. .createTemporalTableFunction("update_time", "currency")
  4. tEnv.registerFunction("rates", rates)

Temporal Table Function Join

Once defined, a temporal table function is used as a standard table function. Append-only tables (left input/probe side) can join with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes, to retrieve the value for a key as it was at a particular point in time.

Consider an append-only table orders that tracks customers’ orders in different currencies.

  1. SELECT * FROM orders;
  2. order_time amount currency
  3. ========== ====== =========
  4. 10:15 2 Euro
  5. 10:30 1 USD
  6. 10:32 50 Yen
  7. 10:52 3 Euro
  8. 11:04 5 USD

Given these tables, we would like to convert orders to a common currency — USD.

  1. SELECT
  2. SUM(amount * rate) AS amount
  3. FROM
  4. orders,
  5. LATERAL TABLE (rates(order_time))
  6. WHERE
  7. rates.currency = orders.currency
  1. Table result = orders
  2. .joinLateral($("rates(order_time)"), $("orders.currency = rates.currency"))
  3. .select($("(o_amount * r_rate).sum as amount"));
  1. val result = orders
  2. .joinLateral($"rates(order_time)", $"orders.currency = rates.currency")
  3. .select($"(o_amount * r_rate).sum as amount"))