Call Pulsar admin APIs

Pulsar Functions that use the Java SDK have access to the Pulsar admin client, which allows a function to make admin API calls to your Pulsar clusters.

The following example uses the Pulsar admin client exposed by the function context.

    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    /**
     * In this particular example, for every input message,
     * the function resets the cursor of the current function's subscription to a
     * specified timestamp.
     */
    public class CursorManagementFunction implements Function<String, String> {

        @Override
        public String process(String input, Context context) throws Exception {
            // The admin client is null when it is not exposed to functions.
            PulsarAdmin adminClient = context.getPulsarAdmin();
            if (adminClient != null) {
                // The topic name is optional on a record; fall back to null when it is absent.
                String topic = context.getCurrentRecord().getTopicName().orElse(null);
                String subName = context.getTenant() + "/" + context.getNamespace() + "/" + context.getFunctionName();
                if (topic != null) {
                    // 1578188166 below is an arbitrarily chosen timestamp
                    adminClient.topics().resetCursor(topic, subName, 1578188166);
                    return "reset cursor successfully";
                }
            }
            return null;
        }
    }
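
The example above builds the subscription name inline and hard-codes the timestamp. As a minimal sketch, those two values could be computed by a small standalone helper. The class CursorResetSupport and its methods are hypothetical names introduced here for illustration; they are not part of the Pulsar Functions API. The sketch also assumes that the admin client interprets the timestamp as milliseconds since the epoch, which you should confirm against the Javadoc of the Pulsar version you use.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration; not part of the Pulsar Functions API. */
final class CursorResetSupport {

    private CursorResetSupport() {
    }

    /** Builds the subscription name the way the example does: tenant/namespace/functionName. */
    static String subscriptionName(String tenant, String namespace, String functionName) {
        return tenant + "/" + namespace + "/" + functionName;
    }

    /**
     * Returns the epoch timestamp, in milliseconds, that lies lookBack before now.
     * Assumption: the admin client expects milliseconds since the epoch.
     */
    static long resetTimestampMillis(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        // Placeholder tenant, namespace, and function names.
        String subName = subscriptionName("public", "default", "cursor-fn");
        long timestamp = resetTimestampMillis(Instant.now(), Duration.ofHours(1));
        System.out.println(subName + " -> " + timestamp);
    }
}
```

Inside process, the two results could be passed to adminClient.topics().resetCursor in place of the inline string concatenation and the fixed timestamp. Passing the current time in as a parameter keeps the timestamp calculation easy to test.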

To give your function access to the Pulsar admin client, set exposeAdminClientEnabled to true in the conf/functions_worker.yml file. To test whether it is enabled, run the pulsar-admin functions localrun command with the --web-service-url flag, as follows.

    bin/pulsar-admin functions localrun \
      --jar $PWD/my-functions.jar \
      --classname my.package.CursorManagementFunction \
      --web-service-url http://pulsar-web-service:8080 \
      # Other function configs