Register a custom serializer for your Flink program
If you use a custom type in your Flink program which cannot be serialized by theFlink type serializer, Flink falls back to using the generic Kryoserializer. You may register your own serializer or a serialization system likeGoogle Protobuf or Apache Thrift with Kryo. To do that, simply register the typeclass and the serializer in the ExecutionConfig
of your Flink program.
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
Note that your custom serializer has to extend Kryo’s Serializer class. In thecase of Google Protobuf or Apache Thrift, this has already been done foryou:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
For the above example to work, you need to include the necessary dependencies inyour Maven project file (pom.xml). In the dependency section, add the followingfor Apache Thrift:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-thrift</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.11.0</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
For Google Protobuf you need the following Maven dependency:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.7.0</version>
</dependency>
Please adjust the versions of both libraries as needed.
Issue with using Kryo’s JavaSerializer
If you register Kryo’s JavaSerializer
for your custom type, you mayencounter ClassNotFoundException
s even though your custom type class isincluded in the submitted user code jar. This is due to a know issue withKryo’s JavaSerializer
, which may incorrectly use the wrong classloader.
In this case, you should use org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
instead to resolve the issue. This is a reimplemented JavaSerializer
in Flinkthat makes sure the user code classloader is used.
Please refer to FLINK-6025for more details.