从 InfluxDB 迁移

本文档将帮助你了解 GreptimeDB 和 InfluxDB 的数据模型之间的区别,并指导你完成迁移过程。

数据模型的区别

要了解 InfluxDB 和 GreptimeDB 的数据模型之间的差异,请参考写入数据文档中的数据模型

数据库连接信息

在写入或查询数据之前,需要了解 InfluxDB 和 GreptimeDB 之间的数据库连接信息的差异。

  • Token:InfluxDB API 中的 token 用于身份验证,与 GreptimeDB 身份验证相同。 当使用 InfluxDB 的客户端库或 HTTP API 与 GreptimeDB 交互时,你可以使用 <greptimedb_user:greptimedb_password> 作为 token。
  • Organization:GreptimeDB 中没有组织。
  • Bucket:在 InfluxDB 中,bucket 是时间序列数据的容器,与 GreptimeDB 中的数据库名称相同。

打开 GreptimeCloud 控制台 点击 Manage Your Data 下的 Connection Information. 你可以找到 GreptimeDB URL,数据库名称,以及 token 所需的 username 和 password。

写入数据

GreptimeDB 兼容 InfluxDB 的行协议格式,包括 v1 和 v2。 这意味着你可以轻松地从 InfluxDB 迁移到 GreptimeDB。

HTTP API

你可以使用以下 HTTP API 请求将 measurement 写入 GreptimeDB:

  • InfluxDB line protocol v2
  • InfluxDB line protocol v1
  1. curl -X POST 'https://<host>/v1/influxdb/api/v2/write?bucket=<db-name>' \
  2. -H 'authorization: token <greptime_user:greptimedb_password>' \
  3. -d 'census,location=klamath,scientist=anderson bees=23 1566086400000000000'
  1. curl 'https://<host>/v1/influxdb/write?db=<db-name>&u=<greptime_user>&p=<greptimedb_password>' \
  2. -d 'census,location=klamath,scientist=anderson bees=23 1566086400000000000'

Telegraf

GreptimeDB 支持 InfluxDB 行协议也意味着 GreptimeDB 与 Telegraf 兼容。 要配置 Telegraf,只需将 GreptimeDB 的 URL 添加到 Telegraf 配置中:

  • InfluxDB line protocol v2
  • InfluxDB line protocol v1
  1. [[outputs.influxdb_v2]]
  2. urls = ["https://<host>/v1/influxdb"]
  3. token = "<greptime_user>:<greptimedb_password>"
  4. bucket = "<db-name>"
  5. ## 留空即可
  6. organization = ""
  1. [[outputs.influxdb]]
  2. urls = ["https://<host>/v1/influxdb"]
  3. database = "<db-name>"
  4. username = "<greptime_user>"
  5. password = "<greptimedb_password>"

客户端库

使用 InfluxDB 客户端库写入数据到 GreptimeDB 非常直接且简单。 你只需在客户端配置中包含 URL 和身份验证信息。

例如:

  • Node.js
  • Python
  • Go
  • Java
  • PHP
  1. 'use strict'
  2. /** @module write
  3. **/
  4. import { InfluxDB, Point } from '@influxdata/influxdb-client'
  5. /** 环境变量 **/
  6. const url = 'https://<host>/v1/influxdb'
  7. const token = '<greptime_user>:<greptimedb_password>'
  8. const org = ''
  9. const bucket = '<db-name>'
  10. const influxDB = new InfluxDB({ url, token })
  11. const writeApi = influxDB.getWriteApi(org, bucket)
  12. writeApi.useDefaultTags({ region: 'west' })
  13. const point1 = new Point('temperature')
  14. .tag('sensor_id', 'TLM01')
  15. .floatField('value', 24.0)
  16. writeApi.writePoint(point1)
  1. import influxdb_client
  2. from influxdb_client.client.write_api import SYNCHRONOUS
  3. bucket = "<db-name>"
  4. org = ""
  5. token = "<greptime_user>:<greptimedb_password>"
  6. url="https://<host>/v1/influxdb"
  7. client = influxdb_client.InfluxDBClient(
  8. url=url,
  9. token=token,
  10. org=org
  11. )
  12. write_api = client.write_api(write_options=SYNCHRONOUS)
  13. p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
  14. write_api.write(bucket=bucket, org=org, record=p)
  1. bucket := "<db-name>"
  2. org := ""
  3. token := "<greptime_user>:<greptimedb_password>"
  4. url := "https://<host>/v1/influxdb"
  5. client := influxdb2.NewClient(url, token)
  6. writeAPI := client.WriteAPIBlocking(org, bucket)
  7. p := influxdb2.NewPoint("stat",
  8. map[string]string{"unit": "temperature"},
  9. map[string]interface{}{"avg": 24.5, "max": 45},
  10. time.Now())
  11. writeAPI.WritePoint(context.Background(), p)
  12. client.Close()
  1. private static String url = "https://<host>/v1/influxdb";
  2. private static String org = "";
  3. private static String bucket = "<db-name>";
  4. private static char[] token = "<greptime_user>:<greptimedb_password>".toCharArray();
  5. public static void main(final String[] args) {
  6. InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
  7. WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
  8. Point point = Point.measurement("temperature")
  9. .addTag("location", "west")
  10. .addField("value", 55D)
  11. .time(Instant.now().toEpochMilli(), WritePrecision.MS);
  12. writeApi.writePoint(point);
  13. influxDBClient.close();
  14. }
  1. $client = new Client([
  2. "url" => "https://<host>/v1/influxdb",
  3. "token" => "<greptime_user>:<greptimedb_password>",
  4. "bucket" => "<db-name>",
  5. "org" => "",
  6. "precision" => InfluxDB2\Model\WritePrecision::S
  7. ]);
  8. $writeApi = $client->createWriteApi();
  9. $dateTimeNow = new DateTime('NOW');
  10. $point = Point::measurement("weather")
  11. ->addTag("location", "Denver")
  12. ->addField("temperature", rand(0, 20))
  13. ->time($dateTimeNow->getTimestamp());
  14. $writeApi->write($point);

除了上述语言之外,GreptimeDB 还支持其他 InfluxDB 支持的客户端库。 你可以通过参考上面提供的连接信息代码片段,使用你喜欢的语言编写代码。

查询数据

GreptimeDB 不支持 Flux 和 InfluxQL,而是使用 SQL 和 PromQL。

SQL 是一种通用的用于管理和操作关系数据库的语言。 具有灵活的数据检索、操作和分析功能, 减少了已经熟悉 SQL 的用户的学习曲线。

PromQL(Prometheus 查询语言)允许用户实时选择和聚合时间序列数据, 表达式的结果可以显示为图形,也可以在 Prometheus 的表达式浏览器中以表格数据的形式查看, 或通过 HTTP API 传递给外部系统。

假设你要查询过去 24 小时内记录的 monitor 表中的最大 CPU。 在 InfluxQL 中,查询如下:

  1. SELECT
  2. MAX("cpu")
  3. FROM
  4. "monitor"
  5. WHERE
  6. time > now() - 24h
  7. GROUP BY
  8. time(1h)

此 InfluxQL 查询计算 monitor 表中 cpu字段的最大值, 其中时间大于当前时间减去 24 小时,结果以一小时为间隔进行分组。

该查询在 Flux 中的表达如下:

  1. from(bucket: "public")
  2. |> range(start: -24h)
  3. |> filter(fn: (r) => r._measurement == "monitor")
  4. |> aggregateWindow(every: 1h, fn: max)

在 GreptimeDB SQL 中,类似的查询为:

  1. SELECT
  2. ts,
  3. host,
  4. AVG(cpu) RANGE '1h' as mean_cpu
  5. FROM
  6. monitor
  7. WHERE
  8. ts > NOW() - INTERVAL '24 hours'
  9. ALIGN '1h' TO NOW
  10. ORDER BY ts DESC;

在该 SQL 查询中, RANGE 子句确定了 AVG(cpu) 聚合函数的时间窗口, 而 ALIGN 子句设置了时间序列数据的对齐时间。 有关按时间窗口分组的更多详细信息,请参考按时间窗口聚合数据文档。

在 PromQL 中,类似的查询为:

  1. avg_over_time(monitor[1h])

要查询最后 24 小时的时间序列数据, 你需要执行此 PromQL 并使用 HTTP API 的 startend 参数定义时间范围。 有关 PromQL 的更多信息,请参考 PromQL 文档。

可视化数据

GreptimeCloud 控制台提供了名为 Workbench 的数据可视化工作台。 打开控制台, 在 Manage Your Data 下选择 Web Dashboard, 然后创建一个新的 Workbench 文件, 即可按需求创建图表。

迁移数据

你可以通过以下步骤实现从 InfluxDB 到 GreptimeDB 的数据无缝迁移:

Double write to GreptimeDB and InfluxDB

  1. 同时将数据写入 GreptimeDB 和 InfluxDB,以避免迁移过程中的数据丢失。
  2. 从 InfluxDB 导出所有历史数据,并将数据导入 GreptimeDB。
  3. 停止向 InfluxDB 写入数据,并移除 InfluxDB 服务器。

双写 GreptimeDB 和 InfluxDB

将数据双写 GreptimeDB 和 InfluxDB 是迁移过程中防止数据丢失的有效策略。 当使用 InfluxDB 的客户端库时,你可以建立两个客户端实例,一个用于 GreptimeDB,另一个用于 InfluxDB。 有关如何使用 InfluxDB 行协议将数据写入 GreptimeDB 的操作,请参考写入数据部分。

如果无需保留所有历史数据, 你可以双写一段时间以积累所需的最新数据, 然后停止向 InfluxDB 写入数据并仅使用 GreptimeDB。 如果需要完整迁移所有历史数据,请按照接下来的步骤操作。

从 InfluxDB v1 服务器导出数据

创建一个临时目录来存储 InfluxDB 的导出数据。

  1. mkdir -p /path/to/export

使用 InfluxDB 的 influx_inspect export 命令 导出数据。

  1. influx_inspect export \
  2. -database <db-name> \
  3. -end <end-time> \
  4. -lponly \
  5. -datadir /var/lib/influxdb/data \
  6. -waldir /var/lib/influxdb/wal \
  7. -out /path/to/export/data
  • -database 指定要导出的数据库。
  • -end 指定要导出的数据的结束时间。 必须是RFC3339 格式,例如 2024-01-01T00:00:00Z。 你可以使用同时写入 GreptimeDB 和 InfluxDB 时的时间戳作为结束时间。
  • -lponly 指定只导出行协议数据。
  • -datadir 指定数据目录的路径,请见InfluxDB 数据设置中的配置。
  • -waldir 指定 WAL 目录的路径,请见InfluxDB 数据设置中的配置。
  • -out 指定输出目录。

导出的 InfluxDB 行协议数据类似如下:

  1. disk,device=disk1s5s1,fstype=apfs,host=bogon,mode=ro,path=/ inodes_used=356810i 1714363350000000000
  2. diskio,host=bogon,name=disk0 iops_in_progress=0i 1714363350000000000
  3. disk,device=disk1s6,fstype=apfs,host=bogon,mode=rw,path=/System/Volumes/Update inodes_used_percent=0.0002391237988702021 1714363350000000000
  4. ...

从 InfluxDB v2 服务器导出数据

创建一个临时目录来存储 InfluxDB 的导出数据。

  1. mkdir -p /path/to/export

使用 InfluxDB 的 influx inspect export-lp 命令 导出数据。

  1. influxd inspect export-lp \
  2. --bucket-id <bucket-id> \
  3. --engine-path /var/lib/influxdb2/engine/ \
  4. --end <end-time> \
  5. --output-path /path/to/export/data
  • --bucket-id 指定要导出的 bucket ID。
  • --engine-path 指定引擎目录的路径,请见InfluxDB 数据设置中的配置。
  • --end 指定要导出的数据的结束时间。 必须是RFC3339 格式,例如 2024-01-01T00:00:00Z。 你可以使用同时写入 GreptimeDB 和 InfluxDB 时的时间戳作为结束时间。
  • --output-path 指定输出目录。

命令行的执行结果类似如下:

  1. {"level":"info","ts":1714377321.4795408,"caller":"export_lp/export_lp.go:219","msg":"exporting TSM files","tsm_dir":"/var/lib/influxdb2/engine/data/307013e61d514f3c","file_count":1}
  2. {"level":"info","ts":1714377321.4940555,"caller":"export_lp/export_lp.go:315","msg":"exporting WAL files","wal_dir":"/var/lib/influxdb2/engine/wal/307013e61d514f3c","file_count":1}
  3. {"level":"info","ts":1714377321.4941633,"caller":"export_lp/export_lp.go:204","msg":"export complete"}

导出的 InfluxDB 行协议数据类似如下:

  1. cpu,cpu=cpu-total,host=bogon usage_idle=80.4448912910468 1714376180000000000
  2. cpu,cpu=cpu-total,host=bogon usage_idle=78.50167052182304 1714376190000000000
  3. cpu,cpu=cpu-total,host=bogon usage_iowait=0 1714375700000000000
  4. cpu,cpu=cpu-total,host=bogon usage_iowait=0 1714375710000000000
  5. ...

导入数据到 GreptimeDB

在将数据导入 GreptimeDB 之前,如果数据文件过大,建议将数据文件拆分为多个片段:

  1. split -l 100000 -d -a 10 data data.
  2. # -l [line_count] 创建长度为 line_count 行的拆分文件。
  3. # -d 使用数字后缀而不是字母后缀。
  4. # -a [suffix_length] 使用 suffix_length 个字母来形成文件名的后缀。

你可以使用 HTTP API 导入数据,如写入数据部分所述。 下方提供的脚本将帮助你从文件中读取数据并将其导入 GreptimeDB。

假设你的当前位置是存储数据文件的目录:

  1. .
  2. ├── data.0000000000
  3. ├── data.0000000001
  4. ├── data.0000000002
  5. ...

将 GreptimeDB 的连接信息设置到环境变量中:

  1. export GREPTIME_USERNAME=<greptime_username>
  2. export GREPTIME_PASSWORD=<greptime_password>
  3. export GREPTIME_HOST=<host>
  4. export GREPTIME_DB=<db-name>

将数据导入到 GreptimeDB:

  1. for file in data.*; do
  2. curl -i --retry 3 \
  3. -X POST "https://${GREPTIME_HOST}/v1/influxdb/write?db=${GREPTIME_DB}&u=${GREPTIME_USERNAME}&p=${GREPTIME_PASSWORD}" \
  4. --data-binary @${file}
  5. sleep 1
  6. done