从 InfluxDB 迁移

本文档将帮助你了解 GreptimeDB 和 InfluxDB 的数据模型之间的区别,并指导你完成迁移过程。

数据模型的区别

你可能已经熟悉了 InfluxDB 的关键概念, GreptimeDB 的 数据模型 是值得了解的新事物。 让我们从相似和不同之处开始:

  • 两者都是schemaless 写入的解决方案,这意味着在写入数据之前无需定义表结构。
  • 在 InfluxDB 中,一个点代表一条数据记录,包含一个 measurement、tag 集、field 集和时间戳。 在 GreptimeDB 中,它被表示为时间序列表中的一行数据。 表名对应于 measurement,列由三种类型组成:Tag、Field 和 Timestamp。
  • GreptimeDB 使用 TimestampNanosecond 作为来自 InfluxDB 行协议 API 的时间戳数据类型。
  • GreptimeDB 使用 Float64 作为来自 InfluxDB 行协议 API 的数值数据类型。

让我们以 InfluxDB 文档中的示例数据为例:

_time_measurementlocationscientist_field_value
2019-08-18T00:00:00Zcensusklamathandersonbees23
2019-08-18T00:00:00Zcensusportlandmullenants30
2019-08-18T00:06:00Zcensusklamathandersonbees28
2019-08-18T00:06:00Zcensusportlandmullenants32

上述数据的 InfluxDB 行协议格式为:

  1. census,location=klamath,scientist=anderson bees=23 1566086400000000000
  2. census,location=portland,scientist=mullen ants=30 1566086400000000000
  3. census,location=klamath,scientist=anderson bees=28 1566086760000000000
  4. census,location=portland,scientist=mullen ants=32 1566086760000000000

在 GreptimeDB 数据模型中,上述数据将被表示为 census 表中的以下内容:

  1. +---------------------+----------+-----------+------+------+
  2. | ts | location | scientist | bees | ants |
  3. +---------------------+----------+-----------+------+------+
  4. | 2019-08-18 00:00:00 | klamath | anderson | 23 | NULL |
  5. | 2019-08-18 00:06:00 | klamath | anderson | 28 | NULL |
  6. | 2019-08-18 00:00:00 | portland | mullen | NULL | 30 |
  7. | 2019-08-18 00:06:00 | portland | mullen | NULL | 32 |
  8. +---------------------+----------+-----------+------+------+

census 表结构如下:

  1. +-----------+----------------------+------+------+---------+---------------+
  2. | Column | Type | Key | Null | Default | Semantic Type |
  3. +-----------+----------------------+------+------+---------+---------------+
  4. | location | String | PRI | YES | | TAG |
  5. | scientist | String | PRI | YES | | TAG |
  6. | bees | Float64 | | YES | | FIELD |
  7. | ts | TimestampNanosecond | PRI | NO | | TIMESTAMP |
  8. | ants | Float64 | | YES | | FIELD |
  9. +-----------+----------------------+------+------+---------+---------------+

数据库连接信息

在写入或查询数据之前,需要了解 InfluxDB 和 GreptimeDB 之间的数据库连接信息的差异。

  • Token:InfluxDB API 中的 token 用于身份验证,与 GreptimeDB 身份验证相同。 当使用 InfluxDB 的客户端库或 HTTP API 与 GreptimeDB 交互时,你可以使用 <greptimedb_user:greptimedb_password> 作为 token。
  • Organization:GreptimeDB 中没有组织。
  • Bucket:在 InfluxDB 中,bucket 是时间序列数据的容器,与 GreptimeDB 中的数据库名称相同。

写入数据

GreptimeDB 兼容 InfluxDB 的行协议格式,包括 v1 和 v2。 这意味着你可以轻松地从 InfluxDB 迁移到 GreptimeDB。

HTTP API

你可以使用以下 HTTP API 请求将 measurement 写入 GreptimeDB:

  • InfluxDB line protocol v2
  • InfluxDB line protocol v1
  1. curl -X POST 'http://<greptimedb-host>:4000/v1/influxdb/api/v2/write?db=<db-name>' \
  2. -H 'authorization: token <greptime_user:greptimedb_password>' \
  3. -d 'census,location=klamath,scientist=anderson bees=23 1566086400000000000'
  1. curl 'http://<greptimedb-host>:4000/v1/influxdb/write?db=<db-name>&u=<greptime_user>&p=<greptimedb_password>' \
  2. -d 'census,location=klamath,scientist=anderson bees=23 1566086400000000000'

Telegraf

GreptimeDB 支持 InfluxDB 行协议也意味着 GreptimeDB 与 Telegraf 兼容。 要配置 Telegraf,只需将 http://<greptimedb-host>:4000 URL 添加到 Telegraf 配置中:

  • InfluxDB line protocol v2
  • InfluxDB line protocol v1
  1. [[outputs.influxdb_v2]]
  2. urls = ["http://<greptimedb-host>:4000/v1/influxdb"]
  3. token = "<greptime_user>:<greptimedb_password>"
  4. bucket = "<db-name>"
  5. ## 留空即可
  6. organization = ""
  1. [[outputs.influxdb]]
  2. urls = ["http://<greptimedb-host>:4000/v1/influxdb"]
  3. database = "<db-name>"
  4. username = "<greptime_user>"
  5. password = "<greptimedb_password>"

客户端库

使用 InfluxDB 客户端库写入数据到 GreptimeDB 非常直接且简单。 你只需在客户端配置中包含 URL 和身份验证信息。

例如:

  • Node.js
  • Python
  • Go
  • Java
  • PHP
  1. 'use strict'
  2. /** @module write
  3. **/
  4. import { InfluxDB, Point } from '@influxdata/influxdb-client'
  5. /** 环境变量 **/
  6. const url = 'http://<greptimedb-host>:4000/v1/influxdb'
  7. const token = '<greptime_user>:<greptimedb_password>'
  8. const org = ''
  9. const bucket = '<db-name>'
  10. const influxDB = new InfluxDB({ url, token })
  11. const writeApi = influxDB.getWriteApi(org, bucket)
  12. writeApi.useDefaultTags({ region: 'west' })
  13. const point1 = new Point('temperature')
  14. .tag('sensor_id', 'TLM01')
  15. .floatField('value', 24.0)
  16. writeApi.writePoint(point1)
  1. import influxdb_client
  2. from influxdb_client.client.write_api import SYNCHRONOUS
  3. bucket = "<db-name>"
  4. org = ""
  5. token = "<greptime_user>:<greptimedb_password>"
  6. url="http://<greptimedb-host>:4000/v1/influxdb"
  7. client = influxdb_client.InfluxDBClient(
  8. url=url,
  9. token=token,
  10. org=org
  11. )
  12. write_api = client.write_api(write_options=SYNCHRONOUS)
  13. p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
  14. write_api.write(bucket=bucket, org=org, record=p)
  1. bucket := "<db-name>"
  2. org := ""
  3. token := "<greptime_user>:<greptimedb_password>"
  4. url := "http://<greptimedb-host>:4000/v1/influxdb"
  5. client := influxdb2.NewClient(url, token)
  6. writeAPI := client.WriteAPIBlocking(org, bucket)
  7. p := influxdb2.NewPoint("stat",
  8. map[string]string{"unit": "temperature"},
  9. map[string]interface{}{"avg": 24.5, "max": 45},
  10. time.Now())
  11. writeAPI.WritePoint(context.Background(), p)
  12. client.Close()
  1. private static String url = "http://<greptimedb-host>:4000/v1/influxdb";
  2. private static String org = "";
  3. private static String bucket = "<db-name>";
  4. private static char[] token = "<greptime_user>:<greptimedb_password>".toCharArray();
  5. public static void main(final String[] args) {
  6. InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
  7. WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
  8. Point point = Point.measurement("temperature")
  9. .addTag("location", "west")
  10. .addField("value", 55D)
  11. .time(Instant.now().toEpochMilli(), WritePrecision.MS);
  12. writeApi.writePoint(point);
  13. influxDBClient.close();
  14. }
  1. $client = new Client([
  2. "url" => "http://<greptimedb-host>:4000/v1/influxdb",
  3. "token" => "<greptime_user>:<greptimedb_password>",
  4. "bucket" => "<db-name>",
  5. "org" => "",
  6. "precision" => InfluxDB2\Model\WritePrecision::S
  7. ]);
  8. $writeApi = $client->createWriteApi();
  9. $dateTimeNow = new DateTime('NOW');
  10. $point = Point::measurement("weather")
  11. ->addTag("location", "Denver")
  12. ->addField("temperature", rand(0, 20))
  13. ->time($dateTimeNow->getTimestamp());
  14. $writeApi->write($point);

除了上述语言之外,GreptimeDB 还支持其他 InfluxDB 支持的客户端库。 你可以通过参考上面提供的连接信息代码片段,使用你喜欢的语言编写代码。

查询数据

GreptimeDB 不支持 Flux 和 InfluxQL,而是使用 SQL 和 PromQL。

SQL 是一种通用的用于管理和操作关系数据库的语言。 具有灵活的数据检索、操作和分析功能, 减少了已经熟悉 SQL 的用户的学习曲线。

PromQL(Prometheus 查询语言)允许用户实时选择和聚合时间序列数据, 表达式的结果可以显示为图形,也可以在 Prometheus 的表达式浏览器中以表格数据的形式查看, 或通过 HTTP API 传递给外部系统。

假设你要查询过去 24 小时内记录的 monitor 表中的最大 CPU。 在 InfluxQL 中,查询如下:

  1. SELECT
  2. MAX("cpu")
  3. FROM
  4. "monitor"
  5. WHERE
  6. time > now() - 24h
  7. GROUP BY
  8. time(1h)

此 InfluxQL 查询计算 monitor 表中 cpu字段的最大值, 其中时间大于当前时间减去 24 小时,结果以一小时为间隔进行分组。

该查询在 Flux 中的表达如下:

  1. from(bucket: "public")
  2. |> range(start: -24h)
  3. |> filter(fn: (r) => r._measurement == "monitor")
  4. |> aggregateWindow(every: 1h, fn: max)

在 GreptimeDB SQL 中,类似的查询为:

  1. SELECT
  2. ts,
  3. host,
  4. AVG(cpu) RANGE '1h' as mean_cpu
  5. FROM
  6. monitor
  7. WHERE
  8. ts > NOW() - INTERVAL '24 hours'
  9. ALIGN '1h' TO NOW
  10. ORDER BY ts DESC;

在该 SQL 查询中, RANGE 子句确定了 AVG(cpu) 聚合函数的时间窗口, 而 ALIGN 子句设置了时间序列数据的对齐时间。 有关按时间窗口分组的更多详细信息,请参考按时间窗口聚合数据文档。

在 PromQL 中,类似的查询为:

  1. avg_over_time(monitor[1h])

要查询最后 24 小时的时间序列数据, 你需要执行此 PromQL 并使用 HTTP API 的 startend 参数定义时间范围。 有关 PromQL 的更多信息,请参考 PromQL 文档。

可视化数据

推荐使用 Grafana 可视化 GreptimeDB 数据, 请参考 Grafana 文档了解如何配置 GreptimeDB。

迁移数据

你可以通过以下步骤实现从 InfluxDB 到 GreptimeDB 的数据无缝迁移:

Double write to GreptimeDB and InfluxDB

  1. 同时将数据写入 GreptimeDB 和 InfluxDB,以避免迁移过程中的数据丢失。
  2. 从 InfluxDB 导出所有历史数据,并将数据导入 GreptimeDB。
  3. 停止向 InfluxDB 写入数据,并移除 InfluxDB 服务器。

双写 GreptimeDB 和 InfluxDB

将数据双写 GreptimeDB 和 InfluxDB 是迁移过程中防止数据丢失的有效策略。 当使用 InfluxDB 的客户端库时,你可以建立两个客户端实例,一个用于 GreptimeDB,另一个用于 InfluxDB。 有关如何使用 InfluxDB 行协议将数据写入 GreptimeDB 的操作,请参考写入数据部分。

如果无需保留所有历史数据, 你可以双写一段时间以积累所需的最新数据, 然后停止向 InfluxDB 写入数据并仅使用 GreptimeDB。 如果需要完整迁移所有历史数据,请按照接下来的步骤操作。

从 InfluxDB v1 服务器导出数据

创建一个临时目录来存储 InfluxDB 的导出数据。

  1. mkdir -p /path/to/export

使用 InfluxDB 的 influx_inspect export 命令 导出数据。

  1. influx_inspect export \
  2. -database <db-name> \
  3. -end <end-time> \
  4. -lponly \
  5. -datadir /var/lib/influxdb/data \
  6. -waldir /var/lib/influxdb/wal \
  7. -out /path/to/export/data
  • -database 指定要导出的数据库。
  • -end 指定要导出的数据的结束时间。 必须是RFC3339 格式,例如 2024-01-01T00:00:00Z。 你可以使用同时写入 GreptimeDB 和 InfluxDB 时的时间戳作为结束时间。
  • -lponly 指定只导出行协议数据。
  • -datadir 指定数据目录的路径,请见InfluxDB 数据设置中的配置。
  • -waldir 指定 WAL 目录的路径,请见InfluxDB 数据设置中的配置。
  • -out 指定输出目录。

导出的 InfluxDB 行协议数据类似如下:

  1. disk,device=disk1s5s1,fstype=apfs,host=bogon,mode=ro,path=/ inodes_used=356810i 1714363350000000000
  2. diskio,host=bogon,name=disk0 iops_in_progress=0i 1714363350000000000
  3. disk,device=disk1s6,fstype=apfs,host=bogon,mode=rw,path=/System/Volumes/Update inodes_used_percent=0.0002391237988702021 1714363350000000000
  4. ...

从 InfluxDB v2 服务器导出数据

创建一个临时目录来存储 InfluxDB 的导出数据。

  1. mkdir -p /path/to/export

使用 InfluxDB 的 influx inspect export-lp 命令 导出数据。

  1. influxd inspect export-lp \
  2. --bucket-id <bucket-id> \
  3. --engine-path /var/lib/influxdb2/engine/ \
  4. --end <end-time> \
  5. --output-path /path/to/export/data
  • --bucket-id 指定要导出的 bucket ID。
  • --engine-path 指定引擎目录的路径,请见InfluxDB 数据设置中的配置。
  • --end 指定要导出的数据的结束时间。 必须是RFC3339 格式,例如 2024-01-01T00:00:00Z。 你可以使用同时写入 GreptimeDB 和 InfluxDB 时的时间戳作为结束时间。
  • --output-path 指定输出目录。

命令行的执行结果类似如下:

  1. {"level":"info","ts":1714377321.4795408,"caller":"export_lp/export_lp.go:219","msg":"exporting TSM files","tsm_dir":"/var/lib/influxdb2/engine/data/307013e61d514f3c","file_count":1}
  2. {"level":"info","ts":1714377321.4940555,"caller":"export_lp/export_lp.go:315","msg":"exporting WAL files","wal_dir":"/var/lib/influxdb2/engine/wal/307013e61d514f3c","file_count":1}
  3. {"level":"info","ts":1714377321.4941633,"caller":"export_lp/export_lp.go:204","msg":"export complete"}

导出的 InfluxDB 行协议数据类似如下:

  1. cpu,cpu=cpu-total,host=bogon usage_idle=80.4448912910468 1714376180000000000
  2. cpu,cpu=cpu-total,host=bogon usage_idle=78.50167052182304 1714376190000000000
  3. cpu,cpu=cpu-total,host=bogon usage_iowait=0 1714375700000000000
  4. cpu,cpu=cpu-total,host=bogon usage_iowait=0 1714375710000000000
  5. ...

导入数据到 GreptimeDB

在将数据导入 GreptimeDB 之前,如果数据文件过大,建议将数据文件拆分为多个片段:

  1. split -l 100000 -d -a 10 data data.
  2. # -l [line_count] 创建长度为 line_count 行的拆分文件。
  3. # -d 使用数字后缀而不是字母后缀。
  4. # -a [suffix_length] 使用 suffix_length 个字母来形成文件名的后缀。

你可以使用 HTTP API 导入数据,如写入数据部分所述。 下方提供的 Python 脚本将帮助你从文件中读取数据并将其导入 GreptimeDB。

创建一个名为 ingest.py 的 Python 文件,确保你使用的是 Python 3.9 或更高版本,然后将以下代码复制并粘贴到其中。

  1. import os
  2. import sys
  3. import subprocess
  4. def process_file(file_path, url, token):
  5. print("Ingesting file:", file_path)
  6. curl_command = ['curl', '-i',
  7. '-H', "authorization: token {}".format(token),
  8. '-X', "POST",
  9. '--data-binary', "@{}".format(file_path),
  10. url]
  11. print(" ".join(curl_command))
  12. attempts = 0
  13. while attempts < 3: # 最多重试三次
  14. result = subprocess.run(curl_command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  15. print(result)
  16. # 检查 curl 命令输出中是否有任何警告或错误
  17. output = result.stderr.lower()
  18. if "warning" in output or "error" in output:
  19. print("Warnings or errors detected. Retrying...")
  20. attempts += 1
  21. else:
  22. break
  23. if attempts == 3:
  24. print("Request failed after 3 attempts. Giving up.")
  25. sys.exit(1)
  26. def process_directory(directory, url, token):
  27. file_names = []
  28. # 遍历目录
  29. for root, dirs, files in os.walk(directory):
  30. for file in files:
  31. file_path = os.path.join(root, file)
  32. file_names.append(file_path)
  33. # 对文件名数组进行排序
  34. file_names.sort()
  35. # 处理每个文件
  36. for file_name in file_names:
  37. process_file(file_name, url, token)
  38. # 检查是否提供了参数
  39. if len(sys.argv) < 4:
  40. print("Please provide the directory path as the first argument, the url as the second argument and the token as the third argument.")
  41. sys.exit(1)
  42. directory_path = sys.argv[1]
  43. url = sys.argv[2]
  44. token = sys.argv[3]
  45. # 调用函数处理目录
  46. process_directory(directory_path, url, token)

假如你的工作目录树如下:

  1. .
  2. ├── ingest.py
  3. └── slices
  4. ├── data.0000000000
  5. ├── data.0000000001
  6. ├── data.0000000002

在当前目录执行 Python 脚本并等待数据导入完成。

  1. python3 ingest.py slices http://<greptimedb-host>:4000/v1/influxdb/write?db=<db-name> <token>