写在前面的话
本章是ddia的第十一章,流处理。
流处理与之前批处理最大的区别在于流是无穷无尽的,以事件为最小原子,随着时间推移逐渐可用。
本章第一部分讲的消息队列,将消费者和生产者的稳定性问题转移到了自己身上,比如fabirc中的老熟人kafka
接下来提到了流与数据库的关联,流很像是数据库中的日志结构存储引擎,阐述了两者之间的关联
再后提到了状态与流处理,讨论了流处理中的不同窗口方式以及连接类型,以及应用
最后对流处理的容错进行了讨论,与批处理不同的需要更加细粒度的容错机制。
流处理
- 批处理:输入是有界的,批处理知道它合适完成输入的读取
- 流处理:抛开固定的时间切片,事件发生就立即处理
“流”是指随着时间的推移逐渐可用的数据
传递事件流
流处理的上下文中,记录通常被叫做事件
事件:一个小的,自包含的,不可变的对象,包含某个时间点发生的某件事情的细节
事件由生产者(producer
)生成一次,然后可能由多个消费者(consumer
)进行处理,在流媒体系统中,相关的事件通常被聚合为一个主题(topic
)或流(stream
)
消息系统
向消费者通知新事件的常用方式是使用消息传递系统(messaging system
)
通用问题:
- 如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?
- 系统丢掉消息
- 将消息放入缓冲队列
- 使用背压(流量控制,阻塞生产者,以避免发送更多的消息)
P.S. 如消息缓存在队列中,当队列装不进内存时系统会发生何种情况?
- 崩溃?
- 写入磁盘?
- 磁盘访问会如何影响消息传递系统的性能?
- 如果节点崩溃或暂时脱机,会发生什么情况?
- 持久性可能需要写入磁盘和/或复制的某种组合
- 如果你能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延 迟。
直接从生产者传递给消费者
- UDP组播广泛应用于金融行业 (应用层协议恢复丢失的数据包)
- 无代理的消息库,通过TCP或IP多播实现发布/订阅消息
- ZeroMQ
- nanomsg
- 使用不可靠的UDP消息传递来收集网络中所有机器的指标对其进行监控
- StasD
- Brubeck
- 如消费者在网络上公开服务,生产者可以直接发送HTTP或RPC请求将消息推送给使用者
- webhooks
这些系统:
- 通常要求应用代码意识到消息丢失的可能性
- 需要假设生产者和消费者始终在线
消息代理
消息代理(message broker
),也称为消息队列(message queue
),消息代理实质上是一种针对处理消息流而优化的数据库。作为服务器运行,生产者和消费者作为客户端连接到服务器,生产者将消息写入代理,消费者通过从代理那里读取来接受消息。
通过代理,可以将持久性问题转移到代理的身上。
消息代理与数据库对比
- 数据库通常保留数据直至显式删除,而大多数消息代理在消息传递成功后会自动删除数据
- 代理通常认为自己的队列很短,如果需要缓冲很多消息,就可能导致整体吞吐量恶化
- 数据库通常支持二级索引和各种搜索数据的方式,消息代理通常支持按照某种模式匹配主题,订阅子集,机制并不一样
- 查询数据库时,结果通常基于某个时间点的数据快照。消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端
多个消费者
多个消费者从同一主题中读取消息时,有两种主要的消息传递模式
- 负载均衡(load balance)
每条消息都被传递给消费者之一,处理该主题下消息的工作能被多个消费者共享。适用于处理消息的代价高昂,希望能并行处理消息
- 扇出(fan-out)
每条消息都传递给所有的消费者,允许几个独立的消费者收听相同的消息广播而不互相影响。
两种模式可以组合使用:两个独立的消费者组可以每组各订阅一个主题,每一组组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理
确认与重新交付
为了确保消息不会丢失,消息代理使用确认:客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除
如果与客户端的连接关闭,或者代理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者
负载均衡与重新交付结合时,可能会导致消息顺序变动,如下图所示
消费者2在消费m3时崩溃,此时消费者1正在处理m4,在处理完成后再处理重发的m3,因此导致顺序变动
为避免此问题:
- 可以让每个消费者使用单独的队列
- 让消息是完全独立的
分区日志
消息队列是建立在短暂消息传递的思维方式上。
如果你将新的消费者添加到消息系统,通常只能接收到消费者注册之后开始发送的消息,先前的任何消息都不能收到,而对于文件和数据库,可以随时添加新的客户端,并且能读取任意久远的数据。
这里就产生了一个想法:基于日志的消息代理(log-based message brokers
),既有数据库的持久存储方式,又有消息传递的低延迟通知
使用日志进行消息存储
生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。 Unix工具tail -f 能监视文件被追加写入的数据,基本上就是这样工作的。
为了扩展性别,可以分区,不同分区可以托管到不同的机器上。
每个分区内队列为每个消息非配一个单调递增的序列号或偏移量(offset)
这种消息存储的实例有:
- Apache Kafka
- Amazon Kinesis Streams
- DistributedLog
日志与传统消息相比
基于日志的方法天然支持扇出式消息传递,读取消息不会将其从日志中删除。
为了在一组消费者之间实现负载平衡,代理可以将整个分区分配给消费者组中的节点,而不是将单条消息分配给消费者客户端
这种负载均衡方式的缺点:
- 共享消费主题工作的节点数,最多为该主题中的日志分区数
- 如果某条消息处理缓慢,则它会阻塞该分区中后续消息的处理
优劣:
- JMS/AMQP风格的消息代理:消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况
- 基于日志的方法:在消息吞吐量很高,处理迅速,顺序很重要的情况下
消费者偏移量
所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到
P.S.
如果消费者节点失效,则失效消费者的分区将指派给其他节点,并从最后记录的偏移量开始消费消息。如果消费者已经处理了后续的消息,但还没有记录它们的偏移量,那么重启后这些消息将被处理两次
磁盘空间使用
为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储
在实际中,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃就消息,也被称为循环缓冲区(circular buffer
)或环形缓冲区(ring buffer
)。
当消费者跟不上生产者时
基于日志的方法是缓冲的一种形式,具有很大,但大小固定的缓冲区(受可用磁盘空间的限制)
如果消费者过慢,可以监控消费者落后日志头部的距离,如果落后太多就发出报警
即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务,可以实验性的消费生产日志,进行开发,测试或者调试,不担心会中断生产服务。
重播旧信息
基于日志的消息代理中,使用消息更像是从文件中读取数据:这是只读操作,不会更改日志。
这一方面允许基于日志的消息传递进行更多的实验,更容易从错误和漏洞中恢复,使其成为在组织内继承数据流的良好工具
流与数据库
数据库和数据流之间的联系不仅仅是磁盘日志的物理存储 —— 而是更深层的联系
保持系统同步
由于相同或相关的数据出现在了不同的地方,因此相互间需要保持同步
对于数据仓库:
批处理: 通常是先取得数据库的完整副本,然后执行转换,并批量加载到数据仓库中。
对于数据库和搜索索引,有时候会使用双写(dual write
),代码在数据变更时明确写入每个系统中,例如,首先写入数据库,再更新搜索索引,再然后使缓存项失效
双写的问题之竞争条件如下图所示
在数据库中被置为A,在索引中最终被置为B
双写的问题之容错问题:其中一个写入可能失败,而另外一个成功,这里需要原子操作
变更数据捕获
变更数据捕获(change data capture, CDC)
这是一种观察写入数据库的所有数据变更,并将其提取并转换为可以复制到其他系统中的形式的过程.
变更数据捕获的实现
从本质上说,变更数据捕获使得一个数据库成为领导者(被捕获变化的数据库),并将其他组件变为追随者。基于日志的消息代理非常适合从源数据库传输变更事件,因为它保留了消息的顺序。
方法:
- 数据库触发器可用来实现变更数据捕获,通过注册观察所有变更的触发器,并将相应的变更项写入变更日志表中,但是很脆弱,有显著的性能开销。
- 解析复制日志是一种更稳健的方法
实例:
- LinkedIn的Databus,Facebook的Wormhole,Yahoo!的Sherpa大规模应用数据库触发
- Bottled Water使用解码WAL的API实现了PostgreSQL的CDC
- Maxwell和Debezium通过解析binlog实现了MySQL的CDC
- Mongoriver读取MongoDB oplog实现了CDC
- GoldenGate为Oracle实现了CDC
变更数据捕获通常是异步的
- 优势:添加缓慢的消费者不会过度影响记录系统
- 劣势:所有复制延迟可能有的问题在这里都可能出现
初始快照
拥有所有对数据库进行变更的日志,则可以通过重放该日志,来重建数据库的完整状 态。
- 永远保留所有更改会耗费太多磁盘空间
- 重放过于费时
因此需要快照。
数据库的快照必须与变更日志中的已知位置或偏移量相对应,以便在处理完快照后知道从哪里开始应用变更。
日志压缩
在基于日志的消息代理与变更数据捕获与日志存储中的日志压缩一致:
- 存储引擎定期在日志中查找具有相同键的记录,丢掉所有重复的内容,并只保留每个键的最新更新。这个压缩与合并过程在后台运行
实践:
- Apache Kafka支持这种日志压缩功能
变更流的API支持
数据库开始将变更流作为接口,例如:
- RethinkDB允许查询订阅通知,当查询结果变更时获得通知
- Firebase和CouchDB基于变更流进行同步,这个变更流同样可以用于应用
- Meteor使用MongoDB oplog订阅数据变更,并改变了用户接口
- VoltDB允许事务以流的形式连续地从数据库中导出数据
- Kafka Connect致力于将广泛的数据库系统的变更数据捕获工具与Kafka集成
事件溯源
事件溯源涉及到将所有对应用状态的变更存储为变更事件日志,与变更数据捕获的区别在于:
- 在变更数据捕获中,应用以可变方式使用数据库,任意更新和删除记录
- 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的
从事件日志中派生出当前状态
事件日志本身并不是很有用,因为用户通常期望看到的是系统的当前状态,而不是变更历史。
重放事件日志允许让你重新构建系统的当前状态,但是日志压缩需要采用不同的方式
- 用于记录更新的CDC事件通常包含记录的完整新版本,因此主键的当前值完全由该主键的最近事件确定,而日志压缩可以丢弃相同主键的先前事件
- 事件溯源在更高层次进行建模:事件通常表示用户操作的意图,而不是因为操作而发生的状态更新机制,因此不能进行同样的日志压缩
命令和事件
- 命令
来自用户的请求刚到达时,它一开始是一个命令 - 事件
如果验证成功并且命令被接 受,则它变为一个持久化且不可变的事件
状态,流和不变性
状态:
- 通常将数据库视为应用程序当前状态的存储
- 本质:会变化
- 数据库支持数据的增删改
如果存储了变更日志,重现状态就非常简单
事务日志记录了数据库的所有变更。高速追加下入是更改日志的唯一方法。从这个角度来看,数据库的内容其实是日志中记录最新值的缓存。日志才是真相,数据库是日志子集的缓存,这一缓存子集恰好来自日志中每条记录与索引值的最新值
日志压缩是连接日志与数据库状态之间的桥梁:它只保留每条记录的最新版本,并丢弃被覆盖的版本
不可变事件的优点
数据库中的不变性,如果发生错误,不是单纯的删除或者更改分类账中的错误交易,而是添加另外一笔以补偿错误。
从同一事件日志中派生出多个视图
通过从不变的事件日志中分离出可变的状态,你可以针对不同的读取方式,从相同的事件日志中衍生出几种不同的表现形式
例如:
- 分析型数据库Druid使用这种方式直接从Kafka摄取数据
- Pistachio是一个分布式的键值存储,使用Kafka作为提交日志
- Kafka Connect能将来自Kafka的数据导出到各种不同的数据库与索引
通过将数据写入的形式与读取形式相分离,并允许几个不同的读取视图,能获得很大的灵活性。这个想法有时被称为命令查询责任分离
并发控制
事件溯源和变更数据捕获的缺点:用户会写入日志,然后从日志衍生视图中读取,结果发现他的写入还没有反映在读取视图中
- 解决方案一:将事件附加到日志时同步执行读取视图的更新,需要分布式事务
- 解决方案二:可以设计一个自包含的事件以表示一个用户操作。然后用户操作就只需要在一个地方进行单次写入操作 —— 即将事件附加到日志中
- 解决方案三:如果事件日志与应用状态以相同的方式分区,直接使用单线程日志消费者就不需要写入并发控制了
不变性的限制
永远保持所有变更的不变历史取决于数据集的流失率
流处理
如何处理流:
- 可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后能被其他客户端查询
- 能以某种方式将事件推送给用户,例如发送邮件或者推送通知
- 可以处理一个或多个输入流并产生一个或多个输出流
流处理的应用
主要用于监控目的,例如
- 欺诈检测系统需要确定信用卡的使用模式是否有意外地变化
- 交易系统需要检查金融市场的价格变化
- 制造系统需要监控工厂中机器的状态
- 军事和情报系统需要跟踪潜在侵略者的活动
复合事件处理
简称CEP(complex, event processing),与正则表达式允许你在字符串中搜索特定字符模式的方式类似,CEP允许你指定规则以在流中搜索某些事件模式
- 通常使用高层次的声明式查询语言,例如SQL
- 将查询提交引擎,引擎消费输入流,并在内部维护一个执行所需匹配的状态机
- 发现匹配时发出一个复合事件,并附有监测到的事件模式详情
CEP与普通数据库相比是颠倒的。
- 普通数据库持久存储数据,查询到来时匹配数据,查询完成后丢弃查询
- CEP中查询是长期存储,来自输入流的事件不断流过它们,搜索匹配事件模式的查询
实现:
- Esper
- IBM InfoSphere Streams
- Apama
- TIBCO StreamBase
- SQLstream
流分析
分析往往对找出特定事件序列并不关心,而更关注大量事件上的聚合与统计指标,如
- 测量某种类型事件的速率
- 滚动计算一段时间窗口内某个值的平均值
- 将当前的统计值与先前的时间区间的值对比
流分析系统有时会使用概率算法,例如
- Bloom filter,管理成员资格
- HyperLogLog,用于基数估计以及百分比估计算法
流处理没有任何内在近似性,概率算法只是一种优化
维护物化视图
在某个数据集上衍生出一个替代视图以便高效查询,并在底层数据变更时更新视图
原则上讲,任何流处理组件都可以用于维护物化视图
例如:
- Samza、Kafka Streams
在流上搜索
也允许基于复杂标准(例如全文搜索查询)来搜索单个事件的需求。
消息传递与RPC
消息传递系统可以作为RPC的替代方案,即作为一种服务间通信的机制
例如Actor框架与流处理的区别
- Actor框架主要是管理模块通信的并发和分布式执行的一种机制;而流处理主要是一种数据管理技术
- Actor之间的交流往往是短暂的,一对一的;而事件日志则是持久的,多订阅者的
- Actor可以以任意方式进行通信(允许包括循环的请求/响应);而流处理通常配置在无环流水线中,其中每个流都是一个特定作业的输出,由良好定义的输入流中派生而来
时间推理
流处理通常需要与时间打交道,尤其是用于分析目的时候,会频繁使用时间窗口,例如“过去五分钟的平均值”
- 按时间分析中,批处理器是检查事件中嵌入的时间戳,不需要运行系统的时钟
- 许多流处理框架使用处理机器上的本地系统时钟来确定窗口,然而存在延时时,就会失效。
事件时间与处理时间
事件时间与处理事件是两个概念,很多原因都会导致处理延迟。
消息延迟还可能导致无法预测消息顺序
知道什么时候准备好了
用事件时间来定义窗口的一个棘手的问题是,你永远也无法确定是不是已经收到了特定窗口的所有事件,还是说还有一些事件正在来的路上
在一段时间没有看到任何新事件后,可以宣布窗口就绪,但是仍然可能因为“延迟”导致窗口完成之后到达的滞留事件,大体有两种选择:
- 忽略这些滞留事件,并且将丢弃数量作为一个监控指标,并在大量丢消息时进行报警
- 发布一个更正,一个包括滞留事件的更新窗口值
你用的是谁的时钟
当事件可能在系统内多个地方进行缓冲时,为事件分配时间戳更加困难了
例如:
考虑一个移动应用向服务器上报关于用量的事件
- 极大的滞留事件:可能在设备脱机状态时被使用,事件会被本地缓冲,并在下一次联网时上报
- 时间戳不可信:事件时间戳实际是用户交互发生时间,即用户的时间
- 可信的时间戳意义不大:服务器收到事件的时间准确,但是在描述用户交互方面意义不大
解决方案:
方法一:记录三个时间戳
- 设备时钟:事件发生的时间
- 设备时钟:事件发送服务器的时间
- 服务器时钟:事件被服务器接收的时间
用时钟3-时钟2,可以得到服务器与设备时钟间的偏移,在用于时间1,就可以估计真实发生的时间。(假设设备时钟不发生变化)
窗口的类型
确定事件时间戳后,需要定义时间段的窗口,有几种窗口比较常用
- 滚动窗口:固定长度,每个事件只属于一个窗口
- 例如:一分钟的滚动窗口,10:03:00到10:03:59之间会被分组到一个窗口,10:04:00到10:04:59被分组到另一个窗口。
- 跳动窗口:跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑
- 例如:一个带有1分钟跳跃步长的5分钟窗口将包含10:03:00到10:07:59之间的事件,先计算1分钟的滚动窗口,然后再在几个相邻窗口上进行聚合。
- 滑动窗口:滑动窗口包含了此间距在特定时长内的所有事件
- 例如:一个5分钟的滑动窗口应当覆盖 10:03:39到10:08:12的事件,通过维护一个按时间排序的事件缓冲区,并不断移除过期事件,可以实现滑动窗口
- 会话窗口:会话窗口没有固定的持续时间,将同一用户出现时间相近的所有事件分组在一起,而当用户一段时间没有活动时,窗口结束。
流式连接
流流连接(窗口连接)
例如:
目的
- 某网站上有搜索功能,想要找出搜索URL的近期趋势
场景
- 键入搜索查询时,会记录下一个包含查询与返回结果的事件
- 点击搜索结果时,就会记录另一个记录点击事件
- 需要将搜索动作与点击动作的事件连接,用于计算结果中每个URL的点击率
搜索结果与点击事件的时间是高度可变的,需要选择合适的连接窗口,例如点击与搜索时间间隔在一小时以内
方法
- 按会话ID索引最近一小时内发生的所有事件。
- 发生搜索事件或点击事件,会被添加到合适的索引中,流处理器会去检查另一个索引是否有相同ID事件到达。
- 如有匹配事件,就会发出一个表示搜索结果被点击的事件
- 如果过期,就会发出一个表示搜索结果未被点击的事件
流表连接(流扩展)
目的
用户活动事件与用户档案数据库的连接
场景
- 输入:包含用户ID的活动事件流
- 输出:活动事件流,用户ID被扩展为用户档案信息
方法
- 一次处理一个活动事件,远程查询数据库并将用户档案信息添加到活动事件中
- 将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返
- 流处理器数据库的本地副本需要保持更新。可以通过变更数据捕获来解决
表表连接(维护物化视图)
目的
需要一个时间线缓存:发送推文的时候写入这些信息,读取时间线只需要简单查询即可
场景
- 当用户
u
发送新推文时,会被添加到每个关注u
的时间线上 - 用户删除推文时,推文将从所有用户的时间表中删除
- 当用户
u1
开始关注用户u2
时,将u2
最近的推文添加u1
的时间线上 - 当用户
u1
取消关注用户u2
时,u2
的推文将从u1
的时间线中移除
需求前提
- 推文事件流(发送与删除)
- 关注关系事件流(关注与取消关注)
- 维护数据库,包含每个用户的粉丝集合
方法
本质上是维护了一个连接两个表的物化视图
1 | SELECT follows.follower_id AS timeline_id, |
连接的时间依赖性
流流,流表,表表的共通之处:都需要流处理器维护连接一侧的一些状态,然后当连接另一侧的消息到达时查询该状态。
问题
如果不同流中的事件发生在近似的时间范围内,则应该按照什么样的顺序进行处理?
在数据仓库中,这个问题被称为缓慢变化的维度(slowly changing dimension, SCD)
方法
通常通过对特定版本的记录使用唯一的标识符来解决
例如:每当税率改变时都会获得一个新的标识符,而发票在销售时会带有税率的标识符
容错
等待某个任务完成之后再使其输出可见并不是一个可行选项,因为你永远无法处理完一个无限的流
微批量与存档点
微批次:将流分成小块,并像微型批处理一样处理每个块
- Spark Streaming:批次大小通常为1秒,提供了一个与批次大小相等的滚动窗口
- Apache Flink:定期生成状态的滚动存档点并将其写入持久存储中。
原子提交再现
需要确保事件处理的所有输出和副作用当且仅当处理成功时才会生效
- Google Cloud Dataflow和VoltDB使用了类似分布式事务中的XA方法
- Apache Kafka也计划加入类似的功能
幂等性
幂等操作是多次重复执行与单次执行效果相同的操作
即使一个操作不是天生幂等的,往往可以通过一些额外的元数据做成幂等的
例如:
在使用来自Kafka的消息时,每条消息都有一个持久的,单调递增的偏移量,写入外部数据库时可以带上这个偏移量,避免重复执行,实现幂等
失败后重建状态
任何需要状态的流处理,都必须确保失败之后能恢复其状态
方法
- 将状态保存在远程数据存储中,并进行复制,缺点是每个消息查询都需要远程数据库,可能会很慢
- 在流处理器本地保存状态,并定期复制
实践
- Flink定期捕获算子状态的快照,并将它们写入HDFS等持久存储中
- Samza和Kafka Streams通过将状态变更发送到具有日志压缩功能的专用Kafka主题来复制状态变更
- VoltDB通过在多个节点上对每个输入消息进行冗余处理来复制状态