DataStream API 简介

该练习的重点是充分全面地了解 DataStream API,以便于编写流式应用入门。

什么能被转化成流?

Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。

Java tuples 和 POJOs

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs

Tuples

对于 Java,Flink 自带有 Tuple0Tuple25 类型。

  1. Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
  2. // zero based index!
  3. String name = person.f0;
  4. Integer age = person.f1;

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

示例:

  1. public class Person {
  2. public String name;
  3. public Integer age;
  4. public Person() {};
  5. public Person(String name, Integer age) {
  6. . . .
  7. };
  8. }
  9. Person person = new Person("Fred Flintstone", 35);

Flink 的序列化器支持的 POJO 类型数据结构升级

Scala tuples 和 case classes

如果你了解 Scala,那一定知道 tuple 和 case class。

一个完整的示例

该示例将关于人的记录流作为输入,并且过滤后只包含成年人。

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.api.common.functions.FilterFunction;
  4. public class Example {
  5. public static void main(String[] args) throws Exception {
  6. final StreamExecutionEnvironment env =
  7. StreamExecutionEnvironment.getExecutionEnvironment();
  8. DataStream<Person> flintstones = env.fromElements(
  9. new Person("Fred", 35),
  10. new Person("Wilma", 35),
  11. new Person("Pebbles", 2));
  12. DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
  13. @Override
  14. public boolean filter(Person person) throws Exception {
  15. return person.age >= 18;
  16. }
  17. });
  18. adults.print();
  19. env.execute();
  20. }
  21. public static class Person {
  22. public String name;
  23. public Integer age;
  24. public Person() {};
  25. public Person(String name, Integer age) {
  26. this.name = name;
  27. this.age = age;
  28. };
  29. public String toString() {
  30. return this.name.toString() + ": age " + this.age.toString();
  31. };
  32. }
  33. }

Stream 执行环境

每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers

此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

基本的 stream source

上述示例用 env.fromElements(...) 方法构造 DataStream<Person> 。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。因此,你可以这样做:

  1. List<Person> people = new ArrayList<Person>();
  2. people.add(new Person("Fred", 35));
  3. people.add(new Person("Wilma", 35));
  4. people.add(new Person("Pebbles", 2));
  5. DataStream<Person> flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷方法是用 socket

  1. DataStream<String> lines = env.socketTextStream("localhost", 9999)

或读取文件

  1. DataStream<String> lines = env.readTextFile("file:///path");

在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。

基本的 stream sink

上述示例用 adults.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

输出看起来类似于

  1. 1> Fred: age 35
  2. 2> Wilma: age 35

1> 和 2> 指出输出来自哪个 sub-task(即 thread)

In production, commonly used sinks include the StreamingFileSink, various databases, and several pub-sub systems.

调试

在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。

动手实践

至此,你已经可以开始编写并运行一个简单的 DataStream 应用了。 克隆 flink-training repo 并在阅读完 README 中的指示后,开始尝试第一个练习吧: Filtering a Stream (Ride Cleansing)

更多阅读