Intro to the Python DataStream API

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may for example write the data to files, or to standard output (for example the command line terminal).

The Python DataStream API is a Python version of the DataStream API that allows Python users to write DataStream API jobs.

Common Structure of Python DataStream API Programs

The following code example shows the common structure of Python DataStream API programs.

    from pyflink.common import WatermarkStrategy, Row
    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
    from pyflink.datastream.connectors.number_seq import NumberSequenceSource
    from pyflink.datastream.functions import RuntimeContext, MapFunction
    from pyflink.datastream.state import ValueStateDescriptor


    class MyMapFunction(MapFunction):

        def open(self, runtime_context: RuntimeContext):
            state_desc = ValueStateDescriptor('cnt', Types.PICKLED_BYTE_ARRAY())
            self.cnt_state = runtime_context.get_state(state_desc)

        def map(self, value):
            cnt = self.cnt_state.value()
            if cnt is None or cnt < 2:
                self.cnt_state.update(1 if cnt is None else cnt + 1)
                return value[0], value[1] + 1
            else:
                return value[0], value[1]


    def state_access_demo():
        # 1. create a StreamExecutionEnvironment
        env = StreamExecutionEnvironment.get_execution_environment()

        # 2. create source DataStream
        seq_num_source = NumberSequenceSource(1, 10000)
        ds = env.from_source(
            source=seq_num_source,
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name='seq_num_source',
            type_info=Types.LONG())

        # 3. define the execution logic
        ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
               .key_by(lambda a: a[0]) \
               .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

        # 4. create sink and emit result to sink
        output_path = '/opt/output/'
        file_sink = FileSink \
            .for_row_format(output_path, Encoder.simple_string_encoder()) \
            .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
            .build()
        ds.sink_to(file_sink)

        # 5. execute the job
        env.execute('state_access_demo')


    if __name__ == '__main__':
        state_access_demo()
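
To make the keyed-state logic of MyMapFunction easier to follow, here is a minimal plain-Python sketch of the same rule. It does not use Flink at all: the dictionary `cnt_state` and the function `simulate_my_map_function` are hypothetical stand-ins for the per-key ValueState and the map operator, and the input is shortened to the numbers 1 to 12.

```python
def simulate_my_map_function(records):
    """Plain-Python model of MyMapFunction: one counter per key, no Flink involved."""
    cnt_state = {}  # hypothetical stand-in for the keyed ValueState named 'cnt'
    output = []
    for key, value in records:
        cnt = cnt_state.get(key)  # None until the key has been seen, like state.value()
        if cnt is None or cnt < 2:
            # The first two records of each key update the counter and bump the value.
            cnt_state[key] = 1 if cnt is None else cnt + 1
            output.append((key, value + 1))
        else:
            # From the third record of a key onwards, the value passes through unchanged.
            output.append((key, value))
    return output


# Same shape as the job above: each number n becomes the pair (n % 4, 1).
records = [(n % 4, 1) for n in range(1, 13)]
result = simulate_my_map_function(records)
print(result)
```

In this model each of the four keys appears three times, so the first two records per key come out with the value 2 and the third with the value 1. In a real job the state is scoped to the current key by Flink itself, which is why the function in the example above never mentions the key when it reads or updates the state.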

Create a StreamExecutionEnvironment

The StreamExecutionEnvironment is a central concept of a DataStream API program. The following code example shows how to create a StreamExecutionEnvironment:

    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

Create a DataStream

The DataStream API gets its name from the special DataStream class that is used to represent a collection of data in a Flink program. You can think of a DataStream as an immutable collection of data that can contain duplicates. This data can be either finite or unbounded; the API that you use to work on it is the same.

A DataStream is similar to a regular Python collection in terms of usage but differs in some key ways. It is immutable, meaning that once it is created you cannot add or remove elements. You also cannot simply inspect the elements inside; you can only work on them using the DataStream API operations, which are also called transformations.

You can create an initial DataStream by adding a source in a Flink program. Then you can derive new streams from this and combine them by using API methods such as map, filter, and so on.

Create from a list object

You can create a DataStream from a list object:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection(
        collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))

The parameter type_info is optional. If it is not specified, the output type of the returned DataStream will be Types.PICKLED_BYTE_ARRAY().

Create using DataStream connectors

You can also create a DataStream using DataStream connectors with the add_source method, as follows:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
    from pyflink.datastream.formats.json import JsonRowDeserializationSchema

    env = StreamExecutionEnvironment.get_execution_environment()
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_source_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

    ds = env.add_source(kafka_consumer)

Note: Currently, only FlinkKafkaConsumer is supported as a DataStream source connector with the add_source method.

Note: A DataStream created using add_source can only be executed in streaming execution mode.

You can also call the from_source method to create a DataStream using unified DataStream source connectors:

    from pyflink.common.typeinfo import Types
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.number_seq import NumberSequenceSource

    env = StreamExecutionEnvironment.get_execution_environment()
    seq_num_source = NumberSequenceSource(1, 1000)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

Note: Currently, only NumberSequenceSource and FileSource are supported as unified DataStream source connectors.

Note: A DataStream created using from_source can be executed in both batch and streaming execution mode.

Create using Table & SQL connectors

Table & SQL connectors can also be used to create a DataStream. First create a Table using a Table & SQL connector, then convert it to a DataStream.

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)

    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))

Note: The StreamExecutionEnvironment env should be specified when creating the TableEnvironment t_env.

Note: Because all the Java Table & SQL connectors can be used in the PyFlink Table API, all of them can also be used in the PyFlink DataStream API.

DataStream Transformations

Operators transform one or more DataStream into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

The following example shows how to convert a DataStream into another DataStream using the map transformation:

    ds = ds.map(lambda a: a + 1)

Please see operators for an overview of the available DataStream transformations.
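
As a rough mental model of what element-wise transformations do, the following plain-Python sketch mimics map, filter and flat_map on an ordinary list. The helper names `map_stream`, `filter_stream` and `flat_map_stream` are hypothetical and are not part of PyFlink; they only illustrate how many output elements each kind of transformation produces per input element.

```python
def map_stream(func, stream):
    # map: exactly one output element for every input element
    for element in stream:
        yield func(element)


def filter_stream(predicate, stream):
    # filter: zero or one output element for every input element
    for element in stream:
        if predicate(element):
            yield element


def flat_map_stream(func, stream):
    # flat_map: zero, one or many output elements for every input element
    for element in stream:
        yield from func(element)


source = [1, 2, 3, 4]
mapped = list(map_stream(lambda a: a + 1, source))
evens = list(filter_stream(lambda a: a % 2 == 0, mapped))
repeated = list(flat_map_stream(lambda a: [a] * a, evens))

print(mapped)    # [2, 3, 4, 5]
print(evens)     # [2, 4]
print(repeated)  # [2, 2, 4, 4, 4, 4]
```

Each helper returns a new sequence and leaves its input untouched, which mirrors how a transformation derives a new DataStream rather than modifying the existing one.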

Conversion between DataStream and Table

Conversion between a DataStream and a Table is also supported, in both directions.

    # convert a DataStream to a Table
    table = t_env.from_data_stream(ds, 'a, b, c')

    # convert a Table to a DataStream
    ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))

    # or
    ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
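
The difference between the two conversions is easiest to see on a table whose rows get updated, such as a running count per key. The sketch below is a plain-Python model rather than PyFlink code, and it assumes that a retract stream pairs each row with a boolean flag, True for an added row and False for a retracted one, as the Java API does. The function name `to_retract_messages` is hypothetical.

```python
def to_retract_messages(words):
    """Model the change messages of a running count per word."""
    counts = {}
    messages = []
    for word in words:
        old = counts.get(word)
        if old is not None:
            # The previous result for this word is no longer valid: retract it.
            messages.append((False, (word, old)))
        counts[word] = (old or 0) + 1
        # Emit the new result as an add message.
        messages.append((True, (word, counts[word])))
    return messages


messages = to_retract_messages(['a', 'b', 'a'])
for message in messages:
    print(message)
```

In this model the second 'a' first retracts the row ('a', 1) and then adds ('a', 2). A table that only ever inserts rows never produces a retraction, which is the case an append stream is suited for.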

Emit Results

Print

You can call the print method to print the data of a DataStream to the standard output:

    ds.print()

Collect results to client

You can call the execute_and_collect method to collect the data of a DataStream to the client:

    with ds.execute_and_collect() as results:
        for result in results:
            print(result)

Note: The execute_and_collect method collects the data of the DataStream into the memory of the client, so it is good practice to limit the number of rows collected.
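
One way to cap the number of collected rows is to stop iterating early. The following is a minimal sketch, assuming the results object behaves like a regular Python iterator, as the for loop above suggests. The helper `take` is hypothetical, and a plain iterator stands in for the job results so that the snippet runs without a cluster.

```python
from itertools import islice


def take(results, max_rows):
    """Return at most max_rows elements from an iterator without reading the rest."""
    return list(islice(results, max_rows))


# Stand-in for the iterator returned by a job; a real one could be unbounded.
fake_results = iter(range(1_000_000))
first_rows = take(fake_results, 5)
print(first_rows)  # [0, 1, 2, 3, 4]
```

Inside the with block shown above, the same helper could be called as take(results, 100), so that only the first 100 rows are held in client memory.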

Emit results to a DataStream sink connector

You can call the add_sink method to emit the data of a DataStream to a DataStream sink connector:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
    from pyflink.datastream.formats.json import JsonRowSerializationSchema

    serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
        type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

    kafka_producer = FlinkKafkaProducer(
        topic='test_sink_topic',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

    ds.add_sink(kafka_producer)

Note: Currently, only FlinkKafkaProducer and JdbcSink are supported as DataStream sink connectors with the add_sink method.

Note: The add_sink method can only be used in streaming execution mode.

You can also call the sink_to method to emit the data of a DataStream to a unified DataStream sink connector:

    from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
    from pyflink.common.serialization import Encoder

    output_path = '/opt/output/'
    file_sink = FileSink \
        .for_row_format(output_path, Encoder.simple_string_encoder()) \
        .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
        .build()
    ds.sink_to(file_sink)

Note: Currently, only FileSink is supported as a unified DataStream sink connector.

Note: The sink_to method can be used in both batch and streaming execution mode.

Emit results to a Table & SQL sink connector

Table & SQL connectors can also be used to write out a DataStream. You first need to convert the DataStream to a Table and then write it to a Table & SQL sink connector.

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # option 1: the result type of ds is Types.ROW
    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield Row(s[0], sp)

    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: Row(i[0] + j[0], i[1]))

    # option 2 (alternative to option 1): the result type of ds is Types.TUPLE
    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp

    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: (i[0] + j[0], i[1]))

    # emit ds to print sink
    t_env.execute_sql("""
            CREATE TABLE my_sink (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'print'
            )
        """)

    table = t_env.from_data_stream(ds)
    table_result = table.execute_insert("my_sink")

Note: The output type of the DataStream ds must be a composite type.
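
To see what the map, flat_map, key_by and reduce chain computes, the following plain-Python sketch traces it step by step. It assumes, for illustration only, that ds holds the sample collection from the from_collection example earlier on this page, and it models reduce as a rolling reduce that emits an updated result for every incoming record, which is how a keyed reduce is expected to behave in streaming mode. The helper `rolling_reduce_by_key` is hypothetical, and no Flink is involved.

```python
def split(record):
    # Same logic as the tuple-based split above: one output per '|'-separated part.
    number, text = record
    for part in text.split("|"):
        yield number, part


def rolling_reduce_by_key(records, key_of, reduce_func):
    """Keep one running value per key and emit it after every record."""
    state = {}
    for record in records:
        key = key_of(record)
        state[key] = record if key not in state else reduce_func(state[key], record)
        yield state[key]


source = [(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')]
mapped = [(i[0] + 1, i[1]) for i in source]
flattened = [out for record in mapped for out in split(record)]
reduced = list(rolling_reduce_by_key(
    flattened,
    key_of=lambda i: i[1],
    reduce_func=lambda i, j: (i[0] + j[0], i[1])))

print(flattened)
print(reduced)
```

In this single-threaded model the last value emitted for each key is the sum of the first fields: 6 for 'aaa', 5 for 'bb' and 7 for 'a'. In a real job the relative order of results for different keys depends on the parallelism.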

Submit Job

Finally, you should call the StreamExecutionEnvironment.execute method to submit the DataStream API job for execution:

    env.execute()

If you convert the DataStream to a Table and then write it to a Table API & SQL sink connector, you may need to submit the job using the TableEnvironment.execute method.

    t_env.execute()