Apache Kafka

Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. It is based on Apache Flink’s universal Kafka connector and provides exactly-once processing semantics. The Kafka I/O Module is configurable in YAML or Java.

Dependency

To use the Kafka I/O Module in Java, include the following dependency in your pom.xml.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>statefun-kafka-io</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>

Kafka Ingress Spec

A KafkaIngressSpec declares an ingress spec for consuming from a Kafka cluster.

It accepts the following arguments:

  1. The ingress identifier associated with this ingress
  2. The topic name / list of topic names
  3. The address of the bootstrap servers
  4. The consumer group id to use
  5. A KafkaIngressDeserializer for deserializing data from Kafka (Java only)
  6. The position to start consuming from

    version: "1.0"
    module:
      meta:
        type: remote
      spec:
        ingresses:
          - ingress:
              meta:
                type: statefun.kafka.io/routable-protobuf-ingress
                id: example/user-ingress
              spec:
                address: kafka-broker:9092
                consumerGroupId: routable-kafka-e2e
                startupPosition:
                  type: earliest
                topics:
                  - topic: messages-1
                    typeUrl: org.apache.flink.statefun.docs.models.User
                    targets:
                      - example-namespace/my-function-1
                      - example-namespace/my-function-2

    package org.apache.flink.statefun.docs.io.kafka;

    import org.apache.flink.statefun.docs.models.User;
    import org.apache.flink.statefun.sdk.io.IngressIdentifier;
    import org.apache.flink.statefun.sdk.io.IngressSpec;
    import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
    import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;

    public class IngressSpecs {

      // Identifies this ingress by message type, namespace, and name.
      public static final IngressIdentifier<User> ID =
          new IngressIdentifier<>(User.class, "example", "input-ingress");

      public static final IngressSpec<User> kafkaIngress =
          KafkaIngressBuilder.forIdentifier(ID)
              .withKafkaAddress("localhost:9092")
              .withConsumerGroupId("greetings")
              .withTopic("my-topic")
              .withDeserializer(UserDeserializer.class)
              .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
              .build();
    }

The ingress also accepts properties to directly configure the Kafka client, using KafkaIngressBuilder#withProperties(Properties). Please refer to the Kafka consumer configuration documentation for the full list of available properties. Note that configuration passed using named methods, such as KafkaIngressBuilder#withConsumerGroupId(String), will have higher precedence and overwrite their respective settings in the provided properties.
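
The precedence rule above can be illustrated with plain java.util.Properties. This is only a sketch of the documented behavior, not the StateFun implementation: the class and method names are hypothetical, while group.id and max.poll.records are standard Kafka consumer configuration keys.

```java
import java.util.Properties;

/** Illustrative only: mimics the documented precedence, not StateFun internals. */
final class ConsumerPropertiesPrecedence {

  /** Copies the user-supplied properties, then lets the named setting overwrite "group.id". */
  static Properties resolve(Properties userProperties, String consumerGroupId) {
    Properties resolved = new Properties();
    resolved.putAll(userProperties);
    // A value set through a named builder method wins over the same key in the properties.
    resolved.setProperty("group.id", consumerGroupId);
    return resolved;
  }

  public static void main(String[] args) {
    Properties userProperties = new Properties();
    userProperties.setProperty("group.id", "from-properties");
    userProperties.setProperty("max.poll.records", "100");

    Properties resolved = resolve(userProperties, "greetings");
    System.out.println(resolved.getProperty("group.id"));
    System.out.println(resolved.getProperty("max.poll.records"));
  }
}
```

Here the group id supplied through the named setting replaces the one in the properties, while unrelated keys such as max.poll.records pass through unchanged.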

Startup Position

The ingress allows configuring the startup position to be one of the following:

From Group Offset (default)

Starts from offsets that were committed to Kafka for the specified consumer group.

    startupPosition:
      type: group-offsets

    KafkaIngressStartupPosition#fromGroupOffsets();

Earliest

Starts from the earliest offset.

    startupPosition:
      type: earliest

    KafkaIngressStartupPosition#fromEarliest();

Latest

Starts from the latest offset.

    startupPosition:
      type: latest

    KafkaIngressStartupPosition#fromLatest();

Specific Offsets

Starts from specific offsets, defined as a map of partitions to their target starting offset.

    startupPosition:
      type: specific-offsets
      offsets:
        - user-topic/0: 91
        - user-topic/1: 11
        - user-topic/2: 8

    // Partitions 0, 1, and 2 of user-topic, matching the YAML above.
    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("user-topic", 0), 91L);
    offsets.put(new TopicPartition("user-topic", 1), 11L);
    offsets.put(new TopicPartition("user-topic", 2), 8L);
    KafkaIngressStartupPosition#fromSpecificOffsets(offsets);
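
In the YAML form, each key combines a topic name and a partition index, separated by a slash. As a rough sketch of how such a key splits into its two parts, the helper below uses only the JDK. The class and method names are hypothetical and this is not how StateFun itself parses the module file.

```java
/** Illustrative only: splits a "topic/partition" key such as "user-topic/0". */
final class TopicPartitionKey {

  /** Returns the topic name, i.e. everything before the last slash. */
  static String topicOf(String key) {
    return key.substring(0, separatorIndex(key));
  }

  /** Returns the partition index, i.e. the integer after the last slash. */
  static int partitionOf(String key) {
    return Integer.parseInt(key.substring(separatorIndex(key) + 1));
  }

  private static int separatorIndex(String key) {
    int index = key.lastIndexOf('/');
    // Reject keys with no slash, an empty topic, or an empty partition.
    if (index <= 0 || index == key.length() - 1) {
      throw new IllegalArgumentException("Expected <topic>/<partition>, got: " + key);
    }
    return index;
  }

  public static void main(String[] args) {
    System.out.println(topicOf("user-topic/0"));
    System.out.println(partitionOf("user-topic/0"));
  }
}
```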

Date

Starts from offsets whose ingestion time is at or after the specified date.

    startupPosition:
      type: date
      date: 2020-02-01 04:15:00.00 Z

    KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());
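
To start from a fixed date in Java rather than from the current time, a ZonedDateTime can be built explicitly or parsed from text. The sketch below produces the same instant as the YAML example. The formatter pattern is an assumption chosen to match that example string, not necessarily the pattern StateFun uses, and the class name is hypothetical.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

/** Illustrative only: builds the date from the YAML example in two ways. */
final class StartupDateSketch {

  // Assumed pattern; "X" accepts the literal "Z" as a zero UTC offset.
  static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SS X");

  /** Parses text such as "2020-02-01 04:15:00.00 Z". */
  static ZonedDateTime parse(String text) {
    return ZonedDateTime.parse(text, FORMAT);
  }

  public static void main(String[] args) {
    ZonedDateTime parsed = parse("2020-02-01 04:15:00.00 Z");
    ZonedDateTime explicit = ZonedDateTime.of(2020, 2, 1, 4, 15, 0, 0, ZoneOffset.UTC);
    // Both values describe the same instant.
    System.out.println(parsed.isEqual(explicit));
  }
}
```

Either value can then be passed to KafkaIngressStartupPosition#fromDate.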

On startup, if the specified startup offset for a partition is out of range or does not exist (which may be the case if the ingress is configured to start from group offsets, specific offsets, or from a date), then the ingress will fall back to the position configured using KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition). By default, this is the latest position.
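
The fallback rule can be pictured as a small decision function. The sketch below is a simplified model under stated assumptions, not the connector's code: the class, enum, and method names are hypothetical, a missing offset is represented as null, and the valid range for a partition is taken to run from the earliest available offset to the latest one, inclusive.

```java
/** Illustrative only: a simplified model of the auto offset reset fallback. */
final class OffsetResetSketch {

  enum AutoResetPosition { EARLIEST, LATEST }

  /**
   * Returns the offset to start from for one partition.
   *
   * @param requested the configured startup offset, or null if none exists
   * @param earliest the first offset still available in the partition
   * @param latest the offset at the end of the partition
   * @param fallback the position to use when the requested offset is unusable
   */
  static long resolveStartOffset(
      Long requested, long earliest, long latest, AutoResetPosition fallback) {
    boolean inRange = requested != null && requested >= earliest && requested <= latest;
    if (inRange) {
      return requested;
    }
    // Missing or out-of-range offsets fall back to the configured reset position.
    return fallback == AutoResetPosition.EARLIEST ? earliest : latest;
  }

  public static void main(String[] args) {
    System.out.println(resolveStartOffset(91L, 0L, 100L, AutoResetPosition.LATEST));
    System.out.println(resolveStartOffset(500L, 0L, 100L, AutoResetPosition.LATEST));
    System.out.println(resolveStartOffset(null, 10L, 100L, AutoResetPosition.EARLIEST));
  }
}
```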

Kafka Deserializer

When using the Java API, the Kafka ingress needs to know how to turn the binary data in Kafka into Java objects. The KafkaIngressDeserializer allows users to specify such a schema. The T deserialize(ConsumerRecord<byte[], byte[]> record) method gets called for each Kafka message, passing the key, value, and metadata from Kafka.

    package org.apache.flink.statefun.docs.io.kafka;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import org.apache.flink.statefun.docs.models.User;
    import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class UserDeserializer implements KafkaIngressDeserializer<User> {

      private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);

      private final ObjectMapper mapper = new ObjectMapper();

      @Override
      public User deserialize(ConsumerRecord<byte[], byte[]> input) {
        try {
          // Only the record value is used here; the key and metadata are ignored.
          return mapper.readValue(input.value(), User.class);
        } catch (IOException e) {
          LOG.debug("Failed to deserialize record", e);
          return null;
        }
      }
    }

Kafka Egress Spec

A KafkaEgressBuilder declares an egress spec for writing data out to a Kafka cluster.

It accepts the following arguments:

  1. The egress identifier associated with this egress
  2. The address of the bootstrap servers
  3. A KafkaEgressSerializer for serializing data into Kafka (Java only)
  4. The fault tolerance semantic
  5. Properties for the Kafka producer

    version: "1.0"
    module:
      meta:
        type: remote
      spec:
        egresses:
          - egress:
              meta:
                type: statefun.kafka.io/generic-egress
                id: example/output-messages
              spec:
                address: kafka-broker:9092
                deliverySemantic:
                  type: exactly-once
                  transactionTimeoutMillis: 100000
                properties:
                  - foo.config: bar

    package org.apache.flink.statefun.docs.io.kafka;

    import org.apache.flink.statefun.docs.models.User;
    import org.apache.flink.statefun.sdk.io.EgressIdentifier;
    import org.apache.flink.statefun.sdk.io.EgressSpec;
    import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;

    public class EgressSpecs {

      // Identifies this egress by namespace, name, and message type.
      public static final EgressIdentifier<User> ID =
          new EgressIdentifier<>("example", "output-egress", User.class);

      public static final EgressSpec<User> kafkaEgress =
          KafkaEgressBuilder.forIdentifier(ID)
              .withKafkaAddress("localhost:9092")
              .withSerializer(UserSerializer.class)
              .build();
    }

Please refer to the Kafka producer configuration documentation for the full list of available properties.

Kafka Egress and Fault Tolerance

With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. You can choose among three modes of operation.

None

Nothing is guaranteed; produced records can be lost or duplicated.

    deliverySemantic:
      type: none

    KafkaEgressBuilder#withNoProducerSemantics();

At Least Once

Stateful Functions guarantees that no records are lost, but they can be duplicated.

    deliverySemantic:
      type: at-least-once

    KafkaEgressBuilder#withAtLeastOnceProducerSemantics();

Exactly Once

Stateful Functions uses Kafka transactions to provide exactly-once semantics.

    deliverySemantic:
      type: exactly-once
      transactionTimeoutMillis: 900000 # 15 min

    KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.ofMinutes(15));
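
The YAML form expresses the transaction timeout in milliseconds, while the Java form takes a duration. As a quick check that the two examples above describe the same timeout, the following sketch converts a java.time.Duration to milliseconds. The class and method names are hypothetical, and rejecting non-positive timeouts is an assumption made for this illustration.

```java
import java.time.Duration;

/** Illustrative only: relates the Java duration to the YAML millisecond value. */
final class TransactionTimeoutSketch {

  /** Converts a positive timeout to the millisecond value used in the YAML form. */
  static long toTransactionTimeoutMillis(Duration timeout) {
    if (timeout.isZero() || timeout.isNegative()) {
      throw new IllegalArgumentException("Timeout must be positive: " + timeout);
    }
    return timeout.toMillis();
  }

  public static void main(String[] args) {
    // 15 minutes corresponds to transactionTimeoutMillis: 900000.
    System.out.println(toTransactionTimeoutMillis(Duration.ofMinutes(15)));
  }
}
```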

Kafka Serializer

When using the Java API, the Kafka egress needs to know how to turn Java objects into binary data. The KafkaEgressSerializer allows users to specify such a schema. The ProducerRecord<byte[], byte[]> serialize(T out) method gets called for each message, allowing users to set a key, value, and other metadata.

    package org.apache.flink.statefun.docs.io.kafka;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.statefun.docs.models.User;
    import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class UserSerializer implements KafkaEgressSerializer<User> {

      private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);

      private static final String TOPIC = "user-topic";

      private final ObjectMapper mapper = new ObjectMapper();

      @Override
      public ProducerRecord<byte[], byte[]> serialize(User user) {
        try {
          // Key the record by user id and write the user as JSON bytes.
          byte[] key = user.getUserId().getBytes(StandardCharsets.UTF_8);
          byte[] value = mapper.writeValueAsBytes(user);
          return new ProducerRecord<>(TOPIC, key, value);
        } catch (JsonProcessingException e) {
          LOG.info("Failed to serialize user", e);
          return null;
        }
      }
    }