Add Kafka as a Receiver (aka Collector)

KubeSphere supports using Elasticsearch, Kafka, and Fluentd as log receivers. This doc will demonstrate how to:

  • Deploy strimzi-kafka-operator and then create a Kafka cluster and a Kafka topic by creating Kafka and KafkaTopic CRs.
  • Add a Kafka log receiver to receive logs sent from Fluent Bit.
  • Verify whether the Kafka cluster is receiving logs using Kafkacat.

Prerequisites

Before adding a log receiver, you need to enable one of the logging, events, or auditing components by following Enable Pluggable Components. The logging component is enabled as an example in this doc.
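If you are not sure whether the logging component is enabled, a quick check (assuming the default KubeSphere namespace layout) is to look for Fluent Bit pods in the kubesphere-logging-system namespace:

    # Fluent Bit should be running once the logging component is enabled
    kubectl get pods -n kubesphere-logging-system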

Step 1: Create a Kafka cluster and a Kafka topic

Note

If you already have a Kafka cluster, you can start from Step 2.

You can use strimzi-kafka-operator to create a Kafka cluster and a Kafka topic.

  1. Install strimzi-kafka-operator in the default namespace:

     helm repo add strimzi https://strimzi.io/charts/
     # Helm 3 syntax; with Helm 2, use: helm install --name kafka-operator --namespace default strimzi/strimzi-kafka-operator
     helm install kafka-operator strimzi/strimzi-kafka-operator -n default
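
     You can optionally confirm that the operator is up before moving on. The deployment name below matches the pod name shown in the output later in this doc:

     # Wait until the Strimzi cluster operator deployment reports it is available
     kubectl -n default rollout status deployment/strimzi-cluster-operator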
  2. Create a Kafka cluster and a Kafka topic in the default namespace:

To deploy a Kafka cluster and create a Kafka topic, you can open the kubectl console in the KubeSphere Toolbox and run the following command:

Note

The following command creates Kafka and ZooKeeper clusters with the storage type ephemeral (that is, emptyDir), which is for demonstration purposes only. For production, use a persistent storage type instead; refer to kafka-persistent and see the persistent-claim sketch after the manifest below.

    cat <<EOF | kubectl apply -f -
    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: default
    spec:
      kafka:
        version: 2.5.0
        replicas: 3
        listeners:
          plain: {}
          tls: {}
        config:
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          log.message.format.version: '2.5'
        storage:
          type: ephemeral
      zookeeper:
        replicas: 3
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: my-topic
      namespace: default
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    EOF
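
As mentioned in the note above, ephemeral storage is only suitable for demos. A minimal sketch of a production-oriented persistent-claim storage stanza is shown below; the size and deleteClaim values are illustrative assumptions, so refer to the Strimzi kafka-persistent example for a complete manifest:

    # Replace `type: ephemeral` under both `kafka` and `zookeeper` with something like:
    storage:
      type: persistent-claim
      size: 100Gi         # assumed size, adjust for your workload
      deleteClaim: false  # keep the PVCs if the Kafka cluster is deleted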
  3. Run the following command and wait until the Kafka and ZooKeeper pods are all up and running:

     kubectl -n default get pod
     NAME                                         READY   STATUS    RESTARTS   AGE
     my-cluster-entity-operator-f977bf457-s7ns2   3/3     Running   0          69m
     my-cluster-kafka-0                           2/2     Running   0          69m
     my-cluster-kafka-1                           2/2     Running   0          69m
     my-cluster-kafka-2                           2/2     Running   0          69m
     my-cluster-zookeeper-0                       1/1     Running   0          71m
     my-cluster-zookeeper-1                       1/1     Running   1          71m
     my-cluster-zookeeper-2                       1/1     Running   1          71m
     strimzi-cluster-operator-7d6cd6bdf7-9cf6t    1/1     Running   0          104m
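
Instead of polling kubectl get pod by hand, you could also block until the pods are ready once they have been created. This assumes the strimzi.io/cluster label that Strimzi applies to the pods it manages:

    # Wait until every pod of the my-cluster Kafka cluster reports Ready (or the timeout expires)
    kubectl -n default wait --for=condition=Ready pod -l strimzi.io/cluster=my-cluster --timeout=300s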

Then run the following command to find out the metadata of the Kafka cluster. Note that the broker addresses below are in-cluster DNS names, so kafkacat has to run from inside the cluster (for example, from the utils pod created in the verification step below):

    kafkacat -L -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092
Step 2: Add Kafka as a log receiver

Click Add Log Collector, select Kafka, and then enter the Kafka broker addresses and ports as below:

    my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc 9092
    my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc 9092
    my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc 9092

Add Kafka
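
Under the hood, a log receiver added through the console is stored as a Fluent Bit Output custom resource managed by the fluentbit-operator in the kubesphere-logging-system namespace. The sketch below is only an approximation for orientation; the field names (spec.match, spec.kafka.brokers, spec.kafka.topics) and labels are assumptions based on the logging.kubesphere.io/v1alpha2 API, so inspect the actual objects with kubectl -n kubesphere-logging-system get outputs -o yaml rather than relying on this snippet:

    # Hypothetical shape of the generated Output resource (field names are assumptions)
    apiVersion: logging.kubesphere.io/v1alpha2
    kind: Output
    metadata:
      name: kafka
      namespace: kubesphere-logging-system
      labels:
        logging.kubesphere.io/enabled: "true"
    spec:
      match: kube.*
      kafka:
        brokers: my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092
        topics: my-topic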

Step 3: Verify whether the Kafka cluster is receiving logs

Run the following commands to verify whether the Kafka cluster is receiving logs sent from Fluent Bit:

    # Start a util container
    # Note: the --generator flag is deprecated and has been removed in recent kubectl versions; drop it if your kubectl rejects it
    kubectl run --rm utils -it --generator=run-pod/v1 --image arunvelsriram/utils bash
    # Install Kafkacat in the util container
    apt-get install kafkacat
    # Consume log messages from the Kafka topic my-topic
    kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-1.my-cluster-kafka-brokers.default.svc:9092,my-cluster-kafka-2.my-cluster-kafka-brokers.default.svc:9092 -t my-topic
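
If logs are flowing, the consumer above keeps streaming messages until you interrupt it. To take a quick sample instead, kafkacat can start near the end of the topic and exit once it catches up; the broker address below assumes the same cluster as above:

    # Read roughly the last 10 messages from each partition of my-topic, then exit
    kafkacat -C -b my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 -t my-topic -o -10 -e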