User Defined Functions (UDF)
In some use cases, built-in functions are not adequate for the query capability required by application programs. With UDF, the functions developed by users can be utilized by the query framework to meet business and application requirements. UDF normally takes one column of data as input, but can also support the result of a sub-query as input.
Starting with version 2.2.0.0, TDengine supports UDFs written in C/C++.
Types of UDF
Two kinds of functions can be implemented by UDF: scalar functions and aggregate functions.
Scalar functions return multiple rows and aggregate functions return either 0 or 1 row.
In the case of a scalar function you only have to implement the “normal” function template.
In the case of an aggregate function, in addition to the “normal” function, you also need to implement the “merge” and “finalize” function templates even if the implementation is empty. This will become clear in the sections below.
Scalar Function
As mentioned earlier, a scalar UDF only has to implement the “normal” function template. The function template below can be used to define your own scalar function.
void udfNormalFunc(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf)
udfNormalFunc is the placeholder for a function name. A function implemented from the above template performs scalar computation on data rows. The parameter list is fixed; it controls the data exchange between the UDF and TDengine.
Definitions of the parameters:
- data: input data
- itype: the type of the input data; for details please refer to the type definition in column_meta, for example 4 represents INT
- ibytes: the number of bytes consumed by each value in the input data
- otype: the type of the output data, similar to itype
- obytes: the number of bytes consumed by each value in the output data
- numOfRows: the number of rows in the input data
- ts: the timestamp column corresponding to the input data
- dataOutput: the buffer for output data; its total size is obytes * numOfRows
- interBuf: the buffer for an intermediate result. Its size is specified by the BUFSIZE parameter when creating the UDF. It is normally used when the intermediate result is not the same as the final result. This buffer is allocated and freed by TDengine.
- tsOutput: the column of timestamps corresponding to the output data; if it is not NULL, it can be used to output timestamps together with the output data
- numOfOutput: the number of rows in the output data
- buf: for the state exchange between the UDF and TDengine
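Because data and dataOutput are passed as raw char* buffers, the itype and otype codes decide how each row has to be reinterpreted. The sketch below is a hypothetical helper, not part of TDengine's API. It assumes only the type codes that appear in the example programs in this document (4 for INT, 5 for BIGINT, 6 for FLOAT, 7 for DOUBLE) and reads row i of an input buffer as a double.

```c
#include <stdint.h>
#include <string.h>

/* Type codes as used by the example UDFs in this document (assumed subset). */
enum {
  UDF_TYPE_INT    = 4,
  UDF_TYPE_BIGINT = 5,
  UDF_TYPE_FLOAT  = 6,
  UDF_TYPE_DOUBLE = 7
};

/*
 * Hypothetical helper: read row `i` of `data` as a double.
 * Returns 1 on success, 0 if `itype` is not one of the codes above.
 * memcpy is used so the read does not depend on buffer alignment.
 * Large BIGINT values may lose precision when converted to double.
 */
int udf_read_as_double(const char* data, short itype, int i, double* out) {
  switch (itype) {
    case UDF_TYPE_INT: {
      int32_t v;
      memcpy(&v, data + (size_t)i * sizeof v, sizeof v);
      *out = (double)v;
      return 1;
    }
    case UDF_TYPE_BIGINT: {
      int64_t v;
      memcpy(&v, data + (size_t)i * sizeof v, sizeof v);
      *out = (double)v;
      return 1;
    }
    case UDF_TYPE_FLOAT: {
      float v;
      memcpy(&v, data + (size_t)i * sizeof v, sizeof v);
      *out = (double)v;
      return 1;
    }
    case UDF_TYPE_DOUBLE: {
      double v;
      memcpy(&v, data + (size_t)i * sizeof v, sizeof v);
      *out = v;
      return 1;
    }
    default:
      return 0; /* unsupported type code */
  }
}
```

A UDF body could call such a helper once per row and skip the block when it returns 0, instead of repeating the casts in every branch.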
add_one.c is a very simple UDF implementation, i.e. one instance of the above udfNormalFunc template. It adds one to each value of the passed-in column, which can be filtered using the where clause, and outputs the result.
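As a second illustration of the scalar template, here is a minimal sketch of a UDF that negates a BIGINT column. The name neg_bigint and the BIGINT_NULL macro are hypothetical; the type code 5 and the NULL bit pattern are taken from the abs_max.c example in this document, and the SUdfInit layout is copied from the examples as well.

```c
#include <stddef.h>
#include <stdint.h>

typedef struct SUdfInit {
  int       maybe_null; /* 1 if function can return NULL */
  int       decimals;   /* for real functions */
  long long length;     /* for string functions */
  char     *ptr;        /* free pointer for function data */
  int       const_item; /* 0 if result is independent of arguments */
} SUdfInit;

/* Same bit pattern as TSDB_DATA_BIGINT_NULL in abs_max.c (assumption). */
#define BIGINT_NULL INT64_MIN

/* Hypothetical scalar UDF: one output row per input row, value negated. */
void neg_bigint(char* data, short itype, short ibytes, int numOfRows, long long* ts,
                char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput,
                short otype, short obytes, SUdfInit* buf) {
  const int64_t* in  = (const int64_t*)data;
  int64_t*       out = (int64_t*)dataOutput;

  if (itype != 5) { /* only BIGINT input is handled in this sketch */
    *numOfOutput = 0;
    return;
  }
  for (int i = 0; i < numOfRows; ++i) {
    /* NULL stays NULL; this also avoids negating INT64_MIN, which would overflow. */
    out[i] = (in[i] == BIGINT_NULL) ? BIGINT_NULL : -in[i];
  }
  *numOfOutput = numOfRows;
}
```

The function produces exactly numOfRows output rows, which is what distinguishes a scalar UDF from an aggregate one.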
Aggregate Function
For an aggregate UDF, as mentioned earlier, you must implement the “normal” function template (described above) as well as the “merge” and “finalize” templates.
Merge Function Template
The function template below can be used to define your own merge function for an aggregate UDF.
void udfMergeFunc(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf)
udfMergeFunc is the placeholder for a function name. A function implemented from the above template aggregates intermediate results and can only be used in aggregate queries on an STable.
Definitions of the parameters:
- data: array of output data; if interBuf is used, it is an array of interBuf
- numOfRows: number of rows in data
- dataOutput: the buffer for output data; its size is the same as that of the final result. If the result is not final, it can be put in the interBuf, i.e. data.
- numOfOutput: number of rows in the output data
- buf: for the state exchange between the UDF and TDengine
Finalize Function Template
The function template below can be used to finalize the result of your own UDF, normally used when interBuf is used.
void udfFinalizeFunc(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf)
udfFinalizeFunc is the placeholder for a function name. Definitions of the parameters:
- dataOutput: buffer for output data
- interBuf: buffer for the intermediate result; can be used as input for the next processing step
- numOfOutput: number of rows in the output data; can only be 0 or 1 for an aggregate function
- buf: for the state exchange between the UDF and TDengine
Example abs_max.c
abs_max.c is an example of a user defined aggregate function to get the maximum from the absolute values of a column.
The internal processing happens as follows. The results of the select statement are divided into multiple row blocks, and udfNormalFunc, i.e. abs_max in this case, is performed on each row block to generate the intermediate results for each sub table. Then udfMergeFunc, i.e. abs_max_merge in this case, is performed on the intermediate results of the sub tables to aggregate them and generate the final or intermediate result of the STable. The intermediate result of the STable is finally processed by udfFinalizeFunc, i.e. abs_max_finalize in this example, to generate the final result, which contains either 0 or 1 row.
Other typical aggregation functions, such as covariance, can also be implemented as aggregate UDFs.
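The order of calls described above can be mimicked locally. The sketch below is a simplified simulation, not TDengine's engine and not the real UDF signatures: block_abs_max, merge_abs_max and simulate_abs_max are hypothetical names, the sample values are made up for illustration, and NULL handling is left out.

```c
#include <stdint.h>

/* Role of the "normal" function: fold one row block into the running
 * result of a sub table. `acc` is the result carried over from earlier blocks. */
int64_t block_abs_max(const int64_t* rows, int n, int64_t acc) {
  for (int i = 0; i < n; ++i) {
    int64_t v = rows[i] < 0 ? -rows[i] : rows[i];
    if (v > acc) {
      acc = v;
    }
  }
  return acc;
}

/* Role of the "merge" function: combine the per-sub-table results. */
int64_t merge_abs_max(const int64_t* partials, int n) {
  int64_t r = 0;
  for (int i = 0; i < n; ++i) {
    if (partials[i] > r) {
      r = partials[i];
    }
  }
  return r;
}

/* Driver: two sub tables, each delivered as two row blocks. */
int64_t simulate_abs_max(void) {
  const int64_t t1_block1[] = {3, -9};
  const int64_t t1_block2[] = {4};
  const int64_t t2_block1[] = {-2, 1};
  const int64_t t2_block2[] = {-12, 5};
  int64_t partials[2];

  /* One running result per sub table, updated block by block. */
  partials[0] = block_abs_max(t1_block2, 1, block_abs_max(t1_block1, 2, 0));
  partials[1] = block_abs_max(t2_block2, 2, block_abs_max(t2_block1, 2, 0));

  /* Merge across sub tables; for abs_max no further finalize work is needed. */
  return merge_abs_max(partials, 2);
}
```

In this simulation the first sub table yields 9, the second yields 12, and the merge step returns 12. The finalize step is a no-op here, which matches the empty abs_max_finalize in the example.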
UDF Naming Conventions
The naming convention for the 3 kinds of function templates required by UDF is as follows:
- udfNormalFunc, udfMergeFunc, and udfFinalizeFunc are required to have the same prefix, i.e. the actual name of udfNormalFunc. udfNormalFunc itself doesn’t need a suffix following the function name.
- udfMergeFunc should be udfNormalFunc followed by _merge.
- udfFinalizeFunc should be udfNormalFunc followed by _finalize.
The naming convention is part of TDengine’s UDF framework. TDengine follows this convention to invoke the corresponding actual functions.
Depending on whether you are creating a scalar UDF or aggregate UDF, the functions that you need to implement are different.
- Scalar function: udfNormalFunc is required.
- Aggregate function: udfNormalFunc, udfMergeFunc (if querying an STable) and udfFinalizeFunc are required.
For clarity, assuming we want to implement a UDF named “foo”:
- If the function is a scalar function, we only need to implement the “normal” function template and it should be named simply foo.
- If the function is an aggregate function, we need to implement foo, foo_merge, and foo_finalize. Note that for an aggregate UDF all three functions must exist; if one of them is not needed, it must still be provided as an empty implementation.
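To make the naming rule concrete, the following is a minimal sketch of the three symbols for an aggregate UDF named foo. The row-counting logic is purely illustrative, and it assumes (as abs_max.c does when it reads its previous maximum from dataOutput) that dataOutput carries the running result from one row block to the next and starts at zero. The SUdfInit layout is copied from the examples in this document.

```c
#include <stddef.h>
#include <stdint.h>

typedef struct SUdfInit {
  int       maybe_null; /* 1 if function can return NULL */
  int       decimals;   /* for real functions */
  long long length;     /* for string functions */
  char     *ptr;        /* free pointer for function data */
  int       const_item; /* 0 if result is independent of arguments */
} SUdfInit;

/* "normal" function: the UDF name itself, no suffix. */
void foo(char* data, short itype, short ibytes, int numOfRows, long long* ts,
         char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput,
         short otype, short obytes, SUdfInit* buf) {
  /* Add this block's row count to the running total (illustrative logic). */
  *(int64_t*)dataOutput += numOfRows;
  *numOfOutput = 1;
}

/* "merge" function: UDF name + "_merge". Sums the per-sub-table counts. */
void foo_merge(char* data, int32_t numOfRows, char* dataOutput,
               int32_t* numOfOutput, SUdfInit* buf) {
  const int64_t* partial = (const int64_t*)data;
  int64_t        total = 0;
  for (int32_t i = 0; i < numOfRows; ++i) {
    total += partial[i];
  }
  *(int64_t*)dataOutput = total;
  *numOfOutput = (numOfRows > 0) ? 1 : 0;
}

/* "finalize" function: UDF name + "_finalize". Nothing left to do here,
 * but the symbol must still exist, so the body is intentionally empty. */
void foo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
}
```

The abs_max.c example further below additionally defines abs_max_init and abs_max_destroy, which follow the same prefix pattern.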
Compile UDF
The C source code of a UDF can’t be used by TDengine directly. A UDF can only be loaded into TDengine after it has been compiled into a dynamically linked library (DLL).
For example, the UDF add_one.c mentioned earlier can be compiled into a DLL with the command below in a Linux shell.
gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
The generated DLL file add_one.so can be used later when creating a UDF. It’s recommended to use GCC 7.5 or later.
Create and Use UDF
When a UDF is created in a TDengine instance, it is available across the databases in that instance.
Create UDF
The SQL command must be executed on the host where the generated UDF DLL resides in order to load the DLL into TDengine. This operation cannot be done through the REST interface or the web console. Once a UDF is created, any client of the current TDengine instance can use it in SQL commands. UDFs are stored in the management node of TDengine and remain available after TDengine is restarted.
When creating a UDF, its type, i.e. scalar function or aggregate function, must be specified. If the specified type is wrong, SQL statements using the function will fail with errors. The input type and output type of a UDF don’t need to be the same, but both must be consistent with the UDF definition.
- Create Scalar Function
CREATE FUNCTION userDefinedFunctionName AS "/absolute/path/to/userDefinedFunctionName.so" OUTPUTTYPE <supported TDengine type> [BUFSIZE B];
- userDefinedFunctionName: the function name to be used in SQL statements; it must be consistent with the function name defined by udfNormalFunc and is also the name of the compiled DLL (.so file).
- path: the absolute path of the DLL file, including the name of the shared object file (.so). The path must be quoted with single or double quotes.
- OUTPUTTYPE: the output data type; the value is the literal string of a supported TDengine data type.
- B: the size of the intermediate buffer, in bytes; it is an optional parameter and the range is [0,512].
For example, the SQL statement below can be used to create a UDF from add_one.so.
CREATE FUNCTION add_one AS "/home/taos/udf_example/add_one.so" OUTPUTTYPE INT;
- Create Aggregate Function
CREATE AGGREGATE FUNCTION userDefinedFunctionName AS "/absolute/path/to/userDefinedFunctionName.so" OUTPUTTYPE <supported TDengine data type> [ BUFSIZE B ];
- userDefinedFunctionName: the function name to be used in SQL statements; it must be consistent with the function name defined by udfNormalFunc and is also the name of the compiled DLL (.so file).
- path: the absolute path of the DLL file, including the name of the shared object file (.so). The path must be quoted with single or double quotes.
- OUTPUTTYPE: the output data type; the value is the literal string of a supported TDengine data type.
- B: the size of the intermediate buffer, in bytes; it is an optional parameter and the range is [0,512].
For details about how to use intermediate result, please refer to example program demo.c.
For example, the SQL statement below can be used to create a UDF from demo.so.
CREATE AGGREGATE FUNCTION demo AS "/home/taos/udf_example/demo.so" OUTPUTTYPE DOUBLE bufsize 14;
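Since BUFSIZE is limited to the range [0,512], the state kept in interBuf has to be a small, fixed-size struct. The sketch below uses a hypothetical SMeanState struct and hypothetical helper names (they are not TDengine APIs) to show one way of checking at compile time that the state fits, and of separating the accumulate step from the final computation. Note that sizeof may include padding, so choosing a BUFSIZE of at least sizeof(SMeanState) is the conservative option.

```c
#include <stddef.h>

#define UDF_MAX_BUFSIZE 512 /* upper bound of BUFSIZE stated above */

/* Hypothetical intermediate state for a mean: running sum and row count. */
typedef struct SMeanState {
  double sum;
  int    num;
} SMeanState;

/* Fails the build if the state could never fit into interBuf. */
_Static_assert(sizeof(SMeanState) <= UDF_MAX_BUFSIZE,
               "intermediate state must fit into BUFSIZE");

/* Accumulate one block of rows into the state stored in interBuf. */
void mean_accumulate(char* interBuf, const double* rows, int n) {
  SMeanState* st = (SMeanState*)interBuf;
  for (int i = 0; i < n; ++i) {
    st->sum += rows[i];
  }
  st->num += n;
}

/* Turn the intermediate state into the final value.
 * Returns 1 and writes the mean to *out, or 0 if no rows were seen. */
int mean_finish(const char* interBuf, double* out) {
  const SMeanState* st = (const SMeanState*)interBuf;
  if (st->num == 0) {
    return 0;
  }
  *out = st->sum / st->num;
  return 1;
}
```

In a real aggregate UDF, the accumulate logic would live in the “normal” function and the final division in the “finalize” function, with interBuf supplied by TDengine.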
Manage UDF
- Delete UDF
DROP FUNCTION ids(X);
- ids(X): same as that in the CREATE FUNCTION statement
DROP FUNCTION add_one;
- Show Available UDF
SHOW FUNCTIONS;
Use UDF
The function name specified when creating a UDF can be used directly in SQL statements, just like built-in functions.
SELECT X(c) FROM table/STable;
The above SQL statement invokes function X for column c.
Restrictions for UDF
In the current version there are some restrictions for UDF:
- Only Linux is supported when creating and invoking UDF, for both the client side and the server side
- UDF can’t be mixed with built-in functions
- Only one UDF can be used in a SQL statement
- Only a single column is supported as input for UDF
- Once created successfully, UDF is persisted in the MNode of TDengine
- UDF can’t be created through the REST interface
- The function name used when creating UDF in SQL must be consistent with the function name defined in the DLL, i.e. the name defined by udfNormalFunc
- The name of a UDF should not conflict with any of TDengine’s built-in functions
Examples
Scalar function example add_one
add_one.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct SUdfInit {
  int       maybe_null;  /* 1 if function can return NULL */
  int       decimals;    /* for real functions */
  long long length;      /* For string functions */
  char     *ptr;         /* free pointer for function data */
  int       const_item;  /* 0 if result is independent of arguments */
} SUdfInit;

/* Scalar UDF: for an INT column (type code 4), writes input + 1 for every row. */
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
             int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
  int i;
  // printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
  if (itype == 4) {
    for (i = 0; i < numOfRows; ++i) {
      // printf("input %d - %d", i, *((int *)data + i));
      *((int *)dataOutput + i) = *((int *)data + i) + 1;
      // printf(", output %d\n", *((int *)dataOutput+i));
      if (tsOutput) {
        /* writes a fixed value into the first timestamp slot only */
        *(long long*)tsOutput = 1000000;
      }
    }
    /* one output row per input row */
    *numOfOutput = numOfRows;
    // printf("add_one out, numOfOutput:%d\n", *numOfOutput);
  }
}
Aggregate function example abs_max
abs_max.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>

typedef struct SUdfInit {
  int     maybe_null;  /* 1 if function can return NULL */
  int     decimals;    /* for real functions */
  int64_t length;      /* For string functions */
  char   *ptr;         /* free pointer for function data */
  int     const_item;  /* 0 if result is independent of arguments */
} SUdfInit;

#define TSDB_DATA_INT_NULL    0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L

/* "normal" function: folds one row block of a BIGINT column (type code 5)
 * into the running maximum kept in dataOutput. */
void abs_max(char* data, short itype, short ibytes, int numOfRows, int64_t* ts, char* dataOutput, char* interBuf, char* tsOutput,
             int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
  int     i;
  int64_t r = 0;
  // printf("abs_max input data:%p, type:%d, rows:%d, ts:%p, %" PRId64 ", dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
  if (itype == 5) {
    /* start from the result of the previous blocks */
    r = *(int64_t *)dataOutput;
    *numOfOutput = 0;

    for (i = 0; i < numOfRows; ++i) {
      if (*((int64_t *)data + i) == TSDB_DATA_BIGINT_NULL) {
        continue;  /* skip NULL values */
      }

      *numOfOutput = 1;
      //int64_t v = abs(*((int64_t *)data + i));
      int64_t v = *((int64_t *)data + i);
      if (v < 0) {
        v = 0 - v;
      }

      if (v > r) {
        r = v;
      }
    }

    *(int64_t *)dataOutput = r;
    // printf("abs_max out, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
  } else {
    *numOfOutput = 0;
  }
}

/* "finalize" function: nothing to do for abs_max, but the symbol must exist. */
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
  // printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
  // *numOfOutput=1;
  // printf("abs_max finalize, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}

/* "merge" function: takes the maximum over the intermediate results of the sub tables. */
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
  int64_t r = 0;

  if (numOfRows > 0) {
    r = *((int64_t *)data);
  }
  // printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
  for (int i = 1; i < numOfRows; ++i) {
    // printf("abs_max_merge %d - %" PRId64"\n", i, *((int64_t *)data + i));
    if (*((int64_t*)data + i) > r) {
      r = *((int64_t*)data + i);
    }
  }

  *(int64_t*)dataOutput = r;
  if (numOfRows > 0) {
    *numOfOutput = 1;
  } else {
    *numOfOutput = 0;
  }
  // printf("abs_max_merge, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}

int abs_max_init(SUdfInit* buf) {
  // printf("abs_max init\n");
  return 0;
}

void abs_max_destroy(SUdfInit* buf) {
  // printf("abs_max destroy\n");
}
Example for using intermediate result demo
demo.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>  /* int32_t used in demo_merge */

typedef struct SUdfInit {
  int       maybe_null;  /* 1 if function can return NULL */
  int       decimals;    /* for real functions */
  long long length;      /* For string functions */
  char     *ptr;         /* free pointer for function data */
  int       const_item;  /* 0 if result is independent of arguments */
} SUdfInit;

/* Intermediate state kept in interBuf */
typedef struct SDemo {
  double sum;
  int    num;
  short  otype;
} SDemo;

#define FLOAT_NULL  0x7FF00000            // it is an NAN
#define DOUBLE_NULL 0x7FFFFF0000000000L   // it is an NAN

/* "normal" function: accumulates the sum of squares and the row count in interBuf. */
void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
          int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
  int    i;
  double r = 0;
  SDemo *p = (SDemo *)interBuf;
  SDemo *q = (SDemo *)dataOutput;
  printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf);

  for (i = 0; i < numOfRows; ++i) {
    if (itype == 4) {
      r = *((int *)data + i);
    } else if (itype == 6) {
      r = *((float *)data + i);
    } else if (itype == 7) {
      r = *((double *)data + i);
    }

    p->sum += r * r;
  }

  p->otype = otype;
  p->num += numOfRows;

  /* copy the intermediate state to the output buffer as well */
  q->sum = p->sum;
  q->num = p->num;
  q->otype = p->otype;

  *numOfOutput = 1;

  printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}

/* "merge" function: combines the intermediate states of the sub tables. */
void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
  int    i;
  SDemo *p = (SDemo *)data;
  SDemo *out = (SDemo *)data;  /* first element of the interBuf array */
  SDemo  res = {0};
  printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf);
  for (i = 0; i < numOfRows; ++i) {
    res.sum += p->sum * p->sum;
    res.num += p->num;
    p++;
  }

  /* The loop advanced p past the last row, so the merged totals are stored
   * back at the start of data, as the parameter description allows for
   * non-final results. */
  out->sum = res.sum;
  out->num = res.num;

  *numOfOutput = 1;

  printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", out->sum, out->num, *numOfOutput);
}

/* "finalize" function: converts the intermediate state into the final FLOAT or DOUBLE value. */
void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
  SDemo *p = (SDemo *)interBuf;
  printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num);
  if (p->otype == 6) {
    if (p->num != 30000) {
      *(unsigned int *)dataOutput = FLOAT_NULL;
    } else {
      *(float *)dataOutput = (float)(p->sum / p->num);
    }
    printf("finalize values:%f\n", *(float *)dataOutput);
  } else if (p->otype == 7) {
    if (p->num != 30000) {
      *(unsigned long long *)dataOutput = DOUBLE_NULL;
    } else {
      *(double *)dataOutput = (double)(p->sum / p->num);
    }
    printf("finalize values:%f\n", *(double *)dataOutput);
  }

  *numOfOutput = 1;

  printf("demo finalize, numOfOutput:%d\n", *numOfOutput);
}

int demo_init(SUdfInit* buf) {
  printf("demo init\n");
  return 0;
}

void demo_destroy(SUdfInit* buf) {
  printf("demo destroy\n");
}