SDK Method

Linkis provides convenient interfaces for Java and Scala calls; you only need to import the linkis-computation-client module to use them. Since 1.0, submitting jobs with labels is also supported. The SDK usage is described below.

Engine versions and script types supported by Linkis

| Engine plugin | Default supported version | Script type | Type description |
|---------------|---------------------------|-------------|------------------|
| Spark | 2.4.3 | py | python script |
| | | scala | scala script |
| | | sql | sql script |
| Hive | 2.3.3 | hql | hql script |
| Python | python2 | python | python script |
| Shell | 1 | shell | shell script |
| JDBC | 4 | jdbc | sql script |
| Flink | 1.12.2 | sql | sql script |
| openLooKeng | 1.5.0 | sql | sql script |
| Pipeline | 1 | pipeline | file import/export |
| Presto | 0.234 | psql | sql script |
| Sqoop | 1.4.6 | appconn | file import/export |
| Elasticsearch | 7.6.2 | esjson | json script |
| | | essql | sql script |
| trino | 371 | tsql | sql script |

Common Linkis labels

| Label key | Label value | Description |
|-----------|-------------|-------------|
| engineType | spark-2.4.3 | specifies the engine type and version |
| userCreator | user + "-AppName" | specifies the executing user and your AppName |
| codeType | sql | specifies the script type to run |
| jobRunningTimeout | 10 | if the job has been running for 10s without finishing, it is killed automatically; unit: seconds |
| jobQueuingTimeout | 10 | if the job has been queued for more than 10s without finishing, it is killed automatically; unit: seconds |
| jobRetryTimeout | 10000 | wait interval before retrying a job that failed for resource or similar reasons; unit: ms. If the failure was caused by insufficient queue resources, 10 retries are attempted at this interval by default |
| tenant | hduser02 | tenant label; before setting it, confirm with the BDP side that dedicated machines are required for isolation, and tasks will then be routed to those dedicated machines |
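
As a minimal sketch of how these labels can be attached to a job, the hypothetical helper below builds a label map from the values in the two tables above. The LabelKeyConstant constants are the same ones used in the full Java example later in this section; the remaining keys are written as plain strings exactly as they appear in the table, and whether each of them takes effect depends on how your Linkis deployment is configured.

  package org.apache.linkis.client.test;

  import org.apache.linkis.manager.label.constant.LabelKeyConstant;

  import java.util.HashMap;
  import java.util.Map;

  // Hypothetical helper class, for illustration only.
  public class CommonLabelExample {

      // Build a label map using the common labels listed in the table above.
      public static Map<String, Object> buildLabels(String user) {
          Map<String, Object> labels = new HashMap<String, Object>();
          labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3");       // engineType: engine name + default version from the engine table
          labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-IDE"); // userCreator: executing user + your AppName
          labels.put(LabelKeyConstant.CODE_TYPE_KEY, "sql");                 // codeType: script type from the engine table
          // The keys below are written as plain strings, exactly as listed in the label table.
          labels.put("jobRunningTimeout", "10");  // kill the job if it has been running for more than 10s
          labels.put("jobQueuingTimeout", "10");  // kill the job if it has been queued for more than 10s
          labels.put("tenant", "hduser02");       // tenant isolation label (assumes tenant isolation has been agreed with the BDP side)
          return labels;
      }
  }

The resulting map is passed to JobSubmitAction.builder().setLabels(labels), as shown in the complete example below.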
Import the dependency module:

  <dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
  </dependency>

For example:

  <dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>1.0.3</version>
  </dependency>

Create a Java test class LinkisClientTest; the meaning of each interface call is explained in the comments:

  package org.apache.linkis.client.test;

  import org.apache.linkis.common.utils.Utils;
  import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
  import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
  import org.apache.linkis.manager.label.constant.LabelKeyConstant;
  import org.apache.linkis.protocol.constants.TaskConstant;
  import org.apache.linkis.ujes.client.UJESClient;
  import org.apache.linkis.ujes.client.UJESClientImpl;
  import org.apache.linkis.ujes.client.request.JobSubmitAction;
  import org.apache.linkis.ujes.client.request.JobExecuteAction;
  import org.apache.linkis.ujes.client.request.ResultSetAction;
  import org.apache.linkis.ujes.client.response.*;
  import org.apache.commons.io.IOUtils;

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;

  public class LinkisClientTest {

      // 1. build config: linkis gateway url
      private static DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
              .addServerUrl("http://127.0.0.1:9001/")   // set linkis-mg-gateway url: http://{ip}:{port}
              .connectionTimeout(30000)                 // connection timeout
              .discoveryEnabled(false)                  // disable discovery
              .discoveryFrequency(1, TimeUnit.MINUTES)  // discovery frequency
              .loadbalancerEnabled(true)                // enable load balancing
              .maxConnectionSize(5)                     // set max connections
              .retryEnabled(false)                      // set retry
              .readTimeout(30000)                       // set read timeout
              .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: Linkis supports static and token authentication
              .setAuthTokenKey("hadoop")                // set submit user
              .setAuthTokenValue("123456")))            // set password or token (e.g. setAuthTokenValue("test"))
              .setDWSVersion("v1")                      // linkis rest version v1
              .build();

      // 2. create the Linkis client from clientConfig
      private static UJESClient client = new UJESClientImpl(clientConfig);

      public static void main(String[] args) {
          String user = "hadoop"; // the user must match the value of AuthTokenKey
          String executeCode = "df=spark.sql(\"show tables\")\n" +
                  "show(df)"; // code support: sql/hql/py/scala
          try {
              System.out.println("user : " + user + ", code : [" + executeCode + "]");
              // 3. build job and execute
              JobExecuteResult jobExecuteResult = toSubmit(user, executeCode);
              System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
              // 4. get job info
              JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
              int sleepTimeMills = 1000;
              int logFromLen = 0;
              int logSize = 100;
              while (!jobInfoResult.isCompleted()) {
                  // 5. get progress and log
                  JobProgressResult progress = client.progress(jobExecuteResult);
                  System.out.println("progress: " + progress.getProgress());
                  JobLogResult logRes = client.log(jobExecuteResult, logFromLen, logSize);
                  logFromLen = logRes.fromLine();
                  // 0: info 1: warn 2: error 3: all
                  System.out.println(logRes.log().get(3));
                  Utils.sleepQuietly(sleepTimeMills);
                  jobInfoResult = client.getJobInfo(jobExecuteResult);
              }
              JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
              // 6. get the result set list (if the user submits multiple SQL statements at a time,
              //    multiple result sets will be generated)
              String resultSet = jobInfo.getResultSetList(client)[0];
              // 7. get result content
              ResultSetResult resultSetResult = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build());
              System.out.println("metadata: " + resultSetResult.getMetadata());   // column names and types
              System.out.println("res: " + resultSetResult.getFileContent());     // row data
          } catch (Exception e) {
              e.printStackTrace(); // please use a logger in real code
              IOUtils.closeQuietly(client);
          }
          IOUtils.closeQuietly(client);
      }

      private static JobExecuteResult toSubmit(String user, String code) {
          // 1. build params
          // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
          Map<String, Object> labels = new HashMap<String, Object>();
          labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3");           // required engineType label
          labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName"); // required execute user and creator, e.g. hadoop-IDE
          labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");                      // required codeType
          // set startup map: engineConn start params
          Map<String, Object> startupMap = new HashMap<String, Object>(16);
          // supports setting engine-native parameters, e.g. parameters of engines such as spark/hive
          startupMap.put("spark.executor.instances", 2);
          // setting linkis params
          startupMap.put("wds.linkis.rm.yarnqueue", "dws");
          // 2. build jobSubmitAction
          JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
                  .addExecuteCode(code)
                  .setStartupParams(startupMap)
                  .setUser(user)          // submit user
                  .addExecuteUser(user)   // execute user
                  .setLabels(labels)
                  .build();
          // 3. submit
          return client.submit(jobSubmitAction);
      }
  }

Running the above code completes job submission and execution, and retrieves the logs and result sets. The equivalent Scala test class is shown below:

  package org.apache.linkis.client.test

  import org.apache.commons.io.IOUtils
  import org.apache.commons.lang3.StringUtils
  import org.apache.linkis.common.utils.Utils
  import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
  import org.apache.linkis.manager.label.constant.LabelKeyConstant
  import org.apache.linkis.ujes.client.request._
  import org.apache.linkis.ujes.client.response._

  import java.util
  import java.util.concurrent.TimeUnit

  import org.apache.linkis.ujes.client.UJESClient

  object LinkisClientTest {

    // 1. build config: linkis gateway url
    val clientConfig = DWSClientConfigBuilder.newBuilder()
      .addServerUrl("http://127.0.0.1:8088/")  // set linkis-mg-gateway url: http://{ip}:{port}
      .connectionTimeout(30000)                // connection timeout
      .discoveryEnabled(false)                 // disable discovery
      .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
      .loadbalancerEnabled(true)               // enable load balancing
      .maxConnectionSize(5)                    // set max connections
      .retryEnabled(false)                     // set retry
      .readTimeout(30000)                      // set read timeout
      .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // AuthenticationStrategy: Linkis supports static and token authentication
      .setAuthTokenKey("hadoop")               // set submit user
      .setAuthTokenValue("hadoop")             // set password or token (e.g. setAuthTokenValue("BML-AUTH"))
      .setDWSVersion("v1")                     // linkis rest version v1
      .build()

    // 2. create the Linkis client from clientConfig
    val client = UJESClient(clientConfig)

    def main(args: Array[String]): Unit = {
      val user = "hadoop" // the execute user must match the value of AuthTokenKey
      val executeCode = "df=spark.sql(\"show tables\")\n" +
        "show(df)" // code support: sql/hql/py/scala
      try {
        // 3. build job and execute
        println("user : " + user + ", code : [" + executeCode + "]")
        // submit is recommended: it supports passing job labels
        val jobExecuteResult = toSubmit(user, executeCode)
        println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
        // 4. get job info
        var jobInfoResult = client.getJobInfo(jobExecuteResult)
        var logFromLen = 0
        val logSize = 100
        val sleepTimeMills: Int = 1000
        while (!jobInfoResult.isCompleted) {
          // 5. get progress and log
          val progress = client.progress(jobExecuteResult)
          println("progress: " + progress.getProgress)
          val logObj = client.log(jobExecuteResult, logFromLen, logSize)
          logFromLen = logObj.fromLine
          val logArray = logObj.getLog
          // 0: info 1: warn 2: error 3: all
          if (logArray != null && logArray.size >= 4 && StringUtils.isNotEmpty(logArray.get(3))) {
            println(s"log: ${logArray.get(3)}")
          }
          Utils.sleepQuietly(sleepTimeMills)
          jobInfoResult = client.getJobInfo(jobExecuteResult)
        }
        if (!jobInfoResult.isSucceed) {
          println("Failed to execute job: " + jobInfoResult.getMessage)
          throw new Exception(jobInfoResult.getMessage)
        }
        // 6. get the result set list (if the user submits multiple SQL statements at a time,
        //    multiple result sets will be generated)
        val jobInfo = client.getJobInfo(jobExecuteResult)
        val resultSetList = jobInfoResult.getResultSetList(client)
        println("All result set list:")
        resultSetList.foreach(println)
        val oneResultSet = jobInfo.getResultSetList(client).head
        // 7. get result content
        val resultSetResult: ResultSetResult = client.resultSet(ResultSetAction.builder.setPath(oneResultSet).setUser(jobExecuteResult.getUser).build)
        println("metadata: " + resultSetResult.getMetadata) // column names and types
        println("res: " + resultSetResult.getFileContent)   // row data
      } catch {
        case e: Exception =>
          e.printStackTrace() // please use a logger in real code
      }
      IOUtils.closeQuietly(client)
    }

    def toSubmit(user: String, code: String): JobExecuteResult = {
      // 1. build params
      // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
      val labels: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]
      labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3") // required engineType label
      labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName") // required: requesting user and application name; neither may be omitted, and APPName must not contain "-" (use "_" instead)
      labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py") // specify the script type
      val startupMap = new java.util.HashMap[String, AnyRef]()
      // supports setting engine-native parameters, e.g. parameters of engines such as spark/hive
      val instances: Integer = 2
      startupMap.put("spark.executor.instances", instances)
      // setting linkis params
      startupMap.put("wds.linkis.rm.yarnqueue", "default")
      // 2. build jobSubmitAction
      val jobSubmitAction = JobSubmitAction.builder
        .addExecuteCode(code)
        .setStartupParams(startupMap)
        .setUser(user)        // submit user
        .addExecuteUser(user) // execute user
        .setLabels(labels)
        .build
      // 3. submit
      client.submit(jobSubmitAction)
    }
  }
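
To run a different script type from the engine table, only the execute code and the engineType/codeType label values need to change. The hypothetical Java sketch below reuses the structure of toSubmit() from the Java example above to submit a Hive SQL statement; the method name is illustrative, and client, the label map, and JobSubmitAction are exactly as in that example.

  // Hypothetical variation of toSubmit(): submit a Hive SQL statement instead of pyspark code.
  private static JobExecuteResult toSubmitHiveSql(String user, String sql) {
      Map<String, Object> labels = new HashMap<String, Object>();
      labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "hive-2.3.3");            // engine name + default version from the engine table
      labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName"); // executing user and application name
      labels.put(LabelKeyConstant.CODE_TYPE_KEY, "hql");                     // script type from the engine table
      JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
              .addExecuteCode(sql)      // e.g. "show tables"
              .setUser(user)            // submit user
              .addExecuteUser(user)     // execute user
              .setLabels(labels)
              .build();
      return client.submit(jobSubmitAction);
  }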

The Linkis-cli client supports submitting Once-type tasks: the engine process runs the task only once after it starts, and the engine is destroyed automatically when the task finishes.

OnceEngineConn calls the createEngineConn interface of LinkisManager through LinkisManagerClient, sends the code to the engine created for the user, and the engine then starts executing it.

1. First create a new Maven project, or add the following dependency to an existing project:

  <dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
  </dependency>

2. Write a test class. Prerequisites for using the client:

  // 1. Configure a correct and reachable gateway address:
  LinkisJobClient.config().setDefaultServerUrl("http://ip:9001");

  // 2. Write the engine parameters, configuration items and execution code into `code`:
  String code = "env {\n"
      + "  spark.app.name = \"SeaTunnel\"\n"
      + "  spark.executor.instances = 2\n"
      + "  spark.executor.cores = 1\n"
      + "  spark.executor.memory = \"1g\"\n"
      + "}\n"
      + "\n"
      + "source {\n"
      + "  Fake {\n"
      + "    result_table_name = \"my_dataset\"\n"
      + "  }\n"
      + "\n"
      + "}\n"
      + "\n"
      + "transform {\n"
      + "}\n"
      + "\n"
      + "sink {\n"
      + "  Console {}\n"
      + "}";

  // 3. Create the Once-mode object SubmittableSimpleOnceJob:
  SubmittableSimpleOnceJob onceJob = LinkisJobClient.once()
      .simple()
      .builder()
      .setCreateService("seatunnel-Test")
      .setMaxSubmitTime(300000)                                             // submit timeout
      .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "seatunnel-2.1.2")   // engine type label
      .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-seatunnel") // user creator label
      .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once")         // engine mode label
      .addStartupParam(Configuration.IS_TEST_MODE().key(), true)            // whether to enable test mode
      .addExecuteUser("hadoop")                                             // execute user
      .addJobContent("runType", "spark")                                    // execution engine
      .addJobContent("code", code)                                          // execution code
      .addJobContent("master", "local[4]")
      .addJobContent("deploy-mode", "client")
      .addSource("jobName", "OnceJobTest")                                  // job name
      .build();
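
The snippet above only builds the Once job. As a minimal sketch of how it is then driven (assuming the onceJob variable created above; the methods used here are the same ones the Scala Sqoop example below relies on), submission and completion look like this:

  // Continuation of the snippet above: submit the Once job and wait for it to finish.
  onceJob.submit();                        // create the engine and submit the job
  System.out.println(onceJob.getId());     // print the job id
  onceJob.waitForCompleted();              // block until the job finishes (the engine is destroyed afterwards)
  System.out.println(onceJob.getStatus()); // final job status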
The following Scala example submits a Sqoop Once job that imports data from MySQL into a Hive (HCatalog) table:

  package org.apache.linkis.ujes.client

  import org.apache.linkis.common.utils.Utils

  import java.util
  import java.util.concurrent.TimeUnit

  import org.apache.linkis.computation.client.LinkisJobBuilder
  import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
  import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
  import org.apache.linkis.computation.client.utils.LabelKeyUtils

  import scala.collection.JavaConverters._

  @Deprecated
  object SqoopOnceJobTest extends App {
    LinkisJobBuilder.setDefaultServerUrl("http://gateway-address:9001")
    val logPath = "C:\\Users\\resources\\log4j.properties"
    System.setProperty("log4j.configurationFile", logPath)
    val startUpMap = new util.HashMap[String, AnyRef]
    startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")
    val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
      .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
      .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "hadoop-Client")
      .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
      .setStartupParams(startUpMap)
      .setMaxSubmitTime(30000)
      .addExecuteUser("hadoop")
    val onceJob = importJob(builder)
    val time = System.currentTimeMillis()
    onceJob.submit()
    println(onceJob.getId)
    val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
    println(onceJob.getECMServiceInstance)
    logOperator.setFromLine(0)
    logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
    logOperator.setEngineConnType("sqoop")
    logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
    var progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
    var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
    var end = false
    var rowBefore = 1
    while (!end || rowBefore > 0) {
      if (onceJob.isCompleted) {
        end = true
        metricOperator = null
      }
      logOperator.setPageSize(100)
      Utils.tryQuietly {
        val logs = logOperator.apply()
        logs.logs.asScala.foreach(log => {
          println(log)
        })
        rowBefore = logs.logs.size
      }
      Thread.sleep(3000)
      Option(metricOperator).foreach(operator => {
        if (!onceJob.isCompleted) {
          println(s"Metrics: ${operator.apply()}")
          println(s"Progress: ${progressOperator.apply()}")
        }
      })
    }
    onceJob.isCompleted
    onceJob.waitForCompleted()
    println(onceJob.getStatus)
    println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
    System.exit(0)

    def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
      jobBuilder
        .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_1003_01")
        .addJobContent("sqoop.mode", "import")
        .addJobContent("sqoop.args.connect", "jdbc:mysql://db-address/db-name")
        .addJobContent("sqoop.args.username", "db-username")
        .addJobContent("sqoop.args.password", "db-password")
        // the source table must exist, and $CONDITIONS must not be omitted from the query
        .addJobContent("sqoop.args.query", "select * from linkis_ps_udf_manager where 1=1 and $CONDITIONS")
        .addJobContent("sqoop.args.hcatalog.database", "janicegong_ind")
        .addJobContent("sqoop.args.hcatalog.table", "linkis_ps_udf_manager_sync2")
        .addJobContent("sqoop.args.hcatalog.partition.keys", "ds")
        .addJobContent("sqoop.args.hcatalog.partition.values", "20220708")
        .addJobContent("sqoop.args.num.mappers", "1")
        .build()
    }

    def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
      jobBuilder
        .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_1003_01")
        .addJobContent("sqoop.mode", "export") // export mode; the remaining arguments mirror importJob above
        .addJobContent("sqoop.args.connect", "jdbc:mysql://db-address/db-name")
        .addJobContent("sqoop.args.username", "db-username")
        .addJobContent("sqoop.args.password", "db-password")
        // the table must exist, and $CONDITIONS must not be omitted from the query
        .addJobContent("sqoop.args.query", "select * from linkis_ps_udf_manager where 1=1 and $CONDITIONS")
        .addJobContent("sqoop.args.hcatalog.database", "janicegong_ind")
        .addJobContent("sqoop.args.hcatalog.table", "linkis_ps_udf_manager_sync2")
        .addJobContent("sqoop.args.hcatalog.partition.keys", "ds")
        .addJobContent("sqoop.args.hcatalog.partition.values", "20220708")
        .addJobContent("sqoop.args.num.mappers", "1")
        .build
    }
  }

3. When the test program finishes, the engine is destroyed automatically; there is no need to clean it up manually.