Use SerDe

Pulsar Functions use SerDe (Serialization and Deserialization) when publishing data to or consuming data from Pulsar topics. How SerDe works by default depends on the language you use (Java or Python) for a particular function. In both languages, however, you can write custom SerDe logic for more complex, application-specific types.

Use SerDe for Java functions

The following basic Java types are built-in and supported by default for Java functions: string, double, integer, float, long, short, and byte.

To support custom Java types, implement the following interface.

  public interface SerDe<T> {
      T deserialize(byte[] input);
      byte[] serialize(T input);
  }
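The same two-method contract can be sketched in Python with the standard `abc` module. This is a hypothetical analogue and not Pulsar's API. The names `SerDeContract` and `Utf8StringSerDe` are illustrative. The UTF-8 string SerDe only approximates what a built-in string SerDe does.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar("T")


class SerDeContract(ABC, Generic[T]):
    """Hypothetical Python analogue of the Java SerDe<T> interface."""

    @abstractmethod
    def deserialize(self, data: bytes) -> T:
        """Turn raw message bytes into a value of type T."""

    @abstractmethod
    def serialize(self, value: T) -> bytes:
        """Turn a value of type T into raw message bytes."""


class Utf8StringSerDe(SerDeContract[str]):
    """Illustrative SerDe for plain strings, encoded as UTF-8."""

    def deserialize(self, data: bytes) -> str:
        return data.decode("utf-8")

    def serialize(self, value: str) -> bytes:
        return value.encode("utf-8")


serde = Utf8StringSerDe()
payload = serde.serialize("héllo")
print(payload)                     # b'h\xc3\xa9llo'
print(serde.deserialize(payload))  # héllo
```

A SerDe should satisfy one property: deserializing what it serialized gives back an equal value.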

SerDe works in the following ways for Java functions.

  • If the input and output topics have a schema, Pulsar Functions use the schema for SerDe.
  • If the input or output topics do not exist, Pulsar Functions adopt the following rules to determine SerDe:
    • If the schema type is specified, Pulsar Functions use the specified schema type.
    • If SerDe is specified, Pulsar Functions use the specified SerDe, and the schema type for input and output topics is byte.
    • If neither the schema type nor SerDe is specified, Pulsar Functions use the built-in SerDe. For non-primitive schema types, the built-in SerDe serializes and deserializes objects in the JSON format.
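The rules above can be restated as a small decision function. This is a hypothetical sketch and not a Pulsar API. It makes two assumptions. First, it treats "the topic does not exist" as `topic_has_schema=False`. Second, it checks the schema type before the SerDe only because the rules are listed in that order. This section does not say which option wins if both are given.

```python
from typing import Optional


def resolve_serde(topic_has_schema: bool,
                  schema_type: Optional[str] = None,
                  serde_class: Optional[str] = None) -> str:
    """Restate the documented SerDe rules for one input or output topic.

    Hypothetical helper, not a Pulsar API. topic_has_schema=False stands
    in for "the topic does not exist yet".
    """
    if topic_has_schema:
        # The topic's existing schema is used for SerDe.
        return "topic schema"
    if schema_type is not None:
        # An explicitly specified schema type is used.
        return "schema type: " + schema_type
    if serde_class is not None:
        # A custom SerDe is used; the topic's schema type is byte.
        return "custom SerDe: " + serde_class + " (schema type: byte)"
    # Fallback: the built-in SerDe, which uses JSON for non-primitive types.
    return "built-in SerDe"


print(resolve_serde(True, serde_class="com.example.serde.TweetSerde"))
# topic schema
print(resolve_serde(False, serde_class="com.example.serde.TweetSerde"))
# custom SerDe: com.example.serde.TweetSerde (schema type: byte)
print(resolve_serde(False))
# built-in SerDe
```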

For example, imagine that you’re writing a function that processes tweet objects. You can refer to the following example of the Tweet class in Java.

  public class Tweet {
      private String username;
      private String tweetContent;
      public Tweet(String username, String tweetContent) {
          this.username = username;
          this.tweetContent = tweetContent;
      }
      // Getters used by TweetSerde below
      public String getUsername() {
          return username;
      }
      public String getTweetContent() {
          return tweetContent;
      }
      // Standard setters omitted
  }
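Without a schema or custom SerDe, the rules above fall back to JSON for a non-primitive type like Tweet. The following Python sketch only shows what a JSON encoding of these two fields might look like. The exact key names and layout that Pulsar's built-in Java SerDe produces are an assumption here. The camelCase field `tweetContent` simply mirrors the Java field name.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Tweet:
    username: str
    tweetContent: str  # camelCase to mirror the Java field (assumption)


def to_json_bytes(tweet: Tweet) -> bytes:
    # Encode the fields as a JSON object, then as UTF-8 bytes.
    return json.dumps(asdict(tweet)).encode("utf-8")


def from_json_bytes(data: bytes) -> Tweet:
    # Parse the JSON object and rebuild a Tweet from its keys.
    return Tweet(**json.loads(data.decode("utf-8")))


encoded = to_json_bytes(Tweet("alice", "hello | world"))
print(encoded)  # b'{"username": "alice", "tweetContent": "hello | world"}'
```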

To pass Tweet objects directly between functions, you need to provide a custom SerDe class. In the example below, a Tweet object is serialized as a string in which the username and the tweet content are separated by |.

  package com.example.serde;

  import java.nio.charset.StandardCharsets;
  import java.util.regex.Pattern;
  import org.apache.pulsar.functions.api.SerDe;

  // Assumes Tweet is in the same package (com.example.serde)
  public class TweetSerde implements SerDe<Tweet> {
      // bytes -> Tweet: split "username|content" into its two fields
      public Tweet deserialize(byte[] input) {
          String s = new String(input, StandardCharsets.UTF_8);
          String[] fields = s.split(Pattern.quote("|"));
          return new Tweet(fields[0], fields[1]);
      }

      // Tweet -> bytes: join the two fields with "|"
      public byte[] serialize(Tweet input) {
          return String.format("%s|%s", input.getUsername(), input.getTweetContent())
                  .getBytes(StandardCharsets.UTF_8);
      }
  }
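The pipe-delimited wire format has a limitation. The deserializer splits on every "|", so any "|" inside the tweet content cuts off the text after it. The following Python sketch mirrors that wire format and compares two approaches: splitting on every "|" and splitting only once. The function names are hypothetical. The split-once variant still assumes usernames never contain "|".

```python
def serialize(username: str, content: str) -> bytes:
    # Same wire format as TweetSerde: "<username>|<content>" as UTF-8 bytes.
    return f"{username}|{content}".encode("utf-8")


def deserialize_split_all(data: bytes) -> tuple:
    # Mirrors split(Pattern.quote("|")) followed by fields[0], fields[1].
    fields = data.decode("utf-8").split("|")
    return fields[0], fields[1]


def deserialize_split_once(data: bytes) -> tuple:
    # Splits on the first "|" only, so a "|" inside the content survives.
    username, content = data.decode("utf-8").split("|", 1)
    return username, content


raw = serialize("alice", "a|b")
print(deserialize_split_all(raw))   # ('alice', 'a')
print(deserialize_split_once(raw))  # ('alice', 'a|b')
```

In Java, the equivalent of the split-once variant is the two-argument form, `s.split(Pattern.quote("|"), 2)`.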

To apply a customized SerDe to a particular function, you need to:

  • Package the Tweet and TweetSerde classes into a JAR.
  • Specify a path to the JAR and SerDe class names when deploying the function.

The following example uses the create command to deploy a function with a custom SerDe.

  bin/pulsar-admin functions create \
    --jar /path/to/your.jar \
    --output-serde-classname com.example.serde.TweetSerde \
    # Other function attributes

Note

Custom SerDe classes must be packaged with your function JARs.

Use SerDe for Python functions

In Python, the default SerDe is an identity, meaning that the type is serialized as whatever type the function returns.

For example, you can specify the SerDe as follows when deploying a function in cluster mode.

  bin/pulsar-admin functions create \
    --tenant public \
    --namespace default \
    --name my_function \
    --py my_function.py \
    --classname my_function.MyFunction \
    --custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}' \
    --output-serde-classname Serde3 \
    --output output-topic-1

This example has two input topics, input-topic-1 and input-topic-2, and each is mapped to a different SerDe class (the mapping must be specified as a JSON string). The output topic, output-topic-1, uses the Serde3 class for SerDe.
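The mapping must be a JSON string, so generating it programmatically can avoid quoting mistakes when there are many topics. The following is a minimal standard-library sketch. The topic and class names are the placeholders from the command above.

```python
import json
import shlex

# Topic-to-SerDe mapping for --custom-serde-inputs (placeholder names).
serde_inputs = {
    "input-topic-1": "Serde1",
    "input-topic-2": "Serde2",
}

# Compact separators match the style used in the command above.
mapping_json = json.dumps(serde_inputs, separators=(",", ":"))
print(mapping_json)
# {"input-topic-1":"Serde1","input-topic-2":"Serde2"}

# Quote the JSON so the shell passes it as a single argument.
print("--custom-serde-inputs " + shlex.quote(mapping_json))
# --custom-serde-inputs '{"input-topic-1":"Serde1","input-topic-2":"Serde2"}'
```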

Note

All function-related logic, including processing and SerDe classes, must be contained within a single Python file.

The following are the three SerDe options for Python functions.

  • IdentitySerde (default)
    • Description: Uses the IdentitySerde, which leaves the data unchanged. This option is used when you create or run a function without explicitly specifying a SerDe.
    • Use case: When you work with simple types such as strings, Booleans, and integers.
  • PickleSerDe
    • Description: Uses the PickleSerDe, which uses Python pickle for SerDe.
    • Use case: When you work with complex, application-specific types and are comfortable with the “best-effort” approach of pickle.
  • Custom SerDe
    • Description: Create a custom SerDe class by implementing the baseline SerDe class, which has just two methods: serialize, for converting the object into bytes, and deserialize, for converting bytes into an object of the required application-specific type.
    • Use case: When you require explicit control over SerDe, potentially for performance or data compatibility purposes.
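To make the PickleSerDe option concrete, the following sketch shows the standard-library round trip that a pickle-based SerDe relies on. It does not use Pulsar's PickleSerDe class. It only calls `pickle.dumps` and `pickle.loads` on an instance of a small Tweet class, which is defined inline so that the snippet runs on its own.

```python
import pickle


class Tweet:
    def __init__(self, username, tweet_content):
        self.username = username
        self.tweet_content = tweet_content


# Serialize: pickle.dumps turns the object into bytes.
tweet = Tweet("alice", "hello | world")
data = pickle.dumps(tweet)

# Deserialize: pickle.loads rebuilds an equivalent object from those bytes.
restored = pickle.loads(data)
print(type(data).__name__)                        # bytes
print(restored.username, restored.tweet_content)  # alice hello | world
```

Two limitations of pickle apply here. First, `pickle.loads` can only rebuild the object if the Tweet class can be found where the data is deserialized. Second, pickle should never be used on data from untrusted sources, because unpickling can execute arbitrary code.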

For example, imagine that you are writing a function that processes tweet objects. You can refer to the following example of the Tweet class in Python.

  class Tweet(object):
      def __init__(self, username, tweet_content):
          self.username = username
          self.tweet_content = tweet_content

To use this class in Pulsar Functions, you have two options:

  • Specify PickleSerDe, which applies the pickle library for SerDe.
  • Create your own SerDe class. The following is an example.
    from pulsar import SerDe

    class TweetSerDe(SerDe):
        # Tweet -> bytes: "username|tweet_content" encoded as UTF-8
        def serialize(self, input):
            return "{0}|{1}".format(input.username, input.tweet_content).encode("utf-8")

        # bytes -> Tweet; the Tweet class must be defined in the same file
        def deserialize(self, input_bytes):
            tweet_components = input_bytes.decode("utf-8").split("|")
            return Tweet(tweet_components[0], tweet_components[1])

For more details, see the code example.