Storm集成Kafka

一、整合说明

Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下:

这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文档进行整合,不适用于 0.8.x 版本的 Kafka。

二、写入数据到Kafka

2.1 项目结构

Storm 集成 Kafka - 图1

2.2 项目主要依赖

  1. <properties>
  2. <storm.version>1.2.2</storm.version>
  3. <kafka.version>2.2.0</kafka.version>
  4. </properties>
  5. <dependencies>
  6. <dependency>
  7. <groupId>org.apache.storm</groupId>
  8. <artifactId>storm-core</artifactId>
  9. <version>${storm.version}</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.storm</groupId>
  13. <artifactId>storm-kafka-client</artifactId>
  14. <version>${storm.version}</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.kafka</groupId>
  18. <artifactId>kafka-clients</artifactId>
  19. <version>${kafka.version}</version>
  20. </dependency>
  21. </dependencies>

2.3 DataSourceSpout

  1. /**
  2. * 产生词频样本的数据源
  3. */
  4. public class DataSourceSpout extends BaseRichSpout {
  5. private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
  6. private SpoutOutputCollector spoutOutputCollector;
  7. @Override
  8. public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  9. this.spoutOutputCollector = spoutOutputCollector;
  10. }
  11. @Override
  12. public void nextTuple() {
  13. // 模拟产生数据
  14. String lineData = productData();
  15. spoutOutputCollector.emit(new Values(lineData));
  16. Utils.sleep(1000);
  17. }
  18. @Override
  19. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  20. outputFieldsDeclarer.declare(new Fields("line"));
  21. }
  22. /**
  23. * 模拟数据
  24. */
  25. private String productData() {
  26. Collections.shuffle(list);
  27. Random random = new Random();
  28. int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
  29. return StringUtils.join(list.toArray(), "\t", 0, endIndex);
  30. }
  31. }

产生的模拟数据格式如下:

  1. Spark HBase
  2. Hive Flink Storm Hadoop HBase Spark
  3. Flink
  4. HBase Storm
  5. HBase Hadoop Hive Flink
  6. HBase Flink Hive Storm
  7. Hive Flink Hadoop
  8. HBase Hive
  9. Hadoop Spark HBase Storm

2.4 WritingToKafkaApp

  1. /**
  2. * 写入数据到 Kafka 中
  3. */
  4. public class WritingToKafkaApp {
  5. private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
  6. private static final String TOPIC_NAME = "storm-topic";
  7. public static void main(String[] args) {
  8. TopologyBuilder builder = new TopologyBuilder();
  9. // 定义 Kafka 生产者属性
  10. Properties props = new Properties();
  11. /*
  12. * 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。
  13. * 不过建议至少要提供两个 broker 的信息作为容错。
  14. */
  15. props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
  16. /*
  17. * acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
  18. * acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  19. * acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  20. * acks=all : 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  21. */
  22. props.put("acks", "1");
  23. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  24. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  25. KafkaBolt bolt = new KafkaBolt<String, String>()
  26. .withProducerProperties(props)
  27. .withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
  28. .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());
  29. builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
  30. builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");
  31. if (args.length > 0 && args[0].equals("cluster")) {
  32. try {
  33. StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
  34. } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
  35. e.printStackTrace();
  36. }
  37. } else {
  38. LocalCluster cluster = new LocalCluster();
  39. cluster.submitTopology("LocalWritingToKafkaApp",
  40. new Config(), builder.createTopology());
  41. }
  42. }
  43. }

2.5 测试准备工作

进行测试前需要启动 Kakfa:

1. 启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

  1. # zookeeper启动命令
  2. bin/zkServer.sh start
  3. # 内置zookeeper启动命令
  4. bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

  1. # bin/kafka-server-start.sh config/server.properties

2. 创建topic

  1. # 创建用于测试主题
  2. bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic
  3. # 查看所有主题
  4. bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 启动消费者

启动一个消费者用于观察写入情况,启动命令如下:

  1. # bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning

2.6 测试

可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包,打包命令如下:

  1. # mvn clean package -D maven.test.skip=true

启动后,消费者监听情况如下:

Storm 集成 Kafka - 图2

三、从Kafka中读取数据

3.1 项目结构

Storm 集成 Kafka - 图3

3.2 ReadingFromKafkaApp

  1. /**
  2. * 从 Kafka 中读取数据
  3. */
  4. public class ReadingFromKafkaApp {
  5. private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
  6. private static final String TOPIC_NAME = "storm-topic";
  7. public static void main(String[] args) {
  8. final TopologyBuilder builder = new TopologyBuilder();
  9. builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
  10. builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");
  11. // 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动
  12. if (args.length > 0 && args[0].equals("cluster")) {
  13. try {
  14. StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
  15. } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
  16. e.printStackTrace();
  17. }
  18. } else {
  19. LocalCluster cluster = new LocalCluster();
  20. cluster.submitTopology("LocalReadingFromKafkaApp",
  21. new Config(), builder.createTopology());
  22. }
  23. }
  24. private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
  25. return KafkaSpoutConfig.builder(bootstrapServers, topic)
  26. // 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常
  27. .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
  28. // 定义重试策略
  29. .setRetry(getRetryService())
  30. // 定时提交偏移量的时间间隔,默认是 15s
  31. .setOffsetCommitPeriodMs(10_000)
  32. .build();
  33. }
  34. // 定义重试策略
  35. private static KafkaSpoutRetryService getRetryService() {
  36. return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
  37. TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
  38. }
  39. }

3.3 LogConsoleBolt

  1. /**
  2. * 打印从 Kafka 中获取的数据
  3. */
  4. public class LogConsoleBolt extends BaseRichBolt {
  5. private OutputCollector collector;
  6. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  7. this.collector=collector;
  8. }
  9. public void execute(Tuple input) {
  10. try {
  11. String value = input.getStringByField("value");
  12. System.out.println("received from kafka : "+ value);
  13. // 必须 ack,否则会重复消费 kafka 中的消息
  14. collector.ack(input);
  15. }catch (Exception e){
  16. e.printStackTrace();
  17. collector.fail(input);
  18. }
  19. }
  20. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  21. }
  22. }

这里从 value 字段中获取 kafka 输出的值数据。

在开发中,我们可以通过继承 RecordTranslator 接口定义了 Kafka 中 Record 与输出流之间的映射关系,可以在构建 KafkaSpoutConfig 的时候通过构造器或者 setRecordTranslator() 方法传入,并最后传递给具体的 KafkaSpout

默认情况下使用内置的 DefaultRecordTranslator,其源码如下,FIELDS 中 定义了 tuple 中所有可用的字段:主题,分区,偏移量,消息键,值。

  1. public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
  2. private static final long serialVersionUID = -5782462870112305750L;
  3. public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
  4. @Override
  5. public List<Object> apply(ConsumerRecord<K, V> record) {
  6. return new Values(record.topic(),
  7. record.partition(),
  8. record.offset(),
  9. record.key(),
  10. record.value());
  11. }
  12. @Override
  13. public Fields getFieldsFor(String stream) {
  14. return FIELDS;
  15. }
  16. @Override
  17. public List<String> streams() {
  18. return DEFAULT_STREAM;
  19. }
  20. }

3.4 启动测试

这里启动一个生产者用于发送测试数据,启动命令如下:

  1. # bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic
Storm 集成 Kafka - 图4

本地运行的项目接收到从 Kafka 发送过来的数据:

Storm 集成 Kafka - 图5


用例源码下载地址:storm-kafka-integration

参考资料

  1. Storm Kafka Integration (0.10.x+)