Kafka

1 Example

Producer地址Kafka - 图1 (opens new window) Consumer地址Kafka - 图2 (opens new window)

ego版本:ego@v0.5.3

2 Producer

2.1 Producer配置

  1. type config struct {
  2. // Brokers brokers地址
  3. Brokers []string `json:"brokers" toml:"brokers"`
  4. // Debug 是否开启debug模式
  5. Debug bool `json:"debug" toml:"debug"`
  6. // Client 用于创建topic等
  7. Client clientConfig `json:"client" toml:"client"`
  8. // Producers 多个消费者,用于生产消息
  9. Producers map[string]producerConfig `json:"producers" toml:"producers"`
  10. // Consumers 多个生产者,用于消费消息
  11. Consumers map[string]consumerConfig `json:"consumers" toml:"consumers"`
  12. }
  13. type producerConfig struct {
  14. // Topic 指定生产的消息推送到哪个topic
  15. Topic string `json:"topic" toml:"topic"`
  16. // Balancer 指定使用哪种Balancer,可选:hash\roundRobin
  17. Balancer string `json:"balancer" toml:"balancer"`
  18. // MaxAttempts 最大重试次数,默认10次
  19. MaxAttempts int `json:"maxAttempts" toml:"maxAttempts"`
  20. // BatchSize 批量发送的消息数量,默认100条
  21. BatchSize int `json:"batchSize" toml:"batchSize"`
  22. // BatchBytes 批量发送的消息大小,默认1MB
  23. BatchBytes int64 `json:"batchBytes" toml:"batchBytes"`
  24. // BatchTimeout 批量发送消息的周期,默认1s
  25. BatchTimeout time.Duration `json:"batchTimeout" toml:"batchTimeout"`
  26. // ReadTimeout 读超时
  27. ReadTimeout time.Duration `json:"readTimeout" toml:"readTimeout"`
  28. // WriteTimeout 写超时
  29. WriteTimeout time.Duration `json:"writeTimeout" toml:"writeTimeout"`
  30. // RequiredAcks ACK配置
  31. // RequireNone (0) fire-and-forget,producer不等待来自broker同步完成的确认后,就可以发送下一批消息
  32. // RequireOne (1) producer在leader已成功收到的数据并得到确认后,才发送下一批消息
  33. // RequireAll (-1) producer在所有follower副本确认接收到数据后,才发送下一批消息
  34. RequiredAcks kafka.RequiredAcks `json:"requiredAcks" toml:"requiredAcks"`
  35. // Async 设置成true时会导致WriteMessages非阻塞,会导致调用WriteMessages方法获取不到error
  36. Async bool `json:"async" toml:"async"`
  37. }

2.2 优雅的Debug

通过开启debug配置和命令行的export EGO_DEBUG=true,我们就可以在测试环境里看到请求里的配置名、地址、耗时、请求数据、响应数据 img.png

2.3 用户配置

  1. [kafka]
  2. debug=true
  3. brokers=["localhost:9091","localhost:9092","localhost:9093"]
  4. [kafka.client]
  5. timeout="3s"
  6. [kafka.producers.p1] # 定义了名字为p1的producer
  7. topic="sre-infra-test" # 指定生产消息的topic
  8. balancer="my-balancer" # 指定balancer,此balancer非默认balancer,需要使用ekafka.WithRegisterBalancer()注册
  9. [kafka.consumers.c1] # 定义了名字为c1的consumer
  10. topic="sre-infra-test" # 指定消费的topic
  11. groupID="group-1" # 如果配置了groupID,将初始化为consumerGroup
  12. [kafka.consumers.c2] # 定义了名字为c2的consumer
  13. topic="sre-infra-test" # 指定消费的topic
  14. groupID="group-2" # 如果配置了groupID,将初始化为consumerGroup

2.4 用户代码

  1. package main
  2. // produce 生产消息
  3. func produce(w *ekafka.Producer) {
  4. // 生产3条消息
  5. err := w.WriteMessages(context.Background(),
  6. ekafka.Message{Key: []byte("Key-A"), Value: []byte("Hello World!")},
  7. ekafka.Message{Key: []byte("Key-B"), Value: []byte("One!")},
  8. ekafka.Message{Key: []byte("Key-C"), Value: []byte("Two!")},
  9. )
  10. if err != nil {
  11. log.Fatal("failed to write messages:", err)
  12. }
  13. if err := w.Close(); err != nil {
  14. log.Fatal("failed to close writer:", err)
  15. }
  16. fmt.Println(`produce message succ--------------->`)
  17. }
  18. // consume 使用consumer/consumerGroup消费消息
  19. func consume(r *ekafka.Consumer) {
  20. ctx := context.Background()
  21. for {
  22. // ReadMessage 再收到下一个Message时,会阻塞
  23. msg, err := r.ReadMessage(ctx)
  24. if err != nil {
  25. panic("could not read message " + err.Error())
  26. }
  27. // 打印消息
  28. fmt.Println("received: ", string(msg.Value))
  29. err = r.CommitMessages(ctx, msg)
  30. if err != nil {
  31. log.Printf("fail to commit msg:%v", err)
  32. }
  33. }
  34. }
  35. func main() {
  36. var stopCh = make(chan bool)
  37. // 假设你配置的toml如下所示
  38. conf := `
  39. [kafka]
  40. debug=true
  41. brokers=["localhost:9091","localhost:9092","localhost:9093"]
  42. [kafka.client]
  43. timeout="3s"
  44. [kafka.producers.p1] # 定义了名字为p1的producer
  45. topic="sre-infra-test" # 指定生产消息的topic
  46. balancer="my-balancer" # 指定balancer,此balancer非默认balancer,需要使用ekafka.WithRegisterBalancer()注册
  47. [kafka.consumers.c1] # 定义了名字为c1的consumer
  48. topic="sre-infra-test" # 指定消费的topic
  49. groupID="group-1" # 如果配置了groupID,将初始化为consumerGroup
  50. [kafka.consumers.c2] # 定义了名字为c2的consumer
  51. topic="sre-infra-test" # 指定消费的topic
  52. groupID="group-2" # 如果配置了groupID,将初始化为consumerGroup
  53. `
  54. // 加载配置文件
  55. err := econf.LoadFromReader(strings.NewReader(conf), toml.Unmarshal)
  56. if err != nil {
  57. panic("LoadFromReader fail," + err.Error())
  58. }
  59. // 初始化ekafka组件
  60. cmp := ekafka.Load("kafka").Build(
  61. // 注册名为my-balancer的自定义balancer
  62. ekafka.WithRegisterBalancer("my-balancer", &kafka.Hash{}),
  63. )
  64. // 使用p1生产者生产消息
  65. go produce(cmp.Producer("p1"))
  66. // 使用c1消费者消费消息
  67. consume(cmp.Consumer("c1"))
  68. stopCh <- true
  69. }

3 Consumer

3.1 Consumer配置

  1. type config struct {
  2. // Brokers brokers地址
  3. Brokers []string `json:"brokers" toml:"brokers"`
  4. // Debug 是否开启debug模式
  5. Debug bool `json:"debug" toml:"debug"`
  6. // Client 用于创建topic等
  7. Client clientConfig `json:"client" toml:"client"`
  8. // Producers 多个消费者,用于生产消息
  9. Producers map[string]producerConfig `json:"producers" toml:"producers"`
  10. // Consumers 多个生产者,用于消费消息
  11. Consumers map[string]consumerConfig `json:"consumers" toml:"consumers"`
  12. }
  13. type consumerConfig struct {
  14. // Partition 指定分区ID,和GroupID不能同时配置
  15. Partition int `json:"partition" toml:"partition"`
  16. // GroupID 指定分组ID,和Partition不能同时配置,当配置了GroupID时,默认使用ConsumerGroup来消费
  17. GroupID string `json:"groupID" toml:"groupID"`
  18. // Topic 消费的topic
  19. Topic string `json:"topic" toml:"topic"`
  20. // MinBytes 向kafka发送请求的包最小值
  21. MinBytes int `json:"minBytes" toml:"minBytes"`
  22. // MaxBytes 向kafka发送请求的包最大值
  23. MaxBytes int `json:"maxBytes" toml:"maxBytes"`
  24. // WatchPartitionChanges 是否监听分区变化
  25. WatchPartitionChanges bool `json:"watchPartitionChanges" toml:"watchPartitionChanges"`
  26. // PartitionWatchInterval 监听分区变化时间周期
  27. PartitionWatchInterval time.Duration `json:"partitionWatchInterval" toml:"partitionWatchInterval"`
  28. // RebalanceTimeout rebalance 超时时间
  29. RebalanceTimeout time.Duration `json:"rebalanceTimeout" toml:"rebalanceTimeout"`
  30. // MaxWait 从kafka批量获取数据时,最大等待间隔
  31. MaxWait time.Duration `json:"maxWait" toml:"maxWait"`
  32. // ReadLagInterval 获取消费者滞后值的时间周期
  33. ReadLagInterval time.Duration `json:"readLagInterval" toml:"readLagInterval"`
  34. HeartbeatInterval time.Duration `json:"heartbeatInterval" ,toml:"heartbeatInterval"`
  35. CommitInterval time.Duration `json:"commitInterval" toml:"commitInterval"`
  36. SessionTimeout time.Duration `json:"sessionTimeout" toml:"sessionTimeout"`
  37. JoinGroupBackoff time.Duration `json:"joinGroupBackoff" toml:"joinGroupBackoff"`
  38. RetentionTime time.Duration `json:"retentionTime" toml:"retentionTime"`
  39. StartOffset int64 `json:"startOffset" toml:"startOffset"`
  40. ReadBackoffMin time.Duration `json:"readBackoffMin" toml:"readBackoffMin"`
  41. ReadBackoffMax time.Duration `json:"readBackoffMax" toml:"readBackoffMax"`
  42. }

3.2 用户配置

  1. [kafka]
  2. debug=true
  3. brokers=["localhost:9094"]
  4. [kafka.client]
  5. timeout="3s"
  6. [kafka.producers.p1] # 定义了名字为p1的producer
  7. topic="sre-infra-test" # 指定生产消息的topic
  8. [kafka.consumers.c1] # 定义了名字为c1的consumer
  9. topic="sre-infra-test" # 指定消费的topic
  10. groupID="group-1" # 如果配置了groupID,将初始化为consumerGroup
  11. [kafkaConsumerServers.s1]
  12. debug=true
  13. consumerName="c1"

3.3 用户代码

  1. package main
  2. func main() {
  3. conf := `
  4. [kafka]
  5. debug=true
  6. brokers=["localhost:9094"]
  7. [kafka.client]
  8. timeout="3s"
  9. [kafka.producers.p1] # 定义了名字为p1的producer
  10. topic="sre-infra-test" # 指定生产消息的topic
  11. [kafka.consumers.c1] # 定义了名字为c1的consumer
  12. topic="sre-infra-test" # 指定消费的topic
  13. groupID="group-1" # 如果配置了groupID,将初始化为consumerGroup
  14. [kafkaConsumerServers.s1]
  15. debug=true
  16. consumerName="c1"
  17. `
  18. // 加载配置文件
  19. err := econf.LoadFromReader(strings.NewReader(conf), toml.Unmarshal)
  20. if err != nil {
  21. panic("LoadFromReader fail," + err.Error())
  22. }
  23. app := ego.New().Serve(
  24. // 可以搭配其他服务模块一起使用
  25. egovernor.Load("server.governor").Build(),
  26. // 初始化 Consumer Server
  27. func() *consumerserver.Component {
  28. // 依赖 `ekafka` 管理 Kafka consumer
  29. ec := ekafka.Load("kafka").Build()
  30. cs := consumerserver.Load("kafkaConsumerServers.s1").Build(
  31. consumerserver.WithEkafka(ec),
  32. )
  33. // 用来接收、处理 `kafka-go` 和处理消息的回调产生的错误
  34. consumptionErrors := make(chan error)
  35. // 注册处理消息的回调函数
  36. cs.OnEachMessage(consumptionErrors, func(ctx context.Context, message kafka.Message) error {
  37. fmt.Printf("got a message: %s\n", string(message.Value))
  38. // 如果返回错误则会被转发给 `consumptionErrors`
  39. return nil
  40. })
  41. return cs
  42. }(),
  43. // 还可以启动多个 Consumer Server
  44. )
  45. if err := app.Run(); err != nil {
  46. elog.Panic("startup", elog.Any("err", err))
  47. }
  48. }