Produce function logs

Produce logs for Java functions

Pulsar Functions that use the Java SDK have access to an SLF4J Logger object, which you can use to produce logs at a specified log level.

For example, the following function logs at either the WARNING or INFO level, depending on whether the incoming string contains the word danger.

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;
    import org.slf4j.Logger;

    public class LoggingFunction implements Function<String, Void> {
        @Override
        public Void process(String input, Context context) {
            // The logger is provided by the function context.
            Logger LOG = context.getLogger();
            String messageId = new String(context.getMessageId());

            if (input.contains("danger")) {
                LOG.warn("A warning was received in message {}", messageId);
            } else {
                LOG.info("Message {} received\nContent: {}", messageId, input);
            }

            // Function<String, Void> produces no output message.
            return null;
        }
    }

To enable your function to produce logs, you need to specify a log topic when creating or running the function. The following is an example.

    bin/pulsar-admin functions create \
      --jar $PWD/my-functions.jar \
      --classname my.package.LoggingFunction \
      --log-topic persistent://public/default/logging-function-logs \
      # Other function configs

You can access all the logs produced by LoggingFunction via the persistent://public/default/logging-function-logs topic.

Customize log levels for Java functions

By default, the log level for Java functions is info. To change it, for example to debug, update the functions_log4j2.xml file.

Tip: The functions_log4j2.xml file is in your Pulsar configuration directory, for example, /etc/pulsar/ on bare metal or /pulsar/conf on Kubernetes.

  1. Set the value of the pulsar.log.level property.

      <Property>
          <name>pulsar.log.level</name>
          <value>debug</value>
      </Property>

  2. Apply the log level wherever the property is referenced. In the following example, debug applies to all function logs.

      <Root>
          <level>${sys:pulsar.log.level}</level>
          <AppenderRef>
              <ref>${sys:pulsar.log.appender}</ref>
              <level>${sys:pulsar.log.level}</level>
          </AppenderRef>
      </Root>

    To be more selective, you can apply different log levels to different classes or modules. For example:

      <Logger>
          <name>com.example.module</name>
          <level>info</level>
          <additivity>false</additivity>
          <AppenderRef>
              <ref>${sys:pulsar.log.appender}</ref>
          </AppenderRef>
      </Logger>

    To apply a more verbose log level to a specific class in the module, use a configuration like the following:

      <Logger>
          <name>com.example.module.className</name>
          <level>debug</level>
          <additivity>false</additivity>
          <AppenderRef>
              <ref>Console</ref>
          </AppenderRef>
      </Logger>

    • additivity controls whether a log event handled by a <Logger> entry is also passed on to its parent loggers. Disabling additivity (false) prevents duplicate log messages when two or more <Logger> entries cover classes or modules that overlap.
    • AppenderRef sends the log output to a target defined in the Appender section. For example:

      <Console>
          <name>Console</name>
          <target>SYSTEM_OUT</target>
          <PatternLayout>
              <Pattern>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n</Pattern>
          </PatternLayout>
      </Console>
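
The duplication that additivity prevents can be illustrated outside Log4j. The following is a minimal sketch that uses Python's standard logging module, whose propagate attribute plays a role similar to additivity. It is an analogy only, not Pulsar or Log4j code, and the helper names (ListHandler, count_deliveries) are made up for illustration.

```python
import logging


class ListHandler(logging.Handler):
    """Collects log messages in memory so deliveries can be counted."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def count_deliveries(propagate):
    """Log once on a child logger and count how often the message is delivered.

    The same handler is attached to a parent and a child logger, which
    mirrors two overlapping <Logger> entries that share an appender.
    """
    handler = ListHandler()
    parent = logging.getLogger("com.example.module")
    child = logging.getLogger("com.example.module.className")
    for lg in (parent, child):
        lg.handlers.clear()
        lg.setLevel(logging.DEBUG)
        lg.addHandler(handler)
    parent.propagate = False     # keep the root logger out of the picture
    child.propagate = propagate  # analogous to <additivity> on the child entry
    child.debug("hello")
    return len(handler.messages)


duplicated = count_deliveries(True)   # child and parent both deliver the event
single = count_deliveries(False)      # only the child delivers the event
```

With propagation enabled, the single debug call is delivered twice; with it disabled, once. That is the same effect that setting additivity to false has on overlapping <Logger> entries.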

Produce logs for Python functions

Pulsar Functions that use the Python SDK have access to a logger object. The logger object can be used to produce logs at a specified log level.

For example, the following function logs at either the WARNING or INFO level, depending on whether the incoming string contains the word danger.

    from pulsar import Function

    class LoggingFunction(Function):
        def process(self, input, context):
            # The logger is provided by the function context.
            logger = context.get_logger()
            msg_id = context.get_message_id()
            if 'danger' in input:
                logger.warning("A warning was received in message {0}".format(msg_id))
            else:
                logger.info("Message {0} received\nContent: {1}".format(msg_id, input))
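
To check the branching logic before deploying, it can help to run it without a Pulsar cluster. The following is a minimal sketch under stated assumptions: FakeContext, RecordingHandler, and run are hypothetical helpers that are not part of the Pulsar SDK, and process repeats the body of LoggingFunction.process as a plain function so that the example runs without the pulsar package installed.

```python
import logging


class FakeContext:
    """Hypothetical stand-in for the Pulsar context (only the two calls used here)."""

    def __init__(self, message_id, logger):
        self._message_id = message_id
        self._logger = logger

    def get_logger(self):
        return self._logger

    def get_message_id(self):
        return self._message_id


class RecordingHandler(logging.Handler):
    """Stores (level name, message) pairs instead of writing them anywhere."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


def process(input, context):
    """Same branching as LoggingFunction.process, written as a plain function."""
    logger = context.get_logger()
    msg_id = context.get_message_id()
    if 'danger' in input:
        logger.warning("A warning was received in message {0}".format(msg_id))
    else:
        logger.info("Message {0} received\nContent: {1}".format(msg_id, input))


def run(inputs):
    """Feed each input through process() and return what was logged."""
    handler = RecordingHandler()
    logger = logging.getLogger("logging-function-test")
    logger.handlers.clear()
    logger.propagate = False
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    for i, text in enumerate(inputs):
        process(text, FakeContext("msg-{0}".format(i), logger))
    return handler.records


records = run(["danger ahead", "all clear"])
```

Here records holds one WARNING entry for the first input and one INFO entry for the second. In a deployed function, the same messages would go to the configured log topic instead.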

To enable your function to produce logs, you need to specify a log topic when creating or running the function. The following is an example.

    bin/pulsar-admin functions create \
      --py logging_function.py \
      --classname logging_function.LoggingFunction \
      --log-topic logging-function-logs \
      # Other function configs

You can access all the logs produced by LoggingFunction via the logging-function-logs topic. You can also set the function log level through context.get_logger().setLevel(level). For more information, refer to Log facility for Python.
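
The effect of setLevel(level) can be sketched with Python's standard logging module alone. This assumes that the object returned by context.get_logger() behaves like a standard logging.Logger; the logger name set-level-demo and the helper emitted_levels are made up for illustration.

```python
import logging


def emitted_levels(level):
    """Return the level names that pass once the logger level is set to `level`."""
    seen = []

    class Collector(logging.Handler):
        def emit(self, record):
            seen.append(record.levelname)

    # Stand-in for the logger a function would get from context.get_logger().
    logger = logging.getLogger("set-level-demo")
    logger.handlers.clear()
    logger.propagate = False
    logger.addHandler(Collector())

    logger.setLevel(level)  # the call described in the text above

    logger.debug("debug message")
    logger.info("info message")
    logger.warning("warning message")
    logger.error("error message")
    return seen


at_warning = emitted_levels(logging.WARNING)
at_debug = emitted_levels(logging.DEBUG)
```

With the level set to logging.WARNING, only the warning and error calls are recorded; with logging.DEBUG, all four are.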

Produce logs for Go functions

To use logTopic-related functionality in Go functions, import github.com/apache/pulsar/pulsar-function-go/logutil. You do not need to use the getLogger() context object.

The following function logs at different levels depending on the length of the function input.

    package main

    import (
        "context"

        log "github.com/apache/pulsar/pulsar-function-go/logutil"
        "github.com/apache/pulsar/pulsar-function-go/pf"
    )

    // loggerFunc logs at INFO for short inputs and at WARN for long ones.
    func loggerFunc(ctx context.Context, input []byte) {
        if len(input) <= 100 {
            log.Infof("This input has a length of: %d", len(input))
        } else {
            log.Warnf("This input is getting too long! It has {%d} characters", len(input))
        }
    }

    func main() {
        pf.Start(loggerFunc)
    }