Connectors

Reading from and writing to file systems

The Apache Flink project supports multiple file systems that can be used as backing stores for input and output connectors.

Using Hadoop file system implementations

Apache Flink allows users to use any file system implementing the org.apache.hadoop.fs.FileSystem interface. There are Hadoop FileSystem implementations for

  • S3 (tested)
  • Google Cloud Storage Connector for Hadoop (tested)
  • Alluxio (tested)
  • XtreemFS (tested)
  • FTP via Hftp (not tested)
  • and many more.

In order to use a Hadoop file system with Flink, make sure that

  • the flink-conf.yaml has set the fs.hdfs.hadoopconf property to the Hadoop configuration directory. For automated testing or running from an IDE the directory containing flink-conf.yaml can be set by defining the FLINK_CONF_DIR environment variable.

  • the Hadoop configuration (in that directory) has an entry for the required file system in a file core-site.xml. Examples for S3 and Alluxio are linked/shown below.
  • the required classes for using the file system are available in the lib/ folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink also respects the HADOOP_CLASSPATH environment variable to add Hadoop jar files to the classpath.
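The first two conditions can be checked before a job is submitted. The following is a minimal sketch and not part of Flink: the class HadoopConfCheck and its methods are hypothetical names, and it assumes that flink-conf.yaml consists of flat "key: value" lines. It reads fs.hdfs.hadoopconf from the directory named by FLINK_CONF_DIR and checks that a core-site.xml exists in the configured Hadoop directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical helper, not a Flink class.
final class HadoopConfCheck {

    /** Returns the value of fs.hdfs.hadoopconf from flink-conf.yaml in the given directory, if set. */
    static Optional<String> hadoopConfDir(Path flinkConfDir) throws IOException {
        Path conf = flinkConfDir.resolve("flink-conf.yaml");
        if (!Files.isRegularFile(conf)) {
            return Optional.empty();
        }
        for (String line : Files.readAllLines(conf)) {
            String trimmed = line.trim();
            if (trimmed.startsWith("#")) {
                continue; // skip comment lines
            }
            // Assumption: flat "key: value" entries; the first colon separates key and value.
            int sep = trimmed.indexOf(':');
            if (sep > 0 && trimmed.substring(0, sep).trim().equals("fs.hdfs.hadoopconf")) {
                return Optional.of(trimmed.substring(sep + 1).trim());
            }
        }
        return Optional.empty();
    }

    /** Returns true if the given Hadoop configuration directory contains a core-site.xml file. */
    static boolean hasCoreSite(Path hadoopConfDir) {
        return Files.isRegularFile(hadoopConfDir.resolve("core-site.xml"));
    }

    public static void main(String[] args) throws IOException {
        String flinkConfDir = System.getenv("FLINK_CONF_DIR");
        if (flinkConfDir == null) {
            System.out.println("FLINK_CONF_DIR is not set");
            return;
        }
        Optional<String> hadoopConf = hadoopConfDir(Paths.get(flinkConfDir));
        if (!hadoopConf.isPresent()) {
            System.out.println("fs.hdfs.hadoopconf is not set in flink-conf.yaml");
        } else {
            System.out.println("core-site.xml found: " + hasCoreSite(Paths.get(hadoopConf.get())));
        }
    }
}
```

The check does not cover the third condition; whether the required file system classes are on the classpath depends on the lib/ folder or HADOOP_CLASSPATH of each machine.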

Amazon S3

See Deployment & Operations - Deployment - AWS - S3: Simple Storage Service for available S3 file system implementations, their configuration and required libraries.

Alluxio

For Alluxio support add the following entry into the core-site.xml file:

    <property>
      <name>fs.alluxio.impl</name>
      <value>alluxio.hadoop.FileSystem</value>
    </property>
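The entry above ties the alluxio:// URI scheme to an implementation class through a configuration key of the form fs.<scheme>.impl. The sketch below is a simplified model of that lookup, not Hadoop's actual code: the class SchemeLookupSketch is a hypothetical name, a plain Map stands in for the parsed core-site.xml, and the host and port in the URI are made up for illustration.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of scheme-based file system lookup; not a Hadoop or Flink class.
final class SchemeLookupSketch {

    /** Builds the configuration key for a URI's scheme, for example "fs.alluxio.impl". */
    static String implKey(URI uri) {
        return "fs." + uri.getScheme() + ".impl";
    }

    /** Returns the configured implementation class name, or null if the scheme has no entry. */
    static String resolveImpl(Map<String, String> coreSite, URI uri) {
        return coreSite.get(implKey(uri));
    }

    public static void main(String[] args) {
        // Stand-in for the properties read from core-site.xml.
        Map<String, String> coreSite = new HashMap<>();
        coreSite.put("fs.alluxio.impl", "alluxio.hadoop.FileSystem");

        // Hypothetical input location; host, port, and path are placeholders.
        URI input = URI.create("alluxio://master:19998/data/input.txt");
        System.out.println(implKey(input) + " -> " + resolveImpl(coreSite, input));
        // prints "fs.alluxio.impl -> alluxio.hadoop.FileSystem"
    }
}
```

A URI whose scheme has no matching entry resolves to null in this model, which corresponds to the case where the required entry is missing from core-site.xml.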

Connecting to other systems using Input/OutputFormat wrappers for Hadoop

Apache Flink allows users to access many different systems as data sources or sinks. The system is designed for easy extensibility. Similar to Apache Hadoop, Flink has the concept of so-called InputFormats and OutputFormats.

One implementation of these InputFormats is the HadoopInputFormat. This is a wrapper that allows users to use all existing Hadoop input formats with Flink.
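To illustrate what such a wrapper does, here is a simplified sketch that uses only the Java standard library. The interfaces KeyValueReader and RecordSource are hypothetical stand-ins for a Hadoop-style reader and a Flink-style input format; they are not the real Hadoop or Flink interfaces, and Map.Entry stands in for Flink's Tuple2. The wrapper turns each key/value pair of the wrapped reader into a single record.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the wrapper idea; none of these types exist in Flink or Hadoop.
final class InputFormatWrapperSketch {

    /** Stand-in for a Hadoop-style reader that advances over key/value pairs. */
    interface KeyValueReader<K, V> {
        boolean nextKeyValue();
        K currentKey();
        V currentValue();
    }

    /** Stand-in for a Flink-style input format that yields whole records. */
    interface RecordSource<T> {
        boolean reachedEnd();
        T nextRecord();
    }

    /** The wrapper: exposes a key/value reader as a source of (key, value) pairs. */
    static final class WrappedSource<K, V> implements RecordSource<Map.Entry<K, V>> {
        private final KeyValueReader<K, V> reader;
        private boolean fetched;  // true while the reader sits on a pair not yet handed out
        private boolean hasNext;

        WrappedSource(KeyValueReader<K, V> reader) {
            this.reader = reader;
        }

        private void fetchIfNeeded() {
            if (!fetched) {
                hasNext = reader.nextKeyValue();
                fetched = true;
            }
        }

        @Override
        public boolean reachedEnd() {
            fetchIfNeeded();
            return !hasNext;
        }

        @Override
        public Map.Entry<K, V> nextRecord() {
            fetchIfNeeded();
            if (!hasNext) {
                return null;
            }
            fetched = false;
            return new SimpleImmutableEntry<>(reader.currentKey(), reader.currentValue());
        }
    }

    /** In-memory reader used only to exercise the wrapper: key = word, value = its length. */
    static KeyValueReader<String, Integer> listReader(List<String> keys) {
        return new KeyValueReader<String, Integer>() {
            private int pos = -1;
            @Override public boolean nextKeyValue() { return ++pos < keys.size(); }
            @Override public String currentKey() { return keys.get(pos); }
            @Override public Integer currentValue() { return keys.get(pos).length(); }
        };
    }

    /** Reads all records from the source and formats them as "key=value". */
    static List<String> drain(RecordSource<Map.Entry<String, Integer>> source) {
        List<String> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            Map.Entry<String, Integer> e = source.nextRecord();
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new WrappedSource<>(listReader(Arrays.asList("a", "bcd")))));
        // prints "[a=1, bcd=3]"
    }
}
```

The design point is that the code consuming records never sees the wrapped reader, so any existing reader can be reused without changes to it.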

This section shows some examples for connecting Flink to other systems. Read more about Hadoop compatibility in Flink.

Avro support in Flink

Flink has extensive built-in support for Apache Avro, which makes it easy to read from Avro files with Flink. Flink's serialization framework can also handle classes generated from Avro schemas. Be sure to add the Flink Avro dependency to the pom.xml of your project.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.9.0</version>
    </dependency>

In order to read data from an Avro file, you have to specify an AvroInputFormat.

Example:

    // "in" is an org.apache.flink.core.fs.Path pointing to the Avro file
    AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
    DataSet<User> usersDS = env.createInput(users);

Note that User is a POJO generated by Avro. Flink also supports string-based key selection on these POJOs. For example:

    usersDS.groupBy("name")

Note that using the GenericData.Record type is possible with Flink, but not recommended. Since each record contains the full schema, it is very data intensive and thus probably slow to use.

Flink’s POJO field selection also works with POJOs generated from Avro. However, this only works if the field types are written correctly to the generated class. If a field is of type Object, you cannot use the field as a join or grouping key. Specifying a field in Avro like this: {"name": "type_double_test", "type": "double"} works fine. However, specifying it as a UNION type with only one field ({"name": "type_double_test", "type": ["double"]}) will generate a field of type Object. Note that specifying nullable types ({"name": "type_double_test", "type": ["null", "double"]}) is possible.
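The union rule above can be written down as a small check. This is an illustrative model only, not Avro's or Flink's API: the class AvroUnionKeySketch is a hypothetical name, union branches are represented as plain type-name strings, and treating every union other than a two-branch nullable one as Object is an assumption that goes beyond the single-branch case described in the text.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the rule described in the text; not part of Avro or Flink.
final class AvroUnionKeySketch {

    /**
     * Returns true if a union with the given branch type names would be generated
     * as a field of type Object, which cannot be used as a join or grouping key.
     * Assumption: only unions of the form ["null", X] or [X, "null"] keep a concrete type.
     */
    static boolean generatesObjectField(List<String> unionBranches) {
        boolean nullable = unionBranches.size() == 2 && unionBranches.contains("null");
        return !nullable;
    }

    public static void main(String[] args) {
        System.out.println(generatesObjectField(Arrays.asList("double")));          // true
        System.out.println(generatesObjectField(Arrays.asList("null", "double")));  // false
    }
}
```

A field declared with a plain type such as "double" is not a union at all and is outside the scope of this check.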

Access Microsoft Azure Table Storage

Note: This example works starting from Flink 0.6-incubating

This example uses the HadoopInputFormat wrapper to access Azure’s Table Storage through an existing Hadoop input format implementation.

  • Download and compile the azure-tables-hadoop project. The input format developed by the project is not yet available in Maven Central, so we have to build the project ourselves. Execute the following commands:

        git clone https://github.com/mooso/azure-tables-hadoop.git
        cd azure-tables-hadoop
        mvn clean install

  • Set up a new Flink project using the quickstarts:

        curl https://flink.apache.org/q/quickstart.sh | bash

  • Add the following dependencies (in the <dependencies> section) to your pom.xml file:

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-hadoop-compatibility_2.11</artifactId>
          <version>1.9.0</version>
        </dependency>
        <dependency>
          <groupId>com.microsoft.hadoop</groupId>
          <artifactId>microsoft-hadoop-azure</artifactId>
          <version>0.0.4</version>
        </dependency>

flink-hadoop-compatibility is a Flink package that provides the Hadoop input format wrappers. microsoft-hadoop-azure adds the project we built before to our project.

The project is now ready for coding. We recommend importing the project into an IDE, such as Eclipse or IntelliJ (import it as a Maven project). Browse to the code of the Job.java file. It is an empty skeleton for a Flink job.

Paste the following code into it:

    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    import com.microsoft.hadoop.azure.AzureTableConfiguration;
    import com.microsoft.hadoop.azure.AzureTableInputFormat;
    import com.microsoft.hadoop.azure.WritableEntity;
    import com.microsoft.windowsazure.storage.table.EntityProperty;

    public class AzureTableExample {

      public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // create an AzureTableInputFormat, using a Hadoop input format wrapper
        HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(
            new AzureTableInputFormat(), Text.class, WritableEntity.class, Job.getInstance());

        // set the Account URI, something like: https://apacheflink.table.core.windows.net
        hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
        // set the secret storage key here
        hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
        // set the table name here
        hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");

        DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);

        // a little example of how to use the data in a mapper
        DataSet<String> fin = input.map(new MapFunction<Tuple2<Text, WritableEntity>, String>() {
          @Override
          public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
            System.err.println("--------------------------------\nKey = " + arg0.f0);
            WritableEntity we = arg0.f1;
            // print every property of the table entity
            for (Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
              System.err.println("key=" + prop.getKey() + " ; value (asString)=" + prop.getValue().getValueAsString());
            }
            return arg0.f0.toString();
          }
        });

        // emit result (this works only locally); print() triggers execution of the job,
        // so no separate env.execute() call is needed
        fin.print();
      }
    }

The example shows how to access an Azure table and turn its data into a Flink DataSet (more specifically, the type of the set is DataSet<Tuple2<Text, WritableEntity>>). You can then apply all known transformations to this DataSet.

Access MongoDB

This GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).

Hive Connector

Starting from 1.9.0, Apache Flink provides a Hive connector to access Apache Hive tables. HiveCatalog is required in order to use the Hive connector. After HiveCatalog is set up, please refer to Reading & Writing Hive Tables for the usage of the Hive connector and its limitations. As with HiveCatalog, the officially supported Apache Hive versions for the Hive connector are 2.3.4 and 1.2.1.