TDengine Go Connector
driver-go
是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 database/sql 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
driver-go
提供两种建立连接的方式。一种是原生连接,它通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 REST 连接,它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。
本文介绍如何安装 driver-go
,并通过 driver-go
连接 TDengine 集群、进行数据查询、数据写入等基本操作。
driver-go
的源码托管在 GitHub。
支持的平台
原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 REST 连接支持所有能运行 Go 的平台。
版本支持
请参考版本支持列表
支持的功能特性
原生连接
“原生连接”指连接器通过 TDengine 客户端驱动(taosc)直接与 TDengine 运行实例建立的连接。支持的功能特性有:
- 普通查询
- 连续查询
- 订阅
- schemaless 接口
- 参数绑定接口
REST 连接
“REST 连接”指连接器通过 taosAdapter 组件提供的 REST API 与 TDengine 运行实例建立的连接。支持的功能特性有:
- 普通查询
- 连续查询
安装步骤
安装前准备
- 安装 Go 开发环境(Go 1.14 及以上,GCC 4.8.5 及以上)
- 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动
配置好环境变量,检查命令:
go env
gcc -v
使用 go get 安装
go get -u github.com/taosdata/driver-go/v3@latest
使用 go mod 管理
使用
go mod
命令初始化项目:go mod init taos-demo
引入 taosSql :
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
使用
go mod tidy
更新依赖包:go mod tidy
使用
go run taos-demo
运行程序或使用go build
命令编译出二进制文件。go run taos-demo
go build
建立连接
数据源名称(DSN)
数据源名称具有通用格式,例如 PEAR DB,但没有类型前缀(方括号表示可选):
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...¶mN=valueN]
完整形式的 DSN:
username:password@protocol(address)/dbname?param=value
使用连接器进行连接
- 原生连接
- REST 连接
- WebSocket 连接
taosSql 通过 cgo 实现了 Go 的 database/sql/driver
接口。只需要引入驱动就可以使用 database/sql 的接口。
使用 taosSql
作为 driverName
并且使用一个正确的 DSN 作为 dataSourceName
,DSN 支持的参数:
- configPath 指定 taos.cfg 目录
示例:
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
func main() {
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
}
taosRestful 通过 http client
实现了 Go 的 database/sql/driver
接口。只需要引入驱动就可以使用database/sql的接口。
使用 taosRestful
作为 driverName
并且使用一个正确的 DSN 作为 dataSourceName
,DSN 支持的参数:
disableCompression
是否接受压缩数据,默认为 true 不接受压缩数据,如果传输数据使用 gzip 压缩设置为 false。readBufferSize
读取数据的缓存区大小默认为 4K(4096),当查询结果数据量多时可以适当调大该值。
示例:
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
}
taosWS 通过 WebSocket
实现了 Go 的 database/sql/driver
接口。只需要引入驱动(driver-go 最低版本 3.0.2)就可以使用database/sql的接口。
使用 taosWS
作为 driverName
并且使用一个正确的 DSN 作为 dataSourceName
,DSN 支持的参数:
writeTimeout
通过 WebSocket 发送数据的超时时间。readTimeout
通过 WebSocket 接收响应数据的超时时间。
示例:
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v3/taosWS"
)
func main() {
var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
}
使用示例
写入数据
SQL 写入
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func createStable(taos *sql.DB) {
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
log.Fatalln("failed to create stable, err:", err)
}
}
func insertData(taos *sql.DB) {
sql := `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
result, err := taos.Exec(sql)
if err != nil {
log.Fatalln("failed to insert, err:", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
log.Fatalln("failed to get affected rows, err:", err)
}
fmt.Println("RowsAffected", rowsAffected)
}
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
createStable(taos)
insertData(taos)
}
InfluxDB 行协议写入
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
_, err := conn.Exec("CREATE DATABASE test")
if err != nil {
panic(err)
}
_, err = conn.Exec("USE test")
if err != nil {
panic(err)
}
}
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
var lines = []string{
"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
"meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
"meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
"meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250",
}
err = conn.InfluxDBInsertLines(lines, "ms")
if err != nil {
log.Fatalln("insert error:", err)
}
}
OpenTSDB Telnet 行协议写入
package main
import (
"log"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
_, err := conn.Exec("CREATE DATABASE test")
if err != nil {
panic(err)
}
_, err = conn.Exec("USE test")
if err != nil {
panic(err)
}
}
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
var lines = []string{
"meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
"meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
}
err = conn.OpenTSDBInsertTelnetLines(lines)
if err != nil {
log.Fatalln("insert error:", err)
}
}
OpenTSDB JSON 行协议写入
package main
import (
"log"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
_, err := conn.Exec("CREATE DATABASE test")
if err != nil {
panic(err)
}
_, err = conn.Exec("USE test")
if err != nil {
panic(err)
}
}
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
payload := `[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
{"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]`
err = conn.OpenTSDBInsertJsonPayload(payload)
if err != nil {
log.Fatalln("insert error:", err)
}
}
查询数据
package main
import (
"database/sql"
"log"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/power"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
rows, err := taos.Query("SELECT ts, current FROM meters LIMIT 2")
if err != nil {
log.Fatalln("failed to select from table, err:", err)
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
current float32
}
err := rows.Scan(&r.ts, &r.current)
if err != nil {
log.Fatalln("scan error:\n", err)
return
}
log.Println(r.ts, r.current)
}
}
更多示例程序
使用限制
由于 REST 接口无状态所以 use db
语法不会生效,需要将 db 名称放到 SQL 语句中,如:create table if not exists tb1 (ts timestamp, a int)
改为create table if not exists test.tb1 (ts timestamp, a int)
否则将报错[0x217] Database not specified or available
。
也可以将 db 名称放到 DSN 中,将 root:taosdata@http(localhost:6041)/
改为 root:taosdata@http(localhost:6041)/test
。当指定的 db 不存在时执行 create database
语句不会报错,而执行针对该 db 的其他查询或写入操作会报错。
完整示例如下:
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/test"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
}
rows, err := taos.Query("select * from tb1")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
}
err := rows.Scan(&r.ts, &r.a)
if err != nil {
fmt.Println("scan error:\n", err)
return
}
fmt.Println(r.ts, r.a)
}
}
常见问题
database/sql 中 stmt(参数绑定)相关接口崩溃
REST 不支持参数绑定相关接口,建议使用
db.Exec
和db.Query
。使用
use db
语句后执行其他语句报错[0x217] Database not specified or available
在 REST 接口中 SQL 语句的执行无上下文关联,使用
use db
语句不会生效,解决办法见上方使用限制章节。使用 taosSql 不报错使用 taosRestful 报错
[0x217] Database not specified or available
因为 REST 接口无状态,使用
use db
语句不会生效,解决办法见上方使用限制章节。readBufferSize
参数调大后无明显效果readBufferSize
调大后会减少获取结果时syscall
的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。disableCompression
参数设置为false
时查询效率降低当
disableCompression
参数设置为false
时查询结果会使用gzip
压缩后传输,拿到数据后要先进行gzip
解压。go get
命令无法获取包,或者获取包超时设置 Go 代理
go env -w GOPROXY=https://goproxy.cn,direct
。
常用 API
database/sql API
sql.Open(DRIVER_NAME string, dataSourceName string) *DB
该 API 用来打开 DB,返回一个类型为 *DB 的对象。
info
该 API 成功创建的时候,并没有做权限等检查,只有在真正执行 Query 或者 Exec 的时候才能真正的去创建连接,并同时检查 user/password/host/port 是不是合法。
func (db *DB) Exec(query string, args ...interface{}) (Result, error)
sql.Open
内置的方法,用来执行非查询相关 SQL。func (db *DB) Query(query string, args ...interface{}) (*Rows, error)
sql.Open
内置的方法,用来执行查询语句。
高级功能(af)API
af
包封装了连接管理、订阅、schemaless、参数绑定等 TDengine 高级功能。
连接管理
af.Open(host, user, pass, db string, port int) (*Connector, error)
该 API 通过 cgo 创建与 taosd 的连接。
func (conn *Connector) Close() error
关闭与 taosd 的连接。
订阅
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
创建消费者。
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
注意:出于兼容目的保留rebalanceCb
参数,当前未使用订阅单个主题。
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
注意:出于兼容目的保留rebalanceCb
参数,当前未使用订阅主题。
func (c *Consumer) Poll(timeoutMs int) tmq.Event
轮询消息。
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
注意:出于兼容目的保留tmq.TopicPartition
参数,当前未使用提交消息。
func (c *Consumer) Close() error
关闭连接。
schemaless
func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error
写入 InfluxDB 行协议。
func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error
写入 OpenTDSB telnet 协议数据。
func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error
写入 OpenTSDB JSON 协议数据。
参数绑定
func (conn *Connector) StmtExecute(sql string, params *param.Param) (res driver.Result, err error)
参数绑定单行插入。
func (conn *Connector) InsertStmt() *insertstmt.InsertStmt
初始化参数。
func (stmt *InsertStmt) Prepare(sql string) error
参数绑定预处理 SQL 语句。
func (stmt *InsertStmt) SetTableName(name string) error
参数绑定设置表名。
func (stmt *InsertStmt) SetSubTableName(name string) error
参数绑定设置子表名。
func (stmt *InsertStmt) BindParam(params []*param.Param, bindType *param.ColumnType) error
参数绑定多行数据。
func (stmt *InsertStmt) AddBatch() error
添加到参数绑定批处理。
func (stmt *InsertStmt) Execute() error
执行参数绑定。
func (stmt *InsertStmt) GetAffectedRows() int
获取参数绑定插入受影响行数。
func (stmt *InsertStmt) Close() error
结束参数绑定。
通过 WebSocket 订阅
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
创建消费者。
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
注意:出于兼容目的保留rebalanceCb
参数,当前未使用订阅单个主题。
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
注意:出于兼容目的保留rebalanceCb
参数,当前未使用订阅主题。
func (c *Consumer) Poll(timeoutMs int) tmq.Event
轮询消息。
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
注意:出于兼容目的保留tmq.TopicPartition
参数,当前未使用提交消息。
func (c *Consumer) Close() error
关闭连接。
完整订阅示例参见 GitHub 示例文件
通过 WebSocket 进行参数绑定
func NewConnector(config *Config) (*Connector, error)
创建连接。
func (c *Connector) Init() (*Stmt, error)
初始化参数。
func (c *Connector) Close() error
关闭连接。
func (s *Stmt) Prepare(sql string) error
参数绑定预处理 SQL 语句。
func (s *Stmt) SetTableName(name string) error
参数绑定设置表名。
func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType) error
参数绑定设置标签。
func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error
参数绑定多行数据。
func (s *Stmt) AddBatch() error
添加到参数绑定批处理。
func (s *Stmt) Exec() error
执行参数绑定。
func (s *Stmt) GetAffectedRows() int
获取参数绑定插入受影响行数。
func (s *Stmt) Close() error
结束参数绑定。
完整参数绑定示例参见 GitHub 示例文件
API 参考
全部 API 见 driver-go 文档