Package Pulsar Functions

You can package Pulsar functions in Java, Python, and Go. Packaging the window function in Java is the same as packaging a function in Java.

Note

Currently, the window function is not available in Python or Go.

Prerequisite

Before running a Pulsar function, you need to start Pulsar. You can run Pulsar in standalone mode in Docker, or run Pulsar in Kubernetes.

To check whether the Docker container has started, use the `docker ps` command.

Java

To package a function in Java, complete the following steps.

  1. Create a new Maven project with a `pom.xml` file. In the following code sample, set `mainClass` to the fully qualified name of your function class.

    ```
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>java-function</groupId>
        <artifactId>java-function</artifactId>
        <version>1.0-SNAPSHOT</version>

        <dependencies>
            <dependency>
                <groupId>org.apache.pulsar</groupId>
                <artifactId>pulsar-functions-api</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>org.example.test.ExclamationFunction</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <!-- "single" is the goal supported by current maven-assembly-plugin versions -->
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    ```

  2. Write a Java function.

    ```
    package org.example.test;

    import java.util.function.Function;

    public class ExclamationFunction implements Function<String, String> {
        @Override
        public String apply(String s) {
            return "This is my function!";
        }
    }
    ```

    For the imported package, you can use one of the following interfaces:

    - Function interface provided by Java 8: `java.util.function.Function`
    - Pulsar Function interface: `org.apache.pulsar.functions.api.Function`

    The main difference between the two interfaces is that `org.apache.pulsar.functions.api.Function` passes a `Context` object to your function. Through the context, a function can access a wide range of information and functionality provided by Pulsar Functions, such as counters.

    The following example uses the `org.apache.pulsar.functions.api.Function` interface with a context.

    ```
    package org.example.functions;

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    import java.util.Arrays;

    public class WordCountFunction implements Function<String, Void> {
        // This function is invoked every time a message is published to the input topic
        @Override
        public Void process(String input, Context context) throws Exception {
            // Increment a counter for each word (lowercased) in the message
            Arrays.asList(input.split(" ")).forEach(word -> {
                String counterKey = word.toLowerCase();
                context.incrCounter(counterKey, 1);
            });
            return null;
        }
    }
    ```

  3. Package the Java function.

    ```
    mvn package
    ```

    After the Java function is packaged, Maven creates a `target` directory. Check that it contains a JAR file such as `java-function-1.0-SNAPSHOT.jar`.

  4. Run the Java function.

    (1) Copy the packaged JAR file into the Pulsar container (run `docker cp` on the host), and then open a shell in the container.

    ```
    docker cp <path of java-function-1.0-SNAPSHOT.jar> <CONTAINER ID>:/pulsar
    docker exec -it <CONTAINER ID> /bin/bash
    ```

    (2) Run the Java function using the following command.

    ```
    ./bin/pulsar-admin functions localrun \
      --classname org.example.test.ExclamationFunction \
      --jar java-function-1.0-SNAPSHOT.jar \
      --inputs persistent://public/default/my-topic-1 \
      --output persistent://public/default/test-1 \
      --tenant public \
      --namespace default \
      --name JavaFunction
    ```

    The following log indicates that the Java function starts successfully.

    ```
    ...
    07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    ...
    ```
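
Before copying the JAR into the container, you may want to confirm that the packaged JAR declares the class you set as `mainClass`. The following Python helper is a minimal sketch and not part of Pulsar or Maven: it relies on the fact that a JAR is a ZIP archive and reads the `Main-Class` entry from `META-INF/MANIFEST.MF`. The function name `find_main_class` is hypothetical, and the sketch assumes a short `Main-Class` line that is not wrapped onto manifest continuation lines.

```python
import zipfile
from pathlib import Path
from typing import Dict, Optional


def find_main_class(target_dir: str = "target") -> Dict[str, Optional[str]]:
    """Map each JAR file in target_dir to the Main-Class in its manifest (None if absent)."""
    result: Dict[str, Optional[str]] = {}
    for jar_path in sorted(Path(target_dir).glob("*.jar")):
        main_class = None
        # A JAR is a ZIP archive; its manifest is stored at META-INF/MANIFEST.MF.
        with zipfile.ZipFile(jar_path) as jar:
            if "META-INF/MANIFEST.MF" in jar.namelist():
                manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
                # Simplification: wrapped manifest continuation lines are not handled.
                for line in manifest.splitlines():
                    if line.startswith("Main-Class:"):
                        main_class = line.split(":", 1)[1].strip()
                        break
        result[jar_path.name] = main_class
    return result


if __name__ == "__main__":
    # Run from the Maven project root, next to the target directory.
    for name, main_class in find_main_class().items():
        print(f"{name}: Main-Class = {main_class}")
```

With the `pom.xml` above, the JAR should report `org.example.test.ExclamationFunction`, which is also the value passed to `--classname`.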

Python

Python functions support the following three packaging formats:

  • One Python file
  • ZIP file
  • PIP

One Python file

To package a Python function as a single Python file, complete the following steps.

  1. Write a Python function.

    ```
    # Import the Function class from Pulsar
    from pulsar import Function


    # The classic ExclamationFunction that appends an exclamation mark
    # to the end of the input
    class ExclamationFunction(Function):
        def __init__(self):
            pass

        def process(self, input, context):
            return input + '!'
    ```

    To write a Python function, inherit from the `Function` class and implement the `process()` method, which takes two parameters:

    - `input` represents the input message.
    - `context` is an object exposed by Pulsar Functions. Through the context object, you can access attributes of the Python function.

  2. Install a Python client.

    The implementation of a Python function depends on the Python client, so before deploying a Python function, install the version of the Python client that matches your Pulsar version.

    ```
    pip install pulsar-client==2.6.0
    ```

  3. Run the Python function.

    (1) Copy the Python function file into the Pulsar container (run `docker cp` on the host), and then open a shell in the container.

    ```
    docker cp <path of Python function file> <CONTAINER ID>:/pulsar
    docker exec -it <CONTAINER ID> /bin/bash
    ```

    (2) Run the Python function using the following command.

    ```
    ./bin/pulsar-admin functions localrun \
      --classname <Python function file name>.<Python function class name> \
      --py <path of Python function file> \
      --inputs persistent://public/default/my-topic-1 \
      --output persistent://public/default/test-1 \
      --tenant public \
      --namespace default \
      --name PythonFunction
    ```

    The following log indicates that the Python function starts successfully.

    ```
    ...
    07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    ...
    ```
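
Because `process()` is an ordinary method, you can call it directly to check the function logic before copying the file into the container. The following is a minimal sketch under two assumptions: the fallback `Function` class is a hypothetical stand-in used only when `pulsar-client` is not installed, and `None` is passed as the context because this example never reads it. A function that uses the context would need a real or mocked context object.

```python
try:
    # Available when the pulsar-client package is installed.
    from pulsar import Function
except ImportError:
    # Hypothetical stand-in so this sketch runs without pulsar-client.
    class Function:
        pass


class ExclamationFunction(Function):
    def process(self, input, context):
        # Append an exclamation mark; this example does not use the context.
        return input + '!'


if __name__ == "__main__":
    # Call process() directly, with no broker and no real context.
    print(ExclamationFunction().process("hello", None))  # prints: hello!
```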

ZIP file

To package a Python function as a ZIP file, complete the following steps.

  1. Prepare the ZIP file.

    The ZIP file of a Python function requires the following structure. Assuming the ZIP file is named `func.zip`, unzipping it produces:

    ```
    func/src
    func/requirements.txt
    func/deps
    ```

    Take [exclamation.zip](https://github.com/apache/pulsar/tree/master/tests/docker-images/latest-version-image/python-examples) as an example. Its internal structure is as follows.

    ```
    .
    ├── deps
    │   └── sh-1.12.14-py2.py3-none-any.whl
    └── src
        └── exclamation.py
    ```

  2. Run the Python function.

    (1) Copy the ZIP file into the Pulsar container (run `docker cp` on the host), and then open a shell in the container.

    ```
    docker cp <path of ZIP file> <CONTAINER ID>:/pulsar
    docker exec -it <CONTAINER ID> /bin/bash
    ```

    (2) Run the Python function using the following command.

    ```
    ./bin/pulsar-admin functions localrun \
      --classname exclamation \
      --py <path of ZIP file> \
      --inputs persistent://public/default/in-topic \
      --output persistent://public/default/out-topic \
      --tenant public \
      --namespace default \
      --name PythonFunction
    ```

    The following log indicates that the Python function starts successfully.

    ```
    ...
    07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    ...
    ```
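
The ZIP layout above can also be produced with a short script instead of a manual archive tool. The following Python sketch uses the standard `zipfile` module; `build_function_zip` is a hypothetical helper, not part of Pulsar. It assumes, as the `func.zip` example suggests, that every entry keeps a top-level folder with the same name as the ZIP file, so a hypothetical `exclamation.zip` would contain `exclamation/src/exclamation.py`.

```python
import zipfile
from pathlib import Path
from typing import List


def build_function_zip(func_dir: str, zip_path: str) -> List[str]:
    """Zip func_dir (with src/ and optionally deps/ and requirements.txt),
    keeping the top-level folder name in every entry. Returns the entry names."""
    root = Path(func_dir).resolve()
    out = Path(zip_path).resolve()
    # Assumption: the top-level folder is named after the ZIP file (func/ -> func.zip).
    if out.stem != root.name:
        raise ValueError(f"expected {root.name}.zip, got {out.name}")
    if not (root / "src").is_dir():
        raise ValueError(f"{root} has no src directory")

    names = []
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(root.rglob("*")):
            # Skip directories, and the archive itself if it is written inside func_dir.
            if not path.is_file() or path == out:
                continue
            # Paths relative to the parent keep the "func/" prefix, e.g. func/src/x.py.
            arcname = path.relative_to(root.parent).as_posix()
            zf.write(path, arcname)
            names.append(arcname)
    return names


if __name__ == "__main__":
    # Hypothetical paths: package ./exclamation into ./exclamation.zip.
    for name in build_function_zip("exclamation", "exclamation.zip"):
        print(name)
```

The name check is deliberately strict: if the folder and the ZIP file names differ, the sketch fails early rather than producing an archive whose layout does not match the `func.zip` convention.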

PIP

The PIP method is supported only in the Kubernetes runtime. To package a Python function with PIP, complete the following steps.

  1. Configure the `functions_worker.yml` file.

    ```
    #### Kubernetes Runtime ####
    installUserCodeDependencies: true
    ```

  2. Write your Python function.

    ```
    from pulsar import Function
    import js2xml


    # The classic ExclamationFunction that appends an exclamation mark
    # to the end of the input
    class ExclamationFunction(Function):
        def __init__(self):
            pass

        def process(self, input, context):
            # add your logic
            return input + '!'
    ```

    You can introduce additional dependencies, such as `js2xml` in this example. When the Python function detects that the file in use is a `whl` file and the `installUserCodeDependencies` parameter is set to `true`, the system runs `pip install` to install the dependencies that the Python function requires.

  3. Generate the whl file.

    ```
    $ cd $PULSAR_HOME/pulsar-functions/scripts/python
    $ chmod +x generate.sh
    $ ./generate.sh <path of your Python function> <path of the whl output dir> <the version of whl>
    # e.g.: ./generate.sh /path/to/python /path/to/python/output 1.0.0
    ```

    The output is written to `/path/to/python/output`:

    ```
    -rw-r--r--  1 root  staff   1.8K  8 27 14:29 pulsarfunction-1.0.0-py2-none-any.whl
    -rw-r--r--  1 root  staff   1.4K  8 27 14:29 pulsarfunction-1.0.0.tar.gz
    -rw-r--r--  1 root  staff     0B  8 27 14:29 pulsarfunction.whl
    ```

Go

To package a function in Go, complete the following steps.

  1. Write a Go function.

    Currently, a Go function can be implemented only with the SDK, which exposes the function interface. Before writing a Go function, import `github.com/apache/pulsar/pulsar-function-go/pf`.

    ```
    package main

    import (
        "context"
        "fmt"

        "github.com/apache/pulsar/pulsar-function-go/pf"
    )

    // HandleRequest prints the input with an exclamation mark appended.
    func HandleRequest(ctx context.Context, input []byte) error {
        fmt.Println(string(input) + "!")
        return nil
    }

    func main() {
        // Register the handler with the Pulsar Functions Go runtime.
        pf.Start(HandleRequest)
    }
    ```

    You can use the context to access information about the Go function.

    ```
    if fc, ok := pf.FromContext(ctx); ok {
        fmt.Printf("function ID is:%s, ", fc.GetFuncID())
        fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
    }
    ```

    When writing a Go function, keep the following in mind:

    - In `main()`, you **only** need to register the function name with `Start()`. `Start()` accepts **only** one function name.
    - The Go function runtime uses Go reflection on the function passed to `Start()` to verify whether its parameter list and return value list are correct. They **must** match one of the following signatures:

    ```
    func ()
    func () error
    func (input) error
    func () (output, error)
    func (input) (output, error)
    func (context.Context) error
    func (context.Context, input) error
    func (context.Context) (output, error)
    func (context.Context, input) (output, error)
    ```

  2. Build the Go function.

    ```
    go build <your Go function filename>.go
    ```

  3. Run the Go function.

    (1) Copy the compiled Go function binary into the Pulsar container (run `docker cp` on the host), and then open a shell in the container.

    ```
    docker cp <your Go function path> <CONTAINER ID>:/pulsar
    docker exec -it <CONTAINER ID> /bin/bash
    ```

    (2) Run the Go function with the following command.

    ```
    ./bin/pulsar-admin functions localrun \
      --go [your Go function path] \
      --inputs [input topics] \
      --output [output topic] \
      --tenant [default:public] \
      --namespace [default:default] \
      --name [custom unique Go function name]
    ```

    The following log indicates that the Go function starts successfully.

    ```
    ...
    07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    ...
    ```

Start Functions in cluster mode

To start a function in cluster mode, replace `localrun` with `create` in the commands above. The following output indicates that your function has been created successfully.

```
"Created successfully"
```

For information about the `--classname`, `--jar`, `--py`, `--go`, and `--inputs` parameters, run the `./bin/pulsar-admin functions` command or see the `pulsar-admin` reference documentation.