Query data

Introduction

TDengine uses SQL as its query language. Application programs can send SQL statements to TDengine through the REST API or connectors, and the TDengine CLI, taos, can be used to execute ad hoc SQL queries. The major query features supported by TDengine are:

  • Query on single column or multiple columns
  • Filter on tags or data columns: >, <, =, <>, like
  • Grouping of results: Group By
  • Sorting of results: Order By
  • Limit the number of results: Limit/Offset
  • Arithmetic on columns of numeric types or aggregate results
  • Join query with timestamp alignment
  • Aggregate functions: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff

For example, the SQL statement below can be executed in the TDengine CLI, taos, to select records with voltage greater than 215, sorted by timestamp in descending order, and limit the output to 2 rows.

    taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
    ts | current | voltage | phase |
    ======================================================================================
    2018-10-03 14:38:16.800 | 12.30000 | 221 | 0.31000 |
    2018-10-03 14:38:15.000 | 12.60000 | 218 | 0.33000 |
    Query OK, 2 row(s) in set (0.001100s)

To meet the requirements of varied use cases, TDengine adds some special functions. Examples are twa (time-weighted average), spread (the difference between the maximum and the minimum), and last_row (the last row). Continuous queries are also supported.
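To make the difference between a plain average, twa, and spread concrete, here is a minimal Python sketch. It assumes the textbook trapezoidal definition of a time-weighted average (values change linearly between samples); TDengine's exact handling of window boundaries may differ, and the function names and sample data are illustrative only.

```python
from typing import List, Tuple

Sample = Tuple[float, float]  # (timestamp in seconds, value)


def spread(samples: List[Sample]) -> float:
    """Difference between the maximum and the minimum value."""
    values = [value for _, value in samples]
    return max(values) - min(values)


def time_weighted_average(samples: List[Sample]) -> float:
    """Trapezoidal area under the curve divided by the covered duration.

    Assumes samples are sorted by timestamp and that the value changes
    linearly between consecutive samples (an assumption of this sketch).
    """
    if len(samples) == 1:
        return samples[0][1]
    area = 0.0
    for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
        area += (v0 + v1) / 2.0 * (t1 - t0)
    return area / (samples[-1][0] - samples[0][0])


# Irregularly spaced samples: the value sits at 20.0 for most of the time.
samples = [(0.0, 10.0), (10.0, 20.0), (40.0, 20.0)]

print(spread(samples))                 # 10.0
print(time_weighted_average(samples))  # 18.75
```

The plain average of the three values is about 16.67, while the time-weighted result is 18.75 because the long interval spent at 20.0 counts for more.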

For detailed query syntax please refer to Select.

Aggregation among Tables

Most use cases involve several kinds of data collection points. TDengine uses the concept of an STable (short for super table) to represent one type of data collection point, and a subtable to represent a specific data collection point of that type. Tags represent the static properties of data collection points, and each data collection point has its own tag values. By specifying filter conditions on tags, aggregation can be performed efficiently across all the subtables created from the same STable, i.e. across data collection points of the same type. Aggregate functions that apply to tables can be used directly on STables with exactly the same syntax.

In summary, records across subtables can be aggregated by a simple query on their STable, much like a join operation. However, tables belonging to different STables cannot be aggregated together.
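As a conceptual model only, the Python sketch below mimics what an aggregate query with GROUP BY on a tag does across subtables. The dictionary layout, the table names, and the tag assignments are assumptions made for illustration; they say nothing about how TDengine stores or executes anything.

```python
from collections import defaultdict
from typing import Dict, List

# Each subtable carries its own tag values plus its rows (only voltage here).
# All names and values are illustrative assumptions.
subtables = {
    "d1001": {"tags": {"location": "california.sanfrancisco", "groupId": 2},
              "voltage": [219, 218, 221]},
    "d1002": {"tags": {"location": "california.sanfrancisco", "groupId": 3},
              "voltage": [218]},
    "d1003": {"tags": {"location": "california.losangeles", "groupId": 2},
              "voltage": [221, 223]},
    "d1004": {"tags": {"location": "california.losangeles", "groupId": 3},
              "voltage": [223, 221]},
}


def avg_voltage_by_tag(tables: dict, tag: str) -> Dict[str, float]:
    """Pool the rows of all subtables that share a tag value, then average."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for table in tables.values():
        groups[table["tags"][tag]].extend(table["voltage"])
    return {key: sum(values) / len(values) for key, values in groups.items()}


print(avg_voltage_by_tag(subtables, "location"))
# {'california.sanfrancisco': 219.0, 'california.losangeles': 222.0}
```

Because the tag values live with the subtable and not with each row, the grouping step only needs to look at one tag value per subtable.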

Example 1

In TDengine CLI taos, use the SQL below to get the average voltage of all the meters in California grouped by location.

    taos> SELECT AVG(voltage) FROM meters GROUP BY location;
    avg(voltage) | location |
    =============================================================
    222.000000000 | California.LosAngeles |
    219.200000000 | California.SanFrancisco |
    Query OK, 2 row(s) in set (0.002136s)

Example 2

In TDengine CLI taos, use the SQL below to get the number of rows and the maximum current in the past 24 hours from meters whose groupId is 2.

    taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now - 24h;
    count(*) | max(current) |
    ==================================
    5 | 13.4 |
    Query OK, 1 row(s) in set (0.002136s)

Join queries are only allowed between subtables of the same STable. In Select, all query operations are marked as to whether they support STables or not.

Down Sampling and Interpolation

In IoT use cases, down sampling is widely used to aggregate data by time range. The INTERVAL keyword in TDengine simplifies queries over time windows. For example, the SQL statement below gets the sum of current every 10 seconds from the meters subtable d1001.

    taos> SELECT sum(current) FROM d1001 INTERVAL(10s);
    ts | sum(current) |
    ======================================================
    2018-10-03 14:38:00.000 | 10.300000191 |
    2018-10-03 14:38:10.000 | 24.900000572 |
    Query OK, 2 row(s) in set (0.000883s)

Down sampling can also be applied to an STable. For example, the SQL statement below gets the sum of current every second from all meters in California.

    taos> SELECT SUM(current) FROM meters where location like "California%" INTERVAL(1s);
    ts | sum(current) |
    ======================================================
    2018-10-03 14:38:04.000 | 10.199999809 |
    2018-10-03 14:38:05.000 | 32.900000572 |
    2018-10-03 14:38:06.000 | 11.500000000 |
    2018-10-03 14:38:15.000 | 12.600000381 |
    2018-10-03 14:38:16.000 | 36.000000000 |
    Query OK, 5 row(s) in set (0.001538s)

Down sampling also supports a time offset. For example, the SQL statement below gets the sum of current from all meters in 1-second windows, with each window starting on a 500-millisecond boundary.

    taos> SELECT SUM(current) FROM meters INTERVAL(1s, 500a);
    ts | sum(current) |
    ======================================================
    2018-10-03 14:38:04.500 | 11.189999809 |
    2018-10-03 14:38:05.500 | 31.900000572 |
    2018-10-03 14:38:06.500 | 11.600000000 |
    2018-10-03 14:38:15.500 | 12.300000381 |
    2018-10-03 14:38:16.500 | 35.000000000 |
    Query OK, 5 row(s) in set (0.001521s)
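The effect of the window offset can be sketched in a few lines of Python. This is a simplified model, assuming windows are aligned to timestamp 0 and ignoring time zones; the function names and sample rows are hypothetical and only illustrate how an offset shifts window boundaries.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def window_start(ts_ms: int, interval_ms: int, offset_ms: int = 0) -> int:
    """Start of the window containing ts_ms when boundaries are shifted by offset_ms."""
    return (ts_ms - offset_ms) // interval_ms * interval_ms + offset_ms


def sum_by_window(rows: Iterable[Tuple[int, float]],
                  interval_ms: int,
                  offset_ms: int = 0) -> Dict[int, float]:
    """Sum the values of all rows that fall into the same time window."""
    sums: Dict[int, float] = defaultdict(float)
    for ts_ms, value in rows:
        sums[window_start(ts_ms, interval_ms, offset_ms)] += value
    return dict(sums)


# (timestamp in milliseconds, value); illustrative data only
rows = [(4800, 1.0), (5250, 2.0), (5600, 3.0)]

print(sum_by_window(rows, 1000))       # {4000: 1.0, 5000: 5.0}
print(sum_by_window(rows, 1000, 500))  # {4500: 3.0, 5500: 3.0}
```

With no offset the windows start on whole seconds; with a 500 ms offset the same rows are regrouped into windows that start at 4500 and 5500.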

In many use cases, it’s hard to align the timestamps of the data collected by different collection points. However, many algorithms, such as FFT, require data sampled at a uniform time interval, so application programs would otherwise have to align the data themselves. In TDengine, this alignment is easy to achieve with down sampling.

Interpolation can be performed in TDengine if there is no data in a time range.
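To illustrate what filling an empty time window means, here is a small Python sketch. The mode names "prev" and "linear" are labels chosen for this sketch, and the function is hypothetical; refer to Aggregate by Window for the options TDengine actually offers.

```python
from typing import Dict, List, Optional, Tuple


def fill_windows(windows: Dict[int, float],
                 start: int,
                 end: int,
                 step: int,
                 mode: str = "prev") -> List[Tuple[int, Optional[float]]]:
    """Return one (window_start, value) pair per step, filling empty windows.

    mode="prev"   copies the value of the closest earlier window.
    mode="linear" interpolates between the closest earlier and later windows.
    A window that cannot be filled gets None.
    """
    known = sorted(windows)
    filled: List[Tuple[int, Optional[float]]] = []
    for ts in range(start, end + 1, step):
        if ts in windows:
            filled.append((ts, windows[ts]))
            continue
        before = [k for k in known if k < ts]
        after = [k for k in known if k > ts]
        value: Optional[float] = None
        if mode == "prev":
            if before:
                value = windows[before[-1]]
        elif mode == "linear":
            if before and after:
                t0, t1 = before[-1], after[0]
                v0, v1 = windows[t0], windows[t1]
                value = v0 + (v1 - v0) * (ts - t0) / (t1 - t0)
        else:
            raise ValueError(f"unknown fill mode: {mode}")
        filled.append((ts, value))
    return filled


# Windows at 10 and 20 have no data.
windows = {0: 10.0, 30: 40.0}

print(fill_windows(windows, 0, 30, 10, mode="prev"))
# [(0, 10.0), (10, 10.0), (20, 10.0), (30, 40.0)]
print(fill_windows(windows, 0, 30, 10, mode="linear"))
# [(0, 10.0), (10, 20.0), (20, 30.0), (30, 40.0)]
```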

For more details please refer to Aggregate by Window.

Examples

Query

In the section describing Insert, a database named power is created and some data is inserted into the STable meters. The sample code below demonstrates how to query the data in this STable.

Sample code is shown below for Java, Python, Go, Rust, Node.js, C#, and C, in that order.

Java

    package com.taos.example;

    import java.sql.*;

    public class RestQueryExample {
        private static Connection getConnection() throws SQLException {
            String jdbcUrl = "jdbc:TAOS-RS://localhost:6041/power?user=root&password=taosdata";
            return DriverManager.getConnection(jdbcUrl);
        }

        private static void printRow(ResultSet rs) throws SQLException {
            ResultSetMetaData meta = rs.getMetaData();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                String value = rs.getString(i);
                System.out.print(value);
                System.out.print("\t");
            }
            System.out.println();
        }

        private static void printColName(ResultSet rs) throws SQLException {
            ResultSetMetaData meta = rs.getMetaData();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                String colLabel = meta.getColumnLabel(i);
                System.out.print(colLabel);
                System.out.print("\t");
            }
            System.out.println();
        }

        private static void processResult(ResultSet rs) throws SQLException {
            printColName(rs);
            while (rs.next()) {
                printRow(rs);
            }
        }

        private static void queryData() throws SQLException {
            try (Connection conn = getConnection()) {
                try (Statement stmt = conn.createStatement()) {
                    ResultSet rs = stmt.executeQuery("SELECT AVG(voltage) FROM meters GROUP BY location");
                    processResult(rs);
                }
            }
        }

        public static void main(String[] args) throws SQLException {
            queryData();
        }
    }

    // possible output:
    // avg(voltage) location
    // 222.0 California.LosAngeles
    // 219.0 California.SanFrancisco


Python

The result set is iterated row by row.

    import taos


    def query_api_demo(conn: taos.TaosConnection):
        result: taos.TaosResult = conn.query("SELECT tbname, * FROM meters LIMIT 2")
        print("field count:", result.field_count)
        print("meta of fields[1]:", result.fields[1])
        print("======================Iterate on result=========================")
        for row in result:
            print(row)

    # field count: 7
    # meta of fields[1]: {name: ts, type: 9, bytes: 8}
    # ======================Iterate on result=========================
    # ('d1003', datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 11.800000190734863, 221, 0.2800000011920929, 'california.losangeles', 2)
    # ('d1003', datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 13.399999618530273, 223, 0.28999999165534973, 'california.losangeles', 2)


The result set is retrieved as a whole, and each row is converted to a dict.

    import taos


    def fetch_all_demo(conn: taos.TaosConnection):
        result: taos.TaosResult = conn.query("SELECT ts, current FROM meters LIMIT 2")
        rows = result.fetch_all_into_dict()
        print("row count:", result.row_count)
        print("===============all data===================")
        print(rows)

    # row count: 2
    # ===============all data===================
    # [{'ts': datetime.datetime(2018, 10, 3, 14, 38, 5, 500000), 'current': 11.800000190734863},
    # {'ts': datetime.datetime(2018, 10, 3, 14, 38, 16, 600000), 'current': 13.399999618530273}]


Go

    package main

    import (
        "database/sql"
        "fmt"
        "time"

        _ "github.com/taosdata/driver-go/v2/taosRestful"
    )

    func main() {
        var taosDSN = "root:taosdata@http(localhost:6041)/power"
        taos, err := sql.Open("taosRestful", taosDSN)
        if err != nil {
            fmt.Println("failed to connect TDengine, err:", err)
            return
        }
        defer taos.Close()

        rows, err := taos.Query("SELECT ts, current FROM meters LIMIT 2")
        if err != nil {
            fmt.Println("failed to select from table, err:", err)
            return
        }
        defer rows.Close()

        for rows.Next() {
            var r struct {
                ts      time.Time
                current float32
            }
            err := rows.Scan(&r.ts, &r.current)
            if err != nil {
                fmt.Println("scan error:\n", err)
                return
            }
            fmt.Println(r.ts, r.current)
        }
    }


Rust

    use libtaos::*;

    fn taos_connect() -> Result<Taos, Error> {
        TaosCfgBuilder::default()
            .ip("localhost")
            .user("root")
            .pass("taosdata")
            .db("power")
            .port(6030u16)
            .build()
            .expect("TaosCfg builder error")
            .connect()
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        let taos = taos_connect().expect("connect error");
        let result = taos.query("SELECT ts, current FROM meters LIMIT 2").await?;
        // print column names
        let meta: Vec<ColumnMeta> = result.column_meta;
        for column in meta {
            print!("{}\t", column.name)
        }
        println!();
        // print rows
        let rows: Vec<Vec<Field>> = result.rows;
        for row in rows {
            for field in row {
                print!("{}\t", field);
            }
            println!();
        }
        Ok(())
    }

    // output:
    // ts current
    // 2022-03-28 09:56:51.249 10.3
    // 2022-03-28 09:56:51.749 12.6


Node.js

    const taos = require("td2.0-connector");

    const conn = taos.connect({ host: "localhost", database: "power" });
    const cursor = conn.cursor();
    const query = cursor.query("SELECT ts, current FROM meters LIMIT 2");
    query.execute().then(function (result) {
      result.pretty();
    });

    // output:
    // Successfully connected to TDengine
    // Query OK, 2 row(s) in set (0.00317767s)
    // ts | current |
    // =======================================================
    // 2018-10-03 14:38:05.000 | 10.3 |
    // 2018-10-03 14:38:15.000 | 12.6 |


C#

    using System;
    using System.Collections.Generic;
    using System.Runtime.InteropServices;
    using TDengineDriver;

    namespace TDengineExample
    {
        internal class QueryExample
        {
            static void Main()
            {
                IntPtr conn = GetConnection();
                // run query against the database selected in GetConnection()
                IntPtr res = TDengine.Query(conn, "SELECT * FROM meters LIMIT 2");
                if (TDengine.ErrorNo(res) != 0)
                {
                    Console.WriteLine("Failed to query since: " + TDengine.Error(res));
                    TDengine.Close(conn);
                    TDengine.Cleanup();
                    return;
                }

                // get field count
                int fieldCount = TDengine.FieldCount(res);
                Console.WriteLine("fieldCount=" + fieldCount);

                // print column names
                List<TDengineMeta> metas = TDengine.FetchFields(res);
                for (int i = 0; i < metas.Count; i++)
                {
                    Console.Write(metas[i].name + "\t");
                }
                Console.WriteLine();

                // print values
                IntPtr row;
                while ((row = TDengine.FetchRows(res)) != IntPtr.Zero)
                {
                    List<TDengineMeta> metaList = TDengine.FetchFields(res);
                    int numOfFiled = TDengine.FieldCount(res);

                    List<String> dataRaw = new List<string>();

                    // lengths of variable-length columns in the current row
                    IntPtr colLengthPrt = TDengine.FetchLengths(res);
                    int[] colLengthArr = new int[numOfFiled];
                    Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);

                    for (int i = 0; i < numOfFiled; i++)
                    {
                        TDengineMeta meta = metaList[i];
                        IntPtr data = Marshal.ReadIntPtr(row, IntPtr.Size * i);

                        if (data == IntPtr.Zero)
                        {
                            Console.Write("NULL\t");
                            continue;
                        }
                        switch ((TDengineDataType)meta.type)
                        {
                            case TDengineDataType.TSDB_DATA_TYPE_BOOL:
                                bool v1 = Marshal.ReadByte(data) == 0 ? false : true;
                                Console.Write(v1.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
                                sbyte v2 = (sbyte)Marshal.ReadByte(data);
                                Console.Write(v2.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
                                short v3 = Marshal.ReadInt16(data);
                                Console.Write(v3.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_INT:
                                int v4 = Marshal.ReadInt32(data);
                                Console.Write(v4.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
                                long v5 = Marshal.ReadInt64(data);
                                Console.Write(v5.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
                                float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
                                Console.Write(v6.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
                                double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
                                Console.Write(v7.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_BINARY:
                                string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                                Console.Write(v8 + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
                                long v9 = Marshal.ReadInt64(data);
                                Console.Write(v9.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
                                string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                                Console.Write(v10 + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
                                byte v12 = Marshal.ReadByte(data);
                                Console.Write(v12.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
                                ushort v13 = (ushort)Marshal.ReadInt16(data);
                                Console.Write(v13.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_UINT:
                                uint v14 = (uint)Marshal.ReadInt32(data);
                                Console.Write(v14.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
                                ulong v15 = (ulong)Marshal.ReadInt64(data);
                                Console.Write(v15.ToString() + "\t");
                                break;
                            case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
                                string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                                Console.Write(v16 + "\t");
                                break;
                            default:
                                Console.Write("nonsupport data type value");
                                break;
                        }
                    }
                    Console.WriteLine();
                }
                if (TDengine.ErrorNo(res) != 0)
                {
                    Console.WriteLine($"Query is not complete, Error {TDengine.ErrorNo(res)} {TDengine.Error(res)}");
                }

                // exit
                TDengine.FreeResult(res);
                TDengine.Close(conn);
                TDengine.Cleanup();
            }

            static IntPtr GetConnection()
            {
                string host = "localhost";
                short port = 6030;
                string username = "root";
                string password = "taosdata";
                string dbname = "power";
                var conn = TDengine.Connect(host, username, password, dbname, port);
                if (conn == IntPtr.Zero)
                {
                    Console.WriteLine("Connect to TDengine failed");
                    System.Environment.Exit(0);
                }
                else
                {
                    Console.WriteLine("Connect to TDengine success");
                }
                return conn;
            }
        }
    }

    // output:
    // Connect to TDengine success
    // fieldCount=6
    // ts current voltage phase location groupid
    // 1648432611249 10.3 219 0.31 California.SanFrancisco 2
    // 1648432611749 12.6 218 0.33 California.SanFrancisco 2


C

    // compile with:
    // gcc -o query_example query_example.c -ltaos
    #include <inttypes.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <taos.h>

    typedef int16_t VarDataLenT;

    #define TSDB_NCHAR_SIZE sizeof(int32_t)
    #define VARSTR_HEADER_SIZE sizeof(VarDataLenT)

    #define GET_FLOAT_VAL(x) (*(float *)(x))
    #define GET_DOUBLE_VAL(x) (*(double *)(x))

    #define varDataLen(v) ((VarDataLenT *)(v))[0]

    // Format one row into str, separating fields with a space.
    // Returns the number of characters written.
    int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) {
      int  len = 0;
      char split = ' ';

      for (int i = 0; i < numFields; ++i) {
        if (i > 0) {
          str[len++] = split;
        }

        if (row[i] == NULL) {
          len += sprintf(str + len, "%s", "NULL");
          continue;
        }

        switch (fields[i].type) {
          case TSDB_DATA_TYPE_TINYINT:
            len += sprintf(str + len, "%d", *((int8_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_UTINYINT:
            len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_SMALLINT:
            len += sprintf(str + len, "%d", *((int16_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_USMALLINT:
            len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_INT:
            len += sprintf(str + len, "%d", *((int32_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_UINT:
            len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_BIGINT:
            len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_UBIGINT:
            len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_FLOAT: {
            float fv = 0;
            fv = GET_FLOAT_VAL(row[i]);
            len += sprintf(str + len, "%f", fv);
          } break;
          case TSDB_DATA_TYPE_DOUBLE: {
            double dv = 0;
            dv = GET_DOUBLE_VAL(row[i]);
            len += sprintf(str + len, "%lf", dv);
          } break;
          case TSDB_DATA_TYPE_BINARY:
          case TSDB_DATA_TYPE_NCHAR: {
            // the length of a variable-length value is stored just before the data
            int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
            memcpy(str + len, row[i], charLen);
            len += charLen;
          } break;
          case TSDB_DATA_TYPE_TIMESTAMP:
            len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
            break;
          case TSDB_DATA_TYPE_BOOL:
            len += sprintf(str + len, "%d", *((int8_t *)row[i]));
            break;
          default:
            break;
        }
      }

      return len;
    }

    /**
     * @brief print column name and values of each row
     *
     * @param res
     * @return int
     */
    static int printResult(TAOS_RES *res) {
      int         numFields = taos_num_fields(res);
      TAOS_FIELD *fields = taos_fetch_fields(res);
      char        header[256] = {0};
      int         len = 0;
      for (int i = 0; i < numFields; ++i) {
        len += sprintf(header + len, "%s ", fields[i].name);
      }
      puts(header);

      TAOS_ROW row = NULL;
      while ((row = taos_fetch_row(res))) {
        char temp[256] = {0};
        printRow(temp, row, fields, numFields);
        puts(temp);
      }
      return 0;
    }

    int main() {
      TAOS *taos = taos_connect("localhost", "root", "taosdata", "power", 6030);
      if (taos == NULL) {
        puts("failed to connect to server");
        exit(EXIT_FAILURE);
      }
      TAOS_RES *res = taos_query(taos, "SELECT * FROM meters LIMIT 2");
      if (taos_errno(res) != 0) {
        printf("failed to execute taos_query. error: %s\n", taos_errstr(res));
        exit(EXIT_FAILURE);
      }
      printResult(res);
      taos_free_result(res);
      taos_close(taos);
      taos_cleanup();
    }

    // output:
    // ts current voltage phase location groupid
    // 1648432611249 10.300000 219 0.310000 California.SanFrancisco 2
    // 1648432611749 12.600000 218 0.330000 California.SanFrancisco 2


Note

  1. The sample code above works with either a REST connection or a native connection.
  2. The use db statement cannot be used with a REST connection because REST connections are stateless.
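Because a REST connection keeps no session state, one way to avoid depending on use db is to qualify the table name with the database in every statement. The Python sketch below builds such a request with the standard library only. The /rest/sql endpoint path and HTTP Basic authentication reflect the TDengine REST API as commonly documented, but treat them, along with the helper name build_rest_query, as assumptions to verify against your TDengine version.

```python
import base64
import urllib.request


def build_rest_query(sql: str,
                     host: str = "localhost",
                     port: int = 6041,
                     user: str = "root",
                     password: str = "taosdata") -> urllib.request.Request:
    """Build (but do not send) an HTTP POST request carrying one SQL statement.

    Assumption: the server accepts SQL in the request body at /rest/sql
    and authenticates with HTTP Basic credentials.
    """
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"http://{host}:{port}/rest/sql",
        data=sql.encode("utf-8"),
        headers={"Authorization": f"Basic {token}"},
        method="POST",
    )


# The database name is part of the statement, so no prior "use power" is needed.
req = build_rest_query("SELECT ts, current FROM power.meters LIMIT 2")
print(req.get_method(), req.full_url)  # POST http://localhost:6041/rest/sql

# To run it against a live server:
#   with urllib.request.urlopen(req) as resp:
#       print(resp.read().decode("utf-8"))
```

Each request is self-contained, which is why the statement itself has to say which database it targets.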

Asynchronous Query

Besides synchronous queries, TDengine also provides an asynchronous query API for inserting or querying data more efficiently. In a similar hardware and software environment, the async API is 2 to 4 times faster than the sync API. The async API works in non-blocking mode: a call can return before the operation finishes, so the calling thread can switch to other work, which improves the performance of the whole application. The advantage of the async API is greatest on poor networks.

Please note that async query can only be used with a native connection.

Sample code is shown below for Python, C#, and C, in that order.

Python

    import time
    from ctypes import *

    from taos import *


    def fetch_callback(p_param, p_result, num_of_rows):
        print("fetched ", num_of_rows, "rows")
        p = cast(p_param, POINTER(Counter))
        result = TaosResult(p_result)

        if num_of_rows == 0:
            print("fetching completed")
            p.contents.done = True
            result.close()
            return
        if num_of_rows < 0:
            p.contents.done = True
            result.check_error(num_of_rows)
            result.close()
            return None

        for row in result.rows_iter(num_of_rows):
            print(row)
        p.contents.count += result.row_count
        result.fetch_rows_a(fetch_callback, p_param)


    def query_callback(p_param, p_result, code):
        if p_result is None:
            return
        result = TaosResult(p_result)
        if code == 0:
            result.fetch_rows_a(fetch_callback, p_param)
        result.check_error(code)


    class Counter(Structure):
        _fields_ = [("count", c_int), ("done", c_bool)]

        def __str__(self):
            return "{ count: %d, done: %s }" % (self.count, self.done)


    def test_query(conn):
        counter = Counter(count=0)
        conn.query_a("select ts, current, voltage from power.meters", query_callback, byref(counter))

        while not counter.done:
            print(counter)
            time.sleep(1)
        print(counter)
        conn.close()


    if __name__ == "__main__":
        test_query(connect())

    # possible output:
    # { count: 0, done: False }
    # fetched 8 rows
    # 1538548685000 10.300000 219
    # 1538548695000 12.600000 218
    # 1538548696800 12.300000 221
    # 1538548696650 10.300000 218
    # 1538548685500 11.800000 221
    # 1538548696600 13.400000 223
    # 1538548685500 10.800000 223
    # 1538548686500 11.500000 221
    # fetched 0 rows
    # fetching completed
    # { count: 8, done: True }

Note

This sample code cannot currently be run on Windows.

C#

    using System;
    using System.Collections.Generic;
    using System.Runtime.InteropServices;
    using System.Threading;
    using TDengineDriver;

    namespace TDengineExample
    {
        public class AsyncQueryExample
        {
            static void Main()
            {
                IntPtr conn = GetConnection();
                QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
                TDengine.QueryAsync(conn, "select * from meters", queryAsyncCallback, IntPtr.Zero);
                // give the callbacks time to finish before closing the connection
                Thread.Sleep(2000);
                TDengine.Close(conn);
                TDengine.Cleanup();
            }

            static void QueryCallback(IntPtr param, IntPtr taosRes, int code)
            {
                if (code == 0 && taosRes != IntPtr.Zero)
                {
                    FetchRowAsyncCallback fetchRowAsyncCallback = new FetchRowAsyncCallback(FetchRowCallback);
                    TDengine.FetchRowAsync(taosRes, fetchRowAsyncCallback, param);
                }
                else
                {
                    Console.WriteLine($"async query data failed, failed code {code}");
                }
            }

            static void FetchRowCallback(IntPtr param, IntPtr taosRes, int numOfRows)
            {
                if (numOfRows > 0)
                {
                    Console.WriteLine($"{numOfRows} rows async retrieved");
                    DisplayRes(taosRes);
                    TDengine.FetchRowAsync(taosRes, FetchRowCallback, param);
                }
                else
                {
                    if (numOfRows == 0)
                    {
                        Console.WriteLine("async retrieve complete.");
                    }
                    else
                    {
                        Console.WriteLine($"FetchRowAsync callback error, error code {numOfRows}");
                    }
                    // the result set is released here, once fetching has ended
                    TDengine.FreeResult(taosRes);
                }
            }

            public static void DisplayRes(IntPtr res)
            {
                if (!IsValidResult(res))
                {
                    TDengine.Cleanup();
                    System.Environment.Exit(1);
                }

                List<TDengineMeta> metaList = TDengine.FetchFields(res);
                int fieldCount = metaList.Count;

                List<object> dataList = QueryRes(res, metaList);
                for (int index = 0; index < dataList.Count; index++)
                {
                    if (index % fieldCount == 0 && index != 0)
                    {
                        Console.WriteLine("");
                    }
                    Console.Write("{0} \t|\t", dataList[index].ToString());
                }
                Console.WriteLine("");
            }

            public static bool IsValidResult(IntPtr res)
            {
                if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
                {
                    if (res != IntPtr.Zero)
                    {
                        Console.Write("reason: " + TDengine.Error(res));
                        return false;
                    }
                    Console.WriteLine("");
                    return false;
                }
                return true;
            }

            private static List<object> QueryRes(IntPtr res, List<TDengineMeta> meta)
            {
                IntPtr taosRow;
                List<object> dataRaw = new();
                while ((taosRow = TDengine.FetchRows(res)) != IntPtr.Zero)
                {
                    dataRaw.AddRange(FetchRow(taosRow, res));
                }
                if (TDengine.ErrorNo(res) != 0)
                {
                    Console.Write("Query is not complete, Error {0} {1}", TDengine.ErrorNo(res), TDengine.Error(res));
                }
                // Do not free res here: FetchRowCallback still uses it for the
                // next FetchRowAsync call and frees it when fetching completes.
                Console.WriteLine("");
                return dataRaw;
            }

            public static List<object> FetchRow(IntPtr taosRow, IntPtr taosRes)
            {
                List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
                int numOfFiled = TDengine.FieldCount(taosRes);

                List<object> dataRaw = new();

                IntPtr colLengthPrt = TDengine.FetchLengths(taosRes);
                int[] colLengthArr = new int[numOfFiled];
                Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);

                for (int i = 0; i < numOfFiled; i++)
                {
                    TDengineMeta meta = metaList[i];
                    IntPtr data = Marshal.ReadIntPtr(taosRow, IntPtr.Size * i);

                    if (data == IntPtr.Zero)
                    {
                        dataRaw.Add("NULL");
                        continue;
                    }
                    switch ((TDengineDataType)meta.type)
                    {
                        case TDengineDataType.TSDB_DATA_TYPE_BOOL:
                            bool v1 = Marshal.ReadByte(data) != 0;
                            dataRaw.Add(v1);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
                            sbyte v2 = (sbyte)Marshal.ReadByte(data);
                            dataRaw.Add(v2);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
                            short v3 = Marshal.ReadInt16(data);
                            dataRaw.Add(v3);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_INT:
                            int v4 = Marshal.ReadInt32(data);
                            dataRaw.Add(v4);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
                            long v5 = Marshal.ReadInt64(data);
                            dataRaw.Add(v5);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
                            float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
                            dataRaw.Add(v6);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
                            double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
                            dataRaw.Add(v7);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_BINARY:
                            string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                            dataRaw.Add(v8);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
                            long v9 = Marshal.ReadInt64(data);
                            dataRaw.Add(v9);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
                            string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                            dataRaw.Add(v10);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
                            byte v12 = Marshal.ReadByte(data);
                            dataRaw.Add(v12.ToString());
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
                            ushort v13 = (ushort)Marshal.ReadInt16(data);
                            dataRaw.Add(v13);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_UINT:
                            uint v14 = (uint)Marshal.ReadInt32(data);
                            dataRaw.Add(v14);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
                            ulong v15 = (ulong)Marshal.ReadInt64(data);
                            dataRaw.Add(v15);
                            break;
                        case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
                            string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
                            dataRaw.Add(v16);
                            break;
                        default:
                            dataRaw.Add("nonsupport data type");
                            break;
                    }
                }
                return dataRaw;
            }

            static IntPtr GetConnection()
            {
                string host = "localhost";
                short port = 6030;
                string username = "root";
                string password = "taosdata";
                string dbname = "power";
                var conn = TDengine.Connect(host, username, password, dbname, port);
                if (conn == IntPtr.Zero)
                {
                    Console.WriteLine("Connect to TDengine failed");
                    Environment.Exit(0);
                }
                else
                {
                    Console.WriteLine("Connect to TDengine success");
                }
                return conn;
            }
        }
    }

    // output:
    // Connect to TDengine success
    // 8 rows async retrieved
    // 1538548685500 | 11.8 | 221 | 0.28 | california.losangeles | 2 |
    // 1538548696600 | 13.4 | 223 | 0.29 | california.losangeles | 2 |
    // 1538548685000 | 10.8 | 223 | 0.29 | california.losangeles | 3 |
    // 1538548686500 | 11.5 | 221 | 0.35 | california.losangeles | 3 |
    // 1538548685000 | 10.3 | 219 | 0.31 | california.sanfrancisco | 2 |
    // 1538548695000 | 12.6 | 218 | 0.33 | california.sanfrancisco | 2 |
    // 1538548696800 | 12.3 | 221 | 0.31 | california.sanfrancisco | 2 |
    // 1538548696650 | 10.3 | 218 | 0.25 | california.sanfrancisco | 3 |
    // async retrieve complete.


C

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <taos.h>

    // printRow() is the helper defined in the synchronous C example above.
    int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields);

    // Print the column names of a result set on one line.
    void printHeader(TAOS_RES *res) {
      int         numFields = taos_num_fields(res);
      TAOS_FIELD *fields = taos_fetch_fields(res);
      char        header[256] = {0};
      int         len = 0;
      for (int i = 0; i < numFields; ++i) {
        len += sprintf(header + len, "%s ", fields[i].name);
      }
      puts(header);
    }

    /**
     * @brief callback function of taos_fetch_rows_a
     *
     * @param param : the third parameter you passed to taos_fetch_rows_a
     * @param res : pointer of TAOS_RES
     * @param numOfRow : number of rows fetched in this batch. will be 0 if there is no more data.
     * @return void*
     */
    void *fetch_row_callback(void *param, TAOS_RES *res, int numOfRow) {
      printf("numOfRow = %d \n", numOfRow);
      int         numFields = taos_num_fields(res);
      TAOS_FIELD *fields = taos_fetch_fields(res);
      TAOS       *_taos = (TAOS *)param;
      if (numOfRow > 0) {
        for (int i = 0; i < numOfRow; ++i) {
          TAOS_ROW row = taos_fetch_row(res);
          char     temp[256] = {0};
          printRow(temp, row, fields, numFields);
          puts(temp);
        }
        // request the next batch
        taos_fetch_rows_a(res, fetch_row_callback, _taos);
      } else {
        printf("no more data, close the connection.\n");
        taos_free_result(res);
        taos_close(_taos);
        taos_cleanup();
      }
      return NULL;
    }

    /**
     * @brief callback function of taos_query_a
     *
     * @param param: the fourth parameter you passed to taos_query_a
     * @param res : the result set
     * @param code : status code
     * @return void*
     */
    void *select_callback(void *param, TAOS_RES *res, int code) {
      printf("query callback ...\n");
      TAOS *_taos = (TAOS *)param;
      if (code == 0 && res) {
        printHeader(res);
        taos_fetch_rows_a(res, fetch_row_callback, _taos);
      } else {
        printf("failed to execute taos_query. error: %s\n", taos_errstr(res));
        taos_free_result(res);
        taos_close(_taos);
        taos_cleanup();
        exit(EXIT_FAILURE);
      }
      return NULL;
    }

    int main() {
      TAOS *taos = taos_connect("localhost", "root", "taosdata", "power", 6030);
      if (taos == NULL) {
        puts("failed to connect to server");
        exit(EXIT_FAILURE);
      }
      // param one is the connection returned by taos_connect.
      // param two is the SQL to execute.
      // param three is the callback function.
      // param four can be any pointer. It will be passed to your callback function as the first parameter. We use taos
      // here, because we want to close it after getting data.
      taos_query_a(taos, "SELECT * FROM meters", select_callback, taos);
      // wait for the callbacks to finish
      sleep(1);
    }

    // output:
    // query callback ...
    // ts current voltage phase location groupid
    // numOfRow = 8
    // 1538548685500 11.800000 221 0.280000 california.losangeles 2
    // 1538548696600 13.400000 223 0.290000 california.losangeles 2
    // 1538548685000 10.800000 223 0.290000 california.losangeles 3
    // 1538548686500 11.500000 221 0.350000 california.losangeles 3
    // 1538548685000 10.300000 219 0.310000 california.sanfrancisco 2
    // 1538548695000 12.600000 218 0.330000 california.sanfrancisco 2
    // 1538548696800 12.300000 221 0.310000 california.sanfrancisco 2
    // 1538548696650 10.300000 218 0.250000 california.sanfrancisco 3
    // numOfRow = 0
    // no more data, close the connection.
