MySQL to Iceberg Example

Here we use an example to introduce how to use Apache InLong creating MySQL -> Iceberg full database migration.

Deployment

Install InLong

Before we begin, we need to install InLong. Here we provide two ways:

Add Connectors

Download the connectors corresponding to Flink 1.13, and after decompression, place sort-connector-iceberg-[version]-SNAPSHOT.jar in /inlong-sort/connectors/ directory.

Install Iceberg

Please refer to the Installation Tutorial on the Apache Iceberg official website.

Cluster Initialize

When all containers are successfully started, you can access the InLong dashboard address http://localhost, and use the following default account to log in.

  1. User: admin
  2. Password: inlong

Create Cluster Tag

Click [Clusters] -> [ClusterTags] -> [Create] on the page to specify the cluster label name and responsible person. Create Cluster Tag

MySQL to Iceberg Example - 图2caution

default_cluster is the default ClusterTags reported by each component. If you decide to use a different name, make sure to update the corresponding tag configuration accordingly.

Register Pulsar Cluster

Click [Clusters] -> [Cluster] -> [Create] on the page to register Pulsar Cluster. Create Pulsar Cluster

MySQL to Iceberg Example - 图4note

The ClusterTags selects the newly created default_cluster, the Pulsar cluster deployed by docker:

Service URL is pulsar://pulsar:6650, Admin URL is http://pulsar:8080.

Register Iceberg DataNodes

Click [DataNodes] -> [Create] on the page to register Iceberg DataNodes. Create Iceberg DataNode

Create Task

Create Data Streams Group

Click [Synchronization] → [Create] on the page and input the Group ID, Stream ID and Full database migration: Create Group Stream

Create Data Source

In the data source, click [New] → [MySQL] to configure the source name, address, databases and tables information. Create Stream_Source

MySQL to Iceberg Example - 图8note

  • If the read mode is Full amount + Incremental, the existing data in the source table will also be collected, but the Incremental mode will not.
  • The table white list format is <dbName>.<tableName> and supports regular expressions.

Create Data Sink

In the data sink, click [New] → [Iceberg] to configure the sink name and created Iceberg data node. We can choose the data sink to have the same database table name as the data source, or customize it. Create data object

MySQL to Iceberg Example - 图10note

When customizing the names of database tables, you can use built-in parameters and string combinations to generate the target table names.

Built-in parameters include:

  • Source database name: ${database}
  • Source table name: ${table}

For example, if the source table name is table1 and the mapping rule is ${table}_inlong, the data from table1 will be ultimately mapped and written into table1_inlong.

Approve Data Stream

Click [Approval] -> [MyApproval] -> [Approval] -> [Ok]. Approve

Back to [Synchronization] page, wait for [success]. Success

Test Data

Send Data

  1. #!/bin/bash
  2. # MySQL info
  3. DB_HOST="mysql"
  4. DB_USER="root"
  5. DB_PASS="inlong"
  6. DB_NAME="test"
  7. DB_TABLE1="source_table"
  8. DB_TABLE2="source_table2"
  9. # Insert data in a loop
  10. for ((i=1; i<=500; i++))
  11. do
  12. # Generate data
  13. id=$i
  14. name="name_$i"
  15. # Build an insert SQL
  16. query1="INSERT INTO $DB_TABLE1 (id, name) VALUES ($id, '$name');"
  17. query2="INSERT INTO $DB_TABLE2 (id, name) VALUES ($id, '$name');"
  18. # Execute insert SQL
  19. mysql -h $DB_HOST -u $DB_USER -p$DB_PASS $DB_NAME -e "$query1"
  20. mysql -h $DB_HOST -u $DB_USER -p$DB_PASS $DB_NAME -e "$query2"
  21. done

Modify the variables in the script according to the actual environment, and insert 500 pieces of data into each table(There is a piece of existing data in source_table): Result Source

Verify Data

Enter Iceberg, check data in table.

Result Sink

You can also view audit data on the page:

Result Sink