添加 Kafka 作为接收器
您可以在 KubeSphere 中使用 Elasticsearch、Kafka 和 Fluentd 日志接收器。本教程演示:
- 部署 strimzi-kafka-operator,然后通过创建
Kafka
和KafkaTopic
CRD 以创建 Kafka 集群和 Kafka 主题。 - 添加 Kafka 作为日志接收器以从 Fluent Bit 接收日志。
- 使用 Kafkacat 验证 Kafka 集群是否能接收日志。
准备工作
- 您需要一个被授予集群管理权限的用户。例如,您可以直接用
admin
用户登录控制台,或创建一个具有集群管理权限的角色然后将此角色授予一个用户。 - 添加日志接收器前,您需要启用组件
logging
、events
或auditing
。有关更多信息,请参见启用可插拔组件。本教程启用logging
作为示例。
步骤 1:创建 Kafka 集群和 Kafka 主题
您可以使用 strimzi-kafka-operator 创建 Kafka 集群和 Kafka 主题。如果您已经有了一个 Kafka 集群,您可以直接从下一步开始。
在
default
命名空间中安装 strimzi-kafka-operator:helm repo add strimzi https://strimzi.io/charts/
helm install --name kafka-operator -n default strimzi/strimzi-kafka-operator
运行以下命令在
default
命名空间中创建 Kafka 集群和 Kafka 主题,该命令所创建的 Kafka 和 Zookeeper 集群的存储类型为ephemeral
,使用emptyDir
进行演示。若要在生产环境下配置储存类型,请参见 kafka-persistent。cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
namespace: default
spec:
kafka:
version: 2.5.0
replicas: 3
listeners:
plain: {}
tls: {}
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.message.format.version: '2.5'
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: my-topic
namespace: default
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 3
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
EOF
运行以下命令查看容器组状态,并等待 Kafka 和 Zookeeper 运行并启动。
$ kubectl -n default get pod
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-f977bf457-s7ns2 3/3 Running 0 69m
my-cluster-kafka-0 2/2 Running 0 69m
my-cluster-kafka-1 2/2 Running 0 69m
my-cluster-kafka-2 2/2 Running 0 69m
my-cluster-zookeeper-0 1/1 Running 0 71m
my-cluster-zookeeper-1 1/1 Running 1 71m
my-cluster-zookeeper-2 1/1 Running 1 71m
strimzi-cluster-operator-7d6cd6bdf7-9cf6t 1/1 Running 0 104m
运行以下命令查看 Kafka 集群的元数据:
kafkacat -L -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092
步骤 2:添加 Kafka 作为日志接收器
以
admin
身份登录 KubeSphere 的 Web 控制台。点击左上角的平台管理,然后选择集群管理。备注
如果您启用了多集群功能,您可以选择一个集群。
在集群管理页面,选择集群设置下的日志接收器。
点击添加日志接收器并选择 Kafka。输入 Kafka 服务地址和端口信息,然后点击确定继续。
服务地址 端口号 my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc 9092 my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc 9092 my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc 9092 运行以下命令验证 Kafka 集群是否能从 Fluent Bit 接收日志:
# Start a util container
kubectl run --rm utils -it --generator=run-pod/v1 --image arunvelsriram/utils bash
# Install Kafkacat in the util container
apt-get install kafkacat
# Run the following command to consume log messages from kafka topic: my-topic
kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092 -t my-topic
当前内容版权归 KubeSphere 或其关联方所有,如需对内容或内容相关联开源项目进行关注与资助,请访问 KubeSphere .