Pulsar Functions CLI and YAML configs

Pulsar admin CLI for Pulsar Functions

The Pulsar admin interface lets you create and manage Pulsar Functions through the CLI. For the latest and complete information, including commands, flags, and descriptions, refer to the Pulsar admin CLI reference.

YAML configurations for Pulsar Functions

You can configure a function by using a predefined YAML file. The following table outlines the fields you can set and their related command arguments.

| Field Name | Type | Related Command Argument | Description |
| --- | --- | --- | --- |
| runtimeFlags | String | N/A | Any flags that you want to pass to a runtime (for process and Kubernetes runtimes only). |
| tenant | String | --tenant | The tenant of a function. |
| namespace | String | --namespace | The namespace of a function. |
| name | String | --name | The name of a function. |
| className | String | --classname | The class name of a function. |
| functionType | String | --function-type | The built-in function type. |
| inputs | List<String> | -i, --inputs | The input topics of a function. On the command line, multiple topics are specified as a comma-separated list. |
| customSerdeInputs | Map<String,String> | --custom-serde-inputs | The mapping from input topics to SerDe class names. |
| topicsPattern | String | --topics-pattern | The topic pattern to consume from a list of topics under a namespace. Note: --inputs and --topics-pattern are mutually exclusive. For Java functions, you need to add the SerDe class name for a pattern in --custom-serde-inputs. |
| customSchemaInputs | Map<String,String> | --custom-schema-inputs | The mapping from input topics to schema properties. |
| customSchemaOutputs | Map<String,String> | --custom-schema-outputs | The mapping from output topics to schema properties. |
| inputSpecs | Map<String,ConsumerConfig> | --input-specs | The mapping from inputs to custom configurations. For the nested fields, see ConsumerConfig. |
| output | String | -o, --output | The output topic of a function. If none is specified, no output is written. |
| producerConfig | ProducerConfig | --producer-config | The custom configurations for producers. For the nested fields, see ProducerConfig. |
| outputSchemaType | String | -st, --schema-type | The built-in schema type or custom schema class name used for message outputs. |
| outputSerdeClassName | String | --output-serde-classname | The SerDe class used for message outputs. |
| logTopic | String | --log-topic | The topic that the logs of a function are produced to. |
| processingGuarantees | String | --processing-guarantees | The processing guarantees (delivery semantics) applied to a function. Available values: ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL. |
| retainOrdering | Boolean | --retain-ordering | Whether the function consumes and processes messages in order. |
| retainKeyOrdering | Boolean | --retain-key-ordering | Whether the function consumes and processes messages in key order. |
| batchBuilder | String | --batch-builder | Use producerConfig.batchBuilder instead. Note: batchBuilder will be deprecated in code soon. |
| forwardSourceMessageProperty | Boolean | --forward-source-message-property | Whether the properties of input messages are forwarded to output topics during processing. When set to false, forwarding is disabled. |
| userConfig | Map<String,Object> | --user-config | User-defined config key/value pairs. |
| secrets | Map<String,Object> | --secrets | The mapping from secret names to objects that encapsulate how each secret is fetched by the underlying secrets provider. |
| runtime | String | N/A | The runtime of a function. Available values: java, python, go. |
| autoAck | Boolean | --auto-ack | Whether the framework acknowledges messages automatically. Note: This configuration will be deprecated in future releases. If you specify a delivery semantic, the framework acknowledges messages automatically. If you do not want the framework to auto-ack messages, set processingGuarantees to MANUAL. |
| maxMessageRetries | Int | --max-message-retries | The number of retries to process a message before giving up. |
| deadLetterTopic | String | --dead-letter-topic | The topic used for storing messages that are not processed successfully. |
| subName | String | --subs-name | The name of the Pulsar source subscription used for input-topic consumers, if required. |
| parallelism | Int | --parallelism | The parallelism factor of a function, that is, the number of function instances to run. |
| resources | Resources | N/A | For the nested fields, see Resources. |
| fqfn | String | --fqfn | The Fully Qualified Function Name (FQFN) of a function. |
| windowConfig | WindowConfig | N/A | For the nested fields, see WindowConfig. |
| timeoutMs | Long | --timeout-ms | The message timeout (in milliseconds). |
| jar | String | --jar | The absolute path of the JAR file for a function written in Java. It also supports URL paths that workers can download the package from, including HTTP, HTTPS, file (the file protocol assumes that the file already exists on the worker host), and function (a package URL from the packages management service). |
| py | String | --py | The absolute path of the main Python file or Python wheel file for a function written in Python. It also supports URL paths that workers can download the package from, including HTTP, HTTPS, file (the file protocol assumes that the file already exists on the worker host), and function (a package URL from the packages management service). |
| go | String | --go | The absolute path of the main Go executable binary for a function written in Go. It also supports URL paths that workers can download the package from, including HTTP, HTTPS, file (the file protocol assumes that the file already exists on the worker host), and function (a package URL from the packages management service). |
| cleanupSubscription | Boolean | --cleanup-subscription | Whether the subscriptions that a function creates or uses are deleted when the function is deleted. The default value is true. |
| customRuntimeOptions | String | --custom-runtime-options | A string that encodes options to customize the runtime. |
| maxPendingAsyncRequests | Int | --max-message-retries | The maximum number of pending async requests per instance, which prevents a large number of concurrent requests. |
| exposePulsarAdminClientEnabled | Boolean | N/A | Whether the Pulsar admin client is exposed to the function context. By default, it is disabled. |
| subscriptionPosition | String | --subs-position | The position of the Pulsar source subscription from which messages are consumed. The default value is Latest. |
| skipToLatest | Boolean | --skip-to-latest | Whether the consumer skips to the latest message when the function instance restarts. |

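
As a sketch of how several fields from the table above combine, the fragment below subscribes to a topic pattern instead of an explicit inputs list and sends messages that keep failing to a dead-letter topic. Only the field names come from the table; the function name, the class names (including the SerDe class com.example.SensorSerDe), the topics, and the subscription name are hypothetical placeholders, and the combination is worth checking against your Pulsar version.

```yaml
# Hypothetical names throughout; only the field names come from the table above.
tenant: "public"
namespace: "default"
name: "sensor-processor"
className: "com.example.SensorFunction"   # hypothetical function class
jar: "function.jar"
# topicsPattern is used instead of inputs; the two are mutually exclusive.
topicsPattern: "persistent://public/default/sensor-.*"
# For Java functions, the pattern itself is mapped to a SerDe class.
customSerdeInputs:
  "persistent://public/default/sensor-.*": "com.example.SensorSerDe"   # hypothetical SerDe
output: "persistent://public/default/sensor-output"
processingGuarantees: "ATLEAST_ONCE"
# Give up on a message after 3 retries and publish it to the dead-letter topic.
maxMessageRetries: 3
deadLetterTopic: "persistent://public/default/sensor-dlq"
subName: "sensor-processor-sub"
subscriptionPosition: "Earliest"   # start from the earliest available message instead of Latest
```
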
ConsumerConfig

The following table outlines the nested fields and related arguments under the inputSpecs field.

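| Field Name | Type | Related Command Argument | Description |
| --- | --- | --- | --- |
| schemaType | String | N/A | N/A |
| serdeClassName | String | N/A | N/A |
| isRegexPattern | Boolean | N/A | N/A |
| schemaProperties | Map<String,String> | N/A | N/A |
| consumerProperties | Map<String,String> | N/A | N/A |
| receiverQueueSize | Int | N/A | N/A |
| cryptoConfig | CryptoConfig | N/A | For the nested fields, see CryptoConfig. |
| poolMessages | Boolean | N/A | N/A |
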
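
A minimal sketch of inputSpecs, where each key is an input topic (or a pattern flagged with isRegexPattern) and each value is a ConsumerConfig. The topic names, the consumer property, and the queue size are hypothetical, and the schemaType value assumes that a built-in schema name such as STRING is accepted.

```yaml
inputSpecs:
  # Explicit topic: the key is the full topic name.
  "persistent://public/default/orders":
    schemaType: "STRING"        # assumed built-in schema name
    receiverQueueSize: 500      # illustrative value
    consumerProperties:
      team: "billing"           # hypothetical consumer property
  # Pattern entry: the key is a regex, so isRegexPattern is set to true.
  "persistent://public/default/audit-.*":
    isRegexPattern: true
    poolMessages: true
```
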
ProducerConfig

The following table outlines the nested fields and related arguments under the producerConfig field.

| Field Name | Type | Related Command Argument | Description |
| --- | --- | --- | --- |
| maxPendingMessages | Int | N/A | The maximum size of the queue that holds messages waiting for an acknowledgment from the broker. |
| maxPendingMessagesAcrossPartitions | Int | N/A | The maximum number of pending messages across all partitions. |
| useThreadLocalProducers | Boolean | N/A | N/A |
| cryptoConfig | CryptoConfig | N/A | For the nested fields, see CryptoConfig. |
| batchBuilder | String | --batch-builder | The type of batch construction method. Available values: DEFAULT and KEY_BASED. The default value is DEFAULT. |
| compressionType | String | N/A | The message data compression type used by a producer. The default value is LZ4. Available options: NONE (no compression), LZ4, ZLIB, ZSTD, SNAPPY. |

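
A sketch of a producerConfig block using the fields from the table above. The numbers are illustrative, not tuning recommendations, and the choice of KEY_BASED batching and ZSTD compression is only an example.

```yaml
producerConfig:
  maxPendingMessages: 1000                  # illustrative queue size
  maxPendingMessagesAcrossPartitions: 50000 # illustrative cap across partitions
  useThreadLocalProducers: false
  batchBuilder: "KEY_BASED"                 # build batches per message key instead of DEFAULT
  compressionType: "ZSTD"                   # overrides the LZ4 default
```
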
Resources

The following table outlines the nested fields and related arguments under the resources field.

| Field Name | Type | Related Command Argument | Description |
| --- | --- | --- | --- |
| cpu | Double | --cpu | The CPU, in cores, to allocate per function instance (for Kubernetes runtime only). |
| ram | Long | --ram | The RAM, in bytes, to allocate per function instance (for process and Kubernetes runtimes only). |
| disk | Long | --disk | The disk space, in bytes, to allocate per function instance (for Kubernetes runtime only). |

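
Because ram and disk are given in bytes, it can help to write the arithmetic next to each value. The sizes below (half a core, 512 MiB of RAM, 10 GiB of disk) are illustrative only.

```yaml
resources:
  cpu: 0.5            # cores; Kubernetes runtime only
  ram: 536870912      # 512 MiB = 512 * 1024 * 1024 bytes
  disk: 10737418240   # 10 GiB = 10 * 1024 * 1024 * 1024 bytes; Kubernetes runtime only
```

By the same arithmetic, the ram value of 8589934592 in the example at the end of this page is 8 GiB (8 * 1024 * 1024 * 1024 bytes).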
WindowConfig

The following table outlines the nested fields and related arguments under the windowConfig field.

| Field Name | Type | Related Command Argument | Description |
| --- | --- | --- | --- |
| windowLengthCount | Int | --window-length-count | The number of messages per window. |
| windowLengthDurationMs | Long | --window-length-duration-ms | The time duration (in milliseconds) per window. |
| slidingIntervalCount | Int | --sliding-interval-count | The number of messages after which a window slides. |
| slidingIntervalDurationMs | Long | --sliding-interval-duration-ms | The time duration (in milliseconds) after which a window slides. |
| lateDataTopic | String | N/A | N/A |
| maxLagMs | Long | N/A | N/A |
| watermarkEmitIntervalMs | Long | N/A | N/A |
| timestampExtractorClassName | String | N/A | N/A |
| actualWindowFunctionClassName | String | N/A | N/A |

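
A minimal sketch of a time-based sliding window, assuming the function class itself is written to process windows (not shown). The durations are illustrative.

```yaml
# Each window spans the last 60 seconds of messages,
# and the window slides forward every 10 seconds.
windowConfig:
  windowLengthDurationMs: 60000     # 60 s = 60 * 1000 ms
  slidingIntervalDurationMs: 10000  # 10 s = 10 * 1000 ms
```

For a count-based window, set windowLengthCount and slidingIntervalCount (for example, 10 and 5) instead of the two duration fields.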
CryptoConfig

The following table outlines the nested fields and related arguments under the cryptoConfig field.

| Field Name | Type | Related Command Argument | Description |
| --- | --- | --- | --- |
| cryptoKeyReaderClassName | String | N/A | Refer to code. |
| cryptoKeyReaderConfig | Map<String,Object> | N/A | N/A |
| encryptionKeys | String[] | N/A | N/A |
| producerCryptoFailureAction | ProducerCryptoFailureAction | N/A | N/A |
| consumerCryptoFailureAction | ConsumerCryptoFailureAction | N/A | N/A |
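
cryptoConfig can appear under producerConfig (to encrypt output) and under an inputSpecs entry (to decrypt input). The sketch below shows both placements. The key reader class com.example.FileKeyReader, its keyDir setting, the key name, and the topic are hypothetical; the failure actions use the Pulsar client enum values (FAIL or SEND for producers; FAIL, DISCARD, or CONSUME for consumers).

```yaml
producerConfig:
  cryptoConfig:
    cryptoKeyReaderClassName: "com.example.FileKeyReader"  # hypothetical CryptoKeyReader implementation
    cryptoKeyReaderConfig:
      keyDir: "/etc/pulsar/keys"                           # hypothetical reader-specific setting
    encryptionKeys:
      - "orders-key"                                       # hypothetical key name
    producerCryptoFailureAction: "FAIL"                    # alternative: SEND
inputSpecs:
  "persistent://public/default/orders":
    cryptoConfig:
      cryptoKeyReaderClassName: "com.example.FileKeyReader"
      consumerCryptoFailureAction: "FAIL"                  # alternatives: DISCARD, CONSUME
```
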

Example

The following example shows how to configure a function using YAML or JSON.

YAML:

```yaml
tenant: "public"
namespace: "default"
name: "config-file-function"
inputs:
  - "persistent://public/default/config-file-function-input-1"
  - "persistent://public/default/config-file-function-input-2"
output: "persistent://public/default/config-file-function-output"
jar: "function.jar"
parallelism: 1
resources:
  cpu: 8
  ram: 8589934592
autoAck: true
userConfig:
  foo: "bar"
```

JSON:

```json
{
  "tenant": "public",
  "namespace": "default",
  "name": "config-file-function",
  "inputs": [
    "persistent://public/default/config-file-function-input-1",
    "persistent://public/default/config-file-function-input-2"
  ],
  "output": "persistent://public/default/config-file-function-output",
  "jar": "function.jar",
  "parallelism": 1,
  "resources": {
    "cpu": 8,
    "ram": 8589934592
  },
  "autoAck": true,
  "userConfig": {
    "foo": "bar"
  }
}
```