Apache Kafka Channel Example

You can install and configure the Apache Kafka Channel as the default Channel configuration for Knative Eventing.

Prerequisites

  • A Kubernetes cluster with Knative Eventing installed, as well as the optional Broker and Kafka Channel components.

Creating a Kafka Channel

  1. Create a Kafka Channel that contains the following YAML:

     apiVersion: messaging.knative.dev/v1beta1
     kind: KafkaChannel
     metadata:
       name: my-kafka-channel
     spec:
       numPartitions: 3
       replicationFactor: 1
  2. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.
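  3. Optional: Check that the Channel has become ready before you use it. As a quick check, assuming the Channel name used above, query the KafkaChannel resource and wait for it to report that it is ready:

     kubectl get kafkachannels.messaging.knative.dev my-kafka-channel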

Specifying Kafka as the default Channel implementation

  1. To configure Kafka Channel as the default channel configuration, modify the default-ch-webhook ConfigMap so that it contains the following YAML:

     apiVersion: v1
     kind: ConfigMap
     metadata:
       name: default-ch-webhook
       namespace: knative-eventing
     data:
       # Configuration for defaulting channels that do not specify CRD implementations.
       default-ch-config: |
         clusterDefault:
           apiVersion: messaging.knative.dev/v1beta1
           kind: KafkaChannel
           spec:
             numPartitions: 3
             replicationFactor: 1
  2. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

Creating an Apache Kafka channel by using the default Channel configuration

  1. After KafkaChannel is set as the default Channel type, you can create a Kafka Channel by creating a generic Channel object that contains the following YAML:

     apiVersion: messaging.knative.dev/v1
     kind: Channel
     metadata:
       name: testchannel-one
  2. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

  3. Verify that the Channel was created properly by checking that your Kafka cluster has a testchannel-one Topic. If you are using Strimzi, you can run the command:

     kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list

    The output looks similar to the following:

     ...
     __consumer_offsets
     knative-messaging-kafka.default.my-kafka-channel
     knative-messaging-kafka.default.testchannel-one
     ...

    The name of the Kafka Topic created by the Channel contains the name of the namespace, default in this example, followed by the name of the Channel. In the consolidated Channel implementation, it is also prefixed with knative-messaging-kafka to indicate that it is a Kafka Channel from Knative. A command for inspecting the Topic configuration is shown after the following note.

    Note

    The topic of a Kafka Channel is an implementation detail, and records in it must not be consumed directly by other applications.
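    To confirm that the Topic was created with the settings from the KafkaChannel spec (3 partitions and a replication factor of 1 in this example), you can describe it. The following command assumes the same Strimzi setup as the listing above and is for verification only:

     kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --topic knative-messaging-kafka.default.my-kafka-channel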

Creating a Service and Trigger that use the Apache Kafka Broker

The following example uses an ApiServerSource to publish events to an existing Broker, and a Trigger that routes those events to a Knative Service.

  1. Create a Knative Service:

     apiVersion: serving.knative.dev/v1
     kind: Service
     metadata:
       name: broker-kafka-display
     spec:
       template:
         spec:
           containers:
             - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
  2. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

  3. Create a ServiceAccount, ClusterRole, and ClusterRoleBinding for the ApiServerSource:

     apiVersion: v1
     kind: ServiceAccount
     metadata:
       name: events-sa
       namespace: default
     ---
     apiVersion: rbac.authorization.k8s.io/v1
     kind: ClusterRole
     metadata:
       name: event-watcher
     rules:
       - apiGroups:
           - ""
         resources:
           - events
         verbs:
           - get
           - list
           - watch
     ---
     apiVersion: rbac.authorization.k8s.io/v1
     kind: ClusterRoleBinding
     metadata:
       name: k8s-ra-event-watcher
     roleRef:
       apiGroup: rbac.authorization.k8s.io
       kind: ClusterRole
       name: event-watcher
     subjects:
       - kind: ServiceAccount
         name: events-sa
         namespace: default
  4. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

  5. Create an ApiServerSource that sends events to the default Broker:

     apiVersion: sources.knative.dev/v1
     kind: ApiServerSource
     metadata:
       name: testevents-kafka-03
       namespace: default
     spec:
       serviceAccountName: events-sa
       mode: Resource
       resources:
         - apiVersion: v1
           kind: Event
       sink:
         ref:
           apiVersion: eventing.knative.dev/v1
           kind: Broker
           name: default
  6. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

  7. Create a Trigger that filters events from the Broker to the Service:

     apiVersion: eventing.knative.dev/v1
     kind: Trigger
     metadata:
       name: testevents-trigger
       namespace: default
     spec:
       broker: default
       subscriber:
         ref:
           apiVersion: serving.knative.dev/v1
           kind: Service
           name: broker-kafka-display
  8. Apply the YAML file by running the command:

     kubectl apply -f <filename>.yaml

    Where <filename> is the name of the file you created in the previous step.

  9. Verify that the Kafka Channel is working by observing events in the logs of the Service. Run the command:

     kubectl logs --selector='serving.knative.dev/service=broker-kafka-display' -c user-container
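The Trigger in this example forwards all events on the Broker to the Service. If you only want a subset of events, you can add a filter to the Trigger spec. The following sketch is a variant of the Trigger above: the name testevents-trigger-filtered is hypothetical, and the type attribute assumes the event type that an ApiServerSource in Resource mode emits for resource updates, so adjust it to match the events you actually receive:

     apiVersion: eventing.knative.dev/v1
     kind: Trigger
     metadata:
       name: testevents-trigger-filtered   # hypothetical name for this illustration
       namespace: default
     spec:
       broker: default
       filter:
         attributes:
           # assumed ApiServerSource event type for resource updates
           type: dev.knative.apiserver.resource.update
       subscriber:
         ref:
           apiVersion: serving.knative.dev/v1
           kind: Service
           name: broker-kafka-display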

Authentication against an Apache Kafka cluster

In production environments, it is common for the Apache Kafka cluster to be secured using TLS or SASL. This section shows how to configure a Kafka Channel to work against a protected Apache Kafka cluster by using the two supported authentication methods: TLS and SASL.

Note

Kafka Channels require certificates to be in .pem format. If your files are in a different format, you must convert them to .pem.
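For example, if you have a DER-encoded CA certificate, you can typically convert it with openssl; the file names below are placeholders:

     openssl x509 -inform der -in ca.der -out ca.pem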

TLS authentication

  1. Edit the config-kafka ConfigMap:

     kubectl -n knative-eventing edit configmap config-kafka
  2. Set the TLS.Enable field to true:

     ...
     data:
       sarama: |
         config: |
           Net:
             TLS:
               Enable: true
     ...
  3. Optional: If you are using a custom CA certificate, add your certificate data to the ConfigMap in the data.sarama.config.Net.TLS.Config.RootPEMs field:

     ...
     data:
       sarama: |
         config: |
           Net:
             TLS:
               Config:
                 RootPEMs: # Array of root certificate PEM files (use '|-' syntax to preserve linefeeds and avoid a terminating \n)
                   - |-
                     -----BEGIN CERTIFICATE-----
                     MIIGDzCCA/egAwIBAgIUWq6j7u/25wPQiNMPZqL6Vy0rkvQwDQYJKoZIhvcNAQEL
                     ...
                     771uezZAFqd1GLLL8ZYRmCsAMg==
                     -----END CERTIFICATE-----
     ...

SASL authentication

To use SASL authentication, you will need the following information:

  • A username and password.
  • The type of SASL mechanism you wish to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

Note

It is recommended to also enable TLS as described in the previous section.

  1. Edit the config-kafka ConfigMap:

     kubectl -n knative-eventing edit configmap config-kafka
  2. Set the SASL.Enable field to true:

     ...
     data:
       sarama: |
         config: |
           Net:
             SASL:
               Enable: true
     ...
  3. Create a secret that contains the username, password, and SASL mechanism:

     kubectl create secret --namespace <namespace> generic <kafka-auth-secret> \
       --from-literal=password="SecretPassword" \
       --from-literal=saslType="PLAIN" \
       --from-literal=username="my-sasl-user"

All authentication methods

  1. If you have created a secret for your desired authentication method by using the previous steps, reference the secret and the namespace of the secret in the config-kafka ConfigMap:

     ...
     data:
       eventing-kafka: |
         kafka:
           authSecretName: <kafka-auth-secret>
           authSecretNamespace: <namespace>
     ...

    Note

    The default secret name and namespace are kafka-cluster and knative-eventing respectively. If you reference a secret in a different namespace, make sure you configure your roles and bindings so that the knative-eventing Pods can access it.

Channel configuration

The config-kafka ConfigMap allows for a variety of Channel options such as:

  • CPU and Memory requests and limits for the dispatcher (and receiver for the distributed Channel type) deployments created by the controller

  • Kafka Topic default values (number of partitions, replication factor, and retention time)

  • Maximum idle connections/connections per host for Knative cloudevents

  • The brokers string for your Kafka connection

  • The name and namespace of your TLS/SASL authentication secret

  • The Kafka admin type (distributed channel only)

  • Nearly all of the settings exposed by the Sarama Config struct

  • Sarama debugging assistance (via sarama.enableLogging)

For detailed information (particularly for the distributed channel), see the Distributed Channel README.
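As an illustration only, a config-kafka ConfigMap that combines a few of these options might look roughly like the following sketch. The sarama, eventing-kafka, authSecretName, and authSecretNamespace keys appear earlier on this page, and the enableLogging flag follows the sarama.enableLogging path mentioned above; the brokers key and its value are assumptions (the address points at the Strimzi bootstrap service used in the earlier example), so check the Distributed Channel README for the authoritative schema:

     apiVersion: v1
     kind: ConfigMap
     metadata:
       name: config-kafka
       namespace: knative-eventing
     data:
       sarama: |
         enableLogging: false        # Sarama debugging assistance, as noted above
         config: |
           Net:
             TLS:
               Enable: true
       eventing-kafka: |
         kafka:
           # assumed key and address; adjust to your own Kafka bootstrap servers
           brokers: my-cluster-kafka-bootstrap.kafka:9092
           authSecretName: kafka-cluster
           authSecretNamespace: knative-eventing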