传递事件流
在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。流处理领域中的等价物看上去是什么样子的?
当输入是一个文件(一个字节序列),第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被叫做 事件(event) ,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的某件事情的细节。一个事件通常包含一个来自时钟的时间戳,以指明事件发生的时间(参见“单调钟与时钟”)。
例如,发生的事件可能是用户采取的行动,例如查看页面或进行购买。它也可能来源于机器,例如对温度传感器或CPU利用率的周期性测量。在“使用Unix工具进行批处理”的示例中,Web服务器日志的每一行都是一个事件。
事件可能被编码为文本字符串或JSON,或者某种二进制编码,如第4章所述。这种编码允许你存储一个事件,例如将其附加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许你通过网络将事件发送到另一个节点以进行处理。
在批处理中,文件被写入一次,然后可能被多个作业读取。类似地,在流处理术语中,一个事件由 生产者(producer) (也称为 发布者(publisher) 或 发送者(sender) )生成一次,然后可能由多个 消费者(consumer) ( 订阅者(subscribers) 或 接收者(recipients) )进行处理【3】。在文件系统中,文件名标识一组相关记录;在流式系统中,相关的事件通常被聚合为一个 主题(topic) 或 流(stream) 。
原则上讲,文件或数据库就足以连接生产者和消费者:生产者将其生成的每个事件写入数据存储,且每个消费者定期轮询数据存储,检查自上次运行以来新出现的事件。这实际上正是批处理在每天结束时处理当天数据时所做的事情。
但当我们想要进行低延迟的连续处理时,如果数据存储不是为这种用途专门设计的,那么轮询开销就会很大。轮询的越频繁,能返回新事件的请求比例就越低,而额外开销也就越高。相比之下,最好能在新事件出现时直接通知消费者。
数据库在传统上对这种通知机制支持的并不好,关系型数据库通常有 触发器(trigger) ,它们可以对变化作出反应(如,插入表中的一行),但是它们的功能非常有限,并且在数据库设计中有些后顾之忧【4,5】。相应的是,已经开发了专门的工具来提供事件通知。
消息系统
向消费者通知新事件的常用方式是使用消息传递系统(messaging system):生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在“消息传递中的数据流”中介绍了这些系统,但现在我们将详细介绍这些系统。
像生产者和消费者之间的Unix管道或TCP连接这样的直接信道,是实现消息传递系统的简单方法。但是,大多数消息传递系统都在这一基本模型上进行扩展。特别的是,Unix管道和TCP将恰好一个发送者与恰好一个接收者连接,而一个消息传递系统允许多个生产者节点将消息发送到同一个主题,并允许多个消费者节点接收主题中的消息。
在这个发布/订阅模式中,不同的系统采取各种各样的方法,并没有针对所有目的的通用答案。为了区分这些系统,问一下这两个问题会特别有帮助:
如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?一般来说,有三种选择:系统可以丢掉消息,将消息放入缓冲队列,或使用背压(backpressure)(也称为流量控制(flow control);即阻塞生产者,以免其发送更多的消息)。例如Unix管道和TCP使用背压:它们有一个固定大小的小缓冲区,如果填满,发送者会被阻塞,直到接收者从缓冲区中取出数据(参见“网络拥塞和排队”)。
如果消息被缓存在队列中,那么理解队列增长会发生什么是很重要的。当队列装不进内存时系统会崩溃吗?还是将消息写入磁盘?如果是这样,磁盘访问又会如何影响消息传递系统的性能【6】?
如果节点崩溃或暂时脱机,会发生什么情况? —— 是否会有消息丢失?与数据库一样,持久性可能需要写入磁盘和/或复制的某种组合(参阅“复制和持久性”),这是有代价的。如果你能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延迟。
是否可以接受消息丢失取决于应用。例如,对于周期传输的传感器读数和指标,偶尔丢失的数据点可能并不重要,因为更新的值会在短时间内发出。但要注意,如果大量的消息被丢弃,可能无法立刻意识到指标已经不正确了【7】。如果你正在对事件计数,那么更重要的是它们能够可靠送达,因为每个丢失的消息都意味着使计数器的错误扩大。
我们在第10章中探讨的批处理系统的一个很好的特性是,它们提供了强大的可靠性保证:失败的任务会自动重试,失败任务的部分输出会自动丢弃。这意味着输出与没有发生故障一样,这有助于简化编程模型。在本章的后面,我们将研究如何在流处理的上下文中提供类似的保证。
直接从生产者传递给消费者
许多消息传递系统使用生产者和消费者之间的直接网络通信,而不通过中间节点:
- UDP组播广泛应用于金融行业,例如股票市场,其中低时延非常重要【8】。虽然UDP本身是不可靠的,但应用层的协议可以恢复丢失的数据包(生产者必须记住它发送的数据包,以便能按需重新发送数据包)。
- 无代理的消息库,如ZeroMQ 【9】和nanomsg采取类似的方法,通过TCP或IP多播实现发布/订阅消息传递。
- StatsD 【10】和Brubeck 【7】使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。 (在StatsD协议中,只有接收到所有消息,才认为计数器指标是正确的;使用UDP将使得指标处在一种最佳近似状态【11】。另请参阅“TCP与UDP”
- 如果消费者在网络上公开了服务,生产者可以直接发送HTTP或RPC请求(参阅“通过服务进行数据流:REST和RPC”)将消息推送给使用者。这就是webhooks背后的想法【12】,一种服务的回调URL被注册到另一个服务中,并且每当事件发生时都会向该URL发出请求。
尽管这些直接消息传递系统在设计它们的环境中运行良好,但是它们通常要求应用代码意识到消息丢失的可能性。它们的容错程度极为有限:即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。
如果消费者处于脱机状态,则可能会丢失其不可达时发送的消息。一些协议允许生产者重试失败的消息传递,但当生产者崩溃时,它可能会丢失消息缓冲区及其本应发送的消息,这种方法可能就没用了。
消息代理
一种广泛使用的替代方法是通过消息代理(message broker)(也称为消息队列(message queue))发送消息,消息代理实质上是一种针对处理消息流而优化的数据库。它作为服务器运行,生产者和消费者作为客户端连接到服务器。生产者将消息写入代理,消费者通过从代理那里读取来接收消息。
通过将数据集中在代理上,这些系统可以更容易地容忍来来去去的客户端(连接,断开连接和崩溃),而持久性问题则转移到代理的身上。一些消息代理只将消息保存在内存中,而另一些消息代理(取决于配置)将其写入磁盘,以便在代理崩溃的情况下不会丢失。针对缓慢的消费者,它们通常会允许无上限的排队(而不是丢弃消息或背压),尽管这种选择也可能取决于配置。
排队的结果是,消费者通常是异步(asynchronous)的:当生产者发送消息时,通常只会等待代理确认消息已经被缓存,而不等待消息被消费者处理。向消费者递送消息将发生在未来某个未定的时间点 —— 通常在几分之一秒之内,但有时当消息堆积时会显著延迟。
消息代理与数据库对比
有些消息代理甚至可以使用XA或JTA参与两阶段提交协议(参阅“实践中的分布式事务”)。这个功能与数据库在本质上非常相似,尽管消息代理和数据库之间仍存在实践上很重要的差异:
- 数据库通常保留数据直至显式删除,而大多数消息代理在消息成功递送给消费者时会自动删除消息。这样的消息代理不适合长期的数据存储。
- 由于它们很快就能删除消息,大多数消息代理都认为它们的工作集相当小—— 即队列很短。如果代理需要缓冲很多消息,比如因为消费者速度较慢(如果内存装不下消息,可能会溢出到磁盘),每个消息需要更长的处理时间,整体吞吐量可能会恶化【6】。
- 数据库通常支持二级索引和各种搜索数据的方式,而消息代理通常支持按照某种模式匹配主题,订阅其子集。机制并不一样,对于客户端选择想要了解的数据的一部分,这是两种基本的方式。
- 查询数据库时,结果通常基于某个时间点的数据快照;如果另一个客户端随后向数据库写入一些改变了查询结果的内容,则第一个客户端不会发现其先前结果现已过期(除非它重复查询或轮询变更)。相比之下,消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端。
这是关于消息代理的传统观点,它被封装在诸如JMS 【14】和AMQP 【15】的标准中,并且被诸如RabbitMQ,ActiveMQ,HornetQ,Qpid,TIBCO企业消息服务,IBM MQ,Azure Service Bus和Google Cloud Pub/Sub实现 【16】。
多个消费者
当多个消费者从同一主题中读取消息时,有使用两种主要的消息传递模式,如图11-1所示:
负载均衡(load balance)
每条消息都被传递给消费者之一,所以处理该主题下消息的工作能被多个消费者共享。代理可以为消费者任意分配消息。当处理消息的代价高昂,希望能并行处理消息时,此模式非常有用(在AMQP中,可以通过让多个客户端从同一个队列中消费来实现负载均衡,而在JMS中则称之为共享订阅(shared subscription))。
扇出(fan-out)
每条消息都被传递给所有消费者。扇出允许几个独立的消费者各自“收听”相同的消息广播,而不会相互影响 —— 这个流处理中的概念对应批处理中多个不同批处理作业读取同一份输入文件 (JMS中的主题订阅与AMQP中的交叉绑定提供了这一功能)。
图11-1 (a)负载平衡:在消费者间共享消费主题;(b)扇出:将每条消息传递给多个消费者。
两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅一个主题,每一组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。
确认与重新交付
消费随时可能会崩溃,所以有一种可能的情况是:代理向消费者递送消息,但消费者没有处理,或者在消费者崩溃之前只进行了部分处理。为了确保消息不会丢失,消息代理使用确认(acknowledgments):客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除。
如果与客户端的连接关闭,或者代理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者。 (请注意可能发生这样的情况,消息实际上是处理完毕的,但确认在网络中丢失了。需要一种原子提交协议才能处理这种情况,正如在“实践中的分布式事务”中所讨论的那样)
当与负载均衡相结合时,这种重传行为对消息的顺序有种有趣的影响。在图11-2中,消费者通常按照生产者发送的顺序处理消息。然而消费者2在处理消息m3时崩溃,与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1,结果消费者1按照m4,m3,m5的顺序处理消息。因此m3和m4的交付顺序与以生产者1的发送顺序不同。
图11-2 在处理m3时消费者2崩溃,因此稍后重传至消费者1
即使消息代理试图保留消息的顺序(如JMS和AMQP标准所要求的),负载均衡与重传的组合也不可避免地导致消息被重新排序。为避免此问题,你可以让每个消费者使用单独的队列(即不使用负载均衡功能)。如果消息是完全独立的,则消息顺序重排并不是一个问题。但正如我们将在本章后续部分所述,如果消息之间存在因果依赖关系,这就是一个很重要的问题。
分区日志
通过网络发送数据包或向网络服务发送请求通常是短暂的操作,不会留下永久的痕迹。尽管可以永久记录(通过抓包与日志),但我们通常不这么做。即使是将消息持久地写入磁盘的消息代理,在送达给消费者之后也会很快删除消息,因为它们建立在短暂消息传递的思维方式上。
数据库和文件系统采用截然相反的方法论:至少在某人显式删除前,通常写入数据库或文件的所有内容都要被永久记录下来。
这种思维方式上的差异对创建衍生数据的方式有巨大影响。如第10章所述,批处理过程的一个关键特性是,你可以反复运行它们,试验处理步骤,不用担心损坏输入(因为输入是只读的)。而 AMQP/JMS风格的消息传递并非如此:收到消息是具有破坏性的,因为确认可能导致消息从代理中被删除,因此你不能期望再次运行同一个消费者能得到相同的结果。
如果你将新的消费者添加到消息系统,通常只能接收到消费者注册之后开始发送的消息。先前的任何消息都随风而逝,一去不复返。作为对比,你可以随时为文件和数据库添加新的客户端,且能读取任意久远的数据(只要应用没有显式覆盖或删除这些数据)。
为什么我们不能把它俩杂交一下,既有数据库的持久存储方式,又有消息传递的低延迟通知?这就是基于日志的消息代理(log-based message brokers) 背后的想法。
使用日志进行消息存储
日志只是磁盘上简单的仅追加记录序列。我们先前在第3章中日志结构存储引擎和预写式日志的上下文中讨论了日志,在第5章复制的上下文里也讨论了它。
同样的结构可以用于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。 Unix工具tail -f
能监视文件被追加写入的数据,基本上就是这样工作的。
为了扩展到比单个磁盘所能提供的更高吞吐量,可以对日志进行分区(在第6章的意义上)。不同的分区可以托管在不同的机器上,且每个分区都拆分出一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。这种方法如图11-3所示。
在每个分区内,代理为每个消息分配一个单调递增的序列号或偏移量(offset)(在图11-3中,框中的数字是消息偏移量)。这种序列号是有意义的,因为分区是仅追加写入的,所以分区内的消息是完全有序的。没有跨不同分区的顺序保证。
图11-3 生产者通过将消息追加写入主题分区文件来发送消息,消费者依次读取这些文件
Apache Kafka 【17,18】,Amazon Kinesis Streams 【19】和Twitter的DistributedLog 【20,21】都是基于日志的消息代理。 Google Cloud Pub/Sub在架构上类似,但对外暴露的是JMS风格的API,而不是日志抽象【16】。尽管这些消息代理将所有消息写入磁盘,但通过跨多台机器分区,每秒能够实现数百万条消息的吞吐量,并通过复制消息来实现容错性【22,23】。
日志与传统消息相比
基于日志的方法天然支持扇出式消息传递,因为多个消费者可以独立读取日志,而不会相互影响 —— 读取消息不会将其从日志中删除。为了在一组消费者之间实现负载平衡,代理可以将整个分区分配给消费者组中的节点,而不是将单条消息分配给消费者客户端。
每个客户端消费指派分区中的所有消息。然后使用分配的分区中的所有消息。通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点:
- 共享消费主题工作的节点数,最多为该主题中的日志分区数,因为同一个分区内的所有消息被递送到同一个节点^i。
- 如果某条消息处理缓慢,则它会阻塞该分区中后续消息的处理(一种行首阻塞的形式;请参阅“描述性能”)。
因此在消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况下,JMS/AMQP风格的消息代理是可取的。另一方面,在消息吞吐量很高,处理迅速,顺序很重要的情况下,基于日志的方法表现得非常好。
消费者偏移量
顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。在这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。
实际上,这种偏移量与单领导者数据库复制中常见的日志序列号非常相似,我们在“设置新从库”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导者,并在不跳过任何写入的情况下恢复复制。这里原理完全相同:消息代理的表现得像一个主库,而消费者就像一个从库。
如果消费者节点失效,则失效消费者的分区将指派给其他节点,并从最后记录的偏移量开始消费消息。如果消费者已经处理了后续的消息,但还没有记录它们的偏移量,那么重启后这些消息将被处理两次。我们将在本章后面讨论这个问题的处理方法。
磁盘空间使用
如果只追加写入日志,则磁盘空间终究会耗尽。为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储。 (我们将在后面讨论一种更为复杂的磁盘空间释放方式)
这就意味着如果一个慢消费者跟不上消息产生的速率而落后的太多,它的消费偏移量指向了删除的段,那么它就会错过一些消息。实际上,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃旧消息,它也被称为循环缓冲区(circular buffer)或环形缓冲区(ring buffer)。不过由于缓冲区在磁盘上,因此可能相当的大。
让我们做个简单计算。在撰写本文时,典型的大型硬盘容量为6TB,顺序写入吞吐量为150MB/s。如果以最快的速度写消息,则需要大约11个小时才能填满磁盘。因而磁盘可以缓冲11个小时的消息,之后它将开始覆盖旧的消息。即使使用多个磁盘和机器,这个比率也是一样的。实践中的部署很少能用满磁盘的写入带宽,所以通常可以保存一个几天甚至几周的日志缓冲区。
不管保留多长时间的消息,日志的吞吐量或多或少保持不变,因为无论如何,每个消息都会被写入磁盘【18】。这种行为与默认将消息保存在内存中,仅当队列太长时才写入磁盘的消息传递系统形成鲜明对比。当队列很短时,这些系统非常快;而当这些系统开始写入磁盘时,就要慢的多,所以吞吐量取决于保留的历史数量。
当消费者跟不上生产者时
在“消息传递系统”中,如果消费者无法跟上生产者发送信息的速度时,我们讨论了三种选择:丢弃信息,进行缓冲或施加背压。在这种分类法里,基于日志的方法是缓冲的一种形式,具有很大,但大小固定的缓冲区(受可用磁盘空间的限制)。
如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让人类运维来修复慢消费者,并在消息开始丢失之前让其赶上。
即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运维优势:你可以实验性地消费生产日志,以进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的只有消费者偏移量。
这种行为也与传统的信息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列—— 否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。
重播旧信息
我们之前提到,使用AMQP和JMS风格的消息代理,处理和确认消息是一个破坏性的操作,因为它会导致消息在代理上被删除。另一方面,在基于日志的消息代理中,使用消息更像是从文件中读取数据:这是只读操作,不会更改日志。
除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量的前进。但偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地操纵:例如你可以用昨天的偏移量跑一个消费者副本,并将输出写到不同的位置,以便重新处理最近一天的消息。你可以使用各种不同的处理代码重复任意次。
这一方面使得基于日志的消息传递更像上一章的批处理,其中衍生数据通过可重复的转换过程与输入数据显式分离。它允许进行更多的实验,更容易从错误和漏洞中恢复,使其成为在组织内集成数据流的良好工具【24】。