C++ SDK

Build TubeMQ C++ SDK

C++ SDK is based on the non-boost asio, and the CMake is used for building, the commands are:

  1. # enter the root directory of c++ sdk source
  2. git clone https://github.com/apache/inlong.git
  3. cd inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp
  4. mkdir build && cd build
  5. cmake ..
  6. make -j8 # the thread num is based on the cpu cores

The building can also be completed in docker environment provided by InLong.

  1. # pull image
  2. docker pull inlong/tubemq-cpp
  3. # start container and mount the source code
  4. docker run -it --net=host -v ${REPLACE_BY_CPP_SOURCE_DIR_PATH}:/tubemq-cpp/ inlong/tubemq-cpp /bin/bash
  5. # same as the build process in physical machines
  6. mkdir build && cd build
  7. cmake ..
  8. make -j8

C++ SDK API

Similar with other MQ systems,C++ SDK is diveded into Producer and Consumer. The API of Producer and Consumer are introduced respectively below.

First of all, regardless of role, start the background global TubeMQ SDK service (support the default C++ namespace is tubemq)。

  1. bool StartTubeMQService(string& err_info, const TubeMQServiceConfig& serviceConfig);

Look up the return boolean value and the err_info to check the start process is successful。

Producer

The user-level api class is TubeMQProducer,the first thing is initializing the class.

  1. #include "tubemq/tubemq_client.h" // header file of TubeMQProducer
  2. TubeMQProducer producer;

Set the config of producer, then start the producer

  1. ProducerConfig producer_config;
  2. producer_config.SetRpcReadTimeoutMs(20000);
  3. producer_config.SetMasterAddrInfo(err_info, master_addr);
  4. bool result = producer.start(); // start producer

When register2Master is successed,producer will send heartbeat request to master periodically to update the meta info of topics. Then users can publish topics, multiple topics can be published at once, and the param type is std::set.

  1. std::set topics{"topic_0", "topic_1"};
  2. bool result = producer.Publish(err_info, topics);

After publishing, construct the message

  1. #include "tubemq/tubemq_message.h" // header file of tubemq::Message
  2. std::string topic_name = "demo";
  3. std::string send_data = "hello_world";
  4. Message message(topic_name, send_data.c_str(), send_data.size()); // type is const char*

There are two SendMessage API: synchronous sending and asynchronous sending.

  1. // sync send
  2. bool TubeMQProducer::SendMessage(string& err_info, const Message& message);
  3. // async send
  4. void TubeMQProducer::SendMessage(const Message& message, const std::function<void(const ErrorCode&)>& callback);

How to use these two SendMessage

  1. bool result = producer.SendMessage(err_info, message);
  2. void callback(const ErrorCode&);
  3. producer.SendMessage(message, callback); // callback can be other invoked objects

Synchronous sending will block until the result is returned, asynchronous sending will not, and the returnted result is received through the user-defined callback function.

Finally, close the producer.

  1. producer.ShutDown();

Consumer

Similar with producer,initialize the consumer object and set the config.

  1. #include "tubemq/tubemq_client.h" // header file of TubeMQConsumer
  2. TubeMQConsumer consumer;
  3. // config of consumption
  4. ConsumerConfig consumer_config;
  5. consumer_config.SetRpcReadTimeoutMs(20000);
  6. consumer_config.SetMasterAddrInfo(err_info, master_addr);
  7. // set the consume group and the topic
  8. consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
  9. // start the consumer
  10. consumer.start();

After starting the consumer, invoke the GetMessage to get messages.

  1. ConsumerResult get_result;
  2. ConsumerResult confirm_result;
  3. bool result = consumer.GetMessage(get_result);
  4. if (result) {
  5. list<Message> messages = get_result.GetMessageList();
  6. consumer.Confirm(get_result.GetConfirmContext(), true, confirm_result);
  7. }
  8. // stop the consumer
  9. consumer.ShutDown();

Example

There are more detailed examples about C++ SDK in C++ SDK Example,the compiled executable is located in build/example/producer and build/example/consumer