JSON Format
Format: Serialization Schema Format: Deserialization Schema
JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。
依赖
In order to use the Json format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
Copied to clipboard! | Built-in |
如何创建一张基于 JSON Format 的表
以下是一个利用 Kafka 以及 JSON Format 构建表的例子。
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
Format 参数
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为‘json’ 。 |
json.fail-on-missing-field | 可选 | false | Boolean | 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。 |
json.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 。 |
json.timestamp-format.standard | 可选 | ‘SQL’ | String | 声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为‘SQL’ 以及 ‘ISO-8601’ :
|
json.map-null-key.mode | 选填 | ‘FAIL’ | String | 指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’ , ‘DROP’ 和 ‘LITERAL’ :
|
json.map-null-key.literal | 选填 | ‘null’ | String | 当 ‘json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
json.encode.decimal-as-plain-number | 选填 | false | Boolean | 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8 。当此选项设为 true 时,则会表示为 0.000000027 。 |
decode.json-parser.enabled | 选填 | true | Boolean | JsonParser 是 Jackson 提供的流式读取 JSON 数据的 API。与 JsonNode 方式相比,这种方式读取速度更快,内存消耗更少。同时,JsonParser 在读取数据时还支持嵌套字段的投影下推。该参数默认启用。如果遇到任何不兼容性问题,可以禁用并回退到 JsonNode 方式。 |
数据类型映射关系
当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 类型 | JSON 类型 |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |