Flink API

We do not recommend using the programming API. Paimon is designed to be SQL-first; unless you are an experienced Flink developer, the programming API is difficult to use correctly, and even then it can be challenging.

We strongly recommend that you use Flink SQL or Spark SQL, or simply use the SQL APIs from within your programs.
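For example, a program can stay SQL-first through Flink's TableEnvironment. The sketch below is illustrative only: the catalog name paimon_catalog, the warehouse path, and the table my_db.T (with columns name and age) are placeholder assumptions, not part of this documentation.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlInProgram {

        public static void main(String[] args) throws Exception {
            // use Flink SQL from a program instead of the low-level Paimon API
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // register a Paimon catalog; the warehouse path is a placeholder
            tEnv.executeSql(
                    "CREATE CATALOG paimon_catalog WITH ("
                            + " 'type' = 'paimon',"
                            + " 'warehouse' = '/path/to/warehouse' )");
            tEnv.executeSql("USE CATALOG paimon_catalog");

            // plain SQL statements against the (assumed) Paimon table my_db.T
            tEnv.executeSql("INSERT INTO my_db.T VALUES ('Alice', 12)").await();
            tEnv.executeSql("SELECT * FROM my_db.T").print();
        }
    }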

The following documentation is not detailed and is for reference only.

Dependency

Maven dependency:

    <dependency>
      <groupId>org.apache.paimon</groupId>
      <artifactId>paimon-flink-1.20</artifactId>
      <version>0.9.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>1.20.0</version>
      <scope>provided</scope>
    </dependency>

Or download the jar file: Paimon Flink.

Please choose the paimon-flink artifact that matches your Flink version.

Paimon relies on the Hadoop environment; you should add the Hadoop classpath or a bundled Hadoop jar.

Besides the DataStream API below, you can also read from and write to Paimon tables through Flink's conversion between DataStream and Table, as sketched next. See DataStream API Integration.
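The following is a rough sketch of that route using standard Flink conversion APIs rather than Paimon-specific ones; the catalog name, warehouse path, and table my_db.T are placeholders.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class DataStreamTableConversion {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // register a Paimon catalog; the warehouse path is a placeholder
            tEnv.executeSql(
                    "CREATE CATALOG paimon_catalog WITH ("
                            + " 'type' = 'paimon',"
                            + " 'warehouse' = '/path/to/warehouse' )");
            tEnv.executeSql("USE CATALOG paimon_catalog");

            // Table -> DataStream: read the (assumed) Paimon table my_db.T as a changelog stream
            DataStream<Row> changelog = tEnv.toChangelogStream(tEnv.sqlQuery("SELECT * FROM my_db.T"));
            changelog.print();

            // DataStream -> Table: expose a DataStream as a view so it can be written back with SQL,
            // e.g. tEnv.executeSql("INSERT INTO my_db.T SELECT * FROM input_view")
            tEnv.createTemporaryView("input_view", tEnv.fromChangelogStream(changelog));

            env.execute();
        }
    }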

Write to Table

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.flink.FlinkCatalogFactory;
    import org.apache.paimon.flink.sink.FlinkSinkBuilder;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.Table;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class WriteToTable {

        public static void writeTo() throws Exception {
            // create environments of both APIs
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // for CONTINUOUS_UNBOUNDED source, set checkpoint interval
            // env.enableCheckpointing(60_000);

            // create a changelog DataStream
            DataStream<Row> input =
                    env.fromElements(
                                    Row.ofKind(RowKind.INSERT, "Alice", 12),
                                    Row.ofKind(RowKind.INSERT, "Bob", 5),
                                    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
                                    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
                            .returns(
                                    Types.ROW_NAMED(
                                            new String[] {"name", "age"}, Types.STRING, Types.INT));

            // get table from catalog
            Options catalogOptions = new Options();
            catalogOptions.set("warehouse", "/path/to/warehouse");
            Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
            Table table = catalog.getTable(Identifier.create("my_db", "T"));

            DataType inputType =
                    DataTypes.ROW(
                            DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("age", DataTypes.INT()));
            FlinkSinkBuilder builder = new FlinkSinkBuilder(table).forRow(input, inputType);

            // set sink parallelism
            // builder.parallelism(_your_parallelism)

            // set overwrite mode
            // builder.overwrite(...)

            builder.build();
            env.execute();
        }
    }
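If the input is bounded (for example a one-off backfill), the same sink should also work as a batch job, and the commented hints above can become real calls. The following is a minimal sketch that reuses the env, table, input, and inputType variables from the example above; the batch runtime mode and the parallelism value are illustrative assumptions, not required settings.

    // requires: import org.apache.flink.api.common.RuntimeExecutionMode;
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);   // bounded input, so run as a batch job

    FlinkSinkBuilder batchBuilder = new FlinkSinkBuilder(table).forRow(input, inputType);
    batchBuilder.parallelism(2);   // illustrative sink parallelism (see the hint above)
    batchBuilder.build();
    env.execute();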

Read from Table

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.flink.FlinkCatalogFactory;
    import org.apache.paimon.flink.source.FlinkSourceBuilder;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.Table;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;

    public class ReadFromTable {

        public static void readFrom() throws Exception {
            // create environments of both APIs
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // get table from catalog
            Options catalogOptions = new Options();
            catalogOptions.set("warehouse", "/path/to/warehouse");
            Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
            Table table = catalog.getTable(Identifier.create("my_db", "T"));
            // table = table.copy(Collections.singletonMap("scan.file-creation-time-millis", "..."));

            FlinkSourceBuilder builder = new FlinkSourceBuilder(table).env(env);

            // builder.sourceBounded(true);
            // builder.projection(...);
            // builder.predicate(...);
            // builder.limit(...);
            // builder.sourceParallelism(...);

            DataStream<Row> dataStream = builder.buildForRow();

            // use this datastream
            dataStream.executeAndCollect().forEachRemaining(System.out::println);

            // prints:
            // +I[Bob, 12]
            // +I[Alice, 12]
            // -U[Alice, 12]
            // +U[Alice, 14]
        }
    }
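Because the resulting DataStream<Row> is a changelog stream, each Row carries a RowKind that downstream operators can inspect. As an alternative to collecting results to the client, here is a small sketch that reuses the env and dataStream variables from the example above (it additionally needs import org.apache.flink.types.RowKind).

    // keep only rows describing the current state: drop -U and -D retractions
    dataStream
            .filter(row -> row.getKind() != RowKind.UPDATE_BEFORE
                    && row.getKind() != RowKind.DELETE)
            .print();
    env.execute();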

CDC Ingestion Table

Paimon supports ingesting data into Paimon tables with schema evolution.

  • You can use the Java API to write CDC records into Paimon tables.
  • You can write records to Paimon's partial-update tables while adding columns dynamically.

Here is an example of using the RichCdcSinkBuilder API:

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.flink.FlinkCatalogFactory;
    import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
    import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.types.DataTypes;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import static org.apache.paimon.types.RowKind.INSERT;

    public class WriteCdcToTable {

        public static void writeTo() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // for CONTINUOUS_UNBOUNDED source, set checkpoint interval
            // env.enableCheckpointing(60_000);

            DataStream<RichCdcRecord> dataStream =
                    env.fromElements(
                            RichCdcRecord.builder(INSERT)
                                    .field("order_id", DataTypes.BIGINT(), "123")
                                    .field("price", DataTypes.DOUBLE(), "62.2")
                                    .build(),
                            // dt field will be added with schema evolution
                            RichCdcRecord.builder(INSERT)
                                    .field("order_id", DataTypes.BIGINT(), "245")
                                    .field("price", DataTypes.DOUBLE(), "82.1")
                                    .field("dt", DataTypes.TIMESTAMP(), "2023-06-12 20:21:12")
                                    .build());

            Identifier identifier = Identifier.create("my_db", "T");
            Options catalogOptions = new Options();
            catalogOptions.set("warehouse", "/path/to/warehouse");
            Catalog.Loader catalogLoader =
                    () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
            Table table = catalogLoader.load().getTable(identifier);

            new RichCdcSinkBuilder(table)
                    .forRichCdcRecord(dataStream)
                    .identifier(identifier)
                    .catalogLoader(catalogLoader)
                    .build();

            env.execute();
        }
    }
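After the job has run, the dt column from the second record is expected to appear in the table schema (per the schema-evolution comment in the code). One hedged way to verify this is to inspect the table with Flink SQL; the catalog name and warehouse path below are placeholders.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CheckEvolvedSchema {

        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "CREATE CATALOG paimon_catalog WITH ("
                            + " 'type' = 'paimon',"
                            + " 'warehouse' = '/path/to/warehouse' )");
            tEnv.executeSql("USE CATALOG paimon_catalog");

            // the evolved schema should now include order_id, price and dt
            tEnv.executeSql("DESCRIBE my_db.T").print();
            tEnv.executeSql("SELECT * FROM my_db.T").print();
        }
    }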