Use EventBus and Trigger

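This document walks through an example of how to use EventBus and Trigger: events produced to Kafka are collected by an EventSource, passed through an EventBus, and forwarded by a Trigger to an asynchronous target function.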

Prerequisites

  • You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
  • You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.

Deploy a NATS streaming server

Run the following commands to deploy a NATS streaming server. This document uses nats://nats.default:4222 as the access address of the NATS streaming server and stan as the cluster ID. For more information, see NATS Streaming (STAN).

    helm repo add nats https://nats-io.github.io/k8s/helm/charts/
    helm install nats nats/nats
    helm install stan nats/stan --set stan.nats.url=nats://nats:4222
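
The access address used in this document follows the in-cluster pattern nats://<service>.<namespace>:<port>. The sketch below shows how that address is composed. It assumes that the Helm release named nats exposes a Service called nats in the default namespace on port 4222, as the address above implies. The helper name nats_url is hypothetical.

```shell
# Compose the in-cluster NATS address from a Service name and namespace.
# Assumption: the Service is reachable as <service>.<namespace> on the given port.
nats_url() {
  service=$1
  namespace=$2
  port=${3:-4222}   # 4222 is the port used throughout this document
  printf 'nats://%s.%s:%s\n' "$service" "$namespace" "$port"
}

nats_url nats default   # prints nats://nats.default:4222
```

If the charts are installed in another namespace, the natsURL values in the manifests below need to change accordingly.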

Create an OpenFuncAsync Runtime Function

  1. Use the following content to create a configuration file (for example, openfuncasync-function.yaml) for the target function, which is triggered by the Trigger CRD and prints the received message.

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: trigger-target
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-trigger-target:latest
      port: 8080
      serving:
        runtime: "async"
        scaleOptions:
          keda:
            scaledObject:
              pollingInterval: 15
              minReplicaCount: 0
              maxReplicaCount: 10
              cooldownPeriod: 30
        triggers:
          - type: stan
            metadata:
              natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
              queueGroup: "grp1"
              durableName: "ImDurable"
              subject: "metrics"
              lagThreshold: "10"
        inputs:
          - name: autoscaling-pubsub
            component: eventbus
            topic: metrics
        pubsub:
          eventbus:
            type: pubsub.natsstreaming
            version: v1
            metadata:
              - name: natsURL
                value: "nats://nats.default:4222"
              - name: natsStreamingClusterID
                value: "stan"
              - name: subscriptionType
                value: "queue"
              - name: durableSubscriptionName
                value: "ImDurable"
              - name: consumerID
                value: "grp1"

  2. Run the following command to apply the configuration file.

    kubectl apply -f openfuncasync-function.yaml
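
The scaleOptions and triggers settings in the manifest above control how many replicas of the target function run. The sketch below is a rough model of that behavior, not KEDA's actual implementation. It assumes the replica count follows the usual ceil(lag / lagThreshold) rule and is clamped to minReplicaCount and maxReplicaCount. The helper name desired_replicas is hypothetical.

```shell
# Simplified model of lag-based scaling (an assumption, not KEDA's code):
# replicas = ceil(lag / lagThreshold), clamped to [min, max].
desired_replicas() {
  lag=$1                 # pending messages on the subject
  threshold=${2:-10}     # lagThreshold from the manifest
  min=${3:-0}            # minReplicaCount
  max=${4:-10}           # maxReplicaCount
  # Integer ceiling division.
  replicas=$(( (lag + threshold - 1) / threshold ))
  if [ "$replicas" -lt "$min" ]; then replicas=$min; fi
  if [ "$replicas" -gt "$max" ]; then replicas=$max; fi
  echo "$replicas"
}

desired_replicas 0      # prints 0
desired_replicas 25     # prints 3
desired_replicas 1000   # prints 10
```

With the values in this example, an idle function can scale to zero replicas. KEDA checks the lag every pollingInterval (15) seconds and waits cooldownPeriod (30) seconds after the last activity before scaling back to zero.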

Create an EventBus and an EventSource

  1. Use the following content to create a configuration file (for example, eventbus.yaml) for an EventBus.

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      natsStreaming:
        natsURL: "nats://nats.default:4222"
        natsStreamingClusterID: "stan"
        subscriptionType: "queue"
        durableSubscriptionName: "ImDurable"

  2. Use the following content to create a configuration file (for example, eventsource.yaml) for an EventSource.

    Note

    Set the name of the event bus through spec.eventBus.

    apiVersion: events.openfunction.io/v1alpha1
    kind: EventSource
    metadata:
      name: my-eventsource
    spec:
      logLevel: "2"
      eventBus: "default"
      kafka:
        sample-two:
          brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
          topic: "events-sample"
          authRequired: false

  3. Run the following commands to apply these configuration files.

    kubectl apply -f eventbus.yaml
    kubectl apply -f eventsource.yaml

  4. Run the following commands to check the results.

    $ kubectl get eventsources.events.openfunction.io
    NAME             EVENTBUS   SINK   STATUS
    my-eventsource   default           Ready

    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   6m53s

    $ kubectl get components
    NAME                                                 AGE
    serving-6r5dl-component-eventbus-jlpqf               11m
    serving-9689d-component-ebfes-my-eventsource-cmcbw   6m57s
    serving-9689d-component-esc-kafka-sample-two-l99cg   6m57s
    serving-k6zw8-component-cron-9x8hl                   61m
    serving-k6zw8-component-kafka-server-sjrzs           61m

    $ kubectl get deployments.apps
    NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
    serving-6r5dl-deployment-v100-m7nq2   0/0     0            0           12m
    serving-9689d-deployment-v100-5qdvk   1/1     1            1           7m17s

    Note

    When an event bus is used, the EventSource controller works as follows:

    1. Create an EventSource custom resource named my-eventsource.
    2. Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
    3. Create a Dapr component named serving-xxxxx-component-ebfes-my-eventsource-xxxxx to enable the EventSource to associate with the event bus.
    4. Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-two-xxxxx to enable the EventSource to associate with the event source.
    5. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing events.
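
The naming pattern in steps 3 and 4 can be used to pick out the Dapr components that belong to an EventSource. The shell sketch below is illustrative only. The function name eventsource_component_role is hypothetical, and the mapping of name parts to spec fields (ebfes-<EventSource name>, esc-<source type>-<event name>) is inferred from the sample output above rather than taken from the controller's documentation.

```shell
# Print the role of a Dapr component generated for an EventSource.
# Returns 1 when the name matches neither pattern.
# Assumption: the name layout is inferred from the example output.
eventsource_component_role() {
  component=$1   # e.g. serving-9689d-component-ebfes-my-eventsource-cmcbw
  es_name=$2     # metadata.name of the EventSource
  src_type=$3    # source type key under spec (kafka in this example)
  event=$4       # event name under the source type (sample-two here)
  case "$component" in
    serving-*-component-ebfes-"$es_name"-*)
      echo "event bus link" ;;
    serving-*-component-esc-"$src_type"-"$event"-*)
      echo "event source link" ;;
    *)
      return 1 ;;
  esac
}

eventsource_component_role \
  serving-9689d-component-esc-kafka-sample-two-l99cg \
  my-eventsource kafka sample-two   # prints: event source link
```

Names that match neither pattern, such as serving-k6zw8-component-cron-9x8hl, belong to other functions in the namespace.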

Create a Trigger

  1. Use the following content to create a configuration file (for example, trigger.yaml) for a Trigger.

    Note

    • Set the event bus associated with the Trigger through spec.eventBus.
    • Set the event input source through spec.inputs.
    • This is a simple trigger that collects events from the EventBus named default. When it retrieves a sample-two event from the EventSource my-eventsource, it sends the event to the topic metrics of the event bus.

    apiVersion: events.openfunction.io/v1alpha1
    kind: Trigger
    metadata:
      name: my-trigger
    spec:
      logLevel: "2"
      eventBus: "default"
      inputs:
        inputDemo:
          eventSource: "my-eventsource"
          event: "sample-two"
      subscribers:
        - condition: inputDemo
          topic: "metrics"

  2. Run the following command to apply the configuration file.

    kubectl apply -f trigger.yaml

  3. Run the following commands to check the results.

    $ kubectl get triggers.events.openfunction.io
    NAME         EVENTBUS   STATUS
    my-trigger   default    Ready

    $ kubectl get eventbus.events.openfunction.io
    NAME      AGE
    default   62m

    $ kubectl get components
    NAME                                                 AGE
    serving-9689d-component-ebfes-my-eventsource-cmcbw   46m
    serving-9689d-component-esc-kafka-sample-two-l99cg   46m
    serving-dxrhd-component-eventbus-t65q7               13m
    serving-zwlj4-component-ebft-my-trigger-4925n        100s
    Note

    When an event bus is used, the Trigger controller works as follows:

    1. Create a Trigger custom resource named my-trigger.
    2. Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
    3. Create a Dapr component named serving-xxxxx-component-ebft-my-trigger-xxxxx to enable the Trigger to associate with the event bus.
    4. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing trigger tasks.
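
The Trigger defined in step 1 can be read as a small routing rule: an input names an event from an EventSource, and a subscriber forwards events that satisfy a condition to a topic. The sketch below is a toy model of that one rule, with the values from trigger.yaml hard-coded. It is not how the Trigger controller is implemented, and the function name route_event is hypothetical.

```shell
# Toy model of my-trigger: one input (inputDemo) and one subscriber.
# Prints the event bus topic an event is forwarded to, or nothing.
route_event() {
  event_source=$1
  event=$2
  matched=""
  # spec.inputs.inputDemo: eventSource my-eventsource, event sample-two
  if [ "$event_source" = "my-eventsource" ] && [ "$event" = "sample-two" ]; then
    matched="inputDemo"
  fi
  # spec.subscribers[0]: condition inputDemo, topic metrics
  if [ "$matched" = "inputDemo" ]; then
    echo "metrics"
  fi
}

route_event my-eventsource sample-two   # prints: metrics
route_event my-eventsource sample-one   # prints nothing
```

The topic printed here is the same topic that the target function subscribes to through its eventbus input.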

Create an Event Producer

  1. Use the following content to create an event producer configuration file (for example, events-producer.yaml).

    apiVersion: core.openfunction.io/v1beta1
    kind: Function
    metadata:
      name: events-producer
    spec:
      version: "v1.0.0"
      image: openfunctiondev/v1beta1-bindings:latest
      serving:
        template:
          containers:
            - name: function
              imagePullPolicy: Always
        runtime: "async"
        inputs:
          - name: cron
            component: cron
        outputs:
          - name: target
            component: kafka-server
            operation: "create"
        bindings:
          cron:
            type: bindings.cron
            version: v1
            metadata:
              - name: schedule
                value: "@every 2s"
          kafka-server:
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-server-kafka-brokers:9092"
              - name: topics
                value: "events-sample"
              - name: consumerGroup
                value: "bindings-with-output"
              - name: publishTopic
                value: "events-sample"
              - name: authRequired
                value: "false"

  2. Run the following command to apply the configuration file.

    kubectl apply -f events-producer.yaml

  3. Run the following commands to observe how the target asynchronous function changes.

    $ kubectl get functions.core.openfunction.io
    NAME             BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL   AGE
    trigger-target   Skipped      Running                  serving-dxrhd         20m

    $ kubectl get po --watch
    NAME                                                  READY   STATUS              RESTARTS   AGE
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     Pending             0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     ContainerCreating   0          0s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   0/2     ContainerCreating   0          2s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   1/2     Running             0          4s
    serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm   2/2     Running             0          4s
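
If the target function does not start, a mismatch between the names that link the manifests is a likely cause. The sketch below is a minimal check, assuming the values are copied by hand from the files in this document. It does not query the cluster, and check_same is a hypothetical helper.

```shell
# Values copied from the manifests in this document.
producer_publish_topic="events-sample"    # events-producer.yaml, publishTopic
eventsource_kafka_topic="events-sample"   # eventsource.yaml, kafka.sample-two.topic
trigger_topic="metrics"                   # trigger.yaml, subscribers[0].topic
function_input_topic="metrics"            # openfuncasync-function.yaml, inputs[0].topic
eventbus_name="default"                   # eventbus.yaml, metadata.name
trigger_eventbus="default"                # trigger.yaml, spec.eventBus

# Report whether two values that should agree actually do.
check_same() {
  # $1 = label, $2 and $3 = the two values expected to be equal
  if [ "$2" = "$3" ]; then
    echo "ok: $1 ($2)"
  else
    echo "mismatch: $1 ($2 != $3)"
    return 1
  fi
}

check_same "Kafka topic" "$producer_publish_topic" "$eventsource_kafka_topic"
check_same "event bus topic" "$trigger_topic" "$function_input_topic"
check_same "event bus name" "$eventbus_name" "$trigger_eventbus"
```

Each pair marks one hop in the example: producer to EventSource through Kafka, Trigger to target function through the event bus topic, and Trigger to EventBus by name.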