TDengine Rust Connector
taos
是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。
taos
提供两种建立连接的方式。一种是原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features
)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。
该 Rust 连接器的源码托管在 GitHub。
支持的平台
原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。
版本支持
请参考版本支持列表
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
安装
安装前准备
- 安装 Rust 开发工具链
- 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动
添加 taos 依赖
根据选择的连接方式,按照如下说明在 Rust 项目中添加 taos 依赖:
- 同时支持
- 仅 Websocket
- 仅原生连接
在 Cargo.toml
文件中添加 taos:
[dependencies]
# use default feature
taos = "*"
在 Cargo.toml
文件中添加 taos,并启用 ws
特性。
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
在 Cargo.toml
文件中添加 taos,并启用 native
特性:
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
建立连接
TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。
let builder = TaosBuilder::from_dsn("taos://")?;
现在您可以使用该对象创建连接:
let conn = builder.build()?;
连接对象可以创建多个:
let conn1 = builder.build()?;
let conn2 = builder.build()?;
DSN 描述字符串基本结构如下:
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |
各部分意义见下表:
- driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
- taos: 表名使用 TDengine 连接器驱动。
- tmq: 使用 TMQ 订阅数据。
- http/ws: 使用 Websocket 创建连接。
- https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- protocol: 显示指定以何种方式建立连接,例如:
taos+ws://localhost:6041
指定以 Websocket 方式建立连接。 - username/password: 用于创建连接的用户名及密码。
- host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(
taos://
),原生连接默认为localhost:6030
,Websocket 连接默认为localhost:6041
。 - database: 指定默认连接的数据库名,可选参数。
- params:其他可选参数。
一个完整的 DSN 描述字符串示例如下:
taos+ws://localhost:6041/test
表示使用 Websocket(ws
)方式通过 6041
端口连接服务器 localhost
,并指定默认数据库为 test
。
这使得用户可以通过 DSN 指定连接方式:
use taos::*;
// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();
// use websocket protocol.
let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
建立连接后,您可以进行相关数据库操作:
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
let values = result.
}
查询数据可以通过两种方式:使用内建类型或 serde 序列化框架。
// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(records);
Ok(())
使用示例
写入数据
SQL 写入
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build()?;
taos.exec_many([
"DROP DATABASE IF EXISTS power",
"CREATE DATABASE power",
"USE power",
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
]).await?;
let inserted = taos.exec("INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000)
('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2)
VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3)
VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)").await?;
assert_eq!(inserted, 8);
Ok(())
}
STMT 写入
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build()?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let mut stmt = Stmt::init(&taos)?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
// bind table name and tags
stmt.set_tbname_tags(
"d1001",
&[
Value::VarChar("California.SanFransico".into()),
Value::Int(2),
],
)?;
// bind values.
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249]),
ColumnView::from_floats(vec![10.3]),
ColumnView::from_ints(vec![219]),
ColumnView::from_floats(vec![0.31]),
];
stmt.bind(&values)?;
// bind one more row
let values2 = vec![
ColumnView::from_millis_timestamp(vec![1648432611749]),
ColumnView::from_floats(vec![12.6]),
ColumnView::from_ints(vec![218]),
ColumnView::from_floats(vec![0.33]),
];
stmt.bind(&values2)?;
stmt.add_batch()?;
// execute.
let rows = stmt.execute()?;
assert_eq!(rows, 2);
Ok(())
}
查询数据
use taos::sync::*;
fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;
// print column names
let meta = result.fields();
println!("{}", meta.iter().map(|field| field.name()).join("\t"));
// print rows
let rows = result.rows();
for row in rows {
let row = row?;
for (_name, value) in row {
print!("{}\t", value);
}
println!();
}
Ok(())
}
// output(suppose you are in +8 timezone):
// ts current
// 2018-10-03T14:38:05+08:00 10.3
// 2018-10-03T14:38:15+08:00 12.6
API 参考
连接构造器
通过 DSN 来构建一个连接器构造器。
let cfg = TaosBuilder::default().build()?;
使用 builder
对象创建多个连接:
let conn: Taos = cfg.build();
连接池
在复杂应用中,建议启用连接池。taos 的连接池使用 r2d2 实现。
如下,可以生成一个默认参数的连接池。
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
同样可以使用连接池的构造器,对连接池参数进行设置:
let dsn = "taos://localhost:6030";
let opts = PoolBuilder::new()
.max_size(5000) // max connections
.max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
.min_idle(Some(1000)) // minimal idle connections
.connection_timeout(Duration::from_secs(2));
let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;
在应用代码中,使用 pool.get()?
来获取一个连接对象 Taos。
let taos = pool.get()?;
连接
Taos 对象提供了多个数据库操作的 API:
exec
: 执行某个非查询类 SQL 语句,例如CREATE
,ALTER
,INSERT
等。let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
exec_many
: 同时(顺序)执行多个 SQL 语句。taos.exec_many([
"CREATE DATABASE test",
"USE test",
"CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
]).await?;
query
:执行查询语句,返回 [ResultSet] 对象。let mut q = taos.query("select * from log.logs").await?;
[ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):
列信息使用 [.fields()] 方法获取:
let cols = q.fields();
for col in cols {
println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
}
逐行获取数据:
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
或使用 serde 序列化框架。
#[derive(Debug, Deserialize)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
需要注意的是,需要使用 Rust 异步函数和异步运行时。
Taos 提供部分 SQL 的 Rust 方法化以减少 format!
代码块的频率:
.describe(table: &str)
: 执行DESCRIBE
并返回一个 Rust 数据结构。.create_database(database: &str)
: 执行CREATE DATABASE
语句。.use_database(database: &str)
: 执行USE
语句。
除此之外,该结构也是 参数绑定 和 行协议接口 的入口,使用方法请参考具体的 API 说明。
参数绑定接口
与 C 接口类似,Rust 提供参数绑定接口。首先,通过 Taos 对象创建一个 SQL 语句的参数绑定对象 Stmt:
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
参数绑定对象提供了一组接口用于实现参数绑定:
.set_tbname(name)
用于绑定表名。
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
.set_tags(&[tag])
当 SQL 语句使用超级表时,用于绑定子表表名和标签值:
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
.bind(&[column])
用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:
let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(¶ms)?.add_batch()?.execute()?;
.execute()
执行 SQL。Stmt 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 .add_batch
加入到执行队列中。
stmt.execute()?;
// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
一个可运行的示例请见 GitHub 上的示例。
订阅
TDengine 通过消息队列 TMQ 启动一个订阅。
从 DSN 开始,构建一个 TMQ 连接器。
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
创建消费者:
let mut consumer = tmq.build()?;
消费者可订阅一个或多个 TOPIC
。
consumer.subscribe(["tmq_meters"]).await?;
TMQ 消息队列是一个 futures::Stream 类型,可以使用相应 API 对每个消息进行消费,并通过 .commit
进行已消费标记。
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
停止订阅:
consumer.unsubscribe().await;
对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id
是必须的。
group.id
: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。client.id
: 可选的订阅客户端识别项。auto.offset.reset
: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个group.id
中仅生效一次。enable.auto.commit
: 当设置为true
时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。auto.commit.interval.ms
: 自动标记的时间间隔。
完整订阅示例参见 GitHub 示例文件.
其他相关结构体 API 使用说明请移步 Rust 文档托管网页:https://docs.rs/taos。