Ogg Format

Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema

Oracle GoldenGate (a.k.a ogg) is a managed service providing a real-time data mesh platform, which uses replication to keep data highly available, and enabling real-time analysis. Customers can design, execute, and monitor their data replication and stream data processing solutions without the need to allocate or manage compute environments. Ogg provides a format schema for changelog and supports to serialize messages using JSON.

Flink supports to interpret Ogg JSON as INSERT/UPDATE/DELETE messages into Flink SQL system. This is useful in many cases to leverage this feature, such as

  • synchronizing incremental data from databases to other systems
  • auditing logs
  • real-time materialized views on databases
  • temporal join changing history of a database table and so on.

Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Ogg JSON, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages.

Dependencies

Ogg Json

In order to use the Ogg the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependencySQL Client
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-json</artifactId>
  4. <version>1.19.0</version>
  5. </dependency>
Copied to clipboard!
Built-in

Note: please refer to Ogg Kafka Handler documentation about how to set up an Ogg Kafka handler to synchronize changelog to Kafka topics.

How to use Ogg format

Ogg provides a unified format for changelog, here is a simple example for an update operation captured from an Oracle PRODUCTS table in JSON format:

  1. {
  2. "before": {
  3. "id": 111,
  4. "name": "scooter",
  5. "description": "Big 2-wheel scooter",
  6. "weight": 5.18
  7. },
  8. "after": {
  9. "id": 111,
  10. "name": "scooter",
  11. "description": "Big 2-wheel scooter",
  12. "weight": 5.15
  13. },
  14. "op_type": "U",
  15. "op_ts": "2020-05-13 15:40:06.000000",
  16. "current_ts": "2020-05-13 15:40:07.000000",
  17. "primary_keys": [
  18. "id"
  19. ],
  20. "pos": "00000000000000000000143",
  21. "table": "PRODUCTS"
  22. }

Note: please refer to Debezium documentation about the meaning of each field.

The Oracle PRODUCTS table has 4 columns (id, name, description and weight). The above JSON message is an update change event on the PRODUCTS table where the weight value of the row with id = 111 is changed from 5.18 to 5.15. Assuming this messages is synchronized to Kafka topic products_ogg, then we can use the following DDL to consume this topic and interpret the change events.

  1. CREATE TABLE topic_products (
  2. -- schema is totally the same to the Oracle "products" table
  3. id BIGINT,
  4. name STRING,
  5. description STRING,
  6. weight DECIMAL(10, 2)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'products_ogg',
  10. 'properties.bootstrap.servers' = 'localhost:9092',
  11. 'properties.group.id' = 'testGroup',
  12. 'format' = 'ogg-json'
  13. )

After registering the topic as a Flink table, then you can consume the Ogg messages as a changelog source.

  1. -- a real-time materialized view on the Oracle "PRODUCTS"
  2. -- which calculate the latest average of weight for the same products
  3. SELECT name, AVG(weight)
  4. FROM topic_products
  5. GROUP BY name;
  6. -- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
  7. -- Elasticsearch "products" index for future searching
  8. INSERT INTO elasticsearch_products
  9. SELECT *
  10. FROM topic_products;

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Attention Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.

KeyData TypeDescription
tableSTRING NULLContains fully qualified table name. The format of the fully qualified table name is: CATALOG NAME.SCHEMA NAME.TABLE NAME
primary-keysARRAY<STRING> NULLAn array variable holding the column names of the primary keys of the source table. The primary-keys field is only include in the JSON output if the includePrimaryKeys configuration property is set to true.
ingestion-timestampTIMESTAMP_LTZ(6) NULLThe timestamp at which the connector processed the event. Corresponds to the current_ts field in the Ogg record.
event-timestampTIMESTAMP_LTZ(6) NULLThe timestamp at which the source system created the event. Corresponds to the op_ts field in the Ogg record.

The following example shows how to access Ogg metadata fields in Kafka:

  1. CREATE TABLE KafkaTable (
  2. origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  3. event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
  4. origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  5. primary_keys ARRAY<STRING> METADATA FROM 'value.primary-keys' VIRTUAL,
  6. user_id BIGINT,
  7. item_id BIGINT,
  8. behavior STRING
  9. ) WITH (
  10. 'connector' = 'kafka',
  11. 'topic' = 'user_behavior',
  12. 'properties.bootstrap.servers' = 'localhost:9092',
  13. 'properties.group.id' = 'testGroup',
  14. 'scan.startup.mode' = 'earliest-offset',
  15. 'value.format' = 'ogg-json'
  16. );

Format Options

OptionRequiredDefaultTypeDescription
format
required(none)StringSpecify what format to use, here should be ‘ogg-json’.
ogg-json.ignore-parse-errors
optionalfalseBooleanSkip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.
ogg-json.timestamp-format.standard
optional‘SQL’StringSpecify the input and output timestamp format. Currently supported values are ‘SQL’ and ‘ISO-8601’:
  • Option ‘SQL’ will parse input timestamp in “yyyy-MM-dd HH:mm:ss.s{precision}” format, e.g ‘2020-12-30 12:13:14.123’ and output timestamp in the same format.
  • Option ‘ISO-8601’will parse input timestamp in “yyyy-MM-ddTHH:mm:ss.s{precision}” format, e.g ‘2020-12-30T12:13:14.123’ and output timestamp in the same format.
ogg-json.map-null-key.mode
optional‘FAIL’StringSpecify the handling mode when serializing null keys for map data. Currently supported values are ‘FAIL’, ‘DROP’ and ‘LITERAL’:
  • Option ‘FAIL’ will throw exception when encountering map with null key.
  • Option ‘DROP’ will drop null key entries for map data.
  • Option ‘LITERAL’ will replace null key with string literal. The string literal is defined by ogg-json.map-null-key.literal option.
ogg-json.map-null-key.literal
optional‘null’StringSpecify string literal to replace null key when ‘ogg-json.map-null-key.mode’ is LITERAL.

Data Type Mapping

Currently, the Ogg format uses JSON format for serialization and deserialization. Please refer to JSON Format documentation for more details about the data type mapping.