Create a Knative-based Function to Interact with Middleware

This document describes how to create a Knative-based function to interact with middleware via Dapr components.

Overview

Similar to asynchronous functions, the functions that are based on Knative runtime can interact with middleware through Dapr components. This document uses two functions, function-front and kafka-input, for demonstration.

The following diagram illustrates the relationship between these functions.

(Figure 1: The relationship between the function-front and kafka-input functions.)

Prerequisites

Create a Kafka Server and Topic

  1. Run the following commands to install strimzi-kafka-operator in the default namespace.

    ```shell
    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    ```
  2. Use the following content to create a file kafka.yaml.

    ```yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-server
      namespace: default
    spec:
      kafka:
        version: 3.3.1
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.1"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: sample-topic
      namespace: default
      labels:
        strimzi.io/cluster: kafka-server
    spec:
      partitions: 10
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    ```
  3. Run the following command to deploy a 1-replica Kafka server named kafka-server and 1-replica Kafka topic named sample-topic in the default namespace.

    ```shell
    kubectl apply -f kafka.yaml
    ```
  4. Run the following command to check pod status and wait for Kafka and Zookeeper to be up and running.

    ```shell
    $ kubectl get po
    NAME                                            READY   STATUS    RESTARTS   AGE
    kafka-server-entity-operator-568957ff84-nmtlw   3/3     Running   0          8m42s
    kafka-server-kafka-0                            1/1     Running   0          9m13s
    kafka-server-zookeeper-0                        1/1     Running   0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm       1/1     Running   0          11m
    ```
  5. Run the following commands to view the metadata of the Kafka cluster.

    ```shell
    # Start a utility pod.
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
    # Check the metadata of the Kafka cluster.
    $ kafkacat -L -b kafka-server-kafka-brokers:9092
    ```

Create Functions

  1. Use the following example YAML file to create a manifest kafka-input.yaml and modify the value of spec.image to set your own image registry address. The field spec.serving.inputs defines an input source that points to a Dapr component of the Kafka server, which means that the kafka-input function will be driven by events in the topic sample-topic of the Kafka server.

    ```yaml
    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: kafka-input
    spec:
      version: "v1.0.0"
      image: <your registry name>/kafka-input:latest
      imageCredentials:
        name: push-secret
      build:
        builder: openfunction/builder-go:latest
        env:
          FUNC_NAME: "HandleKafkaInput"
          FUNC_CLEAR_SOURCE: "true"
        srcRepo:
          url: "https://github.com/OpenFunction/samples.git"
          sourceSubPath: "functions/async/bindings/kafka-input"
          revision: "main"
      serving:
        runtime: async
        scaleOptions:
          minReplicas: 0
          maxReplicas: 10
          keda:
            scaledObject:
              pollingInterval: 15
              minReplicaCount: 0
              maxReplicaCount: 10
              cooldownPeriod: 60
              advanced:
                horizontalPodAutoscalerConfig:
                  behavior:
                    scaleDown:
                      stabilizationWindowSeconds: 45
                      policies:
                      - type: Percent
                        value: 50
                        periodSeconds: 15
                    scaleUp:
                      stabilizationWindowSeconds: 0
        triggers:
          - type: kafka
            metadata:
              topic: sample-topic
              bootstrapServers: kafka-server-kafka-brokers.default.svc:9092
              consumerGroup: kafka-input
              lagThreshold: "20"
        inputs:
          - name: greeting
            component: target-topic
        bindings:
          target-topic:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "sample-topic"
              - name: consumerGroup
                value: "kafka-input"
              - name: publishTopic
                value: "sample-topic"
              - name: authRequired
                value: "false"
        template:
          containers:
            - name: function
              imagePullPolicy: Always
    ```
  2. Run the following command to create the function kafka-input.

    ```shell
    kubectl apply -f kafka-input.yaml
    ```
  3. Use the following example YAML file to create a manifest function-front.yaml and modify the value of spec.image to set your own image registry address.

    ```yaml
    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: function-front
      annotations:
        plugins: |
          pre:
          - plugin-custom
          - plugin-example
          post:
          - plugin-custom
          - plugin-example
    spec:
      version: "v1.0.0"
      image: "<your registry name>/sample-knative-dapr:latest"
      imageCredentials:
        name: push-secret
      port: 8080 # Default to 8080
      build:
        builder: openfunction/builder-go:latest
        env:
          FUNC_NAME: "ForwardToKafka"
          FUNC_CLEAR_SOURCE: "true"
        srcRepo:
          url: "https://github.com/OpenFunction/samples.git"
          sourceSubPath: "functions/knative/with-output-binding"
          revision: "main"
      serving:
        scaleOptions:
          minReplicas: 0
          maxReplicas: 5
        runtime: knative
        outputs:
          - name: target
            component: kafka-server
            operation: "create"
        bindings:
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: authRequired
                value: "false"
              - name: publishTopic
                value: "sample-topic"
              - name: topics
                value: "sample-topic"
              - name: consumerGroup
                value: "function-front"
        template:
          containers:
            - name: function
              imagePullPolicy: Always
    ```

    Note

    In the plugins annotation, pre defines the order of plugins to be called before the user function is executed, and post defines the order of plugins to be called after it. For more information about the logic of these two plugins and their effects after execution, see Plugin mechanism.
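    Conceptually, the serving framework calls each pre-stage plugin in the declared order, then the user function, then each post-stage plugin. The following minimal, self-contained Go sketch illustrates that hook chain; the Plugin interface, namedPlugin type, and invoke helper are illustrative stand-ins, not the actual OpenFunction plugin API:

    ```go
    package main

    import (
    	"fmt"
    	"strings"
    )

    // trace records the order in which hooks and the user function run.
    var trace []string

    // Plugin is an illustrative stand-in for a function plugin with
    // pre- and post-execution hooks.
    type Plugin interface {
    	Name() string
    	ExecPreHook() error
    	ExecPostHook() error
    }

    type namedPlugin struct{ name string }

    func (p namedPlugin) Name() string { return p.name }
    func (p namedPlugin) ExecPreHook() error {
    	trace = append(trace, "pre:"+p.name)
    	return nil
    }
    func (p namedPlugin) ExecPostHook() error {
    	trace = append(trace, "post:"+p.name)
    	return nil
    }

    // invoke runs the pre-hooks in order, then the user function,
    // then the post-hooks in order, stopping on the first error.
    func invoke(pre, post []Plugin, fn func()) error {
    	for _, p := range pre {
    		if err := p.ExecPreHook(); err != nil {
    			return err
    		}
    	}
    	fn()
    	for _, p := range post {
    		if err := p.ExecPostHook(); err != nil {
    			return err
    		}
    	}
    	return nil
    }

    func main() {
    	// Mirrors the order declared in the plugins annotation above.
    	chain := []Plugin{namedPlugin{"plugin-custom"}, namedPlugin{"plugin-example"}}
    	_ = invoke(chain, chain, func() { trace = append(trace, "user-function") })
    	// Prints: pre:plugin-custom -> pre:plugin-example -> user-function -> post:plugin-custom -> post:plugin-example
    	fmt.Println(strings.Join(trace, " -> "))
    }
    ```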

  4. In the manifest, spec.serving.outputs defines an output that points to a Dapr component of the Kafka server. This allows you to send custom content to the output target in the function function-front.

    ```go
    func Sender(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
    	...
    	_, err := ctx.Send("target", greeting)
    	...
    }
    ```
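    The send pattern can be exercised outside a cluster for illustration. In this hedged sketch, Context and fakeContext are hypothetical stand-ins for ofctx.Context and the real Dapr-backed runtime; only the ctx.Send("target", ...) call mirrors the sample above:

    ```go
    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // Context is a hypothetical stand-in for the framework context;
    // only the Send method used by the handler is modeled here.
    type Context interface {
    	Send(outputName string, data []byte) ([]byte, error)
    }

    // fakeContext records sends instead of invoking a real Dapr binding.
    type fakeContext struct {
    	sent map[string][]byte
    }

    func (c *fakeContext) Send(outputName string, data []byte) ([]byte, error) {
    	c.sent[outputName] = data
    	return nil, nil
    }

    // ForwardToKafka forwards the incoming payload to the output named
    // "target", which the manifest binds to the Kafka component.
    func ForwardToKafka(ctx Context, in []byte) error {
    	greeting, err := json.Marshal(map[string]string{"message": string(in)})
    	if err != nil {
    		return err
    	}
    	_, err = ctx.Send("target", greeting)
    	return err
    }

    func main() {
    	ctx := &fakeContext{sent: map[string][]byte{}}
    	if err := ForwardToKafka(ctx, []byte("Awesome OpenFunction!")); err != nil {
    		panic(err)
    	}
    	// Prints: {"message":"Awesome OpenFunction!"}
    	fmt.Println(string(ctx.sent["target"]))
    }
    ```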
  5. Run the following command to create the function function-front.

    ```shell
    kubectl apply -f function-front.yaml
    ```

Check Results

  1. Run the following command to view the status of the functions.

    ```shell
    $ kubectl get functions.core.openfunction.io
    NAME             BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         URL                                              AGE
    function-front   Succeeded    Running        builder-bhbtk   serving-vc6jw   https://openfunction.io/default/function-front   2m41s
    kafka-input      Succeeded    Running        builder-dprfd   serving-75vrt                                                    2m21s
    ```

    Note

    The URL, provided by the OpenFunction Domain, is the address at which the function can be accessed. To access the function through this URL, make sure that DNS can resolve the address.

  2. Run the following command to create a pod in the cluster for accessing the function.

    ```shell
    kubectl run curl --image=radial/busyboxplus:curl -i --tty --rm
    ```
  3. Run the following command to access the function through URL.

    ```shell
    [ root@curl:/ ]$ curl -d '{"message":"Awesome OpenFunction!"}' -H "Content-Type: application/json" -X POST http://openfunction.io.svc.cluster.local/default/function-front
    ```
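    The same request can also be issued programmatically. The following Go sketch uses a local httptest server as a stand-in for the in-cluster function-front endpoint; inside the cluster you would POST the payload to the URL shown above instead:

    ```go
    package main

    import (
    	"bytes"
    	"fmt"
    	"io"
    	"net/http"
    	"net/http/httptest"
    )

    func main() {
    	// Local stand-in for the function-front endpoint; it simply echoes
    	// the request body so the round trip can run outside a cluster.
    	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		body, _ := io.ReadAll(r.Body)
    		fmt.Fprintf(w, "received: %s", body)
    	}))
    	defer srv.Close()

    	payload := []byte(`{"message":"Awesome OpenFunction!"}`)
    	resp, err := http.Post(srv.URL, "application/json", bytes.NewReader(payload))
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	out, _ := io.ReadAll(resp.Body)
    	// Prints: received: {"message":"Awesome OpenFunction!"}
    	fmt.Println(string(out))
    }
    ```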
  4. Run the following command to view the log of function-front.

    ```shell
    kubectl logs -f \
      $(kubectl get po -l \
      openfunction.io/serving=$(kubectl get functions function-front -o jsonpath='{.status.serving.resourceRef}') \
      -o jsonpath='{.items[0].metadata.name}') \
      function
    ```

    The output looks as follows.

    ```shell
    dapr client initializing for: 127.0.0.1:50001
    I0125 06:51:55.584973       1 framework.go:107] Plugins for pre-hook stage:
    I0125 06:51:55.585044       1 framework.go:110] - plugin-custom
    I0125 06:51:55.585052       1 framework.go:110] - plugin-example
    I0125 06:51:55.585057       1 framework.go:115] Plugins for post-hook stage:
    I0125 06:51:55.585062       1 framework.go:118] - plugin-custom
    I0125 06:51:55.585067       1 framework.go:118] - plugin-example
    I0125 06:51:55.585179       1 knative.go:46] Knative Function serving http: listening on port 8080
    2022/01/25 06:52:02 http - Data: {"message":"Awesome OpenFunction!"}
    I0125 06:52:02.246450       1 plugin-example.go:83] the sum is: 2
    ```
  5. Run the following command to view the log of kafka-input.

    ```shell
    kubectl logs -f \
      $(kubectl get po -l \
      openfunction.io/serving=$(kubectl get functions kafka-input -o jsonpath='{.status.serving.resourceRef}') \
      -o jsonpath='{.items[0].metadata.name}') \
      function
    ```

    The output looks as follows.

    ```shell
    dapr client initializing for: 127.0.0.1:50001
    I0125 06:35:28.332381       1 framework.go:107] Plugins for pre-hook stage:
    I0125 06:35:28.332863       1 framework.go:115] Plugins for post-hook stage:
    I0125 06:35:28.333749       1 async.go:39] Async Function serving grpc: listening on port 8080
    message from Kafka '{Awesome OpenFunction!}'
    ```