添加 Kafka 作为日志接收者

KubeSphere 目前支持添加的日志接收者包括 Elasticsearch、Kafka 和 Fluentd,本文档通过以下两种方式说明如何创建 Kafka 和 Zookeeper 集群,并通过 KubeSphere 添加日志接收者将日志输出到 Kafka 的 topic,最终通过 Kafkacat 客户端 验证接收实时的日志消息:

  • 在 Kubernetes 部署一个单节点的 Kafka 和 Zookeeper (适用于测试或演示)
  • 在 AppCenter 创建 Kafka 和 Zookeeper 集群 (适用于正式环境)

Kubernetes 部署单节点 Kafka 和 Zookeeper

第一步:通过 yaml 创建 Zookeeper、Kafka

注意:单节点 Kafka 和 Zookeeper 仅用于测试日志输出到 Kafka,正式环境建议搭建多节点的 Kafka 集群。

1、在工具箱打开 web kubectl 或后台 SSH 到 KubeSphere 后台,新建一个 kafka 的 Namespace,然后在该 Namespace 中创建 Zookeeper 的 Service 和 Deployment。

zookeeper-service.yaml

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. labels:
  5. app: zookeeper-service
  6. name: zookeeper-service
  7. spec:
  8. type: NodePort
  9. ports:
  10. - name: zookeeper-port
  11. port: 2181
  12. nodePort: 30181
  13. targetPort: 2181
  14. selector:
  15. app: zookeeper

zookeeper.yaml

  1. apiVersion: extensions/v1beta1
  2. kind: Deployment
  3. metadata:
  4. labels:
  5. app: zookeeper
  6. name: zookeeper
  7. spec:
  8. replicas: 1
  9. template:
  10. metadata:
  11. labels:
  12. app: zookeeper
  13. spec:
  14. containers:
  15. - image: wurstmeister/zookeeper
  16. imagePullPolicy: IfNotPresent
  17. name: zookeeper
  18. ports:
  19. - containerPort: 2181

创建命令

  1. $ kubectl create ns kafka
  2. $ kubectl create -f zookeeper-service.yaml,zookeeper.yaml -n kafka

2、创建 Kafka 的 Deployment 和 Service,其中 192.168.0.16 请替换为您实际的 IP 地址:

kafka.yaml

  1. apiVersion: extensions/v1beta1
  2. kind: Deployment
  3. metadata:
  4. labels:
  5. app: kafka
  6. name: kafka
  7. spec:
  8. replicas: 1
  9. template:
  10. metadata:
  11. labels:
  12. app: kafka
  13. spec:
  14. containers:
  15. - env:
  16. - name: KAFKA_ADVERTISED_HOST_NAME
  17. value: "192.168.0.16"
  18. - name: KAFKA_ADVERTISED_PORT
  19. value: "30092"
  20. - name: KAFKA_BROKER_ID
  21. value: "1"
  22. - name: KAFKA_ZOOKEEPER_CONNECT
  23. value: 192.168.0.16:30181
  24. - name: KAFKA_CREATE_TOPICS
  25. value: "test:1:1"
  26. image: wurstmeister/kafka
  27. imagePullPolicy: IfNotPresent
  28. name: kafka
  29. ports:
  30. - containerPort: 9092

kafka-service.yaml

  1. apiVersion: v1
  2. kind: Service
  3. metadata:
  4. labels:
  5. app: kafka-service
  6. name: kafka-service
  7. spec:
  8. type: NodePort
  9. ports:
  10. - name: kafka-port
  11. port: 9092
  12. nodePort: 30092
  13. targetPort: 9092
  14. selector:
  15. app: kafka

创建命令

  1. $ kubectl create -f kafka.yaml,kafka-service.yaml -n kafka

3、查看 kafka namespace 中,以上步骤创建的资源。

  1. $ kubectl get all -n kafka
  2. ···
  3. NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
  4. service/kafka-service NodePort 10.233.0.194 <none> 9092:30092/TCP 6h41m
  5. service/zookeeper-service NodePort 10.233.0.234 <none> 2181:30181/TCP 6h44m
  6. NAME READY UP-TO-DATE AVAILABLE AGE
  7. deployment.apps/kafka 1/1 1 1 6h41m
  8. deployment.apps/zookeeper 1/1 1 1 6h44m
  9. ···

4、安装 Kafkacat 客户端,Kafkacat 是一款开源的 Kafka 调试工具。以下仅演示在 ubuntu 安装 Kafkacat (其它 OS 安装请参考 Kafkacat):

  1. $ apt-get install kafkacat

5、使用 Kafkacat 验证单节点 kafka 消息的发送和接收,如下表明发送和接收成功:

  1. $ cho "Hello KubeSphere!" | kafkacat -P -b 192.168.0.16:30092 -t test-topic
  2. $ kafkacat -C -b 192.168.0.16:30092 -t test-topic
  3. Hello KubeSphere!

第二步:添加 Kafka 作为日志接收者

1、点击「平台管理」 → 「平台设置」,选择「日志收集」,点击「添加日志接收者」。

添加 Kafka 作为日志接收者 - 图1

2、在弹窗中选择 Kafka,参考如下填写信息,点击确定保存信息。

添加 Kafka 作为日志接收者 - 图2

说明:若需要对 Fluent Bit 转发 Kafka 做个性化配置,可在日志接收者页面下进入 「Kafka」 → 「更多操作」,点击 「编辑配置文件」 然后修改 parameters,可参看 Fluent Bit 官方文档 kafka 插件支持的参数项。

添加 Kafka 作为日志接收者 - 图3

第三步:验证日志输出

通过 Kafkacat 的消费命令观察日志的动态输出流信息:

  1. $ kafkacat -C -b 192.168.0.16:30092 -t test-topic
  2. Hello KubeSphere!
  3. {"@timestamp":1563122754.655668, "log":"2019/07/14 16:45:54 config map updated\n", "time":"2019-07-14T16:45:54.655667686Z", "kubernetes":{"pod_name":"fluent-bit-ss5zr", "namespace_name":"kubesphere-logging-system", "host":"ks-allinone", "container_name":"config-reloader", "docker_id":"1475e0b4ccce582848b55463e4c1f405dceb4a117fa2c0aa29f047f782bb04a6"}}
  4. {"@timestamp":1563122754.659812, "log":"2019/07/14 16:45:54 successfully triggered reload\n", "time":"2019-07-14T16:45:54.659811676Z", "kubernetes":{"pod_name":"fluent-bit-ss5zr", "namespace_name":"kubesphere-logging-system", "host":"ks-allinone", "container_name":"config-reloader", "docker_id":"1475e0b4ccce582848b55463e4c1f405dceb4a117fa2c0aa29f047f782bb04a6"}}
  5. {"@timestamp":1563123117.109699, "log":"2019-07-14 16:51:57.109 [INFO][96] ipsets.go 295: Finished resync family=\"inet\" numInconsistenciesFound=0 resyncDuration=6.203131ms\n", "time":"2019-07-14T16:51:57.109698704Z", "kubernetes":{"pod_name":"calico-node-ndk99", "namespace_name":"kube-system", "host":"ks-allinone", "container_name":"calico-node", "docker_id":"e45138d5b465c9670ff6632e7bd7eb21802d8827c7981d6ad05e2da8f3fb7123"}}
  6. {"@timestamp":1563123117.109884, "log":"2019-07-14 16:51:57.109 [INFO][96] int_dataplane.go 747: Finished applying updates to dataplane. msecToApply=6.6922559999999995\n", "time":"2019-07-14T16:51:57.109883574Z", "kubernetes":{"pod_name":"calico-node-ndk99", "namespace_name":"kube-system", "host":"ks-allinone", "container_name":"calico-node", "docker_id":"e45138d5b465c9670ff6632e7bd7eb21802d8827c7981d6ad05e2da8f3fb7123"}}
  7. ···

AppCenter 部署 Kafka 和 Zookeeper 集群

第一步:创建 Zookeeper + Kafka 集群

除了上述方式,正式环境建议自行搭建一个 Zookeeper 和 Kafka 集群,可通过 QingCloud AppCenter 完成 Zookeeper 和 Kafka 集群的一键部署,参考文档 Zookeeper 服务Kafka 服务

说明:建议将 Kafka、Zookeeper 集群与 KubeSphere 集群置于同一个私有网络环境。若它们处于不同的集群,需要在 broker 所在的路由器上配置端口转发,并且需要修改 broker 的 advertised host 与 advertised port 为路由器转发的源地址和源端口。因为 Kafka 各节点 (broker, producer, consumer) 之间是靠 advertised host 与 advertised port 通讯的,配置详见 跨网访问

1、创建 Zookeeper 集群,Zookeeper 作为 Kafka 的依赖,需要在 Kafka 之前创建。

Zookeeper 集群

2、创建 Kafka 集群。

Kafka 集群

第二步:添加日志接收者并验证

同上,在 KubeSphere 中添加 Kafka 集群作为日志接收者。登录 Kafka 集群客户端节点的 VNC,可安装使用 Kafkacat 验证 Kafka 集群的消息发送和接收,或通过 kafka-console-consumer.sh 脚本验证,参考 Kafka 文档

添加 Kafka 作为日志接收者 - 图6

添加 Kafka 作为日志接收者 - 图7