应用编程接口

Java 原生接口

安装

依赖

  • JDK >= 1.8
  • Maven >= 3.6

安装方法

在根目录下运行:

  1. mvn clean install -pl session -am -Dmaven.test.skip=true

在 MAVEN 中使用原生接口

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.iotdb</groupId>
  4. <artifactId>iotdb-session</artifactId>
  5. <version>1.0.0</version>
  6. </dependency>
  7. </dependencies>

语法说明

  • 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 语法规范 ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。)
  • 对于其他接口:
    • 经参数传入的路径或路径前缀中的节点: 在 SQL 语句中需要使用反引号(`)进行转义的,此处均需要进行转义。
    • 经参数传入的标识符(如模板名):在 SQL 语句中需要使用反引号(`)进行转义的,均可以不用进行转义。
  • 语法说明相关代码示例可以参考:example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java

基本接口说明

下面将给出 Session 对应的接口的简要介绍和对应参数:

初始化

  • 初始化 Session
  1. // 全部使用默认配置
  2. session = new Session.Builder.build();
  3. // 指定一个可连接节点
  4. session =
  5. new Session.Builder()
  6. .host(String host)
  7. .port(int port)
  8. .build();
  9. // 指定多个可连接节点
  10. session =
  11. new Session.Builder()
  12. .nodeUrls(List<String> nodeUrls)
  13. .build();
  14. // 其他配置项
  15. session =
  16. new Session.Builder()
  17. .fetchSize(int fetchSize)
  18. .username(String username)
  19. .password(String password)
  20. .thriftDefaultBufferSize(int thriftDefaultBufferSize)
  21. .thriftMaxFrameSize(int thriftMaxFrameSize)
  22. .enableRedirection(boolean enableRedirection)
  23. .version(Version version)
  24. .build();

其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:V_0_12V_0_13V_1_0

  • 开启 Session
  1. void open()
  • 开启 Session,并决定是否开启 RPC 压缩
  1. void open(boolean enableRPCCompression)

注意: 客户端的 RPC 压缩开启状态需和服务端一致

  • 关闭 Session
  1. void close()

数据定义接口 DDL

Database 管理
  • 设置 database
  1. void setStorageGroup(String storageGroupId)
  • 删除单个或多个 database
  1. void deleteStorageGroup(String storageGroup)
  2. void deleteStorageGroups(List<String> storageGroups)
时间序列管理
  • 创建单个或多个时间序列
  1. void createTimeseries(String path, TSDataType dataType,
  2. TSEncoding encoding, CompressionType compressor, Map<String, String> props,
  3. Map<String, String> tags, Map<String, String> attributes, String measurementAlias)
  4. void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,
  5. List<TSEncoding> encodings, List<CompressionType> compressors,
  6. List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
  7. List<Map<String, String>> attributesList, List<String> measurementAliasList)
  • 创建对齐时间序列
  1. void createAlignedTimeseries(String prefixPath, List<String> measurements,
  2. List<TSDataType> dataTypes, List<TSEncoding> encodings,
  3. List <CompressionType> compressors, List<String> measurementAliasList);

注意:目前暂不支持使用传感器别名。

  • 删除一个或多个时间序列
  1. void deleteTimeseries(String path)
  2. void deleteTimeseries(List<String> paths)
  • 检测时间序列是否存在
  1. boolean checkTimeseriesExists(String path)
元数据模版
  • 创建元数据模板,可以通过先后创建 Template、MeasurementNode 的对象,描述模板内物理量结构与类型、编码方式、压缩方式等信息,并通过以下接口创建模板
  1. public void createSchemaTemplate(Template template);
  2. Class Template {
  3. private String name;
  4. private boolean directShareTime;
  5. Map<String, Node> children;
  6. public Template(String name, boolean isShareTime);
  7. public void addToTemplate(Node node);
  8. public void deleteFromTemplate(String name);
  9. public void setShareTime(boolean shareTime);
  10. }
  11. Abstract Class Node {
  12. private String name;
  13. public void addChild(Node node);
  14. public void deleteChild(Node node);
  15. }
  16. Class MeasurementNode extends Node {
  17. TSDataType dataType;
  18. TSEncoding encoding;
  19. CompressionType compressor;
  20. public MeasurementNode(String name,
  21. TSDataType dataType,
  22. TSEncoding encoding,
  23. CompressionType compressor);
  24. }

通过上述类的实例描述模板时,Template 内应当仅能包含单层的 MeasurementNode,具体可以参见如下示例:

  1. MeasurementNode nodeX = new MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
  2. MeasurementNode nodeY = new MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
  3. MeasurementNode nodeSpeed = new MeasurementNode("speed", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
  4. // This is the template we suggest to implement
  5. Template flatTemplate = new Template("flatTemplate");
  6. template.addToTemplate(nodeX);
  7. template.addToTemplate(nodeY);
  8. template.addToTemplate(nodeSpeed);
  9. createSchemaTemplate(flatTemplate);
  • 对于已经创建的元数据模板,还可以通过以下接口查询模板信息:
  1. // 查询返回目前模板中所有物理量的数量
  2. public int countMeasurementsInTemplate(String templateName);
  3. // 检查模板内指定路径是否为物理量
  4. public boolean isMeasurementInTemplate(String templateName, String path);
  5. // 检查在指定模板内是否存在某路径
  6. public boolean isPathExistInTemplate(String templateName, String path);
  7. // 返回指定模板内所有物理量的路径
  8. public List<String> showMeasurementsInTemplate(String templateName);
  9. // 返回指定模板内某前缀路径下的所有物理量的路径
  10. public List<String> showMeasurementsInTemplate(String templateName, String pattern);
  • 将名为’templateName’的元数据模板挂载到’prefixPath’路径下,在执行这一步之前,你需要创建名为’templateName’的元数据模板
  • 请注意,我们强烈建议您将模板设置在 database 或 database 下层的节点中,以更好地适配未来版本更新及各模块的协作
  1. void setSchemaTemplate(String templateName, String prefixPath)
  • 将模板挂载到 MTree 上之后,你可以随时查询所有模板的名称、某模板被设置到 MTree 的所有路径、所有正在使用某模板的所有路径,即如下接口:
  1. /** @return All template names. */
  2. public List<String> showAllTemplates();
  3. /** @return All paths have been set to designated template. */
  4. public List<String> showPathsTemplateSetOn(String templateName);
  5. /** @return All paths are using designated template. */
  6. public List<String> showPathsTemplateUsingOn(String templateName)
  • 如果你需要删除某一个模板,请确保在进行删除之前,MTree 上已经没有节点被挂载了模板,对于已经被挂载模板的节点,可以用如下接口卸载模板;
  1. void unsetSchemaTemplate(String prefixPath, String templateName);
  2. public void dropSchemaTemplate(String templateName);
  • 请注意,如果一个子树中有多个孩子节点需要使用模板,可以在其共同父母节点上使用 setSchemaTemplate 。而只有在已有数据点插入模板对应的物理量时,模板才会被设置为激活状态,进而被 show timeseries 等查询检测到。
  • 卸载’prefixPath’路径下的名为’templateName’的元数据模板。你需要保证给定的路径’prefixPath’下需要有名为’templateName’的元数据模板。

注意:目前不支持从曾经在’prefixPath’路径及其后代节点使用模板插入数据后(即使数据已被删除)卸载模板。

数据操作接口 DML

数据写入

推荐使用 insertTablet 帮助提高写入效率

  • 插入一个 Tablet,Tablet 是一个设备若干行数据块,每一行的列都相同
    • 写入效率高
    • 支持写入空值:空值处可以填入任意值,然后通过 BitMap 标记空值
  1. void insertTablet(Tablet tablet)
  2. public class Tablet {
  3. /** deviceId of this tablet */
  4. public String prefixPath;
  5. /** the list of measurement schemas for creating the tablet */
  6. private List<MeasurementSchema> schemas;
  7. /** timestamps in this tablet */
  8. public long[] timestamps;
  9. /** each object is a primitive type array, which represents values of one measurement */
  10. public Object[] values;
  11. /** each bitmap represents the existence of each value in the current column. */
  12. public BitMap[] bitMaps;
  13. /** the number of rows to include in this tablet */
  14. public int rowSize;
  15. /** the maximum number of rows for this tablet */
  16. private int maxRowNumber;
  17. /** whether this tablet store data of aligned timeseries or not */
  18. private boolean isAligned;
  19. }
  • 插入多个 Tablet
  1. void insertTablets(Map<String, Tablet> tablets)
  • 插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据。这里的 value 是 Object 类型,相当于提供了一个公用接口,后面可以通过 TSDataType 将 value 强转为原类型

    其中,Object 类型与 TSDataType 类型的对应关系如下表所示:

    TSDataTypeObject
    BOOLEANBoolean
    INT32Integer
    INT64Long
    FLOATFloat
    DOUBLEDouble
    TEXTString, Binary
  1. void insertRecord(String prefixPath, long time, List<String> measurements,
  2. List<TSDataType> types, List<Object> values)
  • 插入多个 Record
  1. void insertRecords(List<String> deviceIds,
  2. List<Long> times,
  3. List<List<String>> measurementsList,
  4. List<List<TSDataType>> typesList,
  5. List<List<Object>> valuesList)
  • 插入同属于一个 device 的多个 Record
  1. void insertRecordsOfOneDevice(String deviceId, List<Long> times,
  2. List<List<String>> measurementsList, List<List<TSDataType>> typesList,
  3. List<List<Object>> valuesList)
带有类型推断的写入

当数据均是 String 类型时,我们可以使用如下接口,根据 value 的值进行类型推断。例如:value 为 “true” ,就可以自动推断为布尔类型。value 为 “3.2” ,就可以自动推断为数值类型。服务器需要做类型推断,可能会有额外耗时,速度较无需类型推断的写入慢

  • 插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据
  1. void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values)
  • 插入多个 Record
  1. void insertRecords(List<String> deviceIds, List<Long> times,
  2. List<List<String>> measurementsList, List<List<String>> valuesList)
  • 插入同属于一个 device 的多个 Record
  1. void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,
  2. List<List<String>> measurementsList, List<List<String>> valuesList)
对齐时间序列的写入

对齐时间序列的写入使用 insertAlignedXXX 接口,其余与上述接口类似:

  • insertAlignedRecord
  • insertAlignedRecords
  • insertAlignedRecordsOfOneDevice
  • insertAlignedStringRecordsOfOneDevice
  • insertAlignedTablet
  • insertAlignedTablets
数据删除
  • 删除一个或多个时间序列在某个时间点前或这个时间点的数据
  1. void deleteData(String path, long endTime)
  2. void deleteData(List<String> paths, long endTime)
数据查询
  • 时间序列原始数据范围查询:
    • 指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。
  1. SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime);
  • 最新点查询:
    • 查询最后一条时间戳大于等于某个时间点的数据。
  1. SessionDataSet executeLastDataQuery(List<String> paths, long lastTime);
  • 聚合查询:
    • 支持指定查询时间范围。指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。
    • 支持按照时间区间分段查询。
  1. SessionDataSet executeAggregationQuery(List<String> paths, List<Aggregation> aggregations);
  2. SessionDataSet executeAggregationQuery(
  3. List<String> paths, List<Aggregation> aggregations, long startTime, long endTime);
  4. SessionDataSet executeAggregationQuery(
  5. List<String> paths,
  6. List<Aggregation> aggregations,
  7. long startTime,
  8. long endTime,
  9. long interval);
  10. SessionDataSet executeAggregationQuery(
  11. List<String> paths,
  12. List<Aggregation> aggregations,
  13. long startTime,
  14. long endTime,
  15. long interval,
  16. long slidingStep);

IoTDB-SQL 接口

  • 执行查询语句
  1. SessionDataSet executeQueryStatement(String sql)
  • 执行非查询语句
  1. void executeNonQueryStatement(String sql)

写入测试接口 (用于分析网络带宽)

不实际写入数据,只将数据传输到 server 即返回

  • 测试 insertRecord
  1. void testInsertRecord(String deviceId, long time, List<String> measurements, List<String> values)
  2. void testInsertRecord(String deviceId, long time, List<String> measurements,
  3. List<TSDataType> types, List<Object> values)
  • 测试 testInsertRecords
  1. void testInsertRecords(List<String> deviceIds, List<Long> times,
  2. List<List<String>> measurementsList, List<List<String>> valuesList)
  3. void testInsertRecords(List<String> deviceIds, List<Long> times,
  4. List<List<String>> measurementsList, List<List<TSDataType>> typesList,
  5. List<List<Object>> valuesList)
  • 测试 insertTablet
  1. void testInsertTablet(Tablet tablet)
  • 测试 insertTablets
  1. void testInsertTablets(Map<String, Tablet> tablets)

示例代码

浏览上述接口的详细信息,请参阅代码 session/src/main/java/org/apache/iotdb/session/Session.java

使用上述接口的示例代码在 example/session/src/main/java/org/apache/iotdb/SessionExample.java

使用对齐时间序列和元数据模板的示例可以参见 example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java

针对原生接口的连接池

我们提供了一个针对原生接口的连接池 (SessionPool),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。

当一个连接被用完后,他会自动返回池中等待下次被使用; 当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; 你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。

对于查询操作:

  1. 使用 SessionPool 进行查询时,得到的结果集是SessionDataSet的封装类SessionDataSetWrapper;
  2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作closeResultSet;
  3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作closeResultSet.
  4. 可以调用 SessionDataSetWrappergetColumnNames() 方法得到结果集列名

使用示例可以参见 session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java

example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java

集群信息相关的接口 (仅在集群模式下可用)

集群信息相关的接口允许用户获取如数据分区情况、节点是否当机等信息。 要使用该 API,需要增加依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.iotdb</groupId>
  4. <artifactId>iotdb-thrift-cluster</artifactId>
  5. <version>1.0.0</version>
  6. </dependency>
  7. </dependencies>

建立连接与关闭连接的示例:

  1. import org.apache.thrift.protocol.TBinaryProtocol;
  2. import org.apache.thrift.transport.TSocket;
  3. import org.apache.thrift.transport.TTransport;
  4. import org.apache.thrift.transport.TTransportException;
  5. import org.apache.iotdb.rpc.RpcTransportFactory;
  6. public class CluserInfoClient {
  7. TTransport transport;
  8. ClusterInfoService.Client client;
  9. public void connect() {
  10. transport =
  11. RpcTransportFactory.INSTANCE.getTransport(
  12. new TSocket(
  13. // the RPC address
  14. IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
  15. // the RPC port
  16. ClusterDescriptor.getInstance().getConfig().getClusterRpcPort()));
  17. try {
  18. transport.open();
  19. } catch (TTransportException e) {
  20. Assert.fail(e.getMessage());
  21. }
  22. //get the client
  23. client = new ClusterInfoService.Client(new TBinaryProtocol(transport));
  24. }
  25. public void close() {
  26. transport.close();
  27. }
  28. }

API 列表:

  • 获取集群中的各个节点的信息(构成哈希环)
  1. list<Node> getRing();
  • 给定一个路径(应包括一个 SG 作为前缀)和起止时间,获取其覆盖的数据分区情况:
  1. /**
  2. * @param path input path (should contains a database name as its prefix)
  3. * @return the data partition info. If the time range only covers one data partition, the the size
  4. * of the list is one.
  5. */
  6. list<DataPartitionEntry> getDataPartition(1:string path, 2:long startTime, 3:long endTime);
  • 给定一个路径(应包括一个 SG 作为前缀),获取其被分到了哪个节点上:
  1. /**
  2. * @param path input path (should contains a database name as its prefix)
  3. * @return metadata partition information
  4. */
  5. list<Node> getMetaPartition(1:string path);
  • 获取所有节点的死活状态:
  1. /**
  2. * @return key: node, value: live or not
  3. */
  4. map<Node, bool> getAllNodeStatus();
  • 获取当前连接节点的 Raft 组信息(投票编号等)(一般用户无需使用该接口):
  1. /**
  2. * @return A multi-line string with each line representing the total time consumption, invocation
  3. * number, and average time consumption.
  4. */
  5. string getInstrumentingInfo();