Sqoop Engine

This article describes how to install, use, and configure the Sqoop engine plugin in Linkis.

The Sqoop engine depends on a basic Hadoop environment. Any node that hosts the Sqoop engine needs the Hadoop client environment deployed and the Sqoop client installed ([download](https://archive.apache.org/dist/sqoop/)).

Before running a Sqoop task through Linkis, run a test task with native Sqoop on the node to confirm that the node environment works.

    # Verify that the Sqoop environment is available.
    # Reference example: export the HDFS file /user/hive/warehouse/hadoop/test_linkis_sqoop into the MySQL table test_sqoop
    sqoop export \
    --connect jdbc:mysql://10.10.10.10/test \
    --username test \
    --password test123 \
    --table test_sqoop \
    --columns user_id,user_code,user_name,email,status \
    --export-dir /user/hive/warehouse/hadoop/test_linkis_sqoop \
    --update-mode allowinsert \
    --verbose


| Environment variable name | Environment variable content | Remarks |
|---|---|---|
| `JAVA_HOME` | JDK installation path | Required |
| `HADOOP_HOME` | Hadoop installation path | Required |
| `HADOOP_CONF_DIR` | Hadoop configuration path | Required |
| `SQOOP_HOME` | Sqoop installation path | Required |
| `SQOOP_CONF_DIR` | Sqoop configuration path | Not required |
| `HCAT_HOME` | HCAT configuration path | Not required |
| `HBASE_HOME` | HBASE configuration path | Not required |
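
The required variables listed above can be checked before the engine is deployed. The following is a minimal sketch, not part of Linkis or Sqoop: `check_sqoop_env` is a hypothetical helper name, and it only tests that each variable is non-empty, not that the path it points to is valid.

```shell
#!/bin/sh
# check_sqoop_env is a hypothetical helper, not shipped with Linkis or Sqoop.
# It prints one line per required variable that is unset or empty and
# returns 1 if any of them is missing, 0 otherwise.
check_sqoop_env() {
    missing=0
    for name in JAVA_HOME HADOOP_HOME HADOOP_CONF_DIR SQOOP_HOME; do
        # Indirect lookup: read the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            echo "missing: $name"
            missing=1
        fi
    done
    return "$missing"
}
```

Calling `check_sqoop_env` in the shell that will start the engine gives a quick indication of whether the four required variables are visible there.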

| Linkis system parameter | Description | Remarks |
|---|---|---|
| `wds.linkis.hadoop.site.xml` | Sets the location of the Hadoop parameter files that Sqoop loads | Generally no separate configuration is required; the default value is `core-site.xml;hdfs-site.xml;yarn-site.xml;mapred-site.xml` |
| `sqoop.fetch.status.interval` | Sets the interval for obtaining the Sqoop execution status | Generally no separate configuration is required; the default value is 5s |
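
The default value of `wds.linkis.hadoop.site.xml` is a semicolon-separated list of file names. As a rough sketch, the list can be split and each file checked for existence. This assumes the files are resolved relative to the Hadoop configuration directory, which the text above does not state; `check_site_files` is a hypothetical helper name.

```shell
#!/bin/sh
# check_site_files is a hypothetical helper, not part of Linkis.
# $1: directory expected to hold the files (assumed here: $HADOOP_CONF_DIR)
# $2: semicolon-separated file list, e.g. "core-site.xml;hdfs-site.xml"
check_site_files() {
    conf_dir=$1
    list=$2
    status=0
    old_ifs=$IFS
    IFS=';'      # split the list on semicolons
    set -f       # disable globbing while the list is unquoted
    for f in $list; do
        if [ ! -f "$conf_dir/$f" ]; then
            echo "not found: $conf_dir/$f"
            status=1
        fi
    done
    set +f
    IFS=$old_ifs
    return "$status"
}
```

For example, `check_site_files "$HADOOP_CONF_DIR" "core-site.xml;hdfs-site.xml;yarn-site.xml;mapred-site.xml"` checks the four default files.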

Method 1: Download the engine plug-in package directly

Linkis Engine Plugin Download

Method 2: Compile the engine plug-in separately (requires a maven environment)

    # compile
    cd ${linkis_code_dir}/linkis-engineconn-plugins/sqoop/
    mvn clean install
    # The compiled engine plug-in package is located in the following directory
    ${linkis_code_dir}/linkis-engineconn-plugins/sqoop/target/out/

EngineConnPlugin engine plugin installation

Upload the engine plug-in package obtained above to the engine directory on the server:

    ${LINKIS_HOME}/lib/linkis-engineconn-plugins

The directory structure after uploading is as follows:

    linkis-engineconn-plugins/
    ├── sqoop
    │   ├── dist
    │   │   └── 1.4.6
    │   │       ├── conf
    │   │       └── lib
    │   └── plugin
    │       └── 1.4.6
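
The layout above can be verified after uploading. The sketch below is only an illustration: `check_plugin_layout` is a hypothetical helper name, and it checks nothing beyond the presence of the three directories shown in the tree.

```shell
#!/bin/sh
# check_plugin_layout is a hypothetical helper, not part of Linkis.
# $1: plugin root, e.g. ${LINKIS_HOME}/lib/linkis-engineconn-plugins
# $2: engine name, e.g. sqoop
# $3: engine version, e.g. 1.4.6
check_plugin_layout() {
    root=$1
    engine=$2
    version=$3
    status=0
    for sub in "dist/$version/conf" "dist/$version/lib" "plugin/$version"; do
        if [ ! -d "$root/$engine/$sub" ]; then
            echo "missing directory: $root/$engine/$sub"
            status=1
        fi
    done
    return "$status"
}
```

For the structure shown here, the call would be `check_plugin_layout "${LINKIS_HOME}/lib/linkis-engineconn-plugins" sqoop 1.4.6`.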

Refresh the engine by restarting the linkis-cg-linkismanager service

    cd ${LINKIS_HOME}/sbin
    sh linkis-daemon.sh restart cg-linkismanager

To confirm the refresh, check whether the `last_update_time` of the `linkis_cg_engine_conn_plugin_bml_resources` table in the database matches the time at which the refresh was triggered.

    # Log in to the `linkis` database
    select * from linkis_cg_engine_conn_plugin_bml_resources;

Export an HDFS file to a MySQL table with `linkis-cli-sqoop`:

    sh linkis-cli-sqoop export \
    -D mapreduce.job.queuename=ide \
    --connect jdbc:mysql://10.10.10.10:9600/testdb \
    --username password@123 \
    --password password@123 \
    --table test_sqoop_01_copy \
    --columns user_id,user_code,user_name,email,status \
    --export-dir /user/hive/warehouse/hadoop/test_linkis_sqoop_2 \
    --update-mode allowinsert --verbose

Import a MySQL table into the Hive table `linkis_test_ind.test_import_sqoop_1`. When the table `test_import_sqoop_1` does not exist yet, add the parameter `--create-hive-table`:

    sh linkis-cli-sqoop import -D mapreduce.job.queuename=dws \
    --connect jdbc:mysql://10.10.10.10:3306/casion_test \
    --username hadoop \
    --password password@123 \
    --table test_sqoop_01 \
    --columns user_id,user_code,user_name,email,status \
    --fields-terminated-by ',' \
    --hive-import --create-hive-table \
    --hive-database linkis_test_ind \
    --hive-table test_import_sqoop_1 \
    --hive-drop-import-delims \
    --delete-target-dir \
    --input-null-non-string '\\N' \
    --input-null-string '\\N' \
    --verbose

Import a MySQL table into the Hive table `linkis_test_ind.test_import_sqoop_1`. When the table `test_import_sqoop_1` already exists, remove the parameter `--create-hive-table`:

    sh linkis-cli-sqoop import -D mapreduce.job.queuename=dws \
    --connect jdbc:mysql://10.10.10.10:9600/testdb \
    --username testdb \
    --password password@123 \
    --table test_sqoop_01 \
    --columns user_id,user_code,user_name,email,status \
    --fields-terminated-by ',' \
    --hive-import \
    --hive-database linkis_test_ind \
    --hive-table test_import_sqoop_1 \
    --hive-overwrite \
    --hive-drop-import-delims \
    --delete-target-dir \
    --input-null-non-string '\\N' \
    --input-null-string '\\N' \
    --verbose
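
The two import examples above differ only in the Hive-related flags: `--create-hive-table` when the target table is missing, and `--hive-overwrite` without `--create-hive-table` when it exists. A small sketch of that choice follows; `hive_table_flags` is a hypothetical helper name, and how the caller finds out whether the table exists is left open.

```shell
#!/bin/sh
# hive_table_flags is a hypothetical helper, not part of Linkis or Sqoop.
# $1: "missing" if the Hive table does not exist yet, "exists" otherwise.
# Prints the Hive flags used by the matching example above.
hive_table_flags() {
    case "$1" in
        missing) echo "--hive-import --create-hive-table" ;;
        exists)  echo "--hive-import --hive-overwrite" ;;
        *)
            echo "usage: hive_table_flags missing|exists" >&2
            return 2
            ;;
    esac
}
```

The output is meant to be spliced into the command unquoted, for example `sh linkis-cli-sqoop import ... $(hive_table_flags exists) ...`, so that the shell splits it into two separate arguments.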

OnceEngineConn is used by calling the createEngineConn interface of LinkisManager through LinkisManagerClient and sending the code to the newly created Sqoop engine, which then starts executing it. Other systems, such as Exchangis, can invoke the engine this way. Using the client is simple: create a new Maven project, or add the following dependency to your existing project:

    <dependency>
        <groupId>org.apache.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>${linkis.version}</version>
    </dependency>

Test case:

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.Utils
    import org.apache.linkis.computation.client.LinkisJobBuilder
    import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
    import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
    import org.apache.linkis.computation.client.utils.LabelKeyUtils

    import scala.collection.JavaConverters._

    object SqoopOnceJobTest extends App {
      // Linkis server URL and local log4j configuration
      LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
      val logPath = "C:\\Users\\resources\\log4j.properties"
      System.setProperty("log4j.configurationFile", logPath)

      // Startup parameters for the engine
      val startUpMap = new util.HashMap[String, Any]
      startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")

      val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
        .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
        .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
        .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
        .setStartupParams(startUpMap)
        .setMaxSubmitTime(30000)
        .addExecuteUser("freeuser")

      // Build and submit the job
      val onceJob = importJob(builder)
      val time = System.currentTimeMillis()
      onceJob.submit()
      println(onceJob.getId)

      // Operators for reading logs, progress and metrics from the engine
      val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
      println(onceJob.getECMServiceInstance)
      logOperator.setFromLine(0)
      logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
      logOperator.setEngineConnType("sqoop")
      logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
      val progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
      var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]

      // Poll until the job has completed and no more log lines are returned
      var end = false
      var rowBefore = 1
      while (!end || rowBefore > 0) {
        if (onceJob.isCompleted) {
          end = true
          metricOperator = null
        }
        logOperator.setPageSize(100)
        Utils.tryQuietly {
          val logs = logOperator.apply()
          logs.logs.asScala.foreach { log =>
            println(log)
          }
          rowBefore = logs.logs.size
        }
        Thread.sleep(3000)
        Option(metricOperator).foreach { operator =>
          if (!onceJob.isCompleted) {
            println(s"Metric monitoring: ${operator.apply()}")
            println(s"Progress: ${progressOperator.apply()}")
          }
        }
      }
      onceJob.waitForCompleted()
      println(onceJob.getStatus)
      println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
      System.exit(0)

      def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
        jobBuilder
          .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
          .addJobContent("sqoop.mode", "import")
          .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
          .addJobContent("sqoop.args.username", "free")
          .addJobContent("sqoop.args.password", "testpwd")
          // Note the trailing space after "from" so the two string parts do not run together
          .addJobContent("sqoop.args.query", "select id as order_number, sno as time from " +
            "exchangis where sno =1 and $CONDITIONS")
          .addJobContent("sqoop.args.hcatalog.database", "freedb")
          .addJobContent("sqoop.args.hcatalog.table", "zy_test")
          .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
          .addJobContent("sqoop.args.hcatalog.partition.values", "3")
          .addJobContent("sqoop.args.num.mappers", "1")
          .build()
      }

      // NOTE: despite its name, this example still sets sqoop.mode to "import" and uses a query.
      // A real export would set sqoop.mode to "export" together with export arguments
      // such as sqoop.args.table and sqoop.args.export.dir.
      def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
        jobBuilder
          .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
          .addJobContent("sqoop.mode", "import")
          .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
          .addJobContent("sqoop.args.query", "select id as order, sno as great_time from " +
            "exchangis_table where sno =1 and $CONDITIONS")
          .addJobContent("sqoop.args.hcatalog.database", "hadoop")
          .addJobContent("sqoop.args.hcatalog.table", "partition_33")
          .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
          .addJobContent("sqoop.args.hcatalog.partition.values", "4")
          .addJobContent("sqoop.args.num.mappers", "1")
          .build()
      }
    }

Common arguments:

| parameter | key | description |
|---|---|---|
| | `sqoop.mode` | import/export/… |
| `-Dmapreduce.job.queuename` | `sqoop.env.mapreduce.job.queuename` | |
| `--connect <jdbc-uri>` | `sqoop.args.connect` | Specify JDBC connect string |
| `--connection-manager <class-name>` | `sqoop.args.connection.manager` | Specify connection manager class name |
| `--connection-param-file <properties-file>` | `sqoop.args.connection.param.file` | Specify connection parameters file |
| `--driver <class-name>` | `sqoop.args.driver` | Manually specify JDBC driver class to use |
| `--hadoop-home <hdir>` | `sqoop.args.hadoop.home` | Override $HADOOP_MAPRED_HOME_ARG |
| `--hadoop-mapred-home <dir>` | `sqoop.args.hadoop.mapred.home` | Override $HADOOP_MAPRED_HOME_ARG |
| `--help` | `sqoop.args.help` | Print usage instructions |
| `-P` | | Read password from console |
| `--password <password>` | `sqoop.args.password` | Set authentication password |
| `--password-alias <password-alias>` | `sqoop.args.password.alias` | Credential provider password alias |
| `--password-file <password-file>` | `sqoop.args.password.file` | Set authentication password file path |
| `--relaxed-isolation` | `sqoop.args.relaxed.isolation` | Use read-uncommitted isolation for imports |
| `--skip-dist-cache` | `sqoop.args.skip.dist.cache` | Skip copying jars to distributed cache |
| `--username <username>` | `sqoop.args.username` | Set authentication username |
| `--verbose` | `sqoop.args.verbose` | Print more information while working |

Export control arguments:

| parameter | key | description |
|---|---|---|
| `--batch` | `sqoop.args.batch` | Indicates underlying statements to be executed in batch mode |
| `--call <arg>` | `sqoop.args.call` | Populate the table using this stored procedure (one call per row) |
| `--clear-staging-table` | `sqoop.args.clear.staging.table` | Indicates that any data in staging table can be deleted |
| `--columns <col,col,col…>` | `sqoop.args.columns` | Columns to export to table |
| `--direct` | `sqoop.args.direct` | Use direct export fast path |
| `--export-dir <dir>` | `sqoop.args.export.dir` | HDFS source path for the export |
| `-m,--num-mappers <n>` | `sqoop.args.num.mappers` | Use 'n' map tasks to export in parallel |
| `--mapreduce-job-name <name>` | `sqoop.args.mapreduce.job.name` | Set name for generated mapreduce job |
| `--staging-table <table-name>` | `sqoop.args.staging.table` | Intermediate staging table |
| `--table <table-name>` | `sqoop.args.table` | Table to populate |
| `--update-key <key>` | `sqoop.args.update.key` | Update records by specified key column |
| `--update-mode <mode>` | `sqoop.args.update.mode` | Specifies how updates are performed when new rows are found with non-matching keys in database |
| `--validate` | `sqoop.args.validate` | Validate the copy using the configured validator |
| `--validation-failurehandler <validation-failurehandler>` | `sqoop.args.validation.failurehandler` | Fully qualified class name for ValidationFailureHandler |
| `--validation-threshold <validation-threshold>` | `sqoop.args.validation.threshold` | Fully qualified class name for ValidationThreshold |
| `--validator <validator>` | `sqoop.args.validator` | Fully qualified class name for the Validator |

Import control arguments:

| parameter | key | description |
|---|---|---|
| `--append` | `sqoop.args.append` | Imports data in append mode |
| `--as-avrodatafile` | `sqoop.args.as.avrodatafile` | Imports data to Avro data files |
| `--as-parquetfile` | `sqoop.args.as.parquetfile` | Imports data to Parquet files |
| `--as-sequencefile` | `sqoop.args.as.sequencefile` | Imports data to SequenceFiles |
| `--as-textfile` | `sqoop.args.as.textfile` | Imports data as plain text (default) |
| `--autoreset-to-one-mapper` | `sqoop.args.autoreset.to.one.mapper` | Reset the number of mappers to one mapper if no split key available |
| `--boundary-query <statement>` | `sqoop.args.boundary.query` | Set boundary query for retrieving max and min value of the primary key |
| `--case-insensitive` | `sqoop.args.case.insensitive` | The database is case insensitive; the split WHERE condition is transformed to lower case |
| `--columns <col,col,col…>` | `sqoop.args.columns` | Columns to import from table |
| `--compression-codec <codec>` | `sqoop.args.compression.codec` | Compression codec to use for import |
| `--delete-target-dir` | `sqoop.args.delete.target.dir` | Delete the import target directory if it exists |
| `--direct` | `sqoop.args.direct` | Use direct import fast path |
| `--direct-split-size <n>` | `sqoop.args.direct.split.size` | Split the input stream every 'n' bytes when importing in direct mode |
| `-e,--query <statement>` | `sqoop.args.query` | Import results of SQL 'statement' |
| `--fetch-size <n>` | `sqoop.args.fetch.size` | Set number 'n' of rows to fetch from the database when more rows are needed |
| `--inline-lob-limit <n>` | `sqoop.args.inline.lob.limit` | Set the maximum size for an inline LOB |
| `-m,--num-mappers <n>` | `sqoop.args.num.mappers` | Use 'n' map tasks to import in parallel |
| `--mapreduce-job-name <name>` | `sqoop.args.mapreduce.job.name` | Set name for generated mapreduce job |
| `--merge-key <column>` | `sqoop.args.merge.key` | Key column to use to join results |
| `--split-by <column-name>` | `sqoop.args.split.by` | Column of the table used to split work units |
| `--table <table-name>` | `sqoop.args.table` | Table to read |
| `--target-dir <dir>` | `sqoop.args.target.dir` | HDFS plain table destination |
| `--validate` | `sqoop.args.validate` | Validate the copy using the configured validator |
| `--validation-failurehandler <validation-failurehandler>` | `sqoop.args.validation.failurehandler` | Fully qualified class name for ValidationFailureHandler |
| `--validation-threshold <validation-threshold>` | `sqoop.args.validation.threshold` | Fully qualified class name for ValidationThreshold |
| `--validator <validator>` | `sqoop.args.validator` | Fully qualified class name for the Validator |
| `--warehouse-dir <dir>` | `sqoop.args.warehouse.dir` | HDFS parent for table destination |
| `--where <where clause>` | `sqoop.args.where` | WHERE clause to use during import |
| `-z,--compress` | `sqoop.args.compress` | Enable compression |

Incremental import arguments:

| parameter | key | description |
|---|---|---|
| `--check-column <column>` | `sqoop.args.check.column` | Source column to check for incremental change |
| `--incremental <import-type>` | `sqoop.args.incremental` | Define an incremental import of type 'append' or 'lastmodified' |
| `--last-value <value>` | `sqoop.args.last.value` | Last imported value in the incremental check column |

Output line formatting arguments:

| parameter | key | description |
|---|---|---|
| `--enclosed-by <char>` | `sqoop.args.enclosed.by` | Sets a required field enclosing character |
| `--escaped-by <char>` | `sqoop.args.escaped.by` | Sets the escape character |
| `--fields-terminated-by <char>` | `sqoop.args.fields.terminated.by` | Sets the field separator character |
| `--lines-terminated-by <char>` | `sqoop.args.lines.terminated.by` | Sets the end-of-line character |
| `--mysql-delimiters` | `sqoop.args.mysql.delimiters` | Uses MySQL's default delimiter set: fields: `,` lines: `\n` escaped-by: `\` optionally-enclosed-by: `'` |
| `--optionally-enclosed-by <char>` | `sqoop.args.optionally.enclosed.by` | Sets a field enclosing character |

Input parsing arguments:

| parameter | key | description |
|---|---|---|
| `--input-enclosed-by <char>` | `sqoop.args.input.enclosed.by` | Sets a required field enclosure |
| `--input-escaped-by <char>` | `sqoop.args.input.escaped.by` | Sets the input escape character |
| `--input-fields-terminated-by <char>` | `sqoop.args.input.fields.terminated.by` | Sets the input field separator |
| `--input-lines-terminated-by <char>` | `sqoop.args.input.lines.terminated.by` | Sets the input end-of-line char |
| `--input-optionally-enclosed-by <char>` | `sqoop.args.input.optionally.enclosed.by` | Sets a field enclosing character |

Hive arguments:

| parameter | key | description |
|---|---|---|
| `--create-hive-table` | `sqoop.args.create.hive.table` | Fail if the target hive table exists |
| `--hive-database <database-name>` | `sqoop.args.hive.database` | Sets the database name to use when importing to hive |
| `--hive-delims-replacement <arg>` | `sqoop.args.hive.delims.replacement` | Replace Hive record `\0x01` and row delimiters (`\n\r`) from imported string fields with user-defined string |
| `--hive-drop-import-delims` | `sqoop.args.hive.drop.import.delims` | Drop Hive record `\0x01` and row delimiters (`\n\r`) from imported string fields |
| `--hive-home <dir>` | `sqoop.args.hive.home` | Override $HIVE_HOME |
| `--hive-import` | `sqoop.args.hive.import` | Import tables into Hive (uses Hive's default delimiters if none are set) |
| `--hive-overwrite` | `sqoop.args.hive.overwrite` | Overwrite existing data in the Hive table |
| `--hive-partition-key <partition-key>` | `sqoop.args.hive.partition.key` | Sets the partition key to use when importing to hive |
| `--hive-partition-value <partition-value>` | `sqoop.args.hive.partition.value` | Sets the partition value to use when importing to hive |
| `--hive-table <table-name>` | `sqoop.args.hive.table` | Sets the table name to use when importing to hive |
| `--map-column-hive <arg>` | `sqoop.args.map.column.hive` | Override mapping for specific column to hive types |

HBase arguments:

| parameter | key | description |
|---|---|---|
| `--column-family <family>` | `sqoop.args.column.family` | Sets the target column family for the import |
| `--hbase-bulkload` | `sqoop.args.hbase.bulkload` | Enables HBase bulk loading |
| `--hbase-create-table` | `sqoop.args.hbase.create.table` | If specified, create missing HBase tables |
| `--hbase-row-key <col>` | `sqoop.args.hbase.row.key` | Specifies which input column to use as the row key |
| `--hbase-table <table>` | `sqoop.args.hbase.table` | Import to `<table>` in HBase |

HCatalog arguments:

| parameter | key | description |
|---|---|---|
| `--hcatalog-database <arg>` | `sqoop.args.hcatalog.database` | HCatalog database name |
| `--hcatalog-home <hdir>` | `sqoop.args.hcatalog.home` | Override $HCAT_HOME |
| `--hcatalog-partition-keys <partition-key>` | `sqoop.args.hcatalog.partition.keys` | Sets the partition keys to use when importing to hive |
| `--hcatalog-partition-values <partition-value>` | `sqoop.args.hcatalog.partition.values` | Sets the partition values to use when importing to hive |
| `--hcatalog-table <arg>` | `sqoop.args.hcatalog.table` | HCatalog table name |
| `--hive-home <dir>` | `sqoop.args.hive.home` | Override $HIVE_HOME |
| `--hive-partition-key <partition-key>` | `sqoop.args.hive.partition.key` | Sets the partition key to use when importing to hive |
| `--hive-partition-value <partition-value>` | `sqoop.args.hive.partition.value` | Sets the partition value to use when importing to hive |
| `--map-column-hive <arg>` | `sqoop.args.map.column.hive` | Override mapping for specific column to hive types |

HCatalog import specific options:

| parameter | key | description |
|---|---|---|
| `--create-hcatalog-table` | `sqoop.args.create.hcatalog.table` | Create HCatalog before import |
| `--hcatalog-storage-stanza <arg>` | `sqoop.args.hcatalog.storage.stanza` | HCatalog storage stanza for table creation |

Accumulo arguments:

| parameter | key | description |
|---|---|---|
| `--accumulo-batch-size <size>` | `sqoop.args.accumulo.batch.size` | Batch size in bytes |
| `--accumulo-column-family <family>` | `sqoop.args.accumulo.column.family` | Sets the target column family for the import |
| `--accumulo-create-table` | `sqoop.args.accumulo.create.table` | If specified, create missing Accumulo tables |
| `--accumulo-instance <instance>` | `sqoop.args.accumulo.instance` | Accumulo instance name |
| `--accumulo-max-latency <latency>` | `sqoop.args.accumulo.max.latency` | Max write latency in milliseconds |
| `--accumulo-password <password>` | `sqoop.args.accumulo.password` | Accumulo password |
| `--accumulo-row-key <col>` | `sqoop.args.accumulo.row.key` | Specifies which input column to use as the row key |
| `--accumulo-table <table>` | `sqoop.args.accumulo.table` | Import to `<table>` in Accumulo |
| `--accumulo-user <user>` | `sqoop.args.accumulo.user` | Accumulo user name |
| `--accumulo-visibility <vis>` | `sqoop.args.accumulo.visibility` | Visibility token to be applied to all rows imported |
| `--accumulo-zookeepers <zookeepers>` | `sqoop.args.accumulo.zookeepers` | Comma-separated list of zookeepers (host:port) |

Code generation arguments:

| parameter | key | description |
|---|---|---|
| `--bindir <dir>` | `sqoop.args.bindir` | Output directory for compiled objects |
| `--class-name <name>` | `sqoop.args.class.name` | Sets the generated class name. This overrides `--package-name`. When combined with `--jar-file`, sets the input class. |
| `--input-null-non-string <null-str>` | `sqoop.args.input.null.non.string` | Input null non-string representation |
| `--input-null-string <null-str>` | `sqoop.args.input.null.string` | Input null string representation |
| `--jar-file <file>` | `sqoop.args.jar.file` | Disable code generation; use specified jar |
| `--map-column-java <arg>` | `sqoop.args.map.column.java` | Override mapping for specific columns to java types |
| `--null-non-string <null-str>` | `sqoop.args.null.non.string` | Null non-string representation |
| `--null-string <null-str>` | `sqoop.args.null.string` | Null string representation |
| `--outdir <dir>` | `sqoop.args.outdir` | Output directory for generated code |
| `--package-name <name>` | `sqoop.args.package.name` | Put auto-generated classes in this package |

Generic Hadoop command-line arguments must precede any tool-specific arguments. The supported generic options are:

| parameter | key | description |
|---|---|---|
| `-conf <configuration file>` | `sqoop.args.conf` | Specify an application configuration file |
| `-D <property=value>` | `sqoop.args.D` | Use value for given property |
| `-fs <local\|namenode:port>` | `sqoop.args.fs` | Specify a namenode |
| `-jt <local\|resourcemanager:port>` | `sqoop.args.jt` | Specify a ResourceManager |
| `-files <comma separated list of files>` | `sqoop.args.files` | Specify comma separated files to be copied to the map reduce cluster |
| `-libjars <comma separated list of jars>` | `sqoop.args.libjars` | Specify comma separated jar files to include in the classpath |
| `-archives <comma separated list of archives>` | `sqoop.args.archives` | Specify comma separated archives to be unarchived on the compute machines |
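
Most keys in the tables above appear to follow one pattern: drop the leading dashes of the long option, replace the remaining hyphens with dots, and prefix the result with `sqoop.args.`. The sketch below encodes that observed pattern only; it is not taken from the Linkis source, and both function names are hypothetical. Short forms such as `-m`, `-e` and `-z` should be mapped through their long names, and `-P` has no key in the table.

```shell
#!/bin/sh
# Hypothetical helpers that reproduce the naming pattern seen in the tables;
# they are not part of Linkis.

# "--export-dir" -> "sqoop.args.export.dir"
sqoop_flag_to_key() {
    printf '%s\n' "$1" | sed -e 's/^-*//' -e 's/-/./g' -e 's/^/sqoop.args./'
}

# "-D mapreduce.job.queuename=ide" is listed under the sqoop.env. prefix:
# "mapreduce.job.queuename=ide" -> "sqoop.env.mapreduce.job.queuename"
sqoop_define_to_key() {
    printf 'sqoop.env.%s\n' "${1%%=*}"
}
```

For instance, `sqoop_flag_to_key --hive-drop-import-delims` yields the key that would be passed to `addJobContent` in the client example, with the option value as the second argument.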