Azure Table Storage

本例使用 HadoopInputFormat 包装器来使用现有的 Hadoop input format 实现访问 Azure’s Table Storage.

  1. 下载并编译 azure-tables-hadoop 项目。该项目开发的 input format 在 Maven 中心尚不存在,因此,我们必须自己构建该项目。 执行如下命令:
  1. git clone https://github.com/mooso/azure-tables-hadoop.git
  2. cd azure-tables-hadoop
  3. mvn clean install
  1. 使用 quickstarts 创建一个新的 Flink 项目:
  1. curl https://flink.apache.org/q/quickstart.sh | bash
  1. 在你的 pom.xml 文件 <dependencies> 部分添加如下依赖:
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-hadoop-compatibility_2.12</artifactId>
  4. <version>1.16.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.microsoft.hadoop</groupId>
  8. <artifactId>microsoft-hadoop-azure</artifactId>
  9. <version>0.0.5</version>
  10. </dependency>

flink-hadoop-compatibility 是一个提供 Hadoop input format 包装器的 Flink 包。 microsoft-hadoop-azure 可以将之前构建的部分添加到项目中。

现在可以开始进行项目的编码。我们建议将项目导入 IDE,例如 IntelliJ。你应该将其作为 Maven 项目导入。 跳转到文件 Job.java。这是 Flink 作业的初始框架。

粘贴如下代码:

  1. import java.util.Map;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.api.java.DataStream;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import com.microsoft.hadoop.azure.AzureTableConfiguration;
  10. import com.microsoft.hadoop.azure.AzureTableInputFormat;
  11. import com.microsoft.hadoop.azure.WritableEntity;
  12. import com.microsoft.windowsazure.storage.table.EntityProperty;
  13. public class AzureTableExample {
  14. public static void main(String[] args) throws Exception {
  15. // 安装 execution environment
  16. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  18. // 使用 Hadoop input format 包装器创建 AzureTableInputFormat
  19. HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
  20. // 设置 Account URI,如 https://apacheflink.table.core.windows.net
  21. hdIf.getConfiguration().set(azuretableconfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
  22. // 设置存储密钥
  23. hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
  24. // 在此处设置表名
  25. hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
  26. DataStream<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
  27. // 如何在 map 中使用数据的简单示例。
  28. DataStream<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
  29. @Override
  30. public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
  31. System.err.println("--------------------------------\nKey = "+arg0.f0);
  32. WritableEntity we = arg0.f1;
  33. for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
  34. System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
  35. }
  36. return arg0.f0.toString();
  37. }
  38. });
  39. // 发送结果(这仅在本地模式有效)
  40. fin.print();
  41. // 执行程序
  42. env.execute("Azure Example");
  43. }
  44. }

该示例展示了如何访问 Azure 表和如何将数据转换为 Flink 的 DataStream(更具体地说,集合的类型是 DataStream<Tuple2<Text, WritableEntity>>)。你可以将所有已知的 transformations 应用到 DataStream 实例。