项目配置
本节将向你展示如何通过流行的构建工具 (Maven、Gradle) 配置你的项目,必要的依赖项(比如连接器和格式),以及覆盖一些高级配置主题。
每个 Flink 应用程序都依赖于一组 Flink 库。应用程序至少依赖于 Flink API,此外还依赖于某些连接器库(比如 Kafka、Cassandra),以及用户开发的自定义的数据处理逻辑所需要的第三方依赖项。
开始
要开始使用 Flink 应用程序,请使用以下命令、脚本和模板来创建 Flink 项目。
Maven
你可以使用如下的 Maven 命令或快速启动脚本,基于原型创建一个项目。
All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API.
Maven 命令
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.20.0
这允许你命名新建的项目,而且会交互式地询问 groupId、artifactId、package 的名字。
快速启动脚本
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.20.0
Gradle
你可以创建一个空项目,你需要在其中手动创建 src/main/java
和 src/main/resources
目录并开始在其中编写一些类,使用如下 Gradle 构建脚本或下面提供的快速启动脚本以获得功能齐全的启动项目。
Gradle 构建脚本
请在脚本的所在目录执行 gradle
命令来执行这些构建配置脚本。
build.gradle
plugins {
id 'java'
id 'application'
// shadow plugin to produce fat JARs
id 'com.github.johnrengelman.shadow' version '7.1.2'
}
// artifact properties
group = 'org.quickstart'
version = '0.1-SNAPSHOT'
mainClassName = 'org.quickstart.DataStreamJob'
description = """Flink Quickstart Job"""
ext {
javaVersion = '1.8'
flinkVersion = '1.20.0'
scalaBinaryVersion = '_2.12'
slf4jVersion = '1.7.36'
log4jVersion = '2.17.1'
}
sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}
applicationDefaultJvmArgs = ["-Dlog4j.configurationFile=log4j2.properties"]
// declare where to find the dependencies of your project
repositories {
mavenCentral()
maven {
url "https://repository.apache.org/content/repositories/snapshots"
mavenContent {
snapshotsOnly()
}
}
}
// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
configurations {
flinkShadowJar // dependencies which go into the shadowJar
// always exclude these (also from transitive dependencies) since they are provided by Flink
flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
flinkShadowJar.exclude group: 'org.slf4j'
flinkShadowJar.exclude group: 'org.apache.logging.log4j'
}
// declare the dependencies for your production and test code
dependencies {
// --------------------------------------------------------------
// Compile-time dependencies that should NOT be part of the
// shadow (uber) jar and are provided in the lib folder of Flink
// --------------------------------------------------------------
implementation "org.apache.flink:flink-streaming-java:${flinkVersion}"
implementation "org.apache.flink:flink-clients:${flinkVersion}"
// --------------------------------------------------------------
// Dependencies that should be part of the shadow jar, e.g.
// connectors. These must be in the flinkShadowJar configuration!
// --------------------------------------------------------------
//flinkShadowJar "org.apache.flink:flink-connector-kafka:${flinkVersion}"
runtimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
runtimeOnly "org.apache.logging.log4j:log4j-api:${log4jVersion}"
runtimeOnly "org.apache.logging.log4j:log4j-core:${log4jVersion}"
// Add test dependencies here.
// testCompile "junit:junit:4.12"
}
// make compileOnly dependencies available for tests:
sourceSets {
main.compileClasspath += configurations.flinkShadowJar
main.runtimeClasspath += configurations.flinkShadowJar
test.compileClasspath += configurations.flinkShadowJar
test.runtimeClasspath += configurations.flinkShadowJar
javadoc.classpath += configurations.flinkShadowJar
}
run.classpath = sourceSets.main.runtimeClasspath
jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}
shadowJar {
configurations = [project.configurations.flinkShadowJar]
}
settings.gradle
rootProject.name = 'quickstart'
快速启动脚本
bash -c "$(curl https://flink.apache.org/q/gradle-quickstart.sh)" -- 1.20.0 _2.12
需要哪些依赖项?
要开始一个 Flink 作业,你通常需要如下依赖项:
除此之外,若要开发自定义功能,你还要添加必要的第三方依赖项。
Flink API
Flink提供了两大 API:Datastream API 和 Table API & SQL,它们可以单独使用,也可以混合使用,具体取决于你的使用场景:
你要使用的 API | 你需要添加的依赖项 |
---|---|
DataStream | flink-streaming-java |
DataStream Scala 版 | flink-streaming-scala_2.12 |
Table API | flink-table-api-java |
Table API Scala 版 | flink-table-api-scala_2.12 |
Table API + DataStream | flink-table-api-java-bridge |
Table API + DataStream Scala 版 | flink-table-api-scala-bridge_2.12 |
你只需将它们包含在你的构建工具脚本/描述符中,就可以开发你的作业了!
运行和打包
如果你想通过简单地执行主类来运行你的作业,你需要 classpath 里包含 flink-clients
。对于 Table API 程序,你还需要在 classpath 中包含 flink-table-runtime
和 flink-table-planner-loader
。
根据经验,我们建议将应用程序代码及其所有必需的依赖项打包进一个 fat/uber JAR 中。这包括打包你作业用到的连接器、格式和第三方依赖项。此规则不适用于 Java API、DataStream Scala API 以及前面提到的运行时模块,它们已经由 Flink 本身提供,不应包含在作业的 uber JAR 中。你可以把该作业 JAR 提交到已经运行的 Flink 集群,也可以轻松将其添加到 Flink 应用程序容器镜像中,而无需修改发行版。
下一步是什么?
- 要开发你的作业,请查阅 DataStream API 和 Table API & SQL;
- 关于如何使用特定的构建工具打包你的作业的更多细节,请查阅如下指南:
- 关于项目配置的高级内容,请查阅高级主题部分。