Writing Data to MatrixOne Using Spark

Overview

Apache Spark is a distributed computing engine designed to process large-scale data efficiently. It distributes tasks such as data splitting, computation, and merging across multiple machines and runs them in parallel, enabling efficient data processing and analysis.

Scenarios

  • Large-Scale Data Processing and Analysis

    Spark can handle massive volumes of data and improves processing efficiency by running computing tasks in parallel. It is widely used for data processing and analysis in sectors such as finance, telecommunications, and healthcare.

  • Stream Data Processing

    Spark Streaming processes real-time data streams by turning them into micro-batches for analysis and storage. This is particularly valuable in real-time analysis scenarios such as online advertising and network security.

  • Machine Learning

    Spark provides a machine learning library (MLlib) supporting various machine learning algorithms and model training for applications such as recommendation systems and image recognition.

  • Graph Computing

    Spark’s graph computing library (GraphX) supports various graph computing algorithms, making it suitable for graph analysis scenarios like social network analysis and recommendation systems.

This document introduces two examples of using the Spark computing engine to write bulk data into MatrixOne. One example covers migrating data from MySQL to MatrixOne, and the other involves writing Hive data into MatrixOne.
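
Both examples share the same write path: MatrixOne is MySQL protocol compatible, so a Spark Dataset can be written to it with the stock MySQL JDBC driver. The sketch below previews that pattern; the host, credentials, table name, and CSV input are illustrative placeholders rather than part of the examples that follow.

    import java.util.Properties;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class WriteSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("mo_write_sketch")
                    .master("local[2]")
                    .getOrCreate();

            // Source: any Dataset<Row> works -- JDBC, Hive, files, and so on.
            // The CSV path here is a hypothetical stand-in.
            Dataset<Row> df = spark.read()
                    .option("header", "true")
                    .csv("/path/to/input.csv");

            // Sink: MatrixOne's MySQL-compatible endpoint (default port 6001).
            Properties props = new Properties();
            props.put("user", "root");
            props.put("password", "111");
            df.write()
                    .mode(SaveMode.Append)
                    .jdbc("jdbc:mysql://192.168.146.10:6001/test", "target_table", props);
        }
    }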

Before you start

Hardware Environment

The hardware requirements for this practice are as follows:

Server Name | Server IP      | Installed Software        | Operating System
----------- | -------------- | ------------------------- | ----------------
node1       | 192.168.146.10 | MatrixOne                 | Debian 11.1 x86
node3       | 192.168.146.11 | IDEA, MySQL, Hadoop, Hive | Windows 10

Software Environment

This practice requires the installation and deployment of the following software environments: MatrixOne, IntelliJ IDEA, JDK 1.8, Maven, MySQL, Hadoop, and Hive. Spark itself (version 3.2.1) is pulled in as a Maven dependency in the examples below.

Example 1: Migrating Data from MySQL to MatrixOne

Step 1: Initialize the Project

  1. Start IDEA, click File > New > Project, select Spring Initializr, and fill in the following configuration parameters:

    • Name: mo-spark-demo
    • Location: ~\Desktop
    • Language: Java
    • Type: Maven
    • Group: com.example
    • Artifact: matrixone-spark-demo
    • Package name: com.matrixone.demo
    • JDK: 1.8

    (Figure 1: Project initialization)

  2. Add project dependencies and edit the content of pom.xml in the project root directory as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example.mo</groupId>
        <artifactId>mo-spark-demo</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <spark.version>3.2.1</spark.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-catalyst_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.16</version>
            </dependency>
        </dependencies>
    </project>

Step 2: Read MatrixOne Data

After connecting to MatrixOne using the MySQL client, create the necessary database and data tables for the demonstration.

  1. Create a database and table, and insert data in MatrixOne:

    CREATE DATABASE test;
    USE test;
    CREATE TABLE `person` (`id` INT DEFAULT NULL, `name` VARCHAR(255) DEFAULT NULL, `birthday` DATE DEFAULT NULL);
    INSERT INTO test.person (id, name, birthday) VALUES(1, 'zhangsan', '2023-07-09'),(2, 'lisi', '2023-07-08'),(3, 'wangwu', '2023-07-12');
  2. In IDEA, create the MoRead.java class to read MatrixOne data using Spark:

    package com.matrixone.spark;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;

    import java.util.Properties;

    /**
     * @author MatrixOne
     * @desc Read data from MatrixOne
     */
    public class MoRead {

        // Parameters
        private static String master = "local[2]";
        private static String appName = "mo_spark_demo";
        private static String srcHost = "192.168.146.10";
        private static Integer srcPort = 6001;
        private static String srcUserName = "root";
        private static String srcPassword = "111";
        private static String srcDataBase = "test";
        private static String srcTable = "person";

        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName(appName).master(master).getOrCreate();
            SQLContext sqlContext = new SQLContext(sparkSession);
            Properties properties = new Properties();
            properties.put("user", srcUserName);
            properties.put("password", srcPassword);
            // Read the person table over MatrixOne's MySQL-compatible JDBC endpoint
            Dataset<Row> dataset = sqlContext.read()
                    .jdbc("jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase, srcTable, properties);
            dataset.show();
        }
    }
  3. Run MoRead.main() in IDEA; the result is as below:

    (Figure 2: MoRead execution result)
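
    The original figure is not reproduced here; given the three rows inserted in step 1, dataset.show() should print output similar to the following:

      +---+--------+----------+
      | id|    name|  birthday|
      +---+--------+----------+
      |  1|zhangsan|2023-07-09|
      |  2|    lisi|2023-07-08|
      |  3|  wangwu|2023-07-12|
      +---+--------+----------+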

Step 3: Write MySQL Data to MatrixOne

Now, you can begin migrating MySQL data to MatrixOne using Spark.

  1. Prepare MySQL data: On node3, use the MySQL client to connect to the local MySQL instance. Create the necessary database, tables, and insert data:

    mysql -h127.0.0.1 -P3306 -uroot -proot
    mysql> CREATE DATABASE test;
    mysql> USE test;
    mysql> CREATE TABLE `person` (`id` int DEFAULT NULL, `name` varchar(255) DEFAULT NULL, `birthday` date DEFAULT NULL);
    mysql> INSERT INTO test.person (id, name, birthday) VALUES(2, 'lisi', '2023-07-09'),(3, 'wangwu', '2023-07-13'),(4, 'zhaoliu', '2023-08-08');
  2. Clear MatrixOne table data:

    On node3, use the MySQL client to connect to the local MatrixOne instance. Since this example continues to use the test database from the previous MatrixOne data reading example, you need to clear the data from the person table first.

    -- On node3, use the MySQL client to connect to the local MatrixOne
    mysql -h192.168.146.10 -P6001 -uroot -p111
    mysql> TRUNCATE TABLE test.person;
  3. Write code in IDEA:

    Create the Person.java and Mysql2Mo.java classes to read MySQL data using Spark. Refer to the following example for the Mysql2Mo.java class code; a sketch of Person.java is given after the ETL note below:

    package com.matrixone.spark;

    // Necessary imports
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;

    import java.sql.SQLException;
    import java.util.Properties;

    public class Mysql2Mo {

        // Define parameters
        private static String master = "local[2]";
        private static String appName = "app_spark_demo";
        private static String srcHost = "127.0.0.1";
        private static Integer srcPort = 3306;
        private static String srcUserName = "root";
        private static String srcPassword = "root";
        private static String srcDataBase = "test";
        private static String srcTable = "person";
        private static String destHost = "192.168.146.10";
        private static Integer destPort = 6001;
        private static String destUserName = "root";
        private static String destPassword = "111";
        private static String destDataBase = "test";
        private static String destTable = "person";

        public static void main(String[] args) throws SQLException {
            SparkSession sparkSession = SparkSession.builder().appName(appName).master(master).getOrCreate();
            SQLContext sqlContext = new SQLContext(sparkSession);
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", srcUserName);
            connectionProperties.put("password", srcPassword);
            connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");

            // Define the JDBC URL
            String url = "jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase + "?characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai";

            // Read the table contents using Spark JDBC
            System.out.println("Reading data from the 'person' table in the database");
            Dataset<Row> rowDataset = sqlContext.read().jdbc(url, srcTable, connectionProperties).select("*");

            // Apply transformations (keep records where id > 2 and add the 'spark_' prefix to the 'name' field)
            Dataset<Row> dataset = rowDataset.filter("id > 2")
                    .map((MapFunction<Row, Row>) row -> RowFactory.create(row.getInt(0), "spark_" + row.getString(1), row.getDate(2)), RowEncoder.apply(rowDataset.schema()));

            // Specify connection properties for writing the data
            Properties properties = new Properties();
            properties.put("user", destUserName);
            properties.put("password", destPassword);
            dataset.write()
                    .mode(SaveMode.Append)
                    .jdbc("jdbc:mysql://" + destHost + ":" + destPort + "/" + destDataBase, destTable, properties);
        }
    }

In the above code, a simple ETL operation is performed (keeping only rows where id > 2 and adding the prefix "spark_" to the name field), and the processed data is written to the MatrixOne database. The RowEncoder.apply(rowDataset.schema()) argument tells Spark that the transformed rows keep the source table's schema.
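
The source for Person.java is not included in this document. The Mysql2Mo example above operates on untyped Row objects, so Person is only needed if you prefer a typed Dataset<Person>; a minimal sketch, assuming fields that mirror the person table, could look like this:

    package com.matrixone.spark;

    import java.io.Serializable;
    import java.sql.Date;

    // Plain POJO mirroring the person table; Serializable so Spark can ship
    // instances to executors when the class appears in closures.
    public class Person implements Serializable {
        private Integer id;
        private String name;
        private Date birthday;

        public Person() {}

        public Person(Integer id, String name, Date birthday) {
            this.id = id;
            this.name = name;
            this.birthday = birthday;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Date getBirthday() { return birthday; }
        public void setBirthday(Date birthday) { this.birthday = birthday; }
    }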

Step 4: View the Execution Results

Execute the following SQL in MatrixOne to view the execution results:

    mysql> select * from test.person;
    +------+---------------+------------+
    | id   | name          | birthday   |
    +------+---------------+------------+
    |    3 | spark_wangwu  | 2023-07-12 |
    |    4 | spark_zhaoliu | 2023-08-07 |
    +------+---------------+------------+
    2 rows in set (0.01 sec)

Example 2: Importing Hive Data into MatrixOne

Step 1: Initialize the Project

  1. Launch IDEA and click File > New > Project. Select Spring Initializr and fill in the following configuration parameters:

    • Name: mo-spark-demo
    • Location: ~\Desktop
    • Language: Java
    • Type: Maven
    • Group: com.example
    • Artifact: matrixone-spark-demo
    • Package name: com.matrixone.demo
    • JDK: 1.8

    (Figure: Project initialization)

  2. Add project dependencies. Edit the contents of the pom.xml file in the project’s root directory as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example.mo</groupId>
        <artifactId>mo-spark-demo</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <spark.version>3.2.1</spark.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-catalyst_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.16</version>
            </dependency>
        </dependencies>
    </project>

Step 2: Prepare Hive Data

Execute the following commands in a terminal window to create a Hive database, data table, and insert data:

    hive
    hive> create database motest;
    hive> use motest;
    hive> CREATE TABLE `users`(
              `id` int,
              `name` varchar(255),
              `age` int);
    hive> INSERT INTO motest.users (id, name, age) VALUES(1, 'zhangsan', 12),(2, 'lisi', 17),(3, 'wangwu', 19);

Step 3: Create MatrixOne Data Table

Connect to the local MatrixOne using the MySQL client on node3. Continue using the previously created test database and create a new table named users:

    CREATE TABLE `users` (
        `id` INT DEFAULT NULL,
        `name` VARCHAR(255) DEFAULT NULL,
        `age` INT DEFAULT NULL
    );

Step 4: Copy Configuration Files

Copy the following three configuration files into your project's resources directory (src/main/resources): core-site.xml and hdfs-site.xml from the Hadoop directory etc/hadoop/, and hive-site.xml from the Hive directory conf/.

(Figure: Configuration files)
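
If copying hive-site.xml is inconvenient, the metastore location can alternatively be set directly on the SparkSession builder. This is a sketch, not part of the original walkthrough; the thrift URI is an assumption and should be replaced with the hive.metastore.uris value from your own hive-site.xml.

    import org.apache.spark.sql.SparkSession;

    public class HiveMetastoreConfigSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("app_spark_demo")
                    .master("local[2]")
                    // Hypothetical metastore URI; use the value from your hive-site.xml
                    .config("hive.metastore.uris", "thrift://192.168.146.11:9083")
                    .enableHiveSupport()
                    .getOrCreate();

            // Quick sanity check that the Hive catalog is visible
            spark.sql("show databases").show();
        }
    }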

Step 5: Write Code

In IntelliJ IDEA, create a class named “Hive2Mo.java” to read data from Hive using Spark and write it to MatrixOne.

    package com.matrixone.spark;

    import org.apache.spark.sql.*;

    import java.sql.SQLException;
    import java.util.Properties;

    public class Hive2Mo {

        // Parameters
        private static String master = "local[2]";
        private static String appName = "app_spark_demo";
        private static String destHost = "192.168.146.10";
        private static Integer destPort = 6001;
        private static String destUserName = "root";
        private static String destPassword = "111";
        private static String destDataBase = "test";
        private static String destTable = "users";

        public static void main(String[] args) throws SQLException {
            SparkSession sparkSession = SparkSession.builder()
                    .appName(appName)
                    .master(master)
                    .enableHiveSupport()
                    .getOrCreate();

            // Read from the Hive table via the metastore configured in hive-site.xml
            System.out.println("Reading data from the Hive table");
            Dataset<Row> rowDataset = sparkSession.sql("select * from motest.users");

            // Write the rows to MatrixOne over its MySQL-compatible JDBC endpoint
            Properties properties = new Properties();
            properties.put("user", destUserName);
            properties.put("password", destPassword);
            rowDataset.write()
                    .mode(SaveMode.Append)
                    .jdbc("jdbc:mysql://" + destHost + ":" + destPort + "/" + destDataBase, destTable, properties);
        }
    }

Step 6: View the Execution Result

Execute the following SQL in MatrixOne to view the execution result.

    mysql> select * from test.users;
    +------+----------+------+
    | id   | name     | age  |
    +------+----------+------+
    |    1 | zhangsan |   12 |
    |    2 | lisi     |   17 |
    |    3 | wangwu   |   19 |
    +------+----------+------+
    3 rows in set (0.00 sec)