Kafka Mesh filter

The Apache Kafka mesh filter provides a facade for Apache Kafka producers. Produce requests sent to this filter instance can be forwarded to one of multiple upstream Kafka clusters, depending on the configured forwarding rules. The corresponding message versions from Kafka 2.8.1 are supported.

  • v3 API reference

  • This filter should be configured with the name envoy.filters.network.kafka_mesh.

Attention

The Kafka mesh filter is only included in contrib images.

Attention

The kafka_mesh filter is experimental and is currently under active development. Capabilities will be expanded over time and the configuration structures are likely to change.

Attention

The kafka_mesh filter does not work on Windows (the blocker is getting librdkafka compiled).

Configuration

The example below shows a typical filter configuration that proxies three Kafka clusters. Clients connect to 127.0.0.1:19092, and their messages are distributed to a cluster depending on the topic name.

  listeners:
  - address:
      socket_address:
        address: 127.0.0.1 # Host that Kafka clients should connect to.
        port_value: 19092  # Port that Kafka clients should connect to.
    filter_chains:
    - filters:
      - name: envoy.filters.network.kafka_mesh
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh
          advertised_host: "127.0.0.1"
          advertised_port: 19092
          upstream_clusters:
          - cluster_name: kafka_c1
            bootstrap_servers: cluster1_node1:9092,cluster1_node2:9092,cluster1_node3:9092
            partition_count: 1
          - cluster_name: kafka_c2
            bootstrap_servers: cluster2_node1:9092,cluster2_node2:9092,cluster2_node3:9092
            partition_count: 1
          - cluster_name: kafka_c3
            bootstrap_servers: cluster3_node1:9092,cluster3_node2:9092
            partition_count: 5
            producer_config:
              acks: "1"
              linger.ms: "500"
          forwarding_rules:
          - target_cluster: kafka_c1
            topic_prefix: apples
          - target_cluster: kafka_c2
            topic_prefix: bananas
          - target_cluster: kafka_c3
            topic_prefix: cherries

Note that the Kafka broker filter can be inserted before the Kafka mesh filter in the filter chain to capture request processing metrics.
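
A minimal sketch of such a chain, reusing the listener from the example above (the stat_prefix value is an arbitrary name chosen for illustration, and the mesh filter configuration is elided):

  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: kafka_requests # arbitrary prefix for the emitted metrics
    - name: envoy.filters.network.kafka_mesh
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh
        # ... same configuration as in the example above ...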

Notes

Given that this filter does its own processing of received requests, there are some changes in behaviour compared to connecting directly to a Kafka cluster:

  1. Only ProduceRequests with version 2 are supported (which means that very old producers, such as 0.8, are not supported).

  2. Python producers need to set an API version of at least 1.0.0, so that the produce requests they send contain records with magic equal to 2 (see the first sketch after this list).

  3. Handling of the downstream Kafka producer ‘acks’ property is delegated to the upstream client. E.g. if the upstream client is configured to use acks=0, then the response is sent to the downstream client as soon as possible (even if the downstream producer requested non-zero acks!).

  4. As the filter splits a single produce request into separate records, it is possible that delivery of only some of these records fails. In that case, the response returned to the downstream client is a failure, even though some of the records may have been appended in the target cluster.

  5. Because of the splitting mentioned above, records are not necessarily appended one after another (as they are not sent upstream as a single request). Users that want to avoid this scenario might want to look into the downstream producer configs ‘linger.ms’ and ‘batch.size’ (see the second sketch after this list).

  6. Produce requests that reference topics not matching any of the rules fail and close the connection. This usually should not happen (clients request metadata first, so they should fail earlier with ‘no broker available’), but it is possible if someone hand-crafts binary payloads over the connection.

  7. librdkafka was compiled without ssl, lz4 and gssapi support, so the related custom producer config options are not supported.

  8. Invalid custom producer configs are not detected at startup, but only when the affected clusters first receive traffic. Requests that reference these clusters fail and close the connection.
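
As an illustration of notes 2 and 3, a minimal sketch of a downstream producer; it assumes the kafka-python client library, the listener address from the example configuration above, and a hypothetical topic name:

  from kafka import KafkaProducer

  # Connect to the Envoy listener instead of a Kafka broker directly.
  producer = KafkaProducer(
      bootstrap_servers='127.0.0.1:19092',
      # At least 1.0.0, so the sent records have magic equal to 2 (note 2).
      api_version=(1, 0, 0),
      # Note 3: this setting does not govern upstream delivery; the
      # filter's per-cluster 'producer_config' does.
      acks=1,
  )
  # 'apples-sales' (hypothetical) matches the 'apples' prefix rule, so the
  # record is forwarded to cluster kafka_c1.
  producer.send('apples-sales', b'example-payload')
  producer.flush()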
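
For note 5, one way to avoid the reordering is to ensure each downstream request carries a single record, e.g. by disabling client-side batching (again a kafka-python sketch; the parameter names map to the ‘linger.ms’ and ‘batch.size’ producer properties):

  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='127.0.0.1:19092',
      api_version=(1, 0, 0),
      linger_ms=0,   # do not wait for batches to fill up
      batch_size=0,  # a batch size of zero disables batching entirely
  )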