Data Subscription

Introduction

Due to the nature of time series data, data insertion into TDengine is similar to data publishing in message queues. Data is stored in ascending order of timestamp inside TDengine, and so each table in TDengine can essentially be considered as a message queue.

A lightweight service for data subscription and publishing is built into TDengine. With the API provided by TDengine, client programs can use select statements to subscribe to data from one or more tables. The subscription and state maintenance is performed on the client side. The client programs poll the server to check whether there is new data, and if so the new data will be pushed back to the client side. If the client program is restarted, where to start retrieving new data is up to the client side.

There are 3 major APIs related to subscription provided in the TDengine client driver.

  1. taos_subscribe
  2. taos_consume
  3. taos_unsubscribe

For more details about these APIs please refer to C/C++ Connector. Their usage will be introduced below using the use case of meters, in which the schema of STable and subtables from the previous section Continuous Query are used. Full sample code can be found here.

If we want to get a notification and take some actions if the current exceeds a threshold, like 10A, from some meters, there are two ways:

The first way is to query each sub table and record the last timestamp matching the criteria. Then after some time, query the data later than the recorded timestamp, and repeat this process. The SQL statements for this way are as below.

  1. select * from D1001 where ts > {last_timestamp1} and current > 10;
  2. select * from D1002 where ts > {last_timestamp2} and current > 10;
  3. ...

The above way works, but the problem is that the number of select statements increases with the number of meters. Additionally, the performance of both client side and server side will be unacceptable once the number of meters grows to a big enough number.

A better way is to query on the STable, only one select is enough regardless of the number of meters, like below:

  1. select * from meters where ts > {last_timestamp} and current > 10;

However, this presents a new problem in how to choose last_timestamp. First, the timestamp when the data is generated is different from the timestamp when the data is inserted into the database, sometimes the difference between them may be very big. Second, the time when the data from different meters arrives at the database may be different too. If the timestamp of the “slowest” meter is used as last_timestamp in the query, the data from other meters may be selected repeatedly; but if the timestamp of the “fastest” meter is used as last_timestamp, some data from other meters may be missed.

All the problems mentioned above can be resolved easily using the subscription functionality provided by TDengine.

The first step is to create subscription using taos_subscribe.

  1. TAOS_SUB* tsub = NULL;
  2. if (async) {
  3.   // create an asynchronous subscription, the callback function will be called every 1s
  4.   tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
  5. } else {
  6.   // create an synchronous subscription, need to call 'taos_consume' manually
  7.   tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
  8. }

The subscription in TDengine can be either synchronous or asynchronous. In the above sample code, the value of variable async is determined from the CLI input, then it’s used to create either an async or sync subscription. Sync subscription means the client program needs to invoke taos_consume to retrieve data, and async subscription means another thread created by taos_subscribe internally invokes taos_consume to retrieve data and pass the data to subscribe_callback for processing. subscribe_callback is a callback function provided by the client program. You should not perform time consuming operations in the callback function.

The parameter taos is an established connection. Nothing special needs to be done for thread safety for synchronous subscription. For asynchronous subscription, the taos_subscribe function should be called exclusively by the current thread, to avoid unpredictable errors.

The parameter sql is a select statement in which the where clause can be used to specify filter conditions. In our example, we can subscribe to the records in which the current exceeds 10A, with the following SQL statement:

  1. select * from meters where current > 10;

Please note that, all the data will be processed because no start time is specified. If we only want to process data for the past day, a time related condition can be added:

  1. select * from meters where ts > now - 1d and current > 10;

The parameter topic is the name of the subscription. The client application must guarantee that the name is unique. However, it doesn’t have to be globally unique because subscription is implemented in the APIs on the client side.

If the subscription named as topic doesn’t exist, the parameter restart will be ignored. If the subscription named as topic has been created before by the client program, when the client program is restarted with the subscription named topic, parameter restart is used to determine whether to retrieve data from the beginning or from the last point where the subscription was broken.

If the value of restart is true (i.e. a non-zero value), data will be retrieved from the beginning. If it is false (i.e. zero), the data already consumed before will not be processed again.

The last parameter of taos_subscribe is the polling interval in units of millisecond. In sync mode, if the time difference between two continuous invocations to taos_consume is smaller than the interval specified by taos_subscribe, taos_consume will be blocked until the interval is reached. In async mode, this interval is the minimum interval between two invocations to the call back function.

The second to last parameter of taos_subscribe is used to pass arguments to the call back function. taos_subscribe doesn’t process this parameter and simply passes it to the call back function. This parameter is simply ignored in sync mode.

After a subscription is created, its data can be consumed and processed. Shown below is the sample code to consume data in sync mode, in the else condition of if (async).

  1. if (async) {
  2.   getchar();
  3. } else while(1) {
  4.   TAOS_RES* res = taos_consume(tsub);
  5.   if (res == NULL) {
  6.     printf("failed to consume data.");
  7.     break;
  8.   } else {
  9.     print_result(res, blockFetch);
  10.     getchar();
  11.   }
  12. }

In the above sample code in the else condition, there is an infinite loop. Each time carriage return is entered taos_consume is invoked. The return value of taos_consume is the selected result set. In the above sample, print_result is used to simplify the printing of the result set. It is similar to taos_use_result. Below is the implementation of print_result.

  1. void print_result(TAOS_RES* res, int blockFetch) {
  2.   TAOS_ROW row = NULL;
  3.   int num_fields = taos_num_fields(res);
  4.   TAOS_FIELD* fields = taos_fetch_fields(res);
  5.   int nRows = 0;
  6.   if (blockFetch) {
  7.     nRows = taos_fetch_block(res, &row);
  8.     for (int i = 0; i < nRows; i++) {
  9.       char temp[256];
  10.       taos_print_row(temp, row + i, fields, num_fields);
  11.       puts(temp);
  12.     }
  13.   } else {
  14.     while ((row = taos_fetch_row(res))) {
  15.       char temp[256];
  16.       taos_print_row(temp, row, fields, num_fields);
  17.       puts(temp);
  18.       nRows++;
  19.     }
  20.   }
  21.   printf("%d rows consumed.\n", nRows);
  22. }

In the above code taos_print_row is used to process the data consumed. All matching rows are printed.

In async mode, consuming data is simpler as shown below.

  1. void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  2.   print_result(res, *(int*)param);
  3. }

taos_unsubscribe can be invoked to terminate a subscription.

  1. taos_unsubscribe(tsub, keep);

The second parameter keep is used to specify whether to keep the subscription progress on the client sde. If it is false, i.e. 0, then subscription will be restarted from beginning regardless of the restart parameter’s value when taos_subscribe is invoked again. The subscription progress information is stored in {DataDir}/subscribe/ , under which there is a file with the same name as topic for each subscription(Note: The default value of DataDir in the taos.cfg file is /var/lib/taos/. However, /var/lib/taos/ does not exist on the Windows server. So you need to change the DataDir value to the corresponding existing directory.”), the subscription will be restarted from the beginning if the corresponding progress file is removed.

Now let’s see the effect of the above sample code, assuming below prerequisites have been done.

  • The sample code has been downloaded to local system
  • TDengine has been installed and launched properly on same system
  • The database, STable, and subtables required in the sample code are ready

Launch the command below in the directory where the sample code resides to compile and start the program.

  1. make
  2. ./subscribe -sql='select * from meters where current > 10;'

After the program is started, open another terminal and launch TDengine CLI taos, then use the below SQL commands to insert a row whose current is 12A into table D1001.

  1. use test;
  2. insert into D1001 values(now, 12, 220, 1);

Then, this row of data will be shown by the example program on the first terminal because its current exceeds 10A. More data can be inserted for you to observe the output of the example program.

Examples

The example program below demonstrates how to subscribe, using connectors, to data rows in which current exceeds 10A.

Prepare Data

  1. # create database "power"
  2. taos> create database power;
  3. # use "power" as the database in following operations
  4. taos> use power;
  5. # create super table "meters"
  6. taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
  7. # create tabes using the schema defined by super table "meters"
  8. taos> create table d1001 using meters tags ("California.SanFrancisco", 2);
  9. taos> create table d1002 using meters tags ("California.LoSangeles", 2);
  10. # insert some rows
  11. taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
  12. taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
  13. # filter out the rows in which current is bigger than 10A
  14. taos> select * from meters where current > 10;
  15. ts | current | voltage | phase | location | groupid |
  16. ===========================================================================================================
  17. 2020-08-15 12:10:00.000 | 10.30000 | 220 | 1 | California.LoSangeles | 2 |
  18. 2020-08-15 12:20:00.000 | 11.20000 | 220 | 1 | California.LoSangeles | 2 |
  19. 2020-08-15 12:00:00.000 | 12.00000 | 220 | 1 | California.SanFrancisco | 2 |
  20. 2020-08-15 12:10:00.000 | 12.30000 | 220 | 2 | California.SanFrancisco | 2 |
  21. 2020-08-15 12:20:00.000 | 12.20000 | 220 | 1 | California.SanFrancisco | 2 |
  22. Query OK, 5 row(s) in set (0.004896s)

Example Programs

  • Java
  • Python
  • C
  1. package com.taos.example;
  2. import com.taosdata.jdbc.TSDBConnection;
  3. import com.taosdata.jdbc.TSDBDriver;
  4. import com.taosdata.jdbc.TSDBResultSet;
  5. import com.taosdata.jdbc.TSDBSubscribe;
  6. import java.sql.Connection;
  7. import java.sql.DriverManager;
  8. import java.sql.ResultSetMetaData;
  9. import java.sql.SQLException;
  10. import java.util.Properties;
  11. import java.util.concurrent.TimeUnit;
  12. public class SubscribeDemo {
  13. private static final String topic = "topic-meter-current-bg-10";
  14. private static final String sql = "select * from meters where current > 10";
  15. public static void main(String[] args) {
  16. Connection connection = null;
  17. TSDBSubscribe subscribe = null;
  18. try {
  19. Class.forName("com.taosdata.jdbc.TSDBDriver");
  20. Properties properties = new Properties();
  21. properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
  22. properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
  23. String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
  24. connection = DriverManager.getConnection(jdbcUrl, properties);
  25. // create subscribe
  26. subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true);
  27. int count = 0;
  28. while (count < 10) {
  29. // wait 1 second to avoid frequent calls to consume
  30. TimeUnit.SECONDS.sleep(1);
  31. // consume
  32. TSDBResultSet resultSet = subscribe.consume();
  33. if (resultSet == null) {
  34. continue;
  35. }
  36. ResultSetMetaData metaData = resultSet.getMetaData();
  37. while (resultSet.next()) {
  38. int columnCount = metaData.getColumnCount();
  39. for (int i = 1; i <= columnCount; i++) {
  40. System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
  41. }
  42. System.out.println();
  43. count++;
  44. }
  45. }
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. try {
  50. if (null != subscribe)
  51. // close subscribe
  52. subscribe.close(true);
  53. if (connection != null)
  54. connection.close();
  55. } catch (SQLException throwable) {
  56. throwable.printStackTrace();
  57. }
  58. }
  59. }
  60. }

view source code

Data Subscription - 图1note

For now Java connector doesn’t provide asynchronous subscription, but TimerTask can be used to achieve similar purpose.

  1. """
  2. Python asynchronous subscribe demo.
  3. run on Linux system with: python3 subscribe_demo.py
  4. """
  5. from ctypes import c_void_p
  6. import taos
  7. import time
  8. def query_callback(p_sub, p_result, p_param, code):
  9. """
  10. :param p_sub: pointer returned by native API -- taos_subscribe
  11. :param p_result: pointer to native TAOS_RES
  12. :param p_param: None
  13. :param code: error code
  14. :return: None
  15. """
  16. print("in callback")
  17. result = taos.TaosResult(c_void_p(p_result))
  18. # raise exception if error occur
  19. result.check_error(code)
  20. for row in result.rows_iter():
  21. print(row)
  22. print(f"{result.row_count} rows consumed.")
  23. if __name__ == '__main__':
  24. conn = taos.connect()
  25. restart = True
  26. topic = "topic-meter-current-bg"
  27. sql = "select * from power.meters where current > 10" # Error sql
  28. interval = 2000 # consumption interval in microseconds.
  29. _ = conn.subscribe(restart, topic, sql, interval, query_callback)
  30. # Note: we received the return value as _ above, to avoid the TaosSubscription object to be deleted by gc.
  31. while True:
  32. time.sleep(10) # use Ctrl + C to interrupt

view source code

  1. // A simple demo for asynchronous subscription.
  2. // compile with:
  3. // gcc -o subscribe_demo subscribe_demo.c -ltaos
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <taos.h>
  8. int nTotalRows;
  9. /**
  10. * @brief callback function of subscription.
  11. *
  12. * @param tsub
  13. * @param res
  14. * @param param. the additional parameter passed to taos_subscribe
  15. * @param code. error code
  16. */
  17. void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) {
  18. if (code != 0) {
  19. printf("error: %d\n", code);
  20. exit(EXIT_FAILURE);
  21. }
  22. TAOS_ROW row = NULL;
  23. int num_fields = taos_num_fields(res);
  24. TAOS_FIELD* fields = taos_fetch_fields(res);
  25. int nRows = 0;
  26. while ((row = taos_fetch_row(res))) {
  27. char buf[4096] = {0};
  28. taos_print_row(buf, row, fields, num_fields);
  29. puts(buf);
  30. nRows++;
  31. }
  32. nTotalRows += nRows;
  33. printf("%d rows consumed.\n", nRows);
  34. }
  35. int main() {
  36. TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  37. if (taos == NULL) {
  38. printf("failed to connect to server\n");
  39. exit(EXIT_FAILURE);
  40. }
  41. int restart = 1; // if the topic already exists, where to subscribe from the begin.
  42. const char* topic = "topic-meter-current-bg-10";
  43. const char* sql = "select * from power.meters where current > 10";
  44. void* param = NULL; // additional parameter.
  45. int interval = 2000; // consumption interval in microseconds.
  46. TAOS_SUB* tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, interval);
  47. // wait for insert from others process. you can open TDengine CLI to insert some records for test.
  48. getchar(); // press Enter to stop
  49. printf("total rows consumed: %d\n", nTotalRows);
  50. int keep = 0; // whether to keep subscribe process
  51. taos_unsubscribe(tsub, keep);
  52. taos_close(taos);
  53. taos_cleanup();
  54. }

view source code

Run the Examples

The example programs first consume all historical data matching the criteria.

  1. ts: 1597464000000 current: 12.0 voltage: 220 phase: 1 location: California.SanFrancisco groupid : 2
  2. ts: 1597464600000 current: 12.3 voltage: 220 phase: 2 location: California.SanFrancisco groupid : 2
  3. ts: 1597465200000 current: 12.2 voltage: 220 phase: 1 location: California.SanFrancisco groupid : 2
  4. ts: 1597464600000 current: 10.3 voltage: 220 phase: 1 location: California.LoSangeles groupid : 2
  5. ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: California.LoSangeles groupid : 2

Next, use TDengine CLI to insert a new row.

  1. # taos
  2. taos> use power;
  3. taos> insert into d1001 values(now, 12.4, 220, 1);

Because the current in the inserted row exceeds 10A, it will be consumed by the example program.

  1. ts: 1651146662805 current: 12.4 voltage: 220 phase: 1 location: California.SanFrancisco groupid: 2