Storm Integration with HDFS and HBase

1. Storm Integration with HDFS

1.1 Project Structure

[Figure 1: Project structure of the Storm-HDFS integration project]

Source code for this example: storm-hdfs-integration

1.2 Main Project Dependencies

The main dependencies are listed below. Two things to note:

  • The Hadoop installed on my server is the CDH distribution, so the CDH versions of the dependencies are imported here; a <repository> entry is needed to point Maven at the CDH repository;
  • hadoop-common, hadoop-client, and hadoop-hdfs all need to exclude slf4j-log4j12, because storm-core already ships that dependency and keeping both risks a JAR conflict.
```xml
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Storm-HDFS integration -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
```

1.3 DataSourceSpout

```java
/**
 * Spout that produces sample word-frequency data.
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate a line of mock data and emit it
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Produce a line of mock data: a random, tab-separated prefix of the shuffled word list.
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        // Pick a random prefix length in [1, list.size()]
        int endIndex = random.nextInt(list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```

The generated mock data looks like this:

```
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```

1.4 Writing Data to HDFS

The HDFS address and the output path are hard-coded here; in real projects both can be passed in as external parameters, which makes the program more flexible (see the sketch after the code below).

```java
public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {
        // Set the Hadoop user name. Without it, creating directories on HDFS may fail
        // with a permission error (RemoteException: Permission denied).
        System.setProperty("HADOOP_USER_NAME", "root");

        // Delimiter between output fields
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // Sync policy: flush buffered data to HDFS every 100 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // Rotation policy: cap each file at 1 MB; when the limit is reached, roll over to a new file
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

        // Output path on HDFS
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");

        // Define the HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // save to HDFS
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);

        // If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp", new Config(), builder.createTopology());
        }
    }
}
```
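For reference, here is a minimal sketch of how the hard-coded values above could be replaced with program arguments. The argument positions and the fallback defaults are assumptions for illustration, not part of the original project; the `format`, `rotationPolicy`, and `syncPolicy` objects are the ones built earlier in `main`:

```java
// Hypothetical argument layout: args[0] = "cluster"/"local", args[1] = HDFS URL, args[2] = output path
String fsUrl = args.length > 1 ? args[1] : "hdfs://hadoop001:8020";   // assumed default
String outputPath = args.length > 2 ? args[2] : "/storm-hdfs/";       // assumed default

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl(fsUrl)
        .withFileNameFormat(new DefaultFileNameFormat().withPath(outputPath))
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```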

1.5 Running and Testing

You can either run the topology directly in local mode or package it and submit it to the cluster. The source code in this repository is packaged with the maven-shade-plugin by default; the packaging command is:

```shell
# mvn clean package -Dmaven.test.skip=true
```

Once the topology is running, the data is written to the /storm-hdfs directory on HDFS. The following commands can be used to inspect it:

```shell
# List the directory contents
hadoop fs -ls /storm-hdfs

# Follow the contents of a file as it grows
hadoop fs -tail -f /storm-hdfs/<file-name>
```
[Figure 2: Files written to /storm-hdfs]

2. Storm Integration with HBase

2.1 Project Structure

Use case: perform a word count and store the final result in HBase. The main project structure is as follows:

[Figure 3: Project structure of the Storm-HBase integration project]

Source code for this example: storm-hbase-integration

2.2 Main Project Dependencies

```xml
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Storm-HBase integration -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>
```

2.3 DataSourceSpout

```java
/**
 * Spout that produces sample word-frequency data.
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate a line of mock data and emit it
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Produce a line of mock data: a random, tab-separated prefix of the shuffled word list.
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        // Pick a random prefix length in [1, list.size()]
        int endIndex = random.nextInt(list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```

The generated mock data looks like this:

```
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
```

2.4 SplitBolt

```java
/**
 * Split each line on the delimiter and emit one (word, 1) tuple per word.
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```

2.5 CountBolt

```java
/**
 * Count word frequencies.
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Emit the word together with its current count
        collector.emit(new Values(word, String.valueOf(count)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```

2.6 WordCountToHBaseApp

```java
/**
 * Word-count topology that stores the results in HBase.
 */
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {
        // Storm configuration
        Config config = new Config();

        // HBase configuration
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");

        // Expose the HBase configuration through the Storm configuration
        config.put("hbase.conf", hbConf);

        // Map the stream fields to the HBase table layout
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word", "count"))
                .withColumnFamily("info");

        /*
         * Give the HBaseBolt the table name, the field mapping, and the key of the HBase configuration.
         * The table must be created in advance: create 'WordCount','info'
         */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(), 1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt(), 1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);

        // If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToHBaseApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToHBaseApp", config, builder.createTopology());
        }
    }
}
```

2.7 Running and Testing

You can either run the topology directly in local mode or package it and submit it to the cluster. The source code in this repository is packaged with the maven-shade-plugin by default; the packaging command is:

```shell
# mvn clean package -Dmaven.test.skip=true
```

Once the topology is running, the results are stored in the HBase table WordCount. The table contents can be checked from the HBase shell:

```shell
hbase > scan 'WordCount'
```
[Figure 4: Scan of the WordCount table]

2.8 withCounterFields

In the example above the word count is implemented by hand and the final result is then written to HBase. Alternatively, when building the SimpleHBaseMapper you can declare the count field with withCounterFields; a field declared this way is incremented automatically in HBase, which also produces a word count. Note that a field passed to withCounterFields must be a Long, not a String.

```java
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");
```
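Because the counter field has to be a Long, the bolt feeding the HBaseBolt would then emit the increment as a long rather than a String. A minimal sketch, assuming the HBaseBolt is wired directly after the SplitBolt from section 2.4 (with the count accumulated by HBase itself, a separate CountBolt is no longer required):

```java
@Override
public void execute(Tuple input) {
    String line = input.getStringByField("line");
    for (String word : line.split("\t")) {
        // Emit a long increment of 1; the "count" counter field must not be a String
        collector.emit(new Values(word, 1L));
    }
}
```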

References

  1. Apache HDFS Integration
  2. Apache HBase Integration