阿里云对象存储服务 (OSS)

OSS:对象存储服务

阿里云对象存储服务 (Aliyun OSS) 使用广泛,尤其在中国云用户中十分流行,能提供多种应用场景下的云对象存储。OSS 可与 Flink 一起使用以读取与存储数据,以及与流 State Backend 结合使用。

通过以下格式指定路径,OSS 对象可类似于普通文件使用:

  1. oss://<your-bucket>/<object-name>

以下代码展示了如何在 Flink 作业中使用 OSS:

  1. // 读取 OSS bucket
  2. env.readTextFile("oss://<your-bucket>/<object-name>");
  3. // 写入 OSS bucket
  4. stream.writeAsText("oss://<your-bucket>/<object-name>");
  5. // 将 OSS 用作 checkpoint storage
  6. Configuration config = new Configuration();
  7. config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
  8. config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "oss://<your-bucket>/<object-name>");
  9. env.configure(config);

Shaded Hadoop OSS 文件系统

为使用 flink-oss-fs-hadoop,在启动 Flink 之前,将对应的 JAR 文件从 opt 目录复制到 Flink 发行版中的 plugin 目录下的一个文件夹中,例如:

  1. mkdir ./plugins/oss-fs-hadoop
  2. cp ./opt/flink-oss-fs-hadoop-1.19.0.jar ./plugins/oss-fs-hadoop/

flink-oss-fs-hadoop 为使用 oss:// scheme 的 URI 注册了默认的文件系统包装器。

配置设置

在设置好 OSS 文件系统包装器之后,需要添加一些配置以保证 Flink 有权限访问 OSS buckets。

为了简单使用,可直接在 Flink 配置文件 中使用与 Hadoop core-site.xml 相同的配置关键字。

可在 Hadoop OSS 文档 中查看配置关键字。

一些配置必须添加至 Flink 配置文件在 Hadoop OSS 文档中定义的其它配置为用作性能调优的高级配置):

  1. fs.oss.endpoint: 连接的 Aliyun OSS endpoint
  2. fs.oss.accessKeyId: Aliyun access key ID
  3. fs.oss.accessKeySecret: Aliyun access key secret

备选的 CredentialsProvider 也可在 Flink 配置文件 中配置,例如:

  1. # 从 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET 读取凭据 (Credentials)
  2. fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider

其余的凭据提供者(credential providers)可在这里中找到。