Flume-Core

本章讲述Flume项目的核心表示层: 逻辑执行计划. 逻辑执行计划的执行请参见 算子执行框架

核心概念

Flume将分布式计算过程抽象为数据流在算子上的变换, 这就引申出了两个Flume中的核心概念: 算子和算子应用的范围. 接下来我们分别讲解这些概念.

Processor

在Flume中, 最核心的算子是Processor, 它接受N路数据流, 产出1路数据流. 我们这里举两个例子.

../_images/filter_processor.png

上图描述了FilterProcessor的逻辑, 它过滤掉的输入流X中的奇数数据, 得到一个新的数据流O.

../_images/cartesian_processor.png

上图描述了CartesianProcessor的逻辑, 它有两路输入X Y, 输出是X和Y中数据的笛卡尔积.

Processor可以被串联/级联组成Dag, 如下图所示:

../_images/processor_dag.png

通过上图我们可以看到, Processor之间可以任意串联, 一个Processor后面也可以跟随任意个后继Processor, 然后后继的每个Processor在该路上得到的输入都是一样的, 比如B和C得到的输入就完全一样.

Group/Scope

Processor算子是Flume中的基本计算单元, 它体现了分布式计算中的’计算’, 我们需要另外的概念来描述’分布式’. 考虑下面的例子

  1. select week, weekday, sum(sales_value)
  2. from sales_log
  3. group by rollup(week, weekday)
  4. orderby week, weekday

rollup是Oracle SQL数据库的概念, 含义是层级聚集, 具体到上面的例子, 就是分别以week和(week,weekday)作为分组应用进行求和, 其结果形如:

销售额
114363.55
124794.76
134718.25
145387.45
155027.34
1 24291.35
215652.84
224583.02
235555.77
245936.67
254508.74
2 26237.04
A 50528.39

我们看到, 所有这些输出都是SumProcessor的计算结果, 区别在于该算子应用的范围. 因此在Flume的定义中, 每个算子以Group为基本处理单位, 每次处理一组数据. 又因为分组是嵌套的, 如在{第一周销售记录}这一组数据, 又可按照工作日继续细分, 我们用Scope来面表达相同地位分组的集合. 每一个算子, 都必须置于某一个Scope下, 处理该Scope下所有Group的数据. 按照这种思路, 我们用如下图所示的逻辑执行计划来描述上述SQL的计算逻辑:

../_images/sales_sum_plan.png

在Flume中, 我们用Shuffle这个名词来表达分组和排序. 一个Shuffle过程可以有N路输入源参与, 其结果表示为一个嵌套的Scope和其中的N个ShuffleNode, 每个ShuffleNode有一路输入和一路输出, 输入是参与Shuffle过程的输入源, 其输出可以被同Scope的后续算子操作.

逻辑执行计划

在上一节中, 我们讲述了Flume计算抽象的基本原则. 本节我们要讲述完整的核心表示层:逻辑执行计划. 首先我们介绍逻辑执行计划的各个组成部分.

Node

  1. 组成逻辑计划的基本单位
  2. Node上绑定着相应的自定义算子
  3. 每个Node上只有一路逻辑上的输出
  4. 逻辑执行计划上的所有Node组成一个DAG

Scope/Group

  1. Scope是地位相同的各个数据分组的集合
  2. Scope中的数据按key分组, 每组为一个Group
  3. 逻辑计划上的每个Node都属于某个Scope
  4. 逻辑计划中的所有Scope组成一棵树

算子/Entity

  1. 算子是可以被反射的C++类, 用以承载用户逻辑. 具体参见 flume/core/entity.h
  2. 在所有算子中, 数据流被抽象为无类型的, 每条数据的类型都是void*
  3. 每个带有输出的Node上必须指定Objector算子, 该算子负责完成数据的序列化和反序列化工作

逻辑执行计划在 flume/proto/logical_plan.proto和entity.proto中定义, 下面给出以上三个概念的proto定义:

  1. message PbLogicalPlan {
  2. repeated PbLogicalPlanNode node = 1;
  3. repeated PbScope scope = 2;
  4. }
  5. message PbLogicalPlanNode {
  6. enum Type {
  7. UNION_NODE = 0;
  8. LOAD_NODE = 1;
  9. SINK_NODE = 2;
  10. PROCESS_NODE = 3;
  11. SHUFFLE_NODE = 4;
  12. }
  13. required string id = 1; // uuid
  14. required Type type = 2;
  15. optional bytes debug_info = 3;
  16. optional PbEntity objector = 4;
  17. required string scope = 5; // scope uuid
  18. optional bool cache = 6 [default = false]; // indicate if the node need to be cached
  19. // according to type, only corresponding member makes sense
  20. optional PbUnionNode union_node = 100;
  21. optional PbLoadNode load_node = 101;
  22. optional PbSinkNode sink_node = 102;
  23. optional PbProcessNode process_node = 103;
  24. optional PbShuffleNode shuffle_node = 104;
  25. }
  26. message PbScope {
  27. enum Type {
  28. DEFAULT = 0;
  29. INPUT = 1; // only accept single LOAD_NODE under this scope
  30. GROUP = 2; // accept BROADCAST/KEY SHUFFLE_NODE under this scope
  31. BUCKET = 3; // accept BROADCAST/SEQUENCE SHUFFLE_NODE under this scope
  32. };
  33. required string id = 1; // uuid
  34. optional Type type = 2 [default = DEFAULT]; // declare optional for compatible reason
  35. optional string father = 3; // scope uuid
  36. optional bool is_sorted = 4 [default = false];
  37. optional uint32 concurrency = 5 [default = 1]; // a hint for execution
  38. optional PbInputScope input_scope = 101;
  39. optional PbGroupScope group_scope = 102;
  40. optional PbBucketScope bucket_scope = 103;
  41. }
  42. message PbEntity { // PbEntity 是可被反射的C++类,可通过类名和config参数来得到相应实例
  43. required string name = 1;
  44. required bytes config = 2;
  45. }

下面分别介绍各个逻辑计划节点.

PROCESS_NODE

PROCESS_NODE是基本的数据处理节点, N入单出, 下面是它的proto定义:

  1. message PbProcessNode {
  2. message Input {
  3. required string from = 1; // 上游来源节点
  4. optional bool is_partial = 101 [default = false]; // 表示是否需要拥有全量数据才能计算(可用于map阶段的预聚合)
  5. optional bool is_prepared = 102 [default = false]; // 表示输入是一个Stream还是一个Collection
  6. };
  7. repeated Input input = 1; // 输入的属性
  8. required PbEntity processor = 2; // 实际的处理逻辑,可由用户自定义
  9. optional int32 least_prepared_inputs = 101 [default = 0]; // 表示至少有几路输入是Collection才能开始计算
  10. }

Processor的接口定义如下:

  1. class Processor {
  2. public:
  3. virtual ~Processor() {}
  4. virtual void Setup(const std::string& config) = 0;
  5. // keys传入该record所在Group在Scop中的位置
  6. // inputs用来传入可迭代的输入.
  7. // emitter用来将结果传给下游节点.
  8. virtual void BeginGroup(const std::vector<toft::StringPiece>& keys,
  9. const std::vector<Iterator*>& inputs,
  10. Emitter* emitter) = 0;
  11. // index表示传入的记录属于哪路输入.
  12. // 对于第N路输入, 如果inputs[N] != NULL, 则index != N
  13. virtual void Process(uint32_t index, void* object) = 0;
  14. // 当前分组处理结束
  15. virtual void EndGroup() = 0;
  16. };
  17. // Emitter are used to pass result to subsequent execution node.
  18. class Emitter {
  19. public:
  20. virtual ~Emitter() {}
  21. // Emit result to subsequent execution node. When Emit returns, object is not needed any more.
  22. // If Emit returns false, means no more records is needed by subsequence nodes.
  23. virtual bool Emit(void *object) = 0;
  24. // No more outputs. User can explicit calling it to cancel execution.
  25. virtual void Done() = 0;
  26. };
  27. class Iterator {
  28. public:
  29. virtual ~Iterator() {}
  30. virtual bool HasNext() const = 0;
  31. virtual void* NextValue() = 0;
  32. virtual void Reset() = 0;
  33. virtual void Done() = 0;
  34. };

每个PROCESS_NODE都属于一个Scope, 该Scope存在多组数据. 从每个Processor对象的角度看, 在其生命期内, 其时序关系如下图所示:

../_images/process_cycle.png

每次Begin-Group周期该Processor处理其所在Scope的一个分组.

LOAD_NODE

LOAD_NODE代表了框架的输入, 每个Load操作会在Global Scope下创建一个新Scope和该Scope下的一个LOAD_NODE. 之所以Load操作会创建Scope, 是因为很多情况下数据是按组存储的, 比如UDW存储中的partition概念.

LOAD_NODE的proto定义如下:

  1. message PbLoadNode {
  2. repeated string uri = 1; // 确定数据源所在位置,可以有多个。
  3. required PbEntity loader = 2; // 读取数据采取的方式,类似Hadoop的InputFormat
  4. }

Loader算子的定义比较接近与Hadoop中InputFormat的设定, 分为且分和读取两个部分. 不同的是, Loader算子并不默认输入都是KeyValue形式的. 另外, 在执行的时候, 每个split都是其数据所在分组的key. Loader算子的接口定义如下:

  1. class Loader {
  2. public:
  3. virtual ~Loader() {}
  4. virtual void Setup(const std::string& config) = 0;
  5. // 参数uri指定了数据所在路径,splits是存放了数据切片后的结果,
  6. // 如URI、数据起始位置和偏移量. splits中的每个元素, 都会作为Load的参数
  7. virtual void Split(const std::string& uri, std::vector<std::string>* splits) = 0;
  8. // split是对一个数据块的描述,是 Split() 方法存入的.
  9. // 这个方法中是对这个数据块的具体处理逻辑,比如反序列化和简单过滤等。
  10. virtual void Load(const std::string& split, Emitter* emitter) = 0;
  11. };

SINK_NODE

SINK_NODE代表了框架的输出. 和PROCESS_NODE相同, SINK_NODE也属于某个Scope, 将该组Scope中的每组数据输出到外部系统. SINK_NODE的proto定义如下:

  1. message PbSinkNode {
  2. required string from = 1; // 指数据所在位置,可以实现每个scope有不同的输出
  3. required PbEntity sinker = 2; // 指输出数据是采取的方式,类似Hadoop的OutputFormat
  4. }

Sinker的接口定义如下:

  1. class Sinker {
  2. public:
  3. virtual ~Sinker() {}
  4. virtual void Setup(const std::string& config) = 0;
  5. // 打开要写入的文件, 相当于Processor中的BeginGroup.
  6. virtual void Open(const std::vector<toft::StringPiece>& keys) = 0;
  7. // 写入实际数据。每条记录都是void*类型,由用户自己转换.
  8. virtual void Sink(void* object) = 0;
  9. // 关闭写入. 具有Commit语义
  10. virtual void Close() = 0;
  11. };

SHUFFLE_NODE

SHUFFLE_NODE代表分组后的数据流, 由Shuffle操作产生. 其proto定义如下所述:

  1. message PbShuffleNode {
  2. // 数据源参与分组的三种方式.
  3. enum Type {
  4. BROADCAST = 0; // 不参与下面两种处理,所有的记录都会被分发到每一组中
  5. KEY = 1; // 表示按key分组,不同的key属于不同的组
  6. SEQUENCE = 2; // 表示将数据分桶,预先设定桶数,按照某种策略(如hash)将key分到这些桶中
  7. };
  8. required string from = 1; // 上游来源节点
  9. required Type type = 2;
  10. optional PbEntity key_reader = 3; // 表示用来提取key的方式,对应于KEY类型的shuffle
  11. optional PbEntity partitioner = 4; // 表示分桶方式,对应于SEQUENCE类型的shuffle
  12. }

Flume支持两种分组方式, 按Key聚集和分桶. 按Key聚集是为参与分组的每条记录附加一个key, 把所有key的记录汇聚到同一组中. 分桶是指事先决定好分组数量, 再把每条记录分配到某个桶中的分组方式.

这个过程中涉及到KeyReader和Partitioner两种算子, 其接口定义如下:

  1. class KeyReader {
  2. public:
  3. virtual ~KeyReader() {}
  4. virtual void Setup(const std::string& config) = 0;
  5. // 具体的提取key的逻辑实现,object是整条记录,由用户自己理解其类型.
  6. // buffer是最终存放key的变量,要求必须将key转换为char* 存放到buffer中。
  7. // wing/common 下的comparable.h中提供了专门方法,生成可用来排序的string类型的key。
  8. // 同时提供了升序和降序两种方法。
  9. // 返回值是key的实际长度。
  10. virtual uint32_t ReadKey(void* object, char* buffer, uint32_t buffer_size) = 0;
  11. };
  12. class Partitioner {
  13. public:
  14. virtual ~Partitioner() {}
  15. virtual void Setup(const std::string& config) = 0;
  16. // 返回该条记录应该属于的分桶.
  17. virtual uint32_t Partition(void* object, uint32_t partition_number) = 0;
  18. };

UNION_NODE

UNION_NODE用来将多个数据源和合并为一个数据源统一处理, 其proto定义如下:

  1. message PbUnionNode {
  2. repeated string from = 1; // 用于合并数据流,repeated字段中存放的是多个上游节点。
  3. }

编程实例 - WordCount

这里我们实现一个小程序,用来计算在两个文件中出现频率最高的前50个词.

  1. namespace baidu {
  2. namespace flume {
  3. namespace runtime {
  4. namespace dce {
  5. using core::Emitter;
  6. using core::Iterator;
  7. using core::KeyReader;
  8. using core::LogicalPlan;
  9. using core::Objector;
  10. using core::Processor;
  11. // 自定义Loader, 从源文件中读取数据传给下游节点
  12. class TextLoader : public core::Loader {
  13. public:
  14. virtual ~TextLoader() {}
  15. virtual void Setup(const std::string& config) {}
  16. virtual void Split(const std::string& uri, std::vector<std::string>* splits) {
  17. splits->push_back(uri);
  18. }
  19. virtual void Load(const std::string& split, Emitter* emitter) {
  20. std::ifstream fin(split.c_str());
  21. std::string line;
  22. Record record;
  23. while (getline(fin, line)) {
  24. record.key = "";
  25. record.value = line;
  26. emitter->Emit(&record);
  27. }
  28. }
  29. };
  30. // 用来切词,并对每个词的出现次数置为1
  31. class WordSplitter : public Processor {
  32. public:
  33. WordSplitter() : m_emitter(NULL) {}
  34. virtual ~WordSplitter() {}
  35. virtual void Setup(const std::string& config) {}
  36. virtual void BeginGroup(const std::vector<toft::StringPiece>& keys,
  37. const std::vector<Iterator*>& inputs,
  38. Emitter* emitter) {
  39. m_emitter = emitter;
  40. }
  41. virtual void Process(uint32_t index, void* object) {
  42. Record* record = static_cast<Record*>(object);
  43. std::vector<std::string> words;
  44. toft::SplitStringByAnyOf(record->value, " ,.?!:;~@#$%-&*+-()[]{}|<>/\\'\"\n\t\r", &words);
  45. for (size_t i = 0; i < words.size(); ++i) {
  46. WordSum::ValueType single;
  47. single.word = words[i];
  48. single.sum = 1;
  49. m_emitter->Emit(&single);
  50. }
  51. }
  52. virtual void EndGroup() {}
  53. private:
  54. Emitter* m_emitter;
  55. };
  56. // 用来序列化反序列化的方法
  57. class WordSum : public Objector {
  58. public:
  59. struct ValueType {
  60. ValueType() : sum(0) {}
  61. std::string word;
  62. int sum;
  63. };
  64. virtual ~WordSum() {}
  65. virtual void Setup(const std::string& config) {}
  66. virtual uint32_t Serialize(void* object, char* buffer, uint32_t buffer_size) {
  67. ValueType* value = static_cast<ValueType*>(object);
  68. Record record;
  69. record.key = value->word;
  70. std::string sum_text = boost::lexical_cast<std::string>(value->sum);
  71. record.value = sum_text;
  72. return m_objector.Serialize(&record, buffer, buffer_size);
  73. }
  74. virtual void* Deserialize(const char* buffer, uint32_t buffer_size) {
  75. Record* record = static_cast<Record*>(m_objector.Deserialize(buffer, buffer_size));
  76. ValueType* value = new ValueType();
  77. value->word = record->key.as_string();
  78. value->sum = boost::lexical_cast<int>(record->value.as_string());
  79. m_objector.Release(record);
  80. return value;
  81. }
  82. virtual void Release(void* object) {
  83. ValueType* value = static_cast<ValueType*>(object);
  84. delete value;
  85. }
  86. private:
  87. RecordObjector m_objector;
  88. };
  89. // shuffle按key分组,提取分组的key,为单词
  90. class WordIdentity : public KeyReader {
  91. public:
  92. virtual ~WordIdentity() {}
  93. virtual void Setup(const std::string& config) {}
  94. virtual uint32_t ReadKey(void* object, char* buffer, uint32_t buffer_size) {
  95. WordSum::ValueType* single = static_cast<WordSum::ValueType*>(object);
  96. if (single->word.size() <= buffer_size) {
  97. memcpy(buffer, single->word.data(), single->word.size());
  98. }
  99. return single->word.size();
  100. }
  101. };
  102. // 统计每个单词个数
  103. class WordCount : public Processor {
  104. public:
  105. WordCount() : m_emitter(NULL) {}
  106. virtual ~WordCount() {}
  107. virtual void Setup(const std::string& config) {}
  108. virtual void BeginGroup(const std::vector<toft::StringPiece>& keys,
  109. const std::vector<Iterator*>& inputs,
  110. Emitter* emitter) {
  111. m_adder.word.clear();
  112. m_adder.sum = 0;
  113. m_emitter = emitter;
  114. }
  115. virtual void Process(uint32_t index, void* object) {
  116. WordSum::ValueType* value = static_cast<WordSum::ValueType*>(object);
  117. CHECK_EQ(1, value->sum);
  118. if (m_adder.word.empty()) {
  119. m_adder.word = value->word;
  120. m_adder.sum = value->sum;
  121. } else {
  122. CHECK_EQ(m_adder.word, value->word);
  123. m_adder.sum += value->sum;
  124. }
  125. }
  126. virtual void EndGroup() {
  127. Record record;
  128. record.key = m_adder.word;
  129. std::string sum_text = boost::lexical_cast<std::string>(m_adder.sum);
  130. record.value = sum_text;
  131. m_emitter->Emit(&record);
  132. }
  133. private:
  134. WordSum::ValueType m_adder;
  135. Emitter* m_emitter;
  136. };
  137. // 按key分组,提取分组的key,为单词个数
  138. class WordNum : public KeyReader {
  139. public:
  140. virtual ~WordNum() {}
  141. virtual void Setup(const std::string& config) {
  142. }
  143. virtual uint32_t ReadKey(void* object, char* buffer, uint32_t buffer_size) {
  144. Record* record = static_cast<Record*>(object);
  145. std::string num;
  146. int value = boost::lexical_cast<int>(record->value.as_string());
  147. ::baidu::wing::AppendReverseOrdered(value, &num);
  148. if (num.size() <= buffer_size) {
  149. memcpy(buffer, num.data(), num.size());
  150. }
  151. return num.size();
  152. }
  153. };
  154. // CORE部分还没提供 limit 算子,需要自己实现
  155. class Limit : public Processor {
  156. public:
  157. Limit() : m_emitter(NULL) {}
  158. virtual ~Limit() {}
  159. virtual void Setup(const std::string& config) {
  160. m_limit = boost::lexical_cast<int>(config);
  161. }
  162. virtual void BeginGroup(const std::vector<toft::StringPiece>& keys,
  163. const std::vector<Iterator*>& inputs,
  164. Emitter* emitter) {
  165. m_emitter = emitter;
  166. }
  167. virtual void Process(uint32_t index, void* object) {
  168. if (results.size() < m_limit) {
  169. results.push_back(*(static_cast<Record*>(object)));
  170. }
  171. }
  172. virtual void EndGroup() {
  173. for (int i = 0; i < results.size(); i++) {
  174. m_emitter->Emit(&results[i]);
  175. }
  176. }
  177. private:
  178. std::vector<Record> results;
  179. Emitter* m_emitter;
  180. int m_limit;
  181. };
  182. // 写入文件使用的 Sinker
  183. class TextSinker : public core::Sinker {
  184. public:
  185. virtual ~TextSinker() {}
  186. virtual void Setup(const std::string& config) {
  187. m_path = config;
  188. }
  189. virtual void Open(const std::vector<toft::StringPiece>& keys) {
  190. m_fout.open(m_path.c_str());
  191. }
  192. virtual void Sink(void* object) {
  193. Record* record = static_cast<Record*>(object);
  194. m_fout << record->key << " " << record->value << endl;
  195. }
  196. virtual void Close() {
  197. m_fout.close();
  198. }
  199. private:
  200. std::ofstream m_fout;
  201. std::string m_path;
  202. };
  203. TEST(DceBackendTest, Test) {
  204. LOG(WARNING) << "test begin!";
  205. toft::scoped_ptr<LogicalPlan> plan(new LogicalPlan());
  206. // Map部分
  207. LogicalPlan::Node* single_word = plan
  208. // 读入两个文件
  209. ->Load("/home/work/xuekang/tmp/hamlet.txt", "/home/work/xuekang/tmp/hamlet-copy.txt")
  210. // 指定Loader和Objector
  211. ->By<TextLoader>()->As<RecordObjector>()
  212. // 指定Processor和Objector
  213. ->ProcessBy<WordSplitter>()->As<WordSum>();
  214. // Shuffle部分,指定分发使用的 KeyReader
  215. plan->Shuffle(plan->global_scope(), single_word)->node(0)->MatchBy<WordIdentity>()
  216. // 指定Processor和Objector
  217. ->ProcessBy<WordCount>()->As<RecordObjector>()
  218. // 回到上层scope
  219. ->LeaveScope()
  220. // 对子scope的全量数据做排序,指定 KeyReader
  221. ->SortBy<WordNum>()
  222. // 指定自定义的 Limit Processor和 Objector
  223. ->ProcessBy<Limit>("50")->As<RecordObjector>()
  224. // 指定Sinker和写入的文件
  225. ->SinkBy<TextSinker>("/home/work/xuekang/tmp/hamlet-output");
  226. plan->RunLocally();
  227. }
  228. } // namespace dce
  229. } // namespace runtime
  230. } // namespace flume
  231. } // namespace baidu
  232. int main(int argc, char* argv[]) {
  233. ::testing::InitGoogleTest(&argc, argv);
  234. ::google::ParseCommandLineFlags(&argc, &argv, true);
  235. ::baidu::flume::InitBaiduFlume();
  236. return RUN_ALL_TESTS();
  237. }

Reference

本节讲述 LogicalPlan 类提供的各个接口.

Load

用于读入数据源,需要指定数据所在的路径。

有两种调用方法:传入包含所有路径的 vector, 或直接列出每个路径,最多支持4个,以下大多API都支持这两种方式。

组成逻辑计划时会在 Global Scope 中创建一个 Scope 和一个 LoadNode ,同时使用 By<Loader>() 方法指定具体使用的 Loader (可自定义), As<Objector>() 方法指定 Objector (可自定义),表示本节点处理完成后传输给下游节点的数据格式。

Sink

用于将结果写入目标路径,调用时要指定写入时使用的Sinker(可自定义)和目的路径。

Process

具体的数据处理逻辑,用户可自己实现此方法。需要指定所属Scope,以及数据来源的节点。

Union

合并多条数据流, 用户只能控制使用的Objector, 合并操作由框架保证。

Shuffle

Shuffle节点, 新建一个scope, 存放为每一个来源节点创建的shuffle node。

shuffle node 可以通过 MatchAny() MatchBy() DistributeAll() 等方法设置不同的shuffle方式,需要指定KeyReader。

ToProtoMessage

将一个节点的相关信息写入pb的message,方便作为执行计划传递给runtime。

RunLocally

本地执行逻辑计划。

Run

远程模式执行逻辑计划,同时指定执行后端和运行资源。