OpenTSDB 行协议

协议介绍

OpenTSDB 行协议同样采用一行字符串来表示一行数据。OpenTSDB 采用的是单列模型,因此一行只能包含一个普通数据列,但标签列依然可以有多个。每行数据分为四部分,具体格式约定如下:

  1. <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
  • metric 将作为超级表名。
  • timestamp 本行数据对应的时间戳。根据时间戳的长度自动识别时间精度,支持秒和毫秒两种时间精度。
  • value 度量值,必须为一个数值。对应的列名也是 “value”。
  • 最后一部分是标签集, 用空格分隔不同标签, 所有标签自动转化为 nchar 数据类型。

例如:

  1. meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3

参考 OpenTSDB Telnet API 文档。

示例代码

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  1. package com.taos.example;
  2. import com.taosdata.jdbc.SchemalessWriter;
  3. import com.taosdata.jdbc.enums.SchemalessProtocolType;
  4. import com.taosdata.jdbc.enums.SchemalessTimestampType;
  5. import java.sql.Connection;
  6. import java.sql.DriverManager;
  7. import java.sql.SQLException;
  8. import java.sql.Statement;
  9. public class TelnetLineProtocolExample {
  10. // format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
  11. private static String[] lines = { "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  12. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  13. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  14. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  15. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  16. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  17. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  18. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  19. };
  20. private static Connection getConnection() throws SQLException {
  21. String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
  22. return DriverManager.getConnection(jdbcUrl);
  23. }
  24. private static void createDatabase(Connection conn) throws SQLException {
  25. try (Statement stmt = conn.createStatement()) {
  26. // the default precision is ms (millisecond), but we use us (microsecond) here.
  27. stmt.execute("CREATE DATABASE IF NOT EXISTS test precision 'us'");
  28. stmt.execute("USE test");
  29. }
  30. }
  31. public static void main(String[] args) throws SQLException {
  32. try (Connection conn = getConnection()) {
  33. createDatabase(conn);
  34. SchemalessWriter writer = new SchemalessWriter(conn);
  35. writer.write(lines, SchemalessProtocolType.TELNET, SchemalessTimestampType.NOT_CONFIGURED);
  36. }
  37. }
  38. }

查看源码

  1. import taos
  2. from taos import SmlProtocol, SmlPrecision
  3. # format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
  4. lines = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  5. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  6. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  7. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  8. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  9. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  10. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  11. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  12. ]
  13. # create a connection using firstEp in taos.cfg.
  14. def get_connection():
  15. return taos.connect()
  16. def create_database(conn):
  17. conn.execute("CREATE DATABASE test")
  18. conn.execute("USE test")
  19. def insert_lines(conn):
  20. affected_rows = conn.schemaless_insert(
  21. lines, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
  22. print(affected_rows) # 8
  23. if __name__ == '__main__':
  24. connection = get_connection()
  25. try:
  26. create_database(connection)
  27. insert_lines(connection)
  28. finally:
  29. connection.close()

查看源码

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/taosdata/driver-go/v2/af"
  5. )
  6. func prepareDatabase(conn *af.Connector) {
  7. _, err := conn.Exec("CREATE DATABASE test")
  8. if err != nil {
  9. panic(err)
  10. }
  11. _, err = conn.Exec("USE test")
  12. if err != nil {
  13. panic(err)
  14. }
  15. }
  16. func main() {
  17. conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
  18. if err != nil {
  19. fmt.Println("fail to connect, err:", err)
  20. }
  21. defer conn.Close()
  22. prepareDatabase(conn)
  23. var lines = []string{
  24. "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  25. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  26. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  27. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  28. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  29. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  30. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  31. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  32. }
  33. err = conn.OpenTSDBInsertTelnetLines(lines)
  34. if err != nil {
  35. fmt.Println("insert error:", err)
  36. }
  37. }

查看源码

  1. use libtaos::schemaless::*;
  2. use libtaos::*;
  3. fn main() {
  4. let taos = TaosCfg::default().connect().expect("fail to connect");
  5. taos.raw_query("CREATE DATABASE test").unwrap();
  6. taos.raw_query("USE test").unwrap();
  7. let lines = [
  8. "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  9. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  10. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  11. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  12. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  13. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  14. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  15. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  16. ];
  17. let affected_rows = taos
  18. .schemaless_insert(
  19. &lines,
  20. TSDB_SML_TELNET_PROTOCOL,
  21. TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
  22. )
  23. .unwrap();
  24. println!("affected_rows={}", affected_rows); // affected_rows=8
  25. }
  26. // run with: cargo run --example opentsdb_telnet_example

查看源码

  1. const taos = require("td2.0-connector");
  2. const conn = taos.connect({
  3. host: "localhost",
  4. });
  5. const cursor = conn.cursor();
  6. function createDatabase() {
  7. cursor.execute("CREATE DATABASE test");
  8. cursor.execute("USE test");
  9. }
  10. function insertData() {
  11. const lines = [
  12. "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  13. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  14. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  15. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  16. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  17. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  18. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  19. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  20. ];
  21. cursor.schemalessInsert(
  22. lines,
  23. taos.SCHEMALESS_PROTOCOL.TSDB_SML_TELNET_PROTOCOL,
  24. taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_NOT_CONFIGURED
  25. );
  26. }
  27. try {
  28. createDatabase();
  29. insertData();
  30. } finally {
  31. cursor.close();
  32. conn.close();
  33. }

查看源码

  1. using TDengineDriver;
  2. namespace TDengineExample
  3. {
  4. internal class OptsTelnetExample
  5. {
  6. static void Main()
  7. {
  8. IntPtr conn = GetConnection();
  9. PrepareDatabase(conn);
  10. string[] lines = {
  11. "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  12. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  13. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  14. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  15. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  16. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  17. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  18. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  19. };
  20. IntPtr res = TDengine.SchemalessInsert(conn, lines, lines.Length, (int)TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL, (int)TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
  21. if (TDengine.ErrorNo(res) != 0)
  22. {
  23. Console.WriteLine("SchemalessInsert failed since " + TDengine.Error(res));
  24. ExitProgram(conn, 1);
  25. }
  26. else
  27. {
  28. int affectedRows = TDengine.AffectRows(res);
  29. Console.WriteLine($"SchemalessInsert success, affected {affectedRows} rows");
  30. }
  31. TDengine.FreeResult(res);
  32. ExitProgram(conn, 0);
  33. }
  34. static IntPtr GetConnection()
  35. {
  36. string host = "localhost";
  37. short port = 6030;
  38. string username = "root";
  39. string password = "taosdata";
  40. string dbname = "";
  41. var conn = TDengine.Connect(host, username, password, dbname, port);
  42. if (conn == IntPtr.Zero)
  43. {
  44. Console.WriteLine("Connect to TDengine failed");
  45. TDengine.Cleanup();
  46. Environment.Exit(1);
  47. }
  48. else
  49. {
  50. Console.WriteLine("Connect to TDengine success");
  51. }
  52. return conn;
  53. }
  54. static void PrepareDatabase(IntPtr conn)
  55. {
  56. IntPtr res = TDengine.Query(conn, "CREATE DATABASE test");
  57. if (TDengine.ErrorNo(res) != 0)
  58. {
  59. Console.WriteLine("failed to create database, reason: " + TDengine.Error(res));
  60. ExitProgram(conn, 1);
  61. }
  62. res = TDengine.Query(conn, "USE test");
  63. if (TDengine.ErrorNo(res) != 0)
  64. {
  65. Console.WriteLine("failed to change database, reason: " + TDengine.Error(res));
  66. ExitProgram(conn, 1);
  67. }
  68. }
  69. static void ExitProgram(IntPtr conn, int exitCode)
  70. {
  71. TDengine.Close(conn);
  72. TDengine.Cleanup();
  73. Environment.Exit(exitCode);
  74. }
  75. }
  76. }

查看源码

  1. int main() {
  2. TAOS *taos = taos_connect("localhost", "root", "taosdata", "", 6030);
  3. if (taos == NULL) {
  4. printf("failed to connect to server\n");
  5. exit(EXIT_FAILURE);
  6. }
  7. executeSQL(taos, "DROP DATABASE IF EXISTS test");
  8. executeSQL(taos, "CREATE DATABASE test");
  9. executeSQL(taos, "USE test");
  10. char *lines[] = {
  11. "meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
  12. "meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
  13. "meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
  14. "meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
  15. "meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
  16. "meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
  17. "meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
  18. "meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
  19. };
  20. TAOS_RES *res = taos_schemaless_insert(taos, lines, 8, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
  21. if (taos_errno(res) != 0) {
  22. printf("failed to insert schema-less data, reason: %s\n", taos_errstr(res));
  23. } else {
  24. int affectedRow = taos_affected_rows(res);
  25. printf("successfully inserted %d rows\n", affectedRow);
  26. }
  27. taos_free_result(res);
  28. taos_close(taos);
  29. taos_cleanup();
  30. }
  31. // output:
  32. // successfully inserted 8 rows

查看源码

以上示例代码会自动创建 2 个超级表, 每个超级表有 4 条数据。

  1. taos> use test;
  2. Database changed.
  3. taos> show stables;
  4. name | created_time | columns | tags | tables |
  5. ============================================================================================
  6. meters.current | 2022-03-30 17:04:10.877 | 2 | 2 | 2 |
  7. meters.voltage | 2022-03-30 17:04:10.882 | 2 | 2 | 2 |
  8. Query OK, 2 row(s) in set (0.002544s)
  9. taos> select tbname, * from `meters.current`;
  10. tbname | ts | value | groupid | location |
  11. ==================================================================================================================================
  12. t_0e7bcfa21a02331c06764f275... | 2022-03-28 09:56:51.249 | 10.800000000 | 3 | California.LosAngeles |
  13. t_0e7bcfa21a02331c06764f275... | 2022-03-28 09:56:51.250 | 11.300000000 | 3 | California.LosAngeles |
  14. t_7e7b26dd860280242c6492a16... | 2022-03-28 09:56:51.249 | 10.300000000 | 2 | California.SanFrancisco |
  15. t_7e7b26dd860280242c6492a16... | 2022-03-28 09:56:51.250 | 12.600000000 | 2 | California.SanFrancisco |
  16. Query OK, 4 row(s) in set (0.005399s)