翻译-In-Stream Big Data Processing 流式大数据处理


原文:http://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/

作者:Ilya Katsov

相当长一段时间以来,大数据社区已经普遍认识到了批量数据处理的不足。很多应用都对实时查询和流式处理产生了迫切需求。最近几年,在这个理念的推动下,催生出了一系列解决方案,Twitter Storm,Yahoo S4,Cloudera Impala,Apache Spark和Apache Tez纷纷加入大数据和NoSQL阵营。本文尝试探讨流式处理系统用到的技术,分析它们与大规模批量处理和OLTP/OLAP数据库的关系,并探索一个统一的查询引擎如何才能同时支持流式、批量和OLAP处理。

在Grid Dynamics,我们面临的需求是构建一个流式数据处理系统,每天需要处理80亿事件,并提供容错能力和严格事务性,即不能丢失或重复处理事件。新系统是现有系统的补充和继任者,现有系统基于Hadoop,数据处理延迟高,而且维护成本太高。此类需求和系统相当通用和典型,所以我们在下文将其描述为规范模型,作为一个抽象问题陈述。

下图从高层次展示了我们的生产环境概况:


这是一套典型的大数据基础设施:多个数据中心的各个应用程序都在生产数据,数据通过数据收集子系统输送到位于中心设施的HDFS上,然后原始数据通过标准的Hadoop工具栈(MapReduce,Pig,Hive)进行汇总和分析,汇总结果存储在HDFS和NoSQL上,再导出到OLAP数据库上被定制的用户应用访问。我们的目标是给所有的设施配备上新的流式处理引擎(见图底部),来处理大部分密集数据流,输送预汇总的数据到HDFS,减少Hadoop中原始数据量和批量job的负载。

流式处理引擎的设计由以下需求驱动:

  • SQL-like功能:引擎能执行SQL-like查询,包括在时间窗口上的join和各种聚合函数,用来实现复杂的业务逻辑。引擎还能处理从汇总数据中加载的相对静态数据(admixtures)。更复杂的多道次数据挖掘算法不在短期目标范围内。
  • 模块化和灵活性:引擎绝不仅是简单执行SQL-like查询,然后相应的管道就被自动创建和部署,它应该能通过连接各模块,很方便地组合出更复杂的数据处理链。
  • 容错:严格容错是引擎的基本需求。如草图中,一种可能的设计是使用分布式数据处理管道来实现join、汇总或者这些操作组成的链,再通过容错的持久化buffer来连接这些管道。利用这些buffer,可以实现发布/订阅的通信方式,可以非常方便地增加或者移除管道,这样最大程度提升了系统的模块化。管道也可以是有状态的,引擎的中间件提供持久化存储来启用状态检查点机制。所有这些主题将在本文后续章节进行讨论。
  • 和Hadoop交互:引擎应该能接入流式数据和来自Hadoop的数据,作为HDFS之上的定制化查询引擎提供服务。
  • 高性能和便携性:即使在最小规模的集群上,系统也能每秒传递成千上万次消息。引擎理应是紧凑高效的,能够被部署在多个数据中心的小规模集群上。

为了弄明白如何实现这样一个系统,我们讨论以下主题:

  • 首先,我们讨论流式数据处理系统,批量处理系统和关系型查询引擎之间的关系,流式处理系统能够大量用到其他类型系统中已经应用的技术。
  • 其次,我们介绍一些在构建流式处理框架和系统中用到的模式和技术。此外,我们调研了当下新兴技术,提供一些实现上的小贴士。

The article isbased on a research project developed at Grid Dynamics Labs. Much of the creditgoes to Alexey Kharlamov and Rafael Bagmanov who led the project and othercontributors: Dmitry Suslov, Konstantine Golikov, Evelina Stepanova, AnatolyVinogradov, Roman Belous, and Varvara Strizhkova.

分布式查询处理基础

分布式流式数据处理显然和分布式关系型数据库有联系。许多标准查询处理技术都能应用到流式处理引擎上来,所以理解分布式查询处理的经典算法,理解它们和流式处理以及其他流行框架比如MapReduce的关系,是非常有用的。

分布式查询处理已经发展了数十年,是一个很大的知识领域。我们从一些主要技术的简明概述入手,为下文的讨论提供基础。

分区和Shuffling

分布式和并行查询处理重度依赖数据分区,将大数据打散分成多片让各个独立进程分别进行处理。查询处理可能包括多步,每一步都有自己的分区策略,所以数据shuffling操作广泛应用于分布式数据库中。

尽管用于选择和投射操作的最优分区需要一定技巧(比如在范围查询中),但我们可以假设在流式数据过滤中,使用哈希分区在各处理器中分发数据足以行得通。

分布式join不是那么简单,需要深入研究。在分布式环境中,并行join通过数据分区实现,也就是说,数据分布在各个处理器中,每个处理器运行串行join算法(比如嵌套循环join或者排序-合并join或者哈希join)处理部分数据。最终结果从不同处理器获取合并得来。

分布式join主要采用两种数据分区技术:

  • 不相交数据分区
  •  划分-广播join

不相交数据分区技术使用join key将数据shuffle到不同分区,分区数据互不重叠。每个处理器在自己分区数据上执行join操作,不同处理器的结果简单拼接产生最终结果。考虑R和S数据集join的例子,它们以数值键k进行join,以简单的取模函数进行分区。(假设数据基于某种策略,已经分布在各个处理器上):


下图演示了划分-广播join算法。数据集R被划分成多个不相交的分区(图中的R1,R2和R3),数据集S被复制到所有的处理器上。在分布式数据库中,划分操作本身通常不包含在查询过程中,因为数据初始化已经分布在不同节点中。

这种策略适用于大数据集join小数据集,或者两个小数据集之间join。流式数据处理系统能应用这种技术,比如将静态数据(admixture)和数据流进行join

Group By处理过程也依赖shuffling,本质上和MapReduce是类似的。考虑以下场景,数据集根据字符字段分组,再对每组的数值字段求和:


在这个例子中,计算过程包含两步:本地汇总和全局汇总,这基本和Map/Reduce操作对应。本地汇总是可选的,原始数据可以在全局汇总阶段传输、shuffle、汇总。

本节的整体观点是,上面的算法都天然能通过消息传递架构模式实现,也就是说,查询执行引擎可以看做是由消息队列连接起来的分布式网络节点组成,概念上和流式处理管道是类似的。

管道

前一节中,我们注意到很多分布式查询处理算法都类似消息传递网络。但是,这不足以说明流式处理的高效性:查询中的所有操作应该形成链路,数据流平滑通过整个管道,也就是说,任何操作都不能阻塞处理过程,不能等待一大块输入而不产生任何输出,也不用将中间结果写入硬盘。有一些操作比如排序,天然不兼容这种理念(显而易见,排序在处理完输入之前都不能产生任何输出),但管道算法适用于很多场景。一个典型的管道如下图所示:

 

 

在这个例子中,使用三个处理器哈希join四个数据集:R1,S1,S2和S3。首先并行给S1,S2和S3建立哈希表,然后R1元组逐个流过管道,从S1,S2和S3哈希表中查找相匹配记录。流式处理天然能使用该技术实现数据流和静态数据的join。

在关系型数据库中,join操作还会使用对称哈希join算法或其他高级变种。对称哈希join是哈希join算法的泛化形式。正常的哈希join至少需要一个输入完全可用才能输出结果(其中一个输入用来建立哈希表),而对称哈希join为两个输入都维护哈希表,当数据元组抵达后,分别填充:


元组抵达时,先从另外一个数据流对应的哈希表查找,如果找到匹配记录,则输出结果,然后元组被插入到自身数据流对应的哈希表。

当然,这种算法对无限流进行完全join不是太有意义。很多场景下,join都作用于有限的时间窗口或者其他类型的缓冲区上,比如用LFU缓存数据流中最常用的元组。对称哈希join适用于缓冲区大过流的速率,或者缓冲区被应用逻辑频繁清除,或者缓存回收策略不可预见的场景。在其他情况下,使用简单哈希join已足够,因为缓冲区始终是满的,也不会阻塞处理流程:


值得注意的是,流式处理往往需要采用复杂的流关联算法,记录匹配不再是基于字段相等条件,而是基于评分度量,在这种场景下,需要为两个流维护更为复杂的缓存体系。

流式处理模式

前一节中,我们讨论了一些在大规模并行流处理中用到的标准查询技术。从概念层级上来看,似乎一个高效分布式数据库查询引擎能胜任流式处理,反之亦然,一个流式处理系统也应该能充当分布式数据库查询引擎的角色。Shuffling和管道是分布式查询处理的关键技术,而且通过消息传递网络能够自然而然地实现它们。然而真实情况没那么简单。在数据库查询引擎中,可靠性不是那么关键,因为一个只读查询总是能够被重新运行,而流式系统则必须重点关注消息的可靠处理。在本节中,我们讨论流式系统保证消息传递的技术,和其他一些在标准查询处理中不那么典型的模式。

流回放

在流式处理系统中,时光倒流和回放数据流的能力至关重要,因为以下原因:

  • 这是确保数据正确处理的唯一方式。即使数据处理管道是容错的,也难以确保数据处理逻辑是无缺陷的。人们总是面临修复和重新部署系统的需求,需要在新版本的管道上回放数据。
  • 问题调查需要即席查询。如果发生了问题,人们可能需要加入日志或者修改代码,在产生问题的数据上重跑系统。
  • 即使错误不常发生,即使系统总体上是容错的,流式处理系统也必须设计成在发生错误时能够从数据源中重新读取特定消息。

因此,输入数据通常通过缓冲区从数据源流入流式管道,允许客户端在缓冲区中前后移动读取指针。


Kafka消息队列系统就实现了这样一个缓冲区,支持可扩展分布式部署和容错,同时提供高性能。

流回放要求系统设计至少考虑以下需求:

  • 系统能够存储预定义周期内的原始数据。
  • 系统能够撤销一部分处理结果,回放对应的输入数据,产出新版本结果。
  • 系统能够快速倒回,回放数据,然后追上源源不断的数据流进度。

血缘跟踪

在流式系统中,事件流过一连串的处理器直到终点(比如外部数据库)。每个输入事件产生一个由子事件节点(血缘)构成的有向图,有向图以最终结果为终点。为了保障数据处理可靠性,整个图都必须被成功处理,而且在失败的情况下能重启处理过程。

实现高效血缘跟踪是一个难题。我们先介绍Twitter Storm是如何跟踪消息,保障“至少一次”消息处理语义:

  • 数据源(数据处理图的首个节点)发射的所有事件都标记上一个随机的eventID。框架为每个数据源的初始事件都维护了一个[eventIDàsignature]的键值对集合,其中signature以eventID初始化。
  • 下游节点接收到初始事件后,能产生0个或者多个事件,每个事件携带自身的随机eventID和初始事件的eventID。
  • 如果事件被图中下一个节点成功接收和处理,这个节点会更新初始事件的signature,规则是将初始signature与a)输入事件的ID和b)产生的所有事件的ID做异或。如图中第二部分,事件01111产生事件01100,10010和00010,所以事件01111的signature变成11100(=01111(初始值)xor 01111 xor 01100 xor 10010 xor 00010)。
  • 一个事件也能基于多个输入事件生成。这种情况下,事件关联到多个初始事件,携带多个初始ID(图中第三部分黄色背景事件)。
  • 事件的signature变为0代表事件被成功处理。最后一个节点确认图中的最后一个事件成功处理,而且不再往下游发送新的事件。框架给事件源节点发送提交消息(如图第三部分)。
  • 框架定期遍历初始事件表,查找尚未完全处理的事件(即signature不为0的事件)。这些事件被标记为失败,框架请求源节点回放事件。(译者注:Storm消息回放不是自动的,可以在消息发送时加上消息ID参数,然后根据失败的消息ID自行处理回放逻辑,一般Spout对接消息队列系统,利用消息队列系统的回放功能。)
  • 值得注意的是,由于异或操作的可交换特性,signature的更新顺序无关紧要。在下图中,第二部分中的确认操作可以发生在第三部分之后,这让完全的异步处理成为可能。
  • 还需要注意的是该算法不是严格可靠的——一些ID的组合可能偶然导致signature变为0。但是64位长的ID足以保证极低的错误概率,出错概率大概是2^(-64),在大多数应用中这都是可以接受的。算法的主要优点是只需要少量内存就可以保存下signature表。

以上实现非常优雅,具有去中心化特性:每个节点独立发送确认消息,不需要一个中心节点来显式跟踪血缘。然而,对于维护了滑动窗口或其他类型缓冲区的数据流,实现事务处理变得比较困难。比如,滑动窗口内可能包含成千上万个事件,很多事件处于未提交或计算中状态,需要频繁持久化,管理事件确认过程难度很大。

Apache Spark[3]使用的是另外一种实现方法,其想法是把最终结果看作是输入数据的处理函数。为了简化血缘跟踪,框架分批处理事件,结果也是分批的,每一批都是输入批次的处理函数。结果可以分批并行计算,如果某个计算失败,框架只要重跑它就行。考虑以下例子:

在这个例子中,框架在滑动窗口上join两个流,然后结果再经过一个处理阶段。框架把流拆分成批次,每个批次指定ID,框架随时都能根据ID获取相应批次。流式处理被拆分为一系列事务,每个事务处理一组输入批次,使用处理函数转换数据,并保存结果。在上图中,红色加亮部分代表了一次事务。如果事务失败,框架重跑它,最重要的是,事务是可以并行的。

这种方式简洁而强大,实现了集中式事务管理,并天然提供“只执行一次”消息处理语义。这种技术还同时适用于批量处理和流式处理,因为不管输入数据是否是流式的,都把它们拆分成一系列批次。

状态检查点

前一节,我们在血缘跟踪算法中使用签名(校验和)提供了“至少一次”消息传递语义。该技术改善了系统的可靠性,但留下了至少两个开放式问题:

  • 很多场景都要求“只执行一次”处理语义。比如,如果某些消息被发送两次,计数器管道会统计出错误的结果。
  • 处理消息时,管道中的节点计算状态被更新。计算状态需要持久化或者复制,以免节点失败时发生状态丢失。

Twitter Storm使用以下协议解决这些问题:

  • 事件被分组成批次,每个批次关联一个事务ID。事务ID单调递增(比如,第一批事件ID为1,第二批ID为2,诸如此类)。如果管道处理某个批次时失败,这批数据以同样的事务ID重新发送。 
  • 首先,框架通知管道中节点新事务启动。然后,框架发送这一批次数据通过管道。最后,框架通知事务已完成,所有的节点提交状态变更(比如更新到外部数据库)。
  • 框架保证所有事务的提交是有序的,比如事务2不能先于事务1被提交。这保证了处理节点可以使用以下逻辑持久化状态变更:
    •  最新的事务ID和状态被持久化
    • 如果框架请求提交的当前事务ID和数据库中ID不同,状态被更新,比如数据库中的计数器被增加。因为事务的强有序性,所以每个批次数据只会更新一次。
    • 如果当前事务ID和数据库中ID相同,那么这是这个批次数据的回放,节点会忽略这次提交。节点应该已经处理过这个批次,并更新过状态,而事务可能是因为管道中其他部分出错而失败。
    •  有序提交对实现“只执行一次”处理语义至关重要。然而,事务严格顺序处理不太可取,因为下游所有节点处理完成之前,管道中的首个节点都处于空闲状态。通过并行处理事务过程,串行化提交能缓解这个问题,如下图所示:

如果数据源是容错的,能够被回放,那么事务能保障“只执行一次”处理语义。但是,即使使用大容量分批处理,持久化状态更新也会导致严重的性能退化。所以,应该尽可能减少或者避免中间计算结果状态。补充说明的是,状态写入也能通过不同方式实现。最直接的方式是在事务提交过程中,把内存中状态复制到持久化存储。这种方式不适用于大规模状态(比如滑动窗口等)。另一种可选方式是存储某种事务日志,比如将原始状态转化为新状态的一系列操作日志(对滑动窗口来说,可以是一组添加和清理事件)。虽然这种方式需要从日志中重建状态,灾难恢复变得更麻烦,但在很多场景下,它都能提供更好的性能。

可加性和草图

中间和最终计算结果的可加性很重要,能极大地简化流式数据处理系统的设计,实现,维护和恢复。可加性意味着大范围时间或者大容量数据分区的计算结果能够由更小的时间范围或者更小的分区结果组合而来。比如,每日PV量等于每小时PV量之和。状态可加性允许将数据流切分处理,如我们在前一节讨论,每个批次都可以被独立计算/重算,这有助于简化血缘跟踪和减少状态维护的复杂性。

实现可加性往往不轻松:

  • 有一些场景,可加性确实很简单。比如简单计数是可加的。
  • 在一些场景下,需要存储一些附加信息来实现可加性。比如,系统统计网店每小时平均购买价格,尽管每日平均购买价格不等于对24个小时平均购买价格再求平均,但是,如果系统还存储了每个小时的交易量,就能轻易算出每日平均购买价格。
  • 在其他场景下,实现可加性非常困难甚至于不可能。比如,系统统计某个网站的独立访客数据。假设昨天和今天都有100个独立用户访问了网站,但这两天的独立访问用户之和可能是100到200之间的任何值。我们不得不维护用户ID列表,通过ID列表的交集和并集操作来实现可加性。用户ID列表的大小和处理复杂性和原始数据相当。

草图(Sketches)是将不可加值转换为可加值的有效方法。在上面的例子中,ID列表可以被紧凑的可加性统计计数器代替。计数器提供近似值而不是精确值,但在很多应用中都是可以接受的。草图在互联网广告等特定领域非常流行,可以被看做一种独立的流式处理模式。草图技术的深入综述请见[5]。

逻辑时间跟踪

流式计算中通常会依赖时间:汇总和Join一般作用在滑动时间窗口上;处理逻辑往往依赖事件的时间间隔等。显然,流式处理系统应该有自己的时间视图,而不应该使用CPU挂钟时间。因为发生故障时,数据流和特定事件会被回放,所以实现正确的时间跟踪并不简单。通常,全局的逻辑时间概念可以通过以下方式实现:

  • 原始系统产生的所有事件都应该标记上时间戳。
  • 管道中的处理器处理流时,跟踪最大时间戳,如果持久化的全局时钟落后了,就把它更新为最大时间戳。其他处理器与全局时钟进行时间同步
  • 在数据回放时,重置全局时钟。

持久化存储汇总

我们已经讨论了持久化存储可以用于状态检查点,但这不是流式系统引入外部存储的唯一作用。考虑使用Cassandra在时间窗口上join多个数据流的场景。不用再维护内存中的事件缓冲区,我们可以把所有数据流的传入事件保存到Casandra中,使用join key作为row key,如图所示:


在另一边,第二个处理器定期遍历数据记录,组装和发送join后的记录,清理超出时间窗口的事件。Cassandra还可以根据时间戳排序事件来加速处理过程。

不正确的实现会让整个流式数据处理过程功亏一篑——即使使用Cassandra或者Redis等快速存储系统,单独写入每条数据也会引入严重的性能瓶颈。另一方面,使用存储系统提供了更完善的状态持久化功能,如果应用批量写入等优化手段,在很多场景下,也能达成可接受的性能目标。

滑动窗口聚合

流式数据处理经常处理 “过去10分钟流的某个数据值求和是多少” 等类似查询,即时间窗口上的连续查询。针对这类查询,最直接的解决方案是分别计算各个时间窗口的sum等聚合函数。很显然,这种方案不是最优的,因为两个连续的时间窗口实例具有高度相似性。如果时刻T的窗口包含样本{s(0),s(1),s(2),...,s(T-1),s(T)},那么时刻T+1的窗口就包含{s(1),s(2),s(3)...,s(T),s(T+1)}。观察可知可以使用增量处理。

时间窗口之上的增量计算也被广泛应用在数字信号处理中,包括软件和硬件。典型例子是计算sum值。如果当前时间窗口的sum值已知,那么下次时间窗口的sum值就能通过加上新的样本和减去窗口中最老的样本得出。


类似技术不仅能用于求和和乘积等简单聚合函数,也能用于更复杂的转换过程。比如,SDFT(滑动离散傅里叶变换)算法[4]就比对每个窗口使用FFT(快速傅里叶变换)算法要高效得多。

查询处理管道:Storm, Cassandra, Kafka

现在回到文章一开始提出的实际问题上来。我们基于Storm,Kafka和Cassandra(这些组件应用了前文介绍的技术)设计和实现了自己的流式处理系统。在此,我们仅提供解决方案的简明概述——详细描述所有实现上的坑和技巧的篇幅太长,可能需要单独一篇文章。

      

系统理所当然使用Kafka0.8。Kafka作为分区、容错的事件缓冲区,能够实现流回放,可以轻松添加新的事件生产者和消费者,增强了系统的扩展性。Kafka读指针回溯的能力也使随机访问传入的数据批次成为可能,相应地,可以实现Spark风格的血缘跟踪,也可以将系统输入指向HDFS处理历史数据。

如之前描述,Cassandra用于实现状态检查点和持久化存储聚合。在很多使用场景中,Cassandra也用于存储最终结果。

TwitterStorm是系统的基石。所有的活动查询处理都运行于Storm的topologies中,topologies和Kafka、Cassandra进行交互。一些数据流是简单的:数据抵达Kafka;Storm读取并处理,然后把结果存储在Cassandra或者其他地方。其他数据流更为复杂:一个Storm topology通过Kafka或Cassandra将数据传递给另一个topology。上图展示了两个此类型数据流(红色和蓝色曲线箭头)。

迈向统一大数据处理平台

现有的Hive,Storm和Impala等技术让我们处理大数据时游刃有余,复杂分析和机器学习时使用批量处理,在线分析时使用实时查询处理,连续查询时使用流式处理。更进一步,Lambda架构还能有效整合这些解决方案。这给我们带来的问题是:将来这些技术和方法怎样才能聚集成一个统一解决方案。本节我们讨论分布式关系查询处理,批量处理和流式处理最突出的共同点,合计出能够覆盖所有用户场景,从而在这个领域最具发展潜力的解决方案。

关键之处在于,关系型查询处理,MapReduce和流式处理都能通过shuffling和管道等相同的概念和技术来实现。同时:

  • 流式处理必须保障严格的数据传递和中间状态持久化。这些特性对于计算过程很方便重启的批量处理不太关键。
  • 流式处理离不开管道。对于批量处理,管道不那么关键甚至在有些场景下不适用。Apache Hive就是基于分阶段的MapReduce过程,中间结果被物化,没有完全利用上管道的优点。

以上两点暗示,可调的持久化策略(基于内存的消息传递或者存储在硬盘上)和可靠性是我们想象中统一查询引擎的显著特性。统一查询引擎为高层的框架提供一组处理原语和接口。


在新兴技术中,以下两者值得重点关注:

  • Apache Tez[8],Stinger Initiative[9]的一部分。Apache Tez通过引入一组细粒度的查询处理原语来替代MapReduce框架。它的目标是让Apache Pig和Apache Hive等框架把查询语句和脚本分解成高效的查询处理管道,而不是一系列的MapReduce Job,后者通常很慢,因为需要存储中间结果。
  • Apache Spark[10].Spark项目可能是最先进和最有前途的统一大数据处理平台,它已经包含了批量处理框架,SQL查询引擎和流式处理框架。

References


 

                   

相关内容