DataStream API

Apache Flink 提供了 DataStream API 来实现稳定可靠的、有状态的流处理应用程序。Flink 支持对状态和时间的细粒度控制,以此来实现复杂的事件驱动数据处理系统。这个入门指导手册讲述了如何通过 Flink DataStream API 来实现一个有状态流处理程序。

你要搭建一个什么系统

在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物。

在这个教程中,你将会建立一个针对可疑信用卡交易行为的反欺诈检测系统。通过使用一组简单的规则,你将了解到 Flink 如何为我们实现复杂业务逻辑并实时执行。

准备条件

这个代码练习假定你对 Java 或 Scala 有一定的了解,当然,如果你之前使用的是其他开发语言,你也应该能够跟随本教程进行学习。

困难求助

如果遇到困难,可以参考 社区支持资源。当然也可以在邮件列表提问,Flink 的 用户邮件列表 一直被评为所有Apache项目中最活跃的一个,这也是快速获得帮助的好方法。

怎样跟着教程练习

首先,你需要在你的电脑上准备以下环境:

  • Java 8 or 11
  • Maven

一个准备好的 Flink Maven Archetype 能够快速创建一个包含了必要依赖的 Flink 程序骨架,基于此,你可以把精力集中在编写业务逻辑上即可。这些已包含的依赖包括 flink-streaming-javaflink-walkthrough-common 等,他们分别是 Flink 应用程序的核心依赖项和这个代码练习需要的数据生成器,当然还包括其他本代码练习所依赖的类。

说明: 为简洁起见,本练习中的代码块中可能不包含完整的类路径。完整的类路径可以在文档底部 链接 中找到。

  1. $ mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-walkthrough-datastream-java \
  4. -DarchetypeVersion=1.10.0 \
  5. -DgroupId=frauddetection \
  6. -DartifactId=frauddetection \
  7. -Dversion=0.1 \
  8. -Dpackage=spendreport \
  9. -DinteractiveMode=false
  1. $ mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
  4. -DarchetypeVersion=1.10.0 \
  5. -DgroupId=frauddetection \
  6. -DartifactId=frauddetection \
  7. -Dversion=0.1 \
  8. -Dpackage=spendreport \
  9. -DinteractiveMode=false

你可以根据自己的情况修改 groupIdartifactIdpackage。通过这三个参数,Maven 将会创建一个名为 frauddetection 的文件夹,包含了所有依赖的整个工程项目将会位于该文件夹下。将工程目录导入到你的开发环境之后,你可以找到 FraudDetectionJob.java (或 FraudDetectionJob.scala) 代码文件,文件中的代码如下所示。你可以在 IDE 中直接运行这个文件。同时,你可以试着在数据流中设置一些断点或者以 DEBUG 模式来运行程序,体验 Flink 是如何运行的。

FraudDetectionJob.java

  1. package spendreport;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.walkthrough.common.sink.AlertSink;
  5. import org.apache.flink.walkthrough.common.entity.Alert;
  6. import org.apache.flink.walkthrough.common.entity.Transaction;
  7. import org.apache.flink.walkthrough.common.source.TransactionSource;
  8. public class FraudDetectionJob {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. DataStream<Transaction> transactions = env
  12. .addSource(new TransactionSource())
  13. .name("transactions");
  14. DataStream<Alert> alerts = transactions
  15. .keyBy(Transaction::getAccountId)
  16. .process(new FraudDetector())
  17. .name("fraud-detector");
  18. alerts
  19. .addSink(new AlertSink())
  20. .name("send-alerts");
  21. env.execute("Fraud Detection");
  22. }
  23. }

FraudDetector.java

  1. package spendreport;
  2. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  3. import org.apache.flink.util.Collector;
  4. import org.apache.flink.walkthrough.common.entity.Alert;
  5. import org.apache.flink.walkthrough.common.entity.Transaction;
  6. public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
  7. private static final long serialVersionUID = 1L;
  8. private static final double SMALL_AMOUNT = 1.00;
  9. private static final double LARGE_AMOUNT = 500.00;
  10. private static final long ONE_MINUTE = 60 * 1000;
  11. @Override
  12. public void processElement(
  13. Transaction transaction,
  14. Context context,
  15. Collector<Alert> collector) throws Exception {
  16. Alert alert = new Alert();
  17. alert.setId(transaction.getAccountId());
  18. collector.collect(alert);
  19. }
  20. }

FraudDetectionJob.scala

  1. package spendreport
  2. import org.apache.flink.streaming.api.scala._
  3. import org.apache.flink.walkthrough.common.sink.AlertSink
  4. import org.apache.flink.walkthrough.common.entity.Alert
  5. import org.apache.flink.walkthrough.common.entity.Transaction
  6. import org.apache.flink.walkthrough.common.source.TransactionSource
  7. object FraudDetectionJob {
  8. @throws[Exception]
  9. def main(args: Array[String]): Unit = {
  10. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  11. val transactions: DataStream[Transaction] = env
  12. .addSource(new TransactionSource)
  13. .name("transactions")
  14. val alerts: DataStream[Alert] = transactions
  15. .keyBy(transaction => transaction.getAccountId)
  16. .process(new FraudDetector)
  17. .name("fraud-detector")
  18. alerts
  19. .addSink(new AlertSink)
  20. .name("send-alerts")
  21. env.execute("Fraud Detection")
  22. }
  23. }

FraudDetector.scala

  1. package spendreport
  2. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  3. import org.apache.flink.util.Collector
  4. import org.apache.flink.walkthrough.common.entity.Alert
  5. import org.apache.flink.walkthrough.common.entity.Transaction
  6. object FraudDetector {
  7. val SMALL_AMOUNT: Double = 1.00
  8. val LARGE_AMOUNT: Double = 500.00
  9. val ONE_MINUTE: Long = 60 * 1000L
  10. }
  11. @SerialVersionUID(1L)
  12. class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  13. @throws[Exception]
  14. def processElement(
  15. transaction: Transaction,
  16. context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
  17. collector: Collector[Alert]): Unit = {
  18. val alert = new Alert
  19. alert.setId(transaction.getAccountId)
  20. collector.collect(alert)
  21. }
  22. }

代码分析

让我们一步步地来分析一下这两个代码文件。FraudDetectionJob 类定义了程序的数据流,而 FraudDetector 类定义了欺诈交易检测的业务逻辑。

下面我们开始讲解整个 Job 是如何组装到 FraudDetectionJob 类的 main 函数中的。

执行环境

第一行的 StreamExecutionEnvironment 用于设置你的执行环境。任务执行环境用于定义任务的属性、创建数据源以及最终启动任务的执行。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

创建数据源

数据源从外部系统例如 Apache Kafka、Rabbit MQ 或者 Apache Pulsar 接收数据,然后将数据送到 Flink 程序中。这个代码练习使用的是一个能够无限循环生成信用卡模拟交易数据的数据源。每条交易数据包括了信用卡 ID (accountId),交易发生的时间 (timestamp) 以及交易的金额(amount)。绑定到数据源上的 name 属性是为了调试方便,如果发生一些异常,我们能够通过它快速定位问题发生在哪里。

  1. DataStream<Transaction> transactions = env
  2. .addSource(new TransactionSource())
  3. .name("transactions");
  1. val transactions: DataStream[Transaction] = env
  2. .addSource(new TransactionSource)
  3. .name("transactions")

对事件分区 & 欺诈检测

transactions 这个数据流包含了大量的用户交易数据,需要被划分到多个并发上进行欺诈检测处理。由于欺诈行为的发生是基于某一个账户的,所以,必须要要保证同一个账户的所有交易行为数据要被同一个并发的 task 进行处理。

为了保证同一个 task 处理同一个 key 的所有数据,你可以使用 DataStream#keyBy 对流进行分区。process() 函数对流绑定了一个操作,这个操作将会对流上的每一个消息调用所定义好的函数。通常,一个操作会紧跟着 keyBy 被调用,在这个例子中,这个操作是FraudDetector,该操作是在一个 keyed context 上执行的。

  1. DataStream<Alert> alerts = transactions
  2. .keyBy(Transaction::getAccountId)
  3. .process(new FraudDetector())
  4. .name("fraud-detector");
  1. val alerts: DataStream[Alert] = transactions
  2. .keyBy(transaction => transaction.getAccountId)
  3. .process(new FraudDetector)
  4. .name("fraud-detector")

输出结果

sink 会将 DataStream 写出到外部系统,例如 Apache Kafka、Cassandra 或者 AWS Kinesis 等。AlertSink 使用 INFO 的日志级别打印每一个 Alert 的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。

  1. alerts.addSink(new AlertSink());
  1. alerts.addSink(new AlertSink)

运行作业

Flink 程序是懒加载的,并且只有在完全搭建好之后,才能够发布到集群上执行。调用 StreamExecutionEnvironment#execute 时给任务传递一个任务名参数,就可以开始运行任务。

  1. env.execute("Fraud Detection");
  1. env.execute("Fraud Detection")

欺诈检测器

欺诈检查类 FraudDetectorKeyedProcessFunction 接口的一个实现。他的方法 KeyedProcessFunction#processElement 将会在每个交易事件上被调用。这个程序里边会对每笔交易发出警报,有人可能会说这做报过于保守了。

本教程的后续步骤将指导你对这个欺诈检测器进行更有意义的业务逻辑扩展。

  1. public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
  2. private static final double SMALL_AMOUNT = 1.00;
  3. private static final double LARGE_AMOUNT = 500.00;
  4. private static final long ONE_MINUTE = 60 * 1000;
  5. @Override
  6. public void processElement(
  7. Transaction transaction,
  8. Context context,
  9. Collector<Alert> collector) throws Exception {
  10. Alert alert = new Alert();
  11. alert.setId(transaction.getAccountId());
  12. collector.collect(alert);
  13. }
  14. }
  1. object FraudDetector {
  2. val SMALL_AMOUNT: Double = 1.00
  3. val LARGE_AMOUNT: Double = 500.00
  4. val ONE_MINUTE: Long = 60 * 1000L
  5. }
  6. @SerialVersionUID(1L)
  7. class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  8. @throws[Exception]
  9. def processElement(
  10. transaction: Transaction,
  11. context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
  12. collector: Collector[Alert]): Unit = {
  13. val alert = new Alert
  14. alert.setId(transaction.getAccountId)
  15. collector.collect(alert)
  16. }
  17. }

实现一个真正的应用程序

我们先实现第一版报警程序,对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。

假设你的欺诈检测器所处理的交易数据如下:

Transactions

交易 3 和交易 4 应该被标记为欺诈行为,因为交易 3 是一个 $0.09 的小额交易,而紧随着的交易 4 是一个 $510 的大额交易。另外,交易 7、8 和 交易 9 就不属于欺诈交易了,因为在交易 7 这个 $0.02 的小额交易之后,并没有跟随一个大额交易,而是一个金额适中的交易,这使得交易 7 到 交易 9 不属于欺诈行为。

欺诈检测器需要在多个交易事件之间记住一些信息。仅当一个大额的交易紧随一个小额交易的情况发生时,这个大额交易才被认为是欺诈交易。在多个事件之间存储信息就需要使用到 状态,这也是我们选择使用 KeyedProcessFunction 的原因。它能够同时提供对状态和时间的细粒度操作,这使得我们能够在接下来的代码练习中实现更复杂的算法。

最直接的实现方式是使用一个 boolean 型的标记状态来表示是否刚处理过一个小额交易。当处理到该账户的一个大额交易时,你只需要检查这个标记状态来确认上一个交易是是否小额交易即可。

然而,仅使用一个标记作为 FraudDetector 的类成员来记录账户的上一个交易状态是不准确的。Flink 会在同一个 FraudDetector 的并发实例中处理多个账户的交易数据,假设,当账户 A 和账户 B 的数据被分发的同一个并发实例上处理时,账户 A 的小额交易行为可能会将标记状态设置为真,随后账户 B 的大额交易可能会被误判为欺诈交易。当然,我们可以使用如 Map 这样的数据结构来保存每一个账户的状态,但是常规的类成员变量是无法做到容错处理的,当任务失败重启后,之前的状态信息将会丢失。这样的话,如果程序曾出现过失败重启的情况,将会漏掉一些欺诈报警。

为了应对这个问题,Flink 提供了一套支持容错状态的原语,这些原语几乎与常规成员变量一样易于使用。

Flink 中最基础的状态类型是 ValueState,这是一种能够为被其封装的变量添加容错能力的类型。ValueState 是一种 keyed state,也就是说它只能被用于 keyed context 提供的 operator 中,即所有能够紧随 DataStream#keyBy 之后被调用的operator。一个 operator 中的 keyed state 的作用域默认是属于它所属的 key 的。这个例子中,key 就是当前正在处理的交易行为所属的信用卡账户(key 传入 keyBy() 函数调用),而 FraudDetector 维护了每个帐户的标记状态。ValueState 需要使用 ValueStateDescriptor 来创建,ValueStateDescriptor 包含了 Flink 如何管理变量的一些元数据信息。状态在使用之前需要先被注册。状态需要使用 open() 函数来注册状态。

  1. public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
  2. private static final long serialVersionUID = 1L;
  3. private transient ValueState<Boolean> flagState;
  4. @Override
  5. public void open(Configuration parameters) {
  6. ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
  7. "flag",
  8. Types.BOOLEAN);
  9. flagState = getRuntimeContext().getState(flagDescriptor);
  10. }
  1. @SerialVersionUID(1L)
  2. class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  3. @transient private var flagState: ValueState[java.lang.Boolean] = _
  4. @throws[Exception]
  5. override def open(parameters: Configuration): Unit = {
  6. val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
  7. flagState = getRuntimeContext.getState(flagDescriptor)
  8. }

ValueState 是一个包装类,类似于 Java 标准库里边的 AtomicReferenceAtomicLong。它提供了三个用于交互的方法。update 用于更新状态,value 用于获取状态值,还有 clear 用于清空状态。如果一个 key 还没有状态,例如当程序刚启动或者调用过 ValueState#clear 方法时,ValueState#value 将会返回 null。如果需要更新状态,需要调用 ValueState#update 方法,直接更改 ValueState#value 的返回值可能不会被系统识别。容错处理将在 Flink 后台自动管理,你可以像与常规变量那样与状态变量进行交互。

下边的示例,说明了如何使用标记状态来追踪可能的欺诈交易行为。

  1. @Override
  2. public void processElement(
  3. Transaction transaction,
  4. Context context,
  5. Collector<Alert> collector) throws Exception {
  6. // Get the current state for the current key
  7. Boolean lastTransactionWasSmall = flagState.value();
  8. // Check if the flag is set
  9. if (lastTransactionWasSmall != null) {
  10. if (transaction.getAmount() > LARGE_AMOUNT) {
  11. // Output an alert downstream
  12. Alert alert = new Alert();
  13. alert.setId(transaction.getAccountId());
  14. collector.collect(alert);
  15. }
  16. // Clean up our state
  17. flagState.clear();
  18. }
  19. if (transaction.getAmount() < SMALL_AMOUNT) {
  20. // Set the flag to true
  21. flagState.update(true);
  22. }
  23. }
  1. override def processElement(
  2. transaction: Transaction,
  3. context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
  4. collector: Collector[Alert]): Unit = {
  5. // Get the current state for the current key
  6. val lastTransactionWasSmall = flagState.value
  7. // Check if the flag is set
  8. if (lastTransactionWasSmall != null) {
  9. if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
  10. // Output an alert downstream
  11. val alert = new Alert
  12. alert.setId(transaction.getAccountId)
  13. collector.collect(alert)
  14. }
  15. // Clean up our state
  16. flagState.clear()
  17. }
  18. if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
  19. // set the flag to true
  20. flagState.update(true)
  21. }
  22. }

对于每笔交易,欺诈检测器都会检查该帐户的标记状态。请记住,ValueState 的作用域始终限于当前的 key,即信用卡帐户。如果标记状态不为空,则该帐户的上一笔交易是小额的,因此,如果当前这笔交易的金额很大,那么检测程序将输出报警信息。

在检查之后,不论是什么状态,都需要被清空。不管是当前交易触发了欺诈报警而造成模式的结束,还是当前交易没有触发报警而造成模式的中断,都需要重新开始新的模式检测。

最后,检查当前交易的金额是否属于小额交易。如果是,那么需要设置标记状态,以便可以在下一个事件中对其进行检查。注意,ValueState<Boolean> 实际上有 3 种状态:unset (null),true,和 falseValueState 是允许空值的。我们的程序只使用了 unset (null) 和 true 两种来判断标记状态被设置了与否。

欺诈检测器 v2:状态 + 时间 = ❤️

骗子们在小额交易后不会等很久就进行大额消费,这样可以降低小额测试交易被发现的几率。比如,假设你为欺诈检测器设置了一分钟的超时,对于上边的例子,交易 3 和 交易 4 只有间隔在一分钟之内才被认为是欺诈交易。Flink 中的 KeyedProcessFunction 允许您设置计时器,该计时器在将来的某个时间点执行回调函数。

让我们看看如何修改程序以符合我们的新要求:

  • 当标记状态被设置为 true 时,设置一个在当前时间一分钟后触发的定时器。
  • 当定时器被触发时,重置标记状态。
  • 当标记状态被重置时,删除定时器。

要删除一个定时器,你需要记录这个定时器的触发时间,这同样需要状态来实现,所以你需要在标记状态后也创建一个记录定时器时间的状态。

  1. private transient ValueState<Boolean> flagState;
  2. private transient ValueState<Long> timerState;
  3. @Override
  4. public void open(Configuration parameters) {
  5. ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
  6. "flag",
  7. Types.BOOLEAN);
  8. flagState = getRuntimeContext().getState(flagDescriptor);
  9. ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
  10. "timer-state",
  11. Types.LONG);
  12. timerState = getRuntimeContext().getState(timerDescriptor);
  13. }
  1. @SerialVersionUID(1L)
  2. class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  3. @transient private var flagState: ValueState[java.lang.Boolean] = _
  4. @transient private var timerState: ValueState[java.lang.Long] = _
  5. @throws[Exception]
  6. override def open(parameters: Configuration): Unit = {
  7. val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
  8. flagState = getRuntimeContext.getState(flagDescriptor)
  9. val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
  10. timerState = getRuntimeContext.getState(timerDescriptor)
  11. }

KeyedProcessFunction#processElement 需要使用提供了定时器服务的 Context 来调用。定时器服务可以用于查询当前时间、注册定时器和删除定时器。使用它,你可以在标记状态被设置时,也设置一个当前时间一分钟后触发的定时器,同时,将触发时间保存到 timerState 状态中。

  1. if (transaction.getAmount() < SMALL_AMOUNT) {
  2. // set the flag to true
  3. flagState.update(true);
  4. // set the timer and timer state
  5. long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
  6. context.timerService().registerProcessingTimeTimer(timer);
  7. timerState.update(timer);
  8. }
  1. if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
  2. // set the flag to true
  3. flagState.update(true)
  4. // set the timer and timer state
  5. val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
  6. context.timerService.registerProcessingTimeTimer(timer)
  7. timerState.update(timer)
  8. }

处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。

当定时器触发时,将会调用 KeyedProcessFunction#onTimer 方法。通过重写这个方法来实现一个你自己的重置状态的回调逻辑。

  1. @Override
  2. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
  3. // remove flag after 1 minute
  4. timerState.clear();
  5. flagState.clear();
  6. }
  1. override def onTimer(
  2. timestamp: Long,
  3. ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
  4. out: Collector[Alert]): Unit = {
  5. // remove flag after 1 minute
  6. timerState.clear()
  7. flagState.clear()
  8. }

最后,如果要取消定时器,你需要删除已经注册的定时器,并同时清空保存定时器的状态。你可以把这些逻辑封装到一个助手函数中,而不是直接调用 flagState.clear()

  1. private void cleanUp(Context ctx) throws Exception {
  2. // delete timer
  3. Long timer = timerState.value();
  4. ctx.timerService().deleteProcessingTimeTimer(timer);
  5. // clean up all state
  6. timerState.clear();
  7. flagState.clear();
  8. }
  1. @throws[Exception]
  2. private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
  3. // delete timer
  4. val timer = timerState.value
  5. ctx.timerService.deleteProcessingTimeTimer(timer)
  6. // clean up all states
  7. timerState.clear()
  8. flagState.clear()
  9. }

这就是一个功能完备的,有状态的分布式流处理程序了。

完整的程序

  1. package spendreport;
  2. import org.apache.flink.api.common.state.ValueState;
  3. import org.apache.flink.api.common.state.ValueStateDescriptor;
  4. import org.apache.flink.api.common.typeinfo.Types;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  7. import org.apache.flink.util.Collector;
  8. import org.apache.flink.walkthrough.common.entity.Alert;
  9. import org.apache.flink.walkthrough.common.entity.Transaction;
  10. public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
  11. private static final long serialVersionUID = 1L;
  12. private static final double SMALL_AMOUNT = 1.00;
  13. private static final double LARGE_AMOUNT = 500.00;
  14. private static final long ONE_MINUTE = 60 * 1000;
  15. private transient ValueState<Boolean> flagState;
  16. private transient ValueState<Long> timerState;
  17. @Override
  18. public void open(Configuration parameters) {
  19. ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
  20. "flag",
  21. Types.BOOLEAN);
  22. flagState = getRuntimeContext().getState(flagDescriptor);
  23. ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
  24. "timer-state",
  25. Types.LONG);
  26. timerState = getRuntimeContext().getState(timerDescriptor);
  27. }
  28. @Override
  29. public void processElement(
  30. Transaction transaction,
  31. Context context,
  32. Collector<Alert> collector) throws Exception {
  33. // Get the current state for the current key
  34. Boolean lastTransactionWasSmall = flagState.value();
  35. // Check if the flag is set
  36. if (lastTransactionWasSmall != null) {
  37. if (transaction.getAmount() > LARGE_AMOUNT) {
  38. //Output an alert downstream
  39. Alert alert = new Alert();
  40. alert.setId(transaction.getAccountId());
  41. collector.collect(alert);
  42. }
  43. // Clean up our state
  44. cleanUp(context);
  45. }
  46. if (transaction.getAmount() < SMALL_AMOUNT) {
  47. // set the flag to true
  48. flagState.update(true);
  49. long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
  50. context.timerService().registerProcessingTimeTimer(timer);
  51. timerState.update(timer);
  52. }
  53. }
  54. @Override
  55. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
  56. // remove flag after 1 minute
  57. timerState.clear();
  58. flagState.clear();
  59. }
  60. private void cleanUp(Context ctx) throws Exception {
  61. // delete timer
  62. Long timer = timerState.value();
  63. ctx.timerService().deleteProcessingTimeTimer(timer);
  64. // clean up all state
  65. timerState.clear();
  66. flagState.clear();
  67. }
  68. }
  1. package spendreport
  2. import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  3. import org.apache.flink.api.scala.typeutils.Types
  4. import org.apache.flink.configuration.Configuration
  5. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  6. import org.apache.flink.util.Collector
  7. import org.apache.flink.walkthrough.common.entity.Alert
  8. import org.apache.flink.walkthrough.common.entity.Transaction
  9. object FraudDetector {
  10. val SMALL_AMOUNT: Double = 1.00
  11. val LARGE_AMOUNT: Double = 500.00
  12. val ONE_MINUTE: Long = 60 * 1000L
  13. }
  14. @SerialVersionUID(1L)
  15. class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  16. @transient private var flagState: ValueState[java.lang.Boolean] = _
  17. @transient private var timerState: ValueState[java.lang.Long] = _
  18. @throws[Exception]
  19. override def open(parameters: Configuration): Unit = {
  20. val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
  21. flagState = getRuntimeContext.getState(flagDescriptor)
  22. val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
  23. timerState = getRuntimeContext.getState(timerDescriptor)
  24. }
  25. override def processElement(
  26. transaction: Transaction,
  27. context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
  28. collector: Collector[Alert]): Unit = {
  29. // Get the current state for the current key
  30. val lastTransactionWasSmall = flagState.value
  31. // Check if the flag is set
  32. if (lastTransactionWasSmall != null) {
  33. if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
  34. // Output an alert downstream
  35. val alert = new Alert
  36. alert.setId(transaction.getAccountId)
  37. collector.collect(alert)
  38. }
  39. // Clean up our state
  40. cleanUp(context)
  41. }
  42. if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
  43. // set the flag to true
  44. flagState.update(true)
  45. val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
  46. context.timerService.registerProcessingTimeTimer(timer)
  47. timerState.update(timer)
  48. }
  49. }
  50. override def onTimer(
  51. timestamp: Long,
  52. ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
  53. out: Collector[Alert]): Unit = {
  54. // remove flag after 1 minute
  55. timerState.clear()
  56. flagState.clear()
  57. }
  58. @throws[Exception]
  59. private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
  60. // delete timer
  61. val timer = timerState.value
  62. ctx.timerService.deleteProcessingTimeTimer(timer)
  63. // clean up all states
  64. timerState.clear()
  65. flagState.clear()
  66. }
  67. }

期望的结果

使用已准备好的 TransactionSource 数据源运行这个代码,将会检测到账户 3 的欺诈行为,并输出报警信息。你将能够在你的 task manager 的日志中看到下边输出:

  1. 2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
  2. 2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
  3. 2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
  4. 2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
  5. 2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}