Connect to MatrixOne with Kafka
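Overview
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
MatrixOne can connect to Apache Kafka. This document walks through reading a Kafka topic into MatrixOne through a source table and continuously processing that data with a dynamic table.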
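Before you start
MatrixOne is installed and running.
The Kafka binary package has been downloaded (this guide uses kafka_2.13-3.6.1.tgz).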
Steps
Step 1: Start Kafka and produce data
Extract the binary package (adjust the file name to match the Kafka version you downloaded):
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Open a new terminal and start Kafka:
bin/kafka-server-start.sh config/server.properties
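Separate terminals are not strictly required. Kafka's start scripts also accept a -daemon flag that runs the process in the background. The sketch below assumes that mode and adds a small retry helper, wait_for (a hypothetical name, not part of Kafka), that reruns a probe command until the broker answers. Using kafka-topics.sh --list as the probe is an assumption: any command that succeeds only when the broker is reachable would work.

```shell
#!/usr/bin/env bash
# Sketch: start ZooKeeper and Kafka in the background from one terminal and
# wait until the broker answers before creating the topic.
# wait_for is a hypothetical helper, not part of Kafka.

# wait_for TRIES CMD [ARGS...]: rerun CMD once per second until it succeeds.
# Returns 0 on the first success, 1 after TRIES failed attempts.
wait_for() {
  local tries=$1 i
  shift
  for ((i = 1; i <= tries; i++)); do
    "$@" >/dev/null 2>&1 && return 0
    (( i < tries )) && sleep 1   # no pause after the last attempt
  done
  return 1
}

# Usage, from the Kafka directory:
#   bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#   bin/kafka-server-start.sh -daemon config/server.properties
#   wait_for 30 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```

The probe's output is discarded, so only its exit status matters.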
Open a new terminal and create a topic named test:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
Start a console producer and type JSON messages into the topic, one message per line:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
{"c1": -2147483648,"c2":20,"c3": -3,"c4":8,"c5":425,"c6":55}
{"c1": 21474,"c2":-20,"c3": 3,"c4":9090,"c5":42,"c6":53}
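Typing messages by hand works for a demo. The console producer also reads standard input, treating each line as one message, so rows can be piped in. The sketch below assumes that usage. make_row is a hypothetical helper that formats six values into the same flat JSON shape as the sample messages above.

```shell
#!/usr/bin/env bash
# Sketch: build one JSON message per row, in the same flat shape as the
# sample messages, so rows can be piped into the console producer.
# make_row is a hypothetical helper name.
make_row() {
  [ "$#" -eq 6 ] || { echo "make_row: expected 6 values" >&2; return 2; }
  printf '{"c1": %s,"c2": %s,"c3": %s,"c4": %s,"c5": %s,"c6": %s}\n' "$@"
}

# Usage, from the Kafka directory (requires the running broker):
#   { make_row -2147483648 20 -3 8 425 55
#     make_row 21474 -20 3 9090 42 53; } |
#     bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
```

When its standard input ends, the producer exits.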
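Start a console consumer to confirm that the messages were written to the topic. The --from-beginning flag makes it also read messages that were produced before the consumer started:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning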
{"c1": -2147483648,"c2":20,"c3": -3,"c4":8,"c5":425,"c6":55}
{"c1": 21474,"c2":-20,"c3": 3,"c4":9090,"c5":42,"c6":53}
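Step 2: Create a source table connected to Kafka
Create the source table. The with clause sets the connector type (kafka), the topic and partition to read, the message format (json), and the broker address. Under Kafka's default configuration, the topic test created in Step 1 has a single partition, 0: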
create source stream_test(c1 int, c2 tinyint, c3 smallint, c4 bigint, c5 int unsigned, c6 tinyint unsigned)
with(
    "type" = 'kafka',
    "topic" = 'test',
    "partition" = '0',
    "value" = 'json',
    "bootstrap.servers" = '127.0.0.1:9092'
);
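Each JSON field must hold a value that fits the column type it maps to. For example, the first sample message uses -2147483648, the minimum of a signed 32-bit int. The sketch below checks a row against the usual MySQL-compatible integer ranges for the declared types before it is sent. Whether MatrixOne applies exactly these ranges is an assumption, and this sketch does not cover how MatrixOne handles out-of-range values. is_int, in_range and fits_stream_test are hypothetical names.

```shell
#!/usr/bin/env bash
# Sketch: check that a row fits the column types declared for stream_test.
# Ranges are the usual MySQL-compatible integer ranges (assumed to apply).

# is_int VALUE: plain decimal integer, optional minus sign, no leading zeros.
is_int() { [[ $1 =~ ^-?(0|[1-9][0-9]*)$ ]]; }

# in_range VALUE MIN MAX: VALUE is an integer with MIN <= VALUE <= MAX.
in_range() { is_int "$1" && (( $1 >= $2 && $1 <= $3 )); }

# fits_stream_test C1 C2 C3 C4 C5 C6: status 0 if every value fits its column.
fits_stream_test() {
  [ "$#" -eq 6 ] || return 2
  in_range "$1" -2147483648 2147483647 &&  # c1 int
    in_range "$2" -128 127 &&              # c2 tinyint
    in_range "$3" -32768 32767 &&          # c3 smallint
    is_int "$4" &&                         # c4 bigint: syntax only (bash math is 64-bit)
    in_range "$5" 0 4294967295 &&          # c5 int unsigned
    in_range "$6" 0 255                    # c6 tinyint unsigned
}
```

For example, fits_stream_test 1 128 0 0 0 0 fails because 128 is outside the tinyint range of c2.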
Check whether the data has been received:
select * from stream_test;
+-------------+------+------+------+------+------+
| c1          | c2   | c3   | c4   | c5   | c6   |
+-------------+------+------+------+------+------+
| -2147483648 |   20 |   -3 |    8 |  425 |   55 |
|       21474 |  -20 |    3 | 9090 |   42 |   53 |
+-------------+------+------+------+------+------+
2 rows in set (0.37 sec)
Continue writing JSON data to the topic from the producer terminal, then check whether the new data is received:
{"c1": -3421474,"c2":92,"c3": 333,"c4":9,"c5":42233,"c6":87}
Query the source table again:
select * from stream_test;
+-------------+------+------+------+-------+------+
| c1          | c2   | c3   | c4   | c5    | c6   |
+-------------+------+------+------+-------+------+
| -2147483648 |   20 |   -3 |    8 |   425 |   55 |
|       21474 |  -20 |    3 | 9090 |    42 |   53 |
|    -3421474 |   92 |  333 |    9 | 42233 |   87 |
+-------------+------+------+------+-------+------+
3 rows in set (0.44 sec)
Step 3: Create a dynamic table to consume data from the source table
Create a dynamic table whose columns are computed from the source table:
create dynamic table dt_test as select c1, c2+c3, c3*c4, c5/c3, c6/10 from stream_test;
Query the dynamic table:
select * from dt_test;
+-------------+---------+---------+---------------------+---------+
| c1          | c2 + c3 | c3 * c4 | c5 / c3             | c6 / 10 |
+-------------+---------+---------+---------------------+---------+
| -2147483648 |      17 |     -24 | -141.66666666666666 |     5.5 |
|       21474 |     -17 |   27270 |                  14 |     5.3 |
|    -3421474 |     425 |    2997 |  126.82582582582583 |     8.7 |
+-------------+---------+---------+---------------------+---------+
3 rows in set (0.00 sec)
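You can cross-check the dynamic table's values by recomputing the same expressions outside the database. The sketch below reads the JSON messages sent to the topic and prints c1, c2 + c3, c3 * c4, c5 / c3 and c6 / 10 for each one. It assumes each message is a flat object with keys c1 to c6 in that order, exactly like the samples, so it is not a general JSON parser. Divisions are printed with awk's %g format (six significant digits), so -141.66666666666666 appears as -141.667. expected_dt is a hypothetical name.

```shell
#!/usr/bin/env bash
# Sketch: recompute the dt_test columns from the JSON messages on stdin.
# Assumes flat objects with keys c1..c6 in order, like the sample messages.
expected_dt() {
  # Drop the "cN": keys, then turn braces, quotes and commas into spaces,
  # leaving six whitespace-separated numbers per line.
  sed -E 's/"c[0-9]+"[[:space:]]*://g; s/[{}",]/ /g' |
    awk '{
      # $1..$6 are c1..c6
      if ($3 == 0) {
        ratio = "n/a"                   # placeholder only, not the MatrixOne result
      } else {
        ratio = sprintf("%g", $5 / $3)  # six significant digits
      }
      printf "%s %d %d %s %g\n", $1, $2 + $3, $3 * $4, ratio, $6 / 10
    }'
}
```

For example, piping the first sample message through expected_dt prints -2147483648 17 -24 -141.667 5.5, which matches the first row above up to rounding.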
Continue writing JSON data to the topic, then check whether the dynamic table is updated:
{"c1": 1474,"c2":2,"c3": 453,"c4":1,"c5":56233,"c6":7}
Query the dynamic table again. It now includes the new row:
select * from dt_test;
+-------------+---------+---------+---------------------+---------+
| c1          | c2 + c3 | c3 * c4 | c5 / c3             | c6 / 10 |
+-------------+---------+---------+---------------------+---------+
| -2147483648 |      17 |     -24 | -141.66666666666666 |     5.5 |
|       21474 |     -17 |   27270 |                  14 |     5.3 |
|    -3421474 |     425 |    2997 |  126.82582582582583 |     8.7 |
|        1474 |     455 |     453 |  124.13465783664459 |     0.7 |
+-------------+---------+---------+---------------------+---------+
4 rows in set (0.00 sec)