Linkis 提供了方便的JAVA和SCALA调用的接口,只需要引入ujes-client的模块就可以进行使用

1. 引入依赖模块

  1. <dependency>
  2. <groupId>com.webank.wedatasphere.Linkis</groupId>
  3. <artifactId>Linkis-ujes-client</artifactId>
  4. <version>0.6.0</version>
  5. </dependency>

2. 建立测试类

建立Java的测试类UJESClientImplTestJ,具体接口含义可以见注释:

  1. package com.webank.bdp.dataworkcloud.ujes.client;
  2. import com.webank.wedatasphere.Linkis.common.utils.Utils;
  3. import com.webank.wedatasphere.Linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
  4. import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfig;
  5. import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfigBuilder;
  6. import com.webank.wedatasphere.Linkis.ujes.client.UJESClient;
  7. import com.webank.wedatasphere.Linkis.ujes.client.UJESClientImpl;
  8. import com.webank.wedatasphere.Linkis.ujes.client.request.JobExecuteAction;
  9. import com.webank.wedatasphere.Linkis.ujes.client.request.ResultSetAction;
  10. import com.webank.wedatasphere.Linkis.ujes.client.response.JobExecuteResult;
  11. import com.webank.wedatasphere.Linkis.ujes.client.response.JobInfoResult;
  12. import com.webank.wedatasphere.Linkis.ujes.client.response.JobProgressResult;
  13. import com.webank.wedatasphere.Linkis.ujes.client.response.JobStatusResult;
  14. import org.apache.commons.io.IOUtils;
  15. import java.util.concurrent.TimeUnit;
  16. public class UJESClientImplTestJ{
  17. public static void main(String[] args){
  18. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
  19. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
  20. .addUJESServerUrl("http://${ip}:${port}") //指定ServerUrl,Linkis服务器端网关的地址,如http://{ip}:{port}
  21. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
  22. .discoveryEnabled(true).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
  23. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
  24. .maxConnectionSize(5) //指定最大连接数,即最大并发数
  25. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
  26. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
  27. .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //认证key,一般为用户名; 认证value,一般为用户名对应的密码
  28. .setDWSVersion("v1").build(); //Linkis后台协议的版本,当前版本为v1
  29. // 2. 通过DWSClientConfig获取一个UJESClient
  30. UJESClient client = new UJESClientImpl(clientConfig);
  31. // 3. 开始执行代码
  32. JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()
  33. .setCreator("LinkisClient-Test") //creator,请求Linkis的客户端的系统名,用于做系统级隔离
  34. .addExecuteCode("show tables") //ExecutionCode 请求执行的代码
  35. .setEngineType(JobExecuteAction.EngineType$.MODULE$.HIVE()) // 希望请求的Linkis的执行引擎类型,如Spark hive等
  36. .setUser("johnnwang") //User,请求用户;用于做用户级多租户隔离
  37. .build());
  38. System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
  39. // 4. 获取脚本的执行状态
  40. JobStatusResult status = client.status(jobExecuteResult);
  41. while(!status.isCompleted()) {
  42. // 5. 获取脚本的执行进度
  43. JobProgressResult progress = client.progress(jobExecuteResult);
  44. Utils.sleepQuietly(500);
  45. status = client.status(jobExecuteResult);
  46. }
  47. // 6. 获取脚本的Job信息
  48. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
  49. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
  50. String resultSet = jobInfo.getResultSetList(client)[0];
  51. // 8. 通过一个结果集信息,获取具体的结果集
  52. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
  53. System.out.println("fileContents: " + fileContents);
  54. IOUtils.closeQuietly(client);
  55. }
  56. }

运行上述的代码即可以和Linkis进行交互

3. Scala测试代码:

  1. import java.util.concurrent.TimeUnit
  2. import com.webank.wedatasphere.Linkis.common.utils.Utils
  3. import com.webank.wedatasphere.Linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
  4. import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfigBuilder
  5. import com.webank.wedatasphere.Linkis.ujes.client.request.JobExecuteAction.EngineType
  6. import com.webank.wedatasphere.Linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
  7. import org.apache.commons.io.IOUtils
  8. object UJESClientImplTest extends App {
  9. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
  10. val clientConfig = DWSClientConfigBuilder.newBuilder()
  11. .addUJESServerUrl("http://${ip}:${port}") //指定ServerUrl,Linkis服务器端网关的地址,如http://{ip}:{port}
  12. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
  13. .discoveryEnabled(true).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
  14. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
  15. .maxConnectionSize(5) //指定最大连接数,即最大并发数
  16. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
  17. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
  18. .setAuthTokenKey("${username}").setAuthTokenValue("${password}") //认证key,一般为用户名; 认证value,一般为用户名对应的密码
  19. .setDWSVersion("v1").build() //Linkis后台协议的版本,当前版本为v1
  20. // 2. 通过DWSClientConfig获取一个UJESClient
  21. val client = UJESClient(clientConfig)
  22. // 3. 开始执行代码
  23. val jobExecuteResult = client.execute(JobExecuteAction.builder()
  24. .setCreator("LinkisClient-Test") //creator,请求Linkis的客户端的系统名,用于做系统级隔离
  25. .addExecuteCode("show tables") //ExecutionCode 请求执行的代码
  26. .setEngineType(EngineType.SPARK) // 希望请求的Linkis的执行引擎类型,如Spark hive等
  27. .setUser("${username}").build()) //User,请求用户;用于做用户级多租户隔离
  28. println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
  29. // 4. 获取脚本的执行状态
  30. var status = client.status(jobExecuteResult)
  31. while(!status.isCompleted) {
  32. // 5. 获取脚本的执行进度
  33. val progress = client.progress(jobExecuteResult)
  34. val progressInfo = if(progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
  35. println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
  36. Utils.sleepQuietly(500)
  37. status = client.status(jobExecuteResult)
  38. }
  39. // 6. 获取脚本的Job信息
  40. val jobInfo = client.getJobInfo(jobExecuteResult)
  41. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
  42. val resultSet = jobInfo.getResultSetList(client).head
  43. // 8. 通过一个结果集信息,获取具体的结果集
  44. val fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
  45. println("fileContents: " + fileContents)
  46. IOUtils.closeQuietly(client)
  47. }