JSON Format

Format: Serialization Schema
Format: Deserialization Schema

The JSON format allows reading and writing JSON data based on a JSON schema. Currently, the JSON schema is derived from the table schema.

The JSON format supports append-only streams, unless you’re using a connector that explicitly supports retract and/or upsert streams, such as the Upsert Kafka connector. If you need to write retract and/or upsert streams, consider the CDC JSON formats Debezium JSON and Canal JSON.

Dependencies

In order to use the JSON format, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.19.0</version>
</dependency>

SQL Client: built-in.

How to create a table with JSON format

Here is an example of creating a table using the Kafka connector and the JSON format.

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
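With these options, each message on the user_behavior topic is expected to be a flat JSON object whose field names match the column names. For example, a record like the following (illustrative sample values) would be deserialized into one row of the table; because ‘json.timestamp-format.standard’ defaults to ‘SQL’, the ts field uses the “yyyy-MM-dd HH:mm:ss.s{precision}” format:

```json
{
  "user_id": 1004293,
  "item_id": 2870404,
  "category_id": 4690421,
  "behavior": "pv",
  "ts": "2020-12-30 12:13:14.123"
}
```

Since ‘json.ignore-parse-errors’ is set to ‘true’ above, a malformed message would be skipped (or its unparsable fields set to null) rather than failing the job.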

Format Options

format
  Required: yes | Forwarded: no | Default: (none) | Type: String
  Specify what format to use; here it should be ‘json’.

json.fail-on-missing-field
  Required: no | Forwarded: no | Default: false | Type: Boolean
  Whether to fail if a field is missing or not.

json.ignore-parse-errors
  Required: no | Forwarded: no | Default: false | Type: Boolean
  Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.

json.timestamp-format.standard
  Required: no | Forwarded: yes | Default: ‘SQL’ | Type: String
  Specify the input and output timestamp format for the TIMESTAMP and TIMESTAMP_LTZ types. Currently supported values are ‘SQL’ and ‘ISO-8601’:
  • Option ‘SQL’ will parse input TIMESTAMP values in “yyyy-MM-dd HH:mm:ss.s{precision}” format, e.g. “2020-12-30 12:13:14.123”, parse input TIMESTAMP_LTZ values in “yyyy-MM-dd HH:mm:ss.s{precision}’Z’” format, e.g. “2020-12-30 12:13:14.123Z”, and output timestamps in the same format.
  • Option ‘ISO-8601’ will parse input TIMESTAMP values in “yyyy-MM-ddTHH:mm:ss.s{precision}” format, e.g. “2020-12-30T12:13:14.123”, parse input TIMESTAMP_LTZ values in “yyyy-MM-ddTHH:mm:ss.s{precision}’Z’” format, e.g. “2020-12-30T12:13:14.123Z”, and output timestamps in the same format.

json.map-null-key.mode
  Required: no | Forwarded: yes | Default: ‘FAIL’ | Type: String
  Specify the handling mode when serializing null keys for map data. Currently supported values are ‘FAIL’, ‘DROP’ and ‘LITERAL’:
  • Option ‘FAIL’ will throw an exception when encountering a map with a null key.
  • Option ‘DROP’ will drop null-key entries from map data.
  • Option ‘LITERAL’ will replace the null key with a string literal. The string literal is defined by the ‘json.map-null-key.literal’ option.

json.map-null-key.literal
  Required: no | Forwarded: yes | Default: ‘null’ | Type: String
  Specify the string literal to replace null keys with when ‘json.map-null-key.mode’ is ‘LITERAL’.

json.encode.decimal-as-plain-number
  Required: no | Forwarded: yes | Default: false | Type: Boolean
  Encode all decimals as plain numbers instead of possible scientific notation. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 when this option is set to true.

decode.json-parser.enabled
  Required: no | Default: true | Type: Boolean
  Whether to use the Jackson JsonParser to decode JSON. JsonParser is the Jackson streaming API for reading JSON data. It is much faster and consumes less memory than the previous JsonNode approach, and it also supports nested projection pushdown when reading data. This option is enabled by default. You can disable it and fall back to the JsonNode approach if you encounter any incompatibility issues.
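As a sketch of how the map-related options interact, the following table serializes a MAP column and replaces null map keys with the literal ‘n/a’ instead of failing on them. The table name, topic, and column names are illustrative; the format options are the ones documented above:

```sql
CREATE TABLE event_attributes (
  event_id BIGINT,
  attributes MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'event_attributes',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- serialize map entries with a null key using the literal "n/a"
  -- instead of throwing an exception (the default 'FAIL' mode)
  'json.map-null-key.mode' = 'LITERAL',
  'json.map-null-key.literal' = 'n/a'
)
```

With the default ‘FAIL’ mode, writing a row whose attributes map contains a null key would abort the job; ‘DROP’ would silently omit that entry instead.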

Data Type Mapping

Currently, the JSON schema is always derived from the table schema. Explicitly defining a JSON schema is not supported yet.

The Flink JSON format uses the Jackson databind API to parse and generate JSON strings.

The following table lists the type mapping from Flink type to JSON type.

Flink SQL type                  | JSON type
CHAR / VARCHAR / STRING         | string
BOOLEAN                         | boolean
BINARY / VARBINARY              | string with encoding: base64
DECIMAL                         | number
TINYINT                         | number
SMALLINT                        | number
INT                             | number
BIGINT                          | number
FLOAT                           | number
DOUBLE                          | number
TIME                            | string with format: time
DATE                            | string with format: date
TIMESTAMP                       | string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE  | string with format: date-time (with UTC time zone)
INTERVAL                        | number
ARRAY                           | array
MAP / MULTISET                  | object
ROW                             | object
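Because timestamps map to JSON strings, their wire format can be switched per table via the options above. A hypothetical sink table (names are illustrative) that emits ISO-8601 timestamps and plain-number decimals might look like:

```sql
CREATE TABLE orders_sink (
  order_id BIGINT,
  amount DECIMAL(18, 10),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- write order_time as e.g. "2020-12-30T12:13:14.123"
  -- instead of the default SQL style "2020-12-30 12:13:14.123"
  'json.timestamp-format.standard' = 'ISO-8601',
  -- write small decimals such as 0.000000027 as-is instead of 2.7E-8
  'json.encode.decimal-as-plain-number' = 'true'
)
```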