导入SST文件数据

本文以一个示例说明如何将数据源的数据生成SST(Sorted String Table)文件并保存在HDFS上,然后导入Nebula Graph,示例数据源是CSV文件。

注意事项

  • 仅Linux系统支持导入SST文件。

  • 不支持属性的Default值。

背景信息

Exchange支持两种数据导入模式:

  • 直接将数据源的数据通过nGQL语句的形式导入Nebula Graph。

  • 将数据源的数据生成SST文件,然后借助Console将SST文件导入Nebula Graph。

下文将介绍生成SST文件并用其导入数据的适用场景、实现方法、前提条件、操作步骤等内容。

适用场景

  • 适合在线业务,因为生成时几乎不会影响业务(只是读取Schema),导入速度快。

    Caution

    虽然导入速度快,但是导入期间(大约10秒)会阻塞对应空间的写操作,建议在业务低峰期进行导入。

  • 适合数据源数据量较大的场景,导入速度快。

实现方法

Nebula Graph底层使用RocksDB作为键值型存储引擎。RocksDB是基于硬盘的存储引擎,提供了一系列API用于创建及导入SST格式的文件,有助于快速导入海量数据。

SST文件是一个内部包含了任意长度的有序键值对集合的文件,用于高效地存储大量键值型数据。生成SST文件的整个过程主要由Exchange的Reader、sstProcessor和sstWriter完成。整个数据处理过程如下:

  1. Reader从数据源中读取数据。

  2. sstProcessor根据Nebula Graph的Schema信息生成SST文件,然后上传至HDFS。SST文件的格式请参见数据存储格式

  3. sstWriter打开一个文件并插入数据。生成SST文件时,Key必须按照顺序写入。

  4. 生成SST文件之后,RocksDB通过IngestExternalFile()方法将SST文件导入到Nebula Graph中。例如:

    1. IngestExternalFileOptions ifo;
    2. # 导入两个SST文件
    3. Status s = db_->IngestExternalFile({"/home/usr/file1.sst", "/home/usr/file2.sst"}, ifo);
    4. if (!s.ok()) {
    5. printf("Error while adding file %s and %s, Error %s\n",
    6. file_path1.c_str(), file_path2.c_str(), s.ToString().c_str());
    7. return 1;
    8. }

    调用IngestExternalFile()方法时,RocksDB默认会将文件拷贝到数据目录,并且阻塞RocksDB写入操作。如果SST文件中的键范围覆盖了Memtable键的范围,则将Memtable落盘(flush)到硬盘。将SST文件放置在LSM树最优位置后,为文件分配一个全局序列号,并打开写操作。

数据集

本文以basketballplayer数据集为例。

环境配置

本文示例在MacOS下完成,以下是相关的环境配置信息:

  • 硬件规格:

    • CPU:1.7 GHz Quad-Core Intel Core i7
    • 内存:16 GB
  • Spark:2.4.7 单机版

  • Hadoop:2.9.2 伪分布式部署

  • Nebula Graph:2.6.1。

前提条件

开始导入数据之前,用户需要确认以下信息:

  • 已经安装部署Nebula Graph 2.6.1并获取如下信息:

    • Graph服务和Meta服务的的IP地址和端口。

    • 拥有Nebula Graph写权限的用户名和密码。

    • Meta服务配置文件中的--ws_storage_http_port和Storage服务配置文件中的--ws_http_port一致。例如都为19779

    • Graph服务配置文件中的--ws_meta_http_port和Meta服务配置文件中的--ws_http_port一致。例如都为19559

    • Schema的信息,包括Tag和Edge type的名称、属性等。

  • 已经编译Exchange,或者直接下载编译完成的.jar文件。本示例中使用Exchange 2.6.0。

  • 已经安装Spark。

  • 已经安装JDK 1.8或以上版本,并配置环境变量JAVA_HOME。

  • 确认Hadoop服务在所有部署Storage服务的机器上运行正常。

    Note

    • 如果需要生成其他数据源的SST文件,请参见相应数据源的文档,查看前提条件部分。

    • 如果只需要生成SST文件,不需要在部署Storage服务的机器上安装Hadoop服务。

操作步骤

步骤 1:在Nebula Graph中创建Schema

分析CSV文件中的数据,按以下步骤在Nebula Graph中创建Schema:

  1. 确认Schema要素。Nebula Graph中的Schema要素如下表所示。

    要素名称属性
    Tagplayername string, age int
    Tagteamname string
    Edge Typefollowdegree int
    Edge Typeservestart_year int, end_year int
  2. 使用Nebula Console创建一个图空间basketballplayer,并创建一个Schema,如下所示。

    1. ## 创建图空间
    2. nebula> CREATE SPACE basketballplayer \
    3. (partition_num = 10, \
    4. replica_factor = 1, \
    5. vid_type = FIXED_STRING(30));
    6. ## 选择图空间basketballplayer
    7. nebula> USE basketballplayer;
    8. ## 创建Tag player
    9. nebula> CREATE TAG player(name string, age int);
    10. ## 创建Tag team
    11. nebula> CREATE TAG team(name string);
    12. ## 创建Edge type follow
    13. nebula> CREATE EDGE follow(degree int);
    14. ## 创建Edge type serve
    15. nebula> CREATE EDGE serve(start_year int, end_year int);

更多信息,请参见快速开始

步骤 2:处理CSV文件

确认以下信息:

  1. 处理CSV文件以满足Schema的要求。

    Note

    可以使用有表头或者无表头的CSV文件。

  2. 获取CSV文件存储路径。

步骤 3:修改配置文件

编译Exchange后,复制target/classes/application.conf文件设置相关配置。在本示例中,复制的文件名为sst_application.conf。各个配置项的详细说明请参见配置说明

  1. {
  2. # Spark相关配置
  3. spark: {
  4. app: {
  5. name: Nebula Exchange 2.0
  6. }
  7. master:local
  8. driver: {
  9. cores: 1
  10. maxResultSize: 1G
  11. }
  12. executor: {
  13. memory:1G
  14. }
  15. cores:{
  16. max: 16
  17. }
  18. }
  19. # Nebula Graph相关配置
  20. nebula: {
  21. address:{
  22. graph:["127.0.0.1:9669"]
  23. meta:["127.0.0.1:9559"]
  24. }
  25. user: root
  26. pswd: nebula
  27. space: basketballplayer
  28. # SST文件相关配置
  29. path:{
  30. # 本地临时存放生成的SST文件的目录
  31. local:"/tmp"
  32. # SST文件在HDFS的存储路径
  33. remote:"/sst"
  34. # HDFS的NameNode地址
  35. hdfs.namenode: "hdfs://*.*.*.*:9000"
  36. }
  37. # 客户端连接参数
  38. connection {
  39. # socket连接、执行的超时时间,单位:毫秒。
  40. timeout: 30000
  41. }
  42. error: {
  43. # 最大失败数,超过后会退出应用程序。
  44. max: 32
  45. # 失败的导入作业将记录在输出路径中。
  46. output: /tmp/errors
  47. }
  48. # 使用谷歌的RateLimiter来限制发送到NebulaGraph的请求。
  49. rate: {
  50. # RateLimiter的稳定吞吐量。
  51. limit: 1024
  52. # 从RateLimiter获取允许的超时时间,单位:毫秒
  53. timeout: 1000
  54. }
  55. }
  56. # 处理点
  57. tags: [
  58. # 设置Tag player相关信息。
  59. {
  60. # 指定Nebula Graph中定义的Tag名称。
  61. name: player
  62. type: {
  63. # 指定数据源,使用CSV。
  64. source: csv
  65. # 指定如何将点数据导入Nebula Graph:Client或SST。
  66. sink: sst
  67. }
  68. # 指定CSV文件的路径。
  69. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
  70. path: "hdfs://*.*.*.*:9000/dataset/vertex_player.csv"
  71. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
  72. # 如果CSV文件有表头,则使用实际的列名。
  73. fields: [_c1, _c2]
  74. # 指定Nebula Graph中定义的属性名称。
  75. # fields与nebula.fields的顺序必须一一对应。
  76. nebula.fields: [age, name]
  77. # 指定一个列作为VID的源。
  78. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
  79. # 目前,Nebula Graph 2.6.1仅支持字符串或整数类型的VID。
  80. vertex: {
  81. field:_c0
  82. }
  83. # 指定的分隔符。默认值为英文逗号(,)。
  84. separator: ","
  85. # 如果CSV文件有表头,请将header设置为true。
  86. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
  87. header: false
  88. # 指定单批次写入Nebula Graph的最大点数量。
  89. batch: 256
  90. # 指定Spark分片数量。
  91. partition: 32
  92. }
  93. # 设置Tag team相关信息。
  94. {
  95. # 指定Nebula Graph中定义的Tag名称。
  96. name: team
  97. type: {
  98. # 指定数据源,使用CSV。
  99. source: csv
  100. # 指定如何将点数据导入Nebula Graph:Client或SST。
  101. sink: sst
  102. }
  103. # 指定CSV文件的路径。
  104. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
  105. path: "hdfs://*.*.*.*:9000/dataset/vertex_team.csv"
  106. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
  107. # 如果CSV文件有表头,则使用实际的列名。
  108. fields: [_c1]
  109. # 指定Nebula Graph中定义的属性名称。
  110. # fields与nebula.fields的顺序必须一一对应。
  111. nebula.fields: [name]
  112. # 指定一个列作为VID的源。
  113. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
  114. # 目前,Nebula Graph 2.6.1仅支持字符串或整数类型的VID。
  115. vertex: {
  116. field:_c0
  117. }
  118. # 指定的分隔符。默认值为英文逗号(,)。
  119. separator: ","
  120. # 如果CSV文件有表头,请将header设置为true。
  121. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
  122. header: false
  123. # 指定单批次写入Nebula Graph的最大点数量。
  124. batch: 256
  125. # 指定Spark分片数量。
  126. partition: 32
  127. }
  128. # 如果需要添加更多点,请参考前面的配置进行添加。
  129. ]
  130. # 处理边
  131. edges: [
  132. # 设置Edge type follow相关信息。
  133. {
  134. # 指定Nebula Graph中定义的Edge type名称。
  135. name: follow
  136. type: {
  137. # 指定数据源,使用CSV。
  138. source: csv
  139. # 指定如何将点数据导入Nebula Graph:Client或SST。
  140. sink: sst
  141. }
  142. # 指定CSV文件的路径。
  143. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
  144. path: "hdfs://*.*.*.*:9000/dataset/edge_follow.csv"
  145. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
  146. # 如果CSV文件有表头,则使用实际的列名。
  147. fields: [_c2]
  148. # 指定Nebula Graph中定义的属性名称。
  149. # fields与nebula.fields的顺序必须一一对应。
  150. nebula.fields: [degree]
  151. # 指定一个列作为起始点和目的点的源。
  152. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
  153. # 目前,Nebula Graph 2.6.1仅支持字符串或整数类型的VID。
  154. source: {
  155. field: _c0
  156. }
  157. target: {
  158. field: _c1
  159. }
  160. # 指定的分隔符。默认值为英文逗号(,)。
  161. separator: ","
  162. # 指定一个列作为rank的源(可选)。
  163. #ranking: rank
  164. # 如果CSV文件有表头,请将header设置为true。
  165. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
  166. header: false
  167. # 指定单批次写入Nebula Graph的最大边数量。
  168. batch: 256
  169. # 指定Spark分片数量。
  170. partition: 32
  171. }
  172. # 设置Edge type serve相关信息。
  173. {
  174. # 指定Nebula Graph中定义的Edge type名称。
  175. name: serve
  176. type: {
  177. # 指定数据源,使用CSV。
  178. source: csv
  179. # 指定如何将点数据导入Nebula Graph:Client或SST。
  180. sink: sst
  181. }
  182. # 指定CSV文件的路径。
  183. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
  184. path: "hdfs://*.*.*.*:9000/dataset/edge_serve.csv"
  185. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
  186. # 如果CSV文件有表头,则使用实际的列名。
  187. fields: [_c2,_c3]
  188. # 指定Nebula Graph中定义的属性名称。
  189. # fields与nebula.fields的顺序必须一一对应。
  190. nebula.fields: [start_year, end_year]
  191. # 指定一个列作为起始点和目的点的源。
  192. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
  193. # 目前,Nebula Graph 2.6.1仅支持字符串或整数类型的VID。
  194. source: {
  195. field: _c0
  196. }
  197. target: {
  198. field: _c1
  199. }
  200. # 指定的分隔符。默认值为英文逗号(,)。
  201. separator: ","
  202. # 指定一个列作为rank的源(可选)。
  203. #ranking: _c5
  204. # 如果CSV文件有表头,请将header设置为true。
  205. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
  206. header: false
  207. # 指定单批次写入Nebula Graph的最大边数量。
  208. batch: 256
  209. # 指定Spark分片数量。
  210. partition: 32
  211. }
  212. ]
  213. # 如果需要添加更多边,请参考前面的配置进行添加。
  214. }

步骤 4:生成SST文件

运行如下命令将CSV源文件生成为SST文件。关于参数的说明,请参见命令参数

  1. ${SPARK_HOME}/bin/spark-submit --master "local" --conf spark.sql.shuffle.partition=<shuffle_concurrency> --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-2.6.0.jar_path> -c <sst_application.conf_path>

Note

生成SST文件时,会涉及到Spark的shuffle操作,请注意在提交命令中增加spark.sql.shuffle.partition的配置。

Note

JAR包有两种获取方式:自行编译或者从maven仓库下载。

示例:

  1. ${SPARK_HOME}/bin/spark-submit --master "local" --conf spark.sql.shuffle.partition=200 --class com.vesoft.nebula.exchange.Exchange /root/nebula-exchange/nebula-exchange/target/nebula-exchange-2.6.0.jar -c /root/nebula-exchange/nebula-exchange/target/classes/sst_application.conf

任务执行完成后,可以在HDFS上的/sst目录(nebula.path.remote参数指定)内查看到生成的SST文件。

Note

如果对Schema有修改操作,例如重建图空间、修改Tag、修改Edge type等,需要重新生成SST文件,因为SST文件会验证Space ID、Tag ID、Edge ID等信息。

步骤 5:导入SST文件

Note

导入前请确认以下信息:

  • 确认所有部署Storage服务的机器上都已部署Hadoop服务,并配置HADOOP_HOME和JAVA_HOME。

  • Meta服务配置文件中的--ws_storage_http_port(如果没有,请手动添加)和Storage服务配置文件中的--ws_http_port一致。例如都为19779

  • Graph服务配置文件中的--ws_meta_http_port(如果没有,请手动添加)和Meta服务配置文件中的--ws_http_port一致。例如都为19559

使用客户端工具连接Nebula Graph数据库,按如下操作导入SST文件:

  1. 执行命令选择之前创建的图空间。

    1. nebula> USE basketballplayer;
  2. 执行命令下载SST文件:

    1. nebula> DOWNLOAD HDFS "hdfs://<hadoop_address>:<hadoop_port>/<sst_file_path>";

    示例:

    1. nebula> DOWNLOAD HDFS "hdfs://*.*.*.*:9000/sst";
  3. 执行命令导入SST文件:

    1. nebula> INGEST;

Note

  • 如果需要重新下载,请在Nebula Graph安装路径内的data/storage/nebula目录内,将对应Space ID目录内的download文件夹删除,然后重新下载SST文件。如果图空间是多副本,保存副本的所有机器都需要删除download文件夹。

  • 如果导入时出现问题需要重新导入,重新执行INGEST;即可。

步骤 6:(可选)验证数据

用户可以在Nebula Graph客户端(例如Nebula Graph Studio)中执行查询语句,确认数据是否已导入。例如:

  1. GO FROM "player100" OVER follow;

用户也可以使用命令SHOW STATS查看统计数据。

步骤 7:(如有)在Nebula Graph中重建索引

导入数据后,用户可以在Nebula Graph中重新创建并重建索引。详情请参见索引介绍


最后更新: November 2, 2021