Pulsar SQL configuration and deployment

You can configure the Presto Pulsar connector and deploy a cluster by following the instructions below.

Configure Presto Pulsar Connector

You can configure the Presto Pulsar connector in the ${project.root}/conf/presto/catalog/pulsar.properties file. The connector settings and their default values are as follows.

  # name of the connector to be displayed in the catalog
  connector.name=pulsar
  # the URL of the Pulsar broker service
  pulsar.web-service-url=http://localhost:8080
  # URI of the ZooKeeper cluster
  pulsar.zookeeper-uri=localhost:2181
  # minimum number of entries to read at a single time
  pulsar.entry-read-batch-size=100
  # default number of splits to use per query
  pulsar.target-num-splits=4
  # max size of one batch message (default value is 5MB)
  pulsar.max-message-size=5242880
  # number of splits used when querying data from Pulsar
  pulsar.target-num-splits=2
  # size of the queue to buffer entries read from Pulsar
  pulsar.max-split-entry-queue-size=1000
  # size of the queue to buffer messages extracted from entries
  pulsar.max-split-message-queue-size=10000
  # stats provider to record connector metrics
  pulsar.stats-provider=org.apache.bookkeeper.stats.NullStatsProvider
  # config in map format for the stats provider, e.g. {"key1":"val1","key2":"val2"}
  pulsar.stats-provider-configs={}
  # whether to rewrite Pulsar's default topic delimiter '/'
  pulsar.namespace-delimiter-rewrite-enable=false
  # delimiter used to rewrite Pulsar's default delimiter '/'; use it if the default causes incompatibility with other systems such as Superset
  pulsar.rewrite-namespace-delimiter="/"
  # maximum thread pool size for the ledger offloader
  pulsar.managed-ledger-offload-max-threads=2
  # driver used to offload or read cold data to or from long-term storage
  pulsar.managed-ledger-offload-driver=null
  # directory to load offloader NAR files from
  pulsar.offloaders-directory="./offloaders"
  # properties and configurations related to a specific offloader implementation, as a map, e.g. {"key1":"val1","key2":"val2"}
  pulsar.offloader-properties={}
  # authentication plugin used to authenticate to the Pulsar cluster
  pulsar.auth-plugin=null
  # authentication parameters used to authenticate to the Pulsar cluster, as a string, e.g. "key1:val1,key2:val2"
  pulsar.auth-params=null
  # whether the Pulsar client accepts an untrusted TLS certificate from the broker
  pulsar.tls-allow-insecure-connection=null
  # whether to allow hostname verification when a client connects to the broker over TLS
  pulsar.tls-hostname-verification-enable=null
  # path to the trusted TLS certificate file of the Pulsar broker
  pulsar.tls-trust-cert-file-path=null
  # threshold for BookKeeper request throttling, disabled by default
  pulsar.bookkeeper-throttle-value=0
  # number of IO threads
  pulsar.bookkeeper-num-io-threads=2 * Runtime.getRuntime().availableProcessors()
  # number of worker threads
  pulsar.bookkeeper-num-worker-threads=Runtime.getRuntime().availableProcessors()
  # whether to use the BookKeeper V2 wire protocol
  pulsar.bookkeeper-use-v2-protocol=true
  # interval to check the need for sending an explicit LAC, disabled by default
  pulsar.bookkeeper-explicit-interval=0
  # size of the managed ledger entry cache (in MB)
  pulsar.managed-ledger-cache-size-MB=0
  # number of threads used for dispatching managed ledger tasks
  pulsar.managed-ledger-num-worker-threads=Runtime.getRuntime().availableProcessors()
  # number of threads used for managed ledger scheduled tasks
  pulsar.managed-ledger-num-scheduler-threads=Runtime.getRuntime().availableProcessors()
  # directory used to store extracted NAR files
  pulsar.nar-extraction-directory=System.getProperty("java.io.tmpdir")

You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to pulsar.web-service-url. To configure multiple hosts for ZooKeeper, add multiple URIs to pulsar.zookeeper-uri. The following is an example.

  pulsar.web-service-url=http://localhost:8080,localhost:8081,localhost:8082
  pulsar.zookeeper-uri=localhost1,localhost2:2181
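
As a rough illustration, the comma-separated values can be assembled from a list of hosts with a small shell helper. This is only a sketch: the helper name join_hosts and the host names broker1 to broker3 and zk1 to zk3 are hypothetical, and it assumes that every host in a list uses the same port and that the scheme appears once at the start of the value, as in the example above.

```shell
#!/bin/sh
# join_hosts PORT HOST...: print "HOST:PORT" pairs joined by commas.
# Hypothetical helper for illustration; not part of Pulsar.
join_hosts() {
  port=$1
  shift
  out=
  for host in "$@"; do
    # Prepend a comma only when something has already been collected.
    out="${out:+$out,}$host:$port"
  done
  printf '%s\n' "$out"
}

# Placeholder host names; substitute your own brokers and ZooKeeper servers.
web_service_url="http://$(join_hosts 8080 broker1 broker2 broker3)"
zookeeper_uri=$(join_hosts 2181 zk1 zk2 zk3)

echo "pulsar.web-service-url=$web_service_url"
echo "pulsar.zookeeper-uri=$zookeeper_uri"
```

The two printed lines can then be pasted into pulsar.properties in place of the single-host defaults.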

Note: by default, Pulsar SQL does not return the last message in a topic. This is by design and is controlled by configuration. By default, the BookKeeper LAC (last add confirmed) only advances when subsequent entries are added. If no subsequent entry is added, the last written entry is not visible to readers until the ledger is closed. This is not a problem for Pulsar, which reads through the managed ledger, but Pulsar SQL reads directly from the BookKeeper ledger.

If you want to get the last message in a topic, set the following configurations:

  1. For the broker configuration, set bookkeeperExplicitLacIntervalInMills > 0 in broker.conf or standalone.conf.
  2. For the Presto configuration, set pulsar.bookkeeper-explicit-interval > 0 and pulsar.bookkeeper-use-v2-protocol=false.

However, disabling the V2 protocol means the BookKeeper V3 protocol is used, which introduces additional GC overhead on BookKeeper because it uses Protobuf.
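
The two settings above could be applied with a script along the following lines. This is a minimal sketch, not an official tool: set_prop is a hypothetical helper, the interval value 1000 is an arbitrary illustrative choice (the text above only requires a value greater than 0), and the script edits scratch files with stand-in contents rather than a real installation.

```shell
#!/bin/sh
# set_prop FILE KEY VALUE: drop any existing "KEY=..." line, then append KEY=VALUE.
# Hypothetical helper for illustration; not part of Pulsar.
set_prop() {
  file=$1 key=$2 value=$3
  tmp=$(mktemp)
  # Keep every line whose text before the first "=" differs from KEY.
  awk -F= -v k="$key" '$1 != k' "$file" > "$tmp"
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back in place so the original file keeps its permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Scratch copies with stand-in contents, so the sketch does not touch a real install.
conf_dir=$(mktemp -d)
printf 'bookkeeperExplicitLacIntervalInMills=0\n' > "$conf_dir/broker.conf"
printf 'pulsar.bookkeeper-use-v2-protocol=true\npulsar.bookkeeper-explicit-interval=0\n' \
  > "$conf_dir/pulsar.properties"

# 1000 is an assumed, illustrative interval; any value > 0 enables the feature.
set_prop "$conf_dir/broker.conf" bookkeeperExplicitLacIntervalInMills 1000
set_prop "$conf_dir/pulsar.properties" pulsar.bookkeeper-explicit-interval 1000
set_prop "$conf_dir/pulsar.properties" pulsar.bookkeeper-use-v2-protocol false

cat "$conf_dir/broker.conf" "$conf_dir/pulsar.properties"
```

To use it against a real deployment, point the set_prop calls at your broker.conf (or standalone.conf) and at conf/presto/catalog/pulsar.properties, then restart the broker and the SQL worker.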

Query data from existing Presto clusters

If you already have a Presto cluster, you can copy the Presto Pulsar connector plugin to your existing cluster. Download the archived plugin package with the following command.

  $ wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz

Deploy a new cluster

Because Pulsar SQL is powered by Trino (formerly Presto SQL), a Pulsar SQL worker uses the same deployment configuration as Trino.

Note: to set up a standalone single-node environment, refer to Query data.

You can use the same CLI args as the Presto launcher.

  $ ./bin/pulsar sql-worker --help
  Usage: launcher [options] command

  Commands: run, start, stop, restart, kill, status

  Options:
    -h, --help            show this help message and exit
    -v, --verbose         Run verbosely
    --etc-dir=DIR         Defaults to INSTALL_PATH/etc
    --launcher-config=FILE
                          Defaults to INSTALL_PATH/bin/launcher.properties
    --node-config=FILE    Defaults to ETC_DIR/node.properties
    --jvm-config=FILE     Defaults to ETC_DIR/jvm.config
    --config=FILE         Defaults to ETC_DIR/config.properties
    --log-levels-file=FILE
                          Defaults to ETC_DIR/log.properties
    --data-dir=DIR        Defaults to INSTALL_PATH
    --pid-file=FILE       Defaults to DATA_DIR/var/run/launcher.pid
    --launcher-log-file=FILE
                          Defaults to DATA_DIR/var/log/launcher.log (only in
                          daemon mode)
    --server-log-file=FILE
                          Defaults to DATA_DIR/var/log/server.log (only in
                          daemon mode)
    -D NAME=VALUE         Set a Java system property

The default configuration for the cluster is located in ${project.root}/conf/presto. You can customize your deployment by modifying the default configuration.

You can set the worker to read from a different configuration directory, or set a different directory to write data.

  $ ./bin/pulsar sql-worker run --etc-dir /tmp/incubator-pulsar/conf/presto --data-dir /tmp/presto-1

You can start the worker as a daemon process.

  $ ./bin/pulsar sql-worker start

Deploy a cluster on multiple nodes

You can deploy a Pulsar SQL cluster or Presto cluster on multiple nodes. The following example shows how to deploy a cluster on three nodes.

  1. Copy the Pulsar binary distribution to three nodes.

     The first node runs as the Presto coordinator. The minimal configuration required in the ${project.root}/conf/presto/config.properties file is as follows.

       coordinator=true
       node-scheduler.include-coordinator=true
       http-server.http.port=8080
       query.max-memory=50GB
       query.max-memory-per-node=1GB
       discovery-server.enabled=true
       discovery.uri=<coordinator-url>

     The other two nodes serve as worker nodes. You can use the following configuration for the worker nodes.

       coordinator=false
       http-server.http.port=8080
       query.max-memory=50GB
       query.max-memory-per-node=1GB
       discovery.uri=<coordinator-url>

  2. Modify the pulsar.web-service-url and pulsar.zookeeper-uri settings in the ${project.root}/conf/presto/catalog/pulsar.properties file accordingly on all three nodes.

  3. Start the coordinator node.

       $ ./bin/pulsar sql-worker run

  4. Start the worker nodes.

       $ ./bin/pulsar sql-worker run

  5. Start the SQL CLI and check the status of your cluster.

       $ ./bin/pulsar sql --server <coordinator-url>

  6. Check the status of your nodes.

       presto> SELECT * FROM system.runtime.nodes;
        node_id |        http_uri         | node_version | coordinator | state
       ---------+-------------------------+--------------+-------------+--------
        1       | http://192.168.2.1:8081 | testversion  | true        | active
        3       | http://192.168.2.2:8081 | testversion  | false       | active
        2       | http://192.168.2.3:8081 | testversion  | false       | active
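
The coordinator and worker config.properties files shown earlier in this section differ in only a few lines, so both could be generated from one template. The following is a minimal sketch under stated assumptions: write_config is a hypothetical helper, http://coordinator.example:8080 is a placeholder for your coordinator URL, and the files are written to a scratch directory rather than to conf/presto. The property values are the ones listed in this section.

```shell
#!/bin/sh
# write_config ROLE DISCOVERY_URI FILE: write a minimal config.properties.
# ROLE is "coordinator" or "worker". Hypothetical helper, not part of Pulsar.
write_config() {
  role=$1 uri=$2 file=$3
  {
    if [ "$role" = coordinator ]; then
      echo "coordinator=true"
      echo "node-scheduler.include-coordinator=true"
    else
      echo "coordinator=false"
    fi
    echo "http-server.http.port=8080"
    echo "query.max-memory=50GB"
    echo "query.max-memory-per-node=1GB"
    # The listings in this section enable the discovery server on the coordinator only.
    if [ "$role" = coordinator ]; then
      echo "discovery-server.enabled=true"
    fi
    echo "discovery.uri=$uri"
  } > "$file"
}

out_dir=$(mktemp -d)
# Placeholder URL; use the address of your coordinator node.
coordinator_url="http://coordinator.example:8080"
write_config coordinator "$coordinator_url" "$out_dir/coordinator.properties"
write_config worker "$coordinator_url" "$out_dir/worker.properties"

cat "$out_dir/coordinator.properties"
cat "$out_dir/worker.properties"
```

Copy each generated file to conf/presto/config.properties on the matching node before starting the workers.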

For more information about deployment in Presto, refer to Presto deployment.

Note: the broker does not advance the LAC, so when Pulsar SQL bypasses the broker to query data, it can only read entries up to the LAC that all the bookies have learned. You can make the broker write the LAC periodically by setting bookkeeperExplicitLacIntervalInMills in broker.conf.