Apache Kafka Channel Example

You can install and configure the Apache Kafka CRD (KafkaChannel) as the default channel configuration in Knative Eventing.

Prerequisites

Creating a KafkaChannel channel

  1. Create a new KafkaChannel object by applying the following YAML:

    kubectl apply -f - <<EOF
    ---
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: my-kafka-channel
    spec:
      numPartitions: 3
      replicationFactor: 1
    EOF

Specifying the default channel configuration

  1. To configure the usage of the KafkaChannel CRD as the default channel configuration, edit the default-ch-webhook ConfigMap as follows:

    kubectl apply -f - <<EOF
    ---
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: default-ch-webhook
      namespace: knative-eventing
    data:
      # Configuration for defaulting channels that do not specify CRD implementations.
      default-ch-config: |
        clusterDefault:
          apiVersion: messaging.knative.dev/v1beta1
          kind: KafkaChannel
          spec:
            numPartitions: 3
            replicationFactor: 1
    EOF
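Besides clusterDefault, the default-ch-config setting also supports per-namespace defaults. As a hedged sketch (the namespace name my-namespace is illustrative), the in-memory channel could stay the default in one namespace while KafkaChannel is the cluster-wide default:

```yaml
# Sketch: per-namespace channel defaults alongside the cluster-wide default.
# The namespace "my-namespace" is illustrative.
default-ch-config: |
  clusterDefault:
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    spec:
      numPartitions: 3
      replicationFactor: 1
  namespaceDefaults:
    my-namespace:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel
```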

Creating an Apache Kafka channel using the default channel configuration

  1. Now that KafkaChannel is set as the default channel configuration, you can use the channels.messaging.knative.dev CRD to create a new Apache Kafka channel through the generic Channel resource:

    kubectl apply -f - <<EOF
    ---
    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: testchannel-one
    EOF
  2. Check Kafka for a testchannel-one topic. With Strimzi this can be done by using the command:

    kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list

    The result is:

    ...
    __consumer_offsets
    knative-messaging-kafka.default.my-kafka-channel
    knative-messaging-kafka.default.testchannel-one
    ...

The Apache Kafka topic that is created by the channel implementation contains the name of the namespace, default in this example, followed by the actual name of the channel. In the consolidated channel implementation, it is also prefixed with knative-messaging-kafka to indicate that it is an Apache Kafka channel from Knative.
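The naming scheme can be sketched in shell; the values mirror the testchannel-one example above, and the prefix applies to the consolidated channel implementation:

```shell
# Sketch of the consolidated channel's topic naming scheme:
#   knative-messaging-kafka.<namespace>.<channel-name>
prefix="knative-messaging-kafka"
namespace="default"          # namespace of the Channel resource
channel="testchannel-one"    # metadata.name of the Channel
topic="${prefix}.${namespace}.${channel}"
echo "${topic}"              # knative-messaging-kafka.default.testchannel-one
```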

Note

The topic of a Kafka channel is an implementation detail, and records in it should not be consumed by other applications.

Configuring the Knative broker for Apache Kafka channels

  1. To set up a broker that uses the new default Kafka channels, create a new default broker by using the command:

    kubectl create -f - <<EOF
    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: default
    EOF

This will give you two pods, such as:

  default-broker-filter-64658fc79f-nf596 1/1 Running 0 15m
  default-broker-ingress-ff79755b6-vj9jt 1/1 Running 0 15m

Inside the Apache Kafka cluster you should see two new topics, such as:

  ...
  knative-messaging-kafka.default.default-kn2-ingress
  knative-messaging-kafka.default.default-kn2-trigger
  ...


Creating a service and trigger to use the Apache Kafka broker

To use the Apache Kafka based broker, let's take a look at a simple demo: use the ApiServerSource to publish events to the broker, and the Trigger API to route the events to a Knative Service.

  1. Install the ksvc, using the command:

    kubectl apply -f 000-ksvc.yaml

  2. Install a source that publishes to the default broker:

    kubectl apply -f 020-k8s-events.yaml

  3. Create a trigger that routes the events to the ksvc:

    kubectl apply -f 030-trigger.yaml
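The contents of these files are not shown in this guide. As a hedged sketch, a minimal 030-trigger.yaml could look like the following; the trigger name is illustrative, and the subscriber name matches the broker-kafka-display service used in the verification section:

```yaml
# Hypothetical sketch of 030-trigger.yaml: route all events from the
# default broker to the broker-kafka-display Knative Service.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: testevents-trigger      # illustrative name
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: broker-kafka-display
```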

Verifying your Apache Kafka channel and broker

Now that your Eventing cluster is configured for Apache Kafka, you can verify your configuration with the following options.

Receive events via Knative

  1. Observe the events in the log of the ksvc using the command:

    kubectl logs --selector='serving.knative.dev/service=broker-kafka-display' -c user-container

Authentication against an Apache Kafka cluster

In production environments it is common for the Apache Kafka cluster to be secured using TLS or SASL. This section shows how to configure the KafkaChannel to work against a protected Apache Kafka cluster, using the two supported authentication methods: TLS and SASL.

Note

Kafka channels require certificates to be in .pem format. If your files are in a different format, you must convert them to .pem.
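A DER-encoded certificate can be converted to .pem with openssl. The sketch below first generates a throwaway self-signed certificate in DER form purely for demonstration (the /tmp paths are illustrative); the final openssl x509 line is the actual conversion step:

```shell
# Demo setup only: create a throwaway self-signed cert and save it as DER.
openssl req -x509 -newkey rsa:2048 -nodes -days 1 -subj "/CN=demo-ca" \
  -keyout /tmp/demo-ca.key -out /tmp/demo-ca.pem 2>/dev/null
openssl x509 -in /tmp/demo-ca.pem -outform der -out /tmp/demo-ca.der

# The conversion required by the Kafka channel: DER -> PEM.
openssl x509 -inform der -in /tmp/demo-ca.der -out /tmp/demo-ca-converted.pem
head -n 1 /tmp/demo-ca-converted.pem
```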

TLS authentication

  1. Edit your config-kafka ConfigMap:

    kubectl -n knative-eventing edit configmap config-kafka

  2. Set the TLS.Enable field to true, for example:

    ...
    data:
      sarama: |
        config: |
          Net:
            TLS:
              Enable: true
    ...
  3. Optional. If using a custom CA certificate, place your certificate data into the ConfigMap in the data.sarama.config.Net.TLS.Config.RootPEMs field, for example:

    ...
    data:
      sarama: |
        config: |
          Net:
            TLS:
              Config:
                RootPEMs: # Array of root certificate PEM files (use '|-' syntax to preserve linefeeds and avoid a terminating \n)
                - |-
                  -----BEGIN CERTIFICATE-----
                  MIIGDzCCA/egAwIBAgIUWq6j7u/25wPQiNMPZqL6Vy0rkvQwDQYJKoZIhvcNAQEL
                  ...
                  771uezZAFqd1GLLL8ZYRmCsAMg==
                  -----END CERTIFICATE-----
    ...

SASL authentication

To use SASL authentication, you will need the following information:

  • A username and password.
  • The type of SASL mechanism you wish to use, for example PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

Note

It is recommended to also enable TLS as described in the previous section.

  1. Edit your config-kafka ConfigMap:

    kubectl -n knative-eventing edit configmap config-kafka

  2. Set the SASL.Enable field to true, for example:

    ...
    data:
      sarama: |
        config: |
          Net:
            SASL:
              Enable: true
    ...
  3. Create a secret with the username, password, and SASL mechanism, for example:

    kubectl create secret --namespace <namespace> generic <kafka-auth-secret> \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="PLAIN" \
      --from-literal=username="my-sasl-user"
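Equivalently, the same secret can be expressed as a manifest; the stringData field lets you supply the values without base64-encoding them, and the names in angle brackets are the same placeholders as in the command above:

```yaml
# Equivalent Secret manifest for the kubectl command above.
# stringData values are base64-encoded by the API server on admission.
apiVersion: v1
kind: Secret
metadata:
  name: <kafka-auth-secret>
  namespace: <namespace>
type: Opaque
stringData:
  username: my-sasl-user
  password: SecretPassword
  saslType: PLAIN
```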

All authentication methods

  1. If you have created a secret for your desired authentication method by using the previous steps, reference the secret and the namespace of the secret in the config-kafka ConfigMap:

    ...
    data:
      eventing-kafka: |
        kafka:
          authSecretName: <kafka-auth-secret>
          authSecretNamespace: <namespace>
    ...

Note

The default secret name and namespace are kafka-cluster and knative-eventing respectively. If you reference a secret in a different namespace, be sure you configure your roles and bindings so that the knative-eventing pods can access it.
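If the secret lives outside knative-eventing, a Role and RoleBinding along these lines can grant access. This is only a sketch: the ServiceAccount name below is an assumption, so check which service accounts your channel controller and dispatcher actually run under:

```yaml
# Hypothetical sketch: let Knative Eventing components read an auth secret
# in another namespace. The ServiceAccount name is an assumption.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: kafka-auth-secret-reader
  namespace: my-kafka-namespace        # namespace holding the secret
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["kafka-auth-secret"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kafka-auth-secret-reader
  namespace: my-kafka-namespace
subjects:
- kind: ServiceAccount
  name: eventing-kafka-channel-controller   # assumption: verify the SA name
  namespace: knative-eventing
roleRef:
  kind: Role
  name: kafka-auth-secret-reader
  apiGroup: rbac.authorization.k8s.io
```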

Channel configuration

The config-kafka ConfigMap allows for a variety of channel options such as:

  • CPU and Memory requests and limits for the dispatcher (and receiver for the distributed channel type) deployments created by the controller

  • Kafka topic default values (number of partitions, replication factor, and retention time)

  • Maximum idle connections/connections per host for Knative cloudevents

  • The brokers string for your Kafka connection

  • The name and namespace of your TLS/SASL authentication secret

  • The Kafka admin type (distributed channel only)

  • Nearly all the settings exposed in a Sarama Config Struct

  • Sarama debugging assistance (via sarama.enableLogging)
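As a hedged illustration, several of these options could be combined in one config-kafka ConfigMap as follows. The key names follow the eventing-kafka layout but may differ between releases, so verify them against the README for your version:

```yaml
# Illustrative config-kafka ConfigMap; key names are assumptions based on the
# eventing-kafka layout and may vary between releases.
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka
  namespace: knative-eventing
data:
  eventing-kafka: |
    cloudevents:
      maxIdleConns: 1000        # idle connections for Knative cloudevents
      maxIdleConnsPerHost: 100
    kafka:
      brokers: my-cluster-kafka-bootstrap.kafka:9092   # your brokers string
      authSecretName: kafka-cluster
      authSecretNamespace: knative-eventing
      topic:
        defaultNumPartitions: 4
        defaultReplicationFactor: 1
        defaultRetentionMillis: 604800000   # 7 days
  sarama: |
    enableLogging: false        # sarama.enableLogging debugging assistance
    config: |
      Net:
        TLS:
          Enable: true
```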

For detailed information (particularly for the distributed channel), see the Distributed Channel README.