Window Functions Context

The Java SDK provides a window context object that a window function can use. The context object exposes the following information and functionality to Pulsar window functions.

  • Spec

    • Names of all input topics and the output topic associated with the function.
    • Tenant and namespace associated with the function.
    • Pulsar window function name, ID, and version.
    • ID of the Pulsar function instance running the window function.
    • Number of instances that invoke the window function.
    • Built-in type or custom class name of the output schema.
  • Logger

    • Logger object used by the window function, which can be used to create window function log messages.
  • User config

    • Access to arbitrary user configuration values.
  • Routing

    • Routing is supported in Pulsar window functions. Pulsar window functions send messages to arbitrary topics as per the publish interface.
  • Metrics

    • Interface for recording metrics.
  • State storage

    • Interface for accessing, updating, and managing state.

Spec

Spec contains the basic information of a function.

Get input topics

The getInputTopics method gets the name list of all input topics.

This example demonstrates how to get the name list of all input topics in a Java window function.

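    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetInputTopicsWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Names of all topics this function consumes from.
            Collection<String> inputTopics = context.getInputTopics();
            System.out.println(inputTopics);
            return null;
        }
    }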

Get output topic

The getOutputTopic method gets the name of a topic to which the message is sent.

This example demonstrates how to get the name of an output topic in a Java window function.

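    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetOutputTopicWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Name of the topic that receives the function's output.
            String outputTopic = context.getOutputTopic();
            System.out.println(outputTopic);
            return null;
        }
    }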

Get tenant

The getTenant method gets the tenant name associated with the window function.

This example demonstrates how to get the tenant name in a Java window function.

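    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetTenantWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Tenant that the function belongs to.
            String tenant = context.getTenant();
            System.out.println(tenant);
            return null;
        }
    }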

Get namespace

The getNamespace method gets the namespace associated with the window function.

This example demonstrates how to get the namespace in a Java window function.

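    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetNamespaceWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Namespace that the function belongs to.
            String ns = context.getNamespace();
            System.out.println(ns);
            return null;
        }
    }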

Get function name

The getFunctionName method gets the window function name.

This example demonstrates how to get the function name in a Java window function.

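    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetNameOfWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Name the function was created with.
            String functionName = context.getFunctionName();
            System.out.println(functionName);
            return null;
        }
    }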

Get function ID

The getFunctionId method gets the window function ID.

This example demonstrates how to get the function ID in a Java window function.

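    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetFunctionIDWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // ID of the window function.
            String functionID = context.getFunctionId();
            System.out.println(functionID);
            return null;
        }
    }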

Get function version

The getFunctionVersion method gets the window function version.

This example demonstrates how to get the function version of a Java window function.

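    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetVersionOfWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Version of the window function.
            String functionVersion = context.getFunctionVersion();
            System.out.println(functionVersion);
            return null;
        }
    }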

Get instance ID

The getInstanceId method gets the instance ID of a window function.

This example demonstrates how to get the instance ID in a Java window function.

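    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetInstanceIDWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // ID of the instance that is running this invocation.
            int instanceId = context.getInstanceId();
            System.out.println(instanceId);
            return null;
        }
    }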

Get num instances

The getNumInstances method gets the number of instances that invoke the window function.

This example demonstrates how to get the number of instances in a Java window function.

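    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetNumInstancesWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Number of instances that invoke the window function.
            int numInstances = context.getNumInstances();
            System.out.println(numInstances);
            return null;
        }
    }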

Get output schema type

The getOutputSchemaType method gets the built-in type or custom class name of the output schema.

This example demonstrates how to get the output schema type of a Java window function.

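    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class GetOutputSchemaTypeWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            // Built-in type or custom class name of the output schema.
            String schemaType = context.getOutputSchemaType();
            System.out.println(schemaType);
            return null;
        }
    }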

Logger

Pulsar window functions that use the Java SDK have access to an SLF4J Logger object that can be used to produce logs at the chosen log level.

This example writes an INFO-level log entry for each record in the window in a Java window function.

    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;
    import org.slf4j.Logger;

    public class LoggingWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            Logger log = context.getLogger();
            for (Record<String> record : inputs) {
                // Log the message payload rather than the Record wrapper object.
                log.info("{}-window-log", record.getValue());
            }
            return null;
        }
    }

If you need your function to produce logs, specify a log topic when creating or running the function.

    bin/pulsar-admin functions create \
      --jar my-functions.jar \
      --classname my.package.LoggingWindowFunction \
      --log-topic persistent://public/default/logging-function-logs \
      # Other function configs

You can access all logs produced by LoggingWindowFunction via the persistent://public/default/logging-function-logs topic.

Metrics

Pulsar window functions can publish arbitrary metrics to the metrics interface which can be queried.

Note

If a Pulsar window function uses the language-native interface for Java, that function is not able to publish metrics and stats to Pulsar.

You can record metrics using the context object on a per-key basis.

This example records the event time of each message, when one is present, under the MessageEventTime metric key in a Java window function.

    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    /**
     * Example function that wants to keep track of
     * the event time of each message sent.
     */
    public class UserMetricWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            for (Record<String> record : inputs) {
                if (record.getEventTime().isPresent()) {
                    context.recordMetric("MessageEventTime", record.getEventTime().get().doubleValue());
                }
            }
            return null;
        }
    }

User config

When you run or update Pulsar functions that are created using the SDK, you can pass arbitrary key/value pairs to them with the --user-config flag. Key/value pairs must be specified as JSON.

This example passes a user-configured key/value pair to a function.

    bin/pulsar-admin functions create \
      --name word-filter \
      --user-config '{"forbidden-word":"rosebud"}' \
      # Other function configs

API

You can use the following APIs to get user-defined information for window functions.

getUserConfigMap

The getUserConfigMap API gets a map of all user-defined key/value configurations for the window function.

    /**
     * Get a map of all user-defined key/value configs for the function.
     *
     * @return The full map of user-defined config values
     */
    Map<String, Object> getUserConfigMap();

getUserConfigValue

The getUserConfigValue API gets the user-defined value for a key, wrapped in an Optional.

    /**
     * Get any user-defined key/value.
     *
     * @param key The key
     * @return The Optional value specified by the user for that key.
     */
    Optional<Object> getUserConfigValue(String key);

getUserConfigValueOrDefault

The getUserConfigValueOrDefault API gets a user-defined key/value or a default value if none is present.

    /**
     * Get any user-defined key/value or a default value if none is present.
     *
     * @param key The key
     * @param defaultValue The value to return if no value is present for the key
     * @return Either the user config value associated with a given key or a supplied default value
     */
    Object getUserConfigValueOrDefault(String key, Object defaultValue);

This example demonstrates how to access key/value pairs provided to Pulsar window functions.

The Java SDK context object enables you to access key/value pairs provided to Pulsar window functions via the command line (as JSON).

Tip

For all key/value pairs passed to Java window functions, both the key and the value are String. To set the value to be a different type, you need to deserialize it from the String type.
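The conversion described in the tip can be sketched as follows. This is a minimal illustration under stated assumptions, not part of the Pulsar API: the UserConfigValues class, its method names, and the max-words and strict keys are hypothetical, and a plain Map stands in for the map returned by context.getUserConfigMap().

```java
import java.util.Map;

// Hypothetical helper, not part of the Pulsar API: converts String-typed
// user config values into the types a function actually needs.
final class UserConfigValues {

    // Returns the value for key parsed as an int, or defaultValue if the key is absent.
    static int intValue(Map<String, Object> config, String key, int defaultValue) {
        Object raw = config.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.toString().trim());
    }

    // Returns the value for key parsed as a boolean, or defaultValue if the key is absent.
    static boolean booleanValue(Map<String, Object> config, String key, boolean defaultValue) {
        Object raw = config.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.toString().trim());
    }

    public static void main(String[] args) {
        // Stand-in for the map returned by context.getUserConfigMap().
        Map<String, Object> config = Map.of("max-words", "50", "strict", "true");
        System.out.println(intValue(config, "max-words", 10));     // parsed from "50"
        System.out.println(booleanValue(config, "strict", false)); // parsed from "true"
        System.out.println(intValue(config, "min-words", 1));      // key absent, default used
    }
}
```

Parsing in one place keeps NumberFormatException handling out of the process method; a malformed value still throws, which is usually preferable to silently falling back to a default.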

This example passes a key/value pair to a Java window function.

    bin/pulsar-admin functions create \
      --user-config '{"word-of-the-day":"verdure"}' \
      # Other function configs

This example accesses values in a Java window function.

The UserConfigWindowFunction function returns the value of the WhatToWrite user config key every time the function is invoked (that is, every time a window of messages is processed), or the string "Not a nice way" if the key is not set. A user config value changes only when the function is updated with a new config value, for example through the command line tool or the REST API.

    import java.util.Collection;
    import java.util.Optional;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class UserConfigWindowFunction implements WindowFunction<String, String> {
        @Override
        public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
            Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
            // Optional.get() throws if the key is absent, so check isPresent() first.
            if (whatToWrite.isPresent()) {
                return (String) whatToWrite.get();
            } else {
                return "Not a nice way";
            }
        }
    }

If no value is provided, you can access the entire user config map or set a default value.

    // Get the whole config map
    Map<String, Object> allConfigs = context.getUserConfigMap();
    // Get value or resort to default (the API returns Object, so cast to String)
    String wotd = (String) context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");

Routing

You can use the context.publish() interface to publish as many results as you want.

This example shows how the PublishWindowFunction class uses the publish method of the context to send a message to the topic configured under the publish-topic user config key in a Java window function.

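    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class PublishWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> input, WindowContext context) throws Exception {
            // Target topic comes from user config, with "publishtopic" as the fallback.
            String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
            String output = String.format("%s!", input);
            context.publish(publishTopic, output);
            return null;
        }
    }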

State storage

Pulsar window functions use Apache BookKeeper as a state storage interface. An Apache Pulsar installation (including the standalone installation) includes the deployment of BookKeeper bookies.

Apache Pulsar integrates with Apache BookKeeper table service to store the state for functions. For example, the WordCount function can store its counters state into BookKeeper table service via Pulsar Functions state APIs.

States are key-value pairs, where the key is a string and the value is arbitrary binary data. Counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar function and shared between instances of that function.
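To make the counter representation concrete, the following sketch encodes a long as a 64-bit big-endian value and decodes it again using only the JDK. The CounterEncoding class is a hypothetical illustration of the byte layout described above; it is not Pulsar or BookKeeper code, and functions normally never need to perform this encoding themselves.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical illustration of a 64-bit big-endian counter value.
final class CounterEncoding {

    // Encodes a counter as 8 bytes, most significant byte first.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.BIG_ENDIAN)
                .putLong(value)
                .array();
    }

    // Decodes 8 big-endian bytes back into a counter value.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        byte[] encoded = encode(258L); // 258 = 0x0102
        System.out.println(Arrays.toString(encoded)); // prints [0, 0, 0, 0, 0, 0, 1, 2]
        System.out.println(decode(encoded));          // prints 258
    }
}
```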

Currently, Pulsar window functions expose Java APIs to access, update, and manage states. These APIs are available in the context object when you use Java SDK functions.

Java API       Description
incrCounter    Increases a built-in distributed counter referred by key.
getCounter     Gets the counter value for the key.
putState       Updates the state value for the key.

You can use the following APIs to access, update, and manage states in Java window functions.

incrCounter

The incrCounter API increases a built-in distributed counter referred by key.

Applications use the incrCounter API to change the counter of a given key by the given amount. If the key does not exist, a new key is created.

    /**
     * Increment the builtin distributed counter referred by key
     * @param key The name of the key
     * @param amount The amount to be incremented
     */
    void incrCounter(String key, long amount);

getCounter

The getCounter API gets the counter value for the key.

Applications use the getCounter API to retrieve the counter of a given key that was changed by the incrCounter API.

    /**
     * Retrieve the counter value for the key.
     *
     * @param key name of the key
     * @return the amount of the counter value for this key
     */
    long getCounter(String key);
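The interplay of incrCounter and getCounter can be tried without a running cluster by using an in-memory stand-in. The InMemoryCounters class below is hypothetical and only mimics the two method signatures. In particular, returning 0 for a key that was never incremented is an assumption of this sketch, not documented Pulsar behavior.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the counter methods of the context object.
final class InMemoryCounters {
    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    // Adds amount to the counter for key, creating the key if it does not exist.
    void incrCounter(String key, long amount) {
        counters.merge(key, amount, Long::sum);
    }

    // Returns the current counter value (assumption: 0 for a key never incremented).
    long getCounter(String key) {
        return counters.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        InMemoryCounters state = new InMemoryCounters();
        state.incrCounter("hello", 1);
        state.incrCounter("hello", 2);
        System.out.println(state.getCounter("hello")); // prints 3
    }
}
```

A stand-in like this is mainly useful in unit tests of the counting logic; the real counters live in the BookKeeper table service and are shared between function instances.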

In addition to the counter APIs, Pulsar exposes a general key/value API (putState) that functions can use to store arbitrary key/value state.

putState

The putState API updates the state value for the key.

    /**
     * Update the state value for the key.
     *
     * @param key name of the key
     * @param value state value of the key
     */
    void putState(String key, ByteBuffer value);
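Because putState takes a ByteBuffer, a function has to serialize its value first. The sketch below shows one way to do this for a String using UTF-8 and only the JDK. The StateValues class and its method names are hypothetical helpers, and UTF-8 is an encoding chosen for this example, not something the API prescribes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for turning String values into state values and back.
final class StateValues {

    // Wraps the UTF-8 bytes of text in a ByteBuffer suitable for a putState call.
    static ByteBuffer toBuffer(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a UTF-8 state value; duplicate() leaves the caller's buffer position untouched.
    static String fromBuffer(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer value = toBuffer("verdure");
        System.out.println(value.remaining());  // prints 7
        System.out.println(fromBuffer(value));  // prints verdure
    }
}
```

Inside a window function, the call would then look like context.putState("word-of-the-day", StateValues.toBuffer("verdure")), where the key name is only an example.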

This example demonstrates how applications store states in Pulsar window functions.

The logic of the WordCountWindowFunction is simple and straightforward.

  1. The function first splits the received string into multiple words using the regex \\. (which matches a literal period).

  2. For each word, the function increments the corresponding counter by 1 via incrCounter(key, amount).

    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.functions.api.WindowContext;
    import org.apache.pulsar.functions.api.WindowFunction;

    public class WordCountWindowFunction implements WindowFunction<String, Void> {
        @Override
        public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
            for (Record<String> input : inputs) {
                // Split each message on literal periods and count every word.
                Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
            }
            return null;
        }
    }
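The splitting and counting steps can be checked in isolation with plain Java. In this sketch the hypothetical WordSplitDemo class replaces the context counters with a local sorted map, so it shows only the logic of the two steps above, not the state storage itself.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone version of the word-count logic, using a local map
// in place of context.incrCounter.
final class WordSplitDemo {

    // Splits each value on literal periods and counts the resulting words.
    static Map<String, Long> count(String... values) {
        Map<String, Long> counts = new TreeMap<>();
        for (String value : values) {
            for (String word : value.split("\\.")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two messages in one window.
        System.out.println(count("hello.world", "hello.pulsar"));
        // prints {hello=2, pulsar=1, world=1}
    }
}
```

Note that the regex must be "\\." in Java source: an unescaped "." matches any character, and split would then return no words at all.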