Skip to content

Chapter 2. The What, Where, When, and How of Data Processing

kimi edited this page Mar 20, 2022 · 9 revisions

是时候开始具体行动了!

第 1 章主要集中在三个方面:术语,准确定义我在使用 "流" 这样的重载术语时的意思;批处理与流,比较两类系统的理论能力,并假设只有两样东西是使流系统超越批处理系统所必需的:正确性和推理时间的工具;以及数据处理模式,研究批处理和流式系统在处理有界和无界数据时采取的概念性方法。

在本章中,我们将进一步关注第 1 章中的数据处理模式,但要更加详细,并以具体的例子为背景。当我们结束时,我们将涵盖我认为是稳健的无序数据处理所需的核心原则和概念;这些是推理时间的工具,真正让你超越传统的批处理。

为了让你感觉到事情的实际情况,我使用了Apache Beam的代码片段,再加上延时图来提供概念的视觉表现。Apache Beam是一个统一的编程模型和可移植层,用于批处理和流处理,有一套不同语言(如Java和Python)的具体SDKs。用Apache Beam编写的管道可以在任何支持的执行引擎(Apache Apex、Apache Flink、Apache Spark、Cloud Dataflow等)上可移植地运行。

我在这里用Apache Beam做例子,并不是因为这是一本Beam的书(不是),而是因为它最完整地体现了本书所描述的概念。在最初写 Streaming 102 的时候(那时它还是Google Cloud Dataflow的Dataflow模型,而不是Apache Beam的Beam模型),它是现存的唯一一个能够为我们在这里要讲的所有例子提供必要的表达能力的系统。一年半之后,我很高兴地说,情况发生了很大变化,大多数主要的系统已经或正在朝着支持与本书描述的模型类似的方向发展。所以,请放心,我们在这里所涉及的概念,虽然是通过Beam的镜头来了解的,但也同样适用于你将遇到的大多数其他系统。

路线图

为了帮助本章奠定基础,我想列出五个主要概念,这些概念将成为本章所有讨论的基础,实际上也是第一部分其余大部分内容的基础。

在第一章中,我首先建立了事件时间(事件发生的时间)和处理时间(处理过程中观察到的时间)之间的关键区别。这为本书提出的一个主要论点奠定了基础:如果你关心事件的正确性和实际发生的背景,你必须根据其固有的事件时间来分析数据,而不是分析过程中遇到的处理时间。

然后我介绍了窗口化的概念(即沿着时间边界对数据集进行分割),这是一种常用的方法,用来应对无界数据源在技术上可能永远不会结束的事实。窗口化策略的一些较简单的例子是固定窗口和滑动窗口,但更复杂的窗口化类型,如会话(其中窗口由数据本身的特征定义;例如,捕捉每个用户活动的会话,然后是不活动的间隙)也得到了广泛使用。

除了这两个概念之外,我们现在还要仔细研究另外三个概念。

触发器 触发器是一种机制,用于声明一个窗口的输出何时应该相对于某些外部信号而被具体化。触发器在选择输出的时间方面提供了灵活性。在某种意义上,你可以把它们看成是一种流控机制,用于规定何时应该实现结果。另一种看法是,触发器就像相机上的快门释放器,允许你声明何时对正在计算的结果进行快照。
触发器也使你有可能在一个窗口的发展过程中多次观察其输出。这反过来又为随着时间的推移完善结果打开了大门,从而可以在数据到达时提供推测性的结果,以及处理上游数据(修订)随时间的变化或晚到的数据(例如,在移动场景中,某人的手机在其离线时记录了各种动作及其事件时间,然后在重新连接后继续上传这些事件进行处理)。

水位线 水位线是一个与事件时间有关的输入完整性的概念。一个时间值为X的水位线发出这样的声明。"所有事件时间小于X的输入数据都被观察到了"。因此,当观察一个没有已知终点的无界数据源时,水位线充当了进度的衡量标准。我们在这一章中谈到了水位线的基础知识,然后Slava在第三章中对这个问题进行了超级深入的研究。

累积 累积模式规定了在同一窗口中观察到的多个结果之间的关系。这些结果可能是完全不相干的;也就是说,代表了随时间变化的独立的三角洲,或者它们之间可能有重叠。不同的积累模式有不同的语义和相关的成本,因此可以在各种用例中找到适用性。

另外,因为我认为这样更容易理解所有这些概念之间的关系,我们在回答四个问题的结构中重温旧的,探索新的,我提出的所有这些问题对每个无界数据处理问题都是至关重要的。

  • 计算的结果是什么?这个问题由管道内的转换类型来回答。这包括诸如计算总和、建立直方图、训练机器学习模型等内容。这基本上也是经典批处理所回答的问题
  • 在事件时间的什么地方计算结果?这个问题是通过在流水线中使用事件时间窗口来回答的。这包括第一章中常见的窗口化例子(固定、滑动和会话);似乎没有窗口化概念的用例(例如,时间无关的处理;经典的批处理一般也属于这一类);以及其他更复杂的窗口化类型,例如有时间限制的拍卖。还要注意的是,如果你把记录到达系统时的进入时间指定为事件时间,它也可以包括处理时间窗口化。
  • 在处理时间中,什么时候结果会被具体化?这个问题可以通过使用触发器和(可选择的)水位线来回答。这个主题有无限的变化,但最常见的模式是那些涉及重复更新的模式(即物化视图语义),那些利用水位线来提供每个窗口的单一输出,只有在相应的输入被认为是完整的(即在每个窗口基础上应用的经典批处理语义),或两者的一些组合。
  • 结果的细化是如何关联的?这个问题由所使用的积累类型来回答:抛弃(在这种情况下,结果都是独立的和不同的),积累(在这种情况下,后来的结果建立在先前的结果之上),或积累和收回(在这种情况下,积累的值加上先前触发的值的收回都被排放出来)。

在本书的其余部分,我们将更详细地研究这些问题的每一个。而且,是的,我将会把这个颜色计划的事情搞得一塌糊涂,试图让人们非常清楚哪些概念与 "什么/哪里/何时/如何 "这个成语中的哪些问题有关。

批量的基础。什么和哪里

好吧,让我们开始这个聚会。第一站:批处理。

什么:转换

在经典的批处理中应用的变换回答了这个问题。"计算的结果是什么?" 尽管你可能已经熟悉了经典批处理,但我们还是要从这里开始,因为它是我们添加所有其他概念的基础。

在本章的其余部分(事实上,在本书的大部分内容中),我们看一个例子:在一个由九个值组成的简单数据集上计算键入的整数和。让我们想象一下,我们写了一个基于团队的手机游戏,我们想建立一个管道,通过将用户手机报告的个人分数相加来计算团队分数。如果我们在一个名为 "UserScores "的SQL表中捕获我们的九个示例分数,它可能看起来像这样。

Name Team Score EventTime ProcTime
Julie TeamX 5 12:00:26 12:05:19
Frank TeamX 9 12:01:26 12:08:19
Ed TeamX 7 12:02:26 12:05:39
Julie TeamX 8 12:03:06 12:07:06
Amy TeamX 3 12:03:39 12:06:13
Fred TeamX 4 12:04:19 12:06:39
Naomi TeamX 3 12:06:39 12:07:19
Becky TeamX 8 12:07:26 12:08:39
Naomi TeamX 1 12:07:46 12:09:00

请注意,这个例子中的所有分数都来自同一个团队的用户;这是为了保持例子的简单性,因为我们在接下来的图表中的维度数量有限。因为我们是按团队分组的,所以我们实际上只关心最后三栏。

Score - 与此事件相关的个人用户得分.
EventTime - 分数的事件时间,即分数发生的时间。
ProcTime - 分数的处理时间;也就是管道观察到该分数的时间

对于每个管道的例子,我们将看一个延时图,突出数据是如何随时间演变的。这些图在我们关心的两个时间维度上绘制了我们的九个分数:X轴的事件时间和Y轴的处理时间。图2-1说明了输入数据的静态图是什么样子的。

fig-2-1 图 2-1. 九条输入记录,以事件时间和处理时间绘制而成

随后的延时图要么是动画(Safari),要么是一连串的帧(打印和所有其他数字格式),让你看到数据是如何随着时间的推移而被处理的(在我们进入第一个延时图之后,会有更多的内容)。

在每个例子前面都有一个Apache Beam Java SDK伪代码的简短片段,以使管道的定义更加具体。在这个意义上,它是伪代码,我有时会弯曲规则以使例子更清晰,忽略细节(如使用具体的I/O源),或简化名称(Beam Java 2.x和更早的触发器名称是痛苦的冗长;我使用更简单的名称以使其清晰)。除了像这样的小事之外,它还是真实世界的Beam代码(本章中的所有例子的真实代码都可以在GitHub上找到)。

如果你已经熟悉了像Spark或Flink这样的东西,你应该会比较容易理解Beam代码在做什么。但为了给你上一堂速成课,Beam中有两个基本原语。

P 集合 (PCollections) 这些代表数据集(可能是大规模的数据集),在这些数据集上可以进行并行转换(因此名称开头有 "P")。 P 转换(PTransforms) 这些被应用于 PCollections 来创建新的 PCollections。PTransforms 可以进行元素间的转换,它们可以将多个元素分组/聚集在一起,或者它们可以是其他 PTransforms 的复合组合,如图2-2所描述。

fig-2-2 图2-2. 转换的类型

在我们的例子中,我们通常假设我们从一个预先加载的名为 “输入" 的PCollection<KV<Team, Integer>>开始(也就是说,一个由Teams和Integer的键/值对组成的PCollection,其中Teams只是代表团队名称的字符串,而Integer是来自相应团队中任何个人的得分)。在现实世界的管道中,我们会通过从I/O源读取原始数据(例如,日志记录)的PCollection来获得输入,然后通过将日志记录解析为适当的键/值对,将其转换为PCollection<KV<Team, Integer>>。在这个第一个例子中,为了清晰起见,我包括了所有这些步骤的伪代码,但在随后的例子中,我省略了I/O和解析。

因此,对于一个简单地从I/O源读取数据,解析球队/分数对,并计算每个球队的分数总和的管道,我们会有类似例2-1所示的东西。

例2-1。求和管道 PCollection raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input.apply(Sum.integersPerKey());

键/值数据从一个I/O源读取,以团队(如团队名称的字符串)为键,以整数(如单个团队成员的分数)作为值。然后将每个键的值相加,在输出集合中产生每个键的总和(例如,团队总分)。

在接下来的所有例子中,在看到描述我们正在分析的管道的代码片断后,我们会看一个延时图,显示该管道在我们的具体数据集上对单个键的执行。在一个真正的管道中,你可以想象类似的操作会在多台机器上并行进行,但为了我们的例子,保持简单的东西会更清楚。

如前所述,Safari浏览器版本以动画电影的形式呈现完整的执行过程,而印刷品和所有其他数字格式则使用关键帧的静态序列,提供管道如何随时间进展的感觉。在这两种情况下,我们还提供了一个URL,可以在 www.streamingbook.net 上看到完整的动画版本。

每张图描绘了两个维度的输入和输出:事件时间(在X轴上)和处理时间(在Y轴上)。因此,管道观察到的真实时间是从下往上的,正如随着时间的推移在处理时间轴上上升的粗黑线所表示的。输入是圆圈,圆圈内的数字代表该特定记录的值。它们开始时是浅灰色的,随着管道的观察而变深。

当流水线观察到数值时,它在中间状态中积累这些数值,并最终将汇总的结果具体化为输出。状态和输出用矩形表示(灰色表示状态,蓝色表示输出),集合值靠近顶部,矩形覆盖的区域代表事件时间和处理时间累积到结果的部分。对于例2-1 中的流水线,在一个经典的批处理引擎上执行时,它看起来就像图2-3所示。

fig-2-3 图2-3. 经典的批处理

因为这是一个批处理管道,它积累状态,直到它看到所有的输入(由顶部的绿色虚线表示),在这一点上,它产生48的单一输出。在这个例子中,我们计算的是所有事件时间的总和,因为我们没有应用任何特定的窗口转换;因此状态和输出的矩形覆盖了X轴的全部内容。然而,如果我们想处理一个无边界的数据源,经典的批处理是不够的;我们不能等待输入结束,因为它实际上永远不会结束。我们想要的概念之一是窗口化,我们在第1章中介绍了这一概念。因此,在我们的第二个问题--"在事件时间的什么地方计算结果?"的背景下,我们现在将简要地重新审视窗口化。

哪里:窗口 (Where: Windowing)

正如第1章所讨论的,窗口化是将数据源按照时间界线切分的过程。常见的窗口化策略包括固定窗口、滑动窗口和会话窗口,如图2-4所示。

fig-2-4 图2-4. 窗口化策略实例。每个例子都显示了三个不同的键,强调了对齐窗口(适用于所有数据)和非对齐窗口(适用于数据的一个子集)之间的区别。

为了更好地了解窗口化在实践中的情况,让我们把我们的整数求和管道变成固定的、两分钟的窗口。在Beam中,这种变化只是简单地增加了一个Window.into的转换,你可以在例2-2中看到高亮的部分。

例2-2. 窗口化求和代码 PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES))) .apply(Sum.integersPerKey());

回顾一下,Beam提供了一个统一的模型,在批处理和流媒体中都能工作,因为从语义上讲,批实际上只是流的一个子集。因此,让我们先在批处理引擎上执行这个管道;其机制更直接,而且当我们切换到流式引擎时,它可以给我们一些直接的比较。图2-5显示了结果。

fig-2-5 图2-5. 批量引擎上的窗口化求和

和以前一样,输入在状态中被积累,直到它们被完全消耗,之后就会产生输出。然而,在这种情况下,我们得到的不是一个输出,而是四个:一个输出,用于四个相关的两分钟事件时间窗口。

在这一点上,我们已经重温了我在第一章中介绍的两个主要概念:事件时间域和处理时间域之间的关系,以及窗口化。如果我们想更进一步,我们就需要开始加入本节开头提到的新概念:触发器、水印和累积。

流式处理: 时间点和怎么处理 (Going Streaming: When and How)

我们刚刚观察了批处理引擎上的窗口化流水线的执行情况。但是,在理想情况下,我们希望我们的结果有更低的延迟,而且我们也希望能原生地处理无界数据源。切换到流式引擎是朝着正确的方向迈出的一步,但是我们之前的策略,即等到我们的输入被完全消耗后再生成输出,已经不可行了。进入触发器和水位线。

时间点: 触发器的奇妙之处在于触发器是奇妙! (When: The Wonderful Thing About Triggers Is Triggers Are Wonderful Things!)

触发器提供了问题的答案。"在处理时间内,什么时候计算结果?" 触发器声明一个窗口的输出应该在处理时间内发生(尽管触发器本身可能会根据在其他时间域发生的事情做出这些决定,比如水印在事件时间域的进展,我们一会儿就会看到)。一个窗口的每个具体输出被称为窗口的一个窗格。

尽管我们可以想象出相当广泛的可能的触发语义,但从概念上讲,只有两种普遍有用的触发器类型,而实际应用几乎总是归结为使用其中之一或两者的组合。

重复更新的触发器
当一个窗口的内容发生变化时,它们会定期为其生成更新的窗格。这些更新可以在每条新记录中被具物化,也可以在一些处理时间延迟后发生,比如每分钟一次。对重复更新触发器的周期的选择主要是为了平衡延迟和成本。
完备性触发器
这些触发器只有在某个窗口的输入被认为完成到某个阈值后才会为该窗口实现一个窗格。这种类型的触发器最类似于我们在批处理中所熟悉的:只有在输入完成后,我们才提供一个结果。基于触发器的方法的不同之处在于,完整性的概念被限定在单个窗口的范围内,而不是总是与整个输入的完整性相联系。

重复更新触发器是流式系统中最常见的触发器类型。它们实现起来很简单,也很容易理解,而且它们为特定类型的用例提供了有用的语义:对物化数据集的重复(最终一致)更新,类似于你在数据库世界中通过物化视图得到的语义。

完整性触发器不常出现,但它提供的流式语义与经典批处理世界的语义更为接近。它们还提供了用于推理缺失数据和延迟数据的工具,这两点我们很快就会讨论(以及在下一章),因为我们要探索驱动完整性触发器的底层原子:水印。

但首先,让我们从简单的开始,看看一些基本的重复更新触发器的作用。为了使触发器的概念更加具体,让我们继续往前走,给我们的示例管道添加最直接的触发器类型:一个每条新记录都会触发的触发器,如例2-3所示。

例2-3. 每条记录重复触发 PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AfterCount(1)))); .apply(Sum.integersPerKey());

如果我们在一个流媒体引擎上运行这个新的管道,结果会像图2-6所示那样。

fig-2-6 图2-6. 流媒体引擎上的每条记录触发

你可以看到我们现在如何为每个窗口获得多个输出(窗格):每个相应的输入一次。当输出流被写入某种你可以简单地轮询结果的表格时,这种触发模式效果很好。任何时候,你都可以在表格中看到某个窗口的最新值,而且这些值会随着时间的推移向正确性靠拢。

每条记录触发的一个缺点是,它是相当健谈的。在处理大规模数据时,像求和这样的聚合提供了一个很好的机会,可以在不丢失信息的情况下减少数据流的cardinality。这在你有高容量键的情况下特别明显;对于我们的例子,有大量活跃玩家的大规模团队。想象一下,在一个大规模的多人游戏中,玩家被分成两个派别中的一个,而你想在每个派别的基础上统计数字。也许没有必要用一个给定派别中每个玩家的每条新输入记录来更新你的统计。相反,你可能会很高兴在一些处理时间延迟后更新它们,比如每秒钟,或者每分钟。使用处理时间延迟的一个很好的副作用是,它对高容量的键或窗口有一个均衡的效果:所产生的数据流最终在卡片数量上更加统一。

触发器中的处理时间延迟有两种不同的方法:对齐的延迟(延迟将处理时间切成固定的区域,在不同的键和窗口中对齐)和非对齐的延迟(延迟是相对于在特定窗口中观察到的数据而言)。一个没有对齐延迟的流水线可能看起来像例2-4,其结果如图2-7所示。

例2-4. 在对齐的两分钟处理时间边界上触发 PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AlignedDelay(TWO_MINUTES))) .apply(Sum.integersPerKey());

fig-2-7 图2-7. 两分钟对齐的延迟触发器(即微分批)。