添加 Kafka 作为接收器

您可以在 KubeSphere 中使用 Elasticsearch、Kafka 和 Fluentd 日志接收器。本教程演示:

  • 部署 strimzi-kafka-operator,然后通过创建 KafkaKafkaTopic CRD 以创建 Kafka 集群和 Kafka Topic。
  • 添加 Kafka 作为日志接收器以从 Fluent Bit 接收日志。
  • 使用 Kafkacat 验证 Kafka 集群是否能接收日志。

准备工作

  • 您需要一个被授予集群管理权限的帐户。例如,您可以直接用 admin 帐户登录控制台,或创建一个具有集群管理权限的角色然后将此角色授予一个帐户。
  • 添加日志接收器前,您需要启用组件 loggingeventsauditing。有关更多信息,请参见启用可插拔组件。本教程启用 logging 作为示例。

步骤 1:创建 Kafka 集群和 Kafka Topic

您可以使用 strimzi-kafka-operator 创建 Kafka 集群和 Kafka Topic。如果您已经有了一个 Kafka 集群,您可以直接从下一步开始。

  1. default 命名空间中安装 strimzi-kafka-operator

    1. helm repo add strimzi https://strimzi.io/charts/
    1. helm install --name kafka-operator -n default strimzi/strimzi-kafka-operator
  2. 运行以下命令在 default 命名空间中创建 Kafka 集群和 Kafka Topic,该命令所创建的 Kafka 和 Zookeeper 集群的存储类型为 ephemeral,使用 emptyDir 进行演示。若要在生产环境下配置储存类型,请参见 kafka-persistent

    1. cat <<EOF | kubectl apply -f -
    2. apiVersion: kafka.strimzi.io/v1beta1
    3. kind: Kafka
    4. metadata:
    5. name: my-cluster
    6. namespace: default
    7. spec:
    8. kafka:
    9. version: 2.5.0
    10. replicas: 3
    11. listeners:
    12. plain: {}
    13. tls: {}
    14. config:
    15. offsets.topic.replication.factor: 3
    16. transaction.state.log.replication.factor: 3
    17. transaction.state.log.min.isr: 2
    18. log.message.format.version: '2.5'
    19. storage:
    20. type: ephemeral
    21. zookeeper:
    22. replicas: 3
    23. storage:
    24. type: ephemeral
    25. entityOperator:
    26. topicOperator: {}
    27. userOperator: {}
    28. ---
    29. apiVersion: kafka.strimzi.io/v1beta1
    30. kind: KafkaTopic
    31. metadata:
    32. name: my-topic
    33. namespace: default
    34. labels:
    35. strimzi.io/cluster: my-cluster
    36. spec:
    37. partitions: 3
    38. replicas: 3
    39. config:
    40. retention.ms: 7200000
    41. segment.bytes: 1073741824
    42. EOF
  3. 运行以下命令查看 Pod 状态,并等待 Kafka 和 Zookeeper 运行并启动。

    1. $ kubectl -n default get pod
    2. NAME READY STATUS RESTARTS AGE
    3. my-cluster-entity-operator-f977bf457-s7ns2 3/3 Running 0 69m
    4. my-cluster-kafka-0 2/2 Running 0 69m
    5. my-cluster-kafka-1 2/2 Running 0 69m
    6. my-cluster-kafka-2 2/2 Running 0 69m
    7. my-cluster-zookeeper-0 1/1 Running 0 71m
    8. my-cluster-zookeeper-1 1/1 Running 1 71m
    9. my-cluster-zookeeper-2 1/1 Running 1 71m
    10. strimzi-cluster-operator-7d6cd6bdf7-9cf6t 1/1 Running 0 104m

    运行以下命令查看 Kafka 集群的元数据:

    1. kafkacat -L -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092

步骤 2:添加 Kafka 作为日志接收器

  1. admin 身份登录 KubeSphere 的 Web 控制台。点击左上角的平台管理,然后选择集群管理

  2. 如果您启用了多集群功能,您可以选择一个集群。如果尚未启用该功能,请直接进行下一步。

  3. 集群管理页面,选择集群设置下的日志收集

  4. 点击添加日志接收器并选择 Kafka。输入 Kafka 代理地址和端口信息,然后点击确定继续。

    1. my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc 9092
    2. my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc 9092
    3. my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc 9092

    add-kafka

  5. 运行以下命令验证 Kafka 集群是否能从 Fluent Bit 接收日志:

    1. # Start a util container
    2. kubectl run --rm utils -it --generator=run-pod/v1 --image arunvelsriram/utils bash
    3. # Install Kafkacat in the util container
    4. apt-get install kafkacat
    5. # Run the following command to consume log messages from kafka topic: my-topic
    6. kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092 -t my-topic