Google Cloud Storage

Google Cloud Storage (GCS) provides cloud storage for a variety of use cases. You can use it for reading and writing data, and for checkpoint storage when using FileSystemCheckpointStorage with the streaming state backends.

You can use GCS objects like regular files by specifying paths in the following format:

  gs://<your-bucket>/<endpoint>

The endpoint can either be a single file or a directory, for example:

  // Read from GCS bucket
  env.readTextFile("gs://<bucket>/<endpoint>");

  // Write to GCS bucket
  stream.writeAsText("gs://<bucket>/<endpoint>");

  // Use GCS as checkpoint storage
  Configuration config = new Configuration();
  config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
  config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "gs://<bucket>/<endpoint>");
  env.configure(config);

Note that these examples are not exhaustive. You can use GCS anywhere Flink expects a FileSystem URI, including your high availability setup or the EmbeddedRocksDBStateBackend.
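To make the gs://<your-bucket>/<endpoint> path format concrete, the following minimal sketch splits such a URI into its bucket and endpoint parts using only the JDK. The class name GcsPathSketch and the sample bucket and path are hypothetical and are not part of Flink; Flink resolves gs:// URIs itself once the file system plugin is installed.

```java
import java.net.URI;

/** Illustrative helper (hypothetical, not a Flink class): splits a gs:// URI into bucket and endpoint. */
final class GcsPathSketch {
    final String bucket;
    final String endpoint;

    private GcsPathSketch(String bucket, String endpoint) {
        this.bucket = bucket;
        this.endpoint = endpoint;
    }

    static GcsPathSketch parse(String uri) {
        URI parsed = URI.create(uri);
        if (!"gs".equals(parsed.getScheme())) {
            throw new IllegalArgumentException("Expected a gs:// URI: " + uri);
        }
        // getAuthority() is used instead of getHost() because getHost() returns
        // null for names that are not valid hostnames (for example, with underscores).
        String bucket = parsed.getAuthority();
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Missing bucket in: " + uri);
        }
        // The URI path starts with "/", which is not part of the endpoint.
        String path = parsed.getPath();
        String endpoint = path.startsWith("/") ? path.substring(1) : path;
        return new GcsPathSketch(bucket, endpoint);
    }

    public static void main(String[] args) {
        GcsPathSketch p = parse("gs://my-bucket/checkpoints/job-1");
        System.out.println(p.bucket + " | " + p.endpoint); // prints "my-bucket | checkpoints/job-1"
    }
}
```

The endpoint may name either a single file or a directory; the sketch does not distinguish between the two.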

GCS File System plugin

Flink provides the flink-gs-fs-hadoop file system to write to GCS. This implementation is self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use it.

flink-gs-fs-hadoop registers a FileSystem wrapper for URIs with the gs:// scheme. It uses Google’s gcs-connector Hadoop library to access GCS. It also uses Google’s google-cloud-storage library to provide RecoverableWriter support.

This file system can be used with the FileSystem connector.

To use flink-gs-fs-hadoop, copy the JAR file from the opt directory to the plugins directory of your Flink distribution before starting Flink, for example:

  mkdir ./plugins/gs-fs-hadoop
  cp ./opt/flink-gs-fs-hadoop-1.20.0.jar ./plugins/gs-fs-hadoop/

Configuration

The underlying Hadoop file system can be configured using the Hadoop configuration keys for gcs-connector by adding the configurations to your Flink configuration file.

For example, gcs-connector has a fs.gs.http.connect-timeout configuration key. To change it, set gs.http.connect-timeout: xyz in the Flink configuration file, where xyz stands for the desired value. Flink internally translates this key back to fs.gs.http.connect-timeout.
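The key translation described above can be pictured as a simple prefix mapping. The sketch below is an illustration only and is not Flink's actual implementation: the class name GcsKeyTranslationSketch, the assumption that every gs.-prefixed key is forwarded, and the value 20000 are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only (not Flink code): maps Flink-style gs.* keys to Hadoop-style fs.gs.* keys. */
final class GcsKeyTranslationSketch {
    static final String FLINK_PREFIX = "gs.";
    static final String HADOOP_PREFIX = "fs.gs.";

    static Map<String, String> toHadoopKeys(Map<String, String> flinkConfig) {
        Map<String, String> hadoopConfig = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConfig.entrySet()) {
            String key = entry.getKey();
            // Assumption: only keys starting with "gs." are forwarded; all others are ignored.
            if (key.startsWith(FLINK_PREFIX)) {
                String hadoopKey = HADOOP_PREFIX + key.substring(FLINK_PREFIX.length());
                hadoopConfig.put(hadoopKey, entry.getValue());
            }
        }
        return hadoopConfig;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConfig = new LinkedHashMap<>();
        flinkConfig.put("gs.http.connect-timeout", "20000"); // arbitrary example value
        flinkConfig.put("parallelism.default", "4");         // unrelated key, not forwarded
        System.out.println(toHadoopKeys(flinkConfig));       // prints {fs.gs.http.connect-timeout=20000}
    }
}
```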

You can also set gcs-connector options directly in the Hadoop core-site.xml configuration file, so long as the Hadoop configuration directory is made known to Flink via the env.hadoop.conf.dir Flink option or via the HADOOP_CONF_DIR environment variable.

flink-gs-fs-hadoop can also be configured by setting the following options in the Flink configuration file:

Key
  Description

gs.writer.temporary.bucket.name
  Set this property to choose a bucket to hold temporary blobs for in-progress writes via RecoverableWriter. If this property is not set, temporary blobs will be written to the same bucket as the final file being written. In either case, temporary blobs are written with the prefix .inprogress/.

  It is recommended to choose a separate bucket in order to assign it a TTL, to provide a mechanism to clean up orphaned blobs that can occur when restoring from check/savepoints.

  If you do use a separate bucket with a TTL for temporary blobs, attempts to restart jobs from check/savepoints after the TTL interval expires may fail.

gs.writer.chunk.size
  Set this property to set the chunk size for writes via RecoverableWriter.

  If not set, a Google-determined default chunk size will be used.

gs.filesink.entropy.enabled
  Set this property to mitigate hotspotting issues on GCS and so improve performance. This option defines whether to enable entropy injection in the file sink GCS path. If this is enabled, entropy in the form of the temporary object id will be injected at the beginning of the GCS path of the temporary objects. The final object path remains unchanged.

gs.http.connect-timeout
  Set this property to set the connection timeout for the java-storage client. The GCS default will be used if not configured.

gs.http.read-timeout
  Set this property to set the content read timeout for connections established via the java-storage client. The GCS default will be used if not configured.

gs.retry.max-attempt
  Set this property to define the maximum number of retry attempts to perform. The GCS default will be used if not configured.

gs.retry.init-rpc-timeout
  Set this property to set the timeout for the initial RPC. Subsequent calls will use this value adjusted according to gs.retry.rpc-timeout-multiplier. The GCS default will be used if not configured.

gs.retry.rpc-timeout-multiplier
  Set this property to control how the RPC timeout changes between calls. The timeout of the previous call is multiplied by this multiplier to calculate the timeout for the next call. The GCS default will be used if not configured.

gs.retry.max-rpc-timeout
  Set this property to put a limit on the value of the RPC timeout, so that the RPC timeout multiplier cannot increase the RPC timeout beyond this amount. The GCS default will be used if not configured.

gs.retry.total-timeout
  Set this property to change the total duration during which retries can be attempted. The GCS default will be used if not configured.
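The interaction between gs.retry.init-rpc-timeout, gs.retry.rpc-timeout-multiplier, and gs.retry.max-rpc-timeout can be sketched as a capped geometric progression. This is a simplified model based only on the descriptions above: the class name RpcTimeoutSketch and the sample values are hypothetical, and the real client may behave differently (for example, it also honors gs.retry.max-attempt and gs.retry.total-timeout, which are not modeled here).

```java
/** Simplified model (hypothetical, not the GCS client's code) of per-attempt RPC timeouts. */
final class RpcTimeoutSketch {

    /**
     * Returns the timeout in milliseconds for each of the given number of attempts:
     * the first uses initMillis, each later one multiplies the previous timeout
     * by the multiplier, and no timeout exceeds maxMillis.
     */
    static long[] timeouts(long initMillis, double multiplier, long maxMillis, int attempts) {
        long[] result = new long[attempts];
        double current = initMillis;
        for (int i = 0; i < attempts; i++) {
            result[i] = Math.min((long) current, maxMillis);
            // Cap the running value as well so it cannot grow without bound.
            current = Math.min(current * multiplier, maxMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample values: 1 s initial timeout, multiplier 2.0, 5 s cap, 4 attempts.
        long[] t = timeouts(1000, 2.0, 5000, 4);
        System.out.println(java.util.Arrays.toString(t)); // prints [1000, 2000, 4000, 5000]
    }
}
```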

Authentication to access GCS

Most operations on GCS require authentication. To provide authentication credentials, either:

  • Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the JSON credentials file, as described in Google Cloud's authentication documentation, on the machines where JobManagers and TaskManagers run. This is the recommended method.

  • Set the google.cloud.auth.service.account.json.keyfile property in core-site.xml to the path to the JSON credentials file (and make sure that the Hadoop configuration directory is specified to Flink as described above):

  <configuration>
    <property>
      <name>google.cloud.auth.service.account.json.keyfile</name>
      <value>PATH TO GOOGLE AUTHENTICATION JSON FILE</value>
    </property>
  </configuration>

For flink-gs-fs-hadoop to use credentials via either of these two methods, the use of service accounts for authentication must be enabled. This is enabled by default; however, it can be disabled in core-site.xml by setting:

  <configuration>
    <property>
      <name>google.cloud.auth.service.account.enable</name>
      <value>false</value>
    </property>
  </configuration>

gcs-connector supports additional options to provide authentication credentials besides the google.cloud.auth.service.account.json.keyfile option described above.

However, if you use any of those other options, the provided credentials will not be used by the google-cloud-storage library, which provides RecoverableWriter support, so Flink recoverable-write operations would be expected to fail.

For this reason, use of the gcs-connector authentication-credentials options other than google.cloud.auth.service.account.json.keyfile is not recommended.
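When using the recommended GOOGLE_APPLICATION_CREDENTIALS method, it can help to confirm on each JobManager and TaskManager host that the variable is set and points to a readable file. The following is a minimal sketch using only the JDK; the class name GcsCredentialsCheckSketch and the messages are hypothetical, and the check does not validate the contents of the JSON file or contact GCS.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative pre-flight check (hypothetical, not part of Flink) for the credentials file. */
final class GcsCredentialsCheckSketch {

    /** Describes whether the given credentials path is set and points to a readable regular file. */
    static String describe(String credentialsPath) {
        if (credentialsPath == null || credentialsPath.isEmpty()) {
            return "GOOGLE_APPLICATION_CREDENTIALS is not set";
        }
        Path path = Paths.get(credentialsPath);
        if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
            return "Credentials file is missing or unreadable: " + path;
        }
        return "Credentials file found: " + path;
    }

    public static void main(String[] args) {
        // Reads the same environment variable that the recommended method relies on.
        System.out.println(describe(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")));
    }
}
```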