C/C++ Connector
C/C++ 开发人员可以使用 TDengine 的客户端驱动,即 C/C++连接器 (以下都用 TDengine 客户端驱动表示),开发自己的应用来连接 TDengine 集群完成数据存储、查询以及其他功能。TDengine 客户端驱动的 API 类似于 MySQL 的 C API。应用程序使用时,需要包含 TDengine 头文件 taos.h,里面列出了提供的 API 的函数原型;应用程序还要链接到所在平台上对应的动态库。
#include <taos.h>
TDengine 服务端或客户端安装后,taos.h
位于:
- Linux:
/usr/local/taos/include
- Windows:
C:\TDengine\include
TDengine 客户端驱动的动态库位于:
- Linux:
/usr/local/taos/driver/libtaos.so
- Windows:
C:\TDengine\taos.dll
支持的平台
请参考支持的平台列表
支持的版本
TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一对应的强对应关系,建议使用与 TDengine 服务端完全相同的客户端驱动。虽然低版本的客户端驱动在前三段版本号一致(即仅第四段版本号不同)的情况下也能够与高版本的服务端相兼容,但这并非推荐用法。强烈不建议使用高版本的客户端驱动访问低版本的服务端。
安装步骤
TDengine 客户端驱动的安装请参考 安装指南
建立连接
使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。
下面为建立连接的示例代码,其中省略了查询和写入部分,展示了如何建立连接、关闭连接以及清除资源。
TAOS *taos = taos_connect("localhost:6030", "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
/* put your code here for read and write */
taos_close(taos);
taos_cleanup();
在上面的示例代码中, taos_connect()
建立到客户端程序所在主机的 6030 端口的连接,taos_close()
关闭当前连接,taos_cleanup()
清除客户端驱动所申请和使用的资源。
note
- 如未特别说明,当 API 的返回值是整数时,0 代表成功,其它是代表失败原因的错误码,当返回值是指针时, NULL 表示失败。
- 所有的错误码以及对应的原因描述在
taoserror.h
文件中。
示例程序
本节展示了使用客户端驱动访问 TDengine 集群的常见访问方式的示例代码。
同步查询示例
同步查询
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h> // TAOS header file
static void queryDB(TAOS *taos, char *command) {
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
if (NULL != pSql) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(pSql);
}
void Test(TAOS *taos, char *qstr, int i);
int main(int argc, char *argv[]) {
char qstr[1024];
// connect to server
if (argc < 2) {
printf("please input server-ip \n");
return 0;
}
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
for (int i = 0; i < 100; i++) {
Test(taos, qstr, i);
}
taos_close(taos);
taos_cleanup();
}
void Test(TAOS *taos, char *qstr, int index) {
printf("==================test at %d\n================================", index);
queryDB(taos, "drop database if exists demo");
queryDB(taos, "create database demo");
TAOS_RES *result;
queryDB(taos, "use demo");
queryDB(taos,
"create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
printf("success to create table\n");
int i = 0;
for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')",
(uint64_t)(1546300800000 + i * 1000), i, i, i, i * 10000000, i * 1.0, i * 2.0, "hello");
printf("qstr: %s\n", qstr);
// note: how do you wanna do if taos_query returns non-NULL
// if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// }
TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1);
exit(1);
} else {
printf("insert row: %i\n", i);
}
taos_free_result(result1);
}
printf("success to insert rows, total %d rows\n", i);
// query the records
sprintf(qstr, "SELECT * FROM m1");
result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result);
exit(1);
}
TAOS_ROW row;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
printf("num_fields = %d\n", num_fields);
printf("select * from table, result:\n");
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
printf("====demo end====\n\n");
}
异步查询示例
异步查询
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS asynchronous API example
// this example opens multiple tables, insert/retrieve multiple tables
// it is used by TAOS internally for one performance testing
// to compiple: gcc -o asyncdemo asyncdemo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <taos.h>
int points = 5;
int numOfTables = 3;
int tablesInsertProcessed = 0;
int tablesSelectProcessed = 0;
int64_t st, et;
typedef struct {
int id;
TAOS * taos;
char name[16];
time_t timeStamp;
int value;
int rowsInserted;
int rowsTried;
int rowsRetrieved;
} STable;
void taos_insert_call_back(void *param, TAOS_RES *tres, int code);
void taos_select_call_back(void *param, TAOS_RES *tres, int code);
void taos_error(TAOS *taos);
static void queryDB(TAOS *taos, char *command) {
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
if (NULL != pSql) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
taos_cleanup();
exit(EXIT_FAILURE);
}
taos_free_result(pSql);
}
int main(int argc, char *argv[]) {
TAOS * taos;
struct timeval systemTime;
int i;
char sql[1024] = {0};
char prefix[20] = {0};
char db[128] = {0};
STable * tableList;
if (argc != 5) {
printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]);
exit(0);
}
// a simple way to parse input parameters
if (argc >= 3) strcpy(db, argv[2]);
if (argc >= 4) points = atoi(argv[3]);
if (argc >= 5) numOfTables = atoi(argv[4]);
size_t size = sizeof(STable) * (size_t)numOfTables;
tableList = (STable *)malloc(size);
memset(tableList, 0, size);
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) taos_error(taos);
printf("success to connect to server\n");
sprintf(sql, "drop database if exists %s", db);
queryDB(taos, sql);
sprintf(sql, "create database %s", db);
queryDB(taos, sql);
sprintf(sql, "use %s", db);
queryDB(taos, sql);
strcpy(prefix, "asytbl_");
for (i = 0; i < numOfTables; ++i) {
tableList[i].id = i;
tableList[i].taos = taos;
sprintf(tableList[i].name, "%s%d", prefix, i);
sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i);
queryDB(taos, sql);
}
gettimeofday(&systemTime, NULL);
for (i = 0; i < numOfTables; ++i)
tableList[i].timeStamp = (time_t)(systemTime.tv_sec) * 1000 + systemTime.tv_usec / 1000;
printf("success to create tables, press any key to insert\n");
getchar();
printf("start to insert...\n");
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesInsertProcessed = 0;
tablesSelectProcessed = 0;
for (i = 0; i < numOfTables; ++i) {
// insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
taos_query_a(taos, sql, taos_insert_call_back, (void *)(tableList + i));
}
printf("once insert finished, presse any key to query\n");
getchar();
while (1) {
if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
printf("start to query...\n");
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API
sprintf(sql, "select * from %s", tableList[i].name);
taos_query_a(taos, sql, taos_select_call_back, (void *)(tableList + i));
}
printf("\nonce finished, press any key to exit\n");
getchar();
while (1) {
if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
for (i = 0; i < numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
taos_close(taos);
free(tableList);
printf("==== async demo end====\n");
printf("\n");
return 0;
}
void taos_error(TAOS *con) {
fprintf(stderr, "TDengine error: %s\n", taos_errstr(con));
taos_close(con);
taos_cleanup();
exit(1);
}
void taos_insert_call_back(void *param, TAOS_RES *tres, int code) {
STable * pTable = (STable *)param;
struct timeval systemTime;
char sql[128];
pTable->rowsTried++;
if (code < 0) {
printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried);
} else if (code == 0) {
printf("%s not inserted\n", pTable->name);
} else {
pTable->rowsInserted++;
}
if (pTable->rowsTried < points) {
// for this demo, insert another record
sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000 + pTable->rowsTried * 1000,
pTable->rowsTried);
taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable);
} else {
printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesInsertProcessed++;
if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points * numOfTables);
}
}
taos_free_result(tres);
}
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) {
STable * pTable = (STable *)param;
struct timeval systemTime;
if (numOfRows > 0) {
for (int i = 0; i < numOfRows; ++i) {
// synchronous API to retrieve a row from batch of records
/*TAOS_ROW row = */ (void)taos_fetch_row(tres);
// process row
}
pTable->rowsRetrieved += numOfRows;
// retrieve next batch of rows
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
} else {
if (numOfRows < 0) printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
// taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesSelectProcessed++;
if (tablesSelectProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
}
taos_free_result(tres);
}
}
void taos_select_call_back(void *param, TAOS_RES *tres, int code) {
STable *pTable = (STable *)param;
if (code == 0 && tres) {
// asynchronous API to fetch a batch of records
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
} else {
printf("%s select failed, code:%d\n", pTable->name, code);
taos_free_result(tres);
taos_cleanup();
exit(1);
}
}
参数绑定示例
参数绑定
// TAOS standard API example. The same syntax as MySQL, but only a subet
// to compile: gcc -o prepare prepare.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
#include <unistd.h>
void taosMsleep(int mseconds);
void verify_prepare(TAOS* taos) {
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database test;");
int code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
usleep(100000);
taos_select_db(taos, "test");
// create table
const char* sql =
"create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin "
"binary(40), blob nchar(10), u1 tinyint unsigned, u2 smallint unsigned, u4 int unsigned, u8 bigint unsigned)";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
// insert 10 records
struct {
int64_t ts;
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
char bin[40];
char blob[80];
uint8_t u1;
uint16_t u2;
uint32_t u4;
uint64_t u8;
} v = {0};
TAOS_STMT* stmt = taos_stmt_init(taos);
TAOS_BIND params[14];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = ¶ms[0].buffer_length;
params[0].is_null = NULL;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b);
params[1].buffer = &v.b;
params[1].length = ¶ms[1].buffer_length;
params[1].is_null = NULL;
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[2].buffer_length = sizeof(v.v1);
params[2].buffer = &v.v1;
params[2].length = ¶ms[2].buffer_length;
params[2].is_null = NULL;
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[3].buffer_length = sizeof(v.v2);
params[3].buffer = &v.v2;
params[3].length = ¶ms[3].buffer_length;
params[3].is_null = NULL;
params[4].buffer_type = TSDB_DATA_TYPE_INT;
params[4].buffer_length = sizeof(v.v4);
params[4].buffer = &v.v4;
params[4].length = ¶ms[4].buffer_length;
params[4].is_null = NULL;
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[5].buffer_length = sizeof(v.v8);
params[5].buffer = &v.v8;
params[5].length = ¶ms[5].buffer_length;
params[5].is_null = NULL;
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[6].buffer_length = sizeof(v.f4);
params[6].buffer = &v.f4;
params[6].length = ¶ms[6].buffer_length;
params[6].is_null = NULL;
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[7].buffer_length = sizeof(v.f8);
params[7].buffer = &v.f8;
params[7].length = ¶ms[7].buffer_length;
params[7].is_null = NULL;
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
params[8].buffer_length = sizeof(v.bin);
params[8].buffer = v.bin;
params[8].length = ¶ms[8].buffer_length;
params[8].is_null = NULL;
strcpy(v.blob, "一二三四五六七八九十");
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer_length = strlen(v.blob);
params[9].buffer = v.blob;
params[9].length = ¶ms[9].buffer_length;
params[9].is_null = NULL;
params[10].buffer_type = TSDB_DATA_TYPE_UTINYINT;
params[10].buffer_length = sizeof(v.u1);
params[10].buffer = &v.u1;
params[10].length = ¶ms[10].buffer_length;
params[10].is_null = NULL;
params[11].buffer_type = TSDB_DATA_TYPE_USMALLINT;
params[11].buffer_length = sizeof(v.u2);
params[11].buffer = &v.u2;
params[11].length = ¶ms[11].buffer_length;
params[11].is_null = NULL;
params[12].buffer_type = TSDB_DATA_TYPE_UINT;
params[12].buffer_length = sizeof(v.u4);
params[12].buffer = &v.u4;
params[12].length = ¶ms[12].buffer_length;
params[12].is_null = NULL;
params[13].buffer_type = TSDB_DATA_TYPE_UBIGINT;
params[13].buffer_length = sizeof(v.u8);
params[13].buffer = &v.u8;
params[13].length = ¶ms[13].buffer_length;
params[13].is_null = NULL;
int is_null = 1;
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0) {
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 10; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
v.v1 = (int8_t)i;
v.v2 = (int16_t)(i * 2);
v.v4 = (int32_t)(i * 4);
v.v8 = (int64_t)(i * 8);
v.f4 = (float)(i * 40);
v.f8 = (double)(i * 80);
for (int j = 0; j < sizeof(v.bin); ++j) {
v.bin[j] = (char)(i + '0');
}
v.u1 = (uint8_t)i;
v.u2 = (uint16_t)(i * 2);
v.u4 = (uint32_t)(i * 4);
v.u8 = (uint64_t)(i * 8);
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute insert statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
int affectedRows = taos_stmt_affected_rows(stmt);
printf("sucessfully inserted %d rows\n", affectedRows);
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
v.v1 = 5;
v.v2 = 15;
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute select statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD* fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
taos_stmt_close(stmt);
}
void verify_prepare2(TAOS* taos) {
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database test;");
int code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
usleep(100000);
taos_select_db(taos, "test");
// create table
const char* sql =
"create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin "
"binary(40), blob nchar(10), u1 tinyint unsigned, u2 smallint unsigned, u4 int unsigned, u8 bigint unsigned)";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
// insert 10 records
struct {
int64_t ts;
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
char bin[40];
char blob[80];
uint8_t u1;
uint16_t u2;
uint32_t u4;
uint64_t u8;
} v = {0};
TAOS_STMT* stmt = taos_stmt_init(taos);
TAOS_BIND params[14];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = ¶ms[0].buffer_length;
params[0].is_null = NULL;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b);
params[1].buffer = &v.b;
params[1].length = ¶ms[1].buffer_length;
params[1].is_null = NULL;
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[2].buffer_length = sizeof(v.v1);
params[2].buffer = &v.v1;
params[2].length = ¶ms[2].buffer_length;
params[2].is_null = NULL;
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[3].buffer_length = sizeof(v.v2);
params[3].buffer = &v.v2;
params[3].length = ¶ms[3].buffer_length;
params[3].is_null = NULL;
params[4].buffer_type = TSDB_DATA_TYPE_INT;
params[4].buffer_length = sizeof(v.v4);
params[4].buffer = &v.v4;
params[4].length = ¶ms[4].buffer_length;
params[4].is_null = NULL;
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[5].buffer_length = sizeof(v.v8);
params[5].buffer = &v.v8;
params[5].length = ¶ms[5].buffer_length;
params[5].is_null = NULL;
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[6].buffer_length = sizeof(v.f4);
params[6].buffer = &v.f4;
params[6].length = ¶ms[6].buffer_length;
params[6].is_null = NULL;
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[7].buffer_length = sizeof(v.f8);
params[7].buffer = &v.f8;
params[7].length = ¶ms[7].buffer_length;
params[7].is_null = NULL;
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
params[8].buffer_length = sizeof(v.bin);
params[8].buffer = v.bin;
params[8].length = ¶ms[8].buffer_length;
params[8].is_null = NULL;
strcpy(v.blob, "一二三四五六七八九十");
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer_length = strlen(v.blob);
params[9].buffer = v.blob;
params[9].length = ¶ms[9].buffer_length;
params[9].is_null = NULL;
params[10].buffer_type = TSDB_DATA_TYPE_UTINYINT;
params[10].buffer_length = sizeof(v.u1);
params[10].buffer = &v.u1;
params[10].length = ¶ms[10].buffer_length;
params[10].is_null = NULL;
params[11].buffer_type = TSDB_DATA_TYPE_USMALLINT;
params[11].buffer_length = sizeof(v.u2);
params[11].buffer = &v.u2;
params[11].length = ¶ms[11].buffer_length;
params[11].is_null = NULL;
params[12].buffer_type = TSDB_DATA_TYPE_UINT;
params[12].buffer_length = sizeof(v.u4);
params[12].buffer = &v.u4;
params[12].length = ¶ms[12].buffer_length;
params[12].is_null = NULL;
params[13].buffer_type = TSDB_DATA_TYPE_UBIGINT;
params[13].buffer_length = sizeof(v.u8);
params[13].buffer = &v.u8;
params[13].length = ¶ms[13].buffer_length;
params[13].is_null = NULL;
sql = "insert into ? values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0) {
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
code = taos_stmt_set_tbname(stmt, "m1");
if (code != 0) {
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
int is_null = 1;
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 10; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
v.v1 = (int8_t)i;
v.v2 = (int16_t)(i * 2);
v.v4 = (int32_t)(i * 4);
v.v8 = (int64_t)(i * 8);
v.f4 = (float)(i * 40);
v.f8 = (double)(i * 80);
for (int j = 0; j < sizeof(v.bin); ++j) {
v.bin[j] = (char)(i + '0');
}
v.u1 = (uint8_t)i;
v.u2 = (uint16_t)(i * 2);
v.u4 = (uint32_t)(i * 4);
v.u8 = (uint64_t)(i * 8);
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute insert statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
int affectedRows = taos_stmt_affected_rows(stmt);
printf("sucessfully inserted %d rows\n", affectedRows);
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
TAOS_BIND qparams[2];
int8_t v1 = 5;
int16_t v2 = 15;
qparams[0].buffer_type = TSDB_DATA_TYPE_TINYINT;
qparams[0].buffer_length = sizeof(v1);
qparams[0].buffer = &v1;
qparams[0].length = &qparams[0].buffer_length;
qparams[0].is_null = NULL;
qparams[1].buffer_type = TSDB_DATA_TYPE_SMALLINT;
qparams[1].buffer_length = sizeof(v2);
qparams[1].buffer = &v2;
qparams[1].length = &qparams[1].buffer_length;
qparams[1].is_null = NULL;
taos_stmt_bind_param(stmt, qparams);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute select statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD* fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
taos_stmt_close(stmt);
}
void verify_prepare3(TAOS* taos) {
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database test;");
int code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
usleep(100000);
taos_select_db(taos, "test");
// create table
const char* sql =
"create stable st1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin "
"binary(40), blob nchar(10), u1 tinyint unsigned, u2 smallint unsigned, u4 int unsigned, u8 bigint unsigned) "
"tags "
"(b_tag bool, v1_tag tinyint, v2_tag smallint, v4_tag int, v8_tag bigint, f4_tag float, f8_tag double, bin_tag "
"binary(40), blob_tag nchar(10), u1_tag tinyint unsigned, u2_tag smallint unsigned, u4_tag int unsigned, u8_tag "
"bigint "
"unsigned)";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
TAOS_BIND tags[13];
struct {
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
char bin[40];
char blob[80];
uint8_t u1;
uint16_t u2;
uint32_t u4;
uint64_t u8;
} id = {0};
id.b = (int8_t)1;
id.v1 = (int8_t)1;
id.v2 = (int16_t)2;
id.v4 = (int32_t)4;
id.v8 = (int64_t)8;
id.f4 = (float)40;
id.f8 = (double)80;
for (int j = 0; j < sizeof(id.bin); ++j) {
id.bin[j] = (char)('1' + '0');
}
strcpy(id.blob, "一二三四五六七八九十");
id.u1 = (uint8_t)1;
id.u2 = (uint16_t)2;
id.u4 = (uint32_t)4;
id.u8 = (uint64_t)8;
tags[0].buffer_type = TSDB_DATA_TYPE_BOOL;
tags[0].buffer_length = sizeof(id.b);
tags[0].buffer = &id.b;
tags[0].length = &tags[0].buffer_length;
tags[0].is_null = NULL;
tags[1].buffer_type = TSDB_DATA_TYPE_TINYINT;
tags[1].buffer_length = sizeof(id.v1);
tags[1].buffer = &id.v1;
tags[1].length = &tags[1].buffer_length;
tags[1].is_null = NULL;
tags[2].buffer_type = TSDB_DATA_TYPE_SMALLINT;
tags[2].buffer_length = sizeof(id.v2);
tags[2].buffer = &id.v2;
tags[2].length = &tags[2].buffer_length;
tags[2].is_null = NULL;
tags[3].buffer_type = TSDB_DATA_TYPE_INT;
tags[3].buffer_length = sizeof(id.v4);
tags[3].buffer = &id.v4;
tags[3].length = &tags[3].buffer_length;
tags[3].is_null = NULL;
tags[4].buffer_type = TSDB_DATA_TYPE_BIGINT;
tags[4].buffer_length = sizeof(id.v8);
tags[4].buffer = &id.v8;
tags[4].length = &tags[4].buffer_length;
tags[4].is_null = NULL;
tags[5].buffer_type = TSDB_DATA_TYPE_FLOAT;
tags[5].buffer_length = sizeof(id.f4);
tags[5].buffer = &id.f4;
tags[5].length = &tags[5].buffer_length;
tags[5].is_null = NULL;
tags[6].buffer_type = TSDB_DATA_TYPE_DOUBLE;
tags[6].buffer_length = sizeof(id.f8);
tags[6].buffer = &id.f8;
tags[6].length = &tags[6].buffer_length;
tags[6].is_null = NULL;
tags[7].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[7].buffer_length = sizeof(id.bin);
tags[7].buffer = &id.bin;
tags[7].length = &tags[7].buffer_length;
tags[7].is_null = NULL;
tags[8].buffer_type = TSDB_DATA_TYPE_NCHAR;
tags[8].buffer_length = strlen(id.blob);
tags[8].buffer = &id.blob;
tags[8].length = &tags[8].buffer_length;
tags[8].is_null = NULL;
tags[9].buffer_type = TSDB_DATA_TYPE_UTINYINT;
tags[9].buffer_length = sizeof(id.u1);
tags[9].buffer = &id.u1;
tags[9].length = &tags[9].buffer_length;
tags[9].is_null = NULL;
tags[10].buffer_type = TSDB_DATA_TYPE_USMALLINT;
tags[10].buffer_length = sizeof(id.u2);
tags[10].buffer = &id.u2;
tags[10].length = &tags[10].buffer_length;
tags[10].is_null = NULL;
tags[11].buffer_type = TSDB_DATA_TYPE_UINT;
tags[11].buffer_length = sizeof(id.u4);
tags[11].buffer = &id.u4;
tags[11].length = &tags[11].buffer_length;
tags[11].is_null = NULL;
tags[12].buffer_type = TSDB_DATA_TYPE_UBIGINT;
tags[12].buffer_length = sizeof(id.u8);
tags[12].buffer = &id.u8;
tags[12].length = &tags[12].buffer_length;
tags[12].is_null = NULL;
// insert 10 records
struct {
int64_t ts[10];
int8_t b[10];
int8_t v1[10];
int16_t v2[10];
int32_t v4[10];
int64_t v8[10];
float f4[10];
double f8[10];
char bin[10][40];
char blob[10][80];
uint8_t u1[10];
uint16_t u2[10];
uint32_t u4[10];
uint64_t u8[10];
} v;
int32_t* t8_len = malloc(sizeof(int32_t) * 10);
int32_t* t16_len = malloc(sizeof(int32_t) * 10);
int32_t* t32_len = malloc(sizeof(int32_t) * 10);
int32_t* t64_len = malloc(sizeof(int32_t) * 10);
int32_t* float_len = malloc(sizeof(int32_t) * 10);
int32_t* double_len = malloc(sizeof(int32_t) * 10);
int32_t* bin_len = malloc(sizeof(int32_t) * 10);
int32_t* blob_len = malloc(sizeof(int32_t) * 10);
int32_t* u8_len = malloc(sizeof(int32_t) * 10);
int32_t* u16_len = malloc(sizeof(int32_t) * 10);
int32_t* u32_len = malloc(sizeof(int32_t) * 10);
int32_t* u64_len = malloc(sizeof(int32_t) * 10);
TAOS_STMT* stmt = taos_stmt_init(taos);
TAOS_MULTI_BIND params[14];
char is_null[10] = {0};
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts[0]);
params[0].buffer = v.ts;
params[0].length = t64_len;
params[0].is_null = is_null;
params[0].num = 10;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b[0]);
params[1].buffer = v.b;
params[1].length = t8_len;
params[1].is_null = is_null;
params[1].num = 10;
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[2].buffer_length = sizeof(v.v1[0]);
params[2].buffer = v.v1;
params[2].length = t8_len;
params[2].is_null = is_null;
params[2].num = 10;
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[3].buffer_length = sizeof(v.v2[0]);
params[3].buffer = v.v2;
params[3].length = t16_len;
params[3].is_null = is_null;
params[3].num = 10;
params[4].buffer_type = TSDB_DATA_TYPE_INT;
params[4].buffer_length = sizeof(v.v4[0]);
params[4].buffer = v.v4;
params[4].length = t32_len;
params[4].is_null = is_null;
params[4].num = 10;
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[5].buffer_length = sizeof(v.v8[0]);
params[5].buffer = v.v8;
params[5].length = t64_len;
params[5].is_null = is_null;
params[5].num = 10;
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[6].buffer_length = sizeof(v.f4[0]);
params[6].buffer = v.f4;
params[6].length = float_len;
params[6].is_null = is_null;
params[6].num = 10;
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[7].buffer_length = sizeof(v.f8[0]);
params[7].buffer = v.f8;
params[7].length = double_len;
params[7].is_null = is_null;
params[7].num = 10;
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
params[8].buffer_length = sizeof(v.bin[0]);
params[8].buffer = v.bin;
params[8].length = bin_len;
params[8].is_null = is_null;
params[8].num = 10;
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer_length = sizeof(v.blob[0]);
params[9].buffer = v.blob;
params[9].length = blob_len;
params[9].is_null = is_null;
params[9].num = 10;
params[10].buffer_type = TSDB_DATA_TYPE_UTINYINT;
params[10].buffer_length = sizeof(v.u1[0]);
params[10].buffer = v.u1;
params[10].length = u8_len;
params[10].is_null = is_null;
params[10].num = 10;
params[11].buffer_type = TSDB_DATA_TYPE_USMALLINT;
params[11].buffer_length = sizeof(v.u2[0]);
params[11].buffer = v.u2;
params[11].length = u16_len;
params[11].is_null = is_null;
params[11].num = 10;
params[12].buffer_type = TSDB_DATA_TYPE_UINT;
params[12].buffer_length = sizeof(v.u4[0]);
params[12].buffer = v.u4;
params[12].length = u32_len;
params[12].is_null = is_null;
params[12].num = 10;
params[13].buffer_type = TSDB_DATA_TYPE_UBIGINT;
params[13].buffer_length = sizeof(v.u8[0]);
params[13].buffer = v.u8;
params[13].length = u64_len;
params[13].is_null = is_null;
params[13].num = 10;
sql = "insert into ? using st1 tags(?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0) {
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
code = taos_stmt_set_tbname_tags(stmt, "m1", tags);
if (code != 0) {
printf("\033[31mfailed to execute taos_stmt_set_tbname_tags. error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
int64_t ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts[i] = ts++;
is_null[i] = 0;
v.b[i] = (int8_t)i % 2;
v.v1[i] = (int8_t)i;
v.v2[i] = (int16_t)(i * 2);
v.v4[i] = (int32_t)(i * 4);
v.v8[i] = (int64_t)(i * 8);
v.f4[i] = (float)(i * 40);
v.f8[i] = (double)(i * 80);
for (int j = 0; j < sizeof(v.bin[0]); ++j) {
v.bin[i][j] = (char)(i + '0');
}
strcpy(v.blob[i], "一二三四五六七八九十");
v.u1[i] = (uint8_t)i;
v.u2[i] = (uint16_t)(i * 2);
v.u4[i] = (uint32_t)(i * 4);
v.u8[i] = (uint64_t)(i * 8);
t8_len[i] = sizeof(int8_t);
t16_len[i] = sizeof(int16_t);
t32_len[i] = sizeof(int32_t);
t64_len[i] = sizeof(int64_t);
float_len[i] = sizeof(float);
double_len[i] = sizeof(double);
bin_len[i] = sizeof(v.bin[0]);
blob_len[i] = (int32_t)strlen(v.blob[i]);
u8_len[i] = sizeof(uint8_t);
u16_len[i] = sizeof(uint16_t);
u32_len[i] = sizeof(uint32_t);
u64_len[i] = sizeof(uint64_t);
}
taos_stmt_bind_param_batch(stmt, params);
taos_stmt_add_batch(stmt);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute insert statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
int affectedRows = taos_stmt_affected_rows(stmt);
printf("successfully inserted %d rows\n", affectedRows);
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
TAOS_BIND qparams[2];
int8_t v1 = 5;
int16_t v2 = 15;
qparams[0].buffer_type = TSDB_DATA_TYPE_TINYINT;
qparams[0].buffer_length = sizeof(v1);
qparams[0].buffer = &v1;
qparams[0].length = &qparams[0].buffer_length;
qparams[0].is_null = NULL;
qparams[1].buffer_type = TSDB_DATA_TYPE_SMALLINT;
qparams[1].buffer_length = sizeof(v2);
qparams[1].buffer = &v2;
qparams[1].length = &qparams[1].buffer_length;
qparams[1].is_null = NULL;
taos_stmt_bind_param(stmt, qparams);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute select statement.error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD* fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
taos_stmt_close(stmt);
free(t8_len);
free(t16_len);
free(t32_len);
free(t64_len);
free(float_len);
free(double_len);
free(bin_len);
free(blob_len);
free(u8_len);
free(u16_len);
free(u32_len);
free(u64_len);
}
/**
* @brief Verify the upper/lower case of tableName for create(by setTableName)/query/show/describe/drop.
* https://jira.taosdata.com:18080/browse/TS-904
* https://jira.taosdata.com:18090/pages/viewpage.action?pageId=129140555
* @param taos
*/
void verify_prepare4(TAOS* taos) {
printf("Verify the upper/lower case of tableName for create(by setTableName)/query/show/describe/drop etc.\n");
TAOS_RES* result = taos_query(taos, "drop database if exists test;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database test;");
int code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create database, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
usleep(100000);
taos_select_db(taos, "test");
// create table
const char* sql =
"create stable st1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin "
"binary(40), blob nchar(10), u1 tinyint unsigned, u2 smallint unsigned, u4 int unsigned, u8 bigint unsigned) "
"tags "
"(b_tag bool, v1_tag tinyint, v2_tag smallint, v4_tag int, v8_tag bigint, f4_tag float, f8_tag double, bin_tag "
"binary(40), blob_tag nchar(10), u1_tag tinyint unsigned, u2_tag smallint unsigned, u4_tag int unsigned, u8_tag "
"bigint "
"unsigned)";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("\033[31mfailed to create table, reason:%s\033[0m\n", taos_errstr(result));
taos_free_result(result);
exit(EXIT_FAILURE);
}
taos_free_result(result);
TAOS_BIND tags[13];
struct {
int8_t b;
int8_t v1;
int16_t v2;
int32_t v4;
int64_t v8;
float f4;
double f8;
char bin[40];
char blob[80];
uint8_t u1;
uint16_t u2;
uint32_t u4;
uint64_t u8;
} id = {0};
id.b = (int8_t)1;
id.v1 = (int8_t)1;
id.v2 = (int16_t)2;
id.v4 = (int32_t)4;
id.v8 = (int64_t)8;
id.f4 = (float)40;
id.f8 = (double)80;
for (int j = 0; j < sizeof(id.bin); ++j) {
id.bin[j] = (char)('1' + '0');
}
strcpy(id.blob, "一二三四五六七八九十");
id.u1 = (uint8_t)1;
id.u2 = (uint16_t)2;
id.u4 = (uint32_t)4;
id.u8 = (uint64_t)8;
tags[0].buffer_type = TSDB_DATA_TYPE_BOOL;
tags[0].buffer_length = sizeof(id.b);
tags[0].buffer = &id.b;
tags[0].length = &tags[0].buffer_length;
tags[0].is_null = NULL;
tags[1].buffer_type = TSDB_DATA_TYPE_TINYINT;
tags[1].buffer_length = sizeof(id.v1);
tags[1].buffer = &id.v1;
tags[1].length = &tags[1].buffer_length;
tags[1].is_null = NULL;
tags[2].buffer_type = TSDB_DATA_TYPE_SMALLINT;
tags[2].buffer_length = sizeof(id.v2);
tags[2].buffer = &id.v2;
tags[2].length = &tags[2].buffer_length;
tags[2].is_null = NULL;
tags[3].buffer_type = TSDB_DATA_TYPE_INT;
tags[3].buffer_length = sizeof(id.v4);
tags[3].buffer = &id.v4;
tags[3].length = &tags[3].buffer_length;
tags[3].is_null = NULL;
tags[4].buffer_type = TSDB_DATA_TYPE_BIGINT;
tags[4].buffer_length = sizeof(id.v8);
tags[4].buffer = &id.v8;
tags[4].length = &tags[4].buffer_length;
tags[4].is_null = NULL;
tags[5].buffer_type = TSDB_DATA_TYPE_FLOAT;
tags[5].buffer_length = sizeof(id.f4);
tags[5].buffer = &id.f4;
tags[5].length = &tags[5].buffer_length;
tags[5].is_null = NULL;
tags[6].buffer_type = TSDB_DATA_TYPE_DOUBLE;
tags[6].buffer_length = sizeof(id.f8);
tags[6].buffer = &id.f8;
tags[6].length = &tags[6].buffer_length;
tags[6].is_null = NULL;
tags[7].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[7].buffer_length = sizeof(id.bin);
tags[7].buffer = &id.bin;
tags[7].length = &tags[7].buffer_length;
tags[7].is_null = NULL;
tags[8].buffer_type = TSDB_DATA_TYPE_NCHAR;
tags[8].buffer_length = strlen(id.blob);
tags[8].buffer = &id.blob;
tags[8].length = &tags[8].buffer_length;
tags[8].is_null = NULL;
tags[9].buffer_type = TSDB_DATA_TYPE_UTINYINT;
tags[9].buffer_length = sizeof(id.u1);
tags[9].buffer = &id.u1;
tags[9].length = &tags[9].buffer_length;
tags[9].is_null = NULL;
tags[10].buffer_type = TSDB_DATA_TYPE_USMALLINT;
tags[10].buffer_length = sizeof(id.u2);
tags[10].buffer = &id.u2;
tags[10].length = &tags[10].buffer_length;
tags[10].is_null = NULL;
tags[11].buffer_type = TSDB_DATA_TYPE_UINT;
tags[11].buffer_length = sizeof(id.u4);
tags[11].buffer = &id.u4;
tags[11].length = &tags[11].buffer_length;
tags[11].is_null = NULL;
tags[12].buffer_type = TSDB_DATA_TYPE_UBIGINT;
tags[12].buffer_length = sizeof(id.u8);
tags[12].buffer = &id.u8;
tags[12].length = &tags[12].buffer_length;
tags[12].is_null = NULL;
// insert 10 records
struct {
int64_t ts[10];
int8_t b[10];
int8_t v1[10];
int16_t v2[10];
int32_t v4[10];
int64_t v8[10];
float f4[10];
double f8[10];
char bin[10][40];
char blob[10][80];
uint8_t u1[10];
uint16_t u2[10];
uint32_t u4[10];
uint64_t u8[10];
} v;
int32_t* t8_len = malloc(sizeof(int32_t) * 10);
int32_t* t16_len = malloc(sizeof(int32_t) * 10);
int32_t* t32_len = malloc(sizeof(int32_t) * 10);
int32_t* t64_len = malloc(sizeof(int32_t) * 10);
int32_t* float_len = malloc(sizeof(int32_t) * 10);
int32_t* double_len = malloc(sizeof(int32_t) * 10);
int32_t* bin_len = malloc(sizeof(int32_t) * 10);
int32_t* blob_len = malloc(sizeof(int32_t) * 10);
int32_t* u8_len = malloc(sizeof(int32_t) * 10);
int32_t* u16_len = malloc(sizeof(int32_t) * 10);
int32_t* u32_len = malloc(sizeof(int32_t) * 10);
int32_t* u64_len = malloc(sizeof(int32_t) * 10);
TAOS_MULTI_BIND params[14];
char is_null[10] = {0};
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts[0]);
params[0].buffer = v.ts;
params[0].length = t64_len;
params[0].is_null = is_null;
params[0].num = 10;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b[0]);
params[1].buffer = v.b;
params[1].length = t8_len;
params[1].is_null = is_null;
params[1].num = 10;
params[2].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[2].buffer_length = sizeof(v.v1[0]);
params[2].buffer = v.v1;
params[2].length = t8_len;
params[2].is_null = is_null;
params[2].num = 10;
params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[3].buffer_length = sizeof(v.v2[0]);
params[3].buffer = v.v2;
params[3].length = t16_len;
params[3].is_null = is_null;
params[3].num = 10;
params[4].buffer_type = TSDB_DATA_TYPE_INT;
params[4].buffer_length = sizeof(v.v4[0]);
params[4].buffer = v.v4;
params[4].length = t32_len;
params[4].is_null = is_null;
params[4].num = 10;
params[5].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[5].buffer_length = sizeof(v.v8[0]);
params[5].buffer = v.v8;
params[5].length = t64_len;
params[5].is_null = is_null;
params[5].num = 10;
params[6].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[6].buffer_length = sizeof(v.f4[0]);
params[6].buffer = v.f4;
params[6].length = float_len;
params[6].is_null = is_null;
params[6].num = 10;
params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[7].buffer_length = sizeof(v.f8[0]);
params[7].buffer = v.f8;
params[7].length = double_len;
params[7].is_null = is_null;
params[7].num = 10;
params[8].buffer_type = TSDB_DATA_TYPE_BINARY;
params[8].buffer_length = sizeof(v.bin[0]);
params[8].buffer = v.bin;
params[8].length = bin_len;
params[8].is_null = is_null;
params[8].num = 10;
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer_length = sizeof(v.blob[0]);
params[9].buffer = v.blob;
params[9].length = blob_len;
params[9].is_null = is_null;
params[9].num = 10;
params[10].buffer_type = TSDB_DATA_TYPE_UTINYINT;
params[10].buffer_length = sizeof(v.u1[0]);
params[10].buffer = v.u1;
params[10].length = u8_len;
params[10].is_null = is_null;
params[10].num = 10;
params[11].buffer_type = TSDB_DATA_TYPE_USMALLINT;
params[11].buffer_length = sizeof(v.u2[0]);
params[11].buffer = v.u2;
params[11].length = u16_len;
params[11].is_null = is_null;
params[11].num = 10;
params[12].buffer_type = TSDB_DATA_TYPE_UINT;
params[12].buffer_length = sizeof(v.u4[0]);
params[12].buffer = v.u4;
params[12].length = u32_len;
params[12].is_null = is_null;
params[12].num = 10;
params[13].buffer_type = TSDB_DATA_TYPE_UBIGINT;
params[13].buffer_length = sizeof(v.u8[0]);
params[13].buffer = v.u8;
params[13].length = u64_len;
params[13].is_null = is_null;
params[13].num = 10;
// verify table names for upper/lower case
#define VERIFY_CNT 5
typedef struct {
char setTbName[20];
char showName[20];
char describeName[20];
char queryName[20];
char dropName[20];
} STbNames;
/**
* @brief
* 0 - success expected
* NonZero - fail expected
*/
typedef struct {
int32_t setTbName;
int32_t showName;
int32_t describeName;
int32_t queryName;
int32_t dropName;
} STbNamesResult;
STbNames tbName[VERIFY_CNT] = {0};
STbNamesResult tbNameResult[VERIFY_CNT] = {0};
STbNames* pTbName = NULL;
STbNamesResult* pTbNameResult = NULL;
pTbName = &tbName[0];
pTbNameResult = &tbNameResult[0];
strcpy(pTbName->setTbName, "Mn1");
strcpy(pTbName->showName, "mn1");
strcpy(pTbName->describeName, "mn1");
strcpy(pTbName->queryName, "mn1");
strcpy(pTbName->dropName, "mn1");
pTbName = &tbName[1];
pTbNameResult = &tbNameResult[1];
strcpy(pTbName->setTbName, "'Mn1'");
strcpy(pTbName->showName, "'mn1'");
strcpy(pTbName->describeName, "'mn1'");
strcpy(pTbName->queryName, "'mn1'");
strcpy(pTbName->dropName, "'mn1'");
pTbName = &tbName[2];
pTbNameResult = &tbNameResult[2];
strcpy(pTbName->setTbName, "\"Mn1\"");
strcpy(pTbName->showName, "\"mn1\"");
strcpy(pTbName->describeName, "\"mn1\"");
strcpy(pTbName->queryName, "\"mn1\"");
strcpy(pTbName->dropName, "\"mn1\"");
pTbName = &tbName[3];
pTbNameResult = &tbNameResult[3];
strcpy(pTbName->setTbName, "\"Mn1\"");
strcpy(pTbName->showName, "'mn1'");
strcpy(pTbName->describeName, "'mn1'");
strcpy(pTbName->queryName, "mn1");
strcpy(pTbName->dropName, "\"mn1\"");
pTbName = &tbName[4];
pTbNameResult = &tbNameResult[4];
strcpy(pTbName->setTbName, "`Mn1`");
strcpy(pTbName->showName, "Mn1"); // TODO support uniform of ``
strcpy(pTbName->describeName, "`Mn1`");
strcpy(pTbName->queryName, "`Mn1`");
strcpy(pTbName->dropName, "`Mn1`");
TAOS_STMT* stmt = NULL;
for (int n = 0; n < VERIFY_CNT; ++n) {
printf("\033[31m[%d] ===================================\033[0m\n", n);
pTbName = &tbName[n];
pTbNameResult = &tbNameResult[n];
char tmpStr[256] = {0};
// set table name
stmt = taos_stmt_init(taos);
if (!stmt) {
printf("\033[31m[%d] failed to execute taos_stmt_init. error:%s\033[0m\n", n);
exit(EXIT_FAILURE);
}
sql = "insert into ? using st1 tags(?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0) {
printf("\033[31mfailed to execute taos_stmt_prepare. error:%s\033[0m\n", taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
printf("[%d] taos_stmt_set_tbname_tags, tbname=%s\n", n, pTbName->setTbName);
code = taos_stmt_set_tbname_tags(stmt, pTbName->setTbName, tags);
if ((!pTbNameResult->setTbName && (0 != code)) || (pTbNameResult->setTbName && (0 == code))) {
printf("\033[31m[%d] failed to execute taos_stmt_set_tbname_tags. error:%s\033[0m\n", n, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
if (code == 0) {
int64_t ts = 1591060628000 + 1000 * n;
for (int i = 0; i < 10; ++i) {
v.ts[i] = ts++;
is_null[i] = 0;
v.b[i] = (int8_t)i % 2;
v.v1[i] = (int8_t)i;
v.v2[i] = (int16_t)(i * 2);
v.v4[i] = (int32_t)(i * 4);
v.v8[i] = (int64_t)(i * 8);
v.f4[i] = (float)(i * 40);
v.f8[i] = (double)(i * 80);
for (int j = 0; j < sizeof(v.bin[0]); ++j) {
v.bin[i][j] = (char)(i + '0');
}
strcpy(v.blob[i], "一二三四五六七八九十");
v.u1[i] = (uint8_t)i;
v.u2[i] = (uint16_t)(i * 2);
v.u4[i] = (uint32_t)(i * 4);
v.u8[i] = (uint64_t)(i * 8);
t8_len[i] = sizeof(int8_t);
t16_len[i] = sizeof(int16_t);
t32_len[i] = sizeof(int32_t);
t64_len[i] = sizeof(int64_t);
float_len[i] = sizeof(float);
double_len[i] = sizeof(double);
bin_len[i] = sizeof(v.bin[0]);
blob_len[i] = (int32_t)strlen(v.blob[i]);
u8_len[i] = sizeof(uint8_t);
u16_len[i] = sizeof(uint16_t);
u32_len[i] = sizeof(uint32_t);
u64_len[i] = sizeof(uint64_t);
}
taos_stmt_bind_param_batch(stmt, params);
taos_stmt_add_batch(stmt);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31m[%d] failed to execute insert statement.error:%s\033[0m\n", n, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
taos_stmt_close(stmt);
// show the table
printf("[%d] show tables, tbName = %s\n", n, pTbName->showName);
stmt = taos_stmt_init(taos);
sprintf(tmpStr, "show tables like %s", pTbName->showName);
taos_stmt_prepare(stmt, tmpStr, 0);
code = taos_stmt_execute(stmt);
if ((!pTbNameResult->showName && (0 != code)) || (pTbNameResult->showName && (0 == code))) {
printf("\033[31m[%d] failed to execute show tables like. error:%s\033[0m\n", n, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
taos_stmt_close(stmt);
// describe the table
printf("[%d] describe tables, tbName = %s\n", n, pTbName->describeName);
stmt = taos_stmt_init(taos);
sprintf(tmpStr, "describe %s", pTbName->describeName);
taos_stmt_prepare(stmt, tmpStr, 0);
code = taos_stmt_execute(stmt);
if ((!pTbNameResult->describeName && (0 != code)) || (pTbNameResult->describeName && (0 == code))) {
printf("\033[31m[%d] failed to execute describe tables. error:%s\033[0m\n", n, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
taos_stmt_close(stmt);
// query the records
printf("[%d] select statement, tbName = %s\n", n, pTbName->queryName);
stmt = taos_stmt_init(taos);
sprintf(tmpStr, "SELECT * FROM %s", pTbName->queryName);
taos_stmt_prepare(stmt, tmpStr, 0);
TAOS_BIND qparams[2];
int8_t v1 = 5;
int16_t v2 = 15;
qparams[0].buffer_type = TSDB_DATA_TYPE_TINYINT;
qparams[0].buffer_length = sizeof(v1);
qparams[0].buffer = &v1;
qparams[0].length = &qparams[0].buffer_length;
qparams[0].is_null = NULL;
qparams[1].buffer_type = TSDB_DATA_TYPE_SMALLINT;
qparams[1].buffer_length = sizeof(v2);
qparams[1].buffer = &v2;
qparams[1].length = &qparams[1].buffer_length;
qparams[1].is_null = NULL;
taos_stmt_bind_param(stmt, qparams);
code = taos_stmt_execute(stmt);
if ((!pTbNameResult->queryName && (0 != code)) || (pTbNameResult->queryName && (0 == code))) {
printf("\033[31m[%d] failed to execute select statement.error:%s\033[0m\n", n, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD* fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("[%d] row = %s\n", n, temp);
}
taos_free_result(result);
taos_stmt_close(stmt);
// drop table
printf("[%d] drop table, tbName = %s\n", n, pTbName->dropName);
stmt = taos_stmt_init(taos);
sprintf(tmpStr, "drop table %s", pTbName->dropName);
taos_stmt_prepare(stmt, tmpStr, 0);
code = taos_stmt_execute(stmt);
if ((!pTbNameResult->dropName && (0 != code)) || (pTbNameResult->dropName && (0 == code))) {
printf("\033[31m[%d] failed to drop table. error:%s\033[0m\n", n, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
taos_stmt_close(stmt);
}
free(t8_len);
free(t16_len);
free(t32_len);
free(t64_len);
free(float_len);
free(double_len);
free(bin_len);
free(blob_len);
free(u8_len);
free(u16_len);
free(u32_len);
free(u64_len);
}
int main(int argc, char* argv[]) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
exit(1);
}
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
info = taos_get_client_info(taos);
printf("client info: %s\n", info);
printf("************ verify prepare *************\n");
verify_prepare(taos);
printf("************ verify prepare2 *************\n");
verify_prepare2(taos);
printf("************ verify prepare3 *************\n");
verify_prepare3(taos);
printf("************ verify prepare4 *************\n");
verify_prepare4(taos);
printf("************ verify end *************\n");
exit(EXIT_SUCCESS);
}
无模式写入示例
无模式写入
#include "os.h"
#include "taos.h"
#include "taoserror.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
bool verbose = false;
void printThreadId(pthread_t id, char* buf)
{
size_t i;
for (i = sizeof(i); i; --i)
sprintf(buf + strlen(buf), "%02x", *(((unsigned char*) &id) + i - 1));
}
static int64_t getTimeInUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
typedef struct {
char** lines;
int numLines;
} SThreadLinesBatch;
typedef struct {
TAOS* taos;
int protocol;
int numBatches;
SThreadLinesBatch *batches;
int64_t costTime;
} SThreadInsertArgs;
static void* insertLines(void* args) {
SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
char tidBuf[32] = {0};
printThreadId(pthread_self(), tidBuf);
for (int i = 0; i < insertArgs->numBatches; ++i) {
SThreadLinesBatch* batch = insertArgs->batches + i;
if (verbose) printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t begin = getTimeInUs();
//int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
TAOS_RES * res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->protocol, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int32_t code = taos_errno(res);
int64_t end = getTimeInUs();
insertArgs->costTime += end - begin;
if (verbose) printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
}
return NULL;
}
int32_t getTelenetTemplate(char* lineTemplate, int templateLen) {
char* sample = "sta%d %lld 44.3 t0=False t1=127i8 t2=32 t3=%di32 t4=9223372036854775807i64 t5=11.12345f32 t6=22.123456789f64 t7=\"hpxzrdiw\" t8=\"ncharTagValue\" t9=127i8";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
if (numFields <= 4) {
char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lld";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
if (numFields <= 13) {
char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lld";
snprintf(lineTemplate, templateLen, "%s", sample);
return 0;
}
char* lineFormatTable = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32 ";
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "%s", lineFormatTable);
int offset[] = {numFields*2/5, numFields*4/5, numFields};
for (int i = 0; i < offset[0]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%di32,", i, i);
}
for (int i=offset[0]+1; i < offset[1]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%d.43f64,", i, i);
}
for (int i = offset[1]+1; i < offset[2]; ++i) {
snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=\"%d\",", i, i);
}
char* lineFormatTs = " %lld";
snprintf(lineTemplate+strlen(lineTemplate)-1, templateLen-strlen(lineTemplate)+1, "%s", lineFormatTs);
return 0;
}
int32_t generateLine(char* line, int lineLen, char* lineTemplate, int protocol, int superTable, int childTable, int64_t ts) {
if (protocol == TSDB_SML_LINE_PROTOCOL) {
snprintf(line, lineLen, lineTemplate, superTable, childTable, ts);
} else if (protocol == TSDB_SML_TELNET_PROTOCOL) {
snprintf(line, lineLen, lineTemplate, superTable, ts, childTable);
}
return TSDB_CODE_SUCCESS;
}
int32_t setupSuperTables(TAOS* taos, char* lineTemplate, int protocol,
int numSuperTables, int numChildTables, int numRowsPerChildTable,
int maxBatchesPerThread, int64_t ts) {
printf("setup supertables...");
{
char** linesStb = calloc(numSuperTables, sizeof(char*));
for (int i = 0; i < numSuperTables; i++) {
char* lineStb = calloc(strlen(lineTemplate)+128, 1);
generateLine(lineStb, strlen(lineTemplate)+128, lineTemplate, protocol, i,
numSuperTables * numChildTables,
ts + numSuperTables * numChildTables * numRowsPerChildTable);
linesStb[i] = lineStb;
}
SThreadInsertArgs args = {0};
args.protocol = protocol;
args.batches = calloc(maxBatchesPerThread, sizeof(maxBatchesPerThread));
args.taos = taos;
args.batches[0].lines = linesStb;
args.batches[0].numLines = numSuperTables;
insertLines(&args);
free(args.batches);
for (int i = 0; i < numSuperTables; ++i) {
free(linesStb[i]);
}
free(linesStb);
}
return TSDB_CODE_SUCCESS;
}
int main(int argc, char* argv[]) {
int numThreads = 8;
int maxBatchesPerThread = 1024;
int numSuperTables = 1;
int numChildTables = 256;
int numRowsPerChildTable = 8192;
int numFields = 13;
int maxLinesPerBatch = 16384;
int protocol = TSDB_SML_TELNET_PROTOCOL;
int assembleSTables = 0;
int opt;
while ((opt = getopt(argc, argv, "s:c:r:f:t:b:p:w:hv")) != -1) {
switch (opt) {
case 's':
numSuperTables = atoi(optarg);
break;
case 'c':
numChildTables = atoi(optarg);
break;
case 'r':
numRowsPerChildTable = atoi(optarg);
break;
case 'f':
numFields = atoi(optarg);
break;
case 't':
numThreads = atoi(optarg);
break;
case 'b':
maxLinesPerBatch = atoi(optarg);
break;
case 'v':
verbose = true;
break;
case 'a':
assembleSTables = atoi(optarg);
break;
case 'p':
if (optarg[0] == 't') {
protocol = TSDB_SML_TELNET_PROTOCOL;
} else if (optarg[0] == 'l') {
protocol = TSDB_SML_LINE_PROTOCOL;
} else if (optarg[0] == 'j') {
protocol = TSDB_SML_JSON_PROTOCOL;
}
break;
case 'h':
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -a assemble-stables -v\n",
argv[0]);
exit(0);
default: /* '?' */
fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -a assemble-stables -v\n",
argv[0]);
exit(-1);
}
}
TAOS_RES* result;
//const char* host = "127.0.0.1";
const char* host = NULL;
const char* user = "root";
const char* passwd = "taosdata";
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
exit(1);
}
maxBatchesPerThread = (numSuperTables*numChildTables*numRowsPerChildTable)/(numThreads * maxLinesPerBatch) + 1;
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
info = taos_get_client_info(taos);
printf("client info: %s\n", info);
result = taos_query(taos, "drop database if exists db;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database db precision 'us';");
taos_free_result(result);
usleep(100000);
(void)taos_select_db(taos, "db");
time_t ct = time(0);
int64_t ts = ct * 1000 ;
char* lineTemplate = calloc(65536, sizeof(char));
if (protocol == TSDB_SML_LINE_PROTOCOL) {
getLineTemplate(lineTemplate, 65535, numFields);
} else if (protocol == TSDB_SML_TELNET_PROTOCOL ) {
getTelenetTemplate(lineTemplate, 65535);
}
if (assembleSTables) {
setupSuperTables(taos, lineTemplate, protocol,
numSuperTables, numChildTables, numRowsPerChildTable, maxBatchesPerThread, ts);
}
printf("generate lines...\n");
pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i = 0; i < numThreads; ++i) {
argsThread[i].batches = calloc(maxBatchesPerThread, sizeof(SThreadLinesBatch));
argsThread[i].taos = taos;
argsThread[i].numBatches = 0;
argsThread[i].protocol = protocol;
}
int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
int totalBatches = (int) ((totalLines) / maxLinesPerBatch);
if (totalLines % maxLinesPerBatch != 0) {
totalBatches += 1;
}
char*** allBatches = calloc(totalBatches, sizeof(char**));
for (int i = 0; i < totalBatches; ++i) {
allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*));
int threadNo = i % numThreads;
int batchNo = i / numThreads;
argsThread[threadNo].batches[batchNo].lines = allBatches[i];
argsThread[threadNo].numBatches = batchNo + 1;
}
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
int stIdx = i;
int ctIdx = numSuperTables*numChildTables + j;
char* line = calloc(strlen(lineTemplate)+128, 1);
generateLine(line, strlen(lineTemplate)+128, lineTemplate, protocol, stIdx, ctIdx, ts + l);
int batchNo = l / maxLinesPerBatch;
int lineNo = l % maxLinesPerBatch;
allBatches[batchNo][lineNo] = line;
argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1;
++l;
}
}
}
printf("begin multi-thread insertion...\n");
int64_t begin = taosGetTimestampUs();
for (int i=0; i < numThreads; ++i) {
pthread_create(tids+i, NULL, insertLines, argsThread+i);
}
for (int i = 0; i < numThreads; ++i) {
pthread_join(tids[i], NULL);
}
int64_t end = taosGetTimestampUs();
size_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable;
printf("TOTAL LINES: %zu\n", linesNum);
printf("THREADS: %d\n", numThreads);
printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
printf("THROUGHPUT:%d/s\n", (int)throughput);
for (int i = 0; i < totalBatches; ++i) {
free(allBatches[i]);
}
free(allBatches);
for (int i = 0; i < numThreads; i++) {
free(argsThread[i].batches);
}
free(argsThread);
free(tids);
free(lineTemplate);
taos_close(taos);
return 0;
}
订阅和消费示例
订阅和消费
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h> // include TDengine header file
#include <unistd.h>
int nTotalRows;
void print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
if (blockFetch) {
nRows = taos_fetch_block(res, &row);
// for (int i = 0; i < nRows; i++) {
// taos_print_row(buf, row + i, fields, num_fields);
// puts(buf);
//}
} else {
while ((row = taos_fetch_row(res))) {
char buf[4096] = {0};
taos_print_row(buf, row, fields, num_fields);
puts(buf);
nRows++;
}
}
nTotalRows += nRows;
printf("%d rows consumed.\n", nRows);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) { print_result(res, *(int*)param); }
void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = 0;
TAOS_ROW row;
while ((row = taos_fetch_row(res))) {
actual++;
}
if (actual != expected) {
printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
} else {
printf("line %d: %d rows consumed as expected\n", line, actual);
}
}
void do_query(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, sql);
taos_free_result(res);
}
void run_test(TAOS* taos) {
do_query(taos, "drop database if exists test;");
usleep(100000);
do_query(taos, "create database test;");
usleep(100000);
do_query(taos, "use test;");
usleep(100000);
do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
do_query(taos, "create table t0 using meters tags(0);");
do_query(taos, "create table t1 using meters tags(1);");
do_query(taos, "create table t2 using meters tags(2);");
do_query(taos, "create table t3 using meters tags(3);");
do_query(taos, "create table t4 using meters tags(4);");
do_query(taos, "create table t5 using meters tags(5);");
do_query(taos, "create table t6 using meters tags(6);");
do_query(taos, "create table t7 using meters tags(7);");
do_query(taos, "create table t8 using meters tags(8);");
do_query(taos, "create table t9 using meters tags(9);");
do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
// super tables subscription
usleep(1000000);
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub);
check_row_count(__LINE__, res, 18);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription
taos_unsubscribe(tsub, 1);
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
// don't keep progress information and continue previous subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// single meter subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 5);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
taos_unsubscribe(tsub, 0);
}
int main(int argc, char* argv[]) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
const char* sql = "select * from meters;";
const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-h=", 3) == 0) {
host = argv[i] + 3;
continue;
}
if (strncmp(argv[i], "-u=", 3) == 0) {
user = argv[i] + 3;
continue;
}
if (strncmp(argv[i], "-p=", 3) == 0) {
passwd = argv[i] + 3;
continue;
}
if (strcmp(argv[i], "-sync") == 0) {
async = 0;
continue;
}
if (strcmp(argv[i], "-restart") == 0) {
restart = 1;
continue;
}
if (strcmp(argv[i], "-single") == 0) {
sql = "select * from t0;";
topic = "test-single";
continue;
}
if (strcmp(argv[i], "-nokeep") == 0) {
keep = 0;
continue;
}
if (strncmp(argv[i], "-sql=", 5) == 0) {
sql = argv[i] + 5;
topic = "test-custom";
continue;
}
if (strcmp(argv[i], "-test") == 0) {
test = 1;
continue;
}
if (strcmp(argv[i], "-block-fetch") == 0) {
blockFetch = 1;
continue;
}
}
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1);
}
if (test) {
run_test(taos);
taos_close(taos);
exit(0);
}
taos_select_db(taos, "test");
TAOS_SUB* tsub = NULL;
if (async) {
// create an asynchronized subscription, the callback function will be called every 1s
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
// create an synchronized subscription, need to call 'taos_consume' manually
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}
if (tsub == NULL) {
printf("failed to create subscription.\n");
exit(0);
}
if (async) {
getchar();
} else
while (1) {
TAOS_RES* res = taos_consume(tsub);
if (res == NULL) {
printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
}
}
printf("total rows consumed: %d\n", nTotalRows);
taos_unsubscribe(tsub, keep);
taos_close(taos);
return 0;
}
info
更多示例代码及下载请见 GitHub。 也可以在安装目录下的 examples/c
路径下找到。 该目录下有 makefile,在 Linux 环境下,直接执行 make 就可以编译得到执行文件。 提示:在 ARM 环境下编译时,请将 makefile 中的 -msse4.2
去掉,这个选项只有在 x64/x86 硬件平台上才能支持。
API 参考
以下分别介绍 TDengine 客户端驱动的基础 API、同步 API、异步 API、订阅 API 和无模式写入 API。
基础 API
基础 API 用于完成创建数据库连接等工作,为其它 API 的执行提供运行时环境。
void taos_init()
初始化运行环境。如果没有主动调用该 API,那么调用
taos_connect()
时驱动将自动调用该 API,故程序一般无需手动调用。void taos_cleanup()
清理运行环境,应用退出前应调用。
int taos_options(TSDB_OPTION option, const void * arg, ...)
设置客户端选项,目前支持区域设置(
TSDB_OPTION_LOCALE
)、字符集设置(TSDB_OPTION_CHARSET
)、时区设置(TSDB_OPTION_TIMEZONE
)、配置文件路径设置(TSDB_OPTION_CONFIGDIR
)。区域设置、字符集、时区默认为操作系统当前设置。char *taos_get_client_info()
获取客户端版本信息。
TAOS *taos_connect(const char *host, const char *user, const char *pass, const char *db, int port)
创建数据库连接,初始化连接上下文。其中需要用户提供的参数包含:
- host:TDengine 集群中任一节点的 FQDN
- user:用户名
- pass:密码
- db: 数据库名字,如果用户没有提供,也可以正常连接,用户可以通过该连接创建新的数据库,如果用户提供了数据库名字,则说明该数据库用户已经创建好,缺省使用该数据库
- port:taosd 程序监听的端口
返回值为空表示失败。应用程序需要保存返回的参数,以便后续使用。
info
同一进程可以根据不同的 host/port 连接多个 TDengine 集群
char *taos_get_server_info(TAOS *taos)
获取服务端版本信息。
int taos_select_db(TAOS *taos, const char *db)
将当前的缺省数据库设置为
db
。void taos_close(TAOS *taos)
关闭连接,其中
taos
是taos_connect()
返回的句柄。
同步查询 API
本小节介绍 API 均属于同步接口。应用调用后,会阻塞等待响应,直到获得返回结果或错误信息。
TAOS_RES* taos_query(TAOS *taos, const char *sql)
执行 SQL 语句,可以是 DQL、DML 或 DDL 语句。 其中的
taos
参数是通过taos_connect()
获得的句柄。不能通过返回值是否是NULL
来判断执行结果是否失败,而是需要用taos_errno()
函数解析结果集中的错误代码来进行判断。int taos_result_precision(TAOS_RES *res)
返回结果集时间戳字段的精度,
0
代表毫秒,1
代表微秒,2
代表纳秒。TAOS_ROW taos_fetch_row(TAOS_RES *res)
按行获取查询结果集中的数据。
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
批量获取查询结果集中的数据,返回值为获取到的数据的行数。
int taos_num_fields(TAOS_RES *res)
和int taos_field_count(TAOS_RES *res)
这两个 API 等价,用于获取查询结果集中的列数。
int* taos_fetch_lengths(TAOS_RES *res)
获取结果集中每个字段的长度。返回值是一个数组,其长度为结果集的列数。
int taos_affected_rows(TAOS_RES *res)
获取被所执行的 SQL 语句影响的行数。
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)
获取查询结果集每列数据的属性(列的名称、列的数据类型、列的长度),与
taos_num_fileds()
配合使用,可用来解析taos_fetch_row()
返回的一个元组(一行)的数据。TAOS_FIELD
的结构如下:
typedef struct taosField {
char name[65]; // column name
uint8_t type; // data type
int16_t bytes; // length, in bytes
} TAOS_FIELD;
void taos_stop_query(TAOS_RES *res)
停止当前查询的执行。
void taos_free_result(TAOS_RES *res)
释放查询结果集以及相关的资源。查询完成后,务必调用该 API 释放资源,否则可能导致应用内存泄露。但也需注意,释放资源后,如果再调用
taos_consume()
等获取查询结果的函数,将导致应用崩溃。char *taos_errstr(TAOS_RES *res)
获取最近一次 API 调用失败的原因,返回值为字符串标识的错误提示信息。
int taos_errno(TAOS_RES *res)
获取最近一次 API 调用失败的原因,返回值为错误代码。
note
2.0 及以上版本 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池。而不推荐在应用中将该连接 (TAOS*) 结构体传递到不同的线程共享使用。基于 TAOS 结构体发出的查询、写入等操作具有多线程安全性,但 “USE statement” 等状态量有可能在线程之间相互干扰。此外,C 语言的连接器可以按照需求动态建立面向数据库的新连接(该过程对用户不可见),同时建议只有在程序最后退出的时候才调用 taos_close()
关闭连接。
异步查询 API
TDengine 还提供性能更高的异步 API 处理数据插入、查询操作。在软硬件环境相同的情况下,异步 API 处理数据插入的速度比同步 API 快 2 ~ 4 倍。异步 API 采用非阻塞式的调用方式,在系统真正完成某个具体数据库操作前,立即返回。调用的线程可以去处理其他工作,从而可以提升整个应用的性能。异步 API 在网络延迟严重的情况下,优势尤为突出。
异步 API 都需要应用提供相应的回调函数,回调函数参数设置如下:前两个参数都是一致的,第三个参数依不同的 API 而定。第一个参数 param 是应用调用异步 API 时提供给系统的,用于回调时,应用能够找回具体操作的上下文,依具体实现而定。第二个参数是 SQL 操作的结果集,如果为空,比如 insert 操作,表示没有记录返回,如果不为空,比如 select 操作,表示有记录返回。
异步 API 对于使用者的要求相对较高,用户可根据具体应用场景选择性使用。下面是两个重要的异步 API:
void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);
异步执行 SQL 语句。
- taos:调用
taos_connect()
返回的数据库连接 - sql:需要执行的 SQL 语句
- fp:用户定义的回调函数,其第三个参数
code
用于指示操作是否成功,0
表示成功,负数表示失败(调用taos_errstr()
可获取失败原因)。应用在定义回调函数的时候,主要处理第二个参数TAOS_RES *
,该参数是查询返回的结果集 - param:应用提供一个用于回调的参数
- taos:调用
void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
批量获取异步查询的结果集,只能与
taos_query_a()
配合使用。其中:- res:
taos_query_a()
回调时返回的结果集 - fp:回调函数。其参数
param
是用户可定义的传递给回调函数的参数结构体;numOfRows
是获取到的数据的行数(不是整个查询结果集的函数)。 在回调函数中,应用可以通过调用taos_fetch_row()
前向迭代获取批量记录中每一行记录。读完一块内的所有记录后,应用需要在回调函数中继续调用taos_fetch_rows_a()
获取下一批记录进行处理,直到返回的记录数numOfRows
为零(结果返回完成)或记录数为负值(查询出错)。
- res:
TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多线程同时打开多张表,并可以同时对每张打开的表进行查询或者插入操作。需要指出的是,客户端应用必须确保对同一张表的操作完全串行化,即对同一个表的插入或查询操作未完成时(未返回时),不能够执行第二个插入或查询操作。
参数绑定 API
除了直接调用 taos_query()
进行查询,TDengine 也提供了支持参数绑定的 Prepare API,风格与 MySQL 类似,目前也仅支持用问号 ?
来代表待绑定的参数。
从 2.1.1.0 和 2.1.2.0 版本开始,TDengine 大幅改进了参数绑定接口对数据写入(INSERT)场景的支持。这样在通过参数绑定接口写入数据时,就避免了 SQL 语法解析的资源消耗,从而在绝大多数情况下显著提升写入性能。此时的典型操作步骤如下:
- 调用
taos_stmt_init()
创建参数绑定对象; - 调用
taos_stmt_prepare()
解析 INSERT 语句; - 如果 INSERT 语句中预留了表名但没有预留 TAGS,那么调用
taos_stmt_set_tbname()
来设置表名; - 如果 INSERT 语句中既预留了表名又预留了 TAGS(例如 INSERT 语句采取的是自动建表的方式),那么调用
taos_stmt_set_tbname_tags()
来设置表名和 TAGS 的值; - 调用
taos_stmt_bind_param_batch()
以多行的方式设置 VALUES 的值,或者调用taos_stmt_bind_param()
以单行的方式设置 VALUES 的值; - 调用
taos_stmt_add_batch()
把当前绑定的参数加入批处理; - 可以重复第 3 ~ 6 步,为批处理加入更多的数据行;
- 调用
taos_stmt_execute()
执行已经准备好的批处理指令; - 执行完毕,调用
taos_stmt_close()
释放所有资源。
说明:如果 taos_stmt_execute()
执行成功,假如不需要改变 SQL 语句的话,那么是可以复用 taos_stmt_prepare()
的解析结果,直接进行第 3 ~ 6 步绑定新数据的。但如果执行出错,那么并不建议继续在当前的环境上下文下继续工作,而是建议释放资源,然后从 taos_stmt_init()
步骤重新开始。
接口相关的具体函数如下(也可以参考 prepare.c 文件中使用对应函数的方式):
TAOS_STMT* taos_stmt_init(TAOS *taos)
创建一个 TAOS_STMT 对象用于后续调用。
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)
解析一条 SQL 语句,将解析结果和参数信息绑定到 stmt 上,如果参数 length 大于 0,将使用此参数作为 SQL 语句的长度,如等于 0,将自动判断 SQL 语句的长度。
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)
不如
taos_stmt_bind_param_batch()
效率高,但可以支持非 INSERT 类型的 SQL 语句。 进行参数绑定,bind 指向一个数组(代表所要绑定的一行数据),需保证此数组中的元素数量和顺序与 SQL 语句中的参数完全一致。TAOS_BIND 的使用方法与 MySQL 中的 MYSQL_BIND 类似,具体定义如下:typedef struct TAOS_BIND { int buffer_type; void * buffer; uintptr_t buffer_length; // not in use uintptr_t * length; int * is_null; int is_unsigned; // not in use int * error; // not in use } TAOS_BIND;
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)
(2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 当 SQL 语句中的表名使用了
?
占位时,可以使用此函数绑定一个具体的表名。int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags)
(2.1.2.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 当 SQL 语句中的表名和 TAGS 都使用了
?
占位时,可以使用此函数绑定具体的表名和具体的 TAGS 取值。最典型的使用场景是使用了自动建表功能的 INSERT 语句(目前版本不支持指定具体的 TAGS 列)。TAGS 参数中的列数量需要与 SQL 语句中要求的 TAGS 数量完全一致。int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind)
(2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 以多列的方式传递待绑定的数据,需要保证这里传递的数据列的顺序、列的数量与 SQL 语句中的 VALUES 参数完全一致。TAOS_MULTI_BIND 的具体定义如下:
typedef struct TAOS_MULTI_BIND { int buffer_type; void * buffer; uintptr_t buffer_length; uintptr_t * length; char * is_null; int num; // the number of columns } TAOS_MULTI_BIND;
int taos_stmt_add_batch(TAOS_STMT *stmt)
将当前绑定的参数加入批处理中,调用此函数后,可以再次调用
taos_stmt_bind_param()
或taos_stmt_bind_param_batch()
绑定新的参数。需要注意,此函数仅支持 INSERT/IMPORT 语句,如果是 SELECT 等其他 SQL 语句,将返回错误。int taos_stmt_execute(TAOS_STMT *stmt)
执行准备好的语句。目前,一条语句只能执行一次。
TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)
获取语句的结果集。结果集的使用方式与非参数化调用时一致,使用完成后,应对此结果集调用
taos_free_result()
以释放资源。int taos_stmt_close(TAOS_STMT *stmt)
执行完毕,释放所有资源。
char * taos_stmt_errstr(TAOS_STMT *stmt)
(2.1.3.0 版本新增) 用于在其他 STMT API 返回错误(返回错误码或空指针)时获取错误信息。
无模式(schemaless)写入 API
除了使用 SQL 方式或者使用参数绑定 API 写入数据外,还可以使用 Schemaless 的方式完成写入。Schemaless 可以免于预先创建超级表/数据子表的数据结构,而是可以直接写入数据,TDengine 系统会根据写入的数据内容自动创建和维护所需要的表结构。Schemaless 的使用方式详见 Schemaless 写入 章节,这里介绍与之配套使用的 C/C++ API。
TAOS_RES* taos_schemaless_insert(TAOS* taos, const char* lines[], int numLines, int protocol, int precision)
功能说明 该接口将行协议的文本数据写入到 TDengine 中。
参数说明 taos: 数据库连接,通过
taos_connect()
函数建立的数据库连接。 lines:文本数据。满足解析格式要求的无模式文本字符串。 numLines:文本数据的行数,不能为 0 。 protocol: 行协议类型,用于标识文本数据格式。 precision:文本数据中的时间戳精度字符串。返回值 TAOS_RES 结构体,应用可以通过使用
taos_errstr()
获得错误信息,也可以使用taos_errno()
获得错误码。 在某些情况下,返回的 TAOS_RES 为NULL
,此时仍然可以调用taos_errno()
来安全地获得错误码信息。 返回的 TAOS_RES 需要调用方来负责释放,否则会出现内存泄漏。说明 协议类型是枚举类型,包含以下三种格式:
- TSDB_SML_LINE_PROTOCOL:InfluxDB 行协议(Line Protocol)
- TSDB_SML_TELNET_PROTOCOL: OpenTSDB Telnet 文本行协议
- TSDB_SML_JSON_PROTOCOL: OpenTSDB Json 协议格式
时间戳分辨率的定义,定义在
taos.h
文件中,具体内容如下:- TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0,
- TSDB_SML_TIMESTAMP_HOURS,
- TSDB_SML_TIMESTAMP_MINUTES,
- TSDB_SML_TIMESTAMP_SECONDS,
- TSDB_SML_TIMESTAMP_MILLI_SECONDS,
- TSDB_SML_TIMESTAMP_MICRO_SECONDS,
- TSDB_SML_TIMESTAMP_NANO_SECONDS
需要注意的是,时间戳分辨率参数只在协议类型为
SML_LINE_PROTOCOL
的时候生效。 对于 OpenTSDB 的文本协议,时间戳的解析遵循其官方解析规则 — 按照时间戳包含的字符的数量来确认时间精度。支持版本 该功能接口从 2.3.0.0 版本开始支持。
订阅和消费 API
订阅 API 目前支持订阅一张或多张表,并通过定期轮询的方式不断获取写入表中的最新数据。
TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
该函数负责启动订阅服务,成功时返回订阅对象,失败时返回
NULL
,其参数为:- taos:已经建立好的数据库连接
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
- sql:订阅的查询语句,此语句只能是
select
语句,只应查询原始数据,只能按时间正序查询数据 - fp:收到查询结果时的回调函数(稍后介绍函数原型),只在异步调用时使用,同步调用时此参数应该传
NULL
- param:调用回调函数时的附加参数,系统 API 将其原样传递到回调函数,不进行任何处理
- interval:轮询周期,单位为毫秒。异步调用时,将根据此参数周期性的调用回调函数,为避免对系统性能造成影响,不建议将此参数设置的过小;同步调用时,如两次调用
taos_consume()
的间隔小于此周期,API 将会阻塞,直到时间间隔超过此周期。
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code)
异步模式下,回调函数的原型,其参数为:
- tsub:订阅对象
- res:查询结果集,注意结果集中可能没有记录
- param:调用
taos_subscribe()
时客户程序提供的附加参数 - code:错误码
note
在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
TAOS_RES *taos_consume(TAOS_SUB *tsub)
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用
taos_consume()
的间隔小于订阅的轮询周期,API 将会阻塞,直到时间间隔超过此周期。如果数据库有新记录到达,该 API 将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为NULL
,说明系统出错。 异步模式下,用户程序不应调用此 API。note
在调用
taos_consume()
之后,用户应用应确保尽快调用taos_fetch_row()
或taos_fetch_block()
来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)
取消订阅。 如参数
keepProgress
不为 0,API 会保留订阅的进度信息,后续调用taos_subscribe()
时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。