Kafka 流处理开发简介

在上一节,我们介绍了分布式流处理平台Kafka的运维工作,在这一节,我们将讨论Kafka的应用开发。

你可能已经注意到,这一节的标题并不是”在微服务中的集成”,而是”开发简介”。

使得,如在前文所述,Kafka的吞吐性能出色,但延迟性能一般,因此多用于离线处理的场景,典型应用有:

  • 异源数据^1的同步、转换、备份。即”数据搬运工”
  • 处理从多处收集的日志
  • Event Sourcing模式的事件回放

对于数据搬运工类的需求,建议优先考虑Kafka Connect。这是Kafka官方提供的一组工具集,可以通过简单的配置,就完成数据的同步和一些转换的工作。

对于Event Sourcing的模式开发,和日志处理收集的开发模式基本一致。

在此,我们用较短的篇幅,对Kafka的开发做一个简要的介绍。

在研究代码之前,有一些必须明确的基本概念:

  • Topics: 可以理解为队列,同种消息应放入相同的Topic内。
  • Partition: Topic下可划分为多个分区, 便于并行处理。
  • Replicas: Topic的Partition可以划分为多个副本,从而实现高可用保证。
  • Producer: 消息的生产者。
  • Consumer: 消息的消费者。
  • Consumer Group: 每个消费者可以设定一个Consumer Group。Kafka保证: 同一个消息,只投递给相同Consumer Group中的一台Consumer。换句话说,注册在同一个Consumer Group下的Consumer,可以并行的处理消息。

下面,我们看一下生产者

  1. //import util.properties packages
  2. import java.util.Properties;
  3. //import simple producer packages
  4. import org.apache.kafka.clients.producer.Producer;
  5. //import KafkaProducer packages
  6. import org.apache.kafka.clients.producer.KafkaProducer;
  7. //import ProducerRecord packages
  8. import org.apache.kafka.clients.producer.ProducerRecord;
  9. //Create java class named “SimpleProducer"
  10. public class SimpleProducer {
  11. public static void main(String[] args) throws Exception{
  12. // Check arguments length value
  13. if(args.length == 0){
  14. System.out.println("Enter topic name");
  15. return;
  16. }
  17. //Assign topicName to string variable
  18. String topicName = args[0].toString();
  19. // create instance for properties to access producer configs
  20. Properties props = new Properties();
  21. //Assign localhost id
  22. props.put("bootstrap.servers", "localhost:9092");
  23. //Set acknowledgements for producer requests.
  24. props.put("acks", "all");
  25. //If the request fails, the producer can automatically retry,
  26. props.put("retries", 0);
  27. //Specify buffer size in config
  28. props.put("batch.size", 16384);
  29. //Reduce the no of requests less than 0
  30. props.put("linger.ms", 1);
  31. //The buffer.memory controls the total amount of memory available to the producer for buffering.
  32. props.put("buffer.memory", 33554432);
  33. props.put("key.serializer",
  34. "org.apache.kafka.common.serializa-tion.StringSerializer");
  35. props.put("value.serializer",
  36. "org.apache.kafka.common.serializa-tion.StringSerializer");
  37. Producer<String, String> producer = new KafkaProducer
  38. <String, String>(props);
  39. for(int i = 0; i < 10; i++)
  40. producer.send(new ProducerRecord<String, String>(topicName,
  41. Integer.toString(i), Integer.toString(i)));
  42. System.out.println("Message sent successfully");
  43. producer.close();
  44. }
  45. }

简单解释下代码:

  1. 连接localhost:9092
  2. 收到消息后自动ack
  3. 设置缓存大小等配置
  4. 消息的key和value都是string类型,配置对应的序列化类
  5. 发送10个消息

下面来看一下Consumer代码

  1. import java.util.Properties;
  2. import java.util.Arrays;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. public class ConsumerGroup {
  7. public static void main(String[] args) throws Exception {
  8. if(args.length < 2){
  9. System.out.println("Usage: consumer <topic> <groupname>");
  10. return;
  11. }
  12. String topic = args[0].toString();
  13. String group = args[1].toString();
  14. Properties props = new Properties();
  15. props.put("bootstrap.servers", "localhost:9092");
  16. props.put("group.id", group);
  17. props.put("enable.auto.commit", "true");
  18. props.put("auto.commit.interval.ms", "1000");
  19. props.put("session.timeout.ms", "30000");
  20. props.put("key.deserializer",
  21. "org.apache.kafka.common.serializa-tion.StringDeserializer");
  22. props.put("value.deserializer",
  23. "org.apache.kafka.common.serializa-tion.StringDeserializer");
  24. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  25. consumer.subscribe(Arrays.asList(topic));
  26. System.out.println("Subscribed to topic " + topic);
  27. int i = 0;
  28. while (true) {
  29. ConsumerRecords<String, String> records = con-sumer.poll(100);
  30. for (ConsumerRecord<String, String> record : records)
  31. System.out.printf("offset = %d, key = %s, value = %s\n",
  32. record.offset(), record.key(), record.value());
  33. }
  34. }
  35. }

基本的配置与Producer类似,这里i不再重复了,能够并行处理的关键是”group.id”这个参数。如果同时启动2个Consumer进程,会发现消息是在两个Consumer进程中,交替输出的。

上述代码参考自Tutorial Spoint的Kafka教程,这是一部非常好的Kafka教程,如果你没有时间完整阅读官方文档,强烈推荐你读一下这部教程。