C++ SDK

构建 TubeMQ C++ SDK

C++ SDK 主要基于非 Boost 版本的 Asio 进行通信,构建时使用 CMake,具体操作步骤如下:

  1. # 进入到 cpp sdk 的根路径
  2. git clone https://github.com/apache/inlong.git
  3. cd inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp
  4. mkdir build && cd build
  5. cmake ..
  6. make -j8 # 可根据自己机器的核数决定编译时使用的线程数量

上述构建过程也可以在 InLong 提供的镜像中完成,具体操作如下

  1. # 拉取镜像
  2. docker pull inlong/tubemq-cpp
  3. # 启动容器,将cpp sdk的源码挂载到容器中
  4. docker run -it --net=host -v ${REPLACE_BY_CPP_SOURCE_DIR_PATH}:/tubemq-cpp/ inlong/tubemq-cpp /bin/bash
  5. # 之后与在物理机上相同
  6. mkdir build && cd build
  7. cmake ..
  8. make -j8

C++ SDK API

与其它的消息队列系统类似,C++ SDK 分为生产者 (Producer) 以及消费者 (Consumer),下面分别介绍 Producer 和 Consumer 的API使用。

首先,无论是生产者还是消费者,先启动 TubeMQ SDK 的后台全局服务 (以下例子都假定使用的默认 C++ namespacetubemq)。

  1. bool StartTubeMQService(string& err_info, const TubeMQServiceConfig& serviceConfig);

通过返回值以及 err_info 判断启动是否成功。

Producer

对于用户来说,直接接触的类为 TubeMQProducer,在使用时需要首先创建实例 。

  1. #include "tubemq/tubemq_client.h" // TubeMQProducer定义头文件
  2. TubeMQProducer producer;

之后设置 producer 的配置,并启动。

  1. ProducerConfig producer_config;
  2. producer_config.SetRpcReadTimeoutMs(20000);
  3. producer_config.SetMasterAddrInfo(err_info, master_addr);
  4. bool result = producer.start(); // 启动 producer

当日志显示注册 master 成功后,producer 会以一定频率向 master 发送心跳,以获得最新的关于 topic 的信息,之后用户可以开始订阅想发送消息的 topic,支持订阅多个 topic,传入的 topics 参数类型为 std::set

  1. std::set topics{"topic_0", "topic_1"};
  2. bool result = producer.Publish(err_info, topics);

订阅之后就可以发送消息,首先对消息进行构造

  1. #include "tubemq/tubemq_message.h" // tubemq::Message 定义头文件
  2. std::string topic_name = "demo";
  3. std::string send_data = "hello_tubemq";
  4. Message message(topic_name, send_data.c_str(), send_data.size()); // 接受数据类型为 const char*

构造好消息后,即可发送,SDK 提供了同步和异步两种发送方式

  1. // 同步发送
  2. bool TubeMQProducer::SendMessage(string& err_info, const Message& message);
  3. // 异步发送
  4. void TubeMQProducer::SendMessage(const Message& message, const std::function<void(const ErrorCode&)>& callback);

分别通过如下方式使用

  1. bool result = producer.SendMessage(err_info, message);
  2. void callback(const ErrorCode&);
  3. producer.SendMessage(message, callback); // callback 也可以为其它可调用的对象,如 lambda函数

同步发送在得到结果前将会阻塞,异步发送则不会,调用后通过用户自定义的回调函数接收返回的结果。

最后,关闭 producer

  1. producer.ShutDown();

Consumer

Consumer 同 Producer 类似,都要先初始化 Consumer 实例,并设置配置,订阅 topic 等操作。

  1. #include "tubemq/tubemq_client.h" // TubeMQConsumer 定义头文件
  2. TubeMQConsumer consumer;
  3. // 消费相关的配置
  4. ConsumerConfig consumer_config;
  5. consumer_config.SetRpcReadTimeoutMs(20000);
  6. consumer_config.SetMasterAddrInfo(err_info, master_addr);
  7. // consumer中除了订阅 topic,还要设置消费组
  8. consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
  9. // 启动 consumer
  10. consumer.start();

启动 consumer 后,调用 GetMessage 获取消息即可

  1. ConsumerResult get_result;
  2. ConsumerResult confirm_result;
  3. bool result = consumer.GetMessage(get_result);
  4. if (result) {
  5. list<Message> messages = get_result.GetMessageList();
  6. consumer.Confirm(get_result.GetConfirmContext(), true, confirm_result);
  7. }
  8. // stop the consumer
  9. consumer.ShutDown();

Example

关于 C++ SDK 的更加详细的用例,可以参考 C++ SDK Example,编译好的可执行文件位置在 build/example/producerbuild/example/consumer