Use EventSource
This document gives an example of how to use an event source to trigger a synchronous function.
In this example, an EventSource is defined for synchronous invocation: it uses the event source (a Kafka server) as an input binding of a function (a Knative service). When the event source generates an event, the EventSource invokes the function configured in spec.sink and receives a synchronous response.
Create a Function
Use the following content to create a function as the EventSource Sink. For more information about how to create a function, see Create sync functions.
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: sink
spec:
  version: "v1.0.0"
  image: "openfunction/sink-sample:latest"
  port: 8080
  serving:
    runtime: "knative"
    template:
      containers:
        - name: function
          imagePullPolicy: Always
After the function is created, run the following command to get the URL of the function.
Note
In the URL of the function, openfunction is the name of the Kubernetes Service and io is the namespace where the Kubernetes Service runs. For more information, see Namespaces of Services.
$ kubectl get functions.core.openfunction.io
NAME   BUILDSTATE   SERVINGSTATE   BUILDER   SERVING         URL                                    AGE
sink   Skipped      Running                  serving-4x5wh   https://openfunction.io/default/sink   13s
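The sink address used later in the EventSource (spec.sink.uri) is the cluster-internal form of this URL. The snippet below is a minimal sketch of that conversion, assuming the host part of the URL always has the form <service>.<namespace> as the note describes, that the URL contains a path, and that in-cluster traffic uses plain http. The helper name to_cluster_uri is hypothetical and not part of OpenFunction.

```shell
# Convert the function URL reported by kubectl into a cluster-internal URI.
# Assumption: the host is <service>.<namespace> and the URL has a path.
to_cluster_uri() {
  url="$1"
  rest="${url#*://}"     # drop the scheme:    openfunction.io/default/sink
  host="${rest%%/*}"     # keep only the host: openfunction.io
  path="/${rest#*/}"     # keep only the path: /default/sink
  printf 'http://%s.svc.cluster.local%s\n' "$host" "$path"
}

to_cluster_uri "https://openfunction.io/default/sink"
```

The printed value can be compared with the uri field of the EventSource defined later in this document.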
Create a Kafka Cluster
Run the following commands to install strimzi-kafka-operator in the default namespace.
helm repo add strimzi https://strimzi.io/charts/
helm install kafka-operator -n default strimzi/strimzi-kafka-operator
Use the following content to create a file named kafka.yaml.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-server
  namespace: default
spec:
  kafka:
    version: 3.1.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: events-sample
  namespace: default
  labels:
    strimzi.io/cluster: kafka-server
spec:
  partitions: 10
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
Run the following command to deploy a 1-replica Kafka server named kafka-server and a 1-replica Kafka topic named events-sample in the default namespace. The Kafka and Zookeeper clusters created by this command use the ephemeral storage type, which is backed by emptyDir volumes and intended for demonstration only.
kubectl apply -f kafka.yaml
Run the following command to check pod status and wait for Kafka and Zookeeper to be up and running.
$ kubectl get po
NAME                                            READY   STATUS    RESTARTS   AGE
kafka-server-entity-operator-568957ff84-nmtlw   3/3     Running   0          8m42s
kafka-server-kafka-0                            1/1     Running   0          9m13s
kafka-server-zookeeper-0                        1/1     Running   0          9m46s
strimzi-cluster-operator-687fdd6f77-cwmgm       1/1     Running   0          11m
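Instead of polling the pod list by hand, the wait can be scripted with kubectl wait. The following is a sketch, assuming the Strimzi custom resources report a Ready condition in their status and live in the default namespace as in this example. The helper name wait_ready and its default timeout are hypothetical.

```shell
# Block until a resource reports the Ready condition, or the timeout expires.
# $1: resource reference, for example kafka/kafka-server
# $2: optional timeout (assumed default: 300s)
wait_ready() {
  resource="$1"
  timeout="${2:-300s}"
  kubectl wait "$resource" --for=condition=Ready --timeout="$timeout" -n default
}
```

For the resources in this example, it would be called as wait_ready kafka/kafka-server and then wait_ready kafkatopic/events-sample.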
Run the following command to view the metadata of the Kafka cluster.
kafkacat -L -b kafka-server-kafka-brokers:9092
Trigger a Synchronous Function
Create an EventSource
Use the following content to create an EventSource configuration file (for example, eventsource-sink.yaml).
Note
- The following example defines an event source named my-eventsource and marks the events generated by the specified Kafka server as sample-one events.
- spec.sink references the target function (Knative service) created in the prerequisites.
apiVersion: events.openfunction.io/v1alpha1
kind: EventSource
metadata:
  name: my-eventsource
spec:
  logLevel: "2"
  kafka:
    sample-one:
      brokers: "kafka-server-kafka-brokers.default.svc.cluster.local:9092"
      topic: "events-sample"
      authRequired: false
  sink:
    uri: "http://openfunction.io.svc.cluster.local/default/sink"
Run the following command to apply the configuration file.
kubectl apply -f eventsource-sink.yaml
Run the following commands to check the results.
$ kubectl get eventsources.events.openfunction.io
NAME             EVENTBUS   SINK   STATUS
my-eventsource                     Ready
$ kubectl get components
NAME                                                      AGE
serving-8f6md-component-esc-kafka-sample-one-r527t        68m
serving-8f6md-component-ts-my-eventsource-default-wz8jt   68m
$ kubectl get deployments.apps
NAME                                  READY   UP-TO-DATE   AVAILABLE   AGE
serving-8f6md-deployment-v100-pg9sd   1/1     1            1           68m
Note
In this example of triggering a synchronous function, the EventSource controller works as follows:
- Create an EventSource custom resource named my-eventsource.
- Create a Dapr component named serving-xxxxx-component-esc-kafka-sample-one-xxxxx to enable the EventSource to associate with the event source.
- Create a Dapr component named serving-xxxxx-component-ts-my-eventsource-default-xxxxx to enable the EventSource to associate with the sink function.
- Create a Deployment named serving-xxxxx-deployment-v100-xxxxx-xxxxxxxxxx-xxxxx for processing events.
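When several EventSources run in one namespace, it can help to tell the two kinds of generated Dapr components apart by name. The snippet below is a small sketch that relies only on the naming patterns listed above. The helper name component_role and the role labels are hypothetical, and the patterns are an assumption that may not hold in other OpenFunction versions.

```shell
# Label a generated Dapr component by the role its name suggests.
# Assumption: names contain "-component-esc-" or "-component-ts-" as listed above.
component_role() {
  case "$1" in
    *-component-esc-*) echo "event source" ;;
    *-component-ts-*)  echo "sink function" ;;
    *)                 echo "unknown" ;;
  esac
}

# Component names taken from the kubectl output shown earlier.
for name in \
  serving-8f6md-component-esc-kafka-sample-one-r527t \
  serving-8f6md-component-ts-my-eventsource-default-wz8jt
do
  printf '%s: %s\n' "$name" "$(component_role "$name")"
done
```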
Create an Event Producer
To start the target function, you need to create some events to trigger the function.
Use the following content to create an event producer configuration file (for example, events-producer.yaml).
apiVersion: core.openfunction.io/v1beta1
kind: Function
metadata:
  name: events-producer
spec:
  version: "v1.0.0"
  image: openfunctiondev/v1beta1-bindings:latest
  serving:
    template:
      containers:
        - name: function
          imagePullPolicy: Always
    runtime: "async"
    inputs:
      - name: cron
        component: cron
    outputs:
      - name: target
        component: kafka-server
        operation: "create"
    bindings:
      cron:
        type: bindings.cron
        version: v1
        metadata:
          - name: schedule
            value: "@every 2s"
      kafka-server:
        type: bindings.kafka
        version: v1
        metadata:
          - name: brokers
            value: "kafka-server-kafka-brokers:9092"
          - name: topics
            value: "events-sample"
          - name: consumerGroup
            value: "bindings-with-output"
          - name: publishTopic
            value: "events-sample"
          - name: authRequired
            value: "false"
Run the following command to apply the configuration file.
kubectl apply -f events-producer.yaml
Run the following command to check the results in real time.
$ kubectl get po --watch
NAME                                                        READY   STATUS              RESTARTS   AGE
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         0/2     ContainerCreating   0          1s
serving-8f6md-deployment-v100-pg9sd-6666c5577f-4rpdg        2/2     Running             0          23m
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         0/2     ContainerCreating   0          1s
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         1/2     Running             0          5s
serving-k6zw8-deployment-v100-fbtdc-dc96c4589-s25dh         2/2     Running             0          8s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     Pending             0          0s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     Pending             0          0s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     ContainerCreating   0          0s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   0/2     ContainerCreating   0          2s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   1/2     Running             0          4s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   1/2     Running             0          4s
serving-4x5wh-ksvc-wxbf2-v100-deployment-5c495c84f6-8n6mk   2/2     Running             0          4s
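If the sink pods do not appear, one way to narrow the problem down is to confirm that the producer is actually writing to the topic. The following is a sketch using kafkacat in consumer mode, assuming it runs from a location that can reach the broker address used earlier in this document. The helper name peek_events and its default message count are hypothetical.

```shell
# Read a few messages from the events-sample topic and then exit.
# $1: optional number of messages to read (assumed default: 5)
peek_events() {
  count="${1:-5}"
  kafkacat -C -b kafka-server-kafka-brokers:9092 -t events-sample -o beginning -c "$count"
}
```

Calling peek_events 3 should print three messages if the producer is publishing. If the call blocks without output, no events have reached the topic yet.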