Data Source Manual

This manual describes how to use the data source feature introduced in version 1.1.0.

  • Data source: A database service that provides data storage, such as mysql/hive/kafka, is referred to here as a database. A data source defines the configuration needed to connect to an actual database, mainly the connection address, user authentication information, and connection parameters. This configuration is stored in the linkis_ps_dm_datasource* tables of the linkis database.
  • Metadata: The metadata of a database, that is, the data that describes the structure of the database and of its objects, for example database names, table names, column names, field lengths, and types.

linkis-datasource-client: the client module. It provides DataSourceRemoteClient for basic management of user data sources and MetaDataRemoteClient for metadata queries.

linkis-datasource-manager-server: the data source management module, service name ps-data-source-manager. It performs basic management of data sources and exposes HTTP interfaces for adding, querying, modifying, and connection-testing external data sources. Internally it provides an RPC service, which the metadata management module calls to obtain the information needed to establish a connection to the database.

  • http interface documentation
  • http interface class org.apache.linkis.datasourcemanager.core.restful
  • rpc interface class org.apache.linkis.datasourcemanager.core.receivers

linkis-metadata-manager-server: the metadata management module, service name ps-metadatamanager. It provides basic queries on database metadata. It exposes an HTTP interface externally and an RPC service internally, which the data source management module calls to run the connection test of a data source.

  • http interface documentation
  • http interface class org.apache.linkis.metadatamanager.server.restful
  • rpc interface class org.apache.linkis.metadatamanager.server.receiver

The functional structure of the data source module is as follows:

[Figure: datasource]

  • The LinkisDataSourceRemoteClient client assembles an HTTP request from the request parameters.
  • The HTTP request is sent to linkis-ps-data-source-manager.
  • linkis-ps-data-source-manager performs basic parameter validation; some interfaces can only be called by the administrator role.
  • linkis-ps-data-source-manager performs the basic data operations against the database.
  • The connection test interface provided by linkis-ps-data-source-manager internally calls ps-metadatamanager through RPC to run the connection test.
  • After the HTTP request has been processed, the result set is mapped to the entity class through the DWSHttpMessageResult annotation.

LinkisDataSourceRemoteClient interface

  • GetAllDataSourceTypesResult getAllDataSourceTypes(GetAllDataSourceTypesAction): query all data source types
  • QueryDataSourceEnvResult queryDataSourceEnv(QueryDataSourceEnvAction): query the cluster configuration information that a data source can use
  • GetInfoByDataSourceIdResult getInfoByDataSourceId(GetInfoByDataSourceIdAction): query data source information by data source id
  • QueryDataSourceResult queryDataSource(QueryDataSourceAction): query data source information
  • GetConnectParamsByDataSourceIdResult getConnectParams(GetConnectParamsByDataSourceIdAction): get the connection configuration parameters
  • CreateDataSourceResult createDataSource(CreateDataSourceAction): create a data source
  • DataSourceTestConnectResult getDataSourceTestConnect(DataSourceTestConnectAction): test whether the data source can be connected
  • DeleteDataSourceResult deleteDataSource(DeleteDataSourceAction): delete a data source
  • ExpireDataSourceResult expireDataSource(ExpireDataSourceAction): set a data source to the expired state
  • GetDataSourceVersionsResult getDataSourceVersions(GetDataSourceVersionsAction): query the version list of a data source configuration
  • PublishDataSourceVersionResult publishDataSourceVersion(PublishDataSourceVersionAction): publish a data source configuration version
  • UpdateDataSourceResult updateDataSource(UpdateDataSourceAction): update a data source
  • UpdateDataSourceParameterResult updateDataSourceParameter(UpdateDataSourceParameterAction): update the configuration parameters of a data source
  • GetKeyTypeDatasourceResult getKeyDefinitionsByType(GetKeyTypeDatasourceAction): query the configuration properties required by a data source type

The functional structure of the metadata module is as follows:

[Figure: metadata]

  • The LinkisMetaDataRemoteClient client assembles an HTTP request from the request parameters.
  • The HTTP request is sent to ps-metadatamanager.
  • ps-metadatamanager performs basic parameter validation.
  • Based on the datasourceId parameter, ps-metadatamanager sends an RPC request to linkis-ps-data-source-manager to obtain the data source type and the connection parameters, such as the username and password.
  • With the connection information in hand, it loads the lib package from the directory that corresponds to the data source type and calls the matching method through reflection to query the metadata.
  • After the HTTP request has been processed, the result set is mapped to the entity class through the DWSHttpMessageResult annotation.
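
The "load the lib package by data source type, then call through reflection" step can be pictured with the sketch below. It is only an illustration under stated assumptions, not the Linkis implementation: the object name MetaServiceLoaderSketch, its helper methods, and the lowercase one-directory-per-type layout are hypothetical, and java.util.ArrayList stands in for a real metadata service class.

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

// Hypothetical helper, NOT the Linkis implementation.
object MetaServiceLoaderSketch {

  // Assumed layout: <serviceLibDir>/<data source type in lowercase>/*.jar
  def serviceDir(serviceLibDir: String, dataSourceType: String): File =
    new File(serviceLibDir, dataSourceType.toLowerCase)

  // Collect the jar files of one data source type as classpath URLs.
  // A missing directory simply yields an empty array.
  def jarUrls(dir: File): Array[URL] =
    Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.endsWith(".jar"))
      .map(_.toURI.toURL)

  // Load a class through the given class loader, create an instance with the
  // no-argument constructor, and invoke a no-argument method by name.
  def invokeNoArg(loader: ClassLoader, className: String, methodName: String): AnyRef = {
    val clazz = Class.forName(className, true, loader)
    val instance = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    clazz.getMethod(methodName).invoke(instance)
  }

  def main(args: Array[String]): Unit = {
    val dir = serviceDir("/lib/linkis-public-enhancements/linkis-ps-metadatamanager/service", "mysql")
    val loader = new URLClassLoader(jarUrls(dir), getClass.getClassLoader)
    // java.util.ArrayList is a stand-in for a metadata service class.
    println(invokeNoArg(loader, "java.util.ArrayList", "size")) // size of a new, empty list
  }
}
```

Giving each data source type its own class loader keeps the driver jars of different types from interfering with each other, which is one plausible reason for the per-type directory layout.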

LinkisMetaDataRemoteClient interface

  • MetadataGetDatabasesResult getDatabases(MetadataGetDatabasesAction): query the database list
  • MetadataGetTablesResult getTables(MetadataGetTablesAction): query the tables of a database
  • MetadataGetTablePropsResult getTableProps(MetadataGetTablePropsAction): query the properties of a table
  • MetadataGetPartitionsResult getPartitions(MetadataGetPartitionsAction): query the partitions of a table
  • MetadataGetColumnsResult getColumns(MetadataGetColumnsAction): query the fields of a table

Source code directory structure:

    linkis-public-enhancements/linkis-datasource
    ├── linkis-datasource-client // Client code
    ├── linkis-datasource-manager // Data source management module
    │   ├── common // Data source management common module
    │   └── server // Data source management service module
    ├── linkis-metadata // Module from the old version, retained
    ├── linkis-metadata-manager // Metadata management module
    │   ├── common // Metadata management common module
    │   ├── server // Metadata management service module
    │   └── service // Supported data sources
    │       ├── elasticsearch
    │       ├── hive
    │       ├── kafka
    │       └── mysql

Installation package directory structure:

    /lib/linkis-public-enhancements/
    ├── linkis-ps-data-source-manager
    ├── linkis-ps-metadatamanager
    │   └── service
    │       ├── elasticsearch
    │       ├── hive
    │       ├── kafka
    │       └── mysql

The parameter wds.linkis.server.mdm.service.lib.dir controls the classpath that is loaded for the reflection calls. Its default value is /lib/linkis-public-enhancements/linkis-ps-metadatamanager/service.

For the configuration parameters, see Tuning and Troubleshooting > Parameter List > datasource Configuration Parameters.

1. Some database drivers must be installed by the user, because their licenses may not be compatible with the Apache license.

2. Directory for the extra drivers: ./lib/linkis-public-enhancements/linkis-ps-publicservice

3. Driver list:

| Driver Name | Driver Version | Download Link |
| --- | --- | --- |
| db2 | db2jcc4 | https://www.ibm.com/support/pages/db2-jdbc-driver-versions-and-downloads |
| dameng | DmJdbcDriver18 | https://download.dameng.com/eco/docs/JAVA_Mybatis_lib.zip |
| mysql | 5.1.34 | https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.34/mysql-connector-java-5.1.34.jar |
| kingbase | kingbase8 | http://maven.jeecg.org/nexus/content/repositories/jeecg/kingbase/kingbase8/8/kingbase8-8.jar |
| greenplum | 5.1.4 | https://network.pivotal.io/products/vmware-tanzu-greenplum#/releases/985537/file_groups/5749 |
| postgresql | 42.3.1 | https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.1/postgresql-42.3.1.jar |
| sqlserver | sqlserver2000 | https://www.microsoft.com/en-us/download/details.aspx?id=11774 |
| oracle | 11.2.0.3 | http://www.datanucleus.org/downloads/maven2/oracle/ojdbc6/11.2.0.3/ojdbc6-11.2.0.3.jar |
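
Before starting the services it can help to check which driver jars are already in the extra driver directory. The following is a minimal sketch of such a check, not a Linkis tool: the object DriverJarCheckSketch and its methods are hypothetical, and the jar file names are simply the last path segment of the direct jar links in the driver list above.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper, not part of Linkis.
object DriverJarCheckSketch {

  // File names taken from the direct jar download links in the driver list.
  val expectedJars: Seq[String] = Seq(
    "mysql-connector-java-5.1.34.jar",
    "postgresql-42.3.1.jar",
    "kingbase8-8.jar",
    "ojdbc6-11.2.0.3.jar"
  )

  // Returns the expected jars that are NOT present in the given directory.
  def missingJars(driverDir: Path, expected: Seq[String] = expectedJars): Seq[String] =
    expected.filterNot(name => Files.isRegularFile(driverDir.resolve(name)))

  def main(args: Array[String]): Unit = {
    // The default path is the extra driver directory named in this manual.
    val dir = Paths.get(args.headOption.getOrElse("./lib/linkis-public-enhancements/linkis-ps-publicservice"))
    val missing = missingJars(dir)
    if (missing.isEmpty) println("All listed driver jars found")
    else missing.foreach(name => println(s"Missing driver jar: $name"))
  }
}
```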

In the Linkis startup script, the two data source services (ps-data-source-manager and ps-metadatamanager) are not started by default. To use the data source service, set export ENABLE_METADATA_MANAGER=true in $LINKIS_CONF_DIR/linkis-env.sh. The data source services are then started and stopped together with the other services by linkis-start-all.sh/linkis-stop-all.sh.

Check on the Eureka page whether the services have started normally.

[Figure: datasource eureka]

Note:

    1. The Linkis web management console must be upgraded to version 1.1.0 before the data source management page can be used.
    2. The data source module currently ships jar packages for mysql/hive/kafka/elasticsearch, but the kafka and elasticsearch data sources have not been strictly tested, and full functionality is not guaranteed.

Using a data source takes three steps:

  • Step 1. Create the data source and configure its connection parameters.
  • Step 2. Publish the data source, selecting the connection configuration version to use.
  • Step 3. Use the data source, for example to query metadata. For hive/kafka/elasticsearch, the configuration is associated with the corresponding cluster environment configuration.
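
The relationship between the three steps can be modelled in a few lines. The sketch below is a hypothetical in-memory model, not Linkis code: the class DataSourceSketch, its methods, and the assumption that version numbers start at 1 and increase by one are made up for illustration. It only shows that saving parameters creates a version, and that a version must be published before it is used.

```scala
// Hypothetical in-memory model of the create / publish / use workflow.
final case class DataSourceSketch(
    name: String,
    versions: Map[Long, Map[String, String]] = Map.empty, // version -> connection parameters
    published: Option[Long] = None
) {

  // Step 1: saving connection parameters creates a new configuration version.
  // Assumption: versions are numbered 1, 2, 3, ...
  def withParams(params: Map[String, String]): (DataSourceSketch, Long) = {
    val next = if (versions.isEmpty) 1L else versions.keys.max + 1
    (copy(versions = versions + (next -> params)), next)
  }

  // Step 2: publishing selects which configuration version connections use.
  def publish(version: Long): DataSourceSketch = {
    require(versions.contains(version), s"unknown version $version")
    copy(published = Some(version))
  }

  // Step 3: only the published version is available for use.
  def activeParams: Option[Map[String, String]] = published.flatMap(versions.get)
}

object DataSourceWorkflowSketch {
  def main(args: Array[String]): Unit = {
    val (v1, first) = DataSourceSketch("for-mysql-test")
      .withParams(Map("host" -> "127.0.0.1", "port" -> "36000"))
    val (v2, second) = v1.withParams(Map("host" -> "127.0.0.1", "port" -> "3306"))
    println(v2.activeParams)                 // None: nothing has been published yet
    println(v2.publish(first).activeParams)  // parameters saved as the first version
    println(v2.publish(second).activeParams) // parameters saved as the second version
  }
}
```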

On the management console you can only create and configure data sources and test whether they can be connected; metadata cannot be queried there directly.

A generic JDBC module is implemented, so any of the data source types listed below can be selected on the web UI.

| Data Source | Link |
| --- | --- |
| mysql | https://www.mysql.com |
| oracle | https://www.oracle.com/database/technologies |
| kingbase | https://www.kingbase.com.cn |
| postgresql | https://www.postgresql.org |
| sqlserver | https://www.microsoft.com/en-us/sql-server |
| db2 | https://www.ibm.com/products/db2/database |
| greenplum | https://greenplum.org |
| dm | https://dmdatabases.com |
| doris | https://doris.apache.org |
| clickhouse | https://clickhouse.com |

Take MySQL as an example:

Data Source Management > New Data Source > Select MySQL Type

Enter the relevant configuration information.

[Figure: create mysql]

After the information has been saved, run the connection test to verify that a connection can be established.

Note:

  • A data source created through the management console belongs to the system Linkis.
  • After it has been created, the data source must be published (choosing the configuration parameter version when publishing) before it can be used.

Publishing a configuration (that configuration is then used for connections to the data source):

Click the version, then select the configuration to publish on the page that pops up.

[Figure: publish]

Scala code example:

    package org.apache.linkis.datasource.client

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.JsonUtils
    import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
    import org.apache.linkis.datasource.client.request._
    import org.apache.linkis.datasource.client.response._
    import org.apache.linkis.datasourcemanager.common.domain.DataSource
    import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import org.junit.jupiter.api.{Disabled, Test}

    object TestMysqlClient {

      val gatewayUrl = "http://127.0.0.1:9001"

      // Builds a fresh gateway client configuration; both clients use the same settings.
      private def newClientConfig() = DWSClientConfigBuilder.newBuilder
        .addServerUrl(gatewayUrl)
        .connectionTimeout(30000)
        .discoveryEnabled(false)
        .discoveryFrequency(1, TimeUnit.MINUTES)
        .loadbalancerEnabled(true)
        .maxConnectionSize(1)
        .retryEnabled(false)
        .readTimeout(30000)
        .setAuthenticationStrategy(new StaticAuthenticationStrategy)
        .setAuthTokenKey("hadoop")
        .setAuthTokenValue("xxxxx")
        .setDWSVersion("v1")
        .build()

      val dataSourceclient = new LinkisDataSourceRemoteClient(newClientConfig())
      val metaDataClient = new LinkisMetaDataRemoteClient(newClientConfig())

      @Test
      @Disabled
      def testCreateDataSourceMysql: Unit = {
        val user = "hadoop"
        val system = "Linkis"

        // Step 1: create the data source
        val dataSource = new DataSource()
        val dataSourceName = "for-mysql-test"
        dataSource.setDataSourceName(dataSourceName)
        dataSource.setDataSourceDesc("this is for mysql test")
        dataSource.setCreateSystem(system)
        dataSource.setDataSourceTypeId(1L)

        // Convert the DataSource entity into a map to use as the request payload
        val map = JsonUtils.jackson.readValue(JsonUtils.jackson.writeValueAsString(dataSource), new util.HashMap[String, Any]().getClass)
        val createDataSourceAction: CreateDataSourceAction = CreateDataSourceAction.builder()
          .setUser(user)
          .addRequestPayloads(map)
          .build()
        val createDataSourceResult: CreateDataSourceResult = dataSourceclient.createDataSource(createDataSourceAction)
        val dataSourceId = createDataSourceResult.getInsertId

        // Set the connection parameters; this produces a new configuration version
        val params = new util.HashMap[String, Any]
        val connectParams = new util.HashMap[String, Any]
        connectParams.put("host", "127.0.0.1")
        connectParams.put("port", "36000")
        connectParams.put("username", "db username")
        connectParams.put("password", "db password")
        params.put("connectParams", connectParams)
        params.put("comment", "init")
        val updateParameterAction: UpdateDataSourceParameterAction = UpdateDataSourceParameterAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .addRequestPayloads(params)
          .build()
        val updateParameterResult: UpdateDataSourceParameterResult = dataSourceclient.updateDataSourceParameter(updateParameterAction)
        val version: Long = updateParameterResult.getVersion

        // Step 2: publish the configuration version
        dataSourceclient.publishDataSourceVersion(
          PublishDataSourceVersionAction.builder()
            .setDataSourceId(dataSourceId)
            .setUser(user)
            .setVersion(version)
            .build())

        // Step 3: use the data source to query metadata (databases, tables, columns)
        val metadataGetDatabasesAction: MetadataGetDatabasesAction = MetadataGetDatabasesAction.builder()
          .setUser(user)
          .setDataSourceName(dataSourceName)
          .setSystem(system)
          .build()
        val metadataGetDatabasesResult: MetadataGetDatabasesResult = metaDataClient.getDatabases(metadataGetDatabasesAction)

        val metadataGetTablesAction: MetadataGetTablesAction = MetadataGetTablesAction.builder()
          .setUser(user)
          .setDataSourceName(dataSourceName)
          .setDatabase("linkis")
          .setSystem(system)
          .build()
        val metadataGetTablesResult: MetadataGetTablesResult = metaDataClient.getTables(metadataGetTablesAction)

        val metadataGetColumnsAction = MetadataGetColumnsAction.builder()
          .setUser(user)
          .setDataSourceName(dataSourceName)
          .setDatabase("linkis")
          .setSystem(system)
          .setTable("linkis_datasource")
          .build()
        val metadataGetColumnsResult: MetadataGetColumnsResult = metaDataClient.getColumns(metadataGetColumnsAction)
      }
    }

  • MySQL
  • Oracle
  • KingBase
  • PostgreSQL
  • SQLServer
  • DB2
  • Greenplum
  • DM
  • Doris
  • ClickHouse
  • TiDB
  • Starrocks
  • Gaussdb
  • OceanBase

On the management console you can only create and configure data sources and test whether they can be connected; metadata cannot be queried there directly.

First, configure the cluster environment information in the table linkis_ps_dm_datasource_env:

    INSERT INTO `linkis_ps_dm_datasource_env`
      (`env_name`, `env_desc`, `datasource_type_id`, `parameter`, `create_user`, `modify_user`)
    VALUES
      ('testEnv', 'Test Environment', 4,
       '{\r\n "uris": "thrift://clustername:9083",\r\n "keytab": "4dd408ad-a2f9-4501-83b3-139290977ca2",\r\n "principle": "hadoop@WEBANK.COM",\r\n "hadoopConf":{"hive.metastore.execute.setugi":"true"}\r\n}',
       'user', 'user');

The primary key id of the inserted row is used as the envId. When a connection is established, this envId parameter is used to look up the cluster configuration. The configuration fields are:

    {
      "uris": "thrift://clustername:9083", // Required. If Kerberos authentication is not enabled, the keytab and principle parameters below can be empty
      "keytab": "bml resource id", // resourceId of the keytab stored in the material library (BML); it currently has to be uploaded manually through the HTTP interface
      "principle": "hadoop@WEBANK.COM", // Authentication principal
      "hadoopConf": {} // Optional additional connection parameters
    }
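
To avoid hand-writing the escaped JSON for the parameter column, the text can be generated. The sketch below is a minimal illustration and not part of Linkis: the object HiveEnvParameterSketch and its build method are hypothetical, and it assumes that an empty string is an acceptable way to leave keytab and principle empty when Kerberos is not enabled.

```scala
// Hypothetical helper that renders the JSON text for the `parameter` column.
object HiveEnvParameterSketch {

  // Quote a value as a JSON string, escaping backslashes and double quotes.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  private def field(key: String, value: String): String =
    quote(key) + ":" + quote(value)

  // uris is mandatory; keytab and principle default to empty strings
  // (assumption: that is how "can be empty" is expressed without Kerberos).
  def build(
      uris: String,
      keytab: String = "",
      principle: String = "",
      hadoopConf: Map[String, String] = Map.empty
  ): String = {
    require(uris.nonEmpty, "uris is mandatory")
    // Sort the keys so the output is deterministic.
    val conf = hadoopConf.toSeq.sortBy(_._1)
      .map { case (k, v) => field(k, v) }
      .mkString("{", ",", "}")
    Seq(
      field("uris", uris),
      field("keytab", keytab),
      field("principle", principle),
      quote("hadoopConf") + ":" + conf
    ).mkString("{", ",", "}")
  }

  def main(args: Array[String]): Unit =
    println(build(
      "thrift://clustername:9083",
      hadoopConf = Map("hive.metastore.execute.setugi" -> "true")))
}
```

The key name principle is spelled as in the configuration shown above, since that is the field name the configuration uses.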

How to obtain the resourceId of the keytab: a basic data management function for this is still being planned. For now, upload the file through the HTTP interface, following this reference example:

    curl --form "file=@file path" \
      --form "system=subsystem name" \
      -H "Token-Code: authentication token" \
      -H "Token-User: authentication user name" \
      http://linkis-gatewayip:port/api/rest_j/v1/bml/upload

Example:

    curl --form "file=@/appcom/keytab/hadoop.keytab" \
      --form system=ABCD \
      -H "Token-Code:QML-AUTH" \
      -H "Token-User:hadoop" \
      http://127.0.0.1:9001/api/rest_j/v1/bml/upload

The resourceId in the response is the corresponding `bml resource id` value:

    {"method":"/bml/upload","status":0,"message":"The task of submitting and uploading resources was successful","data":{"resourceId": "6e4e54fc-cc97-4d0d-8d5e-a311129ec84e","version":"v000001","taskId":35}}
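
When the upload is scripted, the resourceId has to be pulled out of the response body. The sketch below does this with a regular expression as a shortcut; a JSON parser would be the more robust choice. The object BmlUploadResponseSketch and its method are hypothetical names, and the sample body is the response shown above.

```scala
// Hypothetical helper for reading the resourceId from the upload response.
object BmlUploadResponseSketch {

  // Matches "resourceId": "<value>" and captures the value.
  private val ResourceId = "\"resourceId\"\\s*:\\s*\"([^\"]+)\"".r

  // Returns the first resourceId found in the response body, if any.
  def resourceId(responseBody: String): Option[String] =
    ResourceId.findFirstMatchIn(responseBody).map(_.group(1))

  def main(args: Array[String]): Unit = {
    val body =
      """{"method":"/bml/upload","status":0,"message":"The task of submitting and uploading resources was successful","data":{"resourceId": "6e4e54fc-cc97-4d0d-8d5e-a311129ec84e","version":"v000001","taskId":35}}"""
    println(resourceId(body)) // Some(6e4e54fc-cc97-4d0d-8d5e-a311129ec84e)
  }
}
```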

Create on the web:

[Figure: create_hive]

Scala code example:

    package org.apache.linkis.datasource.client

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.JsonUtils
    import org.apache.linkis.datasource.client.impl.{LinkisDataSourceRemoteClient, LinkisMetaDataRemoteClient}
    import org.apache.linkis.datasource.client.request._
    import org.apache.linkis.datasource.client.response._
    import org.apache.linkis.datasourcemanager.common.domain.DataSource
    import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder
    import org.junit.jupiter.api.{Disabled, Test}

    object TestHiveClient {

      val gatewayUrl = "http://127.0.0.1:9001"

      // Builds a fresh gateway client configuration; both clients use the same settings.
      private def newClientConfig() = DWSClientConfigBuilder.newBuilder
        .addServerUrl(gatewayUrl)
        .connectionTimeout(30000)
        .discoveryEnabled(false)
        .discoveryFrequency(1, TimeUnit.MINUTES)
        .loadbalancerEnabled(true)
        .maxConnectionSize(1)
        .retryEnabled(false)
        .readTimeout(30000)
        .setAuthenticationStrategy(new StaticAuthenticationStrategy)
        .setAuthTokenKey("hadoop")
        .setAuthTokenValue("xxxxx")
        .setDWSVersion("v1")
        .build()

      val dataSourceclient = new LinkisDataSourceRemoteClient(newClientConfig())
      val metaDataClient = new LinkisMetaDataRemoteClient(newClientConfig())

      @Test
      @Disabled
      def testCreateDataSourceHive: Unit = {
        val user = "hadoop"
        val system = "Linkis"

        // Step 1: create the data source
        val dataSource = new DataSource()
        val dataSourceName = "for-hive-test"
        dataSource.setDataSourceName(dataSourceName)
        dataSource.setDataSourceDesc("this is for hive test")
        dataSource.setCreateSystem(system)
        // Data source type id; 4 matches datasource_type_id in the environment row above
        dataSource.setDataSourceTypeId(4L)

        // Convert the DataSource entity into a map to use as the request payload
        val map = JsonUtils.jackson.readValue(JsonUtils.jackson.writeValueAsString(dataSource), new util.HashMap[String, Any]().getClass)
        val createDataSourceAction: CreateDataSourceAction = CreateDataSourceAction.builder()
          .setUser(user)
          .addRequestPayloads(map)
          .build()
        val createDataSourceResult: CreateDataSourceResult = dataSourceclient.createDataSource(createDataSourceAction)
        val dataSourceId = createDataSourceResult.getInsertId

        // Set the connection parameters; envId is the primary key id of the row
        // in linkis_ps_dm_datasource_env that holds the cluster configuration
        val params = new util.HashMap[String, Any]
        val connectParams = new util.HashMap[String, Any]
        connectParams.put("envId", "3")
        params.put("connectParams", connectParams)
        params.put("comment", "init")
        val updateParameterAction: UpdateDataSourceParameterAction = UpdateDataSourceParameterAction.builder()
          .setUser(user)
          .setDataSourceId(dataSourceId)
          .addRequestPayloads(params)
          .build()
        val updateParameterResult: UpdateDataSourceParameterResult = dataSourceclient.updateDataSourceParameter(updateParameterAction)
        val version: Long = updateParameterResult.getVersion

        // Step 2: publish the configuration version
        dataSourceclient.publishDataSourceVersion(
          PublishDataSourceVersionAction.builder()
            .setDataSourceId(dataSourceId)
            .setUser(user)
            .setVersion(version)
            .build())

        // Step 3: use the data source to query metadata (databases, tables, columns)
        val metadataGetDatabasesAction: MetadataGetDatabasesAction = MetadataGetDatabasesAction.builder()
          .setUser(user)
          .setDataSourceName(dataSourceName)
          .setSystem(system)
          .build()
        val metadataGetDatabasesResult: MetadataGetDatabasesResult = metaDataClient.getDatabases(metadataGetDatabasesAction)

        val metadataGetTablesAction: MetadataGetTablesAction = MetadataGetTablesAction.builder()
          .setUser(user)
          .setDataSourceName(dataSourceName)
          .setDatabase("linkis_test_ind")
          .setSystem(system)
          .build()
        val metadataGetTablesResult: MetadataGetTablesResult = metaDataClient.getTables(metadataGetTablesAction)

        val metadataGetColumnsAction = MetadataGetColumnsAction.builder()
          .setUser(user)
          .setDataSourceName(dataSourceName)
          .setDatabase("linkis_test_ind")
          .setSystem(system)
          .setTable("test")
          .build()
        val metadataGetColumnsResult: MetadataGetColumnsResult = metaDataClient.getColumns(metadataGetColumnsAction)
      }
    }