Pulsar C++ client

You can use the Pulsar C++ client to create Pulsar producers and consumers in C++.

All methods of the producer, consumer, and reader in the C++ client are thread-safe.

Supported platforms

The Pulsar C++ client is supported on Linux and macOS.

Doxygen-generated API docs for the C++ client are available here.

Linux

Since the 2.1.0 release, Pulsar ships pre-built RPM and Debian packages. You can download and install those packages directly.

Four libraries (libpulsar.so, libpulsarnossl.so, libpulsar.a, and libpulsarwithdeps.a) are placed in /usr/lib after you download and install the RPM or Debian package. By default, they are built under ${PULSAR_HOME}/pulsar-client-cpp with the following command:

    cmake . -DBUILD_TESTS=OFF -DLINK_STATIC=ON && make pulsarShared pulsarSharedNossl pulsarStatic pulsarStaticWithDeps -j 3

These libraries rely on other libraries. For the exact versions of the dependencies, refer to these files.

  1. libpulsar.so is a shared library. It contains statically linked boost and openssl, and dynamically links all other required libraries. To use it, compile with a command like this:

      g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.so -I/usr/local/ssl/include

  2. libpulsarnossl.so is a shared library similar to libpulsar.so, except that the openssl and crypto libraries are dynamically linked. To use it, compile with a command like this:

      g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarnossl.so -lssl -lcrypto -I/usr/local/ssl/include -L/usr/local/ssl/lib

  3. libpulsar.a is a static library. You need to link its dependencies yourself when you use it. To use it, compile with a command like this:

      g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib -lboost_system -lboost_regex -lcurl -lprotobuf -lzstd -lz

  4. libpulsarwithdeps.a is a static library based on libpulsar.a that also archives the dependencies libboost_regex, libboost_system, libcurl, libprotobuf, libzstd, and libz. To use it, compile with a command like this:

      g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarwithdeps.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib

libpulsarwithdeps.a does not include the openssl-related libraries libssl and libcrypto. Because these two libraries are security-sensitive, it is more reasonable to use the versions provided by the local system, which also makes it easier to handle security issues and library upgrades.

Install RPM

  1. Download an RPM package from the links in the table.

      Link              Crypto files
      client            asc, sha512
      client-debuginfo  asc, sha512
      client-devel      asc, sha512

  2. Install the package using the following command.

      $ rpm -ivh apache-pulsar-client*.rpm

After installation, the Pulsar libraries are placed under /usr/lib.

Install Debian

  1. Download a Debian package from the links in the table.

      Link          Crypto files
      client        asc, sha512
      client-devel  asc, sha512

  2. Install the package using the following command.

      $ apt install ./apache-pulsar-client*.deb

After installation, the Pulsar libraries are placed under /usr/lib.

Build

If you want to build RPM and Debian packages from the latest master, follow the instructions below. Run all commands from the root directory of your cloned Pulsar repository.

There are recipes that build RPM and Debian packages containing statically linked versions of libpulsar.so, libpulsarnossl.so, libpulsar.a, and libpulsarwithdeps.a with all the required dependencies.

To build the C++ library packages, build the Java packages first.

    mvn install -DskipTests

RPM

    pulsar-client-cpp/pkg/rpm/docker-build-rpm.sh

This script builds the RPMs inside a Docker container and leaves them in pulsar-client-cpp/pkg/rpm/RPMS/x86_64/.

    Package name             Content
    pulsar-client            Shared libraries libpulsar.so and libpulsarnossl.so
    pulsar-client-devel      Static libraries libpulsar.a and libpulsarwithdeps.a, and the C++ and C headers
    pulsar-client-debuginfo  Debug symbols for libpulsar.so

Debian

To build Debian packages, enter the following command.

    pulsar-client-cpp/pkg/deb/docker-build-deb.sh

Debian packages are created at pulsar-client-cpp/pkg/deb/BUILD/DEB/.

    Package name       Content
    pulsar-client      Shared libraries libpulsar.so and libpulsarnossl.so
    pulsar-client-dev  Static libraries libpulsar.a and libpulsarwithdeps.a, and the C++ and C headers

macOS

Pulsar releases are available in the Homebrew core repository. You can install the C++ client library with the following command. The package installs both the library and the headers.

    brew install libpulsar

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters and use the pulsar URI scheme. The default port is 6650. The following is an example for localhost.

    pulsar://localhost:6650

In a Pulsar cluster in production, the URL looks as follows:

    pulsar://pulsar.us-west.example.com:6650

If you use TLS authentication, add +ssl to the scheme. The default port is then 6651. The following is an example.

    pulsar+ssl://pulsar.us-west.example.com:6651
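To illustrate how the scheme and port in the URLs above fit together, here is a minimal sketch of a URL splitter in standard C++. ServiceUrl and parseServiceUrl are hypothetical names and are not part of the Pulsar client API. Falling back to 6650 or 6651 when the port is omitted is an assumption of this sketch, based on the default ports stated above. It does not handle IPv6 literals or multi-host URLs.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper type; not part of the Pulsar client API.
struct ServiceUrl {
    bool tls;          // true when the scheme is pulsar+ssl
    std::string host;  // host name without scheme or port
    int port;          // explicit port, or the documented default
};

// Splits a Pulsar protocol URL into TLS flag, host, and port.
// Throws std::invalid_argument for an unknown scheme, an empty host,
// or a port that is not a number.
inline ServiceUrl parseServiceUrl(const std::string& url) {
    const std::string plain = "pulsar://";
    const std::string secure = "pulsar+ssl://";

    ServiceUrl out;
    std::string rest;
    if (url.compare(0, secure.size(), secure) == 0) {
        out.tls = true;
        rest = url.substr(secure.size());
    } else if (url.compare(0, plain.size(), plain) == 0) {
        out.tls = false;
        rest = url.substr(plain.size());
    } else {
        throw std::invalid_argument("unsupported scheme: " + url);
    }

    const std::string::size_type colon = rest.rfind(':');
    if (colon == std::string::npos) {
        // No port given: assume the default for the scheme.
        out.host = rest;
        out.port = out.tls ? 6651 : 6650;
    } else {
        out.host = rest.substr(0, colon);
        out.port = std::stoi(rest.substr(colon + 1));
    }
    if (out.host.empty()) {
        throw std::invalid_argument("missing host: " + url);
    }
    return out;
}
```

For example, parseServiceUrl("pulsar+ssl://pulsar.us-west.example.com:6651") yields tls set to true, the host pulsar.us-west.example.com, and the port 6651.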

Create a consumer

To connect to Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example.

    Client client("pulsar://localhost:6650");

    // Subscribe to the topic; the consumer handle is filled in on success.
    Consumer consumer;
    Result result = client.subscribe("my-topic", "my-subscription-name", consumer);
    if (result != ResultOk) {
        LOG_ERROR("Failed to subscribe: " << result);
        return -1;
    }

    // Receive messages in a loop and acknowledge each one after processing.
    Message msg;
    while (true) {
        consumer.receive(msg);
        LOG_INFO("Received: " << msg
                 << " with payload '" << msg.getDataAsString() << "'");
        consumer.acknowledge(msg);
    }

    client.close();

Create a producer

To connect to Pulsar as a producer, you need to create a producer on the C++ client. The following is an example.

    Client client("pulsar://localhost:6650");

    // Create a producer on the topic; the handle is filled in on success.
    Producer producer;
    Result result = client.createProducer("my-topic", producer);
    if (result != ResultOk) {
        LOG_ERROR("Error creating producer: " << result);
        return -1;
    }

    // Publish 10 messages to the topic
    for (int i = 0; i < 10; i++) {
        Message msg = MessageBuilder().setContent("my-message").build();
        Result res = producer.send(msg);
        LOG_INFO("Message sent: " << res);
    }

    client.close();

Enable authentication in connection URLs

If you use TLS authentication when connecting to Pulsar, add ssl to the scheme of the connection URL (pulsar+ssl://). The default port is 6651. The following is an example.

    ClientConfiguration config = ClientConfiguration();
    config.setUseTls(true);
    config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
    config.setTlsAllowInsecureConnection(false);
    config.setAuth(pulsar::AuthTls::create(
        "/path/to/client-cert.pem", "/path/to/client-key.pem"));
    Client client("pulsar+ssl://my-broker.com:6651", config);

For complete examples, refer to C++ client examples.

Schema

This section provides some schema examples. For more information about schema, see Pulsar schema.

Create producer with Avro schema

The following example shows how to create a producer with an Avro schema.

    static const std::string exampleSchema =
        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
    Producer producer;
    ProducerConfiguration producerConf;
    producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
    client.createProducer("topic-avro", producerConf, producer);
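The escaped quotes in the schema string above are easy to get wrong. As a possible alternative, the same JSON can be written as a C++11 raw string literal. The sketch below is plain C++ and makes no Pulsar calls. The names escapedSchema and rawSchema are illustrative only.

```cpp
#include <string>

// The schema from the example above, written with escaped quotes.
static const std::string escapedSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";

// The same schema as two adjacent raw string literals. Inside R"( ... )"
// double quotes need no escaping, and adjacent literals are concatenated
// by the compiler, so the long JSON can still be split across lines.
static const std::string rawSchema =
    R"({"type":"record","name":"Example","namespace":"test",)"
    R"("fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
```

Both variables hold the same characters, so either one can be passed wherever the example above passes exampleSchema.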

Create consumer with Avro schema

The following example shows how to create a consumer with an Avro schema.

    static const std::string exampleSchema =
        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
    ConsumerConfiguration consumerConf;
    Consumer consumer;
    consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
    client.subscribe("topic-avro", "sub-2", consumerConf, consumer);