Manage packages

Package 管理允许使用版本管理,并将简化对 Function、Sink 和 Source 的升级回滚过程。 当需要在不同的命名空间下使用相同的 Function、Sink 或 Source 时,可以将它们上传到一个共用的 Package 管理系统中。

包名

一个完整的pacakge由五个部分组成:typetenantnamespacepackage nameversion

部分说明
typePackage 的类型 支持以下类型:functionsinksource
namePackage 的完全限定名称:<tenant>/<namespace>/<package name>.
versionPackage 的版本号

以下是 Package 的定义示例。

  1. class PackageName {
  2. private final PackageType type;
  3. private final String namespace;
  4. private final String tenant;
  5. private final String name;
  6. private final String version;
  7. }
  8. enum PackageType {
  9. FUNCTION("function"), SINK("sink"), SOURCE("source");
  10. }

Package 链接

Package 通过 URL 来标识位置。 Package 的 URL 遵循以下格式:

  1. <type>://<tenant>/<namespace>/<package name>@<version>

以下是一些 Package 的 URL 示例:

sink://public/default/mysql-sink@1.0
function://my-tenant/my-ns/my-function@0.1
source://my-tenant/my-ns/mysql-cdc-source@2.3

Package 管理系统存储了每个 Package 的数据、版本和元数据。 元数据包含的内容如下表所示。

metadata说明
描述Package 的描述
联系方式Package 的联系方式: 比如团队邮箱地址。
创建时间Package 的创建时间。
修改时间Package 的修改时间
properties存储自定义信息的键值对。

权限管理

Package 的层级通过租户和命名空间来组织,所以你可以直接将租户和命名空间的权限应用到 Package 上。

Package 管理资源

你可以通过命令行、REST API 和 Java 客户端来管理 Package。

上传 Package

你可以通过以下方式将 Package 上传到 Package 管理服务中。

pulsar-admin

REST API

JAVA

  1. bin/pulsar-admin packages upload function://public/default/example@v0.1 --path package-file --description package-description

POST /admin/v3/packages/:type/:tenant/:namespace/:packageName/:version/?version=2.9.2

使用同步调用的方式上传到 Package 管理服务中。

  1. void upload(PackageMetadata metadata, String packageName, String path) throws PulsarAdminException;

使用异步调用的方式上传到 Package 管理服务中。

  1. CompletableFuture<Void> uploadAsync(PackageMetadata metadata, String packageName, String path);

下载 Package

你可以通过以下方式从 Package 管理服务中下载 Package。

pulsar-admin

REST API

JAVA

  1. bin/pulsar-admin packages download function://public/default/example@v0.1 --path package-file

GET /admin/v3/packages/:type/:tenant/:namespace/:packageName/:version/?version=2.9.2

使用同步调用的方式从 Package 管理服务中下载 Package。

  1. void download(String packageName, String path) throws PulsarAdminException;

使用异步调用的方式从 Package 管理服务中下载 Package。

  1. CompletableFuture<Void> downloadAsync(String packageName, String path);

列出 Package 的所有版本

可以通过以下方式列出 Package 的所有版本。

pulsar-admin

REST API

JAVA

  1. bin/pulsar-admin packages list --type function public/default

GET /admin/v3/packages/:type/:tenant/:namespace/:packageName/?version=2.9.2

使用同步调用的方式列出 Package 的所有版本。

  1. List<String> listPackageVersions(String packageName) throws PulsarAdminException;

使用异步调用的方式列出 Package 的所有版本。

  1. CompletableFuture<List<String>> listPackageVersionsAsync(String packageName);

列出某个命名空间下指定类型的 Package

你可以通过以下方式列出某个命名空间下的所有指定类型的Package。

pulsar-admin

REST API

JAVA

  1. bin/pulsar-admin packages list --type function public/default

PUT /admin/v3/packages/:type/:tenant/:namespace/?version=2.9.2

使用同步调用的方式列出某个命名空间下的所有指定类型的 Package。

  1. List<String> listPackages(String type, String namespace) throws PulsarAdminException;

使用异步调用的方式列出某个命名空间下的所有指定类型的Package。

  1. CompletableFuture<List<String>> listPackagesAsync(String type, String namespace);

获取 Package 的元数据

可以通过以下方式获取 Package 的元数据。

pulsar-admin

REST API

JAVA

  1. bin/pulsar-admin packages get-metadata function://public/default/test@v1

GET /admin/v3/packages/:type/:tenant/:namespace/:packageName/:version/metadata/?version=2.9.2

使用同步调用的方式获取 Package 的元数据。

  1. PackageMetadata getMetadata(String packageName) throws PulsarAdminException;

使用异步调用的方式获取 Package 的元数据。

  1. CompletableFuture<PackageMetadata> getMetadataAsync(String packageName);

更新 Package 的元数据

你可以通过以下方式更新Package的元数据。

pulsar-admin

REST API

JAVA

  1. bin/pulsar-admin packages update-metadata function://public/default/example@v0.1 --description update-description

PUT /admin/v3/packages/:type/:tenant/:namespace/:packageName/:version/metadata/?version=2.9.2

使用同步调用的方式更新 Package 的元数据。

  1. void updateMetadata(String packageName, PackageMetadata metadata) throws PulsarAdminException;

使用异步调用的方式获取Package的元数据。

  1. CompletableFuture<Void> updateMetadataAsync(String packageName, PackageMetadata metadata);

删除 Package

可以通过以下方式使用 Package 名删除特定版本的 Package。

pulsar-admin

REST API

JAVA

以下命令行示例将删除一个版本为 0.1 的 Package。

  1. bin/pulsar-admin packages delete function://public/default/example@v0.1

DELETE /admin/v3/packages/:type/:tenant/:namespace/:packageName/:version/?version=2.9.2

使用同步调用的方式删除特定版本的 Package。

  1. void delete(String packageName) throws PulsarAdminException;

使用异步调用的方式删除特定版本的 Package。

  1. CompletableFuture<Void> deleteAsync(String packageName);