This section describes how to manage Pulsar IO connectors in a Pulsar cluster. You will learn how to:

  • Deploy builtin connectors
  • Monitor and update running connectors with the Pulsar Admin CLI
  • Deploy customized connectors
  • Upgrade a connector

Using Builtin Connectors

Pulsar bundles several builtin connectors for moving data in and out of commonly used systems such as databases and messaging systems. Setting up these builtin connectors is simple: follow the instructions on installing builtin connectors. After setup, Pulsar brokers (or function workers) discover all the builtin connectors automatically, so no additional installation steps are required.
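
Once the builtin connectors are installed, you can ask the cluster which ones it has discovered. The following is a minimal sketch, assuming your Pulsar version provides the plural `sources` and `sinks` command groups with the `available-sources` and `available-sinks` subcommands; `PULSAR_ADMIN` and `list_builtin_connectors` are hypothetical names introduced only for this example.

```shell
#!/usr/bin/env bash
# Hypothetical override: point PULSAR_ADMIN at another pulsar-admin if needed.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# Print the builtin sources and sinks that the cluster has discovered.
list_builtin_connectors() {
  echo "== builtin sources =="
  "$PULSAR_ADMIN" sources available-sources
  echo "== builtin sinks =="
  "$PULSAR_ADMIN" sinks available-sinks
}
```

Source the script and run `list_builtin_connectors`; set `PULSAR_ADMIN` first if pulsar-admin lives somewhere other than `./bin`.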

Configuring Connectors

Configuring Pulsar IO connectors is straightforward. You provide a YAML configuration file when you run a connector. The file tells Pulsar where to locate the sources and sinks and how to connect them to Pulsar topics.

Below is an example YAML configuration file for the Cassandra sink:

  tenant: public
  namespace: default
  name: cassandra-test-sink
  ...
  # cassandra specific config
  configs:
    roots: "localhost:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"

The example tells Pulsar which Cassandra cluster to connect to, which keyspace and columnFamily in Cassandra to write data into, and how to map a Pulsar message to a Cassandra table key and columns.

For details, consult the documentation for individual connectors.
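
To use a YAML file like the one above, pass it to pulsar-admin when you create the connector. The sketch below assumes the `--sink-config-file` flag of `sink create` and keeps only the `configs` block in the file, while tenant, namespace, name and input topic go on the command line. The input topic `test_cassandra`, the helper names and `PULSAR_ADMIN` are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical override: point PULSAR_ADMIN at another pulsar-admin if needed.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# Write the Cassandra-specific settings from the example above to the file $1.
write_cassandra_sink_config() {
  cat > "$1" <<'EOF'
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"
EOF
}

# Submit the builtin Cassandra sink, reading connector settings from the file $1.
create_cassandra_sink() {
  "$PULSAR_ADMIN" sink create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sink-config-file "$1" \
    --inputs test_cassandra
}
```

For example, `write_cassandra_sink_config cassandra-sink.yml && create_cassandra_sink cassandra-sink.yml`. Keeping connector-specific settings in a file lets the same file serve several sink instances that differ only in name or input topic.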

Running Connectors

Pulsar connectors can be managed using the source and sink commands of the pulsar-admin CLI tool.

Running sources

You can submit a source to be run in an existing Pulsar cluster using a command of this form:

  ./bin/pulsar-admin source create --classname <classname> --archive <jar-location> --tenant <tenant> --namespace <namespace> --name <source-name> --destination-topic-name <output-topic>

Here’s an example command:

  bin/pulsar-admin source create --classname org.apache.pulsar.io.twitter.TwitterFireHose --archive ~/application.jar --tenant test --namespace ns1 --name twitter-source --destination-topic-name twitter_data

Instead of submitting a source to run on an existing Pulsar cluster, you can alternatively run a source as a process on your local machine:

  bin/pulsar-admin source localrun --classname org.apache.pulsar.io.twitter.TwitterFireHose --archive ~/application.jar --tenant test --namespace ns1 --name twitter-source --destination-topic-name twitter_data
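
The `create` and `localrun` commands above take the same arguments, so a script can switch between running the Twitter source in the cluster and running it locally by changing only the subcommand. A minimal sketch; `run_twitter_source` and `PULSAR_ADMIN` are hypothetical names, and the archive path mirrors the example above.

```shell
#!/usr/bin/env bash
# Hypothetical override: point PULSAR_ADMIN at another pulsar-admin if needed.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# $1 = "create" (submit to the cluster) or "localrun" (run as a local process).
run_twitter_source() {
  local mode="$1"
  case "$mode" in
    create|localrun) ;;
    *) echo "usage: run_twitter_source create|localrun" >&2; return 2 ;;
  esac
  # Identical flags for both modes; only the subcommand differs.
  "$PULSAR_ADMIN" source "$mode" \
    --classname org.apache.pulsar.io.twitter.TwitterFireHose \
    --archive "$HOME/application.jar" \
    --tenant test \
    --namespace ns1 \
    --name twitter-source \
    --destination-topic-name twitter_data
}
```

Running `run_twitter_source localrun` while developing and `run_twitter_source create` for deployment keeps the two invocations from drifting apart.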

If you are submitting a built-in source, you don’t need to specify --classname and --archive. Instead, specify the source type with --source-type. The command to submit a built-in source has the following form:

  ./bin/pulsar-admin source create \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <source-name> \
    --destination-topic-name <output-topic> \
    --source-type <source-type>

Here’s an example to submit a Kafka source:

  ./bin/pulsar-admin source create \
    --tenant test-tenant \
    --namespace test-namespace \
    --name test-kafka-source \
    --destination-topic-name pulsar_sink_topic \
    --source-type kafka
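
A Kafka source usually also needs to know which Kafka cluster and topic to read from. One way to supply those settings without a separate file is an inline JSON string. This sketch assumes the `--source-config` flag of `source create` and the Kafka source config keys `bootstrapServers`, `groupId` and `topic`; verify both against the Kafka connector documentation for your Pulsar version. The consumer group `pulsar-io`, the helper name and `PULSAR_ADMIN` are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical override: point PULSAR_ADMIN at another pulsar-admin if needed.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# $1 = Kafka bootstrap servers, $2 = Kafka topic to read from.
# Values are interpolated into JSON as-is, so they must not contain quotes.
create_kafka_source() {
  local bootstrap="$1" kafka_topic="$2"
  "$PULSAR_ADMIN" source create \
    --tenant test-tenant \
    --namespace test-namespace \
    --name test-kafka-source \
    --destination-topic-name pulsar_sink_topic \
    --source-type kafka \
    --source-config "{\"bootstrapServers\":\"$bootstrap\",\"groupId\":\"pulsar-io\",\"topic\":\"$kafka_topic\"}"
}
```

For example, `create_kafka_source localhost:9092 my-kafka-topic`. For longer configurations, a YAML file passed with `--source-config-file` is easier to maintain than inline JSON.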

Running Sinks

You can submit a sink to be run in an existing Pulsar cluster using a command of this form:

  ./bin/pulsar-admin sink create --classname <classname> --archive <jar-location> --tenant <tenant> --namespace <namespace> --name <sink-name> --inputs <input-topics>

Here’s an example command:

  ./bin/pulsar-admin sink create --classname org.apache.pulsar.io.cassandra.CassandraStringSink --archive ~/application.jar --tenant test --namespace ns1 --name cassandra-sink --inputs test_topic

Instead of submitting a sink to run on an existing Pulsar cluster, you can alternatively run a sink as a process on your local machine:

  ./bin/pulsar-admin sink localrun --classname org.apache.pulsar.io.cassandra.CassandraStringSink --archive ~/application.jar --tenant test --namespace ns1 --name cassandra-sink --inputs test_topic
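
`--inputs` accepts more than one topic as a comma-separated list, so a single sink can consume several topics. A minimal sketch that joins topic names before submitting the custom Cassandra sink; it assumes the class `org.apache.pulsar.io.cassandra.CassandraStringSink` from the Cassandra connector, and `join_topics`, `create_cassandra_sink_multi` and `PULSAR_ADMIN` are hypothetical names.

```shell
#!/usr/bin/env bash
# Hypothetical override: point PULSAR_ADMIN at another pulsar-admin if needed.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# Join all arguments with commas: "a b c" -> "a,b,c".
join_topics() {
  local IFS=,
  echo "$*"
}

# Submit the custom Cassandra sink consuming every topic given as an argument.
create_cassandra_sink_multi() {
  [ "$#" -gt 0 ] || { echo "usage: create_cassandra_sink_multi topic..." >&2; return 2; }
  "$PULSAR_ADMIN" sink create \
    --classname org.apache.pulsar.io.cassandra.CassandraStringSink \
    --archive "$HOME/application.jar" \
    --tenant test \
    --namespace ns1 \
    --name cassandra-sink \
    --inputs "$(join_topics "$@")"
}
```

For example, `create_cassandra_sink_multi test_topic other_topic` submits one sink that reads from both topics.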

If you are submitting a built-in sink, you don’t need to specify --classname and --archive. Instead, specify the sink type with --sink-type. The command to submit a built-in sink has the following form:

  ./bin/pulsar-admin sink create \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <sink-name> \
    --inputs <input-topics> \
    --sink-type <sink-type>

Note

The --sink-type value of a built-in connector is the name parameter specified in that connector's pulsar-io.yaml file.

Here’s an example to submit a Cassandra sink:

  ./bin/pulsar-admin sink create \
    --tenant test-tenant \
    --namespace test-namespace \
    --name test-cassandra-sink \
    --inputs pulsar_input_topic \
    --sink-type cassandra
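
Because the sink type comes from the `name` field of a connector's pulsar-io.yaml, you can read it straight from the connector package. A small sketch, assuming the file is packaged as `META-INF/services/pulsar-io.yaml` inside the connector NAR archive and that `name` is a top-level, unquoted key; `connector_type` and the NAR path in the usage line are hypothetical.

```shell
#!/usr/bin/env bash
# Read a pulsar-io.yaml on stdin and print the value of its top-level "name" key,
# which is the value to pass as --sink-type (or --source-type).
connector_type() {
  sed -n 's/^name:[[:space:]]*//p'
}
```

For example, `unzip -p connectors/pulsar-io-cassandra-<version>.nar META-INF/services/pulsar-io.yaml | connector_type` should print the type to use with `--sink-type`.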

Monitoring Connectors

Because Pulsar IO connectors run as Pulsar Functions, you can monitor them with the functions commands of the pulsar-admin CLI tool.

Retrieve Connector Metadata

  bin/pulsar-admin functions get \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <connector-name>

Retrieve Connector Running Status

  bin/pulsar-admin functions getstatus \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <connector-name>
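
Right after you create a connector, its instances may take a few seconds to start. A sketch of a wait loop around `functions getstatus`, assuming the status output is JSON in which a running instance reports `"running": true`; the exact layout differs between Pulsar versions, so check it before relying on the pattern. `wait_for_connector`, `PULSAR_ADMIN` and `POLL_INTERVAL` are hypothetical names.

```shell
#!/usr/bin/env bash
# Hypothetical override: point PULSAR_ADMIN at another pulsar-admin if needed.
PULSAR_ADMIN="${PULSAR_ADMIN:-./bin/pulsar-admin}"

# Poll the connector status until at least one instance reports running.
# Args: tenant namespace name [attempts, default 30]. Returns 0 on success, 1 on timeout.
wait_for_connector() {
  local tenant="$1" namespace="$2" name="$3" attempts="${4:-30}" i=0
  while [ "$i" -lt "$attempts" ]; do
    # Assumed output format: JSON containing "running": true for a live instance.
    if "$PULSAR_ADMIN" functions getstatus \
        --tenant "$tenant" --namespace "$namespace" --name "$name" 2>/dev/null |
        grep -Eq '"running"[[:space:]]*:[[:space:]]*true'; then
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-2}"
  done
  return 1
}
```

For example, `wait_for_connector test-tenant test-namespace test-cassandra-sink && echo "sink is running"`. The loop only checks that some instance is running; with parallelism above one, compare the instance counts in the status output instead.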