DBSP: Automatic Incremental View Maintenance for Rich Query Languages

Mihai Budiu VMware Research mbudiu@vmware.com , Frank McSherry Materialize Inc. mcsherry@materialize.com , Leonid Ryzhyk VMware Research lryzhyk@vmware.com and Val Tannen University of Pennsylvania val@seas.upenn.edu
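Abstract.

Incremental view maintenance has long been a central problem in database theory (gupta-idb93, ). Many solutions have been proposed for restricted classes of database languages, such as the relational algebra or Datalog. These techniques do not naturally generalize to richer languages. In this paper we give a general solution to this problem in three steps: (1) we describe a simple but expressive language called DBSP for describing computations over data streams; (2) we give a general algorithm for solving the incremental view maintenance problem for arbitrary DBSP programs; and (3) we show how to model many rich database query languages (including full relational queries, grouping and aggregation, monotonic and non-monotonic recursion, and streaming aggregation) using DBSP. As a consequence, we obtain efficient incremental view maintenance techniques for all these rich languages.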

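1. Introduction

In this paper we present a simple mathematical theory for modeling streaming and incremental computations. This model has immediate practical applications in the design and implementation of streaming databases and incremental view maintenance. Our model is based on the mathematical formalisms used in discrete digital signal processing (DSP) (rabiner-book75, ), but we apply it to database computations. Thus, we have called it “DBSP”. DBSP is inspired by Differential Dataflow (DD) (mcsherry-cidr13, ), and started as an attempt to provide a simpler formalization of DD than the one of Abadi et al. (abadi-fossacs15, ) (as discussed in §9), but it has evolved beyond that purpose.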

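The core concept of DBSP is the stream: a stream $s$ of type $\mathcal{S}_A$ maps “time” moments $t \in \mathbb{N}$ to values $s[t]$ of type $A$; think of it as an “infinite vector”. A streaming computation is a function that consumes one or more streams and produces another stream. We depict streaming computations with typical DSP box-and-arrow diagrams (also called “circuits”), where boxes are computations and streams are arrows, as in the following diagram, which shows a stream operator $T$ consuming two input streams $s_0$ and $s_1$ and producing one output stream $s$:

[Diagram: $s_0, s_1 \to T \to s$]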

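We generally think of streams as sequences of values, and we will use them as such. However, we make a leap of imagination and also treat entire databases as stream values. What is a stream of databases? It is a sequence of database snapshots. We model the time evolution of a database $DB$ as a stream $DB \in \mathcal{S}_{SCH}$, where $SCH$ is the database schema. Time is not wall-clock time, but essentially a counter of the sequence of transactions applied to the database. Since transactions are linearizable, they have a total order, which defines a linear time dimension $t$: the value of the stream $DB[t]$ is the snapshot of the database contents after $t$ transactions have been applied. We assume that $DB[0] = 0$, i.e., the database starts empty.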

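Database transactions also form a stream $T$, a stream of changes (deltas) that are applied to the database. The database snapshot at time $t$ is the cumulative result of applying all transactions in the sequence up to $t$: $DB[t] = \sum_{i \leq t} T[i] \stackrel{\text{def}}{=} \mathcal{I}(T)[t]$ (we make the notion of “addition” precise later). The operation of adding up all changes is stream integration. The diagram below expresses this relationship using the $\mathcal{I}$ operator for stream integration:

[Diagram: $T \to \mathcal{I} \to DB$]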

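Conversely, we can say that transactions are the changes of a database, and write $T = \mathcal{D}(DB)$, i.e., $T[t] = DB[t] - DB[t-1]$. This is the definition of stream differentiation, denoted by $\mathcal{D}$; this operation computes the changes of a stream and is the inverse of stream integration. §2 precisely defines streams, integration, and differentiation, and analyzes their properties.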

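Let us apply these concepts to view maintenance. Consider a database $DB$ and a query $Q$ defining a view $V$ as a function of a database snapshot: $V = Q(DB)$. Corresponding to the stream of database snapshots $DB$ we have a stream of view snapshots: $V[t]$ is the view's contents after the $t$-th transaction has been applied. We show this relationship using the following diagram:

[Diagram: $DB \to \uparrow Q \to V$]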

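The symbol $\uparrow Q$ (the “lifting” of $Q$) shows that the query $Q$ is applied independently to every element of the stream of database snapshots $DB$. $\uparrow Q$ is a “streaming query” since it operates on a stream of values. The incremental view maintenance problem requires an algorithm to compute the stream $\Delta V$ of changes of the view $V$, i.e., $\mathcal{D}(V)$, as a function of the stream $T$. By chaining these definitions together we get the following fundamental equation of the view maintenance problem: $\Delta V = \mathcal{D}(\uparrow Q(DB)) = \mathcal{D}(\uparrow Q(\mathcal{I}(T)))$, shown graphically as:

[Diagram: $T \to \mathcal{I} \to (DB) \to \uparrow Q \to \mathcal{D} \to \Delta V$]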

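This definition can be generalized to streaming queries $S: \mathcal{S}_A \to \mathcal{S}_B$ that are richer than lifted pointwise queries $\uparrow Q$. The incremental version of a streaming query $S$ is denoted by $S^\Delta$ and, following the equation above, can also be written as $S^\Delta = \mathcal{D} \circ S \circ \mathcal{I}$.

It is generally assumed that the changes of a dataset are much smaller than the dataset itself; thus, computing on streams of changes may produce significant performance benefits.

Applying the query incrementalization operator $S \mapsto S^\Delta$ constructs a query that computes directly on changes; however, the resulting query is no more efficient than a query that computes on the entire dataset, because it uses an integration operator to reconstruct the full dataset. §3 shows how the algebraic properties of the $\cdot^\Delta$ operator are used to optimize the implementation of $S^\Delta$: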

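  1. (1)

    The first property is that many classes of primitive operations have very efficient incremental versions. In particular, linear queries have the property $Q = Q^\Delta$. Almost all relational and Datalog queries are based on linear operators. Thus, the incremental version of such queries can be computed in time proportional to the size of the changes. Bilinear operators (such as joins) have a more complex implementation, which nevertheless still performs work proportional to the size of the changes, but requires storing an amount of data proportional to the size of the relations.

  2. (2)

    The second key property is the chain rule: $(S_1 \circ S_2)^\Delta = S_1^\Delta \circ S_2^\Delta$. This rule gives the incremental version of a complex query as a composition of the incremental versions of its components. It follows that we can implement any incremental query as a composition of primitive incremental queries, all of which perform work proportional to the size of the changes.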

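Armed with this general theory of incremental computation, in §4 we show how to model relational queries in DBSP. This immediately gives us a general algorithm to compute the incremental version of any relational query. These results were previously known, but they are cleanly modeled by DBSP.

Applying DBSP to recursive queries requires extending this computation model. In §5 we introduce two additional operators: $\delta_0$ creates a stream from a scalar value, and $\int$ creates a scalar value from a stream. These operators can be used to implement computations with while loops. So, in addition to modeling changing inputs and databases, we also use streams as a model for the sequence of consecutive values of loop iteration variables. With this addition DBSP becomes rich enough to implement recursive queries. §5.1 shows how stratified recursive Datalog programs with negation can be implemented in DBSP.

In §6 we use DBSP to model computations on nested streams, where each value of a stream is another stream. This allows us to define incremental streaming computations for recursive programs. We thus obtain a general algorithm for incrementalizing arbitrary streaming Datalog programs.

DBSP is a simple language: the basic DBSP streaming model is built essentially from two elementary mathematical operators: lifting $\uparrow$ and delay $z^{-1}$. The nested streams model adds two additional operators, $\delta_0$ for stream construction and $\int$ for stream destruction.

DBSP is also expressive: for example, it is more powerful than stratified Datalog. DBSP can also describe streaming window queries, queries on nested relations (such as grouping), and non-monotonic recursive queries. We briefly discuss the application of DBSP to richer languages in §7.

This paper omits most proofs; full proofs are available in an extensive companion technical report (tr, ).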

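This paper makes the following contributions:

  1. (1)

    It defines DBSP, a small language for streaming computation, which nonetheless can express nested non-monotonic recursion;

  2. (2)

    It provides an algorithm for incrementalizing all DBSP programs;

  3. (3)

    For the fragments of DBSP that correspond to the relational algebra and to stratified monotonic Datalog, the automatic incrementalization algorithm produces results that match state-of-the-art methods. In addition, our approach applies to more powerful languages, such as while-relational and non-monotonic Datalog (Abiteboul-book95, );

  4. (4)

    It provides a formal, sound foundation for the manipulation of streaming and incremental computations, which allows one to reason formally about program transformations and to design new efficient implementations;

  5. (5)

    DBSP can express both streaming and incremental computation models in a single framework. We regard this unification as a significant contribution.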

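2. Stream computations

In this section we formally introduce the notion of a stream as an infinite sequence of values, and we define computations on streams. Stream operators (§2.1) are the basic building block of stream computations. We use (§2.2) restricted types of stream operators: causal operators (which cannot “look into the future”) and strict operators (which cannot even “look into the present”). Moreover, all our operators are “synchronous”: they consume and produce data at the same “rate”. Causal operators can be chained into complex acyclic computational circuits; cyclic circuits are restricted to using strict operators on back-edges. Finally, we define (§2.3) two useful stream operators: integration and differentiation.

All the results in this section have been known for decades, but we restate them to clarify the formal assumptions of our model.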

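2.1. Streams and stream operators

$\mathbb{N}$ is the set of natural numbers, $\mathbb{B}$ is the set of Booleans, and $\mathbb{Z}$ is the set of integers.

Definition 2.1 (stream):

Given a set $A$, a stream of values from $A$, or an $A$-stream, is a function $\mathbb{N} \to A$. We denote by $\mathcal{S}_A \stackrel{\text{def}}{=} \{ s \mid s: \mathbb{N} \to A \}$ the set of all $A$-streams.

When $s \in \mathcal{S}_A$ and $t \in \mathbb{N}$ we write $s[t]$ for the $t$-th element of the stream $s$ instead of the usual $s(t)$, to distinguish it from other function applications. We think of the index $t$ as (discrete) time and of $s[t] \in A$ as the value of the stream $s$ “at time” $t$. For example, the stream of natural numbers $id \in \mathcal{S}_\mathbb{N}$ given by $id[t] = t$ is the sequence of values $[\,0\ 1\ 2\ 3\ 4\ \cdots]$.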

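Definition 2.2 (stream operator):

A (typed) stream operator with $n$ inputs is a function $T: \mathcal{S}_{A_0} \times \cdots \times \mathcal{S}_{A_{n-1}} \to \mathcal{S}_B$.

In general we use “operator” for functions on streams, and “function” for computations on “scalar” values.

We write DBSP programs using an extension of the simply-typed lambda calculus; we introduce its elements gradually. However, we find it more readable to describe DBSP programs using signal-processing-like circuit diagrams. In a circuit diagram a rectangle represents an operator application (labeled with the operator name, e.g., $T$), while an arrow represents a stream.

Stream operator composition (function composition) is shown as chained circuits. The composition of a binary operator $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_A$ with a unary operator $S: \mathcal{S}_A \to \mathcal{S}_B$ into the computation $\lambda s. T(T(s, S(s)), S(s)): \mathcal{S}_A \to \mathcal{S}_A$ is:

[Diagram: $s$ feeds $S$ and the first $T$; the outputs of the first $T$ and of $S$ feed the second $T$, which produces $o$.]

(The diagram obscures the order of the operator inputs; for non-commutative operators we have to provide more information.)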

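Definition 2.3 (lifting):

Given a (scalar) function $f: A \to B$, we define a stream operator $\uparrow f: \mathcal{S}_A \to \mathcal{S}_B$ by lifting the function $f$ pointwise in time: $(\uparrow f)(s) \stackrel{\text{def}}{=} f \circ s$. Equivalently, $(\uparrow f)(s)[t] \stackrel{\text{def}}{=} f(s[t])$. This extends to functions of multiple arguments.

For example, $(\uparrow(\lambda x.(2x)))(id) = [\,0\ 2\ 4\ 6\ 8\ \cdots]$.

Proposition 2.4 (distributivity):

Lifting distributes over function composition: $\uparrow(f \circ g) = (\uparrow f) \circ (\uparrow g)$.

We say that two DBSP programs are equivalent if they compute the same input-output function on streams. We use the symbol $\cong$ to indicate that two circuits are equivalent. For example, Proposition 2.4 states the following circuit equivalence:

[Diagram: $s \to \uparrow g \to \uparrow f \to o \ \cong\ s \to \uparrow(f \circ g) \to o$]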

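2.2. Streams over abelian groups

For the rest of the technical development we require the set of values $A$ of a stream $\mathcal{S}_A$ to form a commutative group $(A, +, 0, -)$. We now introduce the primitive stream operators used by DBSP.

2.2.1. Delays and time-invariance

Definition 2.5 (delay):

The delay operator $z^{-1}$ produces an output stream by delaying its input by one step: $z_A^{-1}: \mathcal{S}_A \to \mathcal{S}_A$. (The name $z^{-1}$ comes from the DSP literature and is related to the z-transform (rabiner-book75, ).)

$z_A^{-1}(s)[t] \stackrel{\text{def}}{=} \begin{cases} 0_A & \text{when } t = 0 \\ s[t-1] & \text{when } t \geq 1 \end{cases}$

[Diagram: $s \to z^{-1} \to o$]

We often omit the type parameter $A$ and write just $z^{-1}$. For example, $z^{-1}(id) = [\,0\ 0\ 1\ 2\ 3\ \cdots]$.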
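The two primitives introduced so far, lifting and delay, can be sketched on finite prefixes of streams. This is only an illustration under stated assumptions: streams are modeled as Python lists of integers (a finite prefix of an infinite stream), and the names `lift` and `delay` are hypothetical, not part of any DBSP library.

```python
from typing import Callable, List

def lift(f: Callable[[int], int]) -> Callable[[List[int]], List[int]]:
    """Lift a scalar function pointwise in time: (lift f)(s)[t] = f(s[t])."""
    return lambda s: [f(v) for v in s]

def delay(s: List[int], zero: int = 0) -> List[int]:
    """z^-1 on a finite prefix: emit the group zero at t = 0, then s[t-1]."""
    return ([zero] + s[:-1]) if s else []

ident = list(range(5))                      # prefix of id: [0, 1, 2, 3, 4]
doubled = lift(lambda x: 2 * x)(ident)      # prefix of lift(2x)(id)
delayed = delay(ident)                      # prefix of z^-1(id)
```

On this prefix, `doubled` and `delayed` reproduce the two example streams given in the text. Because doubling maps 0 to 0, applying it before or after `delay` gives the same list.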

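Definition 2.6 (time-invariance):

A stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ is time-invariant iff $S(z_A^{-1}(s)) = z_B^{-1}(S(s))$ for all $s \in \mathcal{S}_A$, or, in other words, iff the following two circuits are equivalent:

[Diagram: $s \to S \to z^{-1} \to o \ \cong\ s \to z^{-1} \to S \to o$]

This definition extends naturally to operators with multiple inputs.

The composition of time-invariant operators of any number of inputs is time-invariant. The delay operator $z^{-1}$ is time-invariant. DBSP only uses time-invariant operators.

Definition 2.7:

We say that a function between groups $f: A \to B$ has the zero-preservation property if $f(0_A) = 0_B$. We write $\mathrm{zpp}(f)$.

A lifted operator $\uparrow f$ is time-invariant iff $\mathrm{zpp}(f)$.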

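2.2.2. Causal and strict operators

Definition 2.8 (causality):

A stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ is causal when for all $s, s' \in \mathcal{S}_A$ and all times $t$ we have: $(\forall i \leq t.\ s[i] = s'[i]) \Rightarrow S(s)[t] = S(s')[t]$.

In other words, the output value at time $t$ can only depend on input values from times $t' \leq t$. Operators produced by lifting are causal, and $z^{-1}$ is causal. All DBSP operators are causal. The composition of causal operators of any number of inputs is causal.

Definition 2.9 (strictness):

A stream operator $F: \mathcal{S}_A \to \mathcal{S}_B$ is strictly causal (abbreviated strict) if for all $s, s' \in \mathcal{S}_A$ and all $t \in \mathbb{N}$ we have: $(\forall i < t.\ s[i] = s'[i]) \Rightarrow F(s)[t] = F(s')[t]$.

Thus, the $t$-th output of $F(s)$ can only depend on “past” values of the input $s$, between $0$ and $t-1$. In particular, $F(s)[0] = 0_B$ is the same for all $s \in \mathcal{S}_A$. Strict operators are causal. Lifted operators in general are not strict. $z^{-1}$ is strict.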

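Proposition 2.10:

For a strict $F: \mathcal{S}_A \to \mathcal{S}_A$ the equation $\alpha = F(\alpha)$ has a unique solution $\alpha \in \mathcal{S}_A$, denoted by $\mathrm{fix}\,\alpha.F(\alpha)$.

Thus every strict operator from a set to itself has a unique fixed point. The simple proof relies on strong induction, showing that $\alpha[t]$ depends only on the values of $\alpha$ prior to $t$.

We show that the following circuit, with a strict “feedback” edge $F$, is a well-defined function on streams:

[Diagram: $s \to T \to \alpha$, with feedback $\alpha \to F \to T$]
Lemma 2.11:

If $F: \mathcal{S}_B \to \mathcal{S}_B$ is strict and $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_B$ is causal, then for a fixed $s$ the operator $\lambda\alpha.T(s, F(\alpha)): \mathcal{S}_B \to \mathcal{S}_B$ is strict.

Corollary 2.12:

If $F: \mathcal{S}_B \to \mathcal{S}_B$ is strict and $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_B$ is causal, the operator $Q(s) = \mathrm{fix}\,\alpha.T(s, F(\alpha))$ is well-defined and causal. If, additionally, $F$ and $T$ are time-invariant, then so is $Q$.

All stream computations in DBSP are built from the primitive operators we have described: lifted operators and delays (we add two more operators in §6). Circuits composed of such operators can be efficiently implemented using dataflow machines (lee-ieee95, ).

Circuits with feedback are used for two purposes: defining an integration operator (in the next section) and defining recursive computations (§5). In turn, the integration operator is instrumental in defining incremental computations (§3).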

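2.3. Integration and differentiation

Remember that we require the elements of a stream to come from an abelian group $A$. Streams themselves form an abelian group:

Proposition 2.13:

The structure $(\mathcal{S}_A, +, 0, -)$, obtained by lifting the $+$ and unary $-$ operations from $A$ to $\mathcal{S}_A$, is an abelian group.

Stream addition and negation are causal, time-invariant operators.

Definition 2.14:

Given abelian groups $A$ and $B$, we call a stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ linear if it is a group homomorphism, that is, $S(a + b) = S(a) + S(b)$ (and therefore $S(0) = 0$ and $S(-a) = -S(a)$).

Lifting a linear function $f: A \to B$ produces a stream operator $\uparrow f$ that is linear and time-invariant (LTI). $z^{-1}$ is also LTI.

Definition 2.15 (bilinear):

A function of two arguments $f: A \times B \to C$ with groups $A, B, C$ is bilinear if it is linear separately in each argument (i.e., it distributes over addition): $\forall a, b, c, d.\ f(a + b, c) = f(a, c) + f(b, c)$ and $f(a, c + d) = f(a, c) + f(a, d)$.

This definition extends to stream operators. Lifting a bilinear function $f: A \times B \to C$ produces a bilinear stream operator $\uparrow f$. An example of a bilinear operator over $\mathcal{S}_\mathbb{Z}$ is lifted multiplication: $f: \mathcal{S}_\mathbb{Z} \times \mathcal{S}_\mathbb{Z} \to \mathcal{S}_\mathbb{Z}$, $f(a, b)[t] = a[t] \cdot b[t]$.

The composition of a (bi)linear operator with a linear operator is (bi)linear (since homomorphisms compose).

The feedback loop produced with a linear operator is linear: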

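Proposition 2.16:

Let $S$ be a unary causal LTI operator. The operator $Q(s) = \mathrm{fix}\,\alpha.S(s + z^{-1}(\alpha))$ is well-defined and LTI:

[Diagram: $s \to + \to S \to \alpha$, with feedback $\alpha \to z^{-1} \to +$]
Definition 2.17 (differentiation):

The differentiation operator $\mathcal{D}_{\mathcal{S}_A}: \mathcal{S}_A \to \mathcal{S}_A$ is defined by: $\mathcal{D}(s) \stackrel{\text{def}}{=} s - z^{-1}(s)$.

We generally omit the type, and write just $\mathcal{D}$ when the type can be inferred from the context. The value of $\mathcal{D}(s)$ at time $t$ is the difference between the current (time $t$) value of $s$ and the previous (time $t-1$) value of $s$. For example, $\mathcal{D}(id) = [\,0\ 1\ 1\ 1\ 1\ \cdots]$.

If $s$ is a stream, then $\mathcal{D}(s)$ is the stream of changes of $s$.

Proposition 2.18:

Differentiation $\mathcal{D}$ is causal and LTI.

[Diagrams: Differentiation: $\mathcal{D}(s) = s - z^{-1}(s)$, a feed-forward path through $z^{-1}$. Integration: $\mathcal{I}(s)$, a feedback loop through $z^{-1}$ into $+$.]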

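The integration operator “reconstitutes” a stream from its changes:

Definition 2.19 (integration):

The integration operator $\mathcal{I}_{\mathcal{S}_A}: \mathcal{S}_A \to \mathcal{S}_A$ is defined by $\mathcal{I}(s) \stackrel{\text{def}}{=} \lambda s.\mathrm{fix}\,\alpha.(s + z^{-1}(\alpha))$.

We also generally omit the type, and write just $\mathcal{I}$. This is the construction from Proposition 2.16 using the identity function for $S$.

Proposition 2.20:

$\mathcal{I}(s)$ is the discrete (indefinite) integral applied to the stream $s$: $\mathcal{I}(s)[t] = \sum_{i \leq t} s[i]$.

For example, $\mathcal{I}(id) = [\,0\ 1\ 3\ 6\ 10\ \cdots]$.

Proposition 2.21:

$\mathcal{I}$ is causal and LTI.

Theorem 2.22 (inversion):

Integration and differentiation are inverse to each other: $\forall s.\ \mathcal{I}(\mathcal{D}(s)) = \mathcal{D}(\mathcal{I}(s)) = s$.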
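The definitions of $\mathcal{D}$ and $\mathcal{I}$ and the inversion theorem can be checked on finite stream prefixes. The following is a minimal sketch, assuming streams over the group of integers modeled as Python lists; the function names are illustrative only.

```python
from itertools import accumulate
from typing import List

def differentiate(s: List[int]) -> List[int]:
    """D(s)[t] = s[t] - s[t-1], with s[-1] taken to be 0."""
    return [v - (s[t - 1] if t > 0 else 0) for t, v in enumerate(s)]

def integrate(s: List[int]) -> List[int]:
    """I(s)[t] = sum of s[i] for all i <= t (a running sum)."""
    return list(accumulate(s))

ident = list(range(5))                 # prefix of id: [0, 1, 2, 3, 4]
changes = differentiate(ident)         # prefix of D(id)
totals = integrate(ident)              # prefix of I(id)
```

Both operators are causal, so working on a prefix loses nothing: the first five outputs depend only on the first five inputs. Applying `integrate` after `differentiate`, or the reverse, returns `ident` unchanged.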

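[Diagram: $s \to \mathcal{I} \to \mathcal{D} \to o \ \cong\ s \to o \ \cong\ s \to \mathcal{D} \to \mathcal{I} \to o$]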

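3. Incremental computation

Definition 3.1:

Given a unary stream operator $Q: \mathcal{S}_A \to \mathcal{S}_B$ we define the incremental version of $Q$ as $Q^\Delta \stackrel{\text{def}}{=} \mathcal{D} \circ Q \circ \mathcal{I}$. $Q^\Delta$ has the same “type” as $Q$: $Q^\Delta: \mathcal{S}_A \to \mathcal{S}_B$. For an operator with multiple inputs we define the incremental version by applying $\mathcal{I}$ to each input independently: e.g., if $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_C$ then $T^\Delta(a, b) \stackrel{\text{def}}{=} \mathcal{D}(T(\mathcal{I}(a), \mathcal{I}(b)))$.

The following diagram illustrates the intuition behind this definition:

[Diagram: $\Delta s \to \mathcal{I} \to (s) \to Q \to (o) \to \mathcal{D} \to \Delta o$]

If $Q(s) = o$ is a computation, then $Q^\Delta$ performs the “same” computation as $Q$, but between the streams of changes $\Delta s$ and $\Delta o$. This is the diagram from the introduction, substituting $\Delta s$ for the transaction stream $T$, and $o$ for the stream of view versions $V$.

Notice that our definition of incremental computation is meaningful only for computations on streams; this is in contrast to classic definitions, e.g. (gupta-idb95, ), which consider only one change. Generalizing the definition to operate on streams gives us additional power, especially when operating with recursive queries.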

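The following proposition is one of our central results.

Proposition 3.2 (properties of the incremental version):

For computations of appropriate types, the following hold:

inversion:

$Q \mapsto Q^\Delta$ is bijective; its inverse is $Q \mapsto \mathcal{I} \circ Q \circ \mathcal{D}$.

invariance:

$+^\Delta = +$, $(z^{-1})^\Delta = z^{-1}$, $-^\Delta = -$, $\mathcal{I}^\Delta = \mathcal{I}$, $\mathcal{D}^\Delta = \mathcal{D}$.

push/pull:

$Q \circ \mathcal{I} = \mathcal{I} \circ Q^\Delta$; $\mathcal{D} \circ Q = Q^\Delta \circ \mathcal{D}$.

chain:

$(Q_1 \circ Q_2)^\Delta = Q_1^\Delta \circ Q_2^\Delta$ (this generalizes to operators with multiple inputs).

add:

$(Q_1 + Q_2)^\Delta = Q_1^\Delta + Q_2^\Delta$.

cycle:

$(\lambda s.\mathrm{fix}\,\alpha.T(s, z^{-1}(\alpha)))^\Delta = \lambda s.\mathrm{fix}\,\alpha.T^\Delta(s, z^{-1}(\alpha))$.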

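The proofs of these properties rely on elementary algebraic manipulation. Despite their simplicity, they are very useful. For example, the chain rule states that the following two circuits are equivalent:

[Diagram: $i \to \mathcal{I} \to Q_1 \to Q_2 \to \mathcal{D} \to o \ \cong\ i \to Q_1^\Delta \to Q_2^\Delta \to o$]

In other words, to incrementalize a composite query you can incrementalize each sub-query independently. This gives us a simple, deterministic recipe for computing the incremental version of an arbitrarily complex query.

We illustrate by giving the proof of the chain rule, which is simple and is based on the associativity of function composition (using $\mathcal{I} \circ \mathcal{D} = id$ from Theorem 2.22):

$(Q_1 \circ Q_2)^\Delta = \mathcal{D} \circ Q_1 \circ Q_2 \circ \mathcal{I}$
$= \mathcal{D} \circ Q_1 \circ (\mathcal{I} \circ \mathcal{D}) \circ Q_2 \circ \mathcal{I}$
$= (\mathcal{D} \circ Q_1 \circ \mathcal{I}) \circ (\mathcal{D} \circ Q_2 \circ \mathcal{I})$
$= Q_1^\Delta \circ Q_2^\Delta$.

The cycle rule states that the following circuits are equivalent:

[Diagram: $s \to \mathcal{I} \to T \to \mathcal{D} \to o$ with feedback through $z^{-1}$ into $T$ $\ \cong\ $ $s \to T^\Delta \to o$ with feedback through $z^{-1}$ into $T^\Delta$]

The incremental version of a feedback loop around a query is just the feedback loop around the incremental query. The significance of this result will become apparent when we implement recursive queries.

To execute incremental queries efficiently, we want to compute directly on streams of changes without integrating them. The invariance property above shows that the stream operators $+$, $-$, and $z^{-1}$ are identical to their incremental versions, so $\mathcal{I}$ and $\mathcal{D}$ can be omitted: $Q^\Delta = \mathcal{D} \circ Q \circ \mathcal{I} = Q$. The following theorems generalize this to linear and bilinear operators: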

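Theorem 3.3 (linear):

For an LTI operator $Q$ we have $Q^\Delta = Q$.

Theorem 3.4 (bilinear):

For a bilinear time-invariant operator $\times$ we have $(a \times b)^\Delta = a \times b + z^{-1}(\mathcal{I}(a)) \times b + a \times z^{-1}(\mathcal{I}(b))$.

By rewriting this statement using $\Delta a$ for the stream of changes to $a$ we get the familiar formula for incremental equi-joins: $\Delta(a \times b) = \Delta a \times \Delta b + a \times (\Delta b) + (\Delta a) \times b$.

This is not surprising, since equi-join is bilinear, as we discuss in the next section.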
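Theorem 3.4 can be sanity-checked numerically on the simplest bilinear operator, lifted integer multiplication. The sketch below is an illustration only: streams are finite lists of integers, and `incremental_product_naive` and `incremental_product_optimized` are hypothetical names for the unoptimized definition $\mathcal{D}(\mathcal{I}(a) \times \mathcal{I}(b))$ and for the three-term formula of the theorem.

```python
import random
from itertools import accumulate
from typing import List

def integrate(s: List[int]) -> List[int]:
    return list(accumulate(s))

def differentiate(s: List[int]) -> List[int]:
    return [v - (s[t - 1] if t > 0 else 0) for t, v in enumerate(s)]

def delay(s: List[int]) -> List[int]:
    return ([0] + s[:-1]) if s else []

def incremental_product_naive(a: List[int], b: List[int]) -> List[int]:
    """Definition 3.1: integrate both inputs, multiply pointwise, differentiate."""
    full_a, full_b = integrate(a), integrate(b)
    return differentiate([x * y for x, y in zip(full_a, full_b)])

def incremental_product_optimized(a: List[int], b: List[int]) -> List[int]:
    """Theorem 3.4: a*b + z^-1(I(a))*b + a*z^-1(I(b)), computed pointwise."""
    prev_a, prev_b = delay(integrate(a)), delay(integrate(b))
    return [da * db + pa * db + da * pb
            for da, db, pa, pb in zip(a, b, prev_a, prev_b)]

rng = random.Random(0)                       # fixed seed for reproducibility
a = [rng.randint(-5, 5) for _ in range(20)]  # arbitrary change stream
b = [rng.randint(-5, 5) for _ in range(20)]  # arbitrary change stream
```

For any pair of change streams the two functions return the same list. The optimized form reads only the current changes and the previous integrals, which is the state an incremental join has to keep.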

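4. Incremental view maintenance

The results in §2 and §3 apply to streams of arbitrary group values. In this section we turn our attention to using these results in the context of relational view maintenance. As explained in the introduction, we want to efficiently compute the incremental version of any relational query $Q$ that updates a database view.

However, we face a technical problem: the $\mathcal{I}$ and $\mathcal{D}$ operators were defined on abelian groups, and relational databases in general are not abelian groups, since they operate on sets. Fortunately, there is a well-known tool in the database literature which converts set operations into group operations by using $\mathbb{Z}$-sets (also called z-relations (green-pods07, )) instead of sets.

We start by defining the $\mathbb{Z}$-sets group, and then we review how relational queries are converted into DBSP circuits over $\mathbb{Z}$-sets. What makes this translation efficiently incrementalizable is the fact that many basic relational queries can be expressed using LTI $\mathbb{Z}$-set operators.

4.1. $\mathbb{Z}$-sets as an abelian group

Given a set $A$ we define $\mathbb{Z}$-sets over $A$ as functions with finite support from $A$ to $\mathbb{Z}$. ($\mathbb{Z}$-sets are also called $\mathbb{Z}$-relations elsewhere (green-tcs11, ), because in practice $A$ is often a Cartesian product; however, we only need the set structure for most results.) These are functions $f: A \to \mathbb{Z}$ where $f(x) \neq 0$ for at most a finite number of values $x \in A$. We also write $\mathbb{Z}[A]$ for the type of $\mathbb{Z}$-sets with elements from $A$. Values in $\mathbb{Z}[A]$ can also be thought of as key-value maps with keys in $A$ and values in $\mathbb{Z}$, justifying the array indexing notation. We write $f[a]$ instead of $f(a)$. Since $\mathbb{Z}$ is an abelian ring, $\mathbb{Z}[A]$ is also an abelian ring (and thus a group). This group $(\mathbb{Z}[A], +_{\mathbb{Z}[A]}, 0_{\mathbb{Z}[A]}, -_{\mathbb{Z}[A]})$ has addition and subtraction defined pointwise: $(f +_{\mathbb{Z}[A]} g)(x) = f(x) + g(x),\ \forall x \in A$. The $0$ element of $\mathbb{Z}[A]$ is the function $0_{\mathbb{Z}[A]}$ defined by $0_{\mathbb{Z}[A]}(x) = 0,\ \forall x \in A$.

A particular $\mathbb{Z}$-set $m \in \mathbb{Z}[A]$ can be denoted by enumerating the inputs that map to non-zero values and their corresponding values: $m = \{ x_1 \mapsto w_1, \dots, x_n \mapsto w_n \}$. We call $w_i \in \mathbb{Z}$ the multiplicity (or weight) of $x_i \in A$. Multiplicities can be negative. We write $x \in m$ for $x \in A$ iff $m[x] \neq 0$.

For example, let us consider a concrete $\mathbb{Z}$-set $R \in \mathbb{Z}[\mathrm{string}]$, defined by $R = \{ \mathrm{joe} \mapsto 1, \mathrm{anne} \mapsto -1 \}$. $R$ has two elements in its domain: $\mathrm{joe}$ with a multiplicity of $1$ (so $R[\mathrm{joe}] = 1$), and $\mathrm{anne}$ with a multiplicity of $-1$. We say $\mathrm{joe} \in R$ and $\mathrm{anne} \in R$.

$\mathbb{Z}$-sets generalize sets and bags. A set with elements from $A$ can be represented as a $\mathbb{Z}$-set by associating a weight of $1$ with each set element. When translating queries on sets to DBSP programs, we convert data values back and forth between sets and $\mathbb{Z}$-sets.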

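Definition 4.1:

We say that a $\mathbb{Z}$-set represents a set if the multiplicity of every element is one. We define a function to check this property, $\mathrm{isset}: \mathbb{Z}[A] \to \mathbb{B}$, given by:

$\mathrm{isset}(m) \stackrel{\text{def}}{=} \begin{cases} \mathrm{true} & \text{if } m[x] = 1,\ \forall x \in m \\ \mathrm{false} & \text{otherwise} \end{cases}$

For our example $\mathrm{isset}(R) = \mathrm{false}$, since $R[\mathrm{anne}] = -1$.

Definition 4.2:

We say that a $\mathbb{Z}$-set is positive (or a bag) if the multiplicity of every element is positive. We define a function to check this property, $\mathrm{ispositive}: \mathbb{Z}[A] \to \mathbb{B}$, given by:

$\mathrm{ispositive}(m) \stackrel{\text{def}}{=} \begin{cases} \mathrm{true} & \text{if } m[x] \geq 0,\ \forall x \in A \\ \mathrm{false} & \text{otherwise} \end{cases}$

$\forall m \in \mathbb{Z}[A].\ \mathrm{isset}(m) \Rightarrow \mathrm{ispositive}(m)$.

Since $R[\mathrm{anne}] = -1$, we have $\mathrm{ispositive}(R) = \mathrm{false}$.

We write $m \geq 0$ when $m$ is positive. For positive $m, n \in \mathbb{Z}[A]$ we write $m \geq n$ iff $m - n \geq 0$. $\geq$ is a partial order.

We call a function $f: \mathbb{Z}[A] \to \mathbb{Z}[B]$ positive if it maps positive values to positive values: $\forall x \in \mathbb{Z}[A],\ x \geq 0_{\mathbb{Z}[A]} \Rightarrow f(x) \geq 0_{\mathbb{Z}[B]}$. We apply this notation to functions as well: $\mathrm{ispositive}(f)$.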

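Definition 4.3 (distinct):

The function $\mathit{distinct}: \mathbb{Z}[A] \to \mathbb{Z}[A]$ projects a $\mathbb{Z}$-set into its underlying set (but the result is still a $\mathbb{Z}$-set):

$\mathit{distinct}(m)[x] \stackrel{\text{def}}{=} \begin{cases} 1 & \text{if } m[x] > 0 \\ 0 & \text{otherwise} \end{cases}$

$\mathit{distinct}$ “removes” elements with negative multiplicities. $\mathit{distinct}(R) = \{ \mathrm{joe} \mapsto 1 \}$.

While very simple, this definition of $\mathit{distinct}$ has been carefully chosen to enable us to define precisely all relational (set) operators in terms of $\mathbb{Z}$-set operators.

Circuits derived from relational queries only compute on positive $\mathbb{Z}$-sets; negative values are only used to represent changes to $\mathbb{Z}$-sets. Negative weights “remove” elements from a set.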
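The $\mathbb{Z}$-set definitions of this section translate almost directly into code. Below is a minimal sketch, assuming a $\mathbb{Z}$-set is represented as a Python dictionary from elements to non-zero integer weights; the helper names (`zadd`, `zneg`, and so on) are hypothetical and chosen only for this illustration.

```python
from typing import Dict, Hashable

ZSet = Dict[Hashable, int]   # element -> non-zero multiplicity

def zadd(a: ZSet, b: ZSet) -> ZSet:
    """Pointwise addition; entries whose weight becomes 0 are dropped."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
    return {x: w for x, w in out.items() if w != 0}

def zneg(a: ZSet) -> ZSet:
    """Pointwise negation (the group inverse)."""
    return {x: -w for x, w in a.items()}

def isset(m: ZSet) -> bool:
    """True when every element present has multiplicity exactly 1."""
    return all(w == 1 for w in m.values() if w != 0)

def ispositive(m: ZSet) -> bool:
    """True when no element has a negative multiplicity."""
    return all(w >= 0 for w in m.values())

def distinct(m: ZSet) -> ZSet:
    """Keep elements with positive weight, each with weight 1."""
    return {x: 1 for x, w in m.items() if w > 0}

R: ZSet = {"joe": 1, "anne": -1}   # the running example from the text
```

With this representation the empty dictionary plays the role of the group zero, and `zadd(R, zneg(R))` is empty, as expected of a group inverse.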

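All the results from §2 extend to streams over $\mathbb{Z}$-sets.

Definition 4.4 (monotonicity):

A stream $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is positive if every value of the stream is positive: $s[t] \geq 0,\ \forall t \in \mathbb{N}$. A stream $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is monotone if $s[t] \geq s[t-1],\ \forall t \in \mathbb{N}$.

If $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is positive, then $\mathcal{I}(s)$ is monotone. If $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is monotone, then $\mathcal{D}(s)$ is positive.

Generalizing box-and-arrow diagrams

From now on we will use circuits to compute both on scalars and on streams. We use the same graphical representation for functions on streams or scalars: boxes with input and output arrows. For scalar functions the “values” of the arrows are scalars instead of streams; otherwise the interpretation of boxes as function application is unchanged.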

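4.2. Implementing relational operators

The fact that the relational algebra can be implemented by computations on $\mathbb{Z}$-sets has been shown before, e.g. (green-pods07, ). The translation of all the core relational operators is shown in Table 4.2. The translation is given essentially by induction on the query structure.

Operation | SQL example | DBSP circuit | Details
Composition | SELECT DISTINCT ... FROM (SELECT ... FROM ...) | $I \to C_I \to C_O \to O$ | $C_I$ circuit for inner query, $C_O$ circuit for outer query.
Union | (SELECT * FROM I1) UNION (SELECT * FROM I2) | $I1, I2 \to + \to \mathit{distinct} \to O$ |
Projection | SELECT DISTINCT I.c FROM I | $I \to \pi \to \mathit{distinct} \to O$ | $\pi(i)[y] \stackrel{\text{def}}{=} \sum_{x \in i,\ x|_c = y} i[x]$; $x|_c$ is the projection on column $c$ of the tuple $x$. $\pi$ is linear; $\mathrm{ispositive}(\pi)$, $\mathrm{zpp}(\pi)$.
Filtering | SELECT * FROM I WHERE p(I.c) | $I \to \sigma_P \to \mathit{distinct} \to O$ | $\sigma_P(m)[x] \stackrel{\text{def}}{=} m[x]$ if $P(x)$, and $0$ otherwise. $P: A \to \mathbb{B}$ is a predicate. $\sigma_P$ is linear; $\mathrm{ispositive}(\sigma_P)$, $\mathrm{zpp}(\sigma_P)$.
Selection | SELECT DISTINCT f(I.c, ...) FROM I | $I \to \mathrm{map}(f) \to \mathit{distinct} \to O$ | For a function $f$, $\mathrm{map}(f)$ is linear; $\mathrm{ispositive}(\mathrm{map}(f))$, $\mathrm{zpp}(\mathrm{map}(f))$.
Cartesian product | SELECT I1.*, I2.* FROM I1, I2 | $I1, I2 \to \times \to O$ | $(a \times b)((x,y)) \stackrel{\text{def}}{=} a[x] \times b[y]$. $\times$ is bilinear; $\mathrm{ispositive}(\times)$, $\mathrm{zpp}(\times)$.
Equi-join | SELECT I1.*, I2.* FROM I1 JOIN I2 ON I1.c1 = I2.c2 | $I1, I2 \to \bowtie \to O$ | $(a \bowtie b)((x,y)) \stackrel{\text{def}}{=} a[x] \times b[y]$ if $x|_{c1} = y|_{c2}$. $\bowtie$ is bilinear; $\mathrm{ispositive}(\bowtie)$, $\mathrm{zpp}(\bowtie)$.
Intersection | (SELECT * FROM I1) INTERSECT (SELECT * FROM I2) | $I1, I2 \to \bowtie \to O$ | Special case of equi-join when both relations have the same schema.
Difference | SELECT * FROM I1 EXCEPT SELECT * FROM I2 | $I1, -I2 \to + \to \mathit{distinct} \to O$ |
Table 1. Implementation of SQL relational set operators in DBSP. Each query assumes that the inputs $I$, $I1$, $I2$ are sets, and produces output sets.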

The translation is fairly straightforward, but many operators require the application of a $\mathit{distinct}$ to produce sets. The correctness of this implementation is predicated on the global circuit inputs being sets as well. For example, $a \cup b = \mathit{distinct}(a + b)$, $a \setminus b = \mathit{distinct}(a - b)$, $(a \times b)((x,y)) = a[x] \times b[y]$.

4.2.1. Correctness of the DBSP implementations

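A relational query $Q$ that transforms a set $V$ into a set $U$ will be implemented by a DBSP computation $Q'$ on $\mathbb{Z}$-sets. The correctness of the implementation requires that the following diagram commutes:

[Diagram: $V \xrightarrow{Q} U$ on top; $V \xrightarrow{\mathrm{tozset}} VZ \xrightarrow{Q'} UZ \xrightarrow{\mathrm{toset}} U$ below.]

The $\mathrm{toset}$ and $\mathrm{tozset}$ functions convert sets to $\mathbb{Z}$-sets and vice-versa:

$\mathrm{toset}: \mathbb{Z}[A] \to 2^A$ is defined by $\mathrm{toset}(m) \stackrel{\text{def}}{=} \bigcup_{x \in \mathit{distinct}(m)} \{x\}$.

$\mathrm{tozset}: 2^A \to \mathbb{Z}[A]$ is defined by $\mathrm{tozset}(s) \stackrel{\text{def}}{=} \sum_{x \in s} 1 \cdot x$.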
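The commuting diagram can be illustrated for two of the operators from Table 1, union and difference. This is a small sketch under the same dictionary-based representation of $\mathbb{Z}$-sets assumed for illustration (not the authors' implementation); `zunion` and `zdifference` are hypothetical names for the circuits $\mathit{distinct}(a + b)$ and $\mathit{distinct}(a - b)$.

```python
from typing import Dict, Hashable, Set

ZSet = Dict[Hashable, int]

def tozset(s: Set[Hashable]) -> ZSet:
    """Give every set element weight 1."""
    return {x: 1 for x in s}

def toset(m: ZSet) -> Set[Hashable]:
    """Keep the elements that survive distinct, i.e. positive weights."""
    return {x for x, w in m.items() if w > 0}

def zadd(a: ZSet, b: ZSet) -> ZSet:
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
    return {x: w for x, w in out.items() if w != 0}

def zneg(a: ZSet) -> ZSet:
    return {x: -w for x, w in a.items()}

def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}

def zunion(a: ZSet, b: ZSet) -> ZSet:
    return distinct(zadd(a, b))            # a ∪ b = distinct(a + b)

def zdifference(a: ZSet, b: ZSet) -> ZSet:
    return distinct(zadd(a, zneg(b)))      # a \ b = distinct(a - b)

V1 = {"joe", "anne"}
V2 = {"anne", "bob"}
```

Converting `V1` and `V2` with `tozset`, running the $\mathbb{Z}$-set circuit, and converting back with `toset` gives the same result as Python's built-in `V1 | V2` and `V1 - V2`, which is what the diagram requires.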

All standard algebraic properties of the relational operators can be used to optimize circuits (they can even be applied to queries before building the circuits).

Notice that the use of the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 operator allows DBSP to model the full relational algebra, including difference (and not just the positive fragment). Most of the operators that appear in the circuits in Table 4.2 are linear, and thus have very efficient incremental versions. A notable exception is 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡. While we show below that 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 can be computed efficiently incrementally, it does have an important cost in terms of memory, so we try to minimize its use. For this we can use a pair of optimizations:

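Proposition 4.5:

Let $Q$ be one of the following $\mathbb{Z}$-sets operators: filtering $\sigma$, join $\bowtie$, or Cartesian product $\times$. Then we have $\forall i \in \mathbb{Z}[I],\ \mathrm{ispositive}(i) \Rightarrow Q(\mathit{distinct}(i)) = \mathit{distinct}(Q(i))$.

This rule allows us to delay the application of $\mathit{distinct}$.

Proposition 4.6:

Let $Q$ be one of the following $\mathbb{Z}$-sets operators: filtering $\sigma$, projection $\pi$, selection ($\mathrm{map}(f)$), addition $+$, join $\bowtie$, or Cartesian product $\times$. Then we have $\forall i \in \mathbb{Z}[I],\ \mathrm{ispositive}(i) \Rightarrow \mathit{distinct}(Q(\mathit{distinct}(i))) = \mathit{distinct}(Q(i))$.

This is Proposition 6.13 in (green-tcs11, ).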

These properties allow us to “consolidate” distinct operators by performing one 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 at the end of a chain of computations.

Finally, the next proposition shows that the incremental of 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 can be computed with work proportional to the size of the input change.

Proposition 4.7:

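The following circuit implements $(\uparrow\mathit{distinct})^\Delta$:

[Diagram: $d \to (\uparrow\mathit{distinct})^\Delta \to o \ \cong\ d \to \mathcal{I} \to z^{-1} \to (i)$; both $i$ and $d$ feed $\uparrow H$, which produces $o$.]

where $H: \mathbb{Z}[A] \times \mathbb{Z}[A] \to \mathbb{Z}[A]$ is defined as:

$H(i, d)[x] \stackrel{\text{def}}{=} \begin{cases} -1 & \text{if } i[x] > 0 \text{ and } (i + d)[x] \leq 0 \\ 1 & \text{if } i[x] \leq 0 \text{ and } (i + d)[x] > 0 \\ 0 & \text{otherwise} \end{cases}$

The function $H$ detects whether the multiplicity of an element in the input set $i$ changes from negative to positive, or vice-versa, when the change $d$ is added. Notice that only the multiplicities of the elements that appear in the change $d$ can change from input to output, so the work needed to compute $H$ is bounded by the size of $d$ and not of $i$.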
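The circuit for incremental distinct can be sketched and compared against the unoptimized definition $\mathcal{D} \circ \uparrow\mathit{distinct} \circ \mathcal{I}$. The code below assumes $\mathbb{Z}$-sets are dictionaries and a stream is a list of such dictionaries; `incremental_distinct` and `reference_distinct` are hypothetical names introduced for this comparison.

```python
from typing import Dict, Hashable, List

ZSet = Dict[Hashable, int]

def zadd(a: ZSet, b: ZSet) -> ZSet:
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
    return {x: w for x, w in out.items() if w != 0}

def zneg(a: ZSet) -> ZSet:
    return {x: -w for x, w in a.items()}

def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}

def H(i: ZSet, d: ZSet) -> ZSet:
    """Visit only the elements of the change d, so the work is O(|d|)."""
    out: ZSet = {}
    for x, w in d.items():
        before, after = i.get(x, 0), i.get(x, 0) + w
        if before > 0 and after <= 0:
            out[x] = -1          # element leaves the set
        elif before <= 0 and after > 0:
            out[x] = 1           # element enters the set
    return out

def incremental_distinct(changes: List[ZSet]) -> List[ZSet]:
    acc: ZSet = {}               # z^-1(I(d)): integral of earlier changes
    outputs = []
    for d in changes:
        outputs.append(H(acc, d))
        acc = zadd(acc, d)
    return outputs

def reference_distinct(changes: List[ZSet]) -> List[ZSet]:
    """D . distinct . I, recomputing distinct on the full state each step."""
    acc: ZSet = {}
    prev: ZSet = {}
    outputs = []
    for d in changes:
        acc = zadd(acc, d)
        cur = distinct(acc)
        outputs.append(zadd(cur, zneg(prev)))
        prev = cur
    return outputs

changes = [{"a": 2, "b": 1}, {"a": -1}, {"a": -1, "b": -1, "c": 1},
           {"c": -2}, {"c": 3}]
```

Both functions produce the same stream of output changes on this input. The incremental version touches only the keys of each change, whereas the reference rescans the whole accumulated $\mathbb{Z}$-set at every step.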

4.3. Incremental view maintenance

Let us consider a relational query Q defining a view. To create a circuit that maintains incrementally the view defined by Q we apply the following mechanical steps; this algorithm is deterministic and its running time is proportional to the complexity of the query (number of operators in the query):

Algorithm 4.8 (incremental view maintenance):
  1. (1)

    Translate Q into a circuit using the rules in Table 4.2.

  2. (2)

    Apply 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 consolidation until convergence.

  3. (3)

    Lift the whole circuit, by applying Proposition 2.4, converting it to a circuit operating on streams.

  4. (4)

    Incrementalize the whole circuit by “surrounding” it with $\mathcal{I}$ and $\mathcal{D}$.

  5. (5)

    Apply the chain rule and other properties of the Δ operator from Proposition 3.2 recursively on the query structure to optimize the incremental implementation.

Step (3) yields a circuit that consumes a stream of complete database snapshots and outputs a stream of complete view snapshots. Step (4) yields a circuit that consumes a stream of changes to the database and outputs a stream of view changes; however, the internal operation of the circuit is non-incremental, as it computes on the complete state of the database reconstructed by the integration operator. Step (5) incrementalizes the internals of the circuit by rewriting it to compute on changes, avoiding integration when possible (see §3).

4.4. Example

In this section we apply the incremental view maintenance algorithm to a concrete query. Let us consider the following query:

CREATE VIEW v AS
SELECT DISTINCT t1.x, t2.y FROM (
SELECT t.x, t.id
FROM t
WHERE t.a > 2
) t1
JOIN (
SELECT r.id, r.y
FROM r
WHERE r.s > 5
) t2 ON t1.id = t2.id

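Step 1: First we create a DBSP circuit to represent this query using the translation rules from Table 4.2:

[Circuit: $V = \mathit{distinct}(\pi_{x,y}(\mathit{distinct}(\pi_{x,id}(\mathit{distinct}(\sigma_{a>2}(t1)))) \bowtie_{id=id} \mathit{distinct}(\pi_{y,id}(\mathit{distinct}(\sigma_{s>5}(t2))))))$]

Step 2: we apply the $\mathit{distinct}$ optimization rules; first the rule from Proposition 4.6 gives us the following equivalent circuit:

[Circuit: $V = \mathit{distinct}(\pi_{x,y}(\mathit{distinct}(\pi_{x,id}(\sigma_{a>2}(t1))) \bowtie_{id=id} \mathit{distinct}(\pi_{y,id}(\sigma_{s>5}(t2)))))$]

Applying the rule from Proposition 4.5 we get:

[Circuit: $V = \mathit{distinct}(\pi_{x,y}(\mathit{distinct}(\pi_{x,id}(\sigma_{a>2}(t1)) \bowtie_{id=id} \pi_{y,id}(\sigma_{s>5}(t2)))))$]

And applying Proposition 4.6 again we get:

[Circuit: $V = \mathit{distinct}(\pi_{x,y}(\pi_{x,id}(\sigma_{a>2}(t1)) \bowtie_{id=id} \pi_{y,id}(\sigma_{s>5}(t2))))$]

Step 3: we lift the circuit using distributivity of composition over lifting; we obtain a circuit that computes over streams, i.e., for each new input pair of relations $t1$ and $t2$ it will produce an output view $V$:

[Circuit: $V = \uparrow\mathit{distinct}(\uparrow\pi_{x,y}(\uparrow\pi_{x,id}(\uparrow\sigma_{a>2}(t1)) \uparrow\bowtie_{id=id} \uparrow\pi_{y,id}(\uparrow\sigma_{s>5}(t2))))$]

Step 4: incrementalize the circuit, obtaining a circuit that computes over changes; this circuit receives changes to relations $t1$ and $t2$ and for each such change it produces the corresponding change in the output view $V$:

[Circuit: $\Delta V = \mathcal{D}(\uparrow\mathit{distinct}(\uparrow\pi_{x,y}(\uparrow\pi_{x,id}(\uparrow\sigma_{a>2}(\mathcal{I}(\Delta t1))) \uparrow\bowtie_{id=id} \uparrow\pi_{y,id}(\uparrow\sigma_{s>5}(\mathcal{I}(\Delta t2))))))$]

Step 5: apply the chain rule to rewrite the circuit as a composition of incremental operators:

[Circuit: the circuit of Step 3 with inputs $\Delta t1$, $\Delta t2$, output $\Delta V$, and every operator replaced by its incremental version: $(\uparrow\sigma_{a>2})^\Delta$, $(\uparrow\pi_{x,id})^\Delta$, $(\uparrow\sigma_{s>5})^\Delta$, $(\uparrow\pi_{y,id})^\Delta$, $(\uparrow\bowtie_{id=id})^\Delta$, $(\uparrow\pi_{x,y})^\Delta$, $(\uparrow\mathit{distinct})^\Delta$.]

Use the linearity of $\sigma$ and $\pi$ to simplify this circuit:

[Circuit: $\Delta V = (\uparrow\mathit{distinct})^\Delta(\uparrow\pi_{x,y}(\uparrow\pi_{x,id}(\uparrow\sigma_{a>2}(\Delta t1)) \,(\uparrow\bowtie_{id=id})^\Delta\, \uparrow\pi_{y,id}(\uparrow\sigma_{s>5}(\Delta t2))))$]

Finally, replace the incremental join using the formula for bilinear operators (Theorem 3.4), and the incremental $\mathit{distinct}$ (Proposition 4.7), obtaining the circuit from Figure 1.

[Circuit: with $a = \uparrow\pi_{x,id}(\uparrow\sigma_{a>2}(\Delta t1))$ and $b = \uparrow\pi_{y,id}(\uparrow\sigma_{s>5}(\Delta t2))$, let $p = \uparrow\pi_{x,y}(a \bowtie b + z^{-1}(\mathcal{I}(a)) \bowtie b + a \bowtie z^{-1}(\mathcal{I}(b)))$; then $\Delta V = \uparrow H(z^{-1}(\mathcal{I}(p)), p)$.]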
Figure 1. Final version of the incremental query circuit from §4.4.

Notice that the resulting circuit contains three integration operations: two from the join, and one from the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡. It also contains three join operators. However, the work performed by each operator for each new input is proportional to the size of change, as we argue in the following section.
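To make the final circuit concrete, here is a sketch that keeps exactly the three integrated states mentioned above (two for the join, one for $\mathit{distinct}$) and checks the result against recomputing the view from scratch. Everything here is an assumption made for illustration: $\mathbb{Z}$-sets are dictionaries, rows of `t` are tuples `(x, id, a)`, rows of `r` are tuples `(id, y, s)`, and the class and helper names are hypothetical.

```python
from typing import Callable, Dict, Hashable, Tuple

ZSet = Dict[Hashable, int]

def zadd(a: ZSet, b: ZSet) -> ZSet:
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
    return {x: w for x, w in out.items() if w != 0}

def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}

def zmap(f: Callable, m: ZSet) -> ZSet:
    """Linear map/projection: weights of rows with equal images add up."""
    out: ZSet = {}
    for x, w in m.items():
        out[f(x)] = out.get(f(x), 0) + w
    return {x: w for x, w in out.items() if w != 0}

def zfilter(p: Callable, m: ZSet) -> ZSet:
    return {x: w for x, w in m.items() if p(x)}

def join(a: ZSet, b: ZSet) -> ZSet:
    """Equi-join of (x, id) rows with (id, y) rows; weights multiply."""
    index: Dict[Hashable, list] = {}
    for (i, y), w in b.items():
        index.setdefault(i, []).append((y, w))
    out: ZSet = {}
    for (x, i), wa in a.items():
        for y, wb in index.get(i, []):
            out[(x, i, y)] = out.get((x, i, y), 0) + wa * wb
    return {k: w for k, w in out.items() if w != 0}

def left(t: ZSet) -> ZSet:      # pi_{x,id}(sigma_{a>2}(t))
    return zmap(lambda row: (row[0], row[1]), zfilter(lambda row: row[2] > 2, t))

def right(r: ZSet) -> ZSet:     # pi_{id,y}(sigma_{s>5}(r))
    return zmap(lambda row: (row[0], row[1]), zfilter(lambda row: row[2] > 5, r))

def project_xy(j: ZSet) -> ZSet:
    return zmap(lambda row: (row[0], row[2]), j)

def query(t: ZSet, r: ZSet) -> ZSet:
    """Non-incremental view: the optimized circuit of Step 2."""
    return distinct(project_xy(join(left(t), right(r))))

def H(i: ZSet, d: ZSet) -> ZSet:
    out: ZSet = {}
    for x, w in d.items():
        before, after = i.get(x, 0), i.get(x, 0) + w
        if before > 0 and after <= 0:
            out[x] = -1
        elif before <= 0 and after > 0:
            out[x] = 1
    return out

class IncrementalView:
    """Holds z^-1(I(.)) for both join inputs and for the distinct input."""
    def __init__(self) -> None:
        self.prev_a: ZSet = {}
        self.prev_b: ZSet = {}
        self.prev_p: ZSet = {}

    def step(self, dt: ZSet, dr: ZSet) -> ZSet:
        a, b = left(dt), right(dr)            # linear: applied to changes only
        dj = zadd(zadd(join(a, b), join(self.prev_a, b)), join(a, self.prev_b))
        dp = project_xy(dj)
        dv = H(self.prev_p, dp)               # incremental distinct
        self.prev_a = zadd(self.prev_a, a)
        self.prev_b = zadd(self.prev_b, b)
        self.prev_p = zadd(self.prev_p, dp)
        return dv

changes: Tuple = (
    ({("x1", 1, 3): 1, ("x2", 2, 1): 1}, {(1, "y1", 6): 1}),
    ({("x2", 2, 1): -1, ("x3", 1, 5): 1}, {(2, "y2", 9): 1}),
    ({("x1", 1, 3): -1}, {(1, "y1", 6): -1, (1, "y1", 7): 1}),
)
```

Feeding `changes` to `IncrementalView.step` one transaction at a time and summing the outputs with `zadd` reproduces `query` evaluated on the accumulated tables after every step. A production implementation would keep the integrated join inputs indexed by `id` rather than rebuilding an index per call; that detail is omitted here.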

4.5. Complexity

Incremental circuits are efficient. The work performed (and the memory used) by a circuit is the sum of the work performed (and memory used) by its operators. We argue that each operator in the incremental version of a circuit is efficient.

For incrementalized circuits the input stream of each operator contains changes in its input relations. Denote by $C[t] \stackrel{\text{def}}{=} \|s[t]\|$ the size of the value of the stream $s$ of changes at time $t$, and by $R[t] \stackrel{\text{def}}{=} \|\mathcal{I}(s)[t]\|$ the size of the relation produced by integrating all changes in $s$. An unoptimized incremental operator $Q^\Delta = \mathcal{D} \circ Q \circ \mathcal{I}$ evaluates the query $Q$ on the integration of its input streams; hence its time complexity is the same as that of the non-incremental operator, a function of $R[t]$. In addition, because of the $\mathcal{I}$ and $\mathcal{D}$ operators, it uses $O(R[t])$ memory.

The optimizations described in §3 reduce the time complexity of an operator to be a function of $C[t]$. Assuming $C[t] \ll R[t]$, this translates to major performance improvements in practice. For example, Theorem 3.3 allows evaluating $T^\Delta$, where $T$ is a linear operator, in time $O(C[t])$. Interestingly, while the $\mathcal{I}$ operator uses $O(R[t])$ memory, it can be evaluated in $O(C[t])$ time, because all values that appear in the output at time $t$ must be present in the current input change for time $t$. Similarly, while the $\mathit{distinct}$ operator is not linear, $(\uparrow\mathit{distinct})^\Delta$ can also be evaluated in $O(C[t])$ according to Proposition 4.7. Bilinear operators, including join, can be evaluated in time proportional to the product of the sizes of their input changes, $O(C[t]^2)$ (Theorem 3.4).

The space complexity of linear operators is 0 (zero), since they store no data persistently. The space complexity of $(\uparrow\mathit{distinct})^\Delta$ and join is $O(R[t])$.

5. Recursive queries

Recursive queries are very useful in many applications. For example, many graph algorithms (such as graph reachability or transitive closure) are naturally expressed using recursive queries.

We introduce two new stream operators that are instrumental in expressing recursive query evaluation. These operators allow us to build circuits implementing looping constructs, which are used to iterate computations until a fixed-point is reached.

Definition 5.1:

We say that a stream $s \in \mathcal{S}_A$ is zero almost-everywhere if it has a finite number of non-zero values, i.e., there exists a time $t_0 \in \mathbb{N}$ s.t. $\forall t \geq t_0.\ s[t] = 0$. Denote the set of streams that are zero almost everywhere by $\overline{\mathcal{S}_A}$.

Stream introduction

The delta function (named from the Dirac delta function) $\delta_0: A \to \mathcal{S}_A$ produces a stream from a scalar value:

$\delta_0(v)[t] \stackrel{\text{def}}{=} \begin{cases} v & \text{if } t = 0 \\ 0_A & \text{otherwise} \end{cases}$

For example, $\delta_0(5)$ is the stream $[\,5\ 0\ 0\ 0\ 0\ \cdots]$.

Stream elimination

We define the function $\int: \overline{\mathcal{S}_A} \to A$, over streams that are zero almost everywhere, as $\int(s) \stackrel{\text{def}}{=} \sum_{t \geq 0} s[t]$. $\int$ is closely related to $\mathcal{I}$; if $\mathcal{I}$ is the indefinite integral, $\int$ is the definite integral on the interval $0$ to $\infty$.

For many implementation strategies (including relational and Datalog queries given below) the $\int$ operator can be approximated finitely and precisely by integrating until the first $0$ value encountered, since it can be proven that its input is always the derivative of a monotone stream.
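The two operators can be sketched with Python iterators standing in for infinite streams. This is an illustration under stated assumptions: values are integers, `delta0` and `definite_integral` are hypothetical names, and `definite_integral` uses the "stop at the first zero" strategy described above, which is only valid when the input is known to be the derivative of a monotone stream. The `max_steps` bound is an extra safety guard added for this sketch.

```python
from itertools import chain, islice, repeat
from typing import Iterable, Iterator

def delta0(v: int) -> Iterator[int]:
    """Stream introduction: v at time 0, then the group zero forever."""
    return chain([v], repeat(0))

def definite_integral(s: Iterable[int], max_steps: int = 10_000) -> int:
    """Stream elimination: sum values until the first zero is seen.

    Assumes s is zero from its first zero onward (for example, the
    derivative of a monotone stream that has reached its fixed point).
    """
    total = 0
    for v in islice(s, max_steps):
        if v == 0:
            return total
        total += v
    raise RuntimeError("no zero value found within max_steps")

prefix = list(islice(delta0(5), 5))       # first five values of delta0(5)
roundtrip = definite_integral(delta0(5))  # the integral undoes delta0
```

The round trip through `delta0` and `definite_integral` returns the original scalar, mirroring the left-inverse property stated next.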

$\delta_0$ is the left inverse of $\int$, i.e.: $\int \circ\, \delta_0 = id_A$.

Proposition 5.2:

$\delta_0$ and $\int$ are LTI.

Nested time domains

So far we used a tacit assumption that “time” is common for all streams in a program. For example, when we add two streams, we assume that they use the same “clock” for the time dimension. However, the $\delta_0$ operator creates a stream with a “new”, independent time dimension. We require well-formed circuits to “insulate” such nested time domains by nesting them between a $\delta_0$ and an $\int$ operator:

[Diagram: $i \to \delta_0 \to Q \to \int \to o$]
Proposition 5.3:

If $Q$ is time-invariant, the circuit above has the zero-preservation property: $\mathrm{zpp}(\int \circ\, Q \circ \delta_0)$.

5.1. Implementing Recursive Datalog

We illustrate the implementation of recursive queries in DBSP for stratified Datalog. Datalog is strictly more expressive than relational algebra since it can express recursive programs, e.g.:

O(v) :- I(v). // base case
O(v) :- I(z), O(x), v = ... . // rec case

In general, a recursive Datalog program defines a set of mutually recursive relations O1,..,On as an equation (O1,..,On)=R(I1,..,Im,O1,..,On), where I1,..,Im are input relations and R is a relational (non-recursive) query.

The following algorithm generates a circuit that computes a solution to this equation. We describe the algorithm informally and for the special case of a single input I and single output O; the general case can be found in the companion technical report (tr, ), and is only slightly more involved.

  1. (1)

    Implement the non-recursive relational query $R$ as described in §4 and Table 4.2; this produces an acyclic circuit whose inputs and outputs are $\mathbb{Z}$-sets (i.e., not streams):

    [Circuit: inputs $I$ and $O$ feed $R$, which produces the output $O$.]
  2. (2)

    Lift this circuit to operate on streams and connect the output to the input in a feedback cycle as follows:

    [Circuit: $I \to \delta_0 \to \mathcal{I} \to \uparrow R \to \uparrow\mathit{distinct} \to \mathcal{D} \to \int \to O$, where the output $o$ of $\uparrow\mathit{distinct}$ is fed back through $z^{-1}$ into the second input of $\uparrow R$.]

    We construct $\uparrow R$ by lifting each operator of the circuit individually according to Proposition 2.4.

The inner loop of the circuit computes the fixed point of $R$. The differentiation operator $\mathcal{D}$ yields the set of new Datalog facts (changes) computed by each iteration of the loop. When the set of new facts becomes empty, the iterations have completed. $\int$ computes the value of the fixed point by aggregating these changes.

Theorem 5.4 (Recursion correctness):

If isset(I), the output of the circuit above is the relation O as defined by the Datalog semantics as a function of the input relation I.

Proof.

Let us compute the contents of the o stream, produced at the output of the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 operator. We will show that this stream is composed of increasing approximations of the value of O.

We define the following one-argument function: $S(x) = \lambda x.R(I, x)$. Notice that the left input of the $\uparrow R$ block is a constant stream with the value $I$. Due to the stratified nature of the language, we must have $\mathrm{ispositive}(S)$, so $\forall x.\ S(x) \geq x$. Also $\uparrow S$ is time-invariant, so $S(0) = 0$. From §4, the definition of set union, we know that $x \cup y = \mathit{distinct}(x + y)$. We get the following system of equations:

$o[0] = S(0)$
$o[t] = S(o[t-1])$

So, by induction on $t$ we have $o[t] = S^t(0)$, where by $S^t$ we mean $\underbrace{S \circ S \circ \dots \circ S}_{t}$. $S$ is monotone; thus, if there is a time $k$ such that $S^k(0) = S^{k+1}(0)$, we have $\forall j \in \mathbb{N}.\ S^{k+j}(0) = S^k(0)$. Applying a derivative to this stream will then produce a stream that is zero almost everywhere, and integrating this derivative will return the last distinct value in the stream $o$.

This is essentially the definition of the semantics of a recursive Datalog relation: O=fixx.R(I,x). ∎

Note that the use of unbounded data domains (like integers with arithmetic) does not guarantee convergence for all programs.

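In fact, this circuit implements the standard naïve evaluation algorithm (e.g., see Algorithm 1 in (greco-sldm15, )). Notice that the inner part of the circuit is the incremental form of another circuit, since it is sandwiched between $\mathcal{I}$ and $\mathcal{D}$ operators. Using the cycle rule of Proposition 3.2 we can rewrite this circuit as:

(5.1) [Circuit: $I \to \delta_0 \to (\uparrow R)^\Delta \to (\uparrow\mathit{distinct})^\Delta \to \int \to O$, with feedback through $z^{-1}$ into the second input of $(\uparrow R)^\Delta$.]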

This last circuit effectively implements the semi-naïve evaluation algorithm (Algorithm 2 from (greco-sldm15, )). The correctness of semi-naïve evaluation is an immediate consequence of the cycle rule.
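The difference between the two evaluation strategies can be seen on a small recursive program. The sketch below uses plain Python sets rather than $\mathbb{Z}$-sets and a hypothetical two-rule reachability program, `R(x, y) :- E(x, y)` and `R(x, y) :- E(x, z), R(z, y)`; it follows the textbook naïve and semi-naïve algorithms rather than the circuits literally. In `semi_naive`, the set `delta` informally plays the role of the change stream inside the loop and `reach` plays the role of its integral.

```python
from typing import Set, Tuple

Edge = Tuple[int, int]

def naive(edges: Set[Edge]) -> Set[Edge]:
    """Recompute the whole relation each iteration until nothing changes."""
    reach: Set[Edge] = set()
    while True:
        new = set(edges) | {(x, y) for (x, z) in edges
                            for (z2, y) in reach if z == z2}
        if new == reach:
            return reach
        reach = new

def semi_naive(edges: Set[Edge]) -> Set[Edge]:
    """Join the edges only with the facts discovered in the last iteration."""
    reach: Set[Edge] = set()
    delta: Set[Edge] = set(edges)
    while delta:                       # stop when an iteration adds no facts
        reach |= delta
        derived = {(x, y) for (x, z) in edges
                   for (z2, y) in delta if z == z2}
        delta = derived - reach        # keep only genuinely new facts
    return reach

chain_graph = {(1, 2), (2, 3), (3, 4)}
cycle_graph = {(1, 2), (2, 1)}
```

Both functions return the same relation on these graphs. The semi-naïve loop terminates when `delta` becomes empty, which corresponds to the first zero in the stream consumed by $\int$.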

6. Incremental recursive programs

In §2 to §4 we showed how to incrementalize a relational query by compiling it into a circuit, lifting the circuit to compute on streams, and applying the $\cdot^\Delta$ operator to the lifted circuit. In §5.1 we showed how to compile a recursive query into a circuit that employs incremental computation internally to compute the fixed point. Here we combine these results to construct a circuit that evaluates a recursive query incrementally. The circuit receives a stream of updates to input relations, and for every update recomputes the fixed point. To do this incrementally, it preserves the stream of changes to recursive relations produced by the iterative fixed point computation, and adjusts this stream to account for the modified inputs. Thus, every element of the input stream yields a stream of adjustments to the fixed point computation, using nested streams.

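Nested streams, or streams of streams, $\mathcal{S}_{\mathcal{S}_A} = \mathbb{N} \to (\mathbb{N} \to A)$, are well defined, since streams form an abelian group. Equivalently, a nested stream is a value in $\mathbb{N} \times \mathbb{N} \to A$, i.e., a matrix with an infinite number of rows, indexed by two-dimensional time $(t_0, t_1)$, where each row is a stream. In §A.1 we show a few example nested stream computations.

Lifting a stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ yields an operator over nested streams $\uparrow S: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_B}$, such that $(\uparrow S)(s) = S \circ s$, or, pointwise: $(\uparrow S(s))[t_0][t_1] = S(s[t_0])[t_1],\ \forall t_0, t_1 \in \mathbb{N}$. In particular, a scalar function $f: A \to B$ can be lifted twice to produce an operator between streams of streams: $\uparrow\uparrow f: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_B}$.

We define a partial order over timestamps: $(i_0, i_1) \leq (t_0, t_1)$ iff $i_0 \leq t_0$ and $i_1 \leq t_1$. We extend the definition of strictness for operators over nested streams: a stream operator $F: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_B}$ is strict if for any $s, s' \in \mathcal{S}_{\mathcal{S}_A}$ and all times $t, i \in \mathbb{N} \times \mathbb{N}$ we have that $\forall i < t,\ s[i] = s'[i]$ implies $F(s)[t] = F(s')[t]$. Proposition 2.10 holds for this notion of strictness, i.e., the fixed point operator $\mathrm{fix}\,\alpha.F(\alpha)$ is well defined for a strict operator $F$.

Proposition 6.1:

The operator $\uparrow z^{-1}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$ is strict.

The operator $z^{-1}$ on nested streams delays “rows” of the matrix, while $\uparrow z^{-1}$ delays “columns”. (See examples in §A.1).

The $\mathcal{I}$ operator on $\mathcal{S}_{\mathcal{S}_A}$ operates on rows of the matrix, treating each row as a single value. Lifting a stream operator computing on $\mathcal{S}_A$, such as $\mathcal{I}: \mathcal{S}_A \to \mathcal{S}_A$, also produces an operator on nested streams, but this time computing on the columns of the matrix: $\uparrow\mathcal{I}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$.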
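The distinction between an operator acting on the outer time dimension and its lifted version acting on the inner one can be shown on a small finite matrix. This is a sketch only, assuming a nested stream of integers is modeled as a list of equal-length rows `s[t0][t1]`; all function names are hypothetical.

```python
from itertools import accumulate
from typing import Callable, List

Row = List[int]
Nested = List[Row]

def lift(op: Callable[[Row], Row]) -> Callable[[Nested], Nested]:
    """Lift a stream operator: apply it to each row independently."""
    return lambda s: [op(row) for row in s]

def integrate_row(row: Row) -> Row:
    """I on an inner stream: running sum along t1."""
    return list(accumulate(row))

def delay_row(row: Row) -> Row:
    """z^-1 on an inner stream: shift along t1."""
    return ([0] + row[:-1]) if row else []

def integrate_outer(s: Nested) -> Nested:
    """I on the outer stream: running pointwise sum of whole rows (along t0)."""
    out: Nested = []
    acc = [0] * len(s[0])
    for row in s:
        acc = [x + y for x, y in zip(acc, row)]
        out.append(acc)
    return out

def delay_outer(s: Nested) -> Nested:
    """z^-1 on the outer stream: an all-zero row first, then earlier rows."""
    return [[0] * len(s[0])] + s[:-1]

s = [[1, 2, 3],
     [4, 5, 6]]
both = lift(integrate_row)(integrate_outer(s))   # integrate in both dimensions
```

On this example the two integrations commute, and entry `both[t0][t1]` is the sum of all `s[i0][i1]` with `(i0, i1) <= (t0, t1)` in the partial order defined above.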

Proposition 6.2 (Lifting cycles):

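For a binary, causal $T$ we have: $\uparrow(\lambda s.\mathrm{fix}\,\alpha.T(s, z^{-1}(\alpha))) = \lambda s.\mathrm{fix}\,\alpha.(\uparrow T)(s, (\uparrow z^{-1})(\alpha))$, i.e., lifting a circuit containing a “cycle” can be accomplished by lifting all operators independently, including the $z^{-1}$ back-edge.

This means that lifting a DBSP stream function can be expressed within DBSP itself. For example, we have:

[Diagram: $i \to \uparrow\mathcal{I} \to o \ \cong\ i \to + \to o$, with feedback through $\uparrow z^{-1}$ into $+$]

This proposition gives the ability to lift entire circuits, including circuits computing on streams and having feedback edges, which are well-defined, due to Proposition 6.1. With this machinery we can now apply Algorithm 4.8 to arbitrary circuits, even circuits built for recursively-defined relations. Consider the “semi-naive” circuit (5.1), and denote $\mathit{distinct} \circ R$ with $T$:

[Circuit: $I \to \delta_0 \to (\uparrow T)^\Delta \to \int \to O$, with feedback through $z^{-1}$]

Lift the entire circuit using Proposition 6.2 and incrementalize it:

[Circuit: $I \to \mathcal{I} \to \uparrow\delta_0 \to \uparrow(\uparrow T)^\Delta \to \uparrow\!\int \to \mathcal{D} \to O$, with feedback through $\uparrow z^{-1}$]

Now apply the chain rule to this circuit:

(6.1) [Circuit: $I \to \uparrow\delta_0 \to (\uparrow(\uparrow T)^\Delta)^\Delta \to \uparrow\!\int \to O$, with feedback through $\uparrow z^{-1}$]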

This is the incremental version of an arbitrary recursive query.

6.1. Example

In this section we derive the incremental version of a circuit containing recursion, by applying Algorithm 4.8. We start with a very simple program, expressed in Datalog, which computes the transitive closure of a directed graph:

// Edge relation with head and tail
input relation E(h: Node, t: Node)
// Reach relation with source s and sink t
output relation R(s: Node, t: Node)
R(x, x) :- E(x, _).
R(x, x) :- E(_, x).
R(x, y) :- E(x, y).
R(x, y) :- E(x, z), R(z, y).

We haven’t explained how Datalog is translated to circuits, but most Datalog operators are relational in nature. Assuming one could write recursive queries in SQL where a view is defined in terms of itself, the above program would be implemented by the following (illegal) SQL query:

CREATE VIEW R AS
(SELECT E.h, E.h FROM E)
UNION
(SELECT E.t, E.t FROM E)
UNION
(SELECT * FROM E)
UNION
(SELECT E.h, R.t
FROM E JOIN R
ON E.t = R.s)

We apply the algorithm from §5.1 to create first the non-recursive circuit, by assuming that R is already computed as a view R1, and using R1 in the definition of R instead of itself:

CREATE VIEW R AS
(SELECT E.h, E.h FROM E)
UNION
(SELECT E.t, E.t FROM E)
UNION
(SELECT * FROM E)
UNION
(SELECT E.h, R1.t
FROM E JOIN R1 ON E.t = R1.s)

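Now we implement this query as a DBSP circuit with two inputs $E$ and $R1$:

[Circuit: $R = \mathit{distinct}(E + \sigma_{h,h}(\pi_h(E)) + \sigma_{t,t}(\pi_t(E)) + \pi_{h,t}(E \bowtie_{t=s} R1))$]

Now lift the circuit by lifting each operator pointwise, connect it in a feedback loop by connecting input $R1$ from the output $R$ through a $z^{-1}$ operator, and bracket everything with $\delta_0$, $\mathcal{I}$ and $\mathcal{D}$, $\int$:

[Circuit: $E \to \delta_0 \to \mathcal{I} \to$ the lifted circuit above (with $\uparrow\bowtie_{t=s}$, $\uparrow\pi_{h,t}$, $\uparrow\pi_h$, $\uparrow\sigma_{h,h}$, $\uparrow\pi_t$, $\uparrow\sigma_{t,t}$, $+$, $\uparrow\mathit{distinct}$) $\to \mathcal{D} \to \int \to R$, with the output of $\uparrow\mathit{distinct}$ fed back through $z^{-1}$ into the join.]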

The above circuit is a complete implementation of the non-streaming recursive query; given an input relation E it will produce its transitive closure R at the output.

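Now we use the semi-naïve property (5.1) to rewrite the circuit:

(To save space in the figures we will omit the indices from $\pi$ and $\sigma$ in the subsequent figures, for example by writing just $\pi$ instead of $\pi_h$.)

[Circuit: $E \to \delta_0 \to$ loop body $\to \int \to R$, where the loop body applies $(\uparrow\bowtie)^\Delta$, $(\uparrow\pi)^\Delta$, $(\uparrow\sigma)^\Delta$, $+$, and $(\uparrow\mathit{distinct})^\Delta$, with feedback through $z^{-1}$.]

Using the linearity of $\uparrow\pi$ and $\uparrow\sigma$, this can be rewritten as an equivalent circuit:

[Circuit: $E \to \delta_0 \to$ loop body $\to \int \to R$, where the loop body applies $(\uparrow\bowtie)^\Delta$, $\uparrow\pi$, $\uparrow\sigma$, $+$, and $(\uparrow\mathit{distinct})^\Delta$, with feedback through $z^{-1}$.]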

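To make this circuit into a streaming computation that evaluates a new transitive closure for a stream of inputs $E$, we lift it entirely, using Proposition 6.2:

[Circuit: $E \to \uparrow\delta_0 \to$ lifted loop body $\to \uparrow\!\int \to R$, where the loop body applies $\uparrow(\uparrow\bowtie)^\Delta$, $\uparrow\uparrow\pi$, $\uparrow\uparrow\sigma$, $+$, and $\uparrow(\uparrow\mathit{distinct})^\Delta$, with feedback through $\uparrow z^{-1}$.]

We convert this circuit into an incremental circuit, which receives in each transaction the changes to relation $E$ and produces the corresponding changes to relation $R$:

[Circuit: $\Delta E \to \mathcal{I} \to$ the lifted circuit above $\to \mathcal{D} \to \Delta R$.]

We can now apply again the chain rule to this circuit:

[Circuit: the lifted circuit with input $\Delta E$, output $\Delta R$, and every operator replaced by its incremental version: $(\uparrow\delta_0)^\Delta$, $(\uparrow(\uparrow\bowtie)^\Delta)^\Delta$, $(\uparrow\uparrow\pi)^\Delta$, $(\uparrow\uparrow\sigma)^\Delta$, $+$, $(\uparrow(\uparrow\mathit{distinct})^\Delta)^\Delta$, $(\uparrow\!\int)^\Delta$, and $(\uparrow z^{-1})^\Delta$ on the feedback edge.]

We now take advantage of the linearity of $\uparrow\delta_0$, $\uparrow\!\int$, $\uparrow z^{-1}$, $\uparrow\uparrow\pi$, and $\uparrow\uparrow\sigma$ to simplify the circuit by removing some $\cdot^\Delta$ invocations:

[Circuit: $\Delta E \to \uparrow\delta_0 \to$ loop body $\to \uparrow\!\int \to \Delta R$, where the loop body applies $(\uparrow(\uparrow\bowtie)^\Delta)^\Delta$, $\uparrow\uparrow\pi$, $\uparrow\uparrow\sigma$, $+$, and $(\uparrow(\uparrow\mathit{distinct})^\Delta)^\Delta$, with feedback through $\uparrow z^{-1}$.]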

There are two applications of Δ left in this circuit: (()Δ)Δ and ((𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡)Δ)Δ. We expand their implementations separately, and we stitch them into the global circuit at the end. This ability to reason about sub-circuits independently highlights the modularity of DBSP.

The join is expanded twice, using the bilinearity of and . Let’s start with the inner circuit, implementing ()Δ, given by Theorem 3.4:

ab()Δo abz1z1+o
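The inner expansion can be checked on a small example. The sketch below is illustrative and is not taken from the DBSP library: it assumes a ℤ-set is a Python dict from tuples to integer weights, that the bilinear expansion has the usual form a⋈b + z⁻¹(I(a))⋈b + a⋈z⁻¹(I(b)), and the helper names (`zadd`, `join`, `incremental_join`, `reference_join`) are hypothetical. It compares the incremental join against integrating both inputs, joining the full relations and differentiating the result.

```python
from collections import defaultdict

def zadd(a, b):
    """Pointwise sum of two Z-sets (dicts from element to integer weight)."""
    out = defaultdict(int)
    for zset in (a, b):
        for elem, weight in zset.items():
            out[elem] += weight
    return {e: w for e, w in out.items() if w != 0}

def zneg(a):
    """Negate every weight of a Z-set."""
    return {e: -w for e, w in a.items()}

def join(a, b):
    """Join (h, t) tuples of a with (s, t) tuples of b on a.t == b.s; weights multiply."""
    out = defaultdict(int)
    for (h, t), wa in a.items():
        for (s, t2), wb in b.items():
            if t == s:
                out[(h, t2)] += wa * wb
    return {e: w for e, w in out.items() if w != 0}

def incremental_join(da_stream, db_stream):
    """Evaluate a*b + z^-1(I(a))*b + a*z^-1(I(b)) one timestamp at a time."""
    acc_a, acc_b, outs = {}, {}, []  # delayed integrals of both inputs
    for da, db in zip(da_stream, db_stream):
        out = zadd(zadd(join(da, db), join(acc_a, db)), join(da, acc_b))
        outs.append(out)
        acc_a, acc_b = zadd(acc_a, da), zadd(acc_b, db)
    return outs

def reference_join(da_stream, db_stream):
    """D(join(I(a), I(b))): integrate, join the full relations, differentiate."""
    acc_a, acc_b, prev, outs = {}, {}, {}, []
    for da, db in zip(da_stream, db_stream):
        acc_a, acc_b = zadd(acc_a, da), zadd(acc_b, db)
        cur = join(acc_a, acc_b)
        outs.append(zadd(cur, zneg(prev)))
        prev = cur
    return outs

# Three transactions: two insertions into each input, then a deletion from a.
da = [{(1, 2): 1}, {(2, 3): 1}, {(1, 2): -1}]
db = [{(2, 3): 1}, {(3, 4): 1}, {}]
print(incremental_join(da, db) == reference_join(da, db))
```

The incremental version only ever joins a change against accumulated state, which is where the efficiency gain comes from when changes are small.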

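Now we lift and incrementalize to get the circuit for $(\uparrow(\uparrow\bowtie)^\Delta)^\Delta$:

[Circuit diagram: the previous expansion lifted, with inputs a and b integrated and the output o differentiated by $\mathcal{D}$.]

Applying the chain rule and the linearity of $\uparrow\mathcal{I}$ and $\uparrow z^{-1}$, this becomes:

[Circuit diagram: inputs a and b, two $z^{-1}$ delay paths, three applications of $(\uparrow\uparrow\bowtie)^\Delta$, and + producing o.]

We now have three applications of $(\uparrow\uparrow\bowtie)^\Delta$. Each of these is the incremental form of a bilinear operator, so it looks as if we will end up with $3\times 3$ applications of $\uparrow\uparrow\bowtie$. In fact, the overall expression can be simplified (see (tr, ) for a precise derivation), and the end result has only 4 terms in $\uparrow\uparrow\bowtie$.

Here is the final form of the expanded join circuit:

[Circuit diagram: inputs a and b, four $z^{-1}$ operators, and the join terms added by + to produce o.]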

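Returning to $(\uparrow(\uparrow\mathit{distinct})^\Delta)^\Delta$, we can compute its circuit by expanding once using Proposition 4.7:

[Circuit diagram: $(\uparrow(\uparrow\mathit{distinct})^\Delta)^\Delta$ from input i to output o, shown next to its expansion, which is built from $z^{-1}$, $H$ and $\mathcal{D}$.]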

Finally, stitching all these pieces together we get the final circuit shown in Figure 2.

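[Circuit diagram: input $\Delta E$ enters through $\delta_0$; the expanded join with its four $z^{-1}$ operators and +; the $\pi$ and $\sigma$ branches and +; the expanded $\mathit{distinct}$ using $z^{-1}$, $H$ and $\mathcal{D}$; feedback through $z^{-1}$; output $\Delta R$.]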
Figure 2. Final form of circuit from §6.1.

6.2. Complexity of incremental recursive queries

Time complexity

The time complexity of an incremental recursive query can be estimated as a product of the number of fixed point iterations and the complexity of each iteration. The incrementalized circuit (6.1) performs the same number of iterations as the non-incremental circuit (5.1) in the worst case: once the non-incremental circuit reaches the fixed point, its output is constant and so is its derivative computed by the incrementalized circuit.

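Consider a nested stream of changes $s \in \mathcal{S}_{\mathcal{S}_A}$ with elements $s[t_1][t_2]$, where $t_1$ is the input timestamp and $t_2$ is the fixed point iteration number. The unoptimized loop body $(\uparrow(\uparrow T)^\Delta)^\Delta = \mathcal{D}\circ\uparrow\mathcal{D}\circ\uparrow\uparrow T\circ\uparrow\mathcal{I}\circ\mathcal{I}$ has the same time complexity as $T$ applied to the aggregated input of size $R(s)[t_1][t_2] \stackrel{def}{=} (\mathcal{I}\circ\uparrow\mathcal{I})(s)[t_1][t_2] = \sum_{(i_1,i_2)\le(t_1,t_2)} s[i_1][i_2]$. As before, an optimized circuit can be significantly more efficient. For instance, by applying Theorem 3.4 twice, to $\uparrow\bowtie$ and $\uparrow\uparrow\bowtie$, we obtain a circuit for the nested incremental join $s_1\,(\uparrow(\uparrow\bowtie)^\Delta)^\Delta\,s_2$ whose running time is bounded by $O(R(s_1)\times R(s_2))$ and is smaller in practice, because each term of the expansion operates on correspondingly smaller inputs.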

Space complexity

Integration ($\mathcal{I}$) and differentiation ($\mathcal{D}$) of a stream $s \in \mathcal{S}_{\mathcal{S}_A}$ use memory proportional to $\sum_{t_2}\sum_{t_1} s[t_1][t_2]$, i.e., the total size of changes aggregated over columns of the matrix. The unoptimized circuit integrates the inputs and differentiates the outputs of the recursive program fragment. As we move $\mathcal{I}$ and $\mathcal{D}$ inside the circuit using the chain rule, we additionally store changes to intermediate streams. Effectively, we cache results of fixed point iterations from earlier timestamps to update them efficiently as new input changes arrive. Notice that space is proportional to the number of iterations of the inner while loop.

7. Extensions

The DBSP language can express a richer class of streaming computations (both incremental and non-incremental) than those covered so far. In this section we give several examples.

7.1. Multisets and bags

In §4 we have shown how to implement the relational algebra on sets. Some SQL queries, however, produce multisets, e.g., UNION ALL. Since $\mathbb{Z}$-sets generalize multisets and bags, it is easy to implement query operators that compute on such structures. For example, while SQL UNION is $\mathbb{Z}$-set addition followed by $\mathit{distinct}$, UNION ALL is just $\mathbb{Z}$-set addition.
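As a small illustration of the difference, the sketch below models ℤ-sets as Python dicts from elements to integer weights. The helper names are hypothetical, and `distinct` is assumed to keep every element with positive weight and reset its weight to 1.

```python
def zadd(a, b):
    """Z-set addition: add weights pointwise and drop elements of weight 0."""
    out = dict(a)
    for elem, weight in b.items():
        out[elem] = out.get(elem, 0) + weight
    return {e: w for e, w in out.items() if w != 0}

def distinct(a):
    """Keep elements with positive weight, each with weight 1 (assumed definition)."""
    return {e: 1 for e, w in a.items() if w > 0}

def union_all(a, b):
    """SQL UNION ALL: plain Z-set addition, multiplicities accumulate."""
    return zadd(a, b)

def union(a, b):
    """SQL UNION: Z-set addition followed by distinct."""
    return distinct(zadd(a, b))

left = {"joe": 1, "anne": 1}
right = {"joe": 1}
print(union_all(left, right))
print(union(left, right))
```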

7.2. Aggregation

Aggregation in SQL applies a function $a$ to a whole set producing a “scalar” result with some type $R$: $a: 2^A \to R$. We convert such aggregation functions to operate on $\mathbb{Z}$-sets, so in DBSP an aggregation function has a signature $a: \mathbb{Z}[A] \to R$. Correctness of the implementation is defined as in §4.2.1.

The SQL COUNT aggregation function is implemented on $\mathbb{Z}$-sets by $a_{COUNT}: \mathbb{Z}[A] \to \mathbb{Z}$, which computes a sum of all the element weights: $a_{COUNT}(s) = \sum_{x \in s} s[x]$. The SQL SUM aggregation function is implemented on $\mathbb{Z}$-sets by $a_{SUM}: \mathbb{Z}[\mathbb{R}] \to \mathbb{R}$, which performs a weighted sum of all (real) values: $a_{SUM}(s) = \sum_{x \in s} x \times s[x]$.

With this definition the aggregation functions $a_{COUNT}$ and $a_{SUM}$ are in fact linear transformations between the group $\mathbb{Z}[A]$ and the result group ($\mathbb{Z}$ and $\mathbb{R}$, respectively).
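The linearity claim can be checked directly. This is a minimal sketch with ℤ-sets modeled as Python dicts from values to integer weights; the function names are illustrative only.

```python
def zadd(a, b):
    """Z-set addition: add weights pointwise and drop elements of weight 0."""
    out = dict(a)
    for elem, weight in b.items():
        out[elem] = out.get(elem, 0) + weight
    return {e: w for e, w in out.items() if w != 0}

def a_count(s):
    """COUNT on a Z-set: the sum of all element weights."""
    return sum(s.values())

def a_sum(s):
    """SUM on a Z-set: every value multiplied by its weight, then summed."""
    return sum(x * w for x, w in s.items())

s1 = {3: 2, 5: 1}    # the value 3 twice, the value 5 once
s2 = {3: -1, 7: 1}   # a change: delete one 3, insert one 7

# Linearity: aggregating the sum equals summing the aggregates.
print(a_count(zadd(s1, s2)) == a_count(s1) + a_count(s2))
print(a_sum(zadd(s1, s2)) == a_sum(s1) + a_sum(s2))
```

Because of this property, the aggregate of a change can be computed from the change alone and added to the previous result.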

If the output of the DBSP circuit can be such a “scalar” value, then aggregation with a linear function is simply function application, and thus it is automatically incremental. However, in general, for composing multiple queries we require the result of an aggregation to be a singleton $\mathbb{Z}$-set (containing a single value), and not a scalar value. In this case the aggregation function is implemented in DBSP as the composition of the actual aggregation and the function $\mathit{makeset}: A \to \mathbb{Z}[A]$, which converts a scalar value of type $A$ to a singleton $\mathbb{Z}$-set, defined as follows: $\mathit{makeset}(x) \stackrel{def}{=} 1\cdot x$.

In conclusion, the following SQL query: SELECT SUM(c) FROM I is implemented as the following circuit:

I → $\pi_C$ → $a_{SUM}$ → $\mathit{makeset}$ → O

The lifted incremental version of this circuit is interesting: since $\pi$ and $a_{SUM}$ are linear, they are equivalent to their own incremental versions. Although $(\uparrow\mathit{makeset})^\Delta = \mathcal{D}\circ\uparrow\mathit{makeset}\circ\mathcal{I}$ cannot be simplified, it is nevertheless efficient, doing only $O(1)$ work per invocation, since its input and output are singleton values.
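A possible reading of this incremental circuit is sketched below. It assumes the projection onto column c has already been applied, so the input changes are ℤ-sets of numbers modeled as Python dicts; `incremental_sum` and the other names are hypothetical.

```python
def zadd(a, b):
    """Z-set addition: add weights pointwise and drop elements of weight 0."""
    out = dict(a)
    for elem, weight in b.items():
        out[elem] = out.get(elem, 0) + weight
    return {e: w for e, w in out.items() if w != 0}

def a_sum(s):
    """SUM on a Z-set: every value multiplied by its weight, then summed."""
    return sum(x * w for x, w in s.items())

def makeset(x):
    """Singleton Z-set containing x with weight 1."""
    return {x: 1}

def incremental_sum(changes):
    """Map a stream of input changes to a stream of changes of the SUM view."""
    total, prev_out, deltas = 0, {}, []
    for delta in changes:
        total += a_sum(delta)      # a_SUM is linear: apply it to the change only
        cur_out = makeset(total)   # makeset applied to the integrated scalar
        # Differentiate: retract the previous singleton, insert the new one.
        deltas.append(zadd(cur_out, {e: -w for e, w in prev_out.items()}))
        prev_out = cur_out
    return deltas

changes = [{10: 1, 20: 1}, {5: 1}, {10: -1}, {}]
print(incremental_sum(changes))
```

Each step touches only the incoming change and two singleton sets, which matches the constant work attributed to the makeset stage.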

An aggregation function such as AVG can be written as the composition of a more complex linear function that computes a pair of values using SUM and COUNT, followed by a makeset and a selection operation that divides the two columns.

SELECT AVG(c) FROM I
I → $\pi_C$ → $(a_{SUM}, a_{COUNT})$ → $\mathit{makeset}$ → $\sigma_{/}$ → O

Finally, some aggregate functions, such as MIN, are not incremental in general, since for handling deletions they may need to know the full set, and not just its changes. The lifted incremental version of such aggregate functions is implemented essentially by “brute force”, using the formula $(\uparrow a_{MIN})^\Delta = \mathcal{D}\circ\uparrow a_{MIN}\circ\mathcal{I}$. Such functions perform work proportional to $R(s)$ at each invocation.

Note that the SQL ORDER BY directive can be modeled as a non-linear aggregate function that emits a list. However, such an implementation is not efficiently incrementalizable in DBSP. We leave the efficient handling of ORDER BY to future work.

Even when aggregation results do not form a group, they usually form a structure with a zero element. We expect that a well-defined aggregation function maps empty $\mathbb{Z}$-sets to zeros in the target domain.

7.3. Grouping; indexed relations

Pick an arbitrary set $K$ of “key values.” Consider the mathematical structure of finite maps from $K$ to $\mathbb{Z}$-sets over some other domain $A$: $K \to \mathbb{Z}[A] = \mathbb{Z}[A][K]$. We call values $i$ of this structure indexed $\mathbb{Z}$-sets: for each key $k \in K$, $i[k]$ is a $\mathbb{Z}$-set. Because the codomain $\mathbb{Z}[A]$ is an abelian group, this structure is itself an abelian group.

We use this structure to model the SQL GROUP BY operator in DBSP. Consider a partitioning function $p: A \to K$ that assigns a key to any value in $A$. We define the grouping function $G_p: \mathbb{Z}[A] \to (K \to \mathbb{Z}[A])$ as $G_p(a)[k] \stackrel{def}{=} \sum_{x \in a.\,p(x)=k} a[x]\cdot x$. When applied to a $\mathbb{Z}$-set $a$ this function returns an indexed $\mathbb{Z}$-set, where each element is called a grouping (we use “group” for the algebraic structure and “grouping” for the result of GROUP BY): for each key $k$ a grouping is a $\mathbb{Z}$-set containing all elements of $a$ that map to $k$ (as in SQL, groupings are multisets, represented by $\mathbb{Z}$-sets). Consider our example $\mathbb{Z}$-set $R$ from §4, and a key function $p(s)$ that returns the first letter of the string $s$. Then we have that $G_p(R) = \{ j \mapsto \{ joe \mapsto R[joe] \}, a \mapsto \{ anne \mapsto R[anne] \} \}$, i.e., grouping with this key function produces an indexed $\mathbb{Z}$-set with two groupings, each of which contains a $\mathbb{Z}$-set with one element.

The grouping function Gp is linear for any p. It follows that the group-by implementation in DBSP is automatically incremental: given some changes to the input relation we can apply the partitioning function to each change separately to compute how each grouping changes.

7.4. GROUP BY-AGGREGATE

Grouping in SQL is almost always followed by aggregation. Let us consider an aggregation function $a: (K \times \mathbb{Z}[A]) \to B$ that produces values in some group $B$, and an indexed relation of type $\mathbb{Z}[A][K]$, as defined above in §7.3. The nested relation aggregation operator $Agg_a: \mathbb{Z}[A][K] \to B$ applies $a$ to the contents of each grouping independently and adds the results: $Agg_a(g) \stackrel{def}{=} \sum_{k \in K} a(k, g[k])$. To apply this to our example, let us compute the equivalent of GROUP-BY count; we use the following aggregation function $count$ on $K \times \mathbb{Z}[A]$: $count(k,s) = \mathit{makeset}((k, a_{COUNT}(s)))$, using the $\mathbb{Z}$-set counting function $a_{COUNT}$ from §7.2; the notation $(a,b)$ is a pair of values $a$ and $b$. Then we have $Agg_{count}(G_p(R)) = \{ (j, R[joe]) \mapsto 1, (a, R[anne]) \mapsto 1 \}$.

Notice that, unlike SQL, DBSP can naturally express computations on indexed $\mathbb{Z}$-sets, since they are just an instance of a group structure. One can even implement queries that operate on each grouping in an indexed $\mathbb{Z}$-set. However, our definition of incremental computation is only concerned with incrementality in the outermost structures. We leave it to future work to explore an appropriate definition of incremental computation that operates on the inner relations.

A very useful operation on nested relations is flatmap, which is essentially the inverse of partitioning, converting an indexed $\mathbb{Z}$-set into a $\mathbb{Z}$-set: $\mathit{flatmap}: \mathbb{Z}[A][K] \to \mathbb{Z}[A \times K]$. flatmap is in fact a particular instance of aggregation, using the aggregation function $a: K \times \mathbb{Z}[A] \to \mathbb{Z}[A \times K]$ defined by $a(k,s) = \sum_{x \in s} s[x]\cdot(k,x)$. For our previous example, $\mathit{flatmap}(G_p(R)) = \{ (j, joe) \mapsto R[joe], (a, anne) \mapsto R[anne] \}$.

If we use an aggregation function $a$ on $K \times \mathbb{Z}[A]$ that is linear in its second argument, then the aggregation operator $Agg_a$ is linear, and thus fully incremental. As a consequence, flatmap is linear. However, many practical aggregation functions for nested relations are in fact not linear; an example is the $count$ function above, which is not linear since it uses the non-linear makeset function. Nevertheless, while the incremental evaluation of such functions is not fully incremental, it is at least partly incremental: when applying a change to groupings, the aggregation function only needs to be re-evaluated for groupings that have changed.
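The contrast between the linear flatmap aggregation and the non-linear count aggregation can be made concrete. The sketch below uses its own small ℤ-sets (not the relation R from §4), models ℤ-sets and indexed ℤ-sets as Python dicts, and uses hypothetical helper names.

```python
def zadd(a, b):
    """Z-set addition: add weights pointwise and drop elements of weight 0."""
    out = dict(a)
    for elem, weight in b.items():
        out[elem] = out.get(elem, 0) + weight
    return {e: w for e, w in out.items() if w != 0}

def group_by(a, p):
    """G_p: split a Z-set into an indexed Z-set keyed by p(x)."""
    groups = {}
    for x, w in a.items():
        groups.setdefault(p(x), {})[x] = w
    return groups

def agg(a_fn, g):
    """Agg_a: apply a_fn to every grouping and add the results."""
    out = {}
    for k, s in g.items():
        out = zadd(out, a_fn(k, s))
    return out

def count(k, s):
    """Not linear: wraps the grouping's count in a singleton Z-set."""
    return {(k, sum(s.values())): 1}

def flat(k, s):
    """Linear in s: tags every element with its key and keeps its weight."""
    return {(k, x): w for x, w in s.items()}

def first_letter(name):
    return name[0]

r1 = {"joe": 1}
r2 = {"jim": 1, "anne": 1}
both = zadd(r1, r2)

flat_whole = agg(flat, group_by(both, first_letter))
flat_parts = zadd(agg(flat, group_by(r1, first_letter)),
                  agg(flat, group_by(r2, first_letter)))
count_whole = agg(count, group_by(both, first_letter))
count_parts = zadd(agg(count, group_by(r1, first_letter)),
                   agg(count, group_by(r2, first_letter)))
print(flat_whole == flat_parts)    # flatmap distributes over addition
print(count_whole == count_parts)  # count does not
```

Only the grouping for key j differs between the two count results, which illustrates why re-evaluating changed groupings is enough.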

7.5. Antijoin

Antijoins arise in the implementation of Datalog programs with stratified negation. Consider the following program:

O(v, z) :- I1(v, z), not I2(v).

The semantics of such a rule is defined in terms of joins and set difference. This rule is equivalent to the following pair of rules:

C(v, z) :- I1(v, z), I2(v).
O(v, z) :- I1(v, z), not C(v, z).

This transformation reduces an antijoin to a join followed by a set difference. This produces the following DBSP circuit:

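[Circuit diagram: I1 and I2 are joined; the join result is negated and added (+) to I1; $\mathit{distinct}$ is applied to the sum to produce O.]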
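The reduction can be sketched as follows, assuming both inputs are sets (all weights equal to 1) stored as Python dicts, with I1 holding pairs (v, z) and I2 holding single values v. The function names are illustrative.

```python
def zadd(a, b):
    """Z-set addition: add weights pointwise and drop elements of weight 0."""
    out = dict(a)
    for elem, weight in b.items():
        out[elem] = out.get(elem, 0) + weight
    return {e: w for e, w in out.items() if w != 0}

def distinct(a):
    """Keep elements with positive weight, each with weight 1 (assumed definition)."""
    return {e: 1 for e, w in a.items() if w > 0}

def antijoin(i1, i2):
    """O(v, z) :- I1(v, z), not I2(v), computed as distinct(I1 - (I1 join I2))."""
    # C(v, z) :- I1(v, z), I2(v). Join on v; weights multiply.
    c = {(v, z): w * i2[v] for (v, z), w in i1.items() if i2.get(v, 0) != 0}
    # Set difference: add the negated join result to I1, then apply distinct.
    return distinct(zadd(i1, {e: -w for e, w in c.items()}))

i1 = {(1, "a"): 1, (2, "b"): 1, (3, "c"): 1}
i2 = {2: 1}
print(antijoin(i1, i2))
```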

7.6. Streaming joins

Consider a binary query $T(s,t) = \mathcal{I}(s) \uparrow\bowtie t$. This is the relation-to-stream join operator supported by streaming databases like ksqlDB (jafarpour-edbt19, ). Stream $s$ carries changes to a relation, while $t$ carries arbitrary data, e.g., logs or telemetry data points. $T$ discards values from $t$ after matching them against the accumulated contents of the relation.

7.6.1. Streaming Window queries

Streaming databases often organize the contents of streams into windows, which store a subset of data points with a predefined range of timestamps. The circuit below (a convolution filter in DSP) computes a fixed-size sliding-window aggregate over the last four timestamps defined by the Ti functions.

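[Circuit diagram: the input s passes through a chain of $z^{-1}$ delays, and the functions $T_0$, $T_1$, $T_2$ combine s with its delayed copies to produce the output o.]

In practice, windowing is usually based on physical timestamps attached to stream values rather than logical time. For instance, the CQL (arasu-tr02, ) query “SELECT * FROM events [RANGE 1 hour]” returns all events received within the last hour. The corresponding circuit (on the left) takes an input stream $s \in \mathcal{S}_{\mathbb{Z}[A]}$ and an additional input stream $\theta$ that carries the value of the current time.

[Circuit diagrams. Left: s → $\mathcal{I}$ → W → o, with $\theta$ as the second input of W. Right: s → + → W → o, with $\theta$ as the second input of W and the output fed back into + through $z^{-1}$.]

where the window operator $W$ prunes input $\mathbb{Z}$-sets, only keeping values with timestamps less than an hour behind $\theta[t]$. Assuming $ts$ returns the physical timestamp of a value of type $A$, $W$ is defined as $W(v,\theta)[t] \stackrel{def}{=} \{ x \in v[t] \,.\, ts(x) \geq \theta[t] - 1\,hr \}$. Assuming $\theta$ increases monotonically, $W$ can be moved inside integration, resulting in the circuit on the right, which uses bounded memory to compute a window of an unbounded stream. This circuit is a building block of a large family of window queries, including window joins and aggregation. We conjecture that DBSP can express any CQL query.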
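The equivalence of the two window circuits can be tried out on a short stream. In this sketch events are hypothetical pairs (timestamp in seconds, payload) stored in ℤ-sets modeled as Python dicts, and the bounded variant is assumed to feed the previous window contents back into the sum.

```python
HOUR = 3600

def zadd(a, b):
    """Z-set addition: add weights pointwise and drop elements of weight 0."""
    out = dict(a)
    for elem, weight in b.items():
        out[elem] = out.get(elem, 0) + weight
    return {e: w for e, w in out.items() if w != 0}

def window(v, theta):
    """W: keep events whose timestamp is at most one hour behind theta."""
    return {x: w for x, w in v.items() if x[0] >= theta - HOUR}

def window_unbounded(changes, thetas):
    """Left circuit: integrate the whole stream, then prune with W."""
    acc, outs = {}, []
    for s, theta in zip(changes, thetas):
        acc = zadd(acc, s)
        outs.append(window(acc, theta))
    return outs

def window_bounded(changes, thetas):
    """Right circuit: add the change to the previous window, then prune with W."""
    state, outs = {}, []
    for s, theta in zip(changes, thetas):
        state = window(zadd(state, s), theta)
        outs.append(state)
    return outs

changes = [{(0, "a"): 1}, {(1000, "b"): 1}, {(4000, "c"): 1}, {(1000, "b"): -1}]
thetas = [0, 1000, 4000, 4500]   # monotonically increasing current time
print(window_bounded(changes, thetas) == window_unbounded(changes, thetas))
```

The bounded variant never stores events older than one hour, while the unbounded one keeps the full history in `acc`.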

7.7. Relational while queries

(See also non-monotonic semantics for Datalog¬ and Datalog¬¬(Abiteboul-book95, ).) To illustrate the power of DBSP we implement the following “while” program, where Q is an arbitrary relational algebra query:

x := i;
while (x changes)
x := Q(x);

The DBSP implementation of this program is:

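[Circuit diagram: the input i enters through $\delta_0$ and is added (+) into a loop that applies the lifted query $Q$ and feeds its result back through $z^{-1}$; the loop output passes through $\mathcal{D}$ to produce x.]

This circuit can be converted to a streaming circuit that computes on a stream of values i by lifting it; it can be incrementalized using Algorithm 4.8 to compute on changes of i:

[Circuit diagram: the same loop, lifted, with input $\Delta i$, the query replaced by its incremental version $(\uparrow Q)^\Delta$, and output $\Delta x$.]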

Note that at runtime the execution of this circuit is not guaranteed to terminate; however, if the circuit does terminate, it will produce the correct output, i.e., the least fixpoint of Q that includes i.
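For reference, the non-incremental while program itself can be sketched in a few lines. This is a plain Python rendering of the loop, not of the DBSP circuit; relations are modeled as frozensets of tuples, and the iteration bound `max_iterations` is an added safeguard that reflects the lack of a termination guarantee.

```python
def while_fixpoint(i, q, max_iterations=1000):
    """x := i; while (x changes) x := Q(x). Raises if no fixed point is reached."""
    x = i
    for _ in range(max_iterations):
        nxt = q(x)
        if nxt == x:
            return x
        x = nxt
    raise RuntimeError("no fixed point reached within max_iterations")

def step(x):
    """Example query Q: keep x and add every pair obtained by composing two pairs."""
    return x | {(a, d) for (a, b) in x for (c, d) in x if b == c}

edges = frozenset({(1, 2), (2, 3), (3, 4)})
closure = while_fixpoint(edges, step)
print(sorted(closure))
```

A query such as `lambda x: frozenset({max(x) + 1})` never reaches a fixed point, and the sketch then raises instead of looping forever.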

8. Implementation

We are prototyping an implementation of DBSP as part of an open-source project with an MIT license: https://github.com/vmware/database-stream-processor. The implementation is written in Rust and consists of a library and a runtime. The library provides APIs for basic algebraic data types, such as groups, finite maps, $\mathbb{Z}$-sets, and indexed $\mathbb{Z}$-sets. A separate circuit construction API allows users to create DBSP circuits by placing operator nodes (corresponding to boxes in our diagrams) and connecting them with streams, which correspond to the arrows in our diagrams. The library provides pre-built generic operators for integration, differentiation, delay, nested integration and differentiation. The $\mathbb{Z}$-set library provides functions for computing the basic $\mathbb{Z}$-set operations corresponding to plus, negation, grouping, joining, aggregation.

For iterative computations we provide the $\delta_0$ operator and an operator that approximates $\int$ by terminating iteration of a loop at a user-specified condition (usually the condition is the requirement for a zero to appear in a specified stream). The low-level library allows users to construct incremental circuits manually. Our plan is to add a higher-level library (a compiler) which will automatically incrementalize a circuit.

9. Related work

DBSP using non-nested streams is a simplified instance of a Kahn network (kahn-ifip74, ). Johnson (johnson-phd83, ) studies a very similar computational model without nested streams and its expressiveness. The implementation of such streaming models of computation and their relationship to dataflow machines has been studied by Lee (lee-ieee95, ). Lee (lee-ifip93, ) also introduced streams of streams and the $z^{-1}$ operator.

In §7 we have discussed the connections with window and stream database queries (arasu-tr02, ; aurora, ).

Incremental view maintenance (e.g. (gupta-idb93, )) is surveyed in (gupta-idb95, ); a large bibliography is present in (motik-ai19, ). Its most formal aspect is propagating “deltas” through algebraic expressions: $Q(R+\Delta R) = Q(R) + \Delta Q(R, \Delta R)$. This work eventually crystallized in (koch-pods16, ). DBSP incrementalization is both more modular and finer-grained since it deals with streams of updates. Both (koch-pods10, ) and (green-tcs11, ) use $\mathbb{Z}$-sets to uniformly model insertions/deletions.

Picallo et al. (picallo-scop19, ) provide a general solution to IVM for rich languages. DBSP requires a group structure on the values operated on; this assumption has two major practical benefits: it simplifies the mathematics considerably (e.g., Picallo uses monoid actions to model changes), and it provides a general, simple algorithm (4.8) for incrementalizing arbitrary programs. The downside of DBSP is that one has to find a suitable group structure (e.g., $\mathbb{Z}$-sets for sets) to “embed” the computation. Picallo’s notion of “derivative” is not unique: they need creativity to choose the right derivative definition, while we need creativity to find the right group structure.

Many heuristic algorithms were published for Datalog-like languages, e.g., counting based approaches (Dewan-iis92, ; motik-aaai15, ) that maintain the number of derivations, DRed (gupta-sigmod93, ) and its variants (Ceri-VLDB91, ; Wolfson-sigmod91, ; Staudt-vldb96, ; Kotowski-rr11, ; Lu-sigmod95, ; Apt-sigmod87, ), the backward-forward algorithm and variants (motik-aaai15, ; Harrison-wdd92, ; motik-ai19, ). DBSP is more general than these approaches. Interestingly, the $\mathbb{Z}$-set multiplicities in our relational implementation are related to the counting-number-of-derivations approaches.

DBSP is tightly related to Differential Dataflow (DD) (mcsherry-cidr13, ; murray-sosp13, ) and its theoretical foundations (abadi-fossacs15, ) (and recently (mchserry-vldb20, ; chothia-vldb16, )). All DBSP operators are based on DD operators. DD’s computational model is more powerful than DBSP, since it allows past values in a stream to be “updated”. In contrast, our model assumes that the inputs of a computation arrive in time order, while allowing for nested time domains via the modular lifting transformer. However, DBSP can express both incremental and non-incremental computations; in essence DBSP is “deconstructing” DD into simple component building blocks. The core Proposition 3.2 and Algorithm 4.8, which is based on it, are new contributions.

10. Conclusions

In this paper we have introduced DBSP, a model of computation based on infinite streams over abelian groups. In this model streams are used to model consecutive snapshots of a database, consecutive changes (or transactions) applied to a database, and consecutive values of loop-carried variables.

We have defined an abstract notion of incremental computation over streams, and defined the incrementalization operator Δ, which transforms a stream computation Q into its incremental version QΔ. The incrementalization operator has some very nice algebraic properties, which can generate very efficient incremental implementations for linear and bilinear computations.

We have then applied these tools to two domains: relational queries and recursive stratified queries. This gave us a general algorithm for incrementalizing an arbitrary query, including recursive queries. However, we believe that both the incrementalization algorithm and DBSP are even more powerful and can apply to even richer classes of query languages, including languages operating on nested relations and streaming query languages.

References

  • (1) The Aurora project. http://cs.brown.edu/research/aurora/, December 2004.
  • (2) Martín Abadi, Frank McSherry, and Gordon Plotkin. Foundations of differential dataflow. In Foundations of Software Science and Computation Structures (FoSSaCS), London, UK, April 11-18 2015. URL: http://homepages.inf.ed.ac.uk/gdp/publications/differentialweb.pdf.
  • (3) Serge Abiteboul, Richard Hull, and Victor Vianu. Foundations of Databases. Addison-Wesley, 1995. URL: http://webdam.inria.fr/Alice/.
  • (4) Mario Alvarez-Picallo, Alex Eyers-Taylor, Michael Peyton Jones, and C.-H. Luke Ong. Fixing incremental computation. In European Symposium on Programming Languages and Systems (ESOP), pages 525–552, Prague, Czech Republic, April 6–11 2019. URL: https://link.springer.com/chapter/10.1007/978-3-030-17184-1_19.
  • (5) Krzysztof R. Apt and Jean-Marc Pugin. Maintenance of stratified databases viewed as a belief revision system. In Moshe Y. Vardi, editor, ACM SIGMOD International conference on Management of data (SIGMOD), pages 136–145, San Diego, California, March 23-25 1987. doi:10.1145/28659.28674.
  • (6) Arvind Arasu, Shivnath Babu, and Jennifer Widom. An abstract semantics and concrete language for continuous queries over streams and relations. Technical Report 2002-57, Stanford InfoLab, 2002. URL: http://ilpubs.stanford.edu:8090/563/.
  • (7) Mihai Budiu, Frank McSherry, Leonid Ryzhyk, and Val Tannen. DBSP: A language for expressing incremental view maintenance for rich query languages. https://github.com/vmware/database-stream-processor/blob/main/doc/spec.pdf. December 2021.
  • (8) Stefano Ceri and Jennifer Widom. Deriving production rules for incremental view maintenance. In Guy M. Lohman, Amílcar Sernadas, and Rafael Camps, editors, International Conference of Very Large Data Bases (VLDB), pages 577–589, Barcelona, Spain, 1991. URL: http://www.vldb.org/conf/1991/P577.PDF.
  • (9) Zaheer Chothia, John Liagouris, Frank McSherry, and Timothy Roscoe. Explaining outputs in modern data analytics. Proc. VLDB Endow., 9(12):1137–1148, August 2016. URL: https://doi.org/10.14778/2994509.2994530.
  • (10) Hasanat M. Dewan, David Ohsie, Salvatore J. Stolfo, Ouri Wolfson, and Sushil Da Silva. Incremental database rule processing in PARADISER. J. Intell. Inf. Syst., 1(2):177–209, 1992. doi:10.1007/BF00962282.
  • (11) Sergio Greco and Cristian Molinaro. Datalog and logic databases. Synthesis Lectures on Data Management, 7(2):1–169, 2015. URL: https://doi.org/10.2200/S00648ED1V01Y201505DTM041.
  • (12) Todd J Green, Zachary G Ives, and Val Tannen. Reconcilable differences. Theory of Computing Systems, 49(2):460–488, 2011. URL: https://web.cs.ucdavis.edu/~green/papers/tocs11_differences.pdf.
  • (13) Todd J. Green, Grigoris Karvounarakis, and Val Tannen. Provenance semirings. In Symposium on Principles of Database Systems (PODS), page 31–40, Beijing, China, June 11-14 2007. URL: https://doi.org/10.1145/1265530.1265535.
  • (14) Ashish Gupta, Inderpal Singh Mumick, et al. Maintenance of materialized views: Problems, techniques, and applications. IEEE Data Eng. Bull., 18(2):3–18, 1995.
  • (15) Ashish Gupta, Inderpal Singh Mumick, and V. S. Subrahmanian. Maintaining views incrementally. In Proceedings of the 1993 ACM SIGMOD International Conference on Management of Data, SIGMOD ’93, page 157–166, Washington, D.C., USA, 1993. URL: https://doi.org/10.1145/170035.170066.
  • (16) Ashish Gupta, Inderpal Singh Mumick, and V. S. Subrahmanian. Maintaining views incrementally. In ACM SIGMOD International conference on Management of data (SIGMOD), pages 157–166, Washington, DC, May 26-28 1993. ACM Press. doi:10.1145/170035.170066.
  • (17) John V. Harrison and Suzanne W. Dietrich. Maintenance of materialized views in a deductive database: An update propagation approach. In Kotagiri Ramamohanarao, James Harland, and Guozhu Dong, editors, Workshop on Deductive Databases, volume CITRI/TR-92-65 of Technical Report, pages 56–65, Washington, D.C., November 14 1992. Department of Computer Science, University of Melbourne.
  • (18) Hojjat Jafarpour, Rohan Desai, and Damian Guy. KSQL: Streaming SQL engine for Apache Kafka. In International Conference on Extending Database Technology (EDBT), pages 524–533, Lisbon, Portugal, March 26-29 2019. URL: http://openproceedings.org/2019/conf/edbt/EDBT19_paper_329.pdf.
  • (19) Steven Dexter Johnson. Synthesis of Digital Designs from Recursion Equations. PhD thesis, Indiana University, May 1983. https://help.luddy.indiana.edu/techreports/TRNNN.cgi?trnum=TR141.
  • (20) Gilles Kahn. The semantics of a simple language for parallel programming. In IFIP Congress on Information Processing, 1974. URL: http://www1.cs.columbia.edu/~sedwards/papers/kahn1974semantics.pdf.
  • (21) Christoph Koch. Incremental query evaluation in a ring of databases. In Symposium on Principles of Database Systems (PODS), page 87–98, Indianapolis, Indiana, USA, 2010. URL: https://doi.org/10.1145/1807085.1807100.
  • (22) Christoph Koch, Daniel Lupei, and Val Tannen. Incremental view maintenance for collection programming. In Symposium on Principles of Database Systems (PODS), page 75–90, San Francisco, California, USA, 2016. URL: https://doi.org/10.1145/2902251.2902286.
  • (23) Jakub Kotowski, François Bry, and Simon Brodt. Reasoning as axioms change - incremental view maintenance reconsidered. In Web Reasoning and Rule Systems RR, volume 6902 of Lecture Notes in Computer Science, pages 139–154, Galway, Ireland, August 29-30 2011. Springer. doi:10.1007/978-3-642-23580-1_11.
  • (24) Edward A. Lee. Multidimensional streams rooted in dataflow. In IFIP Working Conference on Architectures and Compilation Techniques for Fine and Medium Grain Parallelism, Orlando, FL, January 20-22 1993. URL: https://ptolemy.berkeley.edu/publications/papers/93/mdsdf/.
  • (25) Edward A. Lee and Thomas M. Parks. Dataflow process networks. Proceedings of the IEEE, pages 773–801, May 1995. URL: https://ptolemy.berkeley.edu/publications/papers/95/processNets/.
  • (26) James J. Lu, Guido Moerkotte, Joachim Schü, and V. S. Subrahmanian. Efficient maintenance of materialized mediated views. In ACM SIGMOD International conference on Management of data (SIGMOD), pages 340–351, San Jose, California, May 22-25 1995. doi:10.1145/223784.223850.
  • (27) Frank McSherry, Andrea Lattuada, Malte Schwarzkopf, and Timothy Roscoe. Shared arrangements: Practical inter-query sharing for streaming dataflows. Proc. VLDB Endow., 13(10):1793–1806, June 2020. URL: https://doi.org/10.14778/3401960.3401974.
  • (28) Frank McSherry, Derek Gordon Murray, Rebecca Isaacs, and Michael Isard. Differential dataflow. In Conference on Innovative Data Systems Research (CIDR), Asilomar, CA, January 6–9 2013. URL: http://cidrdb.org/cidr2013/Papers/CIDR13_Paper111.pdf.
  • (29) Boris Motik, Yavor Nenov, Robert Piro, and Ian Horrocks. Maintenance of Datalog materialisations revisited. Artif. Intell., 269:76–136, 2019. URL: https://doi.org/10.1016/j.artint.2018.12.004.
  • (30) Boris Motik, Yavor Nenov, Robert Edgar Felix Piro, and Ian Horrocks. Incremental update of Datalog materialisation: the backward/forward algorithm. In Conference on Artificial Intelligence (AAAI), pages 1560–1568, Austin, Texas, January 25-30 2015. AAAI Press. URL: http://www.aaai.org/ocs/index.php/AAAI/AAAI15/paper/view/9660.
  • (31) Derek G. Murray, Frank McSherry, Rebecca Isaacs, Michael Isard, Paul Barham, and Martín Abadi. Naiad: A timely dataflow system. In ACM Symposium on Operating Systems Principles (SOSP), pages 439–455, Farmington, Pennsylvania, 2013. URL: https://doi.org/10.1145/2517349.2522738.
  • (32) L. R. Rabiner and B. Gold, editors. Theory and Application of Digital Signal Processing. Prentice-Hall, 1975.
  • (33) Martin Staudt and Matthias Jarke. Incremental maintenance of externally materialized views. In International Conference of Very Large Data Bases (VLDB), pages 75–86, Mumbai (Bombay), India, September 3-6 1996. URL: http://www.vldb.org/conf/1996/P075.PDF.
  • (34) Ouri Wolfson, Hasanat M. Dewan, Salvatore J. Stolfo, and Yechiam Yemini. Incremental evaluation of rules and its relationship to parallelism. In ACM SIGMOD International conference on Management of data (SIGMOD), pages 78–87, Denver, Colorado, May 29-31 1991. ACM Press. doi:10.1145/115790.115799.

Appendix A Supporting material

A.1. Operations on nested streams

If a stream can be thought of as an infinite vector, a stream of streams can be thought of as a “matrix” with an infinite number of rows, where each row is a stream. For example, we can depict the nested stream $i$ defined by $i[t_0][t_1] = t_0 + 2t_1$ as:

$$i = \begin{bmatrix} 0 & 1 & 2 & 3 \\ 2 & 3 & 4 & 5 \\ 4 & 5 & 6 & 7 \\ 6 & 7 & 8 & 9 \end{bmatrix}$$

($t_0$ is the column index, and $t_1$ is the row index). Let us perform some computations on nested streams to get used to them. Lifting a scalar function twice computes on elements of the matrix pointwise:

$$(\uparrow\uparrow(x \mapsto x \bmod 2))(i) = \begin{bmatrix} 0 & 1 & 0 & 1 \\ 0 & 1 & 0 & 1 \\ 0 & 1 & 0 & 1 \\ 0 & 1 & 0 & 1 \end{bmatrix}$$

The $\mathcal{I}$ operator on $\mathcal{S}_{\mathcal{S}_A}$ is well-defined: it operates on rows of the matrix, treating each row as a single value:

$$\mathcal{I}(i) = \begin{bmatrix} 0 & 1 & 2 & 3 \\ 2 & 4 & 6 & 8 \\ 6 & 9 & 12 & 15 \\ 12 & 16 & 20 & 24 \end{bmatrix}$$

Lifting a stream operator computing on $\mathcal{S}_A$, such as $\mathcal{I}: \mathcal{S}_A \to \mathcal{S}_A$, also produces an operator on nested streams, but this time computing on the columns of the matrix: $\uparrow\mathcal{I}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$.

$$(\uparrow\mathcal{I})(i) = \begin{bmatrix} 0 & 1 & 3 & 6 \\ 2 & 5 & 9 & 14 \\ 4 & 9 & 15 & 22 \\ 6 & 13 & 21 & 30 \end{bmatrix}$$

Similarly, we can apply $\mathcal{D}$ to nested streams, $\mathcal{D}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$, computing on rows of the matrix:

$$\mathcal{D}(i) = \begin{bmatrix} 0 & 1 & 2 & 3 \\ 2 & 2 & 2 & 2 \\ 2 & 2 & 2 & 2 \\ 2 & 2 & 2 & 2 \end{bmatrix}$$

while $\uparrow\mathcal{D}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$ computes on the columns:

$$(\uparrow\mathcal{D})(i) = \begin{bmatrix} 0 & 1 & 1 & 1 \\ 2 & 1 & 1 & 1 \\ 4 & 1 & 1 & 1 \\ 6 & 1 & 1 & 1 \end{bmatrix}$$

Similarly, $z^{-1}$ and its lifted variant have different outcomes:

$$z^{-1}(i) = \begin{bmatrix} 0 & 0 & 0 & 0 \\ 0 & 1 & 2 & 3 \\ 2 & 3 & 4 & 5 \\ 4 & 5 & 6 & 7 \end{bmatrix}$$

$$(\uparrow z^{-1})(i) = \begin{bmatrix} 0 & 0 & 1 & 2 \\ 0 & 2 & 3 & 4 \\ 0 & 4 & 5 & 6 \\ 0 & 6 & 7 & 8 \end{bmatrix}$$

$z^{-1}$ commutes with $\uparrow z^{-1}$:

$$(\uparrow z^{-1})(z^{-1}(i)) = z^{-1}((\uparrow z^{-1})(i)) = \begin{bmatrix} 0 & 0 & 0 & 0 \\ 0 & 0 & 1 & 2 \\ 0 & 2 & 3 & 4 \\ 0 & 4 & 5 & 6 \end{bmatrix}$$

Notice the following commutativity properties for integration and differentiation on nested streams: $\mathcal{I}\circ(\uparrow\mathcal{I}) = (\uparrow\mathcal{I})\circ\mathcal{I}$ and $\mathcal{D}\circ(\uparrow\mathcal{D}) = (\uparrow\mathcal{D})\circ\mathcal{D}$.

$$\mathcal{D}_{\mathcal{S}_{\mathcal{S}}}(i) = (\mathcal{D}\circ(\uparrow\mathcal{D}))(i) = \begin{bmatrix} 0 & 1 & 1 & 1 \\ 2 & 0 & 0 & 0 \\ 2 & 0 & 0 & 0 \\ 2 & 0 & 0 & 0 \end{bmatrix}$$

$$\mathcal{I}_{\mathcal{S}_{\mathcal{S}}}(i) = ((\uparrow\mathcal{I})\circ\mathcal{I})(i) = \begin{bmatrix} 0 & 1 & 3 & 6 \\ 2 & 6 & 12 & 20 \\ 6 & 15 & 27 & 42 \\ 12 & 28 & 48 & 72 \end{bmatrix}$$
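The matrices above can be reproduced on a finite 4×4 prefix of the nested stream. The sketch below stores the matrix as a list of rows, as depicted; `lift` applies a stream operator inside every row, and `outer` (a name chosen for this sketch) applies it across rows, treating each row as one value.

```python
def integrate(s):
    """I on a finite stream prefix: running sums."""
    out, acc = [], 0
    for v in s:
        acc += v
        out.append(acc)
    return out

def differentiate(s):
    """D on a finite stream prefix: difference with the previous value."""
    out, prev = [], 0
    for v in s:
        out.append(v - prev)
        prev = v
    return out

def delay(s):
    """z^-1 on a finite stream prefix: shift by one, starting from 0."""
    return [0] + list(s[:-1])

def lift(f):
    """Lifted operator: apply f independently inside every row."""
    return lambda m: [f(row) for row in m]

def outer(f):
    """Operator on the outer stream: apply f across rows, element by element."""
    return lambda m: [list(r) for r in zip(*[f(list(col)) for col in zip(*m)])]

# i[t0][t1] = t0 + 2*t1, with t0 the column index and t1 the row index.
i = [[t0 + 2 * t1 for t0 in range(4)] for t1 in range(4)]

print(outer(integrate)(i))
print(lift(integrate)(i))
print(outer(differentiate)(lift(differentiate)(i)))
print(lift(integrate)(outer(integrate)(i)) == outer(integrate)(lift(integrate)(i)))
```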