Flink 引擎使用文档

本文主要介绍在Linkis1.X中,flink引擎的配置、部署和使用。

如果您希望在您的服务器上使用Flink引擎,您需要保证以下的环境变量已经设置正确并且引擎的启动用户是有这些环境变量的。

强烈建议您在执行flink任务之前,检查下执行用户的这些环境变量。具体方式是

  1. sudo su - ${username}
  2. echo ${JAVA_HOME}
  3. echo ${FLINK_HOME}
环境变量名环境变量内容备注
JAVA_HOMEJDK安装路径必须
HADOOP_HOMEHadoop安装路径必须
HADOOP_CONF_DIRHadoop配置路径Linkis启动Flink引擎采用的Flink on yarn的模式,所以需要yarn的支持。
FLINK_HOMEFlink安装路径必须
FLINK_CONF_DIRFlink配置路径必须,如 ${FLINK_HOME}/conf
FLINK_LIB_DIRFlink包路径必须,${FLINK_HOME}/lib

表1-1 环境配置清单

Linkis 1.0.2及以上支持的Flink版本是Flink1.12.2,理论上Linkis 1.0.2+可以支持各个版本的Flink,但是Flink各个版本之前的API变化太大,可能需要您按照API的变化修改Linkis中flink引擎的代码并重新编译。

注意: 编译flink引擎之前需要进行linkis项目全量编译
Linkis Flink引擎默认在Linkis1.0.2+不会安装,需要您手动进行编译并进行安装。

  1. 单独编译flink的方式
  2. ${linkis_code_dir}/linkis-enginepconn-lugins/engineconn-plugins/flink/
  3. mvn clean install

安装方式是将编译出来的引擎包,位置在

  1. ${linkis_code_dir}/linkis-enginepconn-lugins/engineconn-plugins/flink/target/flink-engineconn.zip

然后部署到

  1. ${LINKIS_HOME}/lib/linkis-engineplugins

并重启linkis-engineplugin

  1. cd ${LINKIS_HOME}/sbin
  2. sh linkis-daemon.sh restart cg-engineplugin

engineplugin更详细的介绍可以参看下面的文章。
https://linkis.apache.org/zh-CN/docs/1.1.1/deployment/engine-conn-plugin-installation

Linkis1.X是通过标签来进行的,所以需要在我们数据库中插入数据,插入的方式如下文所示。

EngineConnPlugin引擎插件安装

准备操作,队列设置

Linkis1.X的Flink引擎是通过flink on yarn的方式进行启动的,所以需要指定用户使用的队列。指定队列的方式如图3-1所示。

yarn

图3-1 队列设置

Linkis的Flink引擎有两种执行方式,一种是ComputationEngineConn方式,该方式主要是在DSS-Scriptis或者Streamis-Datasource进行使用,用于调试采样,验证flink代码的正确性;另一种方式是OnceEngineConn方式,该方式主要是用于在Streamis生产中心用于启动一个流式应用。

准备知识,FlinkSQL的Connector插件

FlinkSQL可以支持多种数据源,例如binlog,kafka,hive等,如果您想要在Flink代码中使用这些数据源,您需要将这些connector的插件jar包放置到flink引擎的lib中,并重启下Linkis的EnginePlugin服务。如你想要在您的FlinkSQL中使用binlog作为数据源,那么您需要将flink-connector-mysql-cdc-1.1.1.jar放置到flink引擎的lib中。

  1. cd ${LINKS_HOME}/sbin
  2. sh linkis-daemon.sh restart cg-engineplugin

3.1 ComputationEngineConn方式

为了方便大家进行采样调试,我们在Scriptis新增了fql的脚本类型,专门用于执行FlinkSQL。但是需要保证您的DSS已经升级到DSS1.0.0。升级到DSS1.0.0之后,您可以直接进入Scriptis,新建fql脚本进行编辑和执行。

FlinkSQL的编写示例,以binlog为例

  1. CREATE TABLE mysql_binlog (
  2. id INT NOT NULL,
  3. name STRING,
  4. age INT
  5. ) WITH (
  6. 'connector' = 'mysql-cdc',
  7. 'hostname' = 'ip',
  8. 'port' = 'port',
  9. 'username' = 'username',
  10. 'password' = 'password',
  11. 'database-name' = 'dbname',
  12. 'table-name' = 'tablename',
  13. 'debezium.snapshot.locking.mode' = 'none' --建议添加,不然会要求锁表
  14. );
  15. select * from mysql_binlog where id > 10;

在Scriptis中使用select语法进行调试的时候,Flink引擎会有一个自动cancel的机制,即到了指定的时间或者采样的行数到了指定的数量,Flink引擎将会主动将任务cancel,并且将已经获取到的结果集持久化,然后前端会调用打开结果集的接口将结果集在前端进行展示。

3.2 通过Linkis-cli进行任务提交

Linkis 1.0后提供了cli的方式提交任务,我们只需要指定对应的EngineConn和CodeType标签类型即可,Hive的使用如下:

  1. sh ./bin/linkis-cli -engineType flink-1.12.2 -codeType sql -code "show tables" -submitUser hadoop -proxyUser hadoop

具体使用可以参考: Linkis CLI Manual.

3.3 OnceEngineConn方式

OnceEngineConn的使用方式是用于正式启动Flink的流式应用,具体的是通过LinkisManagerClient调用LinkisManager的createEngineConn的接口,并将代码发给创建的Flink引擎,然后Flink引擎就开始执行,此方式可以被其他系统进行调用,比如Streamis。Client的使用方式也很简单,首先新建一个maven项目,或者在您的项目中引入以下的依赖

  1. <dependency>
  2. <groupId>org.apache.linkis</groupId>
  3. <artifactId>linkis-computation-client</artifactId>
  4. <version>${linkis.version}</version>
  5. </dependency>

然后新建scala测试文件,点击执行,就完成了从一个binlog数据进行解析并插入到另一个mysql数据库的表中。但是需要注意的是,您必须要在maven项目中新建一个resources目录,放置一个linkis.properties文件,并指定linkis的gateway地址和api版本,如

  1. wds.linkis.server.version=v1
  2. wds.linkis.gateway.url=http://ip:9001/
  1. object OnceJobTest {
  2. def main(args: Array[String]): Unit = {
  3. val sql = """CREATE TABLE mysql_binlog (
  4. | id INT NOT NULL,
  5. | name STRING,
  6. | age INT
  7. |) WITH (
  8. | 'connector' = 'mysql-cdc',
  9. | 'hostname' = 'ip',
  10. | 'port' = 'port',
  11. | 'username' = '${username}',
  12. | 'password' = '${password}',
  13. | 'database-name' = '${database}',
  14. | 'table-name' = '${tablename}',
  15. | 'debezium.snapshot.locking.mode' = 'none'
  16. |);
  17. |CREATE TABLE sink_table (
  18. | id INT NOT NULL,
  19. | name STRING,
  20. | age INT,
  21. | primary key(id) not enforced
  22. |) WITH (
  23. | 'connector' = 'jdbc',
  24. | 'url' = 'jdbc:mysql://${ip}:port/${database}',
  25. | 'table-name' = '${tablename}',
  26. | 'driver' = 'com.mysql.jdbc.Driver',
  27. | 'username' = '${username}',
  28. | 'password' = '${password}'
  29. |);
  30. |INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog;
  31. |""".stripMargin
  32. val onceJob = SimpleOnceJob.builder().setCreateService("Flink-Test").addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "flink-1.12.2")
  33. .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "hadoop-Streamis").addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
  34. .addStartupParam(Configuration.IS_TEST_MODE.key, true)
  35. // .addStartupParam("label." + LabelKeyConstant.CODE_TYPE_KEY, "sql")
  36. .setMaxSubmitTime(300000)
  37. .addExecuteUser("hadoop").addJobContent("runType", "sql").addJobContent("code", sql).addSource("jobName", "OnceJobTest")
  38. .build()
  39. onceJob.submit()
  40. println(onceJob.getId)
  41. onceJob.waitForCompleted()
  42. System.exit(0)
  43. }
  44. }