Since the Pulsar 2.1.0 release, Pulsar has integrated with the Apache BookKeeper table service to store state for functions. For example, a WordCount function can store the state of its counters in BookKeeper's table service via the Pulsar Functions State API.
API
Java API
Currently, Pulsar Functions exposes the following APIs for mutating and accessing state. These APIs are available in the Context object when you use Java SDK functions.
incrCounter
/**
* Increment the built-in distributed counter referred to by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
An application can use incrCounter to change the counter of a given key by the given amount.
getCounter
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
An application can use getCounter to retrieve the counter of a given key, as mutated by incrCounter.
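As a minimal sketch of how the two counter calls fit together, the code below increments a counter and immediately reads it back. It does not depend on Pulsar: CounterState and InMemoryCounterState are hypothetical stand-ins that mirror only the incrCounter and getCounter signatures shown above, and VisitTracker and the "visits:" key prefix are invented for illustration. In a real function the same two calls would be made on the Context object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the counter portion of Context; not part of Pulsar.
interface CounterState {
    void incrCounter(String key, long amount);
    long getCounter(String key);
}

// In-memory implementation for illustration only; real state lives in BookKeeper.
final class InMemoryCounterState implements CounterState {
    private final Map<String, Long> counters = new HashMap<>();

    @Override
    public void incrCounter(String key, long amount) {
        // Add the amount to the existing value, or start from the amount itself.
        counters.merge(key, amount, Long::sum);
    }

    @Override
    public long getCounter(String key) {
        // Assumption of this sketch: a key that was never incremented reads as 0.
        return counters.getOrDefault(key, 0L);
    }
}

final class VisitTracker {
    // Records one visit for the user and returns the running total.
    static long recordVisit(CounterState state, String userId) {
        String key = "visits:" + userId;
        state.incrCounter(key, 1);
        return state.getCounter(key);
    }
}
```

Each key holds an independent counter, so calling recordVisit for two different users never mixes their totals.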
Besides the counter API, Pulsar also exposes a general key/value API that functions can use to store general key/value state.
putState
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
getState
/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);
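Because putState and getState work with ByteBuffer values, a function usually needs to encode its data before storing it and decode it after reading. The helper below is one possible sketch for UTF-8 strings. StateCodec is a hypothetical name that is not part of the Pulsar API, and the null handling assumes that a missing key yields a null buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for converting between String values and ByteBuffer state.
final class StateCodec {
    // Wraps the UTF-8 bytes of the value, ready to pass to putState(key, value).
    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a buffer such as the one returned by getState(key).
    static String decode(ByteBuffer buffer) {
        // Assumption of this sketch: a missing key is represented by null.
        if (buffer == null) {
            return null;
        }
        // duplicate() leaves the caller's buffer position untouched.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }
}
```

Inside a function, this would be used as context.putState(key, StateCodec.encode(value)) and StateCodec.decode(context.getState(key)).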
Python API
State is currently not supported in the Python SDK.
Query State
A Pulsar Function can use the State API to store state in Pulsar's state storage and retrieve it later. Additionally, Pulsar provides CLI commands for querying that state.
$ bin/pulsar-admin functions querystate \
--tenant <tenant> \
--namespace <namespace> \
--name <function-name> \
--state-storage-url <bookkeeper-service-url> \
--key <state-key> \
[--watch]
If --watch is specified, the CLI will watch the value of the provided state-key.
Examples
Java Example
WordCountFunction is a good example of how an application can easily store state in Pulsar Functions.
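import java.util.Arrays;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        // Split the input on literal periods and increment one counter per token.
        Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
        return null;
    }
}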
The logic of this WordCount function is simple and straightforward:
- The function first splits the received String into multiple words using the regex \\. (which matches a literal period).
- For each word, the function increments the corresponding counter by 1 (via incrCounter(key, amount)).
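To make the splitting behavior concrete, the sketch below runs the same split-and-increment logic with a local map standing in for the function's counters. WordCountSketch is a hypothetical class for illustration only; it is not part of Pulsar and does not persist anything.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Pulsar-free rendering of the WordCount logic.
final class WordCountSketch {
    // Splits on literal periods and counts each token, as the function does
    // with incrCounter(word, 1); a HashMap stands in for the stored counters.
    static Map<String, Long> count(String input) {
        Map<String, Long> counters = new HashMap<>();
        Arrays.asList(input.split("\\."))
              .forEach(word -> counters.merge(word, 1L, Long::sum));
        return counters;
    }
}
```

With this regex, an input such as hello.world.hello produces two counters (hello at 2 and world at 1), while whitespace is not treated as a separator, so hello world is counted as a single key.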
Python Example
State is currently not supported in the Python SDK.