JSON Format
Format: Serialization Schema Format: Deserialization Schema
The JSON format allows to read and write JSON data based on an JSON schema. Currently, the JSON schema is derived from table schema.
Dependencies
In order to use the Json format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client JAR |
---|---|
flink-json | Built-in |
How to create a table with JSON format
Here is an example to create a table using Kafka connector and JSON format.
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
Format Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
format | required | (none) | String | Specify what format to use, here should be ‘json’ . |
json.fail-on-missing-field | optional | false | Boolean | Whether to fail if a field is missing or not. |
json.ignore-parse-errors | optional | false | Boolean | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
json.timestamp-format.standard | optional | ‘SQL’ | String | Specify the input and output timestamp format for TIMESTAMP and TIMESTAMP WITH LOCAL TIME ZONE type. Currently supported values are ‘SQL’ and ‘ISO-8601’ :
|
json.map-null-key.mode | optional | ‘FAIL’ | String | Specify the handling mode when serializing null keys for map data. Currently supported values are ‘FAIL’ , ‘DROP’ and ‘LITERAL’ :
|
json.map-null-key.literal | optional | ‘null’ | String | Specify string literal to replace null key when ‘json.map-null-key.mode’ is LITERAL. |
Data Type Mapping
Currently, the JSON schema is always derived from table schema. Explicitly defining an JSON schema is not supported yet.
Flink JSON format uses jackson databind API to parse and generate JSON string.
The following table lists the type mapping from Flink type to JSON type.
Flink SQL type | JSON type |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |