数据类型

在 Apache Flink 的 Python DataStream API 中,一种数据类型描述 DataStream 生态系统中数据的类型。 数据类型可用于声明算子输入和输出的类型,并告知系统如何对数据进行序列化。

Pickle 序列化

如果类型没有被定义,数据将使用 Pickle 进行序列化和反序列化。 例如,以下程序没有指定数据类型。

  1. from pyflink.datastream import StreamExecutionEnvironment
  2. def processing():
  3. env = StreamExecutionEnvironment.get_execution_environment()
  4. env.set_parallelism(1)
  5. env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
  6. .map(lambda record: (record[0]+1, record[1].upper())) \
  7. .print() # note: print to stdout on the worker machine
  8. env.execute()
  9. if __name__ == '__main__':
  10. processing()

但是,在下列情况下需要指定类型:

  • 将 Python 数据发送给 Java。
  • 提高序列化和反序列化的性能。

发送 Python 数据给 Java

由于 Java 算子或函数不能识别 Python 数据,因此需要提供数据类型来将 Python 类型转换为 Java 类型以进行处理。 例如,如果你想要使用 Java 实现的 StreamingFileSink 输出数据,则需要提供数据类型。

  1. from pyflink.common.serialization import SimpleStringEncoder
  2. from pyflink.common.typeinfo import Types
  3. from pyflink.datastream import StreamExecutionEnvironment
  4. from pyflink.datastream.connectors import StreamingFileSink
  5. def streaming_file_sink():
  6. env = StreamExecutionEnvironment.get_execution_environment()
  7. env.set_parallelism(1)
  8. env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
  9. .map(lambda record: (record[0]+1, record[1].upper()),
  10. result_type=Types.ROW([Types.INT(), Types.STRING()])) \
  11. .add_sink(StreamingFileSink
  12. .for_row_format('/tmp/output', SimpleStringEncoder())
  13. .build())
  14. env.execute()
  15. if __name__ == '__main__':
  16. streaming_file_sink()

提高序列化和反序列化的性能

尽管可以通过 Pickle 序列化和反序列化数据,但是如果提供了确定的类型,性能会更好。 当在 pipeline 中传递数据时,显式类型允许 PyFlink 使用更高效的序列化器。

支持的数据类型

你可以使用 pyflink.common.typeinfo.Types 在 Python DataStream API 中指定类型. 下面列出了现在支持的类型以及如何定义它们:

PyFlink 类型使用对应 Python 类型
BOOLEANTypes.BOOLEAN()bool
SHORTTypes.SHORT()int
INTTypes.INT()int
LONGTypes.LONG()int
FLOATTypes.FLOAT()float
DOUBLETypes.DOUBLE()float
CHARTypes.CHAR()str
BIG_INTTypes.BIG_INT()bytes
BIG_DECTypes.BIG_DEC()decimal.Decimal
STRINGTypes.STRING()str
BYTETypes.BYTE()int
TUPLETypes.TUPLE()tuple
PRIMITIVE_ARRAYTypes.PRIMITIVE_ARRAY()list
ROWTypes.ROW()dict