Linkis SDK Manual

Linkis provides convenient Java and Scala interfaces; you only need to import the linkis-computation-client module to use them. Since 1.0, submitting tasks with labels is supported. The following sections describe how to use the SDK.

Engine version and script type supported by Linkis

| Engine plugin | Default supported version | Script type | Type description |
|---------------|---------------------------|-------------|------------------|
| Spark | 2.4.3 | py | python script |
| | | scala | scala script |
| | | sql | sql script |
| Hive | 2.3.3 | hql | hql script |
| Python | python2 | python | python script |
| Shell | 1 | shell | shell script |
| JDBC | 4 | jdbc | sql script |
| Flink | 1.12.2 | sql | sql script |
| openLooKeng | 1.5.0 | sql | sql script |
| Pipeline | 1 | pipeline | file import and export |
| Presto | 0.234 | psql | sql script |
| Sqoop | 1.4.6 | appconn | file import and export |
| Elasticsearch | 7.6.2 | esjson | json script |
| | | essql | sql script |
| trino | 371 | tsql | sql script |

Linkis common label

| Label key | Label value | Description |
|-----------|-------------|-------------|
| engineType | spark-2.4.3 | the engine type and version |
| userCreator | user + "-AppName" | the running user and your AppName |
| codeType | sql | script type |
| jobRunningTimeout | 10 | if the job has not finished after 10s, it is automatically killed; the unit is seconds |
| jobQueuingTimeout | 10 | if the job has been queued for more than 10s, it is automatically killed; the unit is seconds |
| jobRetryTimeout | 10000 | the waiting time, in milliseconds, before retrying a job that failed because of insufficient resources; a job that fails for lack of queue resources is retried 10 times by default |
| tenant | hduser02 | tenant label |
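For reference, a minimal sketch of how the common labels above could be assembled into a label map (values are illustrative, and the placement of the tenant/timeout labels in this map is an assumption; the full examples below build the engineType/userCreator/codeType entries through `LabelKeyConstant`, whose constants resolve to the same key names):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a label map using the common label keys from the table above.
public class CommonLabelsExample {
    public static Map<String, Object> commonLabels() {
        Map<String, Object> labels = new HashMap<>();
        labels.put("engineType", "spark-2.4.3");   // engine type and version
        labels.put("userCreator", "hadoop-IDE");   // running user + AppName
        labels.put("codeType", "sql");             // script type
        labels.put("tenant", "hduser02");          // tenant label
        labels.put("jobRunningTimeout", "10");     // kill the job if it runs longer than 10 seconds
        return labels;
    }
}
```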
To use the SDK, first add the linkis-computation-client Maven dependency to your project (set `${linkis.version}` to the Linkis version you are using):

```xml
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
</dependency>
```

Create a Java test class LinkisClientTest; the meaning of each call is explained in the comments:

```java
package org.apache.linkis.client.test;

import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.protocol.constants.TaskConstant;
import org.apache.linkis.ujes.client.UJESClient;
import org.apache.linkis.ujes.client.UJESClientImpl;
import org.apache.linkis.ujes.client.request.JobSubmitAction;
import org.apache.linkis.ujes.client.request.JobExecuteAction;
import org.apache.linkis.ujes.client.request.ResultSetAction;
import org.apache.linkis.ujes.client.response.*;

import org.apache.commons.io.IOUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class LinkisClientTest {

    // 1. build config: linkis gateway url
    private static DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
            .addServerUrl("http://127.0.0.1:9001/")   // set linkis-mg-gateway url: http://{ip}:{port}
            .connectionTimeout(30000)                 // connection timeout
            .discoveryEnabled(false)                  // disable discovery
            .discoveryFrequency(1, TimeUnit.MINUTES)  // discovery frequency
            .loadbalancerEnabled(true)                // enable load balancing
            .maxConnectionSize(5)                     // set max connections
            .retryEnabled(false)                      // set retry
            .readTimeout(30000)                       // set read timeout
            .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: Linkis supports static and token authentication
            .setAuthTokenKey("hadoop")                // set submit user
            .setAuthTokenValue("123456")))            // set password or token (e.g. setAuthTokenValue("test"))
            .setDWSVersion("v1")                      // linkis rest version v1
            .build();

    // 2. new Client(Linkis Client) by clientConfig
    private static UJESClient client = new UJESClientImpl(clientConfig);

    public static void main(String[] args) {
        // The user needs to be consistent with the value of AuthTokenKey
        String user = "hadoop";
        String executeCode = "df=spark.sql(\"show tables\")\n" +
                "show(df)"; // code support: sql/hql/py/scala
        try {
            System.out.println("user : " + user + ", code : [" + executeCode + "]");
            // 3. build job and execute
            JobExecuteResult jobExecuteResult = toSubmit(user, executeCode);
            System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
            // 4. get job info
            JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
            int sleepTimeMills = 1000;
            int logFromLen = 0;
            int logSize = 100;
            while (!jobInfoResult.isCompleted()) {
                // 5. get progress and log
                JobProgressResult progress = client.progress(jobExecuteResult);
                System.out.println("progress: " + progress.getProgress());
                JobLogResult logRes = client.log(jobExecuteResult, logFromLen, logSize);
                logFromLen = logRes.fromLine();
                // 0: info 1: warn 2: error 3: all
                System.out.println(logRes.log().get(3));
                Utils.sleepQuietly(sleepTimeMills);
                jobInfoResult = client.getJobInfo(jobExecuteResult);
            }
            JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
            // 6. Get the result set list (if the user submits multiple SQLs at a time,
            //    multiple result sets will be generated)
            String resultSet = jobInfo.getResultSetList(client)[0];
            // 7. get result content
            ResultSetResult resultSetResult = client.resultSet(ResultSetAction.builder()
                    .setPath(resultSet)
                    .setUser(jobExecuteResult.getUser())
                    .build());
            System.out.println("metadata: " + resultSetResult.getMetadata());   // column names and types
            System.out.println("res: " + resultSetResult.getFileContent());     // row data
        } catch (Exception e) {
            e.printStackTrace(); // please use a logger in production
        } finally {
            IOUtils.closeQuietly(client);
        }
    }

    private static JobExecuteResult toSubmit(String user, String code) {
        // 1. build params
        // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
        Map<String, Object> labels = new HashMap<String, Object>();
        labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3");           // required engineType label
        labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName"); // required execute user and creator, e.g. hadoop-IDE
        labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");                      // required codeType
        // set startup map: engineConn start params
        Map<String, Object> startupMap = new HashMap<String, Object>(16);
        // Supports setting engine-native parameters, e.g. parameters of engines such as spark/hive
        startupMap.put("spark.executor.instances", 2);
        // setting linkis params
        startupMap.put("wds.linkis.rm.yarnqueue", "dws");
        // 2. build jobSubmitAction
        JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
                .addExecuteCode(code)
                .setStartupParams(startupMap)
                .setUser(user)          // submit user
                .addExecuteUser(user)   // execute user
                .setLabels(labels)
                .build();
        // 3. to execute
        return client.submit(jobSubmitAction);
    }
}
```
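The Java sample above retrieves the result set without checking the final job status. If you want to fail fast on errors, a minimal sketch of such a check, reusing the `jobInfo` variable from step 6 and the same `JobInfoResult` methods (`isSucceed`, `getMessage`) that the Scala sample below relies on:

```java
// Optional: fail fast if the job did not end successfully
// (isSucceed()/getMessage() are the same JobInfoResult methods the Scala sample uses)
if (!jobInfo.isSucceed()) {
    throw new RuntimeException("Job did not succeed: " + jobInfo.getMessage());
}
```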

Running the above code completes task submission and execution, and retrieves logs and result sets. The equivalent Scala test code follows:

```scala
package org.apache.linkis.client.test

import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.ujes.client.request._
import org.apache.linkis.ujes.client.response._

import java.util
import java.util.concurrent.TimeUnit

import org.apache.linkis.ujes.client.UJESClient

object LinkisClientTest {

  // 1. build config: linkis gateway url
  val clientConfig = DWSClientConfigBuilder.newBuilder()
    .addServerUrl("http://127.0.0.1:9001/")   // set linkis-mg-gateway url: http://{ip}:{port}
    .connectionTimeout(30000)                 // connection timeout
    .discoveryEnabled(false)                  // disable discovery
    .discoveryFrequency(1, TimeUnit.MINUTES)  // discovery frequency
    .loadbalancerEnabled(true)                // enable load balancing
    .maxConnectionSize(5)                     // set max connections
    .retryEnabled(false)                      // set retry
    .readTimeout(30000)                       // set read timeout
    .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: Linkis supports static and token authentication
    .setAuthTokenKey("hadoop")                // set submit user
    .setAuthTokenValue("hadoop")              // set password or token (e.g. setAuthTokenValue("BML-AUTH"))
    .setDWSVersion("v1")                      // linkis rest version v1
    .build()

  // 2. new Client(Linkis Client) by clientConfig
  val client = UJESClient(clientConfig)

  def main(args: Array[String]): Unit = {
    val user = "hadoop" // execute user; needs to be consistent with the value of AuthTokenKey
    val executeCode = "df=spark.sql(\"show tables\")\n" +
      "show(df)" // code support: sql/hql/py/scala
    try {
      // 3. build job and execute
      println("user : " + user + ", code : [" + executeCode + "]")
      // It is recommended to use submit, which supports passing task labels
      val jobExecuteResult = toSubmit(user, executeCode)
      println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
      // 4. get job info
      var jobInfoResult = client.getJobInfo(jobExecuteResult)
      var logFromLen = 0
      val logSize = 100
      val sleepTimeMills: Int = 1000
      while (!jobInfoResult.isCompleted) {
        // 5. get progress and log
        val progress = client.progress(jobExecuteResult)
        println("progress: " + progress.getProgress)
        val logObj = client.log(jobExecuteResult, logFromLen, logSize)
        logFromLen = logObj.fromLine
        val logArray = logObj.getLog
        // 0: info 1: warn 2: error 3: all
        if (logArray != null && logArray.size >= 4 && StringUtils.isNotEmpty(logArray.get(3))) {
          println(s"log: ${logArray.get(3)}")
        }
        Utils.sleepQuietly(sleepTimeMills)
        jobInfoResult = client.getJobInfo(jobExecuteResult)
      }
      if (!jobInfoResult.isSucceed) {
        println("Failed to execute job: " + jobInfoResult.getMessage)
        throw new Exception(jobInfoResult.getMessage)
      }
      // 6. Get the result set list (if the user submits multiple SQLs at a time,
      //    multiple result sets will be generated)
      val jobInfo = client.getJobInfo(jobExecuteResult)
      val resultSetList = jobInfoResult.getResultSetList(client)
      println("All result set list:")
      resultSetList.foreach(println)
      val oneResultSet = jobInfo.getResultSetList(client).head
      // 7. get result content
      val resultSetResult: ResultSetResult = client.resultSet(ResultSetAction.builder
        .setPath(oneResultSet)
        .setUser(jobExecuteResult.getUser)
        .build)
      println("metadata: " + resultSetResult.getMetadata) // column names and types
      println("res: " + resultSetResult.getFileContent)   // row data
    } catch {
      case e: Exception =>
        e.printStackTrace() // please use a logger in production
    }
    IOUtils.closeQuietly(client)
  }

  def toSubmit(user: String, code: String): JobExecuteResult = {
    // 1. build params
    // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
    val labels: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]
    labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3") // required engineType label
    labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName") // the requesting user and application name; neither can be omitted. APPName cannot contain "-"; it is recommended to replace it with "_"
    labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py") // specify the script type
    val startupMap = new java.util.HashMap[String, AnyRef]()
    // Supports setting engine-native parameters, e.g. parameters of engines such as spark/hive
    val instances: Integer = 2
    startupMap.put("spark.executor.instances", instances)
    // setting linkis params
    startupMap.put("wds.linkis.rm.yarnqueue", "default")
    // 2. build jobSubmitAction
    val jobSubmitAction = JobSubmitAction.builder
      .addExecuteCode(code)
      .setStartupParams(startupMap)
      .setUser(user)        // submit user
      .addExecuteUser(user) // execute user
      .setLabels(labels)
      .build
    // 3. to execute
    client.submit(jobSubmitAction)
  }
}
```

The Linkis client also supports submitting tasks of the Once type: the engine process is started, the task is run exactly once, and the engine is automatically destroyed after the task ends.

OnceEngineConn calls LinkisManager's createEngineConn interface through LinkisManagerClient, sends the code to the engine created for the user, and the engine then starts executing it.

Prerequisites for writing a test class with the client:

```java
// 1. Configure a correct and reachable gateway address:
LinkisJobClient.config().setDefaultServerUrl("http://ip:9001");

// 2. Write the engine parameters, configuration items, and execution code in the code:
String code = "env {\n"
        + "  spark.app.name = \"SeaTunnel\"\n"
        + "  spark.executor.instances = 2\n"
        + "  spark.executor.cores = 1\n"
        + "  spark.executor.memory = \"1g\"\n"
        + "}\n"
        + "\n"
        + "source {\n"
        + "  Fake {\n"
        + "    result_table_name = \"my_dataset\"\n"
        + "  }\n"
        + "\n"
        + "}\n"
        + "\n"
        + "transform {\n"
        + "}\n"
        + "\n"
        + "sink {\n"
        + "  Console {}\n"
        + "}";

// 3. Create an Once mode object: SubmittableSimpleOnceJob:
SubmittableSimpleOnceJob onceJob = LinkisJobClient.once()
        .simple()
        .builder()
        .setCreateService("seatunnel-Test")
        .setMaxSubmitTime(300000)                                             // submit timeout
        .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "seatunnel-2.1.2")   // engine label
        .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-seatunnel") // user label
        .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once")         // engine mode label
        .addStartupParam(Configuration.IS_TEST_MODE().key(), true)            // whether to enable test mode
        .addExecuteUser("hadoop")                                             // execute user
        .addJobContent("runType", "spark")                                    // execution engine
        .addJobContent("code", code)                                          // execute code
        .addJobContent("master", "local[4]")
        .addJobContent("deploy-mode", "client")
        .addSource("jobName", "OnceJobTest")                                  // job name
        .build();
```
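Building the `SubmittableSimpleOnceJob` does not run it; the job still has to be submitted and waited on. A minimal sketch, using the same methods the `SqoopOnceJobTest` sample below uses (`submit`, `getId`, `waitForCompleted`, `getStatus`); error handling is omitted:

```java
// Sketch: submit the once job built above and block until the engine finishes
onceJob.submit();                                     // create the once engine and run the job
System.out.println("job id: " + onceJob.getId());     // id of the submitted once job
onceJob.waitForCompleted();                           // wait until the task ends and the engine is destroyed
System.out.println("status: " + onceJob.getStatus()); // final job status
```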

Test class sample code:

```scala
package org.apache.linkis.ujes.client

import org.apache.linkis.common.utils.Utils

import java.util
import java.util.concurrent.TimeUnit

import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
import org.apache.linkis.computation.client.utils.LabelKeyUtils

import scala.collection.JavaConverters._

object SqoopOnceJobTest extends App {

  LinkisJobBuilder.setDefaultServerUrl("http://gateway address:9001")
  val logPath = "C:\\Users\\resources\\log4j.properties"
  System.setProperty("log4j.configurationFile", logPath)

  val startUpMap = new util.HashMap[String, AnyRef]
  startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")

  val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
    .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
    .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "hadoop-Client")
    .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
    .setStartupParams(startUpMap)
    .setMaxSubmitTime(30000)
    .addExecuteUser("hadoop")

  val onceJob = importJob(builder)
  val time = System.currentTimeMillis()
  onceJob.submit()
  println(onceJob.getId)

  val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
  println(onceJob.getECMServiceInstance)
  logOperator.setFromLine(0)
  logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
  logOperator.setEngineConnType("sqoop")
  logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")

  var progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
  var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
  var end = false
  var rowBefore = 1
  while (!end || rowBefore > 0) {
    if (onceJob.isCompleted) {
      end = true
      metricOperator = null
    }
    logOperator.setPageSize(100)
    Utils.tryQuietly {
      val logs = logOperator.apply()
      logs.logs.asScala.foreach(log => {
        println(log)
      })
      rowBefore = logs.logs.size
    }
    Thread.sleep(3000)
    Option(metricOperator).foreach(operator => {
      if (!onceJob.isCompleted) {
        println(s"Metric monitoring: ${operator.apply()}")
        println(s"Progress: ${progressOperator.apply()}")
      }
    })
  }
  onceJob.isCompleted
  onceJob.waitForCompleted()
  println(onceJob.getStatus)
  println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
  System.exit(0)

  def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_1003_01")
      .addJobContent("sqoop.mode", "import")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://database address/library name")
      .addJobContent("sqoop.args.username", "database account")
      .addJobContent("sqoop.args.password", "database password")
      // The source table must exist; $CONDITIONS is indispensable in the query
      .addJobContent("sqoop.args.query", "select * from linkis_ps_udf_manager where 1=1 and $CONDITIONS")
      .addJobContent("sqoop.args.hcatalog.database", "janicegong_ind")
      .addJobContent("sqoop.args.hcatalog.table", "linkis_ps_udf_manager_sync2")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "ds")
      .addJobContent("sqoop.args.hcatalog.partition.values", "20220708")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build()
  }

  def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    jobBuilder
      .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_1003_01")
      .addJobContent("sqoop.mode", "export")
      .addJobContent("sqoop.args.connect", "jdbc:mysql://database address/library name")
      .addJobContent("sqoop.args.username", "database account")
      .addJobContent("sqoop.args.password", "database password")
      // The source table must exist; $CONDITIONS is indispensable in the query
      .addJobContent("sqoop.args.query", "select * from linkis_ps_udf_manager where 1=1 and $CONDITIONS")
      .addJobContent("sqoop.args.hcatalog.database", "janicegong_ind")
      .addJobContent("sqoop.args.hcatalog.table", "linkis_ps_udf_manager_sync2")
      .addJobContent("sqoop.args.hcatalog.partition.keys", "ds")
      .addJobContent("sqoop.args.hcatalog.partition.values", "20220708")
      .addJobContent("sqoop.args.num.mappers", "1")
      .build
  }
}
```