AutoMQ 是基于云重新设计的云原生 Kafka。通过将存储分离至对象存储,在保持和 Apache Kafka 100% 兼容的前提下,为用户提供高达 10 倍的成本优势以及百倍的弹性优势。通过其创新的共享存储架构,在保证高吞吐、低延迟的性能指标下实现了秒级分区迁移、流量自平衡、秒级自动弹性等能力。

AutoMQ Storage Architecture

环境准备

准备 Apache Doris 和测试数据

确保当前已准备好可用的 Apache Doris 集群。为了便于演示,我们参考 Docker 部署 Doris 文档在 Linux 上部署了一套测试用的 Apache Doris 环境。 创建库和测试表:

  1. create database automq_db;
  2. CREATE TABLE automq_db.users (
  3. id bigint NOT NULL,
  4. name string NOT NULL,
  5. timestamp string NULL,
  6. status string NULL
  7. ) DISTRIBUTED BY hash (id) PROPERTIES ('replication_num' = '1');

准备 Kafka 命令行工具

AutoMQ Releases 下载最新的 TGZ 包并解压。假设解压目录为 $AUTOMQ_HOME,在本文中将会使用 $AUTOMQ_HOME/bin 下的工具命令来创建主题和生成测试数据。

准备 AutoMQ 和测试数据

参考 AutoMQ 官方部署文档部署一套可用的集群,确保 AutoMQ 与 Apache Doris 之间保持网络连通。 在 AutoMQ 中快速创建一个名为 example_topic 的主题,并向其中写入一条测试 JSON 数据,按照以下步骤操作。

创建 Topic

使用 Apache Kafka 命令行工具创建主题,需要确保当前拥有 Kafka 环境的访问权限并且 Kafka 服务正在运行。以下是创建主题的命令示例:

  1. $AUTOMQ_HOME/bin/kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 AutoMQ Bootstarp Server 地址。 创建完主题后,可以使用以下命令来验证主题是否已成功创建。

  1. $AUTOMQ_HOME/bin/kafka-topics.sh --describe example_topic --bootstrap-server 127.0.0.1:9092

生成测试数据

生成一条 JSON 格式的测试数据,和前文的表需要对应。

  1. {
  2. "id": 1,
  3. "name": "测试用户",
  4. "timestamp": "2023-11-10T12:00:00",
  5. "status": "active"
  6. }

写入测试数据

通过 Kafka 的命令行工具或编程方式将测试数据写入到名为 example_topic 的主题中。下面是一个使用命令行工具的示例:

  1. echo '{"id": 1, "name": "测试用户", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic example_topic

使用如下命令可以查看刚写入的 topic 数据:

  1. sh $AUTOMQ_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic example_topic --from-beginning

注意:在执行命令时,需要将 topic 和 bootstarp-server 替换为实际使用的 AutoMQ Bootstarp Server 地址。

创建 Routine Load 导入作业

在 Apache Doris 的命令行中创建一个接收 JSON 数据的 Routine Load 作业,用来持续导入 AutoMQ Kafka topic 中的数据。具体 Routine Load 的参数说明请参考 Doris Routine Load

  1. CREATE ROUTINE LOAD automq_example_load ON users
  2. COLUMNS(id, name, timestamp, status)
  3. PROPERTIES
  4. (
  5. "format" = "json",
  6. "jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
  7. )
  8. FROM KAFKA
  9. (
  10. "kafka_broker_list" = "127.0.0.1:9092",
  11. "kafka_topic" = "example_topic",
  12. "property.kafka_default_offsets" = "OFFSET_BEGINNING"
  13. );

注意:在执行命令时,需要将 kafka_broker_list 替换为实际使用的 AutoMQ Bootstarp Server 地址。

验证数据导入

首先,检查 Routine Load 导入作业的状态,确保任务正在运行中。

  1. show routine load\G;

然后查询 Apache Doris 数据库中的相关表,可以看到数据已经被成功导入。

  1. select * from users;
  2. +------+--------------+---------------------+--------+
  3. | id | name | timestamp | status |
  4. +------+--------------+---------------------+--------+
  5. | 1 | 测试用户 | 2023-11-10T12:00:00 | active |
  6. | 2 | 测试用户 | 2023-11-10T12:00:00 | active |
  7. +------+--------------+---------------------+--------+
  8. 2 rows in set (0.01 sec)