Azure Table Storage

This example uses the HadoopInputFormat wrapper to reuse an existing Hadoop input format implementation for accessing Azure’s Table Storage.

  1. Download and compile the azure-tables-hadoop project. The input format developed by the project is not yet available in Maven Central, so we have to build the project ourselves. Execute the following commands:

         git clone https://github.com/mooso/azure-tables-hadoop.git
         cd azure-tables-hadoop
         mvn clean install

  2. Set up a new Flink project using the quickstarts:

         curl https://flink.apache.org/q/quickstart.sh | bash

  3. Add the following dependencies (in the <dependencies> section) to your pom.xml file:

         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
             <version>1.18.1</version>
         </dependency>
         <dependency>
             <groupId>com.microsoft.hadoop</groupId>
             <artifactId>microsoft-hadoop-azure</artifactId>
             <version>0.0.5</version>
         </dependency>

flink-hadoop-compatibility is a Flink package that provides the Hadoop input format wrappers. microsoft-hadoop-azure adds the project we built in the first step to our project.

The project is now ready for coding. We recommend importing the project into an IDE, such as IntelliJ, as a Maven project. Browse to the file Job.java, which is an empty skeleton for a Flink job.

Paste the following code:

    import java.util.Map;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    import com.microsoft.hadoop.azure.AzureTableConfiguration;
    import com.microsoft.hadoop.azure.AzureTableInputFormat;
    import com.microsoft.hadoop.azure.WritableEntity;
    import com.microsoft.windowsazure.storage.table.EntityProperty;

    public class AzureTableExample {

        public static void main(String[] args) throws Exception {
            // set up the execution environment; the table is a bounded input, so run in batch mode
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            // create an AzureTableInputFormat, using a Hadoop input format wrapper
            HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(
                    new AzureTableInputFormat(), Text.class, WritableEntity.class, Job.getInstance());

            // set the Account URI, something like: https://apacheflink.table.core.windows.net
            hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
            // set the secret storage key here
            hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
            // set the table name here
            hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");

            // each record is a (row key, entity) pair
            DataStream<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);

            // a small example of how to use the data in a mapper
            DataStream<String> fin = input.map(new MapFunction<Tuple2<Text, WritableEntity>, String>() {
                @Override
                public String map(Tuple2<Text, WritableEntity> row) throws Exception {
                    System.err.println("--------------------------------\nKey = " + row.f0);
                    WritableEntity we = row.f1;
                    // print every property of the entity
                    for (Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
                        System.err.println("key=" + prop.getKey() + " ; value (asString)=" + prop.getValue().getValueAsString());
                    }
                    // emit only the row key downstream
                    return row.f0.toString();
                }
            });

            // emit result (this works only locally)
            fin.print();

            // execute program
            env.execute("Azure Example");
        }
    }
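
The three "TODO" placeholders in the job above have to be filled in before it can run. One option is to read them from environment variables and fail early when one is missing, rather than hard-coding the storage key. The following is a minimal sketch under that assumption; the class name AzureTableSettings, the method require, and the three environment variable names are hypothetical and not part of Flink or azure-tables-hadoop.

```java
import java.util.Map;

// Hypothetical helper; not part of Flink or azure-tables-hadoop.
final class AzureTableSettings {

    // Assumed environment variable names; choose whatever fits your deployment.
    static final String ACCOUNT_URI = "AZURE_TABLE_ACCOUNT_URI";
    static final String STORAGE_KEY = "AZURE_TABLE_STORAGE_KEY";
    static final String TABLE_NAME = "AZURE_TABLE_NAME";

    private AzureTableSettings() {
    }

    /** Returns the value stored under name, or throws if it is missing or blank. */
    static String require(Map<String, String> source, String name) {
        String value = source.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // Report which settings are present without printing their values.
        Map<String, String> env = System.getenv();
        for (String name : new String[] {ACCOUNT_URI, STORAGE_KEY, TABLE_NAME}) {
            String value = env.get(name);
            boolean present = value != null && !value.trim().isEmpty();
            System.out.println(name + (present ? " is set" : " is missing"));
        }
    }
}
```

In the job, a placeholder such as the "TODO" passed for STORAGE_KEY could then be replaced by AzureTableSettings.require(System.getenv(), AzureTableSettings.STORAGE_KEY), and likewise for the account URI and the table name.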

The example shows how to access an Azure table and turn its data into a Flink DataStream (more specifically, a DataStream<Tuple2<Text, WritableEntity>>). You can then apply any of the known transformations to the DataStream.
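
The per-record logic used in a transformation such as map can be kept in a plain static method, which makes it easy to test without a Flink cluster or an Azure account. The following is a minimal sketch, assuming the entity's property values have already been converted to strings (for example with getValueAsString(), as in the mapper of the example job). The class EntityFormatter and its format method are hypothetical names introduced here for illustration.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper; a stand-in for logic that would be called from a MapFunction.
final class EntityFormatter {

    private EntityFormatter() {
    }

    /** Renders a row key and its properties as a single line, e.g. "row-1 {age=42, city=Berlin}". */
    static String format(String rowKey, Map<String, String> properties) {
        StringJoiner joiner = new StringJoiner(", ", rowKey + " {", "}");
        // Sort by property name so the output does not depend on the map's iteration order.
        for (Map.Entry<String, String> entry : new TreeMap<>(properties).entrySet()) {
            joiner.add(entry.getKey() + "=" + entry.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> properties = new TreeMap<>();
        properties.put("city", "Berlin");
        properties.put("age", "42");
        System.out.println(format("row-1", properties)); // prints: row-1 {age=42, city=Berlin}
    }
}
```

Inside a MapFunction, the row key would come from the tuple's f0 field and the property map would be built from the entity in f1.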