JAVA SDK Manual

Linkis provides a convenient Java and Scala calling interface; you only need to introduce the linkis-computation-client module to use it. Since 1.0, submitting with labels has been added. The following introduces both the way compatible with 0.X and the Label-based way newly added in 1.0 (a sketch of the 0.X-compatible path follows the Java example).

1. Introduce the dependency module

    <dependency>
        <groupId>org.apache.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>${linkis.version}</version>
    </dependency>

For example:

    <dependency>
        <groupId>org.apache.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>1.0.3</version>
    </dependency>

2. Java test code

Create the Java test class LinkisClientTest. Refer to the comments to understand the purpose of each interface:

    package org.apache.linkis.client.test;

    import org.apache.linkis.common.utils.Utils;
    import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
    import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
    import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
    import org.apache.linkis.manager.label.constant.LabelKeyConstant;
    import org.apache.linkis.protocol.constants.TaskConstant;
    import org.apache.linkis.ujes.client.UJESClient;
    import org.apache.linkis.ujes.client.UJESClientImpl;
    import org.apache.linkis.ujes.client.request.JobSubmitAction;
    import org.apache.linkis.ujes.client.request.JobExecuteAction;
    import org.apache.linkis.ujes.client.request.ResultSetAction;
    import org.apache.linkis.ujes.client.response.*;
    import org.apache.commons.io.IOUtils;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class LinkisClientTest {

        // 1. build config: linkis gateway url
        private static DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
                .addServerUrl("http://127.0.0.1:9001/") // set linkis-mg-gateway url: http://{ip}:{port}
                .connectionTimeout(30000) // connection timeout
                .discoveryEnabled(false) // disable discovery
                .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
                .loadbalancerEnabled(true) // enable load balancing
                .maxConnectionSize(5) // set max connections
                .retryEnabled(true) // set retry
                .readTimeout(30000) // set read timeout
                .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // Linkis authentication supports static and token modes
                .setAuthTokenKey("hadoop") // set submit user
                .setAuthTokenValue("hadoop"))) // set password or token (e.g. setAuthTokenValue("test"))
                .setDWSVersion("v1") // linkis rest version v1
                .build();

        // 2. new Client (Linkis Client) by clientConfig
        private static UJESClient client = new UJESClientImpl(clientConfig);

        public static void main(String[] args) {
            String user = "hadoop"; // execute user
            String executeCode = "df=spark.sql(\"show tables\")\n" +
                    "show(df)"; // code support: sql/hql/py/scala
            try {
                System.out.println("user : " + user + ", code : [" + executeCode + "]");
                // 3. build job and execute
                JobExecuteResult jobExecuteResult = toSubmit(user, executeCode);
                System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
                // 4. get job info
                JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
                int sleepTimeMills = 1000;
                int logFromLen = 0;
                int logSize = 100;
                while (!jobInfoResult.isCompleted()) {
                    // 5. get progress and log
                    JobProgressResult progress = client.progress(jobExecuteResult);
                    System.out.println("progress: " + progress.getProgress());
                    JobLogResult logRes = client.log(jobExecuteResult, logFromLen, logSize);
                    logFromLen = logRes.fromLine();
                    // 0: info 1: warn 2: error 3: all
                    System.out.println(logRes.log().get(3));
                    Utils.sleepQuietly(sleepTimeMills);
                    jobInfoResult = client.getJobInfo(jobExecuteResult);
                }
                JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
                // 6. Get the result set list (if the user submits multiple SQLs at a time,
                // multiple result sets will be generated)
                String resultSet = jobInfo.getResultSetList(client)[0];
                // 7. get result content
                ResultSetResult resultSetResult = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build());
                System.out.println("metadata: " + resultSetResult.getMetadata()); // column names and types
                System.out.println("res: " + resultSetResult.getFileContent()); // row data
            } catch (Exception e) {
                e.printStackTrace(); // please use a logger in production
            } finally {
                IOUtils.closeQuietly(client);
            }
        }

        private static JobExecuteResult toSubmit(String user, String code) {
            // 1. build params
            // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
            Map<String, Object> labels = new HashMap<String, Object>();
            labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3"); // required engineType label
            labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName"); // required execute user and creator, e.g. hadoop-IDE
            labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py"); // required codeType
            // set startup map: engineConn start params
            Map<String, Object> startupMap = new HashMap<String, Object>(16);
            // supports setting engine native parameters, e.g. those of engines such as spark/hive
            startupMap.put("spark.executor.instances", 2);
            // setting linkis params
            startupMap.put("wds.linkis.rm.yarnqueue", "dws");
            // 2. build jobSubmitAction
            JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
                    .addExecuteCode(code)
                    .setStartupParams(startupMap)
                    .setUser(user) // submit user
                    .addExecuteUser(user) // execute user
                    .setLabels(labels)
                    .build();
            // 3. to execute
            return client.submit(jobSubmitAction);
        }
    }

Run the above code to interact with Linkis.
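
The introduction above also mentions the submission path compatible with 0.X. For reference, a minimal sketch of that path is shown below as an extra method of the same LinkisClientTest class (which is why JobExecuteAction is imported). The builder method names follow the 0.X-style client and may differ between Linkis versions, so verify them against your SDK; note that labels are not passed on this path, which is exactly what the Label-based submit above adds.

    // 0.X-compatible submission: build a JobExecuteAction instead of a JobSubmitAction.
    // Sketch only -- verify the builder method names against your linkis-computation-client version.
    private static JobExecuteResult toExecute(String user, String code) {
        // engineConn startup params, same as in toSubmit
        Map<String, Object> startupMap = new HashMap<String, Object>(16);
        startupMap.put("spark.executor.instances", 2);
        startupMap.put("wds.linkis.rm.yarnqueue", "dws");
        // engine type, code type and creator are set directly on the action rather than passed as labels
        JobExecuteAction executionAction = JobExecuteAction.builder()
                .setCreator("APPName") // creator: the system name of the client requesting Linkis
                .addExecuteCode(code) // execution code
                .setEngineTypeStr("spark") // engineConn type
                .setRunTypeStr("py") // code type
                .setUser(user) // execute user
                .setStartupParams(startupMap) // engineConn startup params
                .build();
        // execute with the 0.X-compatible interface
        return client.execute(executionAction);
    }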

3. Scala test code

Create the Scala test class LinkisClientTest. Refer to the comments to understand the purpose of each interface:

    package org.apache.linkis.client.test

    import org.apache.commons.io.IOUtils
    import org.apache.commons.lang3.StringUtils
    import org.apache.linkis.common.utils.Utils
    import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import org.apache.linkis.manager.label.constant.LabelKeyConstant
    import org.apache.linkis.ujes.client.UJESClient
    import org.apache.linkis.ujes.client.request._
    import org.apache.linkis.ujes.client.response._

    import java.util
    import java.util.concurrent.TimeUnit

    object LinkisClientTest {

      // 1. build config: linkis gateway url
      val clientConfig = DWSClientConfigBuilder.newBuilder()
        .addServerUrl("http://127.0.0.1:9001/") // set linkis-mg-gateway url: http://{ip}:{port}
        .connectionTimeout(30000) // connection timeout
        .discoveryEnabled(false) // disable discovery
        .discoveryFrequency(1, TimeUnit.MINUTES) // discovery frequency
        .loadbalancerEnabled(true) // enable load balancing
        .maxConnectionSize(5) // set max connections
        .retryEnabled(false) // set retry
        .readTimeout(30000) // set read timeout
        .setAuthenticationStrategy(new StaticAuthenticationStrategy()) // Linkis authentication supports static and token modes
        .setAuthTokenKey("hadoop") // set submit user
        .setAuthTokenValue("hadoop") // set password or token (e.g. setAuthTokenValue("BML-AUTH"))
        .setDWSVersion("v1") // linkis rest version v1
        .build()

      // 2. new Client (Linkis Client) by clientConfig
      val client = UJESClient(clientConfig)

      def main(args: Array[String]): Unit = {
        val user = "hadoop" // execute user, must be consistent with the value of AuthTokenKey
        val executeCode = "df=spark.sql(\"show tables\")\n" +
          "show(df)" // code support: sql/hql/py/scala
        try {
          // 3. build job and execute
          println("user : " + user + ", code : [" + executeCode + "]")
          // submit is recommended: it supports passing task labels
          val jobExecuteResult = toSubmit(user, executeCode)
          println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
          // 4. get job info
          var jobInfoResult = client.getJobInfo(jobExecuteResult)
          var logFromLen = 0
          val logSize = 100
          val sleepTimeMills: Int = 1000
          while (!jobInfoResult.isCompleted) {
            // 5. get progress and log
            val progress = client.progress(jobExecuteResult)
            println("progress: " + progress.getProgress)
            val logObj = client.log(jobExecuteResult, logFromLen, logSize)
            logFromLen = logObj.fromLine
            val logArray = logObj.getLog
            // 0: info 1: warn 2: error 3: all
            if (logArray != null && logArray.size >= 4 && StringUtils.isNotEmpty(logArray.get(3))) {
              println(s"log: ${logArray.get(3)}")
            }
            Utils.sleepQuietly(sleepTimeMills)
            jobInfoResult = client.getJobInfo(jobExecuteResult)
          }
          if (!jobInfoResult.isSucceed) {
            println("Failed to execute job: " + jobInfoResult.getMessage)
            throw new Exception(jobInfoResult.getMessage)
          }
          // 6. Get the result set list (if the user submits multiple SQLs at a time,
          // multiple result sets will be generated)
          val jobInfo = client.getJobInfo(jobExecuteResult)
          val resultSetList = jobInfoResult.getResultSetList(client)
          println("All result set list:")
          resultSetList.foreach(println)
          val oneResultSet = jobInfo.getResultSetList(client).head
          // 7. get result content
          val resultSetResult: ResultSetResult = client.resultSet(ResultSetAction.builder.setPath(oneResultSet).setUser(jobExecuteResult.getUser).build)
          println("metadata: " + resultSetResult.getMetadata) // column names and types
          println("res: " + resultSetResult.getFileContent) // row data
        } catch {
          case e: Exception =>
            e.printStackTrace() // please use a logger in production
        }
        IOUtils.closeQuietly(client)
      }

      def toSubmit(user: String, code: String): JobExecuteResult = {
        // 1. build params
        // set label map: EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
        val labels: util.Map[String, Any] = new util.HashMap[String, Any]
        labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3") // required engineType label
        labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName") // requesting user and application name; both parts are required, and APPName must not contain "-" (use "_" instead)
        labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py") // specify the code type
        val startupMap = new util.HashMap[String, Any]()
        // supports setting engine native parameters, e.g. those of engines such as spark/hive
        startupMap.put("spark.executor.instances", 2)
        // setting linkis params
        startupMap.put("wds.linkis.rm.yarnqueue", "default")
        // 2. build jobSubmitAction
        val jobSubmitAction = JobSubmitAction.builder
          .addExecuteCode(code)
          .setStartupParams(startupMap)
          .setUser(user) // submit user
          .addExecuteUser(user) // execute user
          .setLabels(labels)
          .build
        // 3. to execute
        client.submit(jobSubmitAction)
      }
    }
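
Step 6 in both examples points out that a single job may produce multiple result sets (for example, when several SQL statements are submitted at once), yet only the first one is read. If you need all of them, a short loop using only the calls already shown above does the job; the snippet below is a sketch that assumes the client, jobInfo and jobExecuteResult variables from the Java example:

    // Iterate over every result set of a finished job instead of only the first one.
    String[] resultSetList = jobInfo.getResultSetList(client);
    for (String resultSetPath : resultSetList) {
        ResultSetResult resultSetResult = client.resultSet(
                ResultSetAction.builder()
                        .setPath(resultSetPath)
                        .setUser(jobExecuteResult.getUser())
                        .build());
        System.out.println("path: " + resultSetPath);
        System.out.println("metadata: " + resultSetResult.getMetadata()); // column names and types
        System.out.println("res: " + resultSetResult.getFileContent()); // row data
    }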