Deploy the Vector Aggregator

On the cluster where Kafka is located, run the following command to create the vector namespace and deploy vector-aggregator in that namespace.

Note
  • Do not deploy the vector aggregator into the kubesphere-logging-system namespace, to avoid conflicts with the vector aggregator built into KubeSphere Enterprise.

  • Contact a KubeSphere Enterprise delivery service expert to obtain the vector aggregator Helm package.

  helm install vector-aggregator aggregator-0.30.0.tgz -n vector --create-namespace --set vectorConfig.image.tag=v0.2.1 --set image.tag=0.36.0-debian
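
Before continuing, you can confirm that the release was installed and the aggregator is running. A minimal check, assuming the release was installed into the vector namespace as above:

  # Confirm the Helm release succeeded and the aggregator pods are Running/Ready
  helm status vector-aggregator -n vector
  kubectl get pods -n vector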

Required images:

  docker.io/timberio/vector:0.36.0-debian
  docker.io/kubesphere/kubectl:v1.26.13
  docker.io/kubesphere/vector-config:v0.2.1
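
If the cluster nodes cannot pull from docker.io directly, the images can be mirrored into a private registry beforehand. A minimal sketch, where registry.example.com is a hypothetical placeholder for your own registry:

  # Mirror the required images into a private registry (registry.example.com is a placeholder)
  for img in timberio/vector:0.36.0-debian kubesphere/kubectl:v1.26.13 kubesphere/vector-config:v0.2.1; do
    docker pull docker.io/$img
    docker tag docker.io/$img registry.example.com/$img
    docker push registry.example.com/$img
  done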

Obtain the Certificates

  1. On a node of the cluster where Kafka is located, run the following commands.

    Note

    <kafka cluster> is the name of the Kafka cluster, <kafka namespace> is the namespace where Kafka is deployed, and <kafka user> is the Kafka user created earlier.

    export kafka_cluster=<kafka cluster>
    export kafka_namespace=<kafka namespace>
    export kafka_user=<kafka user>
    echo -e "apiVersion: v1\ndata:" > kafka-aggregator-ca.yaml
    echo "  ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert \
    -o jsonpath='{.data.ca\.crt}')" >> kafka-aggregator-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-cluster-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml
    echo "---" >> kafka-aggregator-ca.yaml
    echo -e "apiVersion: v1\ndata:" >> kafka-aggregator-ca.yaml
    echo "  user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user} \
    -o jsonpath='{.data.user\.p12}')" >> kafka-aggregator-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-user-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml

    These commands generate the kafka-aggregator-ca.yaml file, which contains two Secret manifests, kafka-aggregator-cluster-ca and kafka-aggregator-user-ca, holding the ca.crt and user.p12 data from the previous step respectively. Example:

    apiVersion: v1
    data:
      ca.crt: xxx
    kind: Secret
    metadata:
      name: kafka-aggregator-cluster-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Aggregator
      namespace: vector
    type: Opaque
    ---
    apiVersion: v1
    data:
      user.p12: xxx
    kind: Secret
    metadata:
      name: kafka-aggregator-user-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Aggregator
      namespace: vector
    type: Opaque
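
    The previous step only writes the manifest to disk; the two Secrets still need to be created in the vector namespace so that the aggregator can pick up the certificates (the logging.whizard.io labels above suggest they are discovered by label). A minimal sketch, assuming the aggregator runs on this same cluster as deployed above:

    # Create the certificate Secrets and confirm they exist in the vector namespace
    kubectl apply -f kafka-aggregator-ca.yaml
    kubectl -n vector get secret kafka-aggregator-cluster-ca kafka-aggregator-user-ca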

Configure vector-aggregator to Send Messages to OpenSearch

Create the vector configuration below. Set bootstrap_servers to the address of your Kafka cluster, and set sinks:kafka_to_opensearch:endpoints to the address of your OpenSearch cluster. Also replace ssl.keystore.password with the password protecting your own Kafka user's user.p12 keystore; the sketch below shows one way to retrieve it.
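
A minimal sketch for retrieving the keystore password, assuming Kafka is managed by the Strimzi operator (as the ${kafka_cluster}-cluster-ca-cert secret naming above suggests), which stores it in the user secret under the user.password key:

  # Read the password protecting user.p12 from the Kafka user secret (Strimzi convention)
  kubectl get secret -n $kafka_namespace ${kafka_user} -o jsonpath='{.data.user\.password}' | base64 -d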

  cat <<EOF | kubectl apply -f -
  apiVersion: v1
  kind: Secret
  metadata:
    name: vector-aggregator-opensearch
    namespace: vector
    labels:
      logging.whizard.io/vector-role: Aggregator
      logging.whizard.io/enable: "true"
  stringData:
    kafka-pipeline.yaml: >-
      sources:
        kafka_source:
          type: "kafka"
          group_id: "ks"
          topics: [ "^(vector)-.+" ]
          bootstrap_servers: "172.31.53.102:32476"
          librdkafka_options:
            security.protocol: "ssl"
            ssl.endpoint.identification.algorithm: "none"
            ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
            ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
            ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
            max.poll.interval.ms: "600000"
            partition.assignment.strategy: roundrobin
          decoding:
            codec: json
          session_timeout_ms: 20000
          socket_timeout_ms: 90000
      transforms:
        kafka_remapped:
          inputs:
            - kafka_source
          source: |-
            .event.original = encode_json(.)
            ts = parse_timestamp!(.timestamp, format: "%+")
            .timestamp = format_timestamp!(ts, format: "%+", timezone: "local")
            .topictime = to_unix_timestamp(ts, unit: "milliseconds")
            .logstamp = from_unix_timestamp!(.topictime, unit: "milliseconds")
            .logdate = .timestamp
            .idxdate = format_timestamp!(ts, format: "%Y.%m.%d", timezone: "local")
            tmp = split!(.topic, "-")
            .index = join!(remove!(tmp, [0]), "-")
          type: remap
      sinks:
        kafka_to_opensearch:
          api_version: v8
          auth:
            password: admin
            strategy: basic
            user: admin
          batch:
            timeout_secs: 5
          buffer:
            max_events: 10000
          endpoints:
            - https://<opensearch-url>:<port>
          tls:
            verify_certificate: false
          type: elasticsearch
          inputs:
            - kafka_remapped
          bulk:
            index: "{{ .index }}-%Y.%m.%d"
          request:
            timeout_secs: 180
  type: Opaque
  EOF
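
In the transform above, the index name is derived from the Kafka topic by dropping the segment before the first hyphen: a hypothetical topic vector-auditing, for example, yields .index = "auditing", and the bulk.index template then produces daily indices such as auditing-2024.05.21. Once the Secret has been applied and loaded by the aggregator, a quick way to confirm data is flowing, assuming the admin/admin credentials and the endpoint configured in the sink above:

  # Check that the pipeline Secret exists, then list the indices written to OpenSearch
  kubectl -n vector get secret vector-aggregator-opensearch
  curl -ku admin:admin "https://<opensearch-url>:<port>/_cat/indices?v"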