TDengine Rust Connector

Crates.io Crates.io docs.rs

libtaos 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

libtaos 提供两种建立连接的方式。一种是原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 REST 连接,它通过 taosAdapter 的 REST 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features)” 来指定使用哪种连接器。REST 连接支持任何平台,但原生连接支持所有 TDengine 客户端能运行的平台。

libtaos 的源码托管在 GitHub

支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 REST 连接支持所有能运行 Rust 的平台。

版本支持

请参考版本支持列表

Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 2.4 版本以上的 TDengine,以避免已知问题。

安装

安装前准备

  • 安装 Rust 开发工具链
  • 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动

添加 libtaos 依赖

根据选择的连接方式,按照如下说明在 Rust 项目中添加 libtaos 依赖:

  • 原生连接
  • REST 连接

Cargo.toml 文件中添加 libtaos

  1. [dependencies]
  2. # use default feature
  3. libtaos = "*"

Cargo.toml 文件中添加 libtaos,并启用 rest 特性。

  1. [dependencies]
  2. # use rest feature
  3. libtaos = { version = "*", features = ["rest"]}

使用连接池

请在 Cargo.toml 中启用 r2d2 特性。

  1. [dependencies]
  2. # with taosc
  3. libtaos = { version = "*", features = ["r2d2"] }
  4. # or rest
  5. libtaos = { version = "*", features = ["rest", "r2d2"] }

建立连接

TaosCfgBuilder 为使用者提供构造器形式的 API,以便于后续创建连接或使用连接池。

  1. let cfg: TaosCfg = TaosCfgBuilder::default()
  2. .ip("127.0.0.1")
  3. .user("root")
  4. .pass("taosdata")
  5. .db("log") // do not set if not require a default database.
  6. .port(6030u16)
  7. .build()
  8. .expect("TaosCfg builder error");
  9. }

现在您可以使用该对象创建连接:

  1. let conn = cfg.connect()?;

连接对象可以创建多个:

  1. let conn = cfg.connect()?;
  2. let conn2 = cfg.connect()?;

可以在应用中使用连接池:

  1. let pool = r2d2::Pool::builder()
  2. .max_size(10000) // max connections
  3. .build(cfg)?;
  4. // ...
  5. // Use pool to get connection
  6. let conn = pool.get()?;

之后您可以对数据库进行相关操作:

  1. async fn demo() -> Result<(), Error> {
  2. // get connection ...
  3. // create database
  4. conn.exec("create database if not exists demo").await?;
  5. // change database context
  6. conn.exec("use demo").await?;
  7. // create table
  8. conn.exec("create table if not exists tb1 (ts timestamp, v int)").await?;
  9. // insert
  10. conn.exec("insert into tb1 values(now, 1)").await?;
  11. // query
  12. let rows = conn.query("select * from tb1").await?;
  13. for row in rows.rows {
  14. println!("{}", row.into_iter().join(","));
  15. }
  16. }

使用示例

写入数据

SQL 写入

  1. use libtaos::*;
  2. #[tokio::main]
  3. async fn main() -> Result<(), Error> {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.create_database("power").await?;
  6. taos.exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
  7. let sql = "INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
  8. power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
  9. power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
  10. power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
  11. let result = taos.query(sql).await?;
  12. println!("{:?}", result);
  13. Ok(())
  14. }
  15. // output:
  16. // TaosQueryData { column_meta: [ColumnMeta { name: "affected_rows", type_: Int, bytes: 4 }], rows: [[Int(8)]] }

查看源码

InfluxDB 行协议写入

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
  8. "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
  9. "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
  10. "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"];
  11. let affected_rows = taos
  12. .schemaless_insert(
  13. &lines,
  14. TSDB_SML_LINE_PROTOCOL,
  15. TSDB_SML_TIMESTAMP_MILLISECONDS,
  16. )
  17. .unwrap();
  18. println!("affected_rows={}", affected_rows);
  19. }
  20. // run with: cargo run --example influxdb_line_example

查看源码

OpenTSDB Telnet 行协议写入

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = [
  8. "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  9. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  10. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  11. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  12. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  13. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  14. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  15. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  16. ];
  17. let affected_rows = taos
  18. .schemaless_insert(
  19. &lines,
  20. TSDB_SML_TELNET_PROTOCOL,
  21. TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
  22. )
  23. .unwrap();
  24. println!("affected_rows={}", affected_rows); // affected_rows=8
  25. }
  26. // run with: cargo run --example opentsdb_telnet_example

查看源码

OpenTSDB JSON 行协议写入

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = [
  8. r#"[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  9. {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
  10. {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
  11. {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#,
  12. ];
  13. let affected_rows = taos
  14. .schemaless_insert(
  15. &lines,
  16. TSDB_SML_JSON_PROTOCOL,
  17. TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
  18. )
  19. .unwrap();
  20. println!("affected_rows={}", affected_rows); // affected_rows=4
  21. }
  22. // run with: cargo run --example opentsdb_json_example

查看源码

查询数据

  1. use libtaos::*;
  2. fn taos_connect() -> Result<Taos, Error> {
  3. TaosCfgBuilder::default()
  4. .ip("localhost")
  5. .user("root")
  6. .pass("taosdata")
  7. .db("power")
  8. .port(6030u16)
  9. .build()
  10. .expect("TaosCfg builder error")
  11. .connect()
  12. }
  13. #[tokio::main]
  14. async fn main() -> Result<(), Error> {
  15. let taos = taos_connect().expect("connect error");
  16. let result = taos.query("SELECT ts, current FROM meters LIMIT 2").await?;
  17. // print column names
  18. let meta: Vec<ColumnMeta> = result.column_meta;
  19. for column in meta {
  20. print!("{}\t", column.name)
  21. }
  22. println!();
  23. // print rows
  24. let rows: Vec<Vec<Field>> = result.rows;
  25. for row in rows {
  26. for field in row {
  27. print!("{}\t", field);
  28. }
  29. println!();
  30. }
  31. Ok(())
  32. }
  33. // output:
  34. // ts current
  35. // 2022-03-28 09:56:51.249 10.3
  36. // 2022-03-28 09:56:51.749 12.6

查看源码

更多示例程序

程序路径程序说明
demo.rs基本API 使用示例
bailongma-rs使用 TDengine 作为存储后端的 Prometheus 远程存储 API 适配器,使用 r2d2 连接池

API 参考

连接构造器 API

Builder Pattern 构造器模式是 Rust 处理复杂数据类型或可选配置类型的解决方案。libtaos 实现中,使用连接构造器 TaosCfgBuilder 作为 TDengine Rust 连接器的入口。TaosCfgBuilder 提供对服务器、端口、数据库、用户名和密码等的可选配置。

使用 default() 方法可以构建一个默认参数的 TaosCfg,用于后续连接数据库或建立连接池。

  1. let cfg = TaosCfgBuilder::default().build()?;

使用构造器模式,用户可按需设置:

  1. let cfg = TaosCfgBuilder::default()
  2. .ip("127.0.0.1")
  3. .user("root")
  4. .pass("taosdata")
  5. .db("log")
  6. .port(6030u16)
  7. .build()?;

使用 TaosCfg 对象创建 TDengine 连接:

  1. let conn: Taos = cfg.connect();

连接池

在复杂应用中,建议启用连接池。libtaos 的连接池使用 r2d2 实现。

如下,可以生成一个默认参数的连接池。

  1. let pool = r2d2::Pool::new(cfg)?;

同样可以使用连接池的构造器,对连接池参数进行设置:

  1. use std::time::Duration;
  2. let pool = r2d2::Pool::builder()
  3. .max_size(5000) // max connections
  4. .max_lifetime(Some(Duration::from_minutes(100))) // lifetime of each connection
  5. .min_idle(Some(1000)) // minimal idle connections
  6. .connection_timeout(Duration::from_minutes(2))
  7. .build(cfg);

在应用代码中,使用 pool.get()? 来获取一个连接对象 Taos

  1. let taos = pool.get()?;

连接

Taos 结构体是 libtaos 中的连接管理者,主要提供了两个 API:

  1. exec: 执行某个非查询类 SQL 语句,例如 CREATEALTERINSERT 等。

    1. taos.exec().await?;
  2. query:执行查询语句,返回 TaosQueryData 对象。

    1. let q = taos.query("select * from log.logs").await?;

    TaosQueryData 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):

    列信息使用 [ColumnMeta] 存储:

    1. let cols = &q.column_meta;
    2. for col in cols {
    3. println!("name: {}, type: {:?}, bytes: {}", col.name, col.type_, col.bytes);
    4. }

    逐行获取数据:

    1. for (i, row) in q.rows.iter().enumerate() {
    2. for (j, cell) in row.iter().enumerate() {
    3. println!("cell({}, {}) data: {}", i, j, cell);
    4. }
    5. }

需要注意的是,需要使用 Rust 异步函数和异步运行时。

Taos 提供部分 SQL 的 Rust 方法化以减少 format! 代码块的频率:

  • .describe(table: &str): 执行 DESCRIBE 并返回一个 Rust 数据结构。
  • .create_database(database: &str): 执行 CREATE DATABASE 语句。
  • .use_database(database: &str): 执行 USE 语句。

除此之外,该结构也是 参数绑定行协议接口 的入口,使用方法请参考具体的 API 说明。

参数绑定接口

与 C 接口类似,Rust 提供参数绑定接口。首先,通过 Taos 对象创建一个 SQL 语句的参数绑定对象 Stmt

  1. let mut stmt: Stmt = taos.stmt("insert into ? values(?,?)")?;

参数绑定对象提供了一组接口用于实现参数绑定:

.set_tbname(tbname: impl ToCString)

用于绑定表名。

.set_tbname_tags(tbname: impl ToCString, tags: impl IntoParams)

当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

  1. let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(?,?)")?;
  2. // tags can be created with any supported type, here is an example using JSON
  3. let v = Field::Json(serde_json::from_str("{\"tag1\":\"一二三四五六七八九十\"}").unwrap());
  4. stmt.set_tbname_tags("tb0", [&tag])?;
.bind(params: impl IntoParams)

用于绑定值类型。使用 Field 结构体构建需要的类型并绑定:

  1. let ts = Field::Timestamp(Timestamp::now());
  2. let value = Field::Float(0.0);
  3. stmt.bind(vec![ts, value].iter())?;
.execute()

执行 SQL。Stmt 对象可以复用,在执行后可以重新绑定并执行。

  1. stmt.execute()?;
  2. // next bind cycle.
  3. //stmt.set_tbname()?;
  4. //stmt.bind()?;
  5. //stmt.execute()?;

行协议接口

行协议接口支持多种模式和不同精度,需要引入 schemaless 模块中的常量以进行设置:

  1. use libtaos::*;
  2. use libtaos::schemaless::*;
  • InfluxDB 行协议

    1. let lines = [
    2. "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"pass\",c2=false 1626006833639000000"
    3. "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"abc\",c4=4f64 1626006833639000000"
    4. ];
    5. taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANOSECONDS)?;
  • OpenTSDB Telnet 协议

    1. let lines = ["sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"];
    2. taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)?;
  • OpenTSDB JSON 协议

    1. let lines = [r#"
    2. {
    3. "metric": "st",
    4. "timestamp": 1626006833,
    5. "value": 10,
    6. "tags": {
    7. "t1": true,
    8. "t2": false,
    9. "t3": 10,
    10. "t4": "123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>"
    11. }
    12. }"#];
    13. taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)?;

其他相关结构体 API 使用说明请移步 Rust 文档托管网页:https://docs.rs/libtaos