# Apache Kafka Source Example

Tutorial on how to build and deploy a KafkaSource event source.

## Background

The KafkaSource reads all of the messages, from all partitions, and sends them as CloudEvents via HTTP to its configured sink.
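
Delivery uses the CloudEvents HTTP binding in binary mode. As a rough sketch of what a delivered request looks like (the attribute values below are illustrative; they depend on your source name, topic, partition, and offset, and mirror the output shown in the Verify section later):

```http
POST / HTTP/1.1
Host: event-display.default.svc.cluster.local
ce-specversion: 1.0
ce-type: dev.knative.kafka.event
ce-source: /apis/v1/namespaces/default/kafkasources/kafka-source#knative-demo-topic
ce-subject: partition:0#564
ce-id: partition:0/offset:564
content-type: application/json

{"msg": "This is a test!"}
```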

NOTE: If you need a more sophisticated Kafka consumer, with direct access to specific partitions or offsets, you might want to implement your own Kafka consumer using one of the available Apache Kafka SDKs and handle the messages yourself, rather than using the Knative KafkaSource.

## Prerequisites

### Apache Kafka Topic (Optional)

1. If you are using Strimzi, you can create a topic by modifying `source/kafka-topic.yaml` with your desired:

    - Topic
    - Cluster Name
    - Partitions
    - Replicas

    ```yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    ```

2. Deploy the `KafkaTopic`:

    ```bash
    $ kubectl apply -f source/kafka-topic.yaml
    kafkatopic.kafka.strimzi.io/knative-demo-topic created
    ```

3. Ensure the `KafkaTopic` is running:

    ```bash
    $ kubectl -n kafka get kafkatopics.kafka.strimzi.io
    NAME                 AGE
    knative-demo-topic   16s
    ```

## Create the Event Display service

1. Download a copy of the code:

    ```bash
    git clone -b "release-0.24" https://github.com/knative/docs knative-docs
    cd knative-docs/docs/eventing/samples/kafka/source
    ```

2. Deploy the Event Display Service (`event-display.yaml`):

    ```bash
    kubectl apply -f - <<EOF
    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    EOF
    ```

    Example output:

    ```bash
    service.serving.knative.dev/event-display created
    ```

3. Ensure that the Service pod is running. The pod name will be prefixed with `event-display`:

    ```bash
    $ kubectl get pods
    NAME                                            READY   STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2     Running   0          72s
    ```
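
Before wiring up the Kafka source, you can optionally check that the sink works by sending it a hand-rolled CloudEvent. This is a minimal sketch: the in-cluster URL below is an assumption, so use whatever `kubectl get ksvc event-display` prints, and run the `curl` from a pod inside the cluster that has `curl` available:

```bash
# Look up the Service URL (ksvc is the short name for Knative Services)
kubectl get ksvc event-display

# Send a minimal binary-mode CloudEvent to the sink
# (hypothetical URL; substitute the one printed above)
curl -v http://event-display.default.svc.cluster.local \
  -H "ce-specversion: 1.0" \
  -H "ce-type: dev.example.test" \
  -H "ce-source: manual-test" \
  -H "ce-id: 1" \
  -H "content-type: application/json" \
  -d '{"msg": "hello"}'
```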

## Apache Kafka Event Source

1. Modify `source/event-source.yaml` with your bootstrap servers, topics, and so on:

    ```yaml
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    ```

2. Deploy the event source (a readiness check sketch follows this list):

    ```bash
    $ kubectl apply -f source/event-source.yaml
    ...
    kafkasource.sources.knative.dev/kafka-source created
    ```

3. Check that the event source pod is running. The pod name will be prefixed with `kafka-source`:

    ```bash
    $ kubectl get pods
    NAME                                  READY   STATUS    RESTARTS   AGE
    kafka-source-xlnhq-5544766765-dnl5s   1/1     Running   0          40m
    ```

4. Ensure the Apache Kafka Event Source started with the necessary configuration:

    ```bash
    $ kubectl logs --selector='knative-eventing-source-name=kafka-source'
    {"level":"info","ts":"2020-05-28T10:39:42.104Z","caller":"adapter/adapter.go:81","msg":"Starting with config: ","Topics":".","ConsumerGroup":"...","SinkURI":"...","Name":".","Namespace":"."}
    ```

## Verify

1. Produce a message (`{"msg": "This is a test!"}`) to the Apache Kafka topic, as shown below:

    ```bash
    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
    If you don't see a command prompt, try pressing enter.
    >{"msg": "This is a test!"}
    ```

2. Check that the Apache Kafka Event Source consumed the message and sent it to its sink properly. Because these logs are emitted at debug level, edit the `level` key in the `zap-logger-config` entry of the `config-logging` configmap in the `knative-sources` namespace so that it looks like this:

    ```yaml
    data:
      loglevel.controller: info
      loglevel.webhook: info
      zap-logger-config: |
        {
          "level": "debug",
          "development": false,
          "outputPaths": ["stdout"],
          "errorOutputPaths": ["stderr"],
          "encoding": "json",
          "encoderConfig": {
            "timeKey": "ts",
            "levelKey": "level",
            "nameKey": "logger",
            "callerKey": "caller",
            "messageKey": "msg",
            "stacktraceKey": "stacktrace",
            "lineEnding": "",
            "levelEncoder": "",
            "timeEncoder": "iso8601",
            "durationEncoder": "",
            "callerEncoder": ""
          }
        }
    ```

    Now manually delete the KafkaSource adapter deployment and allow the kafka-controller-manager deployment running in the knative-sources namespace to recreate it. Debug-level logs should then be visible.
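
    One way to do this (a sketch: the adapter deployment name is generated, so the `<kafka-source-deployment-name>` placeholder below is whatever `kubectl get deployments` shows, prefixed with the KafkaSource name):

    ```bash
    # Open the configmap for editing and set "level" to "debug" as shown above
    kubectl -n knative-sources edit configmap config-logging

    # Delete the adapter deployment; kafka-controller-manager recreates it
    # with the new log level
    kubectl delete deployment <kafka-source-deployment-name>
    ```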

    ```bash
    $ kubectl logs --selector='knative-eventing-source-name=kafka-source'
    ...
    {"level":"debug","ts":"2020-05-28T10:40:29.400Z","caller":"kafka/consumer_handler.go:77","msg":"Message claimed","topic":".","value":"."}
    {"level":"debug","ts":"2020-05-28T10:40:31.722Z","caller":"kafka/consumer_handler.go:89","msg":"Message marked","topic":".","value":"."}
    ```

3. Ensure the Event Display received the message sent to it by the Event Source:

    ```bash
    $ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#knative-demo-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
      {
        "msg": "This is a test!"
      }
    ```

## Teardown Steps

1. Remove the Apache Kafka Event Source:

    ```bash
    $ kubectl delete -f source/event-source.yaml
    kafkasource.sources.knative.dev "kafka-source" deleted
    ```

2. Remove the Event Display:

    ```bash
    $ kubectl delete -f source/event-display.yaml
    service.serving.knative.dev "event-display" deleted
    ```

3. Remove the Apache Kafka Event Controller:

    ```bash
    $ kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
    serviceaccount "kafka-controller-manager" deleted
    clusterrole.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
    clusterrolebinding.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
    customresourcedefinition.apiextensions.k8s.io "kafkasources.sources.knative.dev" deleted
    service "kafka-controller" deleted
    statefulset.apps "kafka-controller-manager" deleted
    ```

4. (Optional) Remove the Apache Kafka Topic:

    ```bash
    $ kubectl delete -f source/kafka-topic.yaml
    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
    ```

## (Optional) Specify the key deserializer

When the KafkaSource receives a message from Kafka, it puts the Kafka record key into the event extension named `key` and puts the Kafka message headers into extensions prefixed with `kafkaheader`.
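
For example, with the default `string` key type, you can produce a keyed record with the console producer (`parse.key` and `key.separator` are standard console-producer properties), and the key then shows up in the event's `key` extension:

```bash
kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 \
  --rm=true --restart=Never -- bin/kafka-console-producer.sh \
  --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic \
  --property parse.key=true --property key.separator=:
>my-key:{"msg": "This is a keyed test!"}
```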

You can specify the key deserializer among four types:

- `string` (default) for UTF-8 encoded strings
- `int` for 32-bit and 64-bit signed integers
- `float` for 32-bit and 64-bit floating points
- `byte-array` for a Base64 encoded byte array

To specify it, add the `kafkasources.sources.knative.dev/key-type` label to the KafkaSource definition, for example:

```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
  labels:
    kafkasources.sources.knative.dev/key-type: int
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```

## Connecting to a TLS enabled Kafka broker

The KafkaSource supports TLS and SASL authentication. To enable TLS authentication, you need the following files:

- CA Certificate
- Client Certificate and Key

KafkaSource expects these files to be in PEM format. If they are in another format, such as JKS, convert them to PEM first.
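
For example, a JKS keystore can be converted with the JDK's `keytool` and `openssl`. This is a sketch: `kafka.jks` and `kafka.p12` are hypothetical file names, and the output file names match the secrets created below:

```bash
# Convert the JKS keystore to PKCS12 (keytool ships with the JDK);
# kafka.jks is a hypothetical input file
keytool -importkeystore -srckeystore kafka.jks \
  -destkeystore kafka.p12 -deststoretype PKCS12

# Extract the client certificate and the private key as PEM
openssl pkcs12 -in kafka.p12 -nokeys -out certificate.pem
openssl pkcs12 -in kafka.p12 -nocerts -nodes -out key.pem
```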

1. Create the certificate files as secrets in the namespace where the KafkaSource is going to be set up:

    ```bash
    $ kubectl create secret generic cacert --from-file=caroot.pem
    secret/cacert created

    $ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    secret/kafka-secret created
    ```

2. Apply the KafkaSource. Change `bootstrapServers` and `topics` accordingly:

    ```yaml
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source-with-tls
    spec:
      net:
        tls:
          enable: true
          cert:
            secretKeyRef:
              key: tls.crt
              name: kafka-secret
          key:
            secretKeyRef:
              key: tls.key
              name: kafka-secret
          caCert:
            secretKeyRef:
              key: caroot.pem
              name: cacert
      consumerGroup: knative-group
      bootstrapServers:
        - my-secure-kafka-bootstrap.kafka:443
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    ```
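
SASL authentication is configured in the same style under `spec.net.sasl`. The following is a minimal sketch, not a verified configuration: the secret name `kafka-sasl-secret` and its `user`/`password` keys are assumptions, and you should check the exact field names against the KafkaSource version you have installed:

```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-with-sasl
spec:
  net:
    sasl:
      enable: true
      user:
        secretKeyRef:
          name: kafka-sasl-secret # assumed secret name
          key: user
      password:
        secretKeyRef:
          name: kafka-sasl-secret
          key: password
    tls:
      enable: true
  consumerGroup: knative-group
  bootstrapServers:
    - my-secure-kafka-bootstrap.kafka:443
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```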