Flink 引擎使用文档

本文主要介绍在 Linkis 中,flink 引擎插件的安装、使用和配置。

如果您希望在您的服务器上使用 Flink 引擎,您需要保证以下的环境变量已经设置正确并且引擎的启动用户是有这些环境变量的。

强烈建议您在执行 Flink 任务之前,检查下执行用户的这些环境变量。具体方式是

  1. sudo su - ${username}
  2. echo ${JAVA_HOME}
  3. echo ${FLINK_HOME}
环境变量名环境变量内容备注
JAVA_HOMEJDK安装路径必须
HADOOP_HOMEHadoop安装路径必须
HADOOP_CONF_DIRHadoop配置路径Linkis启动Flink引擎采用的Flink on yarn的模式,所以需要yarn的支持。
FLINK_HOMEFlink安装路径必须
FLINK_CONF_DIRFlink配置路径必须,如 ${FLINK_HOME}/conf
FLINK_LIB_DIRFlink包路径必须,${FLINK_HOME}/lib

方式一:直接下载引擎插件包

Linkis 引擎插件下载

方式二:单独编译引擎插件(需要有 maven 环境)

  1. # 编译
  2. cd ${linkis_code_dir}/linkis-engineconn-plugins/flink/
  3. mvn clean install
  4. # 编译出来的引擎插件包,位于如下目录中
  5. ${linkis_code_dir}/linkis-engineconn-plugins/flink/target/out/

EngineConnPlugin 引擎插件安装

将 2.1 中的引擎插件包上传到服务器的引擎目录下

  1. ${LINKIS_HOME}/lib/linkis-engineconn-plugins

上传后目录结构如下所示

  1. linkis-engineconn-plugins/
  2. ├── flink
  3. ├── dist
  4. └── 1.12.2
  5. ├── conf
  6. └── lib
  7. └── plugin
  8. └── 1.12.2

通过重启 linkis-cg-linkismanager 服务刷新引擎

  1. cd ${LINKIS_HOME}/sbin
  2. sh linkis-daemon.sh restart cg-linkismanager

可以查看数据库中的 linkis_engine_conn_plugin_bml_resources 这张表的 last_update_time 是否为触发刷新的时间。

  1. #登陆到linkis的数据库
  2. select * from linkis_cg_engine_conn_plugin_bml_resources;

LinkisFlink 引擎是通过 flink on yarn 的方式进行启动的,所以需要指定用户使用的队列,如下图所示。

yarn

  1. sh ./bin/linkis-cli -engineType flink-1.12.2 \
  2. -codeType sql -code "show tables" \
  3. -submitUser hadoop -proxyUser hadoop

更多 Linkis-Cli 命令参数参考: Linkis-Cli 使用

FlinkSQL 可以支持多种数据源,例如 binlog , kafka , hive 等,如果您想要在 Flink 代码中使用这些数据源,您需要将这些 connector 的插件 jar 包放置到 Flink 引擎的 lib 中,并重启下 LinkisEnginePlugin 服务。如你想要在您的 FlinkSQL 中使用 binlog 作为数据源,那么您需要将 flink-connector-mysql-cdc-1.1.1.jar 放置到 Flink 引擎的 lib 中。

为了方便大家进行采样调试,我们在 Scriptis 新增了 fql 的脚本类型,专门用于执行 FlinkSQL 。但是需要保证您的 DSS 已经升级到 DSS1.0.0 。升级到 DSS1.0.0 之后,您可以直接进入 Scriptis ,新建 fql 脚本进行编辑和执行。

FlinkSQL 的编写示例,以 binlog 为例

  1. CREATE TABLE mysql_binlog (
  2. id INT NOT NULL,
  3. name STRING,
  4. age INT
  5. ) WITH (
  6. 'connector' = 'mysql-cdc',
  7. 'hostname' = 'ip',
  8. 'port' = 'port',
  9. 'username' = 'username',
  10. 'password' = 'password',
  11. 'database-name' = 'dbname',
  12. 'table-name' = 'tablename',
  13. 'debezium.snapshot.locking.mode' = 'none' --建议添加,不然会要求锁表
  14. );
  15. select * from mysql_binlog where id > 10;

Scriptis 中使用 select 语法进行调试的时候,Flink 引擎会有一个自动 cancel 的机制,即到了指定的时间或者采样的行数到了指定的数量,Flink 引擎将会主动将任务 cancel ,并且将已经获取到的结果集持久化,然后前端会调用打开结果集的接口将结果集在前端进行展示。

OnceEngineConn 的使用方式是用于正式启动 Flink 的流式应用,具体的是通过 LinkisManagerClient 调用 LinkisManager 的createEngineConn 的接口,并将代码发给创建的 Flink 引擎,然后 Flink 引擎就开始执行,此方式可以被其他系统进行调用,比如 StreamisClient 的使用方式也很简单,首先新建一个 maven 项目,或者在您的项目中引入以下的依赖。

  1. <dependency>
  2. <groupId>org.apache.linkis</groupId>
  3. <artifactId>linkis-computation-client</artifactId>
  4. <version>${linkis.version}</version>
  5. </dependency>

然后新建 scala 测试文件,点击执行,就完成了从一个 binlog 数据进行解析并插入到另一个 mysql 数据库的表中。但是需要注意的是,您必须要在 maven 项目中新建一个 resources 目录,放置一个 linkis.properties 文件,并指定 linkisgateway 地址和 api 版本,如

  1. wds.linkis.server.version=v1
  2. wds.linkis.gateway.url=http://ip:9001/
  1. object OnceJobTest {
  2. def main(args: Array[String]): Unit = {
  3. val sql = """CREATE TABLE mysql_binlog (
  4. | id INT NOT NULL,
  5. | name STRING,
  6. | age INT
  7. |) WITH (
  8. | 'connector' = 'mysql-cdc',
  9. | 'hostname' = 'ip',
  10. | 'port' = 'port',
  11. | 'username' = '${username}',
  12. | 'password' = '${password}',
  13. | 'database-name' = '${database}',
  14. | 'table-name' = '${tablename}',
  15. | 'debezium.snapshot.locking.mode' = 'none'
  16. |);
  17. |CREATE TABLE sink_table (
  18. | id INT NOT NULL,
  19. | name STRING,
  20. | age INT,
  21. | primary key(id) not enforced
  22. |) WITH (
  23. | 'connector' = 'jdbc',
  24. | 'url' = 'jdbc:mysql://${ip}:port/${database}',
  25. | 'table-name' = '${tablename}',
  26. | 'driver' = 'com.mysql.jdbc.Driver',
  27. | 'username' = '${username}',
  28. | 'password' = '${password}'
  29. |);
  30. |INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog;
  31. |""".stripMargin
  32. val onceJob = SimpleOnceJob.builder().setCreateService("Flink-Test").addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "flink-1.12.2")
  33. .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "hadoop-Streamis").addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
  34. .addStartupParam(Configuration.IS_TEST_MODE.key, true)
  35. // .addStartupParam("label." + LabelKeyConstant.CODE_TYPE_KEY, "sql")
  36. .setMaxSubmitTime(300000)
  37. .addExecuteUser("hadoop").addJobContent("runType", "sql").addJobContent("code", sql).addSource("jobName", "OnceJobTest")
  38. .build()
  39. onceJob.submit()
  40. println(onceJob.getId)
  41. onceJob.waitForCompleted()
  42. System.exit(0)
  43. }
  44. }