Data Types

In Apache Flink’s Python DataStream API, a data type describes the type of a value in the DataStream ecosystem. It can be used to declare input and output types of operations and informs the system how to serialize elements.

Pickle Serialization

If no type is declared, data is serialized and deserialized using Pickle. For example, the program below specifies no data types.

    from pyflink.datastream import StreamExecutionEnvironment

    def processing():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
            .map(lambda record: (record[0] + 1, record[1].upper())) \
            .print()  # note: print to stdout on the worker machine
        env.execute()

    if __name__ == '__main__':
        processing()
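To make the Pickle fallback concrete, the sketch below uses only Python's standard pickle module to show the kind of round trip an untyped record goes through. It does not call PyFlink and is not PyFlink's internal code path; it only reuses the record values and the map logic from the program above.

```python
import pickle

# The same records that the program above passes to from_collection().
records = [(1, 'aaa'), (2, 'bbb')]

# With no declared type, a record is an opaque Python object:
# it is turned into bytes with pickle and restored with pickle later.
serialized = [pickle.dumps(record) for record in records]
restored = [pickle.loads(blob) for blob in serialized]

# The same transformation as the map() call above, applied after the round trip.
mapped = [(record[0] + 1, record[1].upper()) for record in restored]
print(mapped)
```

The round trip preserves the Python tuples exactly, which is why no type declaration is needed as long as the data stays on the Python side.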

However, types need to be specified when:

  • Passing Python records to Java operations.
  • Improving serialization and deserialization performance.

Passing Python records to Java operations

Because Java operators and functions cannot interpret Python data, types must be provided so that Python values can be converted to Java types for processing. For example, types must be provided if you want to write data using the FileSink, which is implemented in Java.

    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink

    def file_sink():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
            .map(lambda record: (record[0] + 1, record[1].upper()),
                 # declare the result type so the Java sink can read the records
                 output_type=Types.ROW([Types.INT(), Types.STRING()])) \
            .sink_to(FileSink  # FileSink is a Sink, so it is attached with sink_to()
                     .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                     .build())
        env.execute()

    if __name__ == '__main__':
        file_sink()

Improving serialization and deserialization performance

Even though data can be serialized and deserialized through Pickle, performance will be better if types are provided. Explicit types allow PyFlink to use efficient serializers when moving records through the pipeline.
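As a rough illustration of why a declared type can help, the sketch below compares a pickled record with a hand-written fixed layout, using only the standard library. The layout and the decode_typed helper are hypothetical and are not PyFlink's actual wire format; the point is only that a reader who already knows the type needs less information per record.

```python
import pickle
import struct

record = (2, 'AAA')

# Generic path: pickle has to describe the structure of the object
# (tuple, int, str) in the payload, because the reader knows nothing in advance.
pickled = pickle.dumps(record)

# Typed path (illustrative only): if both sides already agree that a record is
# (32-bit int, UTF-8 string), only the values and a length prefix are written.
text = record[1].encode('utf-8')
typed = struct.pack('>i', record[0]) + struct.pack('>i', len(text)) + text

def decode_typed(payload):
    """Decode the hypothetical (int, string) layout written above."""
    number, length = struct.unpack_from('>ii', payload, 0)
    return number, payload[8:8 + length].decode('utf-8')

print(len(pickled), len(typed))
print(decode_typed(typed))
```

The fixed layout is shorter than the pickled form for this record and can be decoded without inspecting type tags.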

Supported Data Types

You can use pyflink.common.typeinfo.Types to define types in the Python DataStream API. The table below lists the currently supported types and how to define them:

PyFlink Type                Python Type                         Java Type
Types.BOOLEAN()             bool                                java.lang.Boolean
Types.BYTE()                int                                 java.lang.Byte
Types.SHORT()               int                                 java.lang.Short
Types.INT()                 int                                 java.lang.Integer
Types.LONG()                int                                 java.lang.Long
Types.FLOAT()               float                               java.lang.Float
Types.DOUBLE()              float                               java.lang.Double
Types.CHAR()                str                                 java.lang.Character
Types.STRING()              str                                 java.lang.String
Types.BIG_INT()             int                                 java.math.BigInteger
Types.BIG_DEC()             decimal.Decimal                     java.math.BigDecimal
Types.INSTANT()             pyflink.common.time.Instant         java.time.Instant
Types.TUPLE()               tuple                               org.apache.flink.api.java.tuple.Tuple0 ~ org.apache.flink.api.java.tuple.Tuple25
Types.ROW()                 pyflink.common.Row                  org.apache.flink.types.Row
Types.ROW_NAMED()           pyflink.common.Row                  org.apache.flink.types.Row
Types.MAP()                 dict                                java.util.Map
Types.PICKLED_BYTE_ARRAY()  The actual unpickled Python object  byte[]
Types.SQL_DATE()            datetime.date                       java.sql.Date
Types.SQL_TIME()            datetime.time                       java.sql.Time
Types.SQL_TIMESTAMP()       datetime.datetime                   java.sql.Timestamp
Types.LIST()                list of Python objects              java.util.List
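Several rows in the table map the same Python type, int, to different Java types. Python integers are unbounded while java.lang.Byte, Short, Integer and Long have fixed widths, so the declared type should be wide enough for the data. The sketch below checks this with plain Python; the fits and smallest_type helpers are hypothetical and not part of PyFlink, and how PyFlink reacts to an out-of-range value is not covered here.

```python
# Bit widths of the signed Java integer types listed in the table.
JAVA_INT_BITS = {
    'BYTE': 8,    # java.lang.Byte
    'SHORT': 16,  # java.lang.Short
    'INT': 32,    # java.lang.Integer
    'LONG': 64,   # java.lang.Long
}

def fits(value: int, type_name: str) -> bool:
    """Return True if value is within the signed range of the named Java type."""
    bits = JAVA_INT_BITS[type_name]
    return -(1 << (bits - 1)) <= value <= (1 << (bits - 1)) - 1

def smallest_type(value: int) -> str:
    """Pick the narrowest fixed-width type that holds value, else BIG_INT."""
    for name in ('BYTE', 'SHORT', 'INT', 'LONG'):
        if fits(value, name):
            return name
    return 'BIG_INT'  # java.math.BigInteger has arbitrary precision

print(smallest_type(100), smallest_type(2 ** 31), smallest_type(2 ** 63))
```

A check like this can be run on sample data before choosing between Types.INT(), Types.LONG() and Types.BIG_INT() for a field.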

The table below shows the array types supported:

PyFlink Array Type                      Python Type             Java Type
Types.PRIMITIVE_ARRAY(Types.BYTE())     bytes                   byte[]
Types.PRIMITIVE_ARRAY(Types.BOOLEAN())  list of bool            boolean[]
Types.PRIMITIVE_ARRAY(Types.SHORT())    list of int             short[]
Types.PRIMITIVE_ARRAY(Types.INT())      list of int             int[]
Types.PRIMITIVE_ARRAY(Types.LONG())     list of int             long[]
Types.PRIMITIVE_ARRAY(Types.FLOAT())    list of float           float[]
Types.PRIMITIVE_ARRAY(Types.DOUBLE())   list of float           double[]
Types.PRIMITIVE_ARRAY(Types.CHAR())     list of str             char[]
Types.BASIC_ARRAY(Types.BYTE())         list of int             java.lang.Byte[]
Types.BASIC_ARRAY(Types.BOOLEAN())      list of bool            java.lang.Boolean[]
Types.BASIC_ARRAY(Types.SHORT())        list of int             java.lang.Short[]
Types.BASIC_ARRAY(Types.INT())          list of int             java.lang.Integer[]
Types.BASIC_ARRAY(Types.LONG())         list of int             java.lang.Long[]
Types.BASIC_ARRAY(Types.FLOAT())        list of float           java.lang.Float[]
Types.BASIC_ARRAY(Types.DOUBLE())       list of float           java.lang.Double[]
Types.BASIC_ARRAY(Types.CHAR())         list of str             java.lang.Character[]
Types.BASIC_ARRAY(Types.STRING())       list of str             java.lang.String[]
Types.OBJECT_ARRAY()                    list of Python objects  Array
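The two byte rows use different Python representations: a bytes object for the primitive array and a list of int for the basic array. Python bytes hold unsigned values (0 to 255), whereas a Java byte is signed (-128 to 127). The sketch below converts between the two views in plain Python. Both helpers are hypothetical, and whether PyFlink expects signed values in the list form is an assumption made here for illustration only.

```python
def to_signed_bytes(data: bytes) -> list:
    """Reinterpret each unsigned Python byte (0..255) as a signed value (-128..127)."""
    return [b - 256 if b > 127 else b for b in data]

def from_signed_bytes(values: list) -> bytes:
    """Inverse of to_signed_bytes: map -128..127 back to 0..255."""
    return bytes(v & 0xFF for v in values)

raw = bytes([0, 127, 128, 255])
signed = to_signed_bytes(raw)
print(signed)
print(from_signed_bytes(signed) == raw)
```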