Spark Writer

概述

Spark Writer 是 Nebula Graph 基于 Spark 的分布式数据导入工具,能够将多种数据仓库中的数据转化为图的点和边,并批量导入到图数据库中。目前支持的数据仓库有:

  • HDFS,包括 Parquet、JSON、ORC 和 CSV 格式的文件
  • HIVE

Spark Writer 支持并发导入多个 tag、edge,支持不同 tag/edge 配置不同的数据仓库。

软件要求

注意: 为确保 Nebula Graph Spark Writer 正常使用,请确保你的机器已安装:

  • Spark 2.0 及以上版本
  • Hive 2.3 及以上版本
  • Hadoop 2.0 及以上版本

获取 Spark Writer

编译源码

  1. git clone https://github.com/vesoft-inc/nebula.git
  2. cd nebula/src/tools/spark-sstfile-generator
  3. mvn compile package

或者直接下载

从云存储 OSS 下载

  1. wget https://oss-cdn.nebula-graph.com.cn/jar-packages/sst.generator-1.2.1.jar

使用流程

基本流程分为以下几步:

  1. 在 Nebula Graph 中创建图模型,构图
  2. 编写数据文件
  3. 编写输入源映射文件
  4. 导入数据

构图

构图请参考快速试用中的示例构图。

注意:请先在 Nebula Graph 中完成构图(创建图空间和定义图数据 Schema),再通过本工具向 Nebula Graph 中写入数据。

数据示例

顶点数据文件由一行一行的数据组成,文件中每一行表示一个点和它的属性。一般来说,第一列为点的 ID ——此列的名称将在后文的映射文件中指定,其他列为点的属性。

  • player 顶点数据
  1. {"id":100,"name":"Tim Duncan","age":42}
  2. {"id":101,"name":"Tony Parker","age":36}
  3. {"id":102,"name":"LaMarcus Aldridge","age":33}

边数据文件由一行一行的数据组成,文件中每一行表示一条边和它的属性。一般来说,第一列为起点 ID,第二列为终点 ID,起点 ID 列及终点 ID 列会在映射文件中指定。其他列为边属性。下面以 JSON 格式为例进行说明。

以边 边 follow 的数据为例:

  • 无 rank 的边
  1. {"source":100,"target":101,"likeness":95}
  2. {"source":101,"target":100,"likeness":95}
  3. {"source":101,"target":102,"likeness":90}
  • 有 rank 的边
  1. {"source":100,"target":101,"likeness":95,"ranking":2}
  2. {"source":101,"target":100,"likeness":95,"ranking":1}
  3. {"source":101,"target":102,"likeness":90,"ranking":3}

含有地理位置 Geo 的数据

Spark Writer 支持 Geo 数据导入,Geo 数据用 latitudelongitude 字段描述经纬度,数据类型为 double。

  1. {"latitude":30.2822095,"longitude":120.0298785,"target":0,"dp_poi_name":"0"}
  2. {"latitude":30.2813834,"longitude":120.0208692,"target":1,"dp_poi_name":"1"}
  3. {"latitude":30.2807347,"longitude":120.0181162,"target":2,"dp_poi_name":"2"}
  4. {"latitude":30.2812694,"longitude":120.0164896,"target":3,"dp_poi_name":"3"}

数据源文件

目前 Spark Writer 支持的数据源有:

  • HDFS
  • HIVE
HDFS 文件

支持的文件格式包括:

  • Parquet
  • JSON
  • CSV
  • ORC

Player 的 Parquet 示例如下:

  1. +---+---+------------+
  2. |age| id| name|
  3. +---+---+------------+
  4. | 42|100| Tim Duncan |
  5. | 36|101| Tony Parker|
  6. +---+---+------------+

JSON 示例如下:

  1. {"id":100,"name":"Tim Duncan","age":42}
  2. {"id":101,"name":"Tony Parker","age":36}

CSV 示例如下:

  1. age,id,name
  2. 42,100,Tim Duncan
  3. 36,101,Tony Parker
数据库

Spark Writer 支持以数据库作为数据源,目前支持 HIVE。

Player 表结构如下:

col_namedata_typecomment
idint
namestring
ageint

编写配置文件

配置文件由 Spark 相关信息,Nebula 相关信息,以及 tags 映射 和 edges 映射块组成。Spark 信息配置了 Spark 运行的相关参数,Nebula 相关信息配置了连接 Nebula Graph 的用户名和密码等信息。 tags 映射和 edges 映射分别对应多个 tag/edge 的输入源映射,描述每个 tag/edge 的数据源等基本信息,不同 tag/edge 可以来自不同数据源。

输入源的映射文件示例:

  1. {
  2. # Spark 相关信息配置
  3. # 参见: http://spark.apache.org/docs/latest/configuration.html
  4. spark: {
  5. app: {
  6. name: Spark Writer
  7. }
  8. driver: {
  9. cores: 1
  10. maxResultSize: 1G
  11. }
  12. cores {
  13. max: 16
  14. }
  15. }
  16. # Nebula Graph 相关信息配置
  17. nebula: {
  18. # 查询引擎 IP 列表
  19. addresses: ["127.0.0.1:3699"]
  20. # 连接 Nebula Graph 服务的用户名和密码
  21. user: user
  22. pswd: password
  23. # Nebula Graph 图空间名称
  24. space: test
  25. # thrift 超时时长及重试次数
  26. # 如未设置,则默认值分别为 3000 和 3
  27. connection {
  28. timeout: 3000
  29. retry: 3
  30. }
  31. # nGQL 查询重试次数
  32. # 如未设置,则默认值为 3
  33. execution {
  34. retry: 3
  35. }
  36. }
  37. # 处理标签
  38. tags: [
  39. # 从 HDFS 文件加载数据, 此处数据类型为 Parquet
  40. # tag 名称为 tag name 0
  41. # HDFS Parquet 文件的中的 field_0、field_1、field_2 将写入 tag_name_0
  42. # 节点列为 vertex_key_field
  43. {
  44. name: tag_name_0
  45. type: parquet
  46. path: hdfs path
  47. fields: {
  48. field_0: nebula_field_0,
  49. field_1: nebula_field_1,
  50. field_2: nebula_field_2
  51. }
  52. vertex: vertex_key_field
  53. batch : 16
  54. }
  55. # 与上述类似
  56. # 从 Hive 加载将执行命令 $ {exec} 作为数据集
  57. {
  58. name: tag_name_1
  59. type: hive
  60. exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
  61. fields: {
  62. hive_field_0: nebula_field_0,
  63. hive_field_1: nebula_field_1,
  64. hive_field_2: nebula_field_2
  65. }
  66. vertex: vertex_id_field
  67. }
  68. ]
  69. # 处理边
  70. edges: [
  71. # 从 HDFS 加载数据,数据类型为 JSON
  72. # 边名称为 edge_name_0
  73. # HDFS JSON 文件中的 field_0、field_1、field 2 将被写入 edge_name_0
  74. # 起始字段为 source_field,终止字段为 target_field ,边权重字段为 ranking_field。
  75. {
  76. name: edge_name_0
  77. type: json
  78. path: hdfs_path
  79. fields: {
  80. field_0: nebula_field_0,
  81. field_1: nebula_field_1,
  82. field_2: nebula_field_2
  83. }
  84. source: source_field
  85. target: target_field
  86. ranking: ranking_field
  87. }
  88. # 从 Hive 加载将执行命令 $ {exec} 作为数据集
  89. # 边权重为可选
  90. {
  91. name: edge_name_1
  92. type: hive
  93. exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
  94. fields: {
  95. hive_field_0: nebula_field_0,
  96. hive_field_1: nebula_field_1,
  97. hive_field_2: nebula_field_2
  98. }
  99. source: source_id_field
  100. target: target_id_field
  101. }
  102. ]
  103. }

Spark 配置信息

下表给出了一些示例,所有可配置项请见 Spark Available Properties

字段默认值是否必须说明
spark.app.nameSpark Writerapp 名称
spark.driver.cores1驱动程序进程的核数,仅适用于群集模式
spark.driver.maxResultSize1G每个 Spark 操作(例如收集)中所有分区的序列化结果的上限(以字节为单位)。至少应为 1M,否则应为 0(无限制)
spark.cores.max(not set)当以“粗粒度”共享模式在独立部署群集或 Mesos 群集上运行时,跨群集(而非从每台计算机)请求应用程序的最大 CPU 核数。如果未设置,则默认值为 Spark 的独立集群管理器上的 spark.deploy.defaultCores 或 Mesos 上的 infinite(所有可用的内核)

Nebula Graph 配置信息

字段默认值是否必须说明
nebula.addresses查询引擎的地址列表,逗号分隔
nebula.user数据库用户名,默认为 user
nebula.pswd数据库用户名对应密码,默认 user 密码为 password
nebula.space导入数据对应的 space,本例中为 test
nebula.connection.timeout3000Thrift 连接超时时间
nebula.connection.retry3Thrift 连接重试次数
nebula.execution.retry3nGQL 语句执行重试次数

tags 和 edges 映射信息

tag 和 edge 映射的选项比较类似。下面先介绍相同的选项,再分别介绍 tag 映射edge 映射的特有选项。

  • 相同的选项

    • type 指定上文中提到的数据类型,目前支持 “Parquet”、”JSON”、”ORC” 和 “CSV”,大小写不敏感,必填
    • path 适用于 HDFS 数据源,指定HDFS 文件或目录的绝对路径,type 为 HDFS 时,必填
    • exec 适用于 Hive 数据源, 当执行查询语句 type 为 HIVE 时,必填
    • fields 将输入源列的列名映射为 tag / edge 的属性名,必填
  • tag 映射的特有选项

    • vertex 指定某一列作为点的 ID 列,必填
  • edge 映射的特有选项

    • source 指定输入源某一列作为源点的 ID 列,必填
    • target 指定某一列作为目标点的 ID 列,必填
    • 当插入边有 ranking 值, ranking 指定某一列作为边 ranking 列,选填

数据源映射

  • HDFS Parquet 文件
    • type 指定输入源类型,当为 parquet 时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HDFS JSON 文件
    • type 指定输入源类型,当为 JSON 时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HIVE ORC 文件
    • type 指定输入源类型,当为 ORC时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HIVE CSV 文件
    • type 指定输入源类型,当为 CSV 时大小写不敏感,必填
    • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
  • HIVE
    • type 指定输入源类型,当为 HIVE 时大小写不敏感,必填
    • exec 指定 HIVE 执行查询的语句,必填

执行命令导入数据

导入数据命令:

  1. bin/spark-submit \
  2. --class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
  3. --master ${MASTER-URL} \
  4. ${SPARK_WRITER_JAR_PACKAGE} -c conf/test.conf -h -d

参数说明:

AbbreviationRequiredDefaultDescription示例
—classyes指定程序主类
—masteryes指定spark cluster master url,请参见 master-urlse.g. spark://23.195.26.187:7077
-c / —config yes上文所编写的配置文件路径
-h / —hivenofalse用于指定是否支持 Hive
-d / —directlynofalsetrue 为客户端方式插入;
false 为 sst 方式导入 (TODO)
-D / —drynofalse检查配置文件是否正确

性能测试结果

三台物理机 (56 核,250G 内存,万兆网,SSD),写 1 亿条数据(每条数据三个字段,每个 batch 64 条记录),用时 4 分钟(40万条/秒)。