JSON format
To use the JSON format you need to add the Flink JSON dependency to your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.20.0</version>
    <scope>provided</scope>
</dependency>
PyFlink users can use the JSON format directly in their jobs.
Flink supports reading and writing JSON records via JsonSerializationSchema and JsonDeserializationSchema. These use the Jackson library and support any type that Jackson supports, including, but not limited to, POJOs and ObjectNode.
The JsonDeserializationSchema can be used with any connector that supports the DeserializationSchema.
For example, this is how you use it with a KafkaSource to deserialize a POJO:
JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class);
KafkaSource<SomePojo> source =
    KafkaSource.<SomePojo>builder()
        .setValueOnlyDeserializer(jsonFormat)
        ...
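The examples on this page refer to SomePojo without defining it. As a minimal sketch, such a class might look like the following. The fields name and age are assumptions chosen for illustration and are not part of Flink. A public class with a public no-argument constructor and public fields is a shape that Jackson can populate by default, so a record such as {"name": "Alice", "age": 30} would map onto it.

```java
import java.util.Objects;

/** Hypothetical POJO for the examples; the field names are assumptions. */
public class SomePojo {

    // Public, non-final fields can be read and written without extra annotations.
    public String name;
    public int age;

    /** Public no-argument constructor, so the class can be instantiated reflectively. */
    public SomePojo() {}

    /** Convenience constructor for tests and hand-built records. */
    public SomePojo(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SomePojo)) {
            return false;
        }
        SomePojo that = (SomePojo) other;
        return age == that.age && Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age);
    }

    @Override
    public String toString() {
        return "SomePojo{name='" + name + "', age=" + age + "}";
    }
}
```

The equals, hashCode and toString methods are optional; they are included only to make records easier to compare and print while testing.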
The JsonSerializationSchema can be used with any connector that supports the SerializationSchema.
For example, this is how you use it with a KafkaSink to serialize a POJO:
JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>();
KafkaSink<SomePojo> sink =
    KafkaSink.<SomePojo>builder()
        .setRecordSerializer(
            new KafkaRecordSerializationSchemaBuilder<>()
                .setValueSerializationSchema(jsonFormat)
                ...
Custom Mapper
Both schemas have constructors that accept a SerializableSupplier<ObjectMapper>, which acts as a factory for object mappers. With this factory you gain full control over the created mapper: you can enable or disable Jackson features, or register modules that extend the set of supported types or add other functionality.
JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
    () -> new ObjectMapper()
        .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
        .registerModule(new ParameterNamesModule()));
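Because the factory is a serializable supplier, the lambda passed to the constructor has to survive Java serialization, so it should capture only serializable state. The following JDK-only sketch illustrates that idea and does not use Flink or Jackson. The SerializableSupplier interface here is a local stand-in declared in the sketch, ArrayList stands in for ObjectMapper, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class SupplierFactorySketch {

    /** Local stand-in for a serializable supplier; not Flink's own interface. */
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    /** Writes the value with Java serialization and reads a copy back. */
    public static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    /** Returns true if the restored factory still builds equal but distinct instances. */
    public static boolean factorySurvivesRoundTrip() {
        // The lambda captures nothing, so there is no non-serializable state to trip over.
        SerializableSupplier<List<String>> factory =
                () -> new ArrayList<>(Collections.singletonList("configured"));

        SerializableSupplier<List<String>> restored = roundTrip(factory);

        // Each call to get() creates a new instance, as a factory should.
        List<String> first = restored.get();
        List<String> second = restored.get();
        return first != second && first.equals(second);
    }

    public static void main(String[] args) {
        System.out.println(factorySurvivesRoundTrip());
    }
}
```

If the lambda captured a non-serializable object, the writeObject call in roundTrip would fail with a NotSerializableException. Configuring the mapper inside the lambda body, as in the example above, avoids that problem.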
Python
In PyFlink, JsonRowSerializationSchema and JsonRowDeserializationSchema provide built-in support for the Row type. The following examples show how to use them with KafkaSource and KafkaSink:
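from pyflink.common import Types
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

# Deserialize each JSON record into a Row with the fields 'name' and 'age'.
row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
source = KafkaSource.builder() \
    .set_value_only_deserializer(json_format) \
    .build()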
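from pyflink.common import Types
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink
from pyflink.datastream.formats.json import JsonRowSerializationSchema

# Serialize each Row with the fields 'name' and 'age' into a JSON record.
row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
json_format = JsonRowSerializationSchema.builder().with_type_info(row_type_info).build()
sink = KafkaSink.builder() \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic('test')
            .set_value_serialization_schema(json_format)
            .build()
    ) \
    .build()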