Create a receive adapter

As part of the source reconciliation process, you must create and deploy the underlying receive adapter.

The receive adapter requires an injection-based main method that is located in cmd/receiver_adapter/main.go:

  1. // This Adapter generates events at a regular interval.
  2. package main
  3. import (
  4. "knative.dev/eventing/pkg/adapter"
  5. myadapter "knative.dev/sample-source/pkg/adapter"
  6. )
  7. func main() {
  8. adapter.Main("sample-source", myadapter.NewEnv, myadapter.NewAdapter)
  9. }

The receive adapter’s pkg implementation consists of two main functions:

  1. A NewAdapter(ctx context.Context, aEnv adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {} call, which creates the new adapter with passed variables via the EnvConfigAccessor. The created adapter is passed the CloudEvents client (which is where the events are forwarded to). This is sometimes referred to as a sink, or ceClient in the Knative ecosystem. The return value is a reference to the adapter as defined by the adapter’s local struct.

    In the case of the sample source:

    1. // Adapter generates events at a regular interval.
    2. type Adapter struct {
    3. logger *zap.Logger
    4. interval time.Duration
    5. nextID int
    6. client cloudevents.Client
    7. }
  2. A Start function, implemented as an interface to the adapter struct:

    1. func (a *Adapter) Start(stopCh <-chan struct{}) error {

    stopCh is the signal to stop the adapter. Otherwise, the role of the function is to process the next event.

    In the case of the sample-source, this function creates a CloudEvent to forward to the specified sink every X interval, as specified by the EnvConfigAccessor parameter, which is loaded by the resource YAML:

    1. func (a *Adapter) Start(stopCh <-chan struct{}) error {
    2. a.logger.Infow("Starting heartbeat", zap.String("interval", a.interval.String()))
    3. for {
    4. select {
    5. case <-time.After(a.interval):
    6. event := a.newEvent()
    7. a.logger.Infow("Sending new event", zap.String("event", event.String()))
    8. if result := a.client.Send(context.Background(), event); !cloudevents.IsACK(result) {
    9. a.logger.Infow("failed to send event", zap.String("event", event.String()), zap.Error(result))
    10. // We got an error but it could be transient, try again next interval.
    11. continue
    12. }
    13. case <-stopCh:
    14. a.logger.Info("Shutting down...")
    15. return nil
    16. }
    17. }
    18. }

Managing the Receive Adapter in the Controller

  1. Update the ObservedGeneration and initialize the Status conditions, as defined in the samplesource_lifecycle.go and samplesource_types.go files:

    1. src.Status.InitializeConditions()
    2. src.Status.ObservedGeneration = src.Generation
  2. Create a receive adapter.

    1. Verify that the specified Kubernetes resources are valid, and update the Status accordingly.

    2. Assemble the ReceiveAdapterArgs:

      1. raArgs := resources.ReceiveAdapterArgs{
      2. EventSource: src.Namespace + "/" + src.Name,
      3. Image: r.ReceiveAdapterImage,
      4. Source: src,
      5. Labels: resources.Labels(src.Name),
      6. AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
      7. }

      Note

      The exact arguments may change based on functional requirements. Create the underlying deployment from the arguments provided, matching pod templates, labels, owner references, etc as needed to fill out the deployment. Example: pkg/reconciler/sample/resources/receive_adapter.go

    3. Fetch the existing receive adapter deployment:

      1. namespace := owner.GetObjectMeta().GetNamespace()
      2. ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
    4. If there is no existing receive adapter deployment, create one:

      1. ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
    5. Check if the expected spec is different from the existing spec, and update the deployment if required:

      1. } else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
      2. ra.Spec.Template.Spec = expected.Spec.Template.Spec
      3. if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
      4. return ra, err
      5. }
    6. If updated, record the event:

      1. return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)
    7. If successful, update the Status and MarkDeployed:

      1. src.Status.PropagateDeploymentAvailability(ra)
  3. Create a SinkBinding to bind the receive adapter with the sink.

    1. Create a Reference for the receive adapter deployment. This deployment is the SinkBinding’s source:

      1. tracker.Reference{
      2. APIVersion: appsv1.SchemeGroupVersion.String(),
      3. Kind: "Deployment",
      4. Namespace: ra.Namespace,
      5. Name: ra.Name,
      6. }
    2. Fetch the existing SinkBinding:

      1. namespace := owner.GetObjectMeta().GetNamespace()
      2. sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
    3. If there is no existing SinkBinding, create one:

      1. sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
    4. Check if the expected spec is different to the existing spec, and update the SinkBinding if required:

      1. else if r.specChanged(sb.Spec, expected.Spec) {
      2. sb.Spec = expected.Spec
      3. if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
      4. return sb, err
      5. }
    5. If updated, record the event:

      1. return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)
    6. MarkSink with the result:

      1. src.Status.MarkSink(sb.Status.SinkURI)
  4. Return a new reconciler event stating that the process is done:

    1. return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)