Java API

We do not recommend using the Paimon API directly unless you are an experienced downstream ecosystem developer, and even then you should expect significant difficulties.

If you are only using Paimon, we strongly recommend using computing engines such as Flink SQL or Spark SQL.

The following documentation is not exhaustive and is intended for reference only.

Dependency

Maven dependency:

  <dependency>
      <groupId>org.apache.paimon</groupId>
      <artifactId>paimon-bundle</artifactId>
      <version>0.9.0</version>
  </dependency>

Or download the jar file: Paimon Bundle.

Paimon relies on a Hadoop environment; you should add the Hadoop classpath or a bundled Hadoop jar.

Create Catalog

Before interacting with a Table, you need to create a Catalog.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.CatalogContext;
  import org.apache.paimon.catalog.CatalogFactory;
  import org.apache.paimon.fs.Path;
  import org.apache.paimon.options.Options;

  public class CreateCatalog {

      public static Catalog createFilesystemCatalog() {
          CatalogContext context = CatalogContext.create(new Path("..."));
          return CatalogFactory.createCatalog(context);
      }

      public static Catalog createHiveCatalog() {
          // Paimon Hive catalog relies on Hive jars
          // You should add hive classpath or hive bundled jar.
          Options options = new Options();
          options.set("warehouse", "...");
          options.set("metastore", "hive");
          options.set("uri", "...");
          options.set("hive-conf-dir", "...");
          options.set("hadoop-conf-dir", "...");
          CatalogContext context = CatalogContext.create(options);
          return CatalogFactory.createCatalog(context);
      }
  }

Create Table

You can use the catalog to create tables. The created tables are persisted to the file system, so you can directly obtain them next time.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.Identifier;
  import org.apache.paimon.schema.Schema;
  import org.apache.paimon.types.DataTypes;

  public class CreateTable {

      public static void main(String[] args) {
          Schema.Builder schemaBuilder = Schema.newBuilder();
          schemaBuilder.primaryKey("f0", "f1");
          schemaBuilder.partitionKeys("f1");
          schemaBuilder.column("f0", DataTypes.STRING());
          schemaBuilder.column("f1", DataTypes.INT());
          Schema schema = schemaBuilder.build();

          Identifier identifier = Identifier.create("my_db", "my_table");
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              catalog.createTable(identifier, schema, false);
          } catch (Catalog.TableAlreadyExistException e) {
              // do something
          } catch (Catalog.DatabaseNotExistException e) {
              // do something
          }
      }
  }

Get Table

The Table interface provides access to the table metadata and tools to read and write the table.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.Identifier;
  import org.apache.paimon.table.Table;

  public class GetTable {

      public static Table getTable() {
          Identifier identifier = Identifier.create("my_db", "my_table");
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              return catalog.getTable(identifier);
          } catch (Catalog.TableNotExistException e) {
              // do something
              throw new RuntimeException("table not exist");
          }
      }
  }

Batch Read

For relatively small amounts of data, or for data that has undergone projection and filtering, you can directly use a standalone program to read the table data.

But if the data volume of the table is relatively large, you can distribute splits to different tasks for reading.

The reading is divided into two stages:

  1. Scan Plan: Generate plan splits in a global node (‘Coordinator’, or named ‘Driver’).
  2. Read Split: Read the splits in distributed tasks.

  import org.apache.paimon.data.InternalRow;
  import org.apache.paimon.predicate.Predicate;
  import org.apache.paimon.predicate.PredicateBuilder;
  import org.apache.paimon.reader.RecordReader;
  import org.apache.paimon.table.Table;
  import org.apache.paimon.table.source.ReadBuilder;
  import org.apache.paimon.table.source.Split;
  import org.apache.paimon.table.source.TableRead;
  import org.apache.paimon.types.DataTypes;
  import org.apache.paimon.types.RowType;

  import com.google.common.collect.Lists;

  import java.util.List;

  public class ReadTable {

      public static void main(String[] args) throws Exception {
          // 1. Create a ReadBuilder and push filter (`withFilter`)
          //    and projection (`withProjection`) if necessary
          Table table = GetTable.getTable();
          PredicateBuilder builder =
                  new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));
          Predicate notNull = builder.isNotNull(0);
          Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
          int[] projection = new int[] {0, 1};
          ReadBuilder readBuilder =
                  table.newReadBuilder()
                          .withProjection(projection)
                          .withFilter(Lists.newArrayList(notNull, greaterOrEqual));

          // 2. Plan splits in 'Coordinator' (or named 'Driver')
          List<Split> splits = readBuilder.newScan().plan().splits();

          // 3. Distribute these splits to different tasks

          // 4. Read a split in task
          TableRead read = readBuilder.newRead();
          RecordReader<InternalRow> reader = read.createReader(splits);
          reader.forEachRemaining(System.out::println);
      }
  }

Batch Write

The writing is divided into two stages:

  1. Write records: Write records in distributed tasks and generate commit messages.
  2. Commit/Abort: Collect all CommitMessages and commit them in a global node (‘Coordinator’, or named ‘Driver’, or named ‘Committer’). If the commit fails for some reason, abort the unsuccessful commit via the CommitMessages.

  import org.apache.paimon.data.BinaryString;
  import org.apache.paimon.data.GenericRow;
  import org.apache.paimon.table.Table;
  import org.apache.paimon.table.sink.BatchTableCommit;
  import org.apache.paimon.table.sink.BatchTableWrite;
  import org.apache.paimon.table.sink.BatchWriteBuilder;
  import org.apache.paimon.table.sink.CommitMessage;

  import java.util.List;

  public class BatchWrite {

      public static void main(String[] args) throws Exception {
          // 1. Create a WriteBuilder (Serializable)
          Table table = GetTable.getTable();
          BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();

          // 2. Write records in distributed tasks
          BatchTableWrite write = writeBuilder.newWrite();

          GenericRow record1 = GenericRow.of(BinaryString.fromString("Alice"), 12);
          GenericRow record2 = GenericRow.of(BinaryString.fromString("Bob"), 5);
          GenericRow record3 = GenericRow.of(BinaryString.fromString("Emily"), 18);

          // If this is a distributed write, you can use writeBuilder.newWriteSelector.
          // A WriteSelector determines to which logical downstream writers a record should be written.
          // If it returns empty, no data distribution is required.

          write.write(record1);
          write.write(record2);
          write.write(record3);
          List<CommitMessage> messages = write.prepareCommit();

          // 3. Collect all CommitMessages to a global node and commit
          BatchTableCommit commit = writeBuilder.newCommit();
          commit.commit(messages);

          // Abort unsuccessful commit to delete data files
          // commit.abort(messages);
      }
  }

Stream Read

What distinguishes Stream Read is that StreamTableScan can continuously scan and generate splits.

StreamTableScan supports checkpoint and restore, which lets you save the correct state during stream reading.

  import org.apache.paimon.data.InternalRow;
  import org.apache.paimon.predicate.Predicate;
  import org.apache.paimon.predicate.PredicateBuilder;
  import org.apache.paimon.reader.RecordReader;
  import org.apache.paimon.table.Table;
  import org.apache.paimon.table.source.ReadBuilder;
  import org.apache.paimon.table.source.Split;
  import org.apache.paimon.table.source.StreamTableScan;
  import org.apache.paimon.table.source.TableRead;
  import org.apache.paimon.types.DataTypes;
  import org.apache.paimon.types.RowType;

  import com.google.common.collect.Lists;

  import java.util.List;

  public class StreamReadTable {

      public static void main(String[] args) throws Exception {
          // 1. Create a ReadBuilder and push filter (`withFilter`)
          //    and projection (`withProjection`) if necessary
          Table table = GetTable.getTable();
          PredicateBuilder builder =
                  new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));
          Predicate notNull = builder.isNotNull(0);
          Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);
          int[] projection = new int[] {0, 1};
          ReadBuilder readBuilder =
                  table.newReadBuilder()
                          .withProjection(projection)
                          .withFilter(Lists.newArrayList(notNull, greaterOrEqual));

          // 2. Plan splits in 'Coordinator' (or named 'Driver')
          StreamTableScan scan = readBuilder.newStreamScan();
          while (true) {
              List<Split> splits = scan.plan().splits();
              // Distribute these splits to different tasks

              Long state = scan.checkpoint();
              // can be restored in scan.restore(state) after fail over

              // 3. Read a split in task
              TableRead read = readBuilder.newRead();
              RecordReader<InternalRow> reader = read.createReader(splits);
              reader.forEachRemaining(System.out::println);

              Thread.sleep(1000);
          }
      }
  }

Stream Write

What distinguishes Stream Write is that StreamTableCommit can continuously commit.

Key points to achieve exactly-once consistency:

  • CommitUser represents a user. A user can commit multiple times. In distributed processing, you are expected to use the same commitUser.
  • Different applications need to use different commitUsers.
  • The commitIdentifier of StreamTableWrite and StreamTableCommit needs to be consistent, and the identifier needs to be incremented for each subsequent commit.
  • When a failure occurs, if you still have uncommitted CommitMessages, please use StreamTableCommit#filterAndCommit to exclude the committed messages by commitIdentifier.

  import org.apache.paimon.data.BinaryString;
  import org.apache.paimon.data.GenericRow;
  import org.apache.paimon.table.Table;
  import org.apache.paimon.table.sink.CommitMessage;
  import org.apache.paimon.table.sink.StreamTableCommit;
  import org.apache.paimon.table.sink.StreamTableWrite;
  import org.apache.paimon.table.sink.StreamWriteBuilder;

  import java.util.List;

  public class StreamWriteTable {

      public static void main(String[] args) throws Exception {
          // 1. Create a WriteBuilder (Serializable)
          Table table = GetTable.getTable();
          StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();

          // 2. Write records in distributed tasks
          StreamTableWrite write = writeBuilder.newWrite();
          // commitIdentifier like Flink checkpointId
          long commitIdentifier = 0;

          while (true) {
              // Increment the identifier for each new round;
              // the same identifier must be passed to both prepareCommit and commit.
              commitIdentifier++;

              GenericRow record1 = GenericRow.of(BinaryString.fromString("Alice"), 12);
              GenericRow record2 = GenericRow.of(BinaryString.fromString("Bob"), 5);
              GenericRow record3 = GenericRow.of(BinaryString.fromString("Emily"), 18);

              // If this is a distributed write, you can use writeBuilder.newWriteSelector.
              // A WriteSelector determines to which logical downstream writers a record should be written.
              // If it returns empty, no data distribution is required.

              write.write(record1);
              write.write(record2);
              write.write(record3);
              List<CommitMessage> messages = write.prepareCommit(false, commitIdentifier);

              // 3. Collect all CommitMessages to a global node and commit
              StreamTableCommit commit = writeBuilder.newCommit();
              commit.commit(commitIdentifier, messages);

              // 4. When a failure occurs and you're not sure whether the commit succeeded,
              //    you can use `filterAndCommit` to retry the commit process.
              //    Succeeded commits will be automatically skipped.
              /*
              Map<Long, List<CommitMessage>> commitIdentifiersAndMessages = new HashMap<>();
              commitIdentifiersAndMessages.put(commitIdentifier, messages);
              commit.filterAndCommit(commitIdentifiersAndMessages);
              */

              Thread.sleep(1000);
          }
      }
  }

Data Types

Java          Paimon
boolean       boolean
byte          byte
short         short
int           int
long          long
float         float
double        double
string        org.apache.paimon.data.BinaryString
decimal       org.apache.paimon.data.Decimal
timestamp     org.apache.paimon.data.Timestamp
byte[]        byte[]
array         org.apache.paimon.data.InternalArray
map           org.apache.paimon.data.InternalMap
InternalRow   org.apache.paimon.data.InternalRow
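
For example, when building rows with GenericRow (as in the write examples above), Java values must be wrapped into the Paimon data types listed in this table. A minimal sketch, assuming a hypothetical table whose row type is STRING, DECIMAL(10, 2), TIMESTAMP and ARRAY<INT>, and assuming GenericArray as the in-memory implementation of InternalArray (the class name and column layout are illustrative):

  import org.apache.paimon.data.BinaryString;
  import org.apache.paimon.data.Decimal;
  import org.apache.paimon.data.GenericArray;
  import org.apache.paimon.data.GenericRow;
  import org.apache.paimon.data.Timestamp;

  import java.math.BigDecimal;

  public class BuildRow {

      // hypothetical example row: (STRING, DECIMAL(10, 2), TIMESTAMP, ARRAY<INT>)
      public static GenericRow buildRow() {
          return GenericRow.of(
                  // string -> BinaryString
                  BinaryString.fromString("Alice"),
                  // decimal -> Decimal, with precision 10 and scale 2
                  Decimal.fromBigDecimal(new BigDecimal("12.50"), 10, 2),
                  // timestamp -> Timestamp
                  Timestamp.fromEpochMillis(System.currentTimeMillis()),
                  // array -> InternalArray (GenericArray here)
                  new GenericArray(new Object[] {1, 2, 3}));
      }
  }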

Predicate Types

SQL Predicate   Paimon Predicate
and             org.apache.paimon.predicate.PredicateBuilder.And
or              org.apache.paimon.predicate.PredicateBuilder.Or
is null         org.apache.paimon.predicate.PredicateBuilder.IsNull
is not null     org.apache.paimon.predicate.PredicateBuilder.IsNotNull
in              org.apache.paimon.predicate.PredicateBuilder.In
not in          org.apache.paimon.predicate.PredicateBuilder.NotIn
=               org.apache.paimon.predicate.PredicateBuilder.Equal
<>              org.apache.paimon.predicate.PredicateBuilder.NotEqual
<               org.apache.paimon.predicate.PredicateBuilder.LessThan
<=              org.apache.paimon.predicate.PredicateBuilder.LessOrEqual
>               org.apache.paimon.predicate.PredicateBuilder.GreaterThan
>=              org.apache.paimon.predicate.PredicateBuilder.GreaterOrEqual
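
These predicate classes are not instantiated directly; they are produced by PredicateBuilder against the table's row type, as in the read examples above (field indexes refer to positions in that row type). A minimal sketch, reusing the STRING / INT row type from the read examples (the class name is illustrative); note that string literals are passed as BinaryString, and that predicates can be combined with PredicateBuilder.and / PredicateBuilder.or:

  import org.apache.paimon.data.BinaryString;
  import org.apache.paimon.predicate.Predicate;
  import org.apache.paimon.predicate.PredicateBuilder;
  import org.apache.paimon.types.DataTypes;
  import org.apache.paimon.types.RowType;

  public class BuildPredicate {

      public static Predicate buildPredicate() {
          PredicateBuilder builder =
                  new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT()));

          // f0 = 'Alice'
          Predicate equal = builder.equal(0, BinaryString.fromString("Alice"));
          // f1 >= 12
          Predicate greaterOrEqual = builder.greaterOrEqual(1, 12);

          // f0 = 'Alice' AND f1 >= 12
          return PredicateBuilder.and(equal, greaterOrEqual);
      }
  }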

Create Database

You can use the catalog to create databases. The created databases are persisted to the file system.

  import org.apache.paimon.catalog.Catalog;

  public class CreateDatabase {

      public static void main(String[] args) {
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              catalog.createDatabase("my_db", false);
          } catch (Catalog.DatabaseAlreadyExistException e) {
              // do something
          }
      }
  }

Determine Whether Database Exists

You can use the catalog to determine whether the database exists.

  import org.apache.paimon.catalog.Catalog;

  public class DatabaseExists {

      public static void main(String[] args) {
          Catalog catalog = CreateCatalog.createFilesystemCatalog();
          boolean exists = catalog.databaseExists("my_db");
      }
  }

List Databases

You can use the catalog to list databases.

  import org.apache.paimon.catalog.Catalog;

  import java.util.List;

  public class ListDatabases {

      public static void main(String[] args) {
          Catalog catalog = CreateCatalog.createFilesystemCatalog();
          List<String> databases = catalog.listDatabases();
      }
  }

Drop Database

You can use the catalog to drop databases.

  import org.apache.paimon.catalog.Catalog;

  public class DropDatabase {

      public static void main(String[] args) {
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              catalog.dropDatabase("my_db", false, true);
          } catch (Catalog.DatabaseNotEmptyException e) {
              // do something
          } catch (Catalog.DatabaseNotExistException e) {
              // do something
          }
      }
  }

Determine Whether Table Exists

You can use the catalog to determine whether the table exists.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.Identifier;

  public class TableExists {

      public static void main(String[] args) {
          Identifier identifier = Identifier.create("my_db", "my_table");
          Catalog catalog = CreateCatalog.createFilesystemCatalog();
          boolean exists = catalog.tableExists(identifier);
      }
  }

List Tables

You can use the catalog to list tables.

  import org.apache.paimon.catalog.Catalog;

  import java.util.List;

  public class ListTables {

      public static void main(String[] args) {
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              List<String> tables = catalog.listTables("my_db");
          } catch (Catalog.DatabaseNotExistException e) {
              // do something
          }
      }
  }

Drop Table

You can use the catalog to drop a table.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.Identifier;

  public class DropTable {

      public static void main(String[] args) {
          Identifier identifier = Identifier.create("my_db", "my_table");
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              catalog.dropTable(identifier, false);
          } catch (Catalog.TableNotExistException e) {
              // do something
          }
      }
  }

Rename Table

You can use the catalog to rename a table.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.Identifier;

  public class RenameTable {

      public static void main(String[] args) {
          Identifier fromTableIdentifier = Identifier.create("my_db", "my_table");
          Identifier toTableIdentifier = Identifier.create("my_db", "test_table");
          try {
              Catalog catalog = CreateCatalog.createFilesystemCatalog();
              catalog.renameTable(fromTableIdentifier, toTableIdentifier, false);
          } catch (Catalog.TableAlreadyExistException e) {
              // do something
          } catch (Catalog.TableNotExistException e) {
              // do something
          }
      }
  }

Alter Table

You can use the catalog to alter a table, but you need to pay attention to the following restrictions.

  • A column cannot be specified as NOT NULL in certain tables (error: "Column %s cannot specify NOT NULL in the %s table").
  • The type of a partition column cannot be updated.
  • The nullability of a primary key cannot be changed.
  • Updating the type of a column whose type is a nested row type is not supported.
  • Updating a column to a nested row type is not supported.

  import org.apache.paimon.catalog.Catalog;
  import org.apache.paimon.catalog.Identifier;
  import org.apache.paimon.schema.Schema;
  import org.apache.paimon.schema.SchemaChange;
  import org.apache.paimon.types.DataField;
  import org.apache.paimon.types.DataTypes;

  import com.google.common.collect.Lists;

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;

  public class AlterTable {

      public static void main(String[] args) throws Exception {
          Identifier identifier = Identifier.create("my_db", "my_table");

          Map<String, String> options = new HashMap<>();
          options.put("bucket", "4");
          options.put("compaction.max.file-num", "40");

          Catalog catalog = CreateCatalog.createFilesystemCatalog();
          catalog.createDatabase("my_db", false);

          try {
              catalog.createTable(
                      identifier,
                      new Schema(
                              Lists.newArrayList(
                                      new DataField(0, "col1", DataTypes.STRING(), "field1"),
                                      new DataField(1, "col2", DataTypes.STRING(), "field2"),
                                      new DataField(2, "col3", DataTypes.STRING(), "field3"),
                                      new DataField(3, "col4", DataTypes.BIGINT(), "field4"),
                                      new DataField(
                                              4,
                                              "col5",
                                              DataTypes.ROW(
                                                      new DataField(
                                                              5, "f1", DataTypes.STRING(), "f1"),
                                                      new DataField(
                                                              6, "f2", DataTypes.STRING(), "f2"),
                                                      new DataField(
                                                              7, "f3", DataTypes.STRING(), "f3")),
                                              "field5"),
                                      new DataField(8, "col6", DataTypes.STRING(), "field6")),
                              Lists.newArrayList("col1"), // partition keys
                              Lists.newArrayList("col1", "col2"), // primary key
                              options,
                              "table comment"),
                      false);
          } catch (Catalog.TableAlreadyExistException e) {
              // do something
          } catch (Catalog.DatabaseNotExistException e) {
              // do something
          }

          // add option
          SchemaChange addOption = SchemaChange.setOption("snapshot.time-retained", "2h");
          // remove option
          SchemaChange removeOption = SchemaChange.removeOption("compaction.max.file-num");
          // add column
          SchemaChange addColumn = SchemaChange.addColumn("col1_after", DataTypes.STRING());
          // add a column after col1
          SchemaChange.Move after = SchemaChange.Move.after("col1_after", "col1");
          SchemaChange addColumnAfterField =
                  SchemaChange.addColumn("col7", DataTypes.STRING(), "", after);
          // rename column
          SchemaChange renameColumn = SchemaChange.renameColumn("col3", "col3_new_name");
          // drop column
          SchemaChange dropColumn = SchemaChange.dropColumn("col6");
          // update column comment
          SchemaChange updateColumnComment =
                  SchemaChange.updateColumnComment(new String[] {"col4"}, "col4 field");
          // update nested column comment
          SchemaChange updateNestedColumnComment =
                  SchemaChange.updateColumnComment(new String[] {"col5", "f1"}, "col5 f1 field");
          // update column type
          SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", DataTypes.DOUBLE());
          // update column position, you need to pass in a parameter of type Move
          SchemaChange updateColumnPosition =
                  SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4"));
          // update column nullability
          SchemaChange updateColumnNullability =
                  SchemaChange.updateColumnNullability(new String[] {"col4"}, false);
          // update nested column nullability
          SchemaChange updateNestedColumnNullability =
                  SchemaChange.updateColumnNullability(new String[] {"col5", "f2"}, false);

          SchemaChange[] schemaChanges =
                  new SchemaChange[] {
                      addOption,
                      removeOption,
                      addColumn,
                      addColumnAfterField,
                      renameColumn,
                      dropColumn,
                      updateColumnComment,
                      updateNestedColumnComment,
                      updateColumnType,
                      updateColumnPosition,
                      updateColumnNullability,
                      updateNestedColumnNullability
                  };

          try {
              catalog.alterTable(identifier, Arrays.asList(schemaChanges), false);
          } catch (Catalog.TableNotExistException e) {
              // do something
          } catch (Catalog.ColumnAlreadyExistException e) {
              // do something
          } catch (Catalog.ColumnNotExistException e) {
              // do something
          }
      }
  }

Table metadata (a usage sketch follows the list):

  • name returns a name string that identifies this table.
  • rowType returns the current row type of this table, containing a sequence of the table's fields.
  • partitionKeys returns the partition keys of this table.
  • primaryKeys returns the primary keys of this table.
  • options returns the configuration of this table in a map of key-value pairs.
  • comment returns the optional comment of this table.
  • copy returns a new table by applying dynamic options to this table.
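
A minimal sketch of reading this metadata from the table obtained in the Get Table example above (the class name is illustrative, and "scan.snapshot-id" is used only as an example of a dynamic option):

  import org.apache.paimon.table.Table;
  import org.apache.paimon.types.RowType;

  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.Optional;

  public class TableMetadata {

      public static void main(String[] args) {
          Table table = GetTable.getTable();

          String name = table.name();
          RowType rowType = table.rowType();
          List<String> partitionKeys = table.partitionKeys();
          List<String> primaryKeys = table.primaryKeys();
          Map<String, String> options = table.options();
          Optional<String> comment = table.comment();

          // copy returns a new Table with the given dynamic options applied
          Table copied = table.copy(Collections.singletonMap("scan.snapshot-id", "1"));
      }
  }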