OpenTSDB JSON 格式协议
协议介绍
OpenTSDB JSON 格式协议采用一个 JSON 字符串表示一行或多行数据。例如:
[
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
},
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 9,
"tags": {
"host": "web02",
"dc": "lga"
}
}
]
与 OpenTSDB 行协议类似, metric 将作为超级表名, timestamp 表示时间戳,value 表示度量值, tags 表示标签集。
note
- 对于 JSON 格式协议,TDengine 并不会自动把所有标签转成 NCHAR 类型, 字符串将将转为 NCHAR 类型, 数值将同样转换为 DOUBLE 类型。
- 默认生成的子表名是根据规则生成的唯一 ID 值。用户也可以通过在 taos.cfg 里配置 smlChildTableName 参数来指定某个标签值作为子表名。该标签值应该具有全局唯一性。举例如下:假设有个标签名为tname, 配置 smlChildTableName=tname, 插入数据为
"tags": { "host": "web02","dc": "lga","tname":"cpu1"}
则创建的子表名为 cpu1。注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。
示例代码
- Java
- Python
- Go
- Node.js
- C#
- C
package com.taos.example;
import com.taosdata.jdbc.SchemalessWriter;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class JSONProtocolExample {
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}
private static void createDatabase(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS test");
stmt.execute("USE test");
}
}
private static String getJSONData() {
return "[{\"metric\": \"meters.current\", \"timestamp\": 1648432611249, \"value\": 10.3, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
" {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611249, \"value\": 219, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}, " +
"{\"metric\": \"meters.current\", \"timestamp\": 1648432611250, \"value\": 12.6, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
" {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611250, \"value\": 221, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}]";
}
public static void main(String[] args) throws SQLException {
try (Connection conn = getConnection()) {
createDatabase(conn);
SchemalessWriter writer = new SchemalessWriter(conn);
String jsonData = getJSONData();
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
}
}
}
import json
import taos
from taos import SmlProtocol, SmlPrecision
lines = [{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219,
"tags": {"location": "California.LosAngeles", "groupid": 1}},
{"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6,
"tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]
def get_connection():
return taos.connect()
def create_database(conn):
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
def insert_lines(conn):
global lines
lines = json.dumps(lines)
# note: the first parameter must be a list with only one element.
affected_rows = conn.schemaless_insert(
[lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
print(affected_rows) # 4
if __name__ == '__main__':
connection = get_connection()
try:
create_database(connection)
insert_lines(connection)
finally:
connection.close()
package main
import (
"log"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
_, err := conn.Exec("CREATE DATABASE test")
if err != nil {
panic(err)
}
_, err = conn.Exec("USE test")
if err != nil {
panic(err)
}
}
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
payload := `[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
{"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]`
err = conn.OpenTSDBInsertJsonPayload(payload)
if err != nil {
log.Fatalln("insert error:", err)
}
}
const taos = require("@tdengine/client");
const conn = taos.connect({
host: "localhost",
});
const cursor = conn.cursor();
function createDatabase() {
cursor.execute("CREATE DATABASE test");
cursor.execute("USE test");
}
function insertData() {
const lines = [
{
metric: "meters.current",
timestamp: 1648432611249,
value: 10.3,
tags: { location: "California.SanFrancisco", groupid: 2 },
},
{
metric: "meters.voltage",
timestamp: 1648432611249,
value: 219,
tags: { location: "California.LosAngeles", groupid: 1 },
},
{
metric: "meters.current",
timestamp: 1648432611250,
value: 12.6,
tags: { location: "California.SanFrancisco", groupid: 2 },
},
{
metric: "meters.voltage",
timestamp: 1648432611250,
value: 221,
tags: { location: "California.LosAngeles", groupid: 1 },
},
];
cursor.schemalessInsert(
[JSON.stringify(lines)],
taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL,
taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_NOT_CONFIGURED
);
}
try {
createDatabase();
insertData();
} finally {
cursor.close();
conn.close();
}
using TDengineDriver;
namespace TDengineExample
{
internal class OptsJsonExample
{
static void Main()
{
IntPtr conn = GetConnection();
try
{
PrepareDatabase(conn);
string[] lines = { "[{\"metric\": \"meters.current\", \"timestamp\": 1648432611249, \"value\": 10.3, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
" {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611249, \"value\": 219, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}, " +
"{\"metric\": \"meters.current\", \"timestamp\": 1648432611250, \"value\": 12.6, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": 2}}," +
" {\"metric\": \"meters.voltage\", \"timestamp\": 1648432611250, \"value\": 221, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}}]"
};
IntPtr res = TDengine.SchemalessInsert(conn, lines, 1, (int)TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL, (int)TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
if (TDengine.ErrorNo(res) != 0)
{
throw new Exception("SchemalessInsert failed since " + TDengine.Error(res));
}
else
{
int affectedRows = TDengine.AffectRows(res);
Console.WriteLine($"SchemalessInsert success, affected {affectedRows} rows");
}
TDengine.FreeResult(res);
}
finally
{
TDengine.Close(conn);
}
}
static IntPtr GetConnection()
{
string host = "localhost";
short port = 6030;
string username = "root";
string password = "taosdata";
string dbname = "";
var conn = TDengine.Connect(host, username, password, dbname, port);
if (conn == IntPtr.Zero)
{
throw new Exception("Connect to TDengine failed");
}
else
{
Console.WriteLine("Connect to TDengine success");
}
return conn;
}
static void PrepareDatabase(IntPtr conn)
{
IntPtr res = TDengine.Query(conn, "CREATE DATABASE test");
if (TDengine.ErrorNo(res) != 0)
{
throw new Exception("failed to create database, reason: " + TDengine.Error(res));
}
res = TDengine.Query(conn, "USE test");
if (TDengine.ErrorNo(res) != 0)
{
throw new Exception("failed to change database, reason: " + TDengine.Error(res));
}
}
}
}
int main() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", "", 6030);
if (taos == NULL) {
printf("failed to connect to server\n");
exit(EXIT_FAILURE);
}
executeSQL(taos, "DROP DATABASE IF EXISTS test");
executeSQL(taos, "CREATE DATABASE test");
executeSQL(taos, "USE test");
char *line =
"[{\"metric\": \"meters.current\", \"timestamp\": 1648432611249, \"value\": 10.3, \"tags\": {\"location\": "
"\"California.SanFrancisco\", \"groupid\": 2}},{\"metric\": \"meters.voltage\", \"timestamp\": 1648432611249, "
"\"value\": 219, \"tags\": {\"location\": \"California.LosAngeles\", \"groupid\": 1}},{\"metric\": \"meters.current\", "
"\"timestamp\": 1648432611250, \"value\": 12.6, \"tags\": {\"location\": \"California.SanFrancisco\", \"groupid\": "
"2}},{\"metric\": \"meters.voltage\", \"timestamp\": 1648432611250, \"value\": 221, \"tags\": {\"location\": "
"\"California.LosAngeles\", \"groupid\": 1}}]";
char *lines[] = {line};
TAOS_RES *res = taos_schemaless_insert(taos, lines, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
if (taos_errno(res) != 0) {
printf("failed to insert schema-less data, reason: %s\n", taos_errstr(res));
} else {
int affectedRow = taos_affected_rows(res);
printf("successfully inserted %d rows\n", affectedRow);
}
taos_free_result(res);
taos_close(taos);
taos_cleanup();
}
// output:
// successfully inserted 4 rows
以上示例代码会自动创建 2 个超级表, 每个超级表有 2 条数据。
taos> USE test;
Database changed.
taos> SHOW STABLES;
name |
=================================
meters.current |
meters.voltage |
Query OK, 2 row(s) in set (0.001954s)
taos> SELECT * FROM `meters.current`;
_ts | _value | groupid | location |
===================================================================================================================
2022-03-28 09:56:51.249 | 10.300000000 | 2.000000000 | California.SanFrancisco |
2022-03-28 09:56:51.250 | 12.600000000 | 2.000000000 | California.SanFrancisco |
Query OK, 2 row(s) in set (0.004076s)
SQL 查询示例
meters.voltage
是插入数据的超级表名。
可以通过超级表的 TAG 来过滤数据,比如查询 location=California.LosAngeles groupid=1
可以通过如下 SQL:
SELECT * FROM `meters.current` WHERE location = "California.LosAngeles" AND groupid = 3;