Conversions between PyFlink Table and Pandas DataFrame
PyFlink supports converting between a PyFlink Table and a Pandas DataFrame in both directions.
Convert Pandas DataFrame to PyFlink Table
PyFlink can create a Table from a Pandas DataFrame. Internally, the DataFrame is serialized on the client side using the Arrow columnar format; the serialized data is then processed and deserialized by an Arrow source during execution. The Arrow source can also be used in streaming jobs, where it handles checkpointing properly and provides exactly-once guarantees.
The following example shows how to create a PyFlink Table from a Pandas DataFrame:
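import numpy as np
import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
# Create a TableEnvironment (assumes a recent PyFlink release; older versions build EnvironmentSettings differently)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())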
# Create a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))
# Create a PyFlink Table from a Pandas DataFrame
table = t_env.from_pandas(pdf)
# Create a PyFlink Table from a Pandas DataFrame with the specified column names
table = t_env.from_pandas(pdf, ['f0', 'f1'])
# Create a PyFlink Table from a Pandas DataFrame with the specified column types
table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
# Create a PyFlink Table from a Pandas DataFrame with the specified row type
table = t_env.from_pandas(pdf,
                          DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                         DataTypes.FIELD("f1", DataTypes.DOUBLE())]))
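To make the term "columnar format" more concrete, the following is a minimal conceptual sketch in plain Python. It is not Arrow and not what PyFlink runs internally; the helpers `rows_to_columns` and `columns_to_rows` are hypothetical names introduced only for illustration. It shows the basic idea that a columnar layout stores all values of one column together, and that the original rows can be recovered from it.

```python
from typing import Dict, List, Sequence, Tuple


def rows_to_columns(names: Sequence[str],
                    rows: Sequence[Tuple[float, ...]]) -> Dict[str, List[float]]:
    """Pivot row-oriented records into one list of values per column."""
    columns: Dict[str, List[float]] = {name: [] for name in names}
    for row in rows:
        if len(row) != len(names):
            raise ValueError("row length does not match the number of columns")
        for name, value in zip(names, row):
            columns[name].append(value)
    return columns


def columns_to_rows(names: Sequence[str],
                    columns: Dict[str, List[float]]) -> List[Tuple[float, ...]]:
    """Rebuild row-oriented records from the per-column lists."""
    return list(zip(*(columns[name] for name in names)))


# Hypothetical sample data using the same column names as the example above
names = ["f0", "f1"]
rows = [(0.1, 0.9), (0.5, 0.5), (0.8, 0.2)]

columns = rows_to_columns(names, rows)
restored = columns_to_rows(names, columns)
```

In this sketch, `columns["f0"]` holds every value of the first column contiguously, which is the property that makes columnar formats such as Arrow efficient to exchange with Pandas.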
Convert PyFlink Table to Pandas DataFrame
A PyFlink Table can also be converted to a Pandas DataFrame. Internally, the results of the table are materialized and serialized on the client side into multiple Arrow batches in the Arrow columnar format. The maximum Arrow batch size is determined by the config option python.fn-execution.arrow.batch.size. The serialized data is then converted to a Pandas DataFrame.
The following example shows how to convert a PyFlink Table to a Pandas DataFrame:
import numpy as np
import pandas as pd
from pyflink.table.expressions import col
# Create a PyFlink Table (t_env is an existing TableEnvironment)
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col("a") > 0.5)
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
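The effect of a maximum batch size on the number of batches can be sketched in plain Python. This is only an illustration of the batching idea, not PyFlink's implementation: the helper `split_into_batches` and the batch size of 300 are assumptions chosen for the example, and real Arrow batches hold columnar data rather than Python tuples.

```python
from typing import Iterator, List, Sequence, Tuple

Row = Tuple[float, float]


def split_into_batches(rows: Sequence[Row], max_batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive chunks containing at most max_batch_size rows each."""
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")
    for start in range(0, len(rows), max_batch_size):
        yield list(rows[start:start + max_batch_size])


# 1000 hypothetical result rows, split with an assumed maximum batch size of 300
rows = [(i / 1000.0, 1.0 - i / 1000.0) for i in range(1000)]
batch_sizes = [len(batch) for batch in split_into_batches(rows, 300)]
print(batch_sizes)  # [300, 300, 300, 100]
```

Every batch except possibly the last is full, so a larger batch size means fewer, larger batches, at the cost of more memory held per batch.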