大约六年前,我加入了 LinkedIn,那是一个特别有趣的时期。我们刚刚开始遇到单片集中式数据库的极限,需要开始向专门的分布式系统组合过渡。这是一次有趣的经历:我们构建、部署并运行了一个分布式图形数据库、一个分布式搜索后端、一个 Hadoop 安装以及第一代和第二代键值存储。
我从中学到的最有用的知识之一是,我们构建的许多东西的核心概念都非常简单:日志。日志有时被称为预写日志、提交日志或事务日志,它几乎和计算机一样古老,是许多分布式数据系统和实时应用程序架构的核心。
如果不了解日志,您就无法完全理解数据库、NoSQL 存储、键值存储、复制、paxos、hadoop、版本控制或几乎任何软件系统;然而,大多数软件工程师并不熟悉它们。我想改变这种现状。在这篇文章中,我将带您了解关于日志的所有知识,包括什么是日志以及如何使用日志进行数据集成、实时处理和系统构建。
日志可能是最简单的存储抽象。它是按时间排序的仅可追加、完全有序的记录序列。它看起来像这样:
记录附加到日志末尾,读取从左到右进行。每个条目都分配有一个唯一的连续日志条目编号。
记录的排序定义了“时间”的概念,因为左侧的条目被定义为比右侧的条目更旧。日志条目号可以被认为是条目的“时间戳”。将这种排序描述为时间概念乍一看似乎有点奇怪,但它有一个方便的特性,即它与任何特定的物理时钟脱钩。当我们进入分布式系统时,这个属性将变得至关重要。
记录的内容和格式对于本讨论的目的来说并不重要。另外,我们不能一直向日志中添加记录,因为最终空间会用完。我稍后会再讨论这个问题。
所以,日志与文件或表并没有什么不同。文件是字节数组,表是记录数组,而日志实际上只是一种按时间排序的记录表或文件。
此时,您可能会想知道为什么值得谈论如此简单的事情?仅可追加的记录序列与数据系统有什么关系?答案是日志具有特定用途:它们记录发生了什么以及何时发生。对于分布式数据系统来说,这在许多方面都是问题的核心。
但在深入讨论之前,让我先澄清一些有点令人困惑的事情。每个程序员都熟悉日志的另一种定义——应用程序可能使用 syslog 或 log4j 写入本地文件的非结构化错误消息或跟踪信息。为了清楚起见,我将其称为“应用程序日志”。应用程序日志是我所描述的日志概念的退化形式。最大的区别在于,文本日志主要供人类阅读,而我所描述的“日记”或“数据日志”是为编程访问而构建的。
(实际上,如果你仔细想想,人类阅读单个机器上的日志的想法有点不合时宜。当涉及许多服务和服务器时,这种方法很快就会成为一种难以管理的策略,日志的目的很快就会变成查询和图表的输入,以了解多台机器上的行为——对于这种情况,文件中的英文文本远不如这里描述的结构化日志那么合适。)
我不知道日志概念从何而来——可能它是类似于二分查找的东西,过于简单以至于发明者没有意识到它是一项发明。它早在 IBM 的System R中就已出现。它在数据库中的使用与在发生崩溃时保持各种数据结构和索引同步有关。为了使其具有原子性和持久性,数据库使用日志来写出将要修改的记录的相关信息,然后再将更改应用于它维护的各种数据结构。日志是发生的事情的记录,每个表或索引都是此历史记录在某个有用的数据结构或索引中的投影。由于日志会立即持久保存,因此在发生崩溃时,它会被用作恢复所有其他持久结构的权威来源。
随着时间的推移,日志的使用从 ACID 的实现细节发展成为在数据库之间复制数据的方法。事实证明,数据库上发生的更改顺序正是保持远程副本数据库同步所需要的。Oracle、MySQL 和 PostgreSQL 包括日志传送协议,用于将日志的各个部分传输到充当从属的副本数据库。Oracle 已将日志产品化为非 Oracle 数据订阅者的通用数据订阅机制,其XStreams和GoldenGate以及 MySQL 和 PostgreSQL 中的类似设施是许多数据架构的关键组件。
由于这种起源,机器可读日志的概念在很大程度上局限于数据库内部。使用日志作为数据订阅机制似乎几乎是偶然出现的。但这种抽象非常适合支持各种消息传递、数据流和实时数据处理。
分布式系统中的日志 日志解决的两个问题——对更改进行排序和分发数据——在分布式数据系统中更为重要。就更新的顺序达成一致(或同意不同意见并应对副作用)是这些系统的核心设计问题之一。
以日志为中心的分布式系统方法源于一个简单的观察,我称之为状态机复制原理:
如果两个相同的确定性过程从相同的状态开始并以相同的顺序获得相同的输入,则它们将产生相同的输出并以相同的状态结束。 这看起来可能有点难懂,所以让我们深入了解一下它的含义。
确定性意味着处理过程不依赖于时间,并且不会让任何其他“带外”输入影响其结果。例如,如果某个程序的输出受线程的特定执行顺序或调用gettimeofday或其他不可重复的事物的影响,则通常最好将其视为非确定性的。
进程的状态是处理结束时机器上剩余的数据(无论是在内存中还是在磁盘上)。
以相同顺序获取相同输入这一点应该引起人们的注意 — — 这就是日志的用武之地。这是一个非常直观的概念:如果你向两个确定性的代码片段提供相同的输入日志,它们将产生相同的输出。
分布式计算的应用非常明显。你可以将让多台机器都做同样的事情的问题简化为实现分布式一致性日志以向这些进程提供输入的问题。此处日志的目的是从输入流中挤出所有不确定性,以确保处理此输入的每个副本保持同步。
当你理解了它,你会发现这个原则并没有什么复杂或深奥之处:它或多或少相当于说“确定性处理是确定性的”。尽管如此,我认为它是分布式系统设计更通用的工具之一。
这种方法的一个优点是,索引日志的时间戳现在充当副本状态的时钟 - 您可以用一个数字描述每个副本,即它处理的最大日志条目的时间戳。此时间戳与日志相结合唯一地捕获了副本的整个状态。
根据日志中的内容,在系统中应用此原则的方法有很多种。例如,我们可以记录对服务的传入请求,或服务响应请求而发生的状态变化,或它执行的转换命令。理论上,我们甚至可以记录每个副本要执行的一系列机器指令或每个副本上要调用的方法名称和参数。只要两个进程以相同的方式处理这些输入,进程就会在副本之间保持一致。
不同的人似乎对日志的用途有不同的描述。数据库人员通常区分物理日志和逻辑日志。物理日志意味着记录更改的每一行的内容。逻辑日志意味着记录的不是更改的行,而是导致行更改的 SQL 命令(插入、更新和删除语句)。
分布式系统文献通常将处理和复制分为两种主要方法。“状态机模型”通常是指主动-主动模型,在该模型中,我们记录传入的请求,每个副本处理每个请求。对此模型的轻微修改称为“主-备份模型”,即选出一个副本作为领导者,并允许该领导者按请求到达的顺序处理请求,并记录处理请求时对其状态的更改。其他副本按领导者所做的状态更改的顺序应用,以便它们保持同步,并在领导者发生故障时随时接替领导者的位置。
为了理解这两种方法之间的区别,让我们看一个简单问题。考虑一个复制的“算术服务”,它维护一个数字作为其状态(初始化为零),并对该值应用加法和乘法。主动-主动方法可能会记录要应用的转换,例如“+1”、“*2”等。每个副本都会应用这些转换,因此会经历同一组值。“主动-被动”方法将让单个主服务器执行转换并记录结果,例如“1”、“3”、“6”等。这个例子还清楚地说明了为什么排序是确保副本之间一致性的关键:重新排序加法和乘法将产生不同的结果。
分布式日志可以看作是模拟共识问题的数据结构。毕竟,日志代表了一系列关于要附加的“下一个”值的决策。您必须眯着眼睛才能看到 Paxos 系列算法中的日志,尽管日志构建是它们最常见的实际应用。对于 Paxos,这通常使用称为“multi-paxos”的协议扩展来完成,该协议将日志建模为一系列共识问题,日志中的每个槽都有一个共识问题。日志在其他协议(如ZAB 、RAFT和Viewstamped Replication )中更为突出,这些协议直接模拟了维护分布式、一致日志的问题。
我怀疑我们对此的看法有点受历史轨迹的影响,也许是因为分布式计算理论在过去几十年的发展速度超过了实际应用。实际上,共识问题有点太简单了。计算机系统很少需要决定一个值,它们几乎总是处理一系列请求。因此,日志,而不是简单的单值寄存器,是更自然的抽象。
此外,对算法的关注掩盖了系统所需的底层日志抽象。我怀疑我们最终会更多地关注日志,将其作为商品化的构建块,而不管其实现方式如何,就像我们经常谈论哈希表一样,而不必费心去了解我们指的是线性探测的杂音哈希还是其他变体。日志将成为某种商品化的接口,许多算法和实现都在竞相提供最佳保证和最佳性能。
变更日志 101:表格和事件是双重的 让我们回到数据库。更改日志和表之间存在着一种令人着迷的二元性。日志类似于所有贷方和借方以及银行流程的列表;表是所有当前账户余额。如果您有更改日志,则可以应用这些更改以创建捕获当前状态的表。此表将记录每个键的最新状态(截至特定日志时间)。从某种意义上说,日志是更基本的数据结构:除了创建原始表之外,您还可以对其进行转换以创建各种派生表。(是的,对于非关系型用户来说,表可以表示键控数据存储。)
这个过程反过来也行得通:如果你有一个正在更新的表,你可以记录这些更改并发布一个“更改日志”,记录表状态的所有更新。这个更改日志正是你支持近实时副本所需要的。所以从这个意义上讲,你可以把表和事件看作是双重的:表支持静态数据,日志捕获更改。日志的神奇之处在于,如果它是一个完整的更改日志,它不仅保存了表的最终版本的内容,还允许重新创建可能存在的所有其他版本。实际上,它是表的每个先前状态的一种备份。
这可能会让您想起源代码版本控制。源代码控制和数据库之间有着密切的关系。版本控制解决的问题与分布式数据系统必须解决的问题非常相似——管理分布式、并发的状态变化。版本控制系统通常模拟补丁序列,这实际上是一个日志。您可以直接与当前代码的签出“快照”进行交互,这类似于表。您会注意到,在版本控制系统中,与其他分布式状态系统一样,复制是通过日志进行的:当您更新时,您只需下拉补丁并将其应用于当前快照。
一些人最近从Datomic(一家销售以日志为中心的数据库的公司)那里看到了其中的一些想法。此演示文稿很好地概述了他们如何在他们的系统中应用这个想法。当然,这些想法并不是这个系统独有的,因为它们已经成为分布式系统和数据库文献的一部分十多年了。
这一切似乎都有些理论化。不要绝望!我们很快就会进入实际阶段。
下一步是什么 在本文的其余部分,我将尝试介绍日志的用途,超越分布式计算或抽象分布式计算模型的内部原理。其中包括:
数据集成——使组织的所有数据在其所有存储和处理系统中轻松获取。 实时数据处理——计算派生的数据流。 分布式系统设计——如何通过以日志为中心的设计简化实用系统。 这些用途都围绕着日志作为独立服务的想法而解决。
在每种情况下,日志的用处都来自于日志提供的简单功能:生成持久、可重放的历史记录。令人惊讶的是,这些问题的核心是能够让多台机器以确定的方式以自己的速率回放历史记录。
第二部分:数据集成 让我首先说一下“数据集成”的含义以及我为什么认为它很重要,然后我们再看看它与日志的关系。
数据集成使组织的所有数据在其所有服务和系统中可用。 “数据集成”这个短语并不常见,但我不知道还有更好的词。更常见的术语ETL通常只涵盖数据集成的有限部分——填充关系数据仓库。但我所描述的大部分内容都可以被认为是 ETL 的泛化,涵盖实时系统和处理流程。
在人们对大数据概念的极大兴趣和热议中,你很少听到有关数据集成的讨论,但尽管如此,我相信“让数据可用”这个平凡的问题是组织可以关注的更有价值的事情之一。
有效使用数据遵循马斯洛需求层次理论。金字塔的底层涉及捕获所有相关数据,能够将其整合到适用的处理环境中(无论是花哨的实时查询系统还是文本文件和 Python 脚本)。需要以统一的方式对这些数据进行建模,以使其易于读取和处理。一旦满足了以统一方式捕获数据的这些基本需求,就可以合理地开发基础设施以各种方式处理这些数据 — MapReduce、实时查询系统等。
值得注意的是:如果没有可靠且完整的数据流,Hadoop 集群只不过是一个非常昂贵且难以组装的空间加热器。一旦数据和处理可用,人们就可以将注意力转移到更精细的问题上,即良好的数据模型和一致且易于理解的语义。最后,注意力可以转移到更复杂的处理上——更好的可视化、报告和算法处理和预测。
根据我的经验,大多数组织在金字塔的底部都存在巨大漏洞——他们缺乏可靠、完整的数据流——但却想直接跳转到高级数据建模技术。这完全是本末倒置。
所以问题是,我们如何在组织的所有数据系统中建立可靠的数据流?
数据集成:两个复杂因素 两种趋势使得数据集成变得更加困难。
事件数据流
第一个趋势是事件数据的兴起。事件数据记录发生的事情,而不是已经发生的事情。在网络系统中,这意味着用户活动日志,也意味着可靠地操作和监控数据中心的机器所需的机器级事件和统计数据。人们倾向于将其称为“日志数据”,因为它通常被写入应用程序日志,但这混淆了形式和功能。这些数据是现代网络的核心:毕竟,谷歌的财富是由基于点击和展示(即事件)的相关性管道产生的。
而且这些并不局限于网络公司,只是网络公司已经完全数字化了,所以更容易被追踪。财务数据长期以来都是以事件为中心的。RFID为物理对象添加了这种跟踪。我认为这种趋势将随着传统业务和活动的数字化而继续下去。
此类事件数据记录了发生的事情,其规模往往比传统数据库大几个数量级。这给处理带来了巨大挑战。
专业数据系统的爆炸式增长
第二个趋势来自过去五年内日益流行且通常免费提供的专用数据系统的激增。存在用于OLAP 、搜索、简单在线存储 、 批处理、图形分析等的专用 系统。
更多种类的更多数据的组合以及将这些数据放入更多系统的愿望导致了巨大的数据集成问题。
日志结构化数据流 日志是处理系统间数据流的自然数据结构。其原理很简单: 将所有组织的数据放入中央日志中,以供实时订阅。 每个逻辑数据源都可以建模为自己的日志。数据源可以是记录事件(例如点击次数或页面浏览次数)的应用程序,也可以是接受修改的数据库表。每个订阅系统都会尽快从此日志中读取数据,将每条新记录应用到自己的存储中,并提升其在日志中的位置。订阅者可以是任何类型的数据系统 - 缓存、Hadoop、另一个站点中的另一个数据库、搜索系统等。
例如,日志概念为每个更改提供了一个逻辑时钟,所有订阅者都可以根据该时钟进行衡量。这使得推理不同订阅者系统相对于彼此的状态变得更加简单,因为每个订阅者系统都有一个它们已读取的“时间点”。
为了更具体一点,考虑一个简单的情况,其中有一个数据库和一组缓存服务器。日志提供了一种同步所有这些系统的更新并推断每个系统的时间点的方法。假设我们用日志条目 X 写入一条记录,然后需要从缓存中读取。如果我们想保证不会看到过时的数据,我们只需确保我们不从任何尚未复制到 X 的缓存中读取数据。
日志还充当缓冲区,使数据生产与数据消费异步。这很重要,原因有很多,但特别是当有多个订阅者可能以不同的速率消费时。这意味着订阅系统可能会崩溃或停机进行维护,并在恢复时赶上进度:订阅者以自己控制的速度消费。批处理系统(如 Hadoop 或数据仓库)可能仅按小时或按天消费,而实时查询系统可能需要按秒计算。原始数据源和日志都不知道各种数据目标系统,因此可以在不改变管道的情况下添加和删除消费者系统。
“每条正常工作的数据管道都像一根原木一样设计;每条损坏的数据管道都有自己的损坏方式。”——列夫·托尔斯泰伯爵(作者翻译) 特别重要的是:目标系统只知道日志,而不了解源系统的任何细节。消费者系统无需关心数据是来自 RDBMS、新奇的键值存储,还是在没有任何实时查询系统的情况下生成的。这似乎是一个次要的问题,但实际上至关重要。
我在这里使用术语“日志”而不是“消息系统”或“发布订阅”,因为它在语义上更加具体,并且更贴近实际实现中支持数据复制所需的描述。我发现“发布订阅”的含义并不比间接寻址消息多——如果比较任何两个承诺发布订阅的消息系统,您会发现它们保证的东西非常不同,并且大多数模型在这个领域都没有用。您可以将日志视为一种具有持久性保证和强排序语义的消息系统。在分布式系统中,这种通信模型有时被称为(有点糟糕的)原子广播。
值得强调的是,日志仍然只是基础设施。这并不是掌握数据流的故事的结束:故事的其余部分围绕元数据、模式、兼容性以及处理数据结构和演化的所有细节。但是,除非有一种可靠的、通用的方法来处理数据流的机制,否则语义细节是次要的。
在 LinkedIn 随着 LinkedIn 从集中式关系数据库转变为分布式系统集合,我亲眼目睹了这个数据集成问题迅速出现。 目前我们的主要数据系统包括:
搜索 社交图谱 伏地魔(键值存储) Espresso(文档存储) 推荐引擎 OLAP 查询引擎 Hadoop 泰拉达特 Ingraphs(监控图表和指标服务) 这些都是专门的分布式系统,在其专业领域提供高级功能。
在我加入 LinkedIn 之前,使用日志进行数据流的想法就已经在 LinkedIn 上流传了。我们开发的最早的基础设施之一是一项名为databus的服务,它在我们早期的 Oracle 表之上提供了日志缓存抽象,以扩展对数据库更改的订阅,这样我们就可以提供给我们的社交图谱和搜索索引。
我先简单介绍一下历史,以提供背景信息。我自己参与这项工作的时间大约是 2008 年,当时我们刚刚推出键值存储。我的下一个项目是尝试让 Hadoop 运行起来,并将我们的一些推荐流程迁移到 Hadoop 上。由于我们在这方面经验不足,我们自然会预留几周时间用于数据输入和输出,其余时间用于实现花哨的预测算法。于是,漫长的跋涉开始了。
我们最初计划只是从现有的 Oracle 数据仓库中抓取数据。第一个发现是,快速从 Oracle 中获取数据是一种黑暗艺术。更糟糕的是,数据仓库处理不适合我们为 Hadoop 计划的生产批处理 - 大部分处理都是不可逆的,并且特定于正在完成的报告。我们最终避开了数据仓库,直接转到源数据库和日志文件。最后,我们实施了另一个管道,将数据加载到我们的键值存储中以提供结果。
这种单调乏味的数据复制最终成为最初开发中的主要项目之一。更糟糕的是,只要任何管道出现问题,Hadoop 系统基本上就无用了——在坏数据上运行花哨的算法只会产生更多坏数据。
尽管我们以相当通用的方式构建了系统,但每个新数据源都需要自定义配置才能设置。事实证明,这也是大量错误和故障的根源。我们在 Hadoop 上实现的网站功能变得很受欢迎,我们发现有很多工程师对此感兴趣。每个用户都有一份他们想要集成的系统列表和一份他们想要的新数据源列表。
古希腊的 ETL。至今没有太大变化。 我慢慢明白了一些事情。
首先,我们建立的管道虽然有点混乱,但实际上非常有价值。仅仅是将数据放入新的处理系统 (Hadoop) 的过程就开启了许多可能性。以前很难对数据进行新的计算,现在就可以实现。许多新产品和分析都来自于将以前被锁定在专门系统中的多条数据组合在一起。
其次,可靠的数据加载显然需要数据管道的深度支持。如果我们捕获了所需的所有结构,我们就可以让 Hadoop 数据加载完全自动化,这样就无需手动添加新数据源或处理架构更改 — 数据会神奇地出现在 HDFS 中,并且会自动为新数据源生成具有相应列的 Hive 表。
第三,我们的数据覆盖率仍然很低。也就是说,如果你看一下 LinkedIn 拥有的 Hadoop 中可用的数据的整体百分比,你会发现它仍然非常不完整。考虑到操作每个新数据源所需的工作量,完成这项工作并不容易。
我们之前的做法是,为每个数据源和目标构建自定义数据加载,这显然是不可行的。我们有几十个数据系统和数据存储库。连接所有这些系统和数据存储库将导致在每对系统之间构建自定义管道,如下所示:
请注意,数据通常是双向流动的,因为许多系统(数据库、Hadoop)既是数据传输的源,也是数据传输的目的地。这意味着我们最终会为每个系统构建两个管道:一个用于输入数据,一个用于输出数据。
这显然需要一大批人来构建,而且永远无法运行。当我们接近完全连接时,我们最终会得到类似 O(N 2 ) 的管道。
相反,我们需要一些像这样的通用的东西:
我们需要尽可能地将每个消费者与数据源隔离开来。理想情况下,他们应该只与一个可以访问所有内容的数据存储库集成。
这个想法是,添加一个新的数据系统(无论是数据源还是数据目的地)应该创建集成工作,只将其连接到单个管道而不是每个数据消费者。
这段经历让我专注于构建Kafka,将我们在消息系统中看到的内容与数据库和分布式系统内部流行的日志概念结合起来。我们希望它首先充当所有活动数据的中央管道,最终用于许多其他用途,包括从 Hadoop 部署数据、监控数据等。
长期以来,作为基础设施产品,Kafka 有点独特(有些人会说很奇怪)——既不是数据库,也不是日志文件收集系统,更不是传统的消息系统。但最近,亚马逊推出了一项与 Kafka 非常相似的服务,名为Kinesis。相似之处包括分区处理方式、数据保留方式,以及 Kafka API 在高级和低级消费者之间相当奇怪的划分。我对此感到非常高兴。AWS 将其作为一项服务提供,这表明您已经创建了一个良好的基础设施抽象!他们对此的愿景似乎与我所描述的完全相似:它是连接所有分布式系统(DynamoDB、RedShift、S3 等)的管道,也是使用 EC2 进行分布式流处理的基础。
与 ETL 和数据仓库的关系 让我们来谈谈数据仓库。数据仓库旨在成为干净、集成的数据存储库,这些数据结构化以支持分析。这是一个好主意。对于那些不了解的人来说,数据仓库方法涉及定期从源数据库中提取数据,将其混合成某种可理解的形式,然后将其加载到中央数据仓库中。拥有这个包含所有数据干净副本的中央位置对于数据密集型分析和处理来说是一笔非常宝贵的资产。从高层次上讲,无论您使用的是 Oracle、Teradata 还是 Hadoop 等传统数据仓库,这种方法都不会发生太大变化,尽管您可能会改变加载和混合的顺序。
包含干净、集成数据的数据仓库是一项巨大的资产,但获取这些数据的机制却有点过时了。
对于以数据为中心的组织来说,关键问题是将干净的集成数据与数据仓库相结合。数据仓库是一种批处理查询基础架构,非常适合多种报告和临时分析,特别是当查询涉及简单的计数、聚合和过滤时。但是,让批处理系统成为干净完整数据的唯一存储库意味着数据无法用于需要实时馈送的系统(实时处理、搜索索引、监控系统等)。
在我看来,ETL 实际上是两件事。首先,它是一个提取和数据清理过程——本质上是释放锁定在组织中各种系统中的数据并删除特定于系统的无意义数据。其次,这些数据被重组以用于数据仓库查询(即使其适合关系数据库的类型系统,强制采用星型或雪花型模式,或许分解为高性能列 格式等)。将这两件事混为一谈是个问题。干净、集成的数据存储库也应该实时可用,以便进行低延迟处理以及在其他实时存储系统中建立索引。
我认为这还具有额外的好处,即使数据仓库 ETL 在组织上更具可扩展性。数据仓库团队的典型问题是,他们负责收集和清理组织中其他每个团队生成的所有数据。激励机制不一致:数据生产者通常不太了解数据仓库中数据的用途,最终创建的数据难以提取或需要大量、难以扩展的转换才能转换为可用形式。当然,中央团队从未设法扩展以跟上组织其他部门的步伐,因此数据覆盖范围总是参差不齐,数据流脆弱,变化缓慢。
更好的方法是建立一个中央管道,即日志,并配备一个定义良好的 API 来添加数据。与此管道集成并提供干净、结构良好的数据馈送的责任在于此数据馈送的生产者。这意味着,作为系统设计和实施的一部分,他们必须考虑将数据导出并转换为结构良好的形式以交付给中央管道的问题。由于数据仓库团队有一个中央集成点,因此添加新的存储系统对他们来说并不重要。数据仓库团队只处理从中央日志加载结构化数据馈送并执行特定于其系统的转换的简单问题。
当考虑采用传统数据仓库以外的其他数据系统时,关于组织可扩展性的这一点就变得尤为重要。例如,假设有人希望提供对组织的完整数据集的搜索功能。或者,假设有人希望提供亚秒级的数据流监控,并提供实时趋势图和警报。在这两种情况下,传统数据仓库或 Hadoop 集群的基础设施都将不合适。更糟糕的是,为支持数据库负载而构建的 ETL 处理管道可能对为这些其他系统提供数据毫无用处,这使得引导这些基础设施成为与采用数据仓库一样艰巨的任务。这可能不可行,这可能有助于解释为什么大多数组织无法轻松获得这些功能来处理所有数据。相比之下,如果组织已经构建了统一、结构良好的数据馈送,那么让任何新系统完全访问所有数据只需要将一个集成管道连接到管道即可。
这种架构还针对特定清理或转换的位置提出了一系列不同的选项:
数据生产者可以在将数据添加到公司范围日志之前完成此操作。 它可以作为日志的实时转换来完成(进而生成新的、转换后的日志) 它可以作为加载过程的一部分,加载到某些目标数据系统中 最好的模式是在数据发布者将数据发布到日志之前进行清理。这意味着确保数据是规范格式,并且不会保留生成数据的特定代码或维护数据的存储系统的任何遗留内容。这些细节最好由创建数据的团队来处理,因为他们最了解自己的数据。在此阶段应用的任何逻辑都应该是无损且可逆的。
任何可以实时完成的增值转换都应作为对生成的原始日志源的后处理。这包括事件数据的会话化,或添加其他普遍感兴趣的派生字段。原始日志仍然可用,但这种实时处理会生成包含增强数据的派生日志。
最后,在加载过程中,只应执行特定于目标系统的聚合。这可能包括将数据转换为特定的星型或雪花型模式,以便在数据仓库中进行分析和报告。由于这个阶段最自然地映射到传统的 ETL 流程,现在在一组更干净、更统一的流上完成,因此应该会大大简化。
日志文件和事件 让我们稍微谈谈这种架构的一个附带好处:它支持解耦、事件驱动的系统。
网络行业处理活动数据的典型方法是将其记录到文本文件中,然后将其存入数据仓库或 Hadoop 进行聚合和查询。这种方法的问题与所有批量 ETL 的问题相同:它将数据流与数据仓库的功能和处理计划结合在一起。
在 LinkedIn,我们以日志为中心的方式构建了事件数据处理。我们使用 Kafka 作为中央多订阅者事件日志。我们定义了数百种事件类型,每种类型都捕获特定类型操作的独特属性。这涵盖了从页面浏览量、广告展示和搜索到服务调用和应用程序异常的所有内容。
要了解此方法的优势,请想象一个简单的事件 — 在职位页面上显示招聘信息。职位页面应仅包含显示职位所需的逻辑。然而,在一个相当动态的网站中,这很容易被与显示职位无关的其他逻辑所淹没。例如,假设我们需要集成以下系统:
我们需要将这些数据发送到 Hadoop 和数据仓库进行离线处理 我们需要计算浏览量,以确保浏览者没有试图抓取某种内容 我们需要汇总此视图以显示在职位发布者的分析页面中 我们需要记录浏览量,以确保我们正确地为该用户提供任何工作推荐(我们不想一遍又一遍地展示同样的东西) 我们的推荐系统可能需要记录浏览量,以正确跟踪该职位的受欢迎程度 ETC 很快,显示职位这一简单行为就变得相当复杂。随着我们添加显示职位的其他位置(移动应用程序等),这种逻辑必须延续,复杂性也随之增加。更糟糕的是,我们需要与之交互的系统现在有些交织在一起——负责显示职位的人员需要了解许多其他系统和功能,并确保它们正确集成。这只是问题的一个玩具版本,任何实际应用程序都会更复杂,而不是更简单。
“事件驱动”风格提供了一种简化此过程的方法。现在,职位显示页面仅显示职位,并记录显示职位的事实以及职位的相关属性、查看者以及有关职位显示的任何其他有用事实。其他每个相关系统(推荐系统、安全系统、职位发布者分析系统和数据仓库)都只需订阅提要并进行处理。显示代码不需要了解这些其他系统,并且如果添加了新的数据消费者,也不需要更改。
构建可扩展的日志 当然,将发布者与订阅者分开并不是什么新鲜事。但是,如果你想要保留一个提交日志,作为消费者规模网站上发生的所有事情的多订阅者实时日志,那么可扩展性将是一个主要挑战。如果我们不能构建一个快速、廉价且可扩展性足以实现大规模实用的日志,那么使用日志作为通用集成机制永远只是一个优雅的幻想。
系统人员通常认为分布式日志是一种缓慢、重量级的抽象(并且通常只将其与 Zookeeper 可能适用的“元数据”用途联系起来)。但是,通过专注于记录大数据流的周到实现,情况就不必如此了。在 LinkedIn,我们目前每天通过 Kafka 运行超过 600 亿条唯一消息写入(如果算上数据中心之间镜像的写入,则有数千亿条)。
我们在 Kafka 中使用了一些技巧来支持这种规模:
对日志进行分区 通过批量读写优化吞吐量 避免不必要的数据复制 为了实现水平扩展,我们将日志分成几个部分:
每个分区都是一个完全有序的日志,但分区之间没有全局排序(除了您可能在消息中包含的一些挂钟时间)。消息到特定分区的分配由写入器控制,大多数用户选择按某种键(例如用户 ID)进行分区。分区允许在分片之间不协调的情况下进行日志追加,并允许系统的吞吐量随 Kafka 集群大小线性扩展。
每个分区都复制到可配置数量的副本中,每个副本都具有分区日志的相同副本。在任何时候,其中一个副本将充当领导者;如果领导者发生故障,其中一个副本将接替领导者的位置。
缺乏跨分区的全局顺序是一个限制,但我们认为这不是一个主要问题。事实上,与日志的交互通常来自数百或数千个不同的进程,因此谈论它们行为的总体顺序是没有意义的。相反,我们提供的保证是每个分区都保持顺序,并且 Kafka 保证来自单个发送者的特定分区的附加内容将按照发送顺序交付。
日志与文件系统一样,易于针对线性读写模式进行优化。日志可以将小的读写操作组合成更大的高吞吐量操作。Kafka 积极追求这种优化。批处理在从客户端到服务器发送数据、写入磁盘、在服务器之间复制、将数据传输给消费者以及确认已提交的数据时发生。
最后,Kafka 使用简单的二进制格式,在内存日志、磁盘日志和网络数据传输之间保持这种格式。这使我们能够利用包括零拷贝数据传输在内的多种优化。
这些优化的累积效应是,您通常可以按照磁盘或网络支持的速率写入和读取数据,即使在维护大大超出内存的数据集时也是如此。
本文主要不是关于 Kafka 的,所以我不会深入讨论。你可以在这里阅读 LinkedIn 方法的更详细概述,并在这里阅读Kafka 设计的全面概述。
第三部分:日志和实时流处理 到目前为止,我只描述了一种将数据从一个地方复制到另一个地方的奇特方法。但在存储系统之间传输字节并不是故事的结束。事实证明,“日志”是“流”的另一种说法,日志是流处理的核心。
但是,等等,流处理到底是什么?
如果您是 90 年代末和 21 世纪初数据库 文献或半成功的数据 基础设施 产品的粉丝,您可能会将流处理与构建 SQL 引擎或用于事件驱动处理的“框和箭头”界面的努力联系起来。
如果您关注开源数据系统的爆炸式增长,您可能会将流处理与该领域的某些系统联系起来,例如Storm、Akka、S4和 Samza。但大多数人认为这些是一种异步消息处理系统,与集群感知的 RPC 层没有太大区别(事实上,该领域的一些东西就是这样的)。
这两种观点都有些局限。流处理与 SQL 无关。它也不限于实时处理。没有内在原因导致您不能使用各种不同的语言来表达计算,从而处理昨天或一个月前的数据流。 我认为流处理的含义要广泛得多:用于持续数据处理的基础设施。我认为计算模型可以像 MapReduce 或其他分布式处理框架一样通用,但能够产生低延迟结果。
处理模型的真正驱动因素是数据收集方法。批量收集的数据自然会批量处理。连续收集数据时,自然会连续处理数据。
美国人口普查是批量数据收集的一个很好的例子。人口普查定期启动,通过让人们挨家挨户走访的方式对美国公民进行强力发现和统计。这在 1790 年人口普查刚开始时非常有意义。当时的数据收集本质上是批量导向的,它涉及骑马四处走动并将记录写在纸上,然后将这批记录运送到一个中心位置,在那里人类将所有计数加起来。如今,当你描述人口普查过程时,人们会立即想知道为什么我们不记录出生和死亡,并连续或以所需的任何粒度进行人口计数。
这是一个极端的例子,但许多数据传输过程仍然依赖于定期转储和批量传输与集成。处理批量转储的唯一自然方法是使用批处理。但随着这些过程被连续馈送所取代,人们自然开始转向连续处理,以平衡所需的处理资源并减少延迟。
例如,LinkedIn 几乎没有批量数据收集。我们的数据大部分是活动数据或数据库更改,两者都是连续发生的。事实上,当你考虑任何业务时,底层机制几乎总是一个连续的过程——事件是实时发生的,正如 Jack Bauer 告诉我们的那样。当数据以批量方式收集时,它几乎总是由于某些手动步骤或缺乏数字化,或者是某些非数字过程自动化遗留下来的历史遗留。当机制是邮件并且由人工进行处理时,传输和响应数据的速度非常慢。自动化的第一次传递总是保留原始过程的形式,因此这通常会持续很长时间。
每天运行的生产“批处理”作业通常有效地模拟了一种窗口大小为一天的连续计算。当然,底层数据总是在变化。这些在 LinkedIn 上非常常见(并且让它们在 Hadoop 中工作的机制非常棘手),因此我们实施了一整套框架来管理增量 Hadoop 工作流。
从这个角度来看,很容易对流处理有不同的看法:它只是在正在处理的底层数据中包含时间概念的处理,并且不需要数据的静态快照,因此它可以以用户控制的频率产生输出,而不是等待数据集的“结尾”。从这个意义上讲,流处理是批处理的泛化,并且鉴于实时数据的普遍性,这是一个非常重要的泛化。
那么,为什么传统观点认为流处理是一种小众应用呢?我认为最大的原因是缺乏实时数据收集,使得持续处理成为学术界关注的问题。
我认为缺乏实时数据收集可能是导致商业流处理系统失败的原因。他们的客户仍在进行面向文件的日常批处理,以进行 ETL 和数据集成。构建流处理系统的公司专注于提供处理引擎来附加到实时数据流,但事实证明,当时很少有人真正拥有实时数据流。实际上,在我职业生涯的早期,在 LinkedIn,一家公司试图向我们出售一款非常酷的流处理系统,但由于当时我们所有的数据都是以小时文件的形式收集的,我们能想到的最佳应用就是在小时结束时将每小时的文件传输到流系统中!他们指出,这是一个相当普遍的问题。这个例外实际上证明了这里的规则:金融是流处理取得一定成功的一个领域,而金融正是实时数据流已经成为常态、处理成为瓶颈的领域。
即使存在健康的批处理生态系统,我认为流处理作为一种基础设施的实际适用性也相当广泛。我认为它弥补了实时请求/响应服务和离线批处理之间的基础设施差距。对于现代互联网公司来说,我认为大约 25% 的代码属于这一类。
事实证明,日志解决了流处理中一些最关键的技术问题,我将对此进行描述,但它解决的最大问题只是使数据在实时多订阅者数据馈送中可用。对于那些对更多细节感兴趣的人,我们已经开源了Samza ,这是一个明确基于这些想法构建的流处理系统。我们在此处的文档中更详细地描述了许多此类应用程序。
数据流图
流处理最有趣的方面与流处理系统的内部结构无关,而是与它如何扩展我们之前数据集成讨论中关于数据馈送的概念有关。我们主要讨论了原始数据的馈送或日志——在执行各种应用程序时产生的事件和数据行。但流处理还允许我们包括根据其他馈送计算的馈送。这些派生的馈送对于消费者来说看起来与计算它们所依据的原始数据的馈送没有什么不同。这些派生的馈送可以封装任意复杂性。
让我们深入探讨一下。就我们的目的而言,流处理作业是指从日志中读取数据并将输出写入日志或其他系统的任何内容。它们用于输入和输出的日志将这些过程连接到处理阶段图中。事实上,以这种方式使用集中式日志,您可以将组织的所有数据捕获、转换和流动视为一系列日志和写入日志的过程。
流处理器根本不需要花哨的框架:它可以是从日志中读取和写入的任何进程或进程集,但可以提供额外的基础设施和支持来帮助管理处理代码。
集成中日志的目的有两个方面。
首先,它使每个数据集具有多订阅者和有序性。回想一下我们的“状态复制”原则,记住顺序的重要性。为了更具体一点,考虑来自数据库的更新流——如果我们在处理过程中对同一记录的两个更新重新排序,我们可能会产生错误的最终输出。这种顺序比 TCP 之类的协议提供的顺序更持久,因为它不限于单个点对点链接,并且在进程故障和重新连接后仍然存在。
其次,日志为进程提供缓冲。这是非常基本的。如果处理以不同步的方式进行,则上游数据生成作业生成数据的速度可能比另一个下游作业处理数据的速度更快。发生这种情况时,处理必须阻止、缓冲或丢弃数据。丢弃数据可能不是一种选择;阻止可能会导致整个处理图陷入停顿。日志充当非常大的缓冲区,允许进程重新启动或失败,而不会减慢处理图的其他部分。当将此数据流扩展到更大的组织时,这种隔离尤其重要,因为处理是由许多不同团队的作业进行的。我们不能让一个错误的作业导致背压,从而停止整个处理流程。
Storm和Samza都是以这种方式构建的,并且可以使用 Kafka 或其他类似系统作为其日志。
状态实时处理 一些实时流处理只是无状态的一次记录转换,但许多用途是更复杂的计数、聚合或流中窗口的连接。例如,人们可能希望用有关执行点击的用户的信息来丰富事件流(例如点击流)——实际上是将点击流连接到用户帐户数据库。这种处理最终总是需要处理器维护某种状态:例如,在计算计数时,您需要维护迄今为止的计数。如果处理器本身可能会失败,如何正确维护这种状态?
最简单的替代方案是将状态保存在内存中。但是,如果进程崩溃,它将丢失中间状态。如果状态仅在窗口内保持,则进程可以回退到日志中窗口开始的位置。但是,如果要进行一小时的计数,这可能不可行。
另一种方法是将所有状态简单地存储在远程存储系统中,然后通过网络加入该存储。这种方法的问题是数据没有本地性,而且需要大量的网络往返。
我们如何支持像“表”这样根据我们的处理进行分区的东西?
我们回想一下关于表和日志的二元性的讨论。这为我们提供了将流转换为与我们的处理位于同一位置的表的工具,以及处理这些表的容错机制。
流处理器可以将其状态保存在本地“表”或“索引”中 — bdb、leveldb,甚至一些更不常见的索引,例如Lucene或fastbit索引。此存储的内容来自其输入流(首先可能应用任意转换)。它可以为其保存的本地索引记录更改日志,以便在发生崩溃和重新启动时恢复其状态。此机制允许一种通用机制,用于将共分区状态保存在与传入流数据本地的任意索引类型中。
当进程失败时,它会从变更日志中恢复其索引。日志是将本地状态转换为一种每次备份的增量记录。
这种状态管理方法具有一个优雅的特性,即处理器的状态也以日志的形式维护。我们可以将此日志视为对数据库表的更改日志。事实上,处理器拥有与它们一起维护的非常类似于共分区表的东西。由于此状态本身就是一个日志,因此其他处理器可以订阅它。当处理的目标是更新最终状态并且此状态是处理的自然输出时,这实际上非常有用。
当与出于数据集成目的而从数据库输出的日志相结合时,日志/表二元性的强大功能就变得显而易见。可以从数据库中提取更改日志,并由各种流处理器以不同的形式进行索引,以与事件流连接。
我们在这里更详细地介绍了 Samza 中这种管理状态处理的风格,并提供了更多实际示例。
日志压缩 当然,我们不可能希望保留所有时间的所有状态变化的完整日志。除非有人想使用无限空间,否则必须以某种方式清理日志。我将稍微谈谈 Kafka 中实现这一点的方法,以使其更加具体。在 Kafka 中,清理有两个选项,具体取决于数据是包含键控更新还是事件数据。对于事件数据,Kafka 支持仅保留一个数据窗口。通常,这配置为几天,但窗口可以根据时间或空间来定义。但是,对于键控数据,完整日志的一个很好的特性是您可以重播它以重新创建源系统的状态(可能在另一个系统中重新创建它)。
但是,随着时间的推移,保留完整日志将占用越来越多的空间,并且重放将花费越来越长的时间。因此,在 Kafka 中,我们支持不同类型的保留。我们不会简单地丢弃旧日志,而是删除过时的记录(即主键具有较新更新的记录)。通过这样做,我们仍然可以保证日志包含源系统的完整备份,但现在我们无法再重新创建源系统的所有先前状态,而只能重新创建较新的状态。我们将此功能称为日志压缩。
第四部分:体系建设 我想要讨论的最后一个主题是日志在在线数据系统的数据系统设计中的作用。
日志在分布式数据库内部的数据流中所起的作用与它在大型组织中数据集成中所起的作用类似。在这两种情况下,它都负责数据流、一致性和恢复。毕竟,如果不是非常复杂的分布式数据系统,组织又是什么呢?
拆分? 因此,如果您仔细观察,也许可以将整个组织的系统和数据流视为一个分布式数据库。您可以将所有单独的查询导向系统(Redis、SOLR、Hive 表等)视为数据的特定索引。您可以将 Storm 或 Samza 等流处理系统视为非常完善的触发器和视图实现机制。我注意到,传统数据库人员非常喜欢这种观点,因为它最终向他们解释了人们究竟在用所有这些不同的数据系统做什么——它们只是不同的索引类型!
不可否认的是,现在数据系统类型呈爆炸式增长,但实际上,这种复杂性一直存在。即使在关系数据库的鼎盛时期,组织也拥有大量的关系数据库!因此,自从大型机时代以来,当所有数据都集中在一个地方时,真正的集成可能就不存在了。将数据隔离到多个系统中有很多动机:规模、地理位置、安全性和性能隔离是最常见的。但这些问题可以通过一个好的系统来解决:例如,一个组织可以拥有一个包含所有数据并为大量多样化选民提供服务的 Hadoop 集群。
因此,在转向分布式系统的过程中,数据处理已经有可能得到简化:将每个系统的大量小实例合并到几个大集群中。许多系统还不足以实现这一点:它们没有安全性,或者无法保证性能隔离,或者只是扩展性不够好。但这些问题都是可以解决的。
我认为不同系统的激增是由于构建分布式数据系统的难度造成的。通过缩减到单一查询类型或用例,每个系统都能够将其范围缩小到可构建的事物集合中。但运行所有这些系统会产生过多的复杂性。
我认为未来可能会有三个发展方向。
第一种可能性是现状的延续:系统分离或多或少会维持相当长一段时间。这种情况可能是因为分布难度太大难以克服,也可能是因为这种专业化为每个系统带来了新的便利性和功能水平。只要这种情况持续下去,数据集成问题就仍然是成功使用数据最重要的问题之一。在这种情况下,集成数据的外部日志将非常重要。
第二种可能性是,可能会出现重新整合,即具有足够通用性的单个系统开始将所有不同功能合并为单个超级系统。这个超级系统表面上可能类似于关系数据库,但它在组织中的使用将大不相同,因为你只需要一个大系统,而不是无数个小系统。在这个世界上,除了这个系统内部解决的问题之外,没有真正的数据集成问题。我认为构建这样一个系统的实际困难使这种情况不太可能发生。
不过,还有另一种可能的结果,作为一名工程师,我发现这确实很有吸引力。新一代数据系统的一个有趣方面是它们几乎都是开源的。开源提供了另一种可能性:数据基础设施可以拆分成一组服务和面向应用程序的系统 API。您已经在一定程度上看到了这种情况在 Java 堆栈中发生:
Zookeeper处理大部分系统协调工作(也许需要Helix或Curator等更高级别抽象的帮助)。 Mesos和YARN实现进程虚拟化和资源管理 Lucene和LevelDB等嵌入式库可以进行索引 Netty、Jetty和Finagle和rest.li等高级包装器处理远程通信 Avro、Protocol Buffers、Thrift和无数其他库处理序列化 Kafka和Bookeeper提供支持日志。 如果你把这些东西堆在一起,然后眯着眼睛看,它看起来有点像乐高版的分布式数据系统工程。你可以把这些成分拼凑在一起,创造出大量可能的系统。这显然与最终用户无关,因为他们可能更关心 API,而不是 API 的实现方式,但它可能是在不断发展的更加多样化和模块化的世界中实现单一系统的简单性的一条途径。如果由于可靠、灵活的构建块的出现,分布式系统的实施时间从数年缩短到数周,那么合并为单个单片系统的压力就会消失。
系统架构中登录的位置 假设存在外部日志的系统允许各个系统放弃许多自身的复杂性并依赖共享日志。以下是我认为日志可以做的事情:
通过对节点进行并发更新排序来处理数据一致性(无论是最终的还是即时的) 提供节点间数据复制 为写入者提供“提交”语义(即,仅当您的写入保证不会丢失时才确认) 提供来自系统的外部数据订阅源 提供恢复丢失数据的故障副本或引导新副本的功能 处理节点之间的数据重新平衡。 这实际上是分布式数据系统所做工作的很大一部分。事实上,剩下的大部分内容与最终面向客户端的查询 API 和索引策略有关。这正是应该因系统而异的部分:例如,全文搜索查询可能需要查询所有分区,而按主键查询可能只需要查询负责该键数据的单个节点。
它的工作原理如下。系统分为两个逻辑部分:日志和服务层。日志按顺序捕获状态变化。服务节点存储服务查询所需的任何索引(例如,键值存储可能有 btree 或 sstable 之类的东西,搜索系统会有倒排索引)。写入可以直接进入日志,尽管它们可能由服务层代理。写入日志会产生一个逻辑时间戳(即日志中的索引)。如果系统是分区的(我假设是这样),那么日志和服务节点将具有相同数量的分区,尽管它们的机器数量可能大不相同。 服务节点订阅日志并按照日志存储的顺序尽快将写入内容应用到其本地索引。
客户端可以通过提供写入的时间戳作为其查询的一部分,从任何节点获取读写语义 - 接收此类查询的服务节点会将所需的时间戳与其自己的索引点进行比较,并在必要时延迟请求,直到它至少在该时间建立索引,以避免提供陈旧的数据。
服务节点可能需要或不需要任何“主控”或“领导者选举”的概念。对于许多简单的用例,服务节点可以完全没有领导者,因为日志是事实的来源。
分布式系统必须完成的比较棘手的事情之一是处理故障节点的恢复或将分区从一个节点移动到另一个节点。一种典型的方法是让日志仅保留一个固定的数据窗口,并将其与存储在分区中的数据快照相结合。日志同样可以保留完整的数据副本并对日志本身进行垃圾收集。这将大量复杂性从系统特定的服务层转移到通用日志中。
通过拥有此日志系统,您可以获得一个针对数据存储内容的完整订阅 API,该 API 可将 ETL 馈送到其他系统。事实上,许多系统可以共享相同的日志,同时提供不同的索引,如下所示:
请注意,这种以日志为中心的系统本身如何立即成为数据流的提供者,供其他系统处理和加载。同样,流处理器可以使用多个输入流,然后通过索引该输出的另一个系统为它们提供服务。
我发现将系统纳入日志和查询 API 的这种观点非常有启发性,因为它可以让您将查询特征与系统的可用性和一致性方面分开。实际上,我认为这甚至是一种有用的方法,可以从心理上考虑一个不是以这种方式构建的系统,以便更好地理解它。
值得注意的是,尽管 Kafka 和 Bookeeper 是一致的日志,但这并不是必需的。您可以轻松地将类似Dynamo的数据库分解为最终一致的AP日志和键值服务层。这样的日志使用起来有点棘手,因为它会重新传递旧消息,并且依赖于订阅者来处理这个问题(就像 Dynamo 本身一样)。
在日志中保留单独的数据副本(尤其是完整副本)的想法让许多人觉得很浪费。但实际上,有几个因素可以减少这个问题。首先,日志是一种特别高效的存储机制。我们在生产 Kafka 服务器上为每个数据中心存储超过 75 TB 的数据。同时,许多服务系统需要更多的内存才能有效地提供数据(例如,文本搜索通常全部在内存中)。服务系统也可能使用优化的硬件。例如,我们的大多数实时数据系统要么使用内存,要么使用 SSD。相比之下,日志系统只进行线性读写,因此它很乐意使用大型多 TB 硬盘。最后,如上图所示,在数据由多个系统提供的情况下,日志的成本分摊到多个索引上。这种组合使得外部日志的费用非常小。
这正是 LinkedIn 用来构建其许多实时查询系统的模式。这些系统以数据库为数据源(使用 Databus 作为日志抽象或使用 Kafka 的专用日志),并在该数据流之上提供特定的分区、索引和查询功能。这就是我们实现搜索、社交图谱和 OLAP 查询系统的方式。事实上,将单个数据馈送(无论是实时馈送还是来自 Hadoop 的派生馈送)复制到多个服务系统中以进行实时服务是很常见的。事实证明,这是一个非常简化的假设。这些系统都不需要具有外部可访问的写入 API,Kafka 和数据库用作记录系统,更改通过该日志流向适当的查询系统。写入由托管特定分区的节点在本地处理。这些节点盲目地将日志提供的馈送转录到自己的存储中。可以通过重放上游日志来恢复故障节点。
这些系统对日志的依赖程度各不相同。完全依赖的系统可以利用日志进行数据分区、节点恢复、重新平衡以及一致性和数据传播的所有方面。在这种设置中,实际的服务层实际上只不过是一种“缓存”,其结构化是为了支持特定类型的处理,写入直接进入日志。