Java

GreptimeDB 使用不同的客户端库来写入和查询数据。 你根据需求可以选择合适的客户端库。

写入数据

GreptimeDB 提供了一个 ingester 库来帮助你写入数据。 它使用 gRPC 协议,支持自动生成表结构,无需在写入数据前创建表。 更多信息请参考 自动生成表结构

GreptimeDB 提供的 Java ingester SDK 是一个轻量级库,具有以下特点:

  • 基于 SPI 的可扩展网络传输层,提供了使用 gRPC 框架的默认实现。
  • 非阻塞、纯异步的易于使用的 API。
  • 默认情况下自动收集各种性能指标,然后可以配置并将其写入本地文件。
  • 能够对关键对象进行内存快照,配置并将其写入本地文件。这对于解决复杂问题很有帮助。

安装

  1. 安装 Java 开发工具包(JDK)

确保你的系统已安装 JDK 8 或更高版本。有关如何检查 Java 版本并安装 JDK 的更多信息,请参见 Oracle JDK 安装概述文档

  1. 将 GreptimeDB Java SDK 添加为依赖项

如果你使用的是 Maven,请将以下内容添加到 pom.xml 的依赖项列表中:

  1. <dependency>
  2. <groupId>io.greptime</groupId>
  3. <artifactId>ingester-all</artifactId>
  4. <version>0.7.3</version>
  5. </dependency>

最新版本可以在 这里 查看。

配置依赖项后,请确保它们对项目可用,这可能需要在 IDE 中刷新项目或运行依赖项管理器。

连接数据库

连接 GreptimeDB 时,通常需要用户名和密码。 关于如何设置 GreptimeDB 的鉴权方式,请参考鉴权。 这里我们在使用 ingester 库连接 GreptimeDB 时设置用户名和密码。

下方的代码展示了以最简单的配置连接到 GreptimeDB 的方法。 如果想要自定义连接选项,请参考 API 文档。 请注意每个选项的注释,它们提供了对其各自角色的详细解释。

  1. // GreptimeDB 默认 database 为 "public",默认 catalog 为 "greptime",
  2. // 我们可以将其作为测试数据库使用
  3. String database = "public";
  4. // 默认情况下,GreptimeDB 使用 gRPC 协议在监听端口 4001。
  5. // 我们可以提供多个指向同一 GreptimeDB 集群的 endpoints,
  6. // 客户端将根据负载均衡策略调用这些 endpoints。
  7. String[] endpoints = {"127.0.0.1:4001"};
  8. // 设置鉴权信息
  9. AuthInfo authInfo = new AuthInfo("username", "password");
  10. GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database)
  11. // 如果数据库不需要鉴权,我们可以使用 AuthInfo.noAuthorization() 作为参数。
  12. .authInfo(authInfo)
  13. // 如果服务配置了 TLS ,设置 TLS 选项来启用安全连接
  14. //.tlsOptions(new TlsOptions())
  15. // 好的开始 ^_^
  16. .build();
  17. GreptimeDB client = GreptimeDB.create(opts);

数据模型

表中的每条行数据包含三种类型的列:TagTimestampField。更多信息请参考 数据模型。 列值的类型可以是 StringFloatIntTimestamp 等。更多信息请参考 数据类型

低层级 API

GreptimeDB 的低层级 API 通过向具有预定义模式的 table 对象添加 row 来写入数据。

创建行数据

以下代码片段首先构建了一个名为 cpu_metric 的表,其中包括 hostcpu_usercpu_systs 列。 随后,它向表中插入了一行数据。

该表包含三种类型的列:

  • Taghost 列,值类型为 String
  • Fieldcpu_usercpu_sys 列,值类型为 Float
  • Timestampts 列,值类型为 Timestamp
  1. // 为 CPU 指标构建表结构
  2. TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric")
  3. .addTag("host", DataType.String) // 主机的标识符
  4. .addTimestamp("ts", DataType.TimestampMillisecond) // 毫秒级的时间戳
  5. .addField("cpu_user", DataType.Float64) // 用户进程的 CPU 使用率
  6. .addField("cpu_sys", DataType.Float64) // 系统进程的 CPU 使用率
  7. .build();
  8. // 根据定义的模式创建表
  9. Table cpuMetric = Table.from(cpuMetricSchema);
  10. // 单行的示例数据
  11. String host = "127.0.0.1"; // 主机标识符
  12. long ts = System.currentTimeMillis(); // 当前时间戳
  13. double cpuUser = 0.1; // 用户进程的 CPU 使用率(百分比)
  14. double cpuSys = 0.12; // 系统进程的 CPU 使用率(百分比)
  15. // 将一行数据插入表中
  16. // 注意:参数必须按照定义的表结构的列顺序排列:host, ts, cpu_user, cpu_sys
  17. cpuMetric.addRow(host, ts, cpuUser, cpuSys);

为了提高写入数据的效率,你可以一次创建多行数据以便写入到 GreptimeDB。

  1. // 创建表结构
  2. TableSchema cpuMetricSchema = TableSchema.newBuilder("cpu_metric")
  3. .addTag("host", DataType.String)
  4. .addTimestamp("ts", DataType.TimestampMillisecond)
  5. .addField("cpu_user", DataType.Float64)
  6. .addField("cpu_sys", DataType.Float64)
  7. .build();
  8. TableSchema memMetricSchema = TableSchema.newBuilder("mem_metric")
  9. .addTag("host", DataType.String)
  10. .addTimestamp("ts", DataType.TimestampMillisecond)
  11. .addField("mem_usage", DataType.Float64)
  12. .build();
  13. Table cpuMetric = Table.from(cpuMetricSchema);
  14. Table memMetric = Table.from(memMetricSchema);
  15. // 添加行数据
  16. for (int i = 0; i < 10; i++) {
  17. String host = "127.0.0." + i;
  18. long ts = System.currentTimeMillis();
  19. double cpuUser = i + 0.1;
  20. double cpuSys = i + 0.12;
  21. cpuMetric.addRow(host, ts, cpuUser, cpuSys);
  22. }
  23. for (int i = 0; i < 10; i++) {
  24. String host = "127.0.0." + i;
  25. long ts = System.currentTimeMillis();
  26. double memUsage = i + 0.2;
  27. memMetric.addRow(host, ts, memUsage);
  28. }

插入数据

下方示例展示了如何向 GreptimeDB 的表中插入行数据。

  1. // 插入数据
  2. // 考虑到尽可能提升性能和降低资源占用,SDK 设计为纯异步的。
  3. // 返回值是一个 future 对象。如果你想立即获取结果,可以调用 `future.get()`。
  4. CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(cpuMetric, memMetric);
  5. Result<WriteOk, Err> result = future.get();
  6. if (result.isOk()) {
  7. LOG.info("Write result: {}", result.getOk());
  8. } else {
  9. LOG.error("Failed to write: {}", result.getErr());
  10. }

流式插入

当你需要插入大量数据时,例如导入历史数据,流式插入是非常有用的。

  1. StreamWriter<Table, WriteOk> writer = greptimeDB.streamWriter();
  2. // 写入数据到流中
  3. writer.write(cpuMetric);
  4. writer.write(memMetric);
  5. // 你可以对流执行操作,例如删除前 5 行
  6. writer.write(cpuMetric.subRange(0, 5), WriteOp.Delete);

在所有数据写入完毕后关闭流式写入。 一般情况下,连续写入数据时不需要关闭流式写入。

  1. // 完成流式写入
  2. CompletableFuture<WriteOk> future = writer.completed();
  3. WriteOk result = future.get();
  4. LOG.info("Write result: {}", result);

更新数据

关于更新机制,请参考 更新数据。 下方代码首先保存了一行数据,然后使用相同的标签和时间索引来更新特定的行数据。

  1. Table cpuMetric = Table.from(myMetricCpuSchema);
  2. // 插入一行数据
  3. long ts = 1703832681000L;
  4. cpuMetric.addRow("host1", ts, 0.23, 0.12);
  5. Result<WriteOk, Err> putResult = greptimeDB.write(cpuMetric).get();
  6. // 更新行数据
  7. Table newCpuMetric = Table.from(myMetricCpuSchema);
  8. // 相同的标签 `host1`
  9. // 相同的时间索引 `1703832681000`
  10. // 新的值:cpu_user = `0.80`, cpu_sys = `0.11`
  11. long ts = 1703832681000L;
  12. myMetricCpuSchema.addRow("host1", ts, 0.80, 0.11);
  13. // 覆盖现有数据
  14. CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(myMetricCpuSchema);
  15. Result<WriteOk, Err> result = future.get();

高层级 API

SDK 的高层级 API 使用 ORM 风格的对象写入数据, 它允许你以更面向对象的方式创建、插入和更新数据,为开发者提供了更友好的体验。 然而,高层级 API 不如低层级 API 高效。 这是因为 ORM 风格的对象在转换对象时可能会消耗更多的资源和时间。

创建行数据

GreptimeDB Java Ingester SDK 允许我们使用基本的 POJO 对象进行写入。虽然这种方法需要使用 Greptime 的注解,但它们很容易使用。

  1. @Metric(name = "cpu_metric")
  2. public class Cpu {
  3. @Column(name = "host", tag = true, dataType = DataType.String)
  4. private String host;
  5. @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
  6. private long ts;
  7. @Column(name = "cpu_user", dataType = DataType.Float64)
  8. private double cpuUser;
  9. @Column(name = "cpu_sys", dataType = DataType.Float64)
  10. private double cpuSys;
  11. // getters and setters
  12. // ...
  13. }
  14. @Metric(name = "mem_metric")
  15. public class Memory {
  16. @Column(name = "host", tag = true, dataType = DataType.String)
  17. private String host;
  18. @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
  19. private long ts;
  20. @Column(name = "mem_usage", dataType = DataType.Float64)
  21. private double memUsage;
  22. // getters and setters
  23. // ...
  24. }
  25. // 添加行
  26. List<Cpu> cpus = new ArrayList<>();
  27. for (int i = 0; i < 10; i++) {
  28. Cpu c = new Cpu();
  29. c.setHost("127.0.0." + i);
  30. c.setTs(System.currentTimeMillis());
  31. c.setCpuUser(i + 0.1);
  32. c.setCpuSys(i + 0.12);
  33. cpus.add(c);
  34. }
  35. List<Memory> memories = new ArrayList<>();
  36. for (int i = 0; i < 10; i++) {
  37. Memory m = new Memory();
  38. m.setHost("127.0.0." + i);
  39. m.setTs(System.currentTimeMillis());
  40. m.setMemUsage(i + 0.2);
  41. memories.add(m);
  42. }

插入数据

写入 POJO 对象:

  1. // 插入数据
  2. CompletableFuture<Result<WriteOk, Err>> puts = greptimeDB.writePOJOs(cpus, memories);
  3. Result<WriteOk, Err> result = puts.get();
  4. if (result.isOk()) {
  5. LOG.info("Write result: {}", result.getOk());
  6. } else {
  7. LOG.error("Failed to write: {}", result.getErr());
  8. }

流式插入

当你需要插入大量数据时,例如导入历史数据,流式插入是非常有用的。

  1. StreamWriter<List<?>, WriteOk> writer = greptimeDB.streamWriterPOJOs();
  2. // 写入数据到流中
  3. writer.write(cpus);
  4. writer.write(memories);
  5. // 你可以对流执行操作,例如删除前 5 行
  6. writer.write(cpus.subList(0, 5), WriteOp.Delete);

在所有数据写入完毕后关闭流式写入。 一般情况下,连续写入数据时不需要关闭流式写入。

  1. // 完成流式写入
  2. CompletableFuture<WriteOk> future = writer.completed();
  3. WriteOk result = future.get();
  4. LOG.info("Write result: {}", result);

更新数据

关于更新机制,请参考 更新数据。 下方代码首先保存了一行数据,然后使用相同的标签和时间索引来更新特定的行数据。

  1. Cpu cpu = new Cpu();
  2. cpu.setHost("host1");
  3. cpu.setTs(1703832681000L);
  4. cpu.setCpuUser(0.23);
  5. cpu.setCpuSys(0.12);
  6. // 插入一行数据
  7. Result<WriteOk, Err> putResult = greptimeDB.writePOJOs(cpu).get();
  8. // 更新该行数据
  9. Cpu newCpu = new Cpu();
  10. // 相同的标签 `host1`
  11. newCpu.setHost("host1");
  12. // 相同的时间索引 `1703832681000`
  13. newCpu.setTs(1703832681000L);
  14. // 新的值: cpu_user = `0.80`, cpu_sys = `0.11`
  15. cpu.setCpuUser(0.80);
  16. cpu.setCpuSys(0.11);
  17. // 覆盖现有数据
  18. Result<WriteOk, Err> updateResult = greptimeDB.writePOJOs(newCpu).get();

请参考此处获取更多代码示例。

更多示例

请参考示例获取更多完全可运行的代码片段和常用方法的解释。

调试日志

ingester SDK 提供了用于调试的指标和日志。 请参考 Metrics & DisplayMagic Tools 了解如何启用或禁用日志。

Ingester 库参考

查询数据

GreptimeDB 使用 SQL 作为主要查询语言,兼容 MySQL 和 PostgreSQL。 因此,我们推荐使用成熟的 SQL 驱动来查询数据。

推荐的查询库

Java 数据库连接(JDBC)是 JavaSoft 规范的标准应用程序编程接口(API),它允许 Java 程序访问数据库管理系统。

许多数据库,如 MySQL 或 PostgreSQL,都已经基于 JDBC API 实现了自己的驱动程序。 由于 GreptimeDB 支持多种协议,这里我们使用 MySQL 协议作为示例来演示如何使用 JDBC。 如果你希望使用其他协议,只需要将 MySQL 驱动程序替换为相应的驱动程序。

安装

如果你使用的是 Maven,请将以下内容添加到 pom.xml 的依赖项列表中:

  1. <!-- MySQL 依赖 -->
  2. <dependency>
  3. <groupId>mysql</groupId>
  4. <artifactId>mysql-connector-java</artifactId>
  5. <version>8.0.33</version>
  6. </dependency>

连接数据库

下方的例子展示了如何连接到 GreptimeDB:

这里我们使用 MySQL 作为示例来演示如何连接到 GreptimeDB。

  1. public static Connection getConnection() throws IOException, ClassNotFoundException, SQLException {
  2. Properties prop = new Properties();
  3. prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties"));
  4. String dbName = (String) prop.get("db.database-driver");
  5. String dbConnUrl = (String) prop.get("db.url");
  6. String dbUserName = (String) prop.get("db.username");
  7. String dbPassword = (String) prop.get("db.password");
  8. Class.forName(dbName);
  9. Connection dbConn = DriverManager.getConnection(dbConnUrl, dbUserName, dbPassword);
  10. return Objects.requireNonNull(dbConn, "Failed to make connection!");
  11. }

你需要一个 properties 文件来存储数据库连接信息,将其放在 Resources 目录中并命名为 db-connection.properties。文件内容如下:

  1. # DataSource
  2. db.database-driver=com.mysql.cj.jdbc.Driver
  3. db.url=jdbc:mysql://localhost:4002/public
  4. db.username=
  5. db.password=

或者你可以从这里获取文件。

Raw SQL

我们推荐使用 Raw SQL 来体验 GreptimeDB 的全部功能。 下面的例子展示了如何使用 Raw SQL 查询数据:

  1. try (Connection conn = getConnection()) {
  2. Statement statement = conn.createStatement();
  3. // DESC table;
  4. ResultSet rs = statement.executeQuery("DESC cpu_metric");
  5. LOG.info("Column | Type | Key | Null | Default | Semantic Type ");
  6. while (rs.next()) {
  7. LOG.info("{} | {} | {} | {} | {} | {}",
  8. rs.getString(1),
  9. rs.getString(2),
  10. rs.getString(3),
  11. rs.getString(4),
  12. rs.getString(5),
  13. rs.getString(6));
  14. }
  15. // SELECT COUNT(*) FROM cpu_metric;
  16. rs = statement.executeQuery("SELECT COUNT(*) FROM cpu_metric");
  17. while (rs.next()) {
  18. LOG.info("Count: {}", rs.getInt(1));
  19. }
  20. // SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5;
  21. rs = statement.executeQuery("SELECT * FROM cpu_metric ORDER BY ts DESC LIMIT 5");
  22. LOG.info("host | ts | cpu_user | cpu_sys");
  23. while (rs.next()) {
  24. LOG.info("{} | {} | {} | {}",
  25. rs.getString("host"),
  26. rs.getTimestamp("ts"),
  27. rs.getDouble("cpu_user"),
  28. rs.getDouble("cpu_sys"));
  29. }
  30. }

请参考此处获取更多可执行代码。

查询库参考

有关如何使用查询库的更多信息,请参考相应库的文档: