Google Cloud Storage
Google Cloud Storage (GCS) provides cloud storage for a variety of use cases. You can use it for reading and writing data, and for checkpoint storage when using FileSystemCheckpointStorage with the streaming state backends.
You can use GCS objects like regular files by specifying paths in the following format:
gs://<your-bucket>/<endpoint>
The endpoint can either be a single file or a directory, for example:
// Read from GCS bucket
env.readTextFile("gs://<bucket>/<endpoint>");
// Write to GCS bucket
stream.writeAsText("gs://<bucket>/<endpoint>");
// Use GCS as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>");
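As an illustration of the path format used in the snippets above, a `gs://` URI can be assembled from a bucket name and an object path. The helper below is a hypothetical sketch for this document, not part of Flink's API; the bucket and object names are placeholder assumptions:

```java
// Hypothetical helper that builds "gs://<bucket>/<endpoint>" path strings
// like those passed to readTextFile, writeAsText, and setCheckpointStorage.
public class GcsPaths {
    // Joins a bucket name and an object path (file or directory) into a GCS URI.
    public static String gcsPath(String bucket, String endpoint) {
        return "gs://" + bucket + "/" + endpoint;
    }

    public static void main(String[] args) {
        // Placeholder names; the result would be passed to the Flink calls above.
        System.out.println(gcsPath("my-bucket", "data/input.txt"));
    }
}
```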
Libraries
You must include the following jars in Flink’s lib
directory to connect Flink with GCS:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2-uber</artifactId>
<version>${flink.shared_hadoop_latest_version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>hadoop2-2.2.0</version>
</dependency>
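As a sketch, installing the jars amounts to copying them into Flink's lib directory. The Flink home location and the exact jar file names below are illustrative assumptions; here a scratch directory and placeholder files stand in for a real installation and for jars downloaded from Maven Central:

```shell
# Sketch: copy the two connector jars into Flink's lib/ directory.
# FLINK_HOME and the jar file names are placeholder assumptions.
SRC="$(mktemp -d)"                  # stands in for a local Maven download location
FLINK_HOME="$(mktemp -d)"           # stands in for e.g. /opt/flink
mkdir -p "$FLINK_HOME/lib"
touch "$SRC/flink-shaded-hadoop2-uber-2.8.5-1.8.3.jar" \
      "$SRC/gcs-connector-hadoop2-2.2.0-shaded.jar"
cp "$SRC"/*.jar "$FLINK_HOME/lib/"
ls "$FLINK_HOME/lib"
```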
We have tested with flink-shaded-hadoop2-uber
version >= 2.8.5-1.8.3
. You can track the latest version of the gcs-connector for Hadoop 2.
Authentication to access GCS
Most operations on GCS require authentication. Please see the documentation on Google Cloud Storage authentication for more information.
You can use one of the following methods for authentication.
Configure via core-site.xml
You would need to add the following properties to
core-site.xml:
<configuration>
<property>
<name>google.cloud.auth.service.account.enable</name>
<value>true</value>
</property>
<property>
<name>google.cloud.auth.service.account.json.keyfile</name>
<value><PATH TO GOOGLE AUTHENTICATION JSON></value>
</property>
</configuration>
You would then need to point Flink at the directory containing
core-site.xml
by adding the following to
flink-conf.yaml:
fs.hdfs.hadoopconf: <DIRECTORY PATH WHERE core-site.xml IS SAVED>
Alternatively, you can provide the necessary key via the
GOOGLE_APPLICATION_CREDENTIALS
environment variable.
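For example, the environment variable would be exported before starting the Flink processes; the key-file path below is a placeholder assumption:

```shell
# Sketch: make the service-account key visible to the GCS connector.
# The key-file path is a placeholder, not a real location.
export GOOGLE_APPLICATION_CREDENTIALS="/etc/secrets/gcs-service-account.json"
echo "$GOOGLE_APPLICATION_CREDENTIALS"
```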