TDengine Rust Connector

Crates.io Crates.io docs.rs

taos 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

taos 提供两种建立连接的方式。一种是原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。

该 Rust 连接器的源码托管在 GitHub

支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。

版本支持

请参考版本支持列表

Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。

安装

安装前准备

  • 安装 Rust 开发工具链
  • 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动

添加 taos 依赖

根据选择的连接方式,按照如下说明在 Rust 项目中添加 taos 依赖:

  • 同时支持
  • 仅 Websocket
  • 仅原生连接

Cargo.toml 文件中添加 taos

  1. [dependencies]
  2. # use default feature
  3. taos = "*"

Cargo.toml 文件中添加 taos,并启用 ws 特性。

  1. [dependencies]
  2. taos = { version = "*", default-features = false, features = ["ws"] }

Cargo.toml 文件中添加 taos,并启用 native 特性:

  1. [dependencies]
  2. taos = { version = "*", default-features = false, features = ["native"] }

建立连接

TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。

  1. let builder = TaosBuilder::from_dsn("taos://")?;

现在您可以使用该对象创建连接:

  1. let conn = builder.build()?;

连接对象可以创建多个:

  1. let conn1 = builder.build()?;
  2. let conn2 = builder.build()?;

DSN 描述字符串基本结构如下:

  1. <driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
  2. |------|------------|---|-----------|-----------|------|------|------------|-----------------------|
  3. |driver| protocol | | username | password | host | port | database | params |

各部分意义见下表:

  • driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
    • taos: 表名使用 TDengine 连接器驱动。
    • tmq: 使用 TMQ 订阅数据。
    • http/ws: 使用 Websocket 创建连接。
    • https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
  • protocol: 显示指定以何种方式建立连接,例如:taos+ws://localhost:6041 指定以 Websocket 方式建立连接。
  • username/password: 用于创建连接的用户名及密码。
  • host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(taos://),原生连接默认为 localhost:6030,Websocket 连接默认为 localhost:6041
  • database: 指定默认连接的数据库名,可选参数。
  • params:其他可选参数。

一个完整的 DSN 描述字符串示例如下:

  1. taos+ws://localhost:6041/test

表示使用 Websocket(ws)方式通过 6041 端口连接服务器 localhost,并指定默认数据库为 test

这使得用户可以通过 DSN 指定连接方式:

  1. use taos::*;
  2. // use native protocol.
  3. let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
  4. let conn1 = builder.build();
  5. // use websocket protocol.
  6. let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;

建立连接后,您可以进行相关数据库操作:

  1. async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
  2. // prepare database
  3. taos.exec_many([
  4. format!("DROP DATABASE IF EXISTS `{db}`"),
  5. format!("CREATE DATABASE `{db}`"),
  6. format!("USE `{db}`"),
  7. ])
  8. .await?;
  9. let inserted = taos.exec_many([
  10. // create super table
  11. "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
  12. TAGS (`groupid` INT, `location` BINARY(24))",
  13. // create child table
  14. "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
  15. // insert into child table
  16. "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
  17. // insert with NULL values
  18. "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
  19. // insert and automatically create table with tags if not exists
  20. "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
  21. // insert many records in a single sql
  22. "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
  23. ]).await?;
  24. assert_eq!(inserted, 6);
  25. let mut result = taos.query("select * from `meters`").await?;
  26. for field in result.fields() {
  27. println!("got field: {}", field.name());
  28. }
  29. let values = result.
  30. }

查询数据可以通过两种方式:使用内建类型或 serde 序列化框架。

  1. // Query option 1, use rows stream.
  2. let mut rows = result.rows();
  3. while let Some(row) = rows.try_next().await? {
  4. for (name, value) in row {
  5. println!("got value of {}: {}", name, value);
  6. }
  7. }
  8. // Query options 2, use deserialization with serde.
  9. #[derive(Debug, serde::Deserialize)]
  10. #[allow(dead_code)]
  11. struct Record {
  12. // deserialize timestamp to chrono::DateTime<Local>
  13. ts: DateTime<Local>,
  14. // float to f32
  15. current: Option<f32>,
  16. // int to i32
  17. voltage: Option<i32>,
  18. phase: Option<f32>,
  19. groupid: i32,
  20. // binary/varchar to String
  21. location: String,
  22. }
  23. let records: Vec<Record> = taos
  24. .query("select * from `meters`")
  25. .await?
  26. .deserialize()
  27. .try_collect()
  28. .await?;
  29. dbg!(records);
  30. Ok(())

使用示例

写入数据

SQL 写入

  1. use taos::*;
  2. #[tokio::main]
  3. async fn main() -> anyhow::Result<()> {
  4. let dsn = "ws://";
  5. let taos = TaosBuilder::from_dsn(dsn)?.build()?;
  6. taos.exec_many([
  7. "DROP DATABASE IF EXISTS power",
  8. "CREATE DATABASE power",
  9. "USE power",
  10. "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
  11. ]).await?;
  12. let inserted = taos.exec("INSERT INTO
  13. power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
  14. VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
  15. ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
  16. power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
  17. VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
  18. power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
  19. VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
  20. power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
  21. VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)").await?;
  22. assert_eq!(inserted, 8);
  23. Ok(())
  24. }

查看源码

STMT 写入

  1. use taos::*;
  2. #[tokio::main]
  3. async fn main() -> anyhow::Result<()> {
  4. let taos = TaosBuilder::from_dsn("taos://")?.build()?;
  5. taos.create_database("power").await?;
  6. taos.use_database("power").await?;
  7. taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
  8. let mut stmt = Stmt::init(&taos)?;
  9. stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
  10. // bind table name and tags
  11. stmt.set_tbname_tags(
  12. "d1001",
  13. &[
  14. Value::VarChar("California.SanFransico".into()),
  15. Value::Int(2),
  16. ],
  17. )?;
  18. // bind values.
  19. let values = vec![
  20. ColumnView::from_millis_timestamp(vec![1648432611249]),
  21. ColumnView::from_floats(vec![10.3]),
  22. ColumnView::from_ints(vec![219]),
  23. ColumnView::from_floats(vec![0.31]),
  24. ];
  25. stmt.bind(&values)?;
  26. // bind one more row
  27. let values2 = vec![
  28. ColumnView::from_millis_timestamp(vec![1648432611749]),
  29. ColumnView::from_floats(vec![12.6]),
  30. ColumnView::from_ints(vec![218]),
  31. ColumnView::from_floats(vec![0.33]),
  32. ];
  33. stmt.bind(&values2)?;
  34. stmt.add_batch()?;
  35. // execute.
  36. let rows = stmt.execute()?;
  37. assert_eq!(rows, 2);
  38. Ok(())
  39. }

查看源码

查询数据

  1. use taos::sync::*;
  2. fn main() -> anyhow::Result<()> {
  3. let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
  4. let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;
  5. // print column names
  6. let meta = result.fields();
  7. println!("{}", meta.iter().map(|field| field.name()).join("\t"));
  8. // print rows
  9. let rows = result.rows();
  10. for row in rows {
  11. let row = row?;
  12. for (_name, value) in row {
  13. print!("{}\t", value);
  14. }
  15. println!();
  16. }
  17. Ok(())
  18. }
  19. // output(suppose you are in +8 timezone):
  20. // ts current
  21. // 2018-10-03T14:38:05+08:00 10.3
  22. // 2018-10-03T14:38:15+08:00 12.6

查看源码

API 参考

连接构造器

通过 DSN 来构建一个连接器构造器。

  1. let cfg = TaosBuilder::default().build()?;

使用 builder 对象创建多个连接:

  1. let conn: Taos = cfg.build();

连接池

在复杂应用中,建议启用连接池。taos 的连接池使用 r2d2 实现。

如下,可以生成一个默认参数的连接池。

  1. let pool = TaosBuilder::from_dsn(dsn)?.pool()?;

同样可以使用连接池的构造器,对连接池参数进行设置:

  1. let dsn = "taos://localhost:6030";
  2. let opts = PoolBuilder::new()
  3. .max_size(5000) // max connections
  4. .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
  5. .min_idle(Some(1000)) // minimal idle connections
  6. .connection_timeout(Duration::from_secs(2));
  7. let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;

在应用代码中,使用 pool.get()? 来获取一个连接对象 Taos

  1. let taos = pool.get()?;

连接

Taos 对象提供了多个数据库操作的 API:

  1. exec: 执行某个非查询类 SQL 语句,例如 CREATEALTERINSERT 等。

    1. let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
  2. exec_many: 同时(顺序)执行多个 SQL 语句。

    1. taos.exec_many([
    2. "CREATE DATABASE test",
    3. "USE test",
    4. "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    5. ]).await?;
  3. query:执行查询语句,返回 [ResultSet] 对象。

    1. let mut q = taos.query("select * from log.logs").await?;

    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):

    列信息使用 [.fields()] 方法获取:

    1. let cols = q.fields();
    2. for col in cols {
    3. println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
    4. }

    逐行获取数据:

    1. let mut rows = result.rows();
    2. let mut nrows = 0;
    3. while let Some(row) = rows.try_next().await? {
    4. for (col, (name, value)) in row.enumerate() {
    5. println!(
    6. "[{}] got value in col {} (named `{:>8}`): {}",
    7. nrows, col, name, value
    8. );
    9. }
    10. nrows += 1;
    11. }

    或使用 serde 序列化框架。

    1. #[derive(Debug, Deserialize)]
    2. struct Record {
    3. // deserialize timestamp to chrono::DateTime<Local>
    4. ts: DateTime<Local>,
    5. // float to f32
    6. current: Option<f32>,
    7. // int to i32
    8. voltage: Option<i32>,
    9. phase: Option<f32>,
    10. groupid: i32,
    11. // binary/varchar to String
    12. location: String,
    13. }
    14. let records: Vec<Record> = taos
    15. .query("select * from `meters`")
    16. .await?
    17. .deserialize()
    18. .try_collect()
    19. .await?;

需要注意的是,需要使用 Rust 异步函数和异步运行时。

Taos 提供部分 SQL 的 Rust 方法化以减少 format! 代码块的频率:

  • .describe(table: &str): 执行 DESCRIBE 并返回一个 Rust 数据结构。
  • .create_database(database: &str): 执行 CREATE DATABASE 语句。
  • .use_database(database: &str): 执行 USE 语句。

除此之外,该结构也是 参数绑定行协议接口 的入口,使用方法请参考具体的 API 说明。

参数绑定接口

与 C 接口类似,Rust 提供参数绑定接口。首先,通过 Taos 对象创建一个 SQL 语句的参数绑定对象 Stmt

  1. let mut stmt = Stmt::init(&taos).await?;
  2. stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;

参数绑定对象提供了一组接口用于实现参数绑定:

.set_tbname(name)

用于绑定表名。

  1. let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
  2. stmt.set_tbname("d0")?;

.set_tags(&[tag])

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

  1. let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
  2. stmt.set_tbname("d0")?;
  3. stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;

.bind(&[column])

用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:

  1. let params = vec![
  2. ColumnView::from_millis_timestamp(vec![164000000000]),
  3. ColumnView::from_bools(vec![true]),
  4. ColumnView::from_tiny_ints(vec![i8::MAX]),
  5. ColumnView::from_small_ints(vec![i16::MAX]),
  6. ColumnView::from_ints(vec![i32::MAX]),
  7. ColumnView::from_big_ints(vec![i64::MAX]),
  8. ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
  9. ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
  10. ColumnView::from_unsigned_ints(vec![u32::MAX]),
  11. ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
  12. ColumnView::from_floats(vec![f32::MAX]),
  13. ColumnView::from_doubles(vec![f64::MAX]),
  14. ColumnView::from_varchar(vec!["ABC"]),
  15. ColumnView::from_nchar(vec!["涛思数据"]),
  16. ];
  17. let rows = stmt.bind(&params)?.add_batch()?.execute()?;

.execute()

执行 SQL。Stmt 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 .add_batch 加入到执行队列中。

  1. stmt.execute()?;
  2. // next bind cycle.
  3. //stmt.set_tbname()?;
  4. //stmt.bind()?;
  5. //stmt.execute()?;

一个可运行的示例请见 GitHub 上的示例

订阅

TDengine 通过消息队列 TMQ 启动一个订阅。

从 DSN 开始,构建一个 TMQ 连接器。

  1. let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;

创建消费者:

  1. let mut consumer = tmq.build()?;

消费者可订阅一个或多个 TOPIC

  1. consumer.subscribe(["tmq_meters"]).await?;

TMQ 消息队列是一个 futures::Stream 类型,可以使用相应 API 对每个消息进行消费,并通过 .commit 进行已消费标记。

  1. {
  2. let mut stream = consumer.stream();
  3. while let Some((offset, message)) = stream.try_next().await? {
  4. // get information from offset
  5. // the topic
  6. let topic = offset.topic();
  7. // the vgroup id, like partition id in kafka.
  8. let vgroup_id = offset.vgroup_id();
  9. println!("* in vgroup id {vgroup_id} of topic {topic}\n");
  10. if let Some(data) = message.into_data() {
  11. while let Some(block) = data.fetch_raw_block().await? {
  12. // one block for one table, get table name if needed
  13. let name = block.table_name();
  14. let records: Vec<Record> = block.deserialize().try_collect()?;
  15. println!(
  16. "** table: {}, got {} records: {:#?}\n",
  17. name.unwrap(),
  18. records.len(),
  19. records
  20. );
  21. }
  22. }
  23. consumer.commit(offset).await?;
  24. }
  25. }

停止订阅:

  1. consumer.unsubscribe().await;

对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id 是必须的。

  • group.id: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
  • client.id: 可选的订阅客户端识别项。
  • auto.offset.reset: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 group.id 中仅生效一次。
  • enable.auto.commit: 当设置为 true 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
  • auto.commit.interval.ms: 自动标记的时间间隔。

完整订阅示例参见 GitHub 示例文件.

其他相关结构体 API 使用说明请移步 Rust 文档托管网页:https://docs.rs/taos