How to connect Pulsar to a database

This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code.

It is helpful to review the concepts for Pulsar I/O while running the steps in this guide to gain a deeper understanding.

At the end of this tutorial, you are able to connect Pulsar to Cassandra and to PostgreSQL.

tip
  • These instructions assume you are running Pulsar in standalone mode. However, all the commands used in this tutorial can be used in a multi-node Pulsar cluster without any changes.
  • All the instructions are assumed to run at the root directory of a Pulsar binary distribution.

Install Pulsar and built-in connector

Before connecting Pulsar to a database, you need to install Pulsar and the desired built-in connector.

For more information about how to install a standalone Pulsar and built-in connectors, see here.

Start Pulsar standalone

  1. Start Pulsar locally.

       bin/pulsar standalone

     All the components of a Pulsar service are started in order. You can curl the Pulsar service endpoints to make sure the Pulsar service is up and running correctly.

  2. Check the Pulsar binary protocol port.

       telnet localhost 6650

  3. Check the Pulsar Function cluster.

       curl -s http://localhost:8080/admin/v2/worker/cluster

     **Example output**
     ```
     [{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
     ```

  4. Make sure a public tenant and a default namespace exist.

       curl -s http://localhost:8080/admin/v2/namespaces/public

     **Example output**
     ```
     ["public/default","public/functions"]
     ```

  5. Make sure all built-in connectors are listed as available.

       curl -s http://localhost:8080/admin/v2/functions/connectors

     **Example output**
     ```
     [{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest data from Twitter firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
     ```

     If an error occurs when starting the Pulsar service, you may see an exception in the terminal running `bin/pulsar standalone`, or you can navigate to the `logs` directory under the Pulsar directory to view the logs.
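
The connector check above can also be scripted. The following is a minimal Python sketch, not part of the Pulsar distribution, that parses the JSON returned by the connectors endpoint and reports whether a given built-in connector can be used as a sink. The function name `has_sink_connector` is hypothetical, and `sample` is an abbreviated copy of the example output above.

```python
import json


def has_sink_connector(listing_json: str, name: str) -> bool:
    """Return True if `name` appears in the listing with a sinkClass."""
    connectors = json.loads(listing_json)
    return any(
        c.get("name") == name and "sinkClass" in c
        for c in connectors
    )


# Abbreviated copy of the example output shown above.
sample = (
    '[{"name":"cassandra","description":"Writes data into Cassandra",'
    '"sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},'
    '{"name":"rabbitmq","description":"RabbitMQ source connector",'
    '"sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"}]'
)

print(has_sink_connector(sample, "cassandra"))  # True
print(has_sink_connector(sample, "rabbitmq"))   # False: source only
```

In practice, the payload would be the output of `curl -s http://localhost:8080/admin/v2/functions/connectors` rather than a hard-coded string.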

Connect Pulsar to Cassandra

This section demonstrates how to connect Pulsar to Cassandra.

tip
  • Make sure you have Docker installed. If you do not have it, see install Docker.
  • The Cassandra sink connector reads messages from Pulsar topics and writes the messages into Cassandra tables. For more information, see Cassandra sink connector.

Set up a Cassandra cluster

This example uses the cassandra Docker image to start a single-node Cassandra cluster in Docker.

  1. Start a Cassandra cluster.

       docker run -d --rm --name=cassandra -p 9042:9042 cassandra

     ##### note
     Before moving to the next steps, make sure the Cassandra cluster is running.

  2. Make sure the Docker process is running.

       docker ps

  3. Check the Cassandra logs to make sure the Cassandra process is running as expected.

       docker logs cassandra

  4. Check the status of the Cassandra cluster.

       docker exec cassandra nodetool status

     **Example output**
     ```
     Datacenter: datacenter1
     =======================
     Status=Up/Down
     |/ State=Normal/Leaving/Joining/Moving
     --  Address     Load        Tokens  Owns (effective)  Host ID                               Rack
     UN  172.17.0.2  103.67 KiB  256     100.0%            af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26  rack1
     ```

  5. Use cqlsh to connect to the Cassandra cluster.

       $ docker exec -ti cassandra cqlsh localhost
       Connected to Test Cluster at localhost:9042.
       [cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
       Use HELP for help.
       cqlsh>

  6. Create a keyspace pulsar_test_keyspace.

       cqlsh> CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

  7. Create a table pulsar_test_table.

       cqlsh> USE pulsar_test_keyspace;
       cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);

Configure a Cassandra sink

Now that a Cassandra cluster is running locally, you need to configure a Cassandra sink connector.

To run a Cassandra sink connector, you need to prepare a configuration file that contains the information the Pulsar connector runtime needs, for example, how the connector finds the Cassandra cluster and which keyspace and table it writes Pulsar messages to.

You can create the configuration file in one of the following formats.

  • JSON

       {
         "roots": "localhost:9042",
         "keyspace": "pulsar_test_keyspace",
         "columnFamily": "pulsar_test_table",
         "keyname": "key",
         "columnName": "col"
       }

  • YAML

       configs:
         roots: "localhost:9042"
         keyspace: "pulsar_test_keyspace"
         columnFamily: "pulsar_test_table"
         keyname: "key"
         columnName: "col"

For more information, see Cassandra sink connector.
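
If the configuration has to be produced by a script rather than typed by hand, the YAML form can be generated from a plain mapping. The following is a minimal Python sketch that uses only the standard library; the helper name `render_sink_config` is hypothetical, and it assumes flat string values like the ones in this tutorial.

```python
import json


def render_sink_config(configs: dict) -> str:
    """Render a flat mapping as YAML under a top-level `configs` key."""
    lines = ["configs:"]
    for key, value in configs.items():
        # json.dumps yields a double-quoted string, which is also a valid
        # YAML double-quoted scalar for plain ASCII values like these.
        lines.append(f"  {key}: {json.dumps(str(value))}")
    return "\n".join(lines) + "\n"


cassandra_sink = {
    "roots": "localhost:9042",
    "keyspace": "pulsar_test_keyspace",
    "columnFamily": "pulsar_test_table",
    "keyname": "key",
    "columnName": "col",
}

print(render_sink_config(cassandra_sink), end="")
```

The returned text can be written to examples/cassandra-sink.yml, the path that the create command in this tutorial passes to `--sink-config-file`.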

Create a Cassandra sink

You can use the Connector Admin CLI to create a sink connector and perform other operations on it.

Run the following command to create a Cassandra sink connector with sink type cassandra and the config file examples/cassandra-sink.yml created previously.

Note

The sink-type parameter of the currently built-in connectors is determined by the setting of the name parameter specified in the pulsar-io.yaml file.

    bin/pulsar-admin sinks create \
      --tenant public \
      --namespace default \
      --name cassandra-test-sink \
      --sink-type cassandra \
      --sink-config-file examples/cassandra-sink.yml \
      --inputs test_cassandra

Once the command is executed, Pulsar creates the sink connector cassandra-test-sink.

This sink connector runs as a Pulsar Function and writes the messages produced in the topic test_cassandra to the Cassandra table pulsar_test_table.
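
When the same sink has to be created repeatedly, for example from a setup script, it can help to assemble the command as an argument list instead of a shell string. The following is a minimal Python sketch under that assumption; the helper name `sink_create_command` is hypothetical, and it uses only the flags shown in the command above.

```python
import shlex


def sink_create_command(name, sink_type, config_file, inputs,
                        tenant="public", namespace="default"):
    """Build the argument list for `bin/pulsar-admin sinks create`."""
    return [
        "bin/pulsar-admin", "sinks", "create",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--sink-type", sink_type,
        "--sink-config-file", config_file,
        # Multiple input topics are passed as one comma-separated value.
        "--inputs", ",".join(inputs),
    ]


cmd = sink_create_command(
    name="cassandra-test-sink",
    sink_type="cassandra",
    config_file="examples/cassandra-sink.yml",
    inputs=["test_cassandra"],
)
print(shlex.join(cmd))
# To actually run it from the Pulsar root directory:
#   import subprocess; subprocess.run(cmd, check=True)
```

Passing a list to `subprocess.run` avoids shell quoting issues if a topic or file name ever contains spaces.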

Inspect a Cassandra sink

You can use the Connector Admin CLI to monitor a connector and perform other operations on it.

  • Get the information of a Cassandra sink.

       bin/pulsar-admin sinks get \
         --tenant public \
         --namespace default \
         --name cassandra-test-sink

    **Example output**
    ```
    {
      "tenant": "public",
      "namespace": "default",
      "name": "cassandra-test-sink",
      "className": "org.apache.pulsar.io.cassandra.CassandraStringSink",
      "inputSpecs": {
        "test_cassandra": {
          "isRegexPattern": false
        }
      },
      "configs": {
        "roots": "localhost:9042",
        "keyspace": "pulsar_test_keyspace",
        "columnFamily": "pulsar_test_table",
        "keyname": "key",
        "columnName": "col"
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "autoAck": true,
      "archive": "builtin://cassandra"
    }
    ```

  • Check the status of a Cassandra sink.

       bin/pulsar-admin sinks status \
         --tenant public \
         --namespace default \
         --name cassandra-test-sink

    **Example output**
    ```
    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-localhost-8080"
        }
      } ]
    }
    ```

Verify a Cassandra sink

  1. Produce some messages to test_cassandra, the input topic of the Cassandra sink.

       for i in {0..9}; do bin/pulsar-client produce -m "key-$i" -n 1 test_cassandra; done

  2. Inspect the status of the Cassandra sink cassandra-test-sink.

       bin/pulsar-admin sinks status \
         --tenant public \
         --namespace default \
         --name cassandra-test-sink

     You can see that 10 messages have been processed by the Cassandra sink *cassandra-test-sink*.

     **Example output**
     ```
     {
       "numInstances" : 1,
       "numRunning" : 1,
       "instances" : [ {
         "instanceId" : 0,
         "status" : {
           "running" : true,
           "error" : "",
           "numRestarts" : 0,
           "numReadFromPulsar" : 10,
           "numSystemExceptions" : 0,
           "latestSystemExceptions" : [ ],
           "numSinkExceptions" : 0,
           "latestSinkExceptions" : [ ],
           "numWrittenToSink" : 10,
           "lastReceivedTime" : 1551685489136,
           "workerId" : "c-standalone-fw-localhost-8080"
         }
       } ]
     }
     ```

  3. Use cqlsh to connect to the Cassandra cluster.

       docker exec -ti cassandra cqlsh localhost

  4. Check the data of the Cassandra table pulsar_test_table.

       cqlsh> use pulsar_test_keyspace;
       cqlsh:pulsar_test_keyspace> select * from pulsar_test_table;
        key   | col
       -------+-------
        key-5 | key-5
        key-0 | key-0
        key-9 | key-9
        key-2 | key-2
        key-1 | key-1
        key-3 | key-3
        key-6 | key-6
        key-7 | key-7
        key-4 | key-4
        key-8 | key-8
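
The status check in step 2 can be automated instead of read by eye. The following is a minimal Python sketch that decides whether a sink has written the expected number of messages, based only on fields that appear in the example status output; the function name `sink_caught_up` is hypothetical, and `sample` is a trimmed copy of that output.

```python
import json


def sink_caught_up(status_json: str, expected: int) -> bool:
    """True if all instances run cleanly and `expected` messages were written."""
    status = json.loads(status_json)
    instances = [inst["status"] for inst in status["instances"]]
    all_running = (
        status["numRunning"] == status["numInstances"]
        and all(s["running"] for s in instances)
    )
    no_errors = all(s["numSinkExceptions"] == 0 for s in instances)
    # Sum across instances so the check also works with parallelism > 1.
    written = sum(s["numWrittenToSink"] for s in instances)
    return all_running and no_errors and written == expected


# Trimmed copy of the example status output above.
sample = """
{
  "numInstances": 1,
  "numRunning": 1,
  "instances": [ {
    "instanceId": 0,
    "status": {
      "running": true,
      "numReadFromPulsar": 10,
      "numSinkExceptions": 0,
      "numWrittenToSink": 10
    }
  } ]
}
"""

print(sink_caught_up(sample, expected=10))  # True
```

In a real run, `status_json` would be the output of the `bin/pulsar-admin sinks status` command shown in step 2.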

Delete a Cassandra sink

You can use the Connector Admin CLI to delete a connector and perform other operations on it.

    bin/pulsar-admin sinks delete \
      --tenant public \
      --namespace default \
      --name cassandra-test-sink

Connect Pulsar to PostgreSQL

This section demonstrates how to connect Pulsar to PostgreSQL.

tip
  • Make sure you have Docker installed. If you do not have it, see install Docker.
  • The JDBC sink connector pulls messages from Pulsar topics and persists the messages to ClickHouse, MariaDB, PostgreSQL, or SQLite. For more information, see JDBC sink connector.

Set up a PostgreSQL cluster

This example uses the PostgreSQL 12 Docker image to start a single-node PostgreSQL cluster in Docker.

  1. Pull the PostgreSQL 12 image from Docker.

       $ docker pull postgres:12

  2. Start PostgreSQL.

       $ docker run -d -it --rm \
           --name pulsar-postgres \
           -p 5432:5432 \
           -e POSTGRES_PASSWORD=password \
           -e POSTGRES_USER=postgres \
           postgres:12

     #### Tip
     <table><thead><tr><th>Flag</th><th>Description</th><th>This example</th></tr></thead><tbody><tr><td><code>-d</code></td><td>To start a container in detached mode.</td><td>/</td></tr><tr><td><code>-it</code></td><td>Keep STDIN open even if not attached and allocate a terminal.</td><td>/</td></tr><tr><td><code>--rm</code></td><td>Remove the container automatically when it exits.</td><td>/</td></tr><tr><td><code>--name</code></td><td>Assign a name to the container.</td><td>This example specifies <em>pulsar-postgres</em> for the container.</td></tr><tr><td><code>-p</code></td><td>Publish the port of the container to the host.</td><td>This example publishes the port <em>5432</em> of the container to the host.</td></tr><tr><td><code>-e</code></td><td>Set environment variables.</td><td>This example sets the following variables:<br>- The password for the user is <em>password</em>.<br>- The name for the user is <em>postgres</em>.</td></tr></tbody></table>
     :::tip
     For more information about Docker commands, see [Docker CLI](https://docs.docker.com/engine/reference/commandline/run/).
     :::

  3. Check if PostgreSQL has been started successfully.

       $ docker logs -f pulsar-postgres

     PostgreSQL has been started successfully if the following message appears.
     ```
     2020-05-11 20:09:24.492 UTC [1] LOG: starting PostgreSQL 12.2 (Debian 12.2-2.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
     2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
     2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv6 address "::", port 5432
     2020-05-11 20:09:24.499 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
     2020-05-11 20:09:24.523 UTC [55] LOG: database system was shut down at 2020-05-11 20:09:24 UTC
     2020-05-11 20:09:24.533 UTC [1] LOG: database system is ready to accept connections
     ```

  4. Access PostgreSQL.

       $ docker exec -it pulsar-postgres /bin/bash

  5. Create a PostgreSQL table pulsar_postgres_jdbc_sink.

       $ psql -U postgres postgres
       postgres=# create table if not exists pulsar_postgres_jdbc_sink
       (
         id serial PRIMARY KEY,
         name VARCHAR(255) NOT NULL
       );

Configure a JDBC sink

Now that PostgreSQL is running locally, you need to configure a JDBC sink connector.

  1. Add a configuration file.

     To run a JDBC sink connector, you need to prepare a YAML configuration file that contains the information the Pulsar connector runtime needs, for example, the JDBC URL the connector uses to find the PostgreSQL cluster and the table it writes messages to.

     Create a pulsar-postgres-jdbc-sink.yaml file, copy the following contents to this file, and place the file in the pulsar/connectors folder.

       configs:
         userName: "postgres"
         password: "password"
         jdbcUrl: "jdbc:postgresql://localhost:5432/postgres"
         tableName: "pulsar_postgres_jdbc_sink"

  2. Create a schema.

     Create an avro-schema file, copy the following contents to this file, and place the file in the pulsar/connectors folder.

       {
         "type": "AVRO",
         "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
         "properties": {}
       }

     ##### tip
     For more information about AVRO, see [Apache Avro](https://avro.apache.org/docs/1.9.1/).

  3. Upload a schema to a topic.

     This example uploads the avro-schema schema to the pulsar-postgres-jdbc-sink-topic topic.

       $ bin/pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema

  4. Check if the schema has been uploaded successfully.

       $ bin/pulsar-admin schemas get pulsar-postgres-jdbc-sink-topic

     The schema has been uploaded successfully if the following message appears.
     ```
     {"name":"pulsar-postgres-jdbc-sink-topic","schema":"{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"AVRO","properties":{}}
     ```
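
The `schema` field in the avro-schema file is itself a JSON document stored as an escaped string, which is easy to get wrong by hand. The following is a minimal Python sketch that builds the file contents from a plain Avro record definition and lets `json.dumps` handle the escaping; the names `avro_record` and `schema_definition` are hypothetical, and the envelope mirrors the file shown in step 2.

```python
import json

# Avro record with the same fields as the schema in this tutorial.
avro_record = {
    "type": "record",
    "name": "Test",
    "fields": [
        {"name": "id", "type": ["null", "int"]},
        {"name": "name", "type": ["null", "string"]},
    ],
}


def schema_definition(record: dict) -> str:
    """Wrap an Avro record in the envelope used by the avro-schema file."""
    envelope = {
        "type": "AVRO",
        # Serialize the record first so it is embedded as a string,
        # which the outer json.dumps then escapes.
        "schema": json.dumps(record, separators=(",", ":")),
        "properties": {},
    }
    return json.dumps(envelope, indent=2)


print(schema_definition(avro_record))
```

Writing the printed text to ./connectors/avro-schema gives a file equivalent to the one in step 2, ready for the `schemas upload` command in step 3.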

Create a JDBC sink

You can use the Connector Admin CLI to create a sink connector and perform other operations on it.

This example creates a sink connector and specifies the desired information.

    $ bin/pulsar-admin sinks create \
        --archive ./connectors/pulsar-io-jdbc-postgres-2.10.0.nar \
        --inputs pulsar-postgres-jdbc-sink-topic \
        --name pulsar-postgres-jdbc-sink \
        --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
        --parallelism 1

Once the command is executed, Pulsar creates a sink connector pulsar-postgres-jdbc-sink.

This sink connector runs as a Pulsar Function and writes the messages produced in the topic pulsar-postgres-jdbc-sink-topic to the PostgreSQL table pulsar_postgres_jdbc_sink.

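Tip

<table><thead><tr><th>Flag</th><th>Description</th><th>This example</th></tr></thead><tbody>
<tr><td><code>--archive</code></td><td>The path to the archive file for the sink.</td><td><em>pulsar-io-jdbc-postgres-2.10.0.nar</em></td></tr>
<tr><td><code>--inputs</code></td><td>The input topic(s) of the sink.<br>Multiple topics can be specified as a comma-separated list.</td><td><em>pulsar-postgres-jdbc-sink-topic</em></td></tr>
<tr><td><code>--name</code></td><td>The name of the sink.</td><td><em>pulsar-postgres-jdbc-sink</em></td></tr>
<tr><td><code>--sink-config-file</code></td><td>The path to a YAML config file specifying the configuration of the sink.</td><td><em>pulsar-postgres-jdbc-sink.yaml</em></td></tr>
<tr><td><code>--parallelism</code></td><td>The parallelism factor of the sink.<br>For example, the number of sink instances to run.</td><td><em>1</em></td></tr>
</tbody></table>

tip

For more information about pulsar-admin sinks create options, see Pulsar admin docs.

The sink has been created successfully if the following message appears.

    Created successfully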

Inspect a JDBC sink

You can use the Connector Admin CLI to monitor a connector and perform other operations on it.

  • List all running JDBC sink(s).

       $ bin/pulsar-admin sinks list \
           --tenant public \
           --namespace default

    ##### tip
    For more information about `pulsar-admin sinks list options`, see [Pulsar admin docs](https://pulsar.apache.org/tools/pulsar-admin/).

    The result shows that only the *pulsar-postgres-jdbc-sink* sink is running.
    ```
    [
      "pulsar-postgres-jdbc-sink"
    ]
    ```

  • Get the information of a JDBC sink.

       $ bin/pulsar-admin sinks get \
           --tenant public \
           --namespace default \
           --name pulsar-postgres-jdbc-sink

    ##### tip
    For more information about `pulsar-admin sinks get options`, see [Pulsar admin docs](https://pulsar.apache.org/tools/pulsar-admin/).

    The result shows the information of the sink connector, including the tenant, namespace, topic, and so on.
    ```
    {
      "tenant": "public",
      "namespace": "default",
      "name": "pulsar-postgres-jdbc-sink",
      "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
      "inputSpecs": {
        "pulsar-postgres-jdbc-sink-topic": {
          "isRegexPattern": false
        }
      },
      "configs": {
        "password": "password",
        "jdbcUrl": "jdbc:postgresql://localhost:5432/postgres",
        "userName": "postgres",
        "tableName": "pulsar_postgres_jdbc_sink"
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "autoAck": true
    }
    ```

  • Get the status of a JDBC sink.

       $ bin/pulsar-admin sinks status \
           --tenant public \
           --namespace default \
           --name pulsar-postgres-jdbc-sink

    ##### tip
    For more information about `pulsar-admin sinks status options`, see [Pulsar admin docs](https://pulsar.apache.org/tools/pulsar-admin/).

    The result shows the current status of the sink connector, including the number of instances, running status, worker ID, and so on.
    ```
    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-192.168.2.52-8080"
        }
      } ]
    }
    ```

Stop a JDBC sink

You can use the Connector Admin CLI to stop a connector and perform other operations on it.

    $ bin/pulsar-admin sinks stop \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

tip

For more information about pulsar-admin sinks stop options, see Pulsar admin docs.

The sink instance has been stopped successfully if the following message appears.

    Stopped successfully

Restart a JDBC sink

You can use the Connector Admin CLI to restart a connector and perform other operations on it.

    $ bin/pulsar-admin sinks restart \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

tip

For more information about pulsar-admin sinks restart options, see Pulsar admin docs.

The sink instance has been started successfully if the following message appears.

    Started successfully

tip
  • Optionally, you can run a standalone sink connector using pulsar-admin sinks localrun options. Note that pulsar-admin sinks localrun options runs a sink connector locally, while pulsar-admin sinks start options starts a sink connector in a cluster.
  • For more information about pulsar-admin sinks localrun options, see Pulsar admin docs.

Update a JDBC sink

You can use the Connector Admin CLI to update a connector and perform other operations on it.

This example updates the parallelism of the pulsar-postgres-jdbc-sink sink connector to 2.

    $ bin/pulsar-admin sinks update \
        --name pulsar-postgres-jdbc-sink \
        --parallelism 2

tip

For more information about pulsar-admin sinks update options, see Pulsar admin docs.

The sink connector has been updated successfully if the following message appears.

    Updated successfully

This example double-checks the information.

    $ bin/pulsar-admin sinks get \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

The result shows that the parallelism is 2.

    {
      "tenant": "public",
      "namespace": "default",
      "name": "pulsar-postgres-jdbc-sink",
      "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink",
      "inputSpecs": {
        "pulsar-postgres-jdbc-sink-topic": {
          "isRegexPattern": false
        }
      },
      "configs": {
        "password": "password",
        "jdbcUrl": "jdbc:postgresql://localhost:5432/postgres",
        "userName": "postgres",
        "tableName": "pulsar_postgres_jdbc_sink"
      },
      "parallelism": 2,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "autoAck": true
    }

Delete a JDBC sink

You can use the Connector Admin CLI to delete a connector and perform other operations on it.

This example deletes the pulsar-postgres-jdbc-sink sink connector.

    $ bin/pulsar-admin sinks delete \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

tip

For more information about pulsar-admin sinks delete options, see Pulsar admin docs.

The sink connector has been deleted successfully if the following message appears.

    Deleted successfully

This example double-checks the status of the sink connector.

    $ bin/pulsar-admin sinks get \
        --tenant public \
        --namespace default \
        --name pulsar-postgres-jdbc-sink

The result shows that the sink connector does not exist.

    HTTP 404 Not Found
    Reason: Sink pulsar-postgres-jdbc-sink doesn't exist