Use EventBus and Trigger

This document gives an example of how to use EventBus and Trigger.


  • You need to create a function as the target function to be triggered. Please refer to Create a function for more details.
  • You need to create a Kafka cluster. Please refer to Create a Kafka cluster for more details.

Deploy an NATS streaming server

Run the following commands to deploy an NATS streaming server. This document uses nats://nats.default:4222 as the access address of the NATS streaming server and stan as the cluster ID. For more information, see NATS Streaming (STAN).

  1. helm repo add nats
  2. helm install nats nats/nats
  3. helm install stan nats/stan --set stan.nats.url=nats://nats:4222

Create an OpenFuncAsync Runtime Function

  1. Use the following content to create a configuration file (for example, openfuncasync-function.yaml) for the target function, which is triggered by the Trigger CRD and prints the received message.

    1. apiVersion:
    2. kind: Function
    3. metadata:
    4. name: trigger-target
    5. spec:
    6. version: "v1.0.0"
    7. image: openfunctiondev/v1beta1-trigger-target:latest
    8. port: 8080
    9. serving:
    10. runtime: "async"
    11. scaleOptions:
    12. keda:
    13. scaledObject:
    14. pollingInterval: 15
    15. minReplicaCount: 0
    16. maxReplicaCount: 10
    17. cooldownPeriod: 30
    18. triggers:
    19. - type: stan
    20. metadata:
    21. natsServerMonitoringEndpoint: "stan.default.svc.cluster.local:8222"
    22. queueGroup: "grp1"
    23. durableName: "ImDurable"
    24. subject: "metrics"
    25. lagThreshold: "10"
    26. inputs:
    27. - name: autoscaling-pubsub
    28. component: eventbus
    29. topic: metrics
    30. pubsub:
    31. eventbus:
    32. type: pubsub.natsstreaming
    33. version: v1
    34. metadata:
    35. - name: natsURL
    36. value: "nats://nats.default:4222"
    37. - name: natsStreamingClusterID
    38. value: "stan"
    39. - name: subscriptionType
    40. value: "queue"
    41. - name: durableSubscriptionName
    42. value: "ImDurable"
    43. - name: consumerID
    44. value: "grp1"
  2. Run the following command to apply the configuration file.

    1. kubectl apply -f openfuncasync-function.yaml

Create an EventBus and an EventSource

  1. Use the following content to create a configuration file (for example, eventbus.yaml) for an EventBus.

    1. apiVersion:
    2. kind: EventBus
    3. metadata:
    4. name: default
    5. spec:
    6. natsStreaming:
    7. natsURL: "nats://nats.default:4222"
    8. natsStreamingClusterID: "stan"
    9. subscriptionType: "queue"
    10. durableSubscriptionName: "ImDurable"
  2. Use the following content to create a configuration file (for example, eventsource.yaml) for an EventSource.


    Set the name of the event bus through spec.eventBus.

    1. apiVersion:
    2. kind: EventSource
    3. metadata:
    4. name: my-eventsource
    5. spec:
    6. logLevel: "2"
    7. eventBus: "default"
    8. kafka:
    9. sample-two:
    10. brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
    11. topic: "events-sample"
    12. authRequired: false
  3. Run the following commands to apply these configuration files.

    1. kubectl apply -f eventbus.yaml
    2. kubectl apply -f eventsource.yaml
  4. Run the following commands to check the results.

    1. $ kubectl get
    3. my-eventsource default Ready
    4. $ kubectl get
    5. NAME AGE
    6. default 6m53s
    7. $ kubectl get components
    8. NAME AGE
    9. serving-6r5dl-component-eventbus-jlpqf 11m
    10. serving-9689d-component-ebfes-my-eventsource-cmcbw 6m57s
    11. serving-9689d-component-esc-kafka-sample-two-l99cg 6m57s
    12. serving-k6zw8-component-cron-9x8hl 61m
    13. serving-k6zw8-component-kafka-server-sjrzs 61m
    14. $ kubectl get deployments.apps
    16. serving-6r5dl-deployment-v100-m7nq2 0/0 0 0 12m
    17. serving-9689d-deployment-v100-5qdvk 1/1 1 1 7m17s


    In the case of using the event bus, the workflow of the EventSource controller is described as follows:

    1. Create an EventSource custom resource named my-eventsource.
    2. Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
    3. Create a Dapr component named serving-xxxxx-component-ebfes-my-eventsource-xxxxx to enable the EventSource to associate with the event bus.
    4. Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-two-xxxxx to enable the EventSource to associate with the event source.
    5. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing events.

Create a Trigger

  1. Use the following content to create a configuration file (for example, trigger.yaml) for a Trigger.


    • Set the event bus associated with the Trigger through spec.eventBus.
    • Set the event input source through spec.inputs.
    • This is a simple trigger that collects events from the EventBus named default. When it retrieves a sample-two event from the EventSource my-eventsource, it triggers a Knative service named function-sample-serving-qrdx8-ksvc-fwml8 and sends the event to the topic metrics of the event bus at the same time.
    1. apiVersion:
    2. kind: Trigger
    3. metadata:
    4. name: my-trigger
    5. spec:
    6. logLevel: "2"
    7. eventBus: "default"
    8. inputs:
    9. inputDemo:
    10. eventSource: "my-eventsource"
    11. event: "sample-two"
    12. subscribers:
    13. - condition: inputDemo
    14. topic: "metrics"
  2. Run the following command to apply the configuration file.

    1. kubectl apply -f trigger.yaml
  3. Run the following commands to check the results.

    1. $ kubectl get
    3. my-trigger default Ready
    4. $ kubectl get
    5. NAME AGE
    6. default 62m
    7. $ kubectl get components
    8. NAME AGE
    9. serving-9689d-component-ebfes-my-eventsource-cmcbw 46m
    10. serving-9689d-component-esc-kafka-sample-two-l99cg 46m
    11. serving-dxrhd-component-eventbus-t65q7 13m
    12. serving-zwlj4-component-ebft-my-trigger-4925n 100s


    In the case of using the event bus, the workflow of the Trigger controller is as follows:

    1. Create a Trigger custom resource named my-trigger.
    2. Retrieve and reorganize the configuration of the EventBus, including the EventBus name (default in this example) and the name of the Dapr component associated with the EventBus.
    3. Create a Dapr component named serving-xxxxx-component-ebft-my-trigger-xxxxx to enable the Trigger to associatie with the event bus.
    4. Create a Deployment named serving-xxxxx-deployment-v100-xxxxx for processing trigger tasks.

Create an Event Producer

  1. Use the following content to create an event producer configuration file (for example, events-producer.yaml).

    1. apiVersion:
    2. kind: Function
    3. metadata:
    4. name: events-producer
    5. spec:
    6. version: "v1.0.0"
    7. image: openfunctiondev/v1beta1-bindings:latest
    8. serving:
    9. template:
    10. containers:
    11. - name: function
    12. imagePullPolicy: Always
    13. runtime: "async"
    14. inputs:
    15. - name: cron
    16. component: cron
    17. outputs:
    18. - name: target
    19. component: kafka-server
    20. operation: "create"
    21. bindings:
    22. cron:
    23. type: bindings.cron
    24. version: v1
    25. metadata:
    26. - name: schedule
    27. value: "@every 2s"
    28. kafka-server:
    29. type: bindings.kafka
    30. version: v1
    31. metadata:
    32. - name: brokers
    33. value: "kafka-server-kafka-brokers:9092"
    34. - name: topics
    35. value: "events-sample"
    36. - name: consumerGroup
    37. value: "bindings-with-output"
    38. - name: publishTopic
    39. value: "events-sample"
    40. - name: authRequired
    41. value: "false"
  2. Run the following command to apply the configuration file.

    1. kubectl apply -f events-producer.yaml
  3. Run the following commands to observe changes of the target asynchronous function.

    1. $ kubectl get
    3. trigger-target Skipped Running serving-dxrhd 20m
    4. $ kubectl get po --watch
    6. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s
    7. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 Pending 0 0s
    8. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 0s
    9. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 0/2 ContainerCreating 0 2s
    10. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s
    11. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 1/2 Running 0 4s
    12. serving-dxrhd-deployment-v100-xmrkq-785cb5f99-6hclm 2/2 Running 0 4s