gkafka

gkafka 模块实现了对 Kafka 消息队列系统的客户端功能封装,支持分组消费、指定起始位置等特性,并提供简便易用的 API 接口。

模块安装

  1. go get -u github.com/gogf/gkafka

或者使用 go.mod:

  1. require github.com/gogf/gkafka latest

使用方式

  1. import "github.com/gogf/gkafka"

接口文档

https://godoc.org/github.com/gogf/gkafka

使用示例

生产者

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gkafka"
  5. "time"
  6. )
  7. func newKafkaClientProducer(topic string) *gkafka.Client {
  8. kafkaConfig := gkafka.NewConfig()
  9. kafkaConfig.Servers = "localhost:9092"
  10. kafkaConfig.AutoMarkOffset = false
  11. kafkaConfig.Topics = topic
  12. return gkafka.NewClient(kafkaConfig)
  13. }
  14. func main() {
  15. client := newKafkaClientProducer("test")
  16. defer client.Close()
  17. for {
  18. s := time.Now().String()
  19. fmt.Println("produce:", s)
  20. if err := client.SyncSend(&gkafka.Message{Value: []byte(s)}); err != nil {
  21. fmt.Println(err)
  22. }
  23. time.Sleep(time.Second)
  24. }
  25. }

消费者

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gkafka"
  5. )
  6. func newKafkaClientConsumer(topic, group string) *gkafka.Client {
  7. kafkaConfig := gkafka.NewConfig()
  8. kafkaConfig.Servers = "localhost:9092"
  9. kafkaConfig.AutoMarkOffset = false
  10. kafkaConfig.Topics = topic
  11. kafkaConfig.GroupId = group
  12. return gkafka.NewClient(kafkaConfig)
  13. }
  14. func main() {
  15. group := "test-group"
  16. topic := "test"
  17. client := newKafkaClientConsumer(topic, group)
  18. defer client.Close()
  19. for {
  20. if msg, err := client.Receive(); err != nil {
  21. fmt.Println(err)
  22. break
  23. } else {
  24. fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
  25. msg.MarkOffset()
  26. }
  27. }
  28. }