Add Kafka as Receiver (aka Collector)
KubeSphere supports using Elasticsearch, Kafka and Fluentd as log receivers. This doc will demonstrate:
- Deploy strimzi-kafka-operator and then create a Kafka cluster and a Kafka topic by creating
Kafka
andKafkaTopic
CRDs. - Add Kafka log receiver to receive logs sent from Fluent Bit
- Verify whether the Kafka cluster is receiving logs using Kafkacat
Prerequisite
Before adding a log receiver, you need to enable any of the logging
, events
or auditing
components following Enable Pluggable Components. The logging
component is enabled as an example in this doc.
Step 1: Create a Kafka cluster and a Kafka topic
备注
If you already have a Kafka cluster, you can start from Step 2.
You can use strimzi-kafka-operator to create a Kafka cluster and a Kafka topic
- Install strimzi-kafka-operator to the
default
namespace:
helm repo add strimzi https://strimzi.io/charts/
helm install --name kafka-operator -n default strimzi/strimzi-kafka-operator
- Create a Kafka cluster and a Kafka topic in the
default
namespace:
To deploy a Kafka cluster and create a Kafka topic, you simply need to open the kubectl console in KubeSphere Toolbox and run the following command:
备注
The following will create Kafka and Zookeeper clusters with storage type ephemeral
which is emptydir
for demo purpose. You should use other storage types for production, please refer to kafka-persistent
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
namespace: default
spec:
kafka:
version: 2.5.0
replicas: 3
listeners:
plain: {}
tls: {}
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.message.format.version: '2.5'
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: my-topic
namespace: default
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 3
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
EOF
- Run the following command to wait for Kafka and Zookeeper pods are all up and runing:
kubectl -n default get pod
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-f977bf457-s7ns2 3/3 Running 0 69m
my-cluster-kafka-0 2/2 Running 0 69m
my-cluster-kafka-1 2/2 Running 0 69m
my-cluster-kafka-2 2/2 Running 0 69m
my-cluster-zookeeper-0 1/1 Running 0 71m
my-cluster-zookeeper-1 1/1 Running 1 71m
my-cluster-zookeeper-2 1/1 Running 1 71m
strimzi-cluster-operator-7d6cd6bdf7-9cf6t 1/1 Running 0 104m
Then run the follwing command to find out metadata of kafka cluster
kafkacat -L -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092
- Add Kafka as logs receiver: Click Add Log Collector and then select Kafka, input Kafka broker address and port like below:
my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc 9092
my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc 9092
my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc 9092
- Run the following command to verify whether the Kafka cluster is receiving logs sent from Fluent Bit:
# Start a util container
kubectl run --rm utils -it --generator=run-pod/v1 --image arunvelsriram/utils bash
# Install Kafkacat in the util container
apt-get install kafkacat
# Run the following command to consume log messages from kafka topic: my-topic
kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092 -t my-topic