C# 原生接口

依赖

  • .NET SDK >= 5.0 或 .NET Framework 4.x
  • Thrift >= 0.14.1
  • NLog >= 4.7.9

安装

您可以使用 NuGet Package Manager, .NET CLI等工具来安装,以 .NET CLI为例

如果您使用的是.NETC# - 图1open in new window 5.0 或者更高版本的SDK,输入如下命令即可安装最新的NuGet包

  1. dotnet add package Apache.IoTDB

为了适配 .NET Framework 4.x,我们单独构建了一个NuGet包,如果您使用的是.NETC# - 图2open in new window Framework 4.x,输入如下命令即可安装最新的包

  1. dotnet add package Apache.IoTDB.framework

如果您想安装更早版本的客户端,只需要指定版本即可

  1. # 安装0.12.1.2版本的客户端
  2. dotnet add package Apache.IoTDB --version 0.12.1.2

基本接口说明

Session接口在语义上和其他语言客户端相同

  1. // 参数定义
  2. string host = "localhost";
  3. int port = 6667;
  4. int pool_size = 2;
  5. // 初始化session
  6. var session_pool = new SessionPool(host, port, pool_size);
  7. // 开启session
  8. await session_pool.Open(false);
  9. // 创建时间序列
  10. await session_pool.CreateTimeSeries("root.test_group.test_device.ts1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.UNCOMPRESSED);
  11. await session_pool.CreateTimeSeries("root.test_group.test_device.ts2", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.UNCOMPRESSED);
  12. await session_pool.CreateTimeSeries("root.test_group.test_device.ts3", TSDataType.INT32, TSEncoding.PLAIN, Compressor.UNCOMPRESSED);
  13. // 插入record
  14. var measures = new List<string>{"ts1", "ts2", "ts3"};
  15. var values = new List<object> { "test_text", true, (int)123 };
  16. var timestamp = 1;
  17. var rowRecord = new RowRecord(timestamp, values, measures);
  18. await session_pool.InsertRecordAsync("root.test_group.test_device", rowRecord);
  19. // 插入Tablet
  20. var timestamp_lst = new List<long>{ timestamp + 1 };
  21. var value_lst = new List<object> {new() {"iotdb", true, (int) 12}};
  22. var tablet = new Tablet("root.test_group.test_device", measures, value_lst, timestamp_ls);
  23. await session_pool.InsertTabletAsync(tablet);
  24. // 关闭Session
  25. await session_pool.Close();

Row Record

  • IoTDB中的record数据进行封装和抽象。
  • 示例:
timestampstatustemperature
1020
  • 构造方法:
  1. var rowRecord =
  2. new RowRecord(long timestamps, List<object> values, List<string> measurements);

Tablet

  • 一种类似于表格的数据结构,包含一个设备的若干行非空数据块。
  • 示例:
timestatustemperature
1020
2020
3321
  • 构造方法:
  1. var tablet =
  2. Tablet(string deviceId, List<string> measurements, List<List<object>> values, List<long> timestamps);

API

基础接口

api nameparametersnotesuse example
Openboolopen sessionsession_pool.Open(false)
Closenullclose sessionsession_pool.Close()
IsOpennullcheck if session is opensession_pool.IsOpen()
OpenDebugModeLoggingConfiguration=nullopen debug modesession_pool.OpenDebugMode()
CloseDebugModenullclose debug modesession_pool.CloseDebugMode()
SetTimeZonestringset time zonesession_pool.GetTimeZone()
GetTimeZonenullget time zonesession_pool.GetTimeZone()

Record相关接口

api nameparametersnotesuse example
InsertRecordAsyncstring, RowRecordinsert single recordsession_pool.InsertRecordAsync(“root.97209_TEST_CSHARP_CLIENT_GROUP.TEST_CSHARP_CLIENT_DEVICE”, new RowRecord(1, values, measures));
InsertRecordsAsyncList<string>, List<RowRecord>insert recordssession_pool.InsertRecordsAsync(device_id, rowRecords)
InsertRecordsOfOneDeviceAsyncstring, List<RowRecord>insert records of one devicesession_pool.InsertRecordsOfOneDeviceAsync(device_id, rowRecords)
InsertRecordsOfOneDeviceSortedAsyncstring, List<RowRecord>insert sorted records of one deviceInsertRecordsOfOneDeviceSortedAsync(deviceId, sortedRowRecords);
TestInsertRecordAsyncstring, RowRecordtest insert recordsession_pool.TestInsertRecordAsync(“root.97209_TEST_CSHARP_CLIENT_GROUP.TEST_CSHARP_CLIENT_DEVICE”, rowRecord)
TestInsertRecordsAsyncList<string>, List<RowRecord>test insert recordsession_pool.TestInsertRecordsAsync(device_id, rowRecords)

Tablet相关接口

api nameparametersnotesuse example
InsertTabletAsyncTabletinsert single tabletsession_pool.InsertTabletAsync(tablet)
InsertTabletsAsyncList<Tablet>insert tabletssession_pool.InsertTabletsAsync(tablets)
TestInsertTabletAsyncTablettest insert tabletsession_pool.TestInsertTabletAsync(tablet)
TestInsertTabletsAsyncList<Tablet>test insert tabletssession_pool.TestInsertTabletsAsync(tablets)

SQL语句接口

api nameparametersnotesuse example
ExecuteQueryStatementAsyncstringexecute sql query statementsession_pool.ExecuteQueryStatementAsync(“select * from root.97209_TEST_CSHARP_CLIENT_GROUP.TEST_CSHARP_CLIENT_DEVICE where time<15”);
ExecuteNonQueryStatementAsyncstringexecute sql nonquery statementsession_pool.ExecuteNonQueryStatementAsync( “create timeseries root.97209_TEST_CSHARP_CLIENT_GROUP.TEST_CSHARP_CLIENT_DEVICE.status with datatype=BOOLEAN,encoding=PLAIN”)

数据表接口

api nameparametersnotesuse example
SetStorageGroupstringset storage groupsession_pool.SetStorageGroup(“root.97209_TEST_CSHARP_CLIENT_GROUP_01”)
CreateTimeSeriesstring, TSDataType, TSEncoding, Compressorcreate time seriessession_pool.InsertTabletsAsync(tablets)
DeleteStorageGroupAsyncstringdelete single storage groupsession_pool.DeleteStorageGroupAsync(“root.97209_TEST_CSHARP_CLIENT_GROUP_01”)
DeleteStorageGroupsAsyncList<string>delete storage groupsession_pool.DeleteStorageGroupAsync(“root.97209_TEST_CSHARP_CLIENT_GROUP”)
CreateMultiTimeSeriesAsyncList<string>, List<TSDataType> , List<TSEncoding> , List<Compressor>create multi time seriessession_pool.CreateMultiTimeSeriesAsync(ts_path_lst, data_type_lst, encoding_lst, compressor_lst);
DeleteTimeSeriesAsyncList<string>delete time series
DeleteTimeSeriesAsyncstringdelete time series
DeleteDataAsyncList<string>, long, longdelete datasession_pool.DeleteDataAsync(ts_path_lst, 2, 3)

辅助接口

api nameparametersnotesuse example
CheckTimeSeriesExistsAsyncstringcheck if time series existssession_pool.CheckTimeSeriesExistsAsync(time series)

用法可以参考用户示例C# - 图3open in new window

连接池

为了实现并发客户端请求,我们提供了针对原生接口的连接池(SessionPool),由于SessionPool本身为Session的超集,当SessionPoolpool_size参数设置为1时,退化为原来的Session

我们使用ConcurrentQueue数据结构封装了一个客户端队列,以维护与服务端的多个连接,当调用Open()接口时,会在该队列中创建指定个数的客户端,同时通过System.Threading.Monitor类实现对队列的同步访问。

当请求发生时,会尝试从连接池中寻找一个空闲的客户端连接,如果没有空闲连接,那么程序将需要等待直到有空闲连接

当一个连接被用完后,他会自动返回池中等待下次被使用

ByteBuffer

在传入RPC接口参数时,需要对Record和Tablet两种数据结构进行序列化,我们主要通过封装的ByteBuffer类实现

在封装字节序列的基础上,我们进行了内存预申请与内存倍增的优化,减少了序列化过程中内存的申请和释放,在一个拥有20000行的Tablet上进行序列化测试时,速度比起原生的数组动态增长具有35倍的性能加速

实现介绍

在进行RowRecords以及Tablet的插入时,我们需要对多行RowRecord和Tablet进行序列化以进行发送。客户端中的序列化实现主要依赖于ByteBuffer完成。接下来我们介绍ByteBuffer的实现细节。本文包含如下几点内容:

  • 序列化的协议
  • C#与Java的大小端的差异
  • ByteBuffer内存倍增算法

序列化协议

客户端向IoTDB服务器发送的序列化数据总体应该包含两个信息。

  • 数据类型
  • 数据本身

其中对于字符串的序列化时,我们需要再加入字符串的长度信息。即一个字符串的序列化完整结果为:

  1. [类型][长度][数据内容]

接下来我们分别介绍RowRecordTablet的序列化方式

RowRecord

我们对RowRecord进行序列化时,伪代码如下:

  1. public byte[] value_to_bytes(List<TSDataType> data_types, List<string> values){
  2. ByteBuffer buffer = new ByteBuffer(values.Count);
  3. for(int i = 0;i < data_types.Count(); i++){
  4. buffer.add_type((data_types[i]);
  5. buffer.add_val(values[i]);
  6. }
  7. }

对于其序列化的结果格式如下:

  1. [数据类型1][数据1][数据类型2][数据2]...[数据类型N][数据N]

其中数据类型为自定义的Enum变量,分别如下:

  1. public enum TSDataType{BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NONE};

Tablet序列化

使用Tabelt进行数据插入时有如下限制:

  1. 限制:Tablet中数据不能有空值

由于向 IoTDB服务器发送Tablet数据插入请求时会携带行数列数, 列数据类型,所以Tabelt序列化时我们不需要加入数据类型信息。Tablet按照列进行序列化,这是因为后端可以通过行数得知出当前列的元素个数,同时根据列类型来对数据进行解析。

CSharp与Java序列化数据时的大小端差异

由于Java序列化默认大端协议,而CSharp序列化默认得到小端序列。所以我们在CSharp中序列化数据之后,需要对数据进行反转这样后端才可以正常解析。同时当我们从后端获取到序列化的结果时(如SessionDataset),我们也需要对获得的数据进行反转以解析内容。这其中特例便是字符串的序列化,CSharp中对字符串的序列化结果为大端序,所以序列化字符串或者接收到字符串序列化结果时,不需要反转序列结果。

ByteBuffer内存倍增法

拥有数万行的Tablet的序列化结果可能有上百兆,为了能够高效的实现大Tablet的序列化,我们对ByteBuffer使用内存倍增法的策略来减少序列化过程中对于内存的申请和释放。即当当前的buffer的长度不足以放下序列化结果时,我们将当前buffer的内存至少扩增2倍。这极大的减少了内存的申请释放次数,加速了大Tablet的序列化速度。

  1. private void extend_buffer(int space_need){
  2. if(write_pos + space_need >= total_length){
  3. total_length = max(space_need, total_length);
  4. byte[] new_buffer = new byte[total_length * 2];
  5. buffer.CopyTo(new_buffer, 0);
  6. buffer = new_buffer;
  7. total_length = 2 * total_length;
  8. }
  9. }

同时在序列化Tablet时,我们首先根据Tablet的行数列数以及每一列的数据类型估计当前Tablet序列化结果所需要的内存大小,并在初始化时进行内存的申请。这进一步的减少了内存的申请释放频率。

通过上述的策略,我们在一个有20000行的Tablet上进行测试时,序列化速度相比Naive数组长度动态生长实现算法具有约35倍的性能加速。

异常重连

当服务端发生异常或者宕机重启时,客户端中原来通过Open()产生的的session会失效,抛出TException异常

为了避免这一情况的发生,我们对大部分的接口进行了增强,一旦出现连接问题,就会尝试重新调用Open()接口并创建新的Session,并尝试重新发送对应的请求