DBSP: Automatic Incremental View Maintenance for Rich Query Languages

Mihai Budiu VMware Research mbudiu@vmware.com , Frank McSherry Materialize Inc. mcsherry@materialize.com , Leonid Ryzhyk VMware Research lryzhyk@vmware.com and Val Tannen University of Pennsylvania val@seas.upenn.edu
Abstract.

Incremental view maintenance has long been a central problem in database theory (gupta-idb93, ). Many solutions have been proposed for restricted classes of database languages, such as the relational algebra or Datalog. These techniques do not naturally generalize to richer languages. In this paper we give a general solution to this problem in 3 steps: (1) we describe a simple but expressive language called DBSP for describing computations over data streams; (2) we give a general algorithm for solving the incremental view maintenance problem for arbitrary DBSP programs; and (3) we show how to model many rich database query languages (including full relational queries, grouping and aggregation, monotonic and non-monotonic recursion, and streaming aggregation) using DBSP. As a consequence, we obtain efficient incremental view maintenance techniques for all these rich languages.

1. Introduction

In this paper we present a simple mathematical theory for modeling streaming and incremental computations. This model has immediate practical applications in the design and implementation of streaming databases and incremental view maintenance. Our model is based on mathematical formalisms used in discrete digital signal processing (DSP) (rabiner-book75, ), but we apply it to database computations. Thus, we have called it “DBSP”. DBSP is inspired by Differential Dataflow (mcsherry-cidr13, ) (DD), and started as an attempt to provide a simpler formalization of DD than the one of Abadi et al. (abadi-fossacs15, ) (as discussed in §9), but has evolved beyond that purpose.

The core concept of DBSP is the stream: a stream $s$ with type $\mathcal{S}_A$ maps “time” moments $t$ to values $s[t]$ of type $A$; think of it as an “infinite vector”. A streaming computation is a function that consumes one or more streams and produces another stream. We depict streaming computations with typical DSP box-and-arrow diagrams (also called “circuits”), where boxes are computations and streams are arrows, as in the following diagram, which shows a stream operator $T$ consuming two input streams $s_0$ and $s_1$ and producing one output stream $s$:

$$s = T(s_0, s_1)$$

We generally think of streams as sequences of small values, and we will use them in this way. However, we make a leap of imagination and also treat a whole database as a stream value. What is a stream of databases? It is a sequence of database snapshots. We model the time-evolution of a database $DB$ as a stream $DB \in \mathcal{S}_{SCH}$, where $SCH$ is the database schema. Time is not wall-clock time, but essentially a counter of the sequence of transactions applied to the database. Since transactions are linearizable, they have a total order, which defines a linear time dimension $t$: the value of the stream $DB[t]$ is the snapshot of the database contents after $t$ transactions have been applied. We assume that $DB[0] = 0$, i.e., the database starts empty.

Database transactions also form a stream $T$, a stream of changes, or deltas, that are applied to our database. The database snapshot at time $t$ is the cumulative result of applying all transactions in the sequence up to $t$: $DB[t] = \sum_{i \le t} T[i] \stackrel{\text{def}}{=} \mathcal{I}(T)[t]$ (we make the notion of “addition” precise later). The operation of adding up all changes is stream integration. The following diagram expresses this relationship using the operator $\mathcal{I}$ for stream integration:

$$T \xrightarrow{\ \mathcal{I}\ } DB$$

Conversely, we can say that transactions are the changes of a database, and write $T = \mathcal{D}(DB)$, or $T[t] = DB[t] - DB[t-1]$. This is the definition of stream differentiation, denoted by $\mathcal{D}$; this operation computes the changes of a stream, and is the inverse of stream integration. §2 precisely defines streams, integration and differentiation, and analyzes their properties.

Let us apply these concepts to view maintenance. Consider a database DB and a query Q defining a view V as a function of a database snapshot V=Q(DB). Corresponding to the stream of database snapshots DB we have a stream of view snapshots: V[t] is the view’s contents after the t-th transaction has been applied. We show this relationship using the following diagram:

$$DB \xrightarrow{\ {\uparrow}Q\ } V$$

The symbol ${\uparrow}Q$ (the “lifting” of $Q$) shows that the query $Q$ is applied independently to every element of the stream of database snapshots $DB$. ${\uparrow}Q$ is a “streaming query” since it operates on a stream of values. The incremental view maintenance problem requires an algorithm to compute the stream $\Delta V$ of changes of the view $V$, i.e., $\mathcal{D}(V)$, as a function of the stream $T$. By chaining these definitions together we get the following fundamental equation of the view maintenance problem: $\Delta V = \mathcal{D}({\uparrow}Q(DB)) = \mathcal{D}({\uparrow}Q(\mathcal{I}(T)))$, graphically shown as:

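$$T \xrightarrow{\ \mathcal{I}\ } DB \xrightarrow{\ {\uparrow}Q\ } V \xrightarrow{\ \mathcal{D}\ } \Delta V$$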

This definition can be generalized to streaming queries $S: \mathcal{S}_A \to \mathcal{S}_B$ that are richer than lifted pointwise queries ${\uparrow}Q$. The incremental version of a streaming query $S$ is denoted by $S^\Delta$ and is defined according to the above equation, which can also be written as $S^\Delta = \mathcal{D} \circ S \circ \mathcal{I}$.

It is generally assumed that the changes to a dataset are much smaller than the dataset itself; thus, computing on streams of changes may produce significant performance benefits.

Applying the query incrementalization operator $S \mapsto S^\Delta$ constructs a query that computes directly on changes; however, the resulting query is no more efficient than a query that computes on the entire dataset, because it uses an integration operator to reconstitute the full dataset. §3 shows how algebraic properties of the $\Delta$ operator are used to optimize the implementation of $S^\Delta$:

  1. (1)

    The first property is that many classes of primitive operations have very efficient incremental versions. In particular, linear queries have the property $Q = Q^\Delta$. Almost all relational and Datalog queries are based on linear operators, so the incremental versions of such queries can be computed in time proportional to the size of the changes. Bilinear operators (such as joins) have a more complex incremental implementation, which still performs work proportional to the size of the changes but requires storing an amount of data proportional to the size of the relations.

  2. (2)

    The second key property is the chain rule: $(S_1 \circ S_2)^\Delta = S_1^\Delta \circ S_2^\Delta$. This rule gives the incremental version of a complex query as a composition of incremental versions of its components. It follows that we can implement any incremental query as a composition of primitive incremental queries, all of which perform work proportional to the size of the changes.

Armed with this general theory of incremental computation, in §4 we show how to model relational queries in DBSP. This immediately gives us a general algorithm to compute the incremental version of any relational query. These results are well-known, but they are cleanly modeled by DBSP.

Applying DBSP to recursive queries requires extending this computational model. In §5 we introduce two additional operators: $\delta_0$ creates a stream from a scalar value, and $\int$ creates a scalar value from a stream. These operators can be used to implement computations with while loops. So, in addition to modelling changing inputs and databases, we also use streams as a model for sequences of consecutive values of loop iteration variables. With this addition DBSP becomes rich enough to implement recursive queries. §5.1 shows how stratified recursive Datalog programs with negation can be implemented in DBSP.

In §6 we use DBSP to model computations on nested streams, where each value of a stream is another stream. This allows us to define incremental streaming computations for recursive programs. As a consequence we derive a universal algorithm for incrementalizing arbitrary streaming Datalog programs.

DBSP is a simple language: the basic DBSP streaming model is built essentially from two elementary mathematical operators: lifting ${\uparrow}$ and delay $z^{-1}$. The nested streams model adds two operators, $\delta_0$ for stream construction and $\int$ for stream destruction.

DBSP is also expressive: for example, it is more powerful than stratified Datalog. DBSP can also describe streaming window queries, queries on nested relations (such as grouping), and non-monotone recursive queries. We briefly discuss the application of DBSP to richer languages in §7.

This paper omits most proofs; the full proofs are available in an expansive companion technical report (tr, ).

This paper makes the following contributions:

  1. (1)

    It defines DBSP, a small language for streaming computation, which nonetheless can express nested non-monotonic recursion;

  2. (2)

    It provides an algorithm for incrementalizing all DBSP programs;

  3. (3)

    For fragments of DBSP corresponding to the relational algebra and stratified-monotonic Datalog, the automatic incrementalization algorithm provides results matching state-of-the-art approaches. Moreover, our approach also applies to more powerful languages, such as while-relational and non-monotonic Datalog (Abiteboul-book95, ).

  4. (4)

    It develops a formal, sound foundation for the manipulation of streaming and incremental computations, which allows one to reason formally about program transformations and design new efficient implementations.

  5. (5)

    DBSP can express both streaming and incremental computation models in a single framework. We regard this unification as a significant contribution.

2. Stream computations

In this section we introduce formally the notion of a stream as an infinite sequence of values, and we define computations on streams. Stream operators (§2.1) are the basic building block of stream computations. We employ (§2.2) restricted types of stream operators: causal operators (which cannot “look into the future”), and strict operators (which cannot even “look into the present”). Moreover, all our operators are “synchronous”: they consume and produce data at the same “rate”. Causal operators can be chained into complex acyclic computational circuits; cyclic circuits are restricted to using strict operators on back-edges. Finally, we define (§2.3) two useful stream operators: integration and differentiation.

All the results in this section have been known for decades, but we recapitulate them to clarify our model’s formal assumptions.

2.1. Streams and stream operators

$\mathbb{N}$ is the set of natural numbers, $\mathbb{B}$ is the set of Booleans, and $\mathbb{Z}$ is the set of integers.

Definition 2.1 (stream):

Given a set $A$, a stream of values from $A$, or an $A$-stream, is a function $\mathbb{N} \to A$. We denote by $\mathcal{S}_A \stackrel{\text{def}}{=} \{s \mid s: \mathbb{N} \to A\}$ the set of all $A$-streams.

When $s \in \mathcal{S}_A$ and $t \in \mathbb{N}$ we write $s[t]$ for the $t$-th element of the stream $s$ instead of the usual $s(t)$, to distinguish it from other function applications. We think of the index $t$ as (discrete) time and of $s[t] \in A$ as the value of the stream $s$ “at time” $t$. For example, the stream of natural numbers $id \in \mathcal{S}_\mathbb{N}$ given by $id[t] = t$ is the sequence of values $[0, 1, 2, 3, 4, \ldots]$.

Definition 2.2 (stream operator):

A (typed) stream operator with $n$ inputs is a function $T: \mathcal{S}_{A_0} \times \cdots \times \mathcal{S}_{A_{n-1}} \to \mathcal{S}_B$.

In general we will use “operator” for functions on streams, and “function” for computations on “scalar” values.

We are using an extension of the simply-typed lambda calculus to write DBSP programs; we will introduce its elements gradually. However, we find it more readable to also use signal-processing-like circuit diagrams to depict DBSP programs. In a circuit diagram a rectangle represents an operator application (labeled with the operator name, e.g., T), while an arrow is a stream.

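Stream operator composition (function composition) is shown as chained circuits. The composition of a binary operator $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_A$ with the unary operator $S: \mathcal{S}_A \to \mathcal{S}_B$ into the computation $\lambda s. T(T(s, S(s)), S(s)): \mathcal{S}_A \to \mathcal{S}_A$ is: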

$$o = T(T(s, S(s)), S(s))$$

(Diagrams obscure the order of the inputs of an operator; for non-commutative operators we have to provide more information.)

Definition 2.3:

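(lifting) Given a (scalar) function $f: A \to B$, we define a stream operator ${\uparrow}f: \mathcal{S}_A \to \mathcal{S}_B$ by lifting the function $f$ pointwise in time: $({\uparrow}f)(s) \stackrel{\text{def}}{=} f \circ s$. Equivalently, $(({\uparrow}f)(s))[t] \stackrel{\text{def}}{=} f(s[t])$. This extends to functions of multiple arguments.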

For example, $({\uparrow}(\lambda x. (2x)))(id) = [0, 2, 4, 6, 8, \ldots]$.

Proposition 2.4 (distributivity):

Lifting distributes over function composition: ${\uparrow}(f \circ g) = ({\uparrow}f) \circ ({\uparrow}g)$.

We say that two DBSP programs are equivalent if they compute the same input-output function on streams. We use the symbol $\cong$ to indicate that two circuits are equivalent. For example, Proposition 2.4 states the following circuit equivalence:

$$({\uparrow}f) \circ ({\uparrow}g) \;\cong\; {\uparrow}(f \circ g)$$
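The following Python sketch illustrates lifting and Proposition 2.4. It assumes that a finite prefix $s[0], \ldots, s[n-1]$, stored as a Python list, stands in for an infinite stream; the helpers `ident`, `lift`, `compose`, `inc`, and `triple` are hypothetical names chosen for illustration, not part of DBSP. Agreement on one prefix is a sanity check, not a proof.

```python
from typing import Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


def ident(n: int) -> List[int]:
    """First n values of the stream id, where id[t] = t."""
    return list(range(n))


def lift(f: Callable[[A], B]) -> Callable[[List[A]], List[B]]:
    """Lift a scalar function pointwise in time: lift(f)(s)[t] == f(s[t])."""
    return lambda s: [f(x) for x in s]


def compose(f: Callable[[B], C], g: Callable[[A], B]) -> Callable[[A], C]:
    """Ordinary function composition: compose(f, g)(x) == f(g(x))."""
    return lambda x: f(g(x))


def inc(x: int) -> int:
    return x + 1


def triple(x: int) -> int:
    return 3 * x


double = lift(lambda x: 2 * x)
print(double(ident(5)))  # [0, 2, 4, 6, 8]

# Proposition 2.4 on one sample prefix: lift(f . g) agrees with lift(f) . lift(g).
print(lift(compose(inc, triple))(ident(3)))        # [1, 4, 7]
print(compose(lift(inc), lift(triple))(ident(3)))  # [1, 4, 7]
```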

2.2. Streams over abelian groups

For the rest of the technical development we require the set of values $A$ of a stream $\mathcal{S}_A$ to form a commutative group $(A, +, 0, -)$. We now introduce the primitive stream operators that DBSP uses.

2.2.1. Delays and time-invariance

Definition 2.5 (Delay):

The delay operator (the name $z^{-1}$ comes from the DSP literature, and is related to the z-transform (rabiner-book75, )) produces an output stream by delaying its input by one step: $z^{-1}_A: \mathcal{S}_A \to \mathcal{S}_A$:

$$z^{-1}_A(s)[t] \stackrel{\text{def}}{=} \begin{cases} 0_A & \text{when } t = 0 \\ s[t-1] & \text{when } t \ge 1 \end{cases}$$

We often omit the type parameter $A$, and write just $z^{-1}$. For example, $z^{-1}(id) = [0, 0, 1, 2, 3, \ldots]$.

Definition 2.6 (Time invariance):

A stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ is time-invariant iff $S(z^{-1}_A(s)) = z^{-1}_B(S(s))$ for all $s \in \mathcal{S}_A$, or, in other words, iff the two following circuits are equivalent:

$$z^{-1} \circ S \;\cong\; S \circ z^{-1}$$

This definition extends naturally to operators with multiple inputs.

The composition of time-invariant operators of any number of inputs is time-invariant. The delay operator $z^{-1}$ is time-invariant. DBSP only uses time-invariant operators.

Definition 2.7:

We say that a function between groups $f: A \to B$ has the zero-preservation property if $f(0_A) = 0_B$. We write $\mathrm{zpp}(f)$.

A lifted operator ${\uparrow}f$ is time-invariant iff $\mathrm{zpp}(f)$.
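A small Python sketch of $z^{-1}$ and of the time-invariance condition, again assuming that a finite list prefix models a stream (the helper names are hypothetical). It shows why zero preservation matters: a lifted function commutes with the delay only if it maps the $0$ that $z^{-1}$ inserts at time $0$ back to $0$.

```python
from typing import Callable, List


def delay(s: List[int], zero: int = 0) -> List[int]:
    """z^{-1}: emit the group zero at time 0, then s[t-1] at time t."""
    return [zero] + s[:-1] if s else []


def lift(f: Callable[[int], int]) -> Callable[[List[int]], List[int]]:
    """Pointwise lifting of a scalar function."""
    return lambda s: [f(x) for x in s]


def commutes_with_delay(op: Callable[[List[int]], List[int]], s: List[int]) -> bool:
    """Check S(z^{-1}(s)) == z^{-1}(S(s)) on one prefix (a test, not a proof)."""
    return op(delay(s)) == delay(op(s))


ident = list(range(5))
print(delay(ident))  # [0, 0, 1, 2, 3]

square = lift(lambda x: x * x)  # zpp holds: 0 * 0 == 0
shift = lift(lambda x: x + 1)   # zpp fails: 0 + 1 != 0
print(commutes_with_delay(square, ident))  # True
print(commutes_with_delay(shift, ident))   # False
```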

2.2.2. Causal and strict operators

Definition 2.8 (Causality):

A stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ is causal when for all $s, s' \in \mathcal{S}_A$, and all times $t$ we have: $(\forall i \le t.\ s[i] = s'[i]) \Rightarrow S(s)[t] = S(s')[t]$.

In other words, the output value at time $t$ can only depend on input values from times $t' \le t$. Operators produced by lifting are causal, and $z^{-1}$ is causal. All DBSP operators are causal. The composition of causal operators of any number of inputs is causal.

Definition 2.9 (Strictness):

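A stream operator $F: \mathcal{S}_A \to \mathcal{S}_B$ is strictly causal (abbreviated strict) if $\forall s, s' \in \mathcal{S}_A, \forall t \in \mathbb{N}$ we have: $(\forall i < t.\ s[i] = s'[i]) \Rightarrow F(s)[t] = F(s')[t]$.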

So the $t$-th output of $F(s)$ can depend only on “past” values of the input $s$, between $0$ and $t-1$. In particular, $F(s)[0] = 0_B$ is the same for all $s \in \mathcal{S}_A$. Strict operators are causal. Lifted operators in general are not strict. $z^{-1}$ is strict.

Proposition 2.10:

For a strict $F: \mathcal{S}_A \to \mathcal{S}_A$ the equation $\alpha = F(\alpha)$ has a unique solution $\alpha \in \mathcal{S}_A$, denoted by $\mathrm{fix}\,\alpha.F(\alpha)$.

Thus every strict operator from a set to itself has a unique fixed point. The simple proof relies on strong induction, showing that α[t] depends only on the values of α prior to t.

We show that the following circuit, having a strict “feedback” edge F, is a well-defined function on streams:

$$\alpha = T(s, F(\alpha))$$
Lemma 2.11:

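If $F: \mathcal{S}_B \to \mathcal{S}_B$ is strict and $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_B$ is causal, then for a fixed $s$ the operator $\lambda\alpha. T(s, F(\alpha)): \mathcal{S}_B \to \mathcal{S}_B$ is strict.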

Corollary 2.12:

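If $F: \mathcal{S}_B \to \mathcal{S}_B$ is strict and $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_B$ is causal, the operator $Q(s) = \mathrm{fix}\,\alpha.T(s, F(\alpha))$ is well-defined and causal. If, moreover, $F$ and $T$ are time-invariant then so is $Q$.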
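The fixed point in Corollary 2.12 can be computed one time step at a time, because a strict back edge means $\alpha[t]$ depends only on earlier values of $\alpha$. The sketch below assumes $F = z^{-1}$ and that $T$ is the lifting of a binary scalar function `t_fn`; `feedback` is a hypothetical helper, and a finite list prefix again stands in for a stream.

```python
from typing import Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def feedback(t_fn: Callable[[A, B], B], s: List[A], zero: B) -> List[B]:
    """Solve alpha = T(s, z^{-1}(alpha)) on a finite prefix of s.

    Assumption: T is the pointwise lifting of t_fn. Because the back edge
    z^{-1} is strict, alpha[t] needs only s[t] and alpha[t-1], so the
    unique fixed point is built by forward iteration over time.
    """
    alpha: List[B] = []
    prev = zero  # z^{-1}(alpha)[0] is the group zero
    for x in s:
        cur = t_fn(x, prev)
        alpha.append(cur)
        prev = cur
    return alpha


# With T = lifted addition, the loop is stream integration.
print(feedback(lambda x, a: x + a, [0, 1, 2, 3, 4], 0))  # [0, 1, 3, 6, 10]
# With T = lifted max, it computes a running maximum.
print(feedback(max, [3, 1, 4, 1, 5], 0))  # [3, 3, 4, 4, 5]
```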

All stream computations in DBSP are built from the primitive operators we have described: lifted operators and delays (we add two more operators in §5). Circuits composed of such operators can be efficiently implemented using Dataflow machines (lee-ieee95, ).

Circuits with feedback are used for two purposes: defining an integration operator (in the next section), and defining recursive computations (§5). In turn, the integration operator will be instrumental in defining incremental computations (§3).

2.3. Integration and differentiation

Remember that we require the elements of a stream to come from an abelian group A. Streams themselves form an abelian group:

Proposition 2.13:

The structure $(\mathcal{S}_A, +, 0, -)$, obtained by lifting the $+$ and unary $-$ operations from $A$ to $\mathcal{S}_A$, is an abelian group.

Stream addition and negation are causal, time-invariant operators.

Definition 2.14:

Given abelian groups $A$ and $B$ we call a stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ linear if it is a group homomorphism, that is, $S(a + b) = S(a) + S(b)$ (and therefore $S(0) = 0$ and $S(-a) = -S(a)$).

Lifting a linear function $f: A \to B$ produces a stream operator ${\uparrow}f$ that is linear and time-invariant (LTI). $z^{-1}$ is LTI.

Definition 2.15:

(bilinear) A function of two arguments $f: A \times B \to C$, with $A, B, C$ groups, is bilinear if it is linear separately in each argument (i.e., it distributes over addition): $\forall a, b, c, d.\ f(a + b, c) = f(a, c) + f(b, c)$ and $f(a, c + d) = f(a, c) + f(a, d)$.

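This definition extends to stream operators. Lifting a bilinear function $f: A \times B \to C$ produces a bilinear stream operator ${\uparrow}f$. An example bilinear operator over $\mathcal{S}_\mathbb{Z}$ is lifted multiplication: $f: \mathcal{S}_\mathbb{Z} \times \mathcal{S}_\mathbb{Z} \to \mathcal{S}_\mathbb{Z}$, $f(a, b)[t] = a[t] \cdot b[t]$.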

The composition of (bi)linear operators with linear operators is (bi)linear (since homomorphisms compose).

The feedback loop produced with a linear operator is linear:

Proposition 2.16:

Let $S$ be a unary causal LTI operator. The operator $Q(s) = \mathrm{fix}\,\alpha.S(s + z^{-1}(\alpha))$ is well-defined and LTI:

$$\alpha = S(s + z^{-1}(\alpha))$$
Definition 2.17 (Differentiation):

The differentiation operator $\mathcal{D}_{\mathcal{S}_A}: \mathcal{S}_A \to \mathcal{S}_A$ is defined by: $\mathcal{D}(s) \stackrel{\text{def}}{=} s - z^{-1}(s)$.

We generally omit the type, and write just $\mathcal{D}$ when the type can be inferred from the context. The value of $\mathcal{D}(s)$ at time $t$ is the difference between the current (time $t$) value of $s$ and the previous (time $t-1$) value of $s$. As an example, $\mathcal{D}(id) = [0, 1, 1, 1, 1, \ldots]$.

If s is a stream, then 𝒟(s) is the stream of changes of s.

Proposition 2.18:

Differentiation 𝒟 is causal and LTI.

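$$\underbrace{\mathcal{D}(s) = s - z^{-1}(s)}_{\text{Differentiation}} \qquad\qquad \underbrace{\mathcal{I}(s) = s + z^{-1}(\mathcal{I}(s))}_{\text{Integration}}$$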

The integration operator “reconstitutes” a stream from its changes:

Definition 2.19 (Integration):

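The integration operator $\mathcal{I}_{\mathcal{S}_A}: \mathcal{S}_A \to \mathcal{S}_A$ is defined by $\mathcal{I}(s) \stackrel{\text{def}}{=} \mathrm{fix}\,\alpha.(s + z^{-1}(\alpha))$.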

We also generally omit the type, and write just $\mathcal{I}$. This is the construction from Proposition 2.16 using the identity function for $S$.

Proposition 2.20:

$\mathcal{I}(s)$ is the discrete (indefinite) integral applied to the stream $s$: $\mathcal{I}(s)[t] = \sum_{i \le t} s[i]$.

As an example, $\mathcal{I}(id) = [0, 1, 3, 6, 10, \ldots]$.

Proposition 2.21:

$\mathcal{I}$ is causal and LTI.

Theorem 2.22 (Inversion):

Integration and differentiation are inverses of each other: $\forall s.\ \mathcal{I}(\mathcal{D}(s)) = \mathcal{D}(\mathcal{I}(s)) = s$.

$$\mathcal{I} \circ \mathcal{D} \;\cong\; \mathrm{id} \;\cong\; \mathcal{D} \circ \mathcal{I}$$
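A minimal Python sketch of $\mathcal{D}$, $\mathcal{I}$, and Theorem 2.22, assuming integer streams truncated to finite list prefixes; the function names are illustrative only.

```python
from typing import List


def delay(s: List[int]) -> List[int]:
    """z^{-1} on a finite prefix: 0 at time 0, then s[t-1]."""
    return [0] + s[:-1] if s else []


def differentiate(s: List[int]) -> List[int]:
    """D(s) = s - z^{-1}(s): the stream of changes of s."""
    return [cur - prev for cur, prev in zip(s, delay(s))]


def integrate(s: List[int]) -> List[int]:
    """I(s)[t] = s[0] + ... + s[t] (a running sum)."""
    out, acc = [], 0
    for x in s:
        acc += x
        out.append(acc)
    return out


ident = list(range(5))
print(differentiate(ident))  # [0, 1, 1, 1, 1]
print(integrate(ident))      # [0, 1, 3, 6, 10]

# Theorem 2.22 on a sample stream: I and D undo each other.
s = [4, -2, 7, 0, 3]
print(integrate(differentiate(s)), differentiate(integrate(s)))  # [4, -2, 7, 0, 3] [4, -2, 7, 0, 3]
```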

3. Incremental computation

Definition 3.1:

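Given a unary stream operator $Q: \mathcal{S}_A \to \mathcal{S}_B$ we define the incremental version of $Q$ as $Q^\Delta \stackrel{\text{def}}{=} \mathcal{D} \circ Q \circ \mathcal{I}$. $Q^\Delta$ has the same “type” as $Q$: $Q^\Delta: \mathcal{S}_A \to \mathcal{S}_B$. For an operator with multiple inputs we define the incremental version by applying $\mathcal{I}$ to each input independently: e.g., if $T: \mathcal{S}_A \times \mathcal{S}_B \to \mathcal{S}_C$ then $T^\Delta(a, b) \stackrel{\text{def}}{=} \mathcal{D}(T(\mathcal{I}(a), \mathcal{I}(b)))$.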

The following diagram illustrates the intuition behind this definition:

$$\Delta s \xrightarrow{\ \mathcal{I}\ } s \xrightarrow{\ Q\ } o \xrightarrow{\ \mathcal{D}\ } \Delta o$$

If Q(s)=o is a computation, then QΔ performs the “same” computation as Q, but between streams of changes Δs and Δo. This is the diagram from the introduction, substituting Δs for the transaction stream T, and o for the stream of view versions V.

Notice that our definition of incremental computation is meaningful only for streaming computations; this is in contrast to classic definitions, e.g., (gupta-idb95, ), which consider only one change. Generalizing the definition to operate on streams gives us additional power, especially when operating on recursive queries.

The following proposition is one of our central results.

Proposition 3.2:

(Properties of the incremental version): For computations of appropriate types, the following hold:

inversion::

$Q \mapsto Q^\Delta$ is bijective; its inverse is $Q \mapsto \mathcal{I} \circ Q \circ \mathcal{D}$.

invariance::

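$+^\Delta = +$, $(z^{-1})^\Delta = z^{-1}$, $-^\Delta = -$, $\mathcal{I}^\Delta = \mathcal{I}$, $\mathcal{D}^\Delta = \mathcal{D}$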

push/pull::

$Q \circ \mathcal{I} = \mathcal{I} \circ Q^\Delta$; $\mathcal{D} \circ Q = Q^\Delta \circ \mathcal{D}$

chain::

$(Q_1 \circ Q_2)^\Delta = Q_1^\Delta \circ Q_2^\Delta$ (This generalizes to operators with multiple inputs.)

add::

(Q1+Q2)Δ=Q1Δ+Q2Δ

cycle::

$(\lambda s.\,\mathrm{fix}\,\alpha.T(s, z^{-1}(\alpha)))^\Delta = \lambda s.\,\mathrm{fix}\,\alpha.T^\Delta(s, z^{-1}(\alpha))$

The proof of these properties relies on elementary algebraic manipulations. Despite their simplicity, they are very useful. For example, the chain rule states that the following two circuits are equivalent:

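$$\mathcal{D} \circ Q_2 \circ Q_1 \circ \mathcal{I} \;\cong\; Q_2^\Delta \circ Q_1^\Delta \qquad \text{(the input } i \text{ flows through } Q_1 \text{ first)}$$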

In other words, to incrementalize a composite query you can incrementalize each sub-query independently. This gives us a simple deterministic recipe for computing the incremental version of an arbitrarily complex query.

We illustrate by giving the proof of the chain rule, which is trivial: it relies only on the associativity of function composition and on Theorem 2.22 ($\mathcal{I} \circ \mathcal{D} = \mathrm{id}$):

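$$\begin{aligned}
(Q_1 \circ Q_2)^\Delta &= \mathcal{D} \circ Q_1 \circ Q_2 \circ \mathcal{I} \\
&= \mathcal{D} \circ Q_1 \circ (\mathcal{I} \circ \mathcal{D}) \circ Q_2 \circ \mathcal{I} \\
&= (\mathcal{D} \circ Q_1 \circ \mathcal{I}) \circ (\mathcal{D} \circ Q_2 \circ \mathcal{I}) \\
&= Q_1^\Delta \circ Q_2^\Delta.
\end{aligned}$$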
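The chain rule can be sanity-checked numerically. The sketch below assumes integer streams modeled as finite list prefixes and builds $Q^\Delta = \mathcal{D} \circ Q \circ \mathcal{I}$ literally from Definition 3.1; `incremental`, `lift`, and `compose` are hypothetical helpers. Note that `compose(q1, q2)` applies `q2` first, matching $Q_1 \circ Q_2$.

```python
from typing import Callable, List

Op = Callable[[List[int]], List[int]]


def integrate(s: List[int]) -> List[int]:
    """I(s)[t] = s[0] + ... + s[t]."""
    out, acc = [], 0
    for x in s:
        acc += x
        out.append(acc)
    return out


def differentiate(s: List[int]) -> List[int]:
    """D(s)[t] = s[t] - s[t-1], with s[-1] taken as 0."""
    return [cur - prev for cur, prev in zip(s, [0] + s[:-1])]


def lift(f: Callable[[int], int]) -> Op:
    """Pointwise lifting of a scalar function."""
    return lambda s: [f(x) for x in s]


def compose(q1: Op, q2: Op) -> Op:
    """q1 . q2: apply q2 first, then q1."""
    return lambda s: q1(q2(s))


def incremental(q: Op) -> Op:
    """Q^Delta = D . Q . I (Definition 3.1)."""
    return lambda s: differentiate(q(integrate(s)))


q1 = lift(lambda x: x * x)  # non-linear
q2 = lift(lambda x: 3 * x)  # linear
changes = [1, 2, -1, 0, 5]

lhs = incremental(compose(q1, q2))(changes)
rhs = compose(incremental(q1), incremental(q2))(changes)
print(lhs, lhs == rhs)  # [9, 72, -45, 0, 405] True
```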

The cycle rule states that the following circuits are equivalent:

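$$\mathcal{D} \circ \big(\lambda s.\,\mathrm{fix}\,\alpha.T(s, z^{-1}(\alpha))\big) \circ \mathcal{I} \;\cong\; \lambda s.\,\mathrm{fix}\,\alpha.T^\Delta(s, z^{-1}(\alpha))$$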

The incremental version of a feedback loop around a query is just the feedback loop with the incremental query. The significance of this result will be apparent when we implement recursive queries.

To execute incremental queries efficiently, we want to compute directly on streams of changes without integrating them. The invariance property above shows that the stream operators $+$, $-$, and $z^{-1}$ are identical to their incremental versions, so $\mathcal{I}$ and $\mathcal{D}$ can be omitted for them: $Q^\Delta = \mathcal{D} \circ Q \circ \mathcal{I} = Q$. The following theorems generalize this to linear and bilinear operators:

Theorem 3.3 (Linear):

For an LTI operator Q we have QΔ=Q.

Theorem 3.4 (Bilinear):

For a bilinear time-invariant operator $\times$ we have $(a \times b)^\Delta = a \times b + z^{-1}(\mathcal{I}(a)) \times b + a \times z^{-1}(\mathcal{I}(b))$.

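By rewriting this statement using $\Delta a$ for the stream of changes to $a$ (so that $\mathcal{I}(\Delta a) = a$) we get the familiar formula for incremental equi-joins: $\Delta(a \times b) = \Delta a \times \Delta b + z^{-1}(a) \times \Delta b + \Delta a \times z^{-1}(b)$.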

This should not be surprising because equi-joins are bilinear, as we discuss in the next section.
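To see Theorems 3.3 and 3.4 at work, the sketch below uses lifted integer multiplication as a stand-in bilinear operator (an assumption for illustration: a relational join behaves the same way, with $\mathbb{Z}$-sets in place of integers). It compares the right-hand side of Theorem 3.4 against the definition $\mathcal{D}(\mathcal{I}(a) \times \mathcal{I}(b))$ on a short prefix; all helper names are hypothetical.

```python
from typing import List


def delay(s: List[int]) -> List[int]:
    """z^{-1}: 0 at time 0, then s[t-1]."""
    return [0] + s[:-1] if s else []


def integrate(s: List[int]) -> List[int]:
    """I(s)[t] = s[0] + ... + s[t]."""
    out, acc = [], 0
    for x in s:
        acc += x
        out.append(acc)
    return out


def differentiate(s: List[int]) -> List[int]:
    """D(s) = s - z^{-1}(s)."""
    return [cur - prev for cur, prev in zip(s, delay(s))]


def add(*streams: List[int]) -> List[int]:
    """Lifted addition of equally long prefixes."""
    return [sum(vals) for vals in zip(*streams)]


def mul(a: List[int], b: List[int]) -> List[int]:
    """Lifted multiplication, a bilinear stream operator."""
    return [x * y for x, y in zip(a, b)]


def mul_incremental(da: List[int], db: List[int]) -> List[int]:
    """Theorem 3.4: da*db + z^{-1}(I(da))*db + da*z^{-1}(I(db))."""
    return add(mul(da, db), mul(delay(integrate(da)), db), mul(da, delay(integrate(db))))


def triple(s: List[int]) -> List[int]:
    """A lifted linear function, x -> 3x."""
    return [3 * x for x in s]


da, db = [1, 2, 3, 4], [5, -1, 0, 2]
by_definition = differentiate(mul(integrate(da), integrate(db)))  # D . (x) . I
print(mul_incremental(da, db), by_definition)  # [5, 7, 12, 36] [5, 7, 12, 36]

# Theorem 3.3: the LTI operator triple is its own incremental version.
print(differentiate(triple(integrate(da))) == triple(da))  # True
```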

4. Incremental View Maintenance

Results in §2 and §3 apply to streams of arbitrary group values. In this section we turn our attention to using these results in the context of relational view maintenance. As explained in the introduction, we want to efficiently compute the incremental version of any relational query Q that updates a database view.

However, we face a technical problem: the $\mathcal{I}$ and $\mathcal{D}$ operators were defined on abelian groups, and relational databases in general are not abelian groups, since they operate on sets. Fortunately, there is a well-known tool in the database literature which converts set operations into group operations by using $\mathbb{Z}$-sets (also called z-relations (green-pods07, )) instead of sets.

We start by defining the $\mathbb{Z}$-sets group, and then we review how relational queries are converted into DBSP circuits over $\mathbb{Z}$-sets. What makes this translation efficiently incrementalizable is the fact that many basic relational queries can be expressed using LTI $\mathbb{Z}$-set operators.

4.1. $\mathbb{Z}$-sets as an abelian group

Given a set $A$, we define $\mathbb{Z}$-sets over $A$ as functions with finite support from $A$ to $\mathbb{Z}$. (They are also called $\mathbb{Z}$-relations elsewhere (green-tcs11, ), because in practice $A$ is often a Cartesian product; however, we only need the set structure for most of our results.) These are functions $f: A \to \mathbb{Z}$ where $f(x) \neq 0$ for at most a finite number of values $x \in A$. We also write $\mathbb{Z}[A]$ for the type of $\mathbb{Z}$-sets with elements from $A$. Values in $\mathbb{Z}[A]$ can be thought of as key-value maps with keys in $A$ and values in $\mathbb{Z}$, justifying the array indexing notation: we write $f[a]$ instead of $f(a)$. Since $\mathbb{Z}$ is an abelian group under addition, so is $\mathbb{Z}[A]$: the group $(\mathbb{Z}[A], +_{\mathbb{Z}[A]}, 0_{\mathbb{Z}[A]}, -_{\mathbb{Z}[A]})$ has addition and subtraction defined pointwise: $(f +_{\mathbb{Z}[A]} g)(x) = f(x) + g(x), \forall x \in A$. The 0 element of $\mathbb{Z}[A]$ is the function $0_{\mathbb{Z}[A]}$ defined by $0_{\mathbb{Z}[A]}(x) = 0, \forall x \in A$.

A particular $\mathbb{Z}$-set $m \in \mathbb{Z}[A]$ can be denoted by enumerating the inputs that map to non-zero values and their corresponding values: $m = \{x_1 \mapsto w_1, \ldots, x_n \mapsto w_n\}$. We call $w_i \in \mathbb{Z}$ the multiplicity (or weight) of $x_i \in A$. Multiplicities can be negative. We write $x \in m$ for $x \in A$ iff $m[x] \neq 0$.

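For example, let’s consider a concrete $\mathbb{Z}$-set $R \in \mathbb{Z}[\mathit{string}]$, defined by $R = \{\mathit{joe} \mapsto 1, \mathit{anne} \mapsto -1\}$. $R$ has two elements in its domain: joe with a multiplicity of 1 (so $R[\mathit{joe}] = 1$), and anne with a multiplicity of $-1$. We say $\mathit{joe} \in R$ and $\mathit{anne} \in R$.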

$\mathbb{Z}$-sets generalize sets and bags. A set with elements from $A$ can be represented as a $\mathbb{Z}$-set by associating a weight of 1 with each set element. When translating queries on sets to DBSP programs we convert the data values back and forth between sets and $\mathbb{Z}$-sets.

Definition 4.1:

We say that a $\mathbb{Z}$-set represents a set if the multiplicity of every element is one. We define a function $\mathrm{isset}: \mathbb{Z}[A] \to \mathbb{B}$ to check this property:

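$$\mathrm{isset}(m) \stackrel{\text{def}}{=} \begin{cases} \mathrm{true} & \text{if } m[x] = 1, \forall x \in m \\ \mathrm{false} & \text{otherwise} \end{cases}$$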

For our example $\mathrm{isset}(R) = \mathrm{false}$, since $R[\mathit{anne}] = -1$.

Definition 4.2:

We say that a $\mathbb{Z}$-set is positive (or a bag) if the multiplicity of every element is positive. We define a function $\mathrm{ispositive}: \mathbb{Z}[A] \to \mathbb{B}$ to check this property:

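$$\mathrm{ispositive}(m) \stackrel{\text{def}}{=} \begin{cases} \mathrm{true} & \text{if } m[x] \ge 0, \forall x \in A \\ \mathrm{false} & \text{otherwise} \end{cases}$$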

$\forall m \in \mathbb{Z}[A].\ \mathrm{isset}(m) \Rightarrow \mathrm{ispositive}(m)$.

We have $\mathrm{ispositive}(R) = \mathrm{false}$, since $R[\mathit{anne}] = -1$.
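One simple way to model $\mathbb{Z}$-sets in Python, assumed here for illustration, is a `dict` from elements to non-zero integer weights; absent keys have weight $0$, which keeps the support finite and makes `==` coincide with $\mathbb{Z}$-set equality. The helper names mirror the definitions above but are otherwise hypothetical.

```python
from collections import defaultdict
from typing import Dict, Hashable

ZSet = Dict[Hashable, int]


def zadd(a: ZSet, b: ZSet) -> ZSet:
    """Pointwise addition; weights that cancel to 0 are dropped."""
    out: Dict[Hashable, int] = defaultdict(int)
    for z in (a, b):
        for x, w in z.items():
            out[x] += w
    return {x: w for x, w in out.items() if w != 0}


def zneg(a: ZSet) -> ZSet:
    """Additive inverse: negate every weight."""
    return {x: -w for x, w in a.items()}


def isset(m: ZSet) -> bool:
    """Definition 4.1: every element has multiplicity exactly 1."""
    return all(w == 1 for w in m.values())


def ispositive(m: ZSet) -> bool:
    """Definition 4.2: no element has a negative multiplicity."""
    return all(w >= 0 for w in m.values())


R = {"joe": 1, "anne": -1}
print(isset(R), ispositive(R))  # False False
print(zadd(R, zneg(R)))         # {}
print(zadd(R, {"anne": 1}))     # {'joe': 1}
```

`collections.Counter` looks tempting here, but its `+` operator discards non-positive counts, so it does not form a group; plain dicts with explicit zero-dropping avoid that pitfall.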

We write $m \ge 0$ when $m$ is positive. For positive $m, n \in \mathbb{Z}[A]$ we write $m \ge n$ iff $m - n \ge 0$. The relation $\ge$ is a partial order.

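We call a function $f: \mathbb{Z}[A] \to \mathbb{Z}[B]$ positive if it maps positive values to positive values: $\forall x \in \mathbb{Z}[A],\ x \ge 0_{\mathbb{Z}[A]} \Rightarrow f(x) \ge 0_{\mathbb{Z}[B]}$. We apply the same notation to functions as well: $\mathrm{ispositive}(f)$.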

Definition 4.3 (distinct):

The function $\mathrm{distinct}: \mathbb{Z}[A] \to \mathbb{Z}[A]$ projects a $\mathbb{Z}$-set into an underlying set (but the result is still a $\mathbb{Z}$-set):

$$\mathrm{distinct}(m)[x] \stackrel{\text{def}}{=} \begin{cases} 1 & \text{if } m[x] > 0 \\ 0 & \text{otherwise} \end{cases}$$

$\mathrm{distinct}$ “removes” elements with negative multiplicities: $\mathrm{distinct}(R) = \{\mathit{joe} \mapsto 1\}$.

While very simple, this definition of $\mathrm{distinct}$ has been carefully chosen to enable us to define precisely all relational (set) operators from $\mathbb{Z}$-set operators.

Circuits derived from relational queries only compute on positive $\mathbb{Z}$-sets; negative values are only used to represent changes to $\mathbb{Z}$-sets. Negative weights “remove” elements from a set.
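Continuing with the dict-based model of $\mathbb{Z}$-sets (an assumption, with helpers redefined so that the snippet runs on its own), the sketch shows $\mathrm{distinct}$ on the example $R$, a negative weight deleting an element, and why $\mathrm{distinct}$ is not linear.

```python
from collections import defaultdict
from typing import Dict, Hashable

ZSet = Dict[Hashable, int]


def zadd(a: ZSet, b: ZSet) -> ZSet:
    """Pointwise addition of Z-sets, dropping weights that cancel to 0."""
    out: Dict[Hashable, int] = defaultdict(int)
    for z in (a, b):
        for x, w in z.items():
            out[x] += w
    return {x: w for x, w in out.items() if w != 0}


def distinct(m: ZSet) -> ZSet:
    """Definition 4.3: weight 1 for elements with positive weight, drop the rest."""
    return {x: 1 for x, w in m.items() if w > 0}


R = {"joe": 1, "anne": -1}
print(distinct(R))  # {'joe': 1}

# A negative weight in a change removes an element from a set.
print(distinct(zadd({"x": 1, "y": 1}, {"x": -1})))  # {'y': 1}

# distinct is not linear: it does not distribute over addition.
a, b = {"x": 1}, {"x": 1}
print(distinct(zadd(a, b)), zadd(distinct(a), distinct(b)))  # {'x': 1} {'x': 2}
```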

All the results from §2 extend to streams over $\mathbb{Z}$-sets.

Definition 4.4:

(monotonicity) A stream $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is positive if every value of the stream is positive: $s[t] \ge 0, \forall t$. A stream $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is monotone if $s[t] \ge s[t-1], \forall t$.

If $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is positive, then $\mathcal{I}(s)$ is monotone. If $s \in \mathcal{S}_{\mathbb{Z}[A]}$ is monotone, $\mathcal{D}(s)$ is positive.

Generalizing box-and-arrow diagrams

From now on we will use circuits to compute both on scalars and streams. We use the same graphical representation for functions on streams or scalars: boxes with input and output arrows. For scalar functions the “values” of the arrows are scalars instead of streams; otherwise the interpretation of boxes as function application is unchanged.

4.2. Implementing relational operators

The fact that relational algebra can be implemented by computations on $\mathbb{Z}$-sets has been shown before, e.g. (green-pods07, ). The translation of all the core relational operators is shown in Table 1. The translation is essentially given by induction on the query structure.

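| Operation | SQL example | DBSP circuit | Details |
|---|---|---|---|
| Composition | `SELECT DISTINCT ... FROM (SELECT ... FROM ...)` | $O = C_O(C_I(I))$ | $C_I$: circuit for the inner query; $C_O$: circuit for the outer query. |
| Union | `(SELECT * FROM I1) UNION (SELECT * FROM I2)` | $O = \mathrm{distinct}(I_1 + I_2)$ | |
| Projection | `SELECT DISTINCT I.c FROM I` | $O = \mathrm{distinct}(\pi(I))$ | $\pi(i)[y] \stackrel{\text{def}}{=} \sum_{x \in i,\ x.c = y} i[x]$, where $x.c$ is the projection of tuple $x$ on column $c$. $\pi$ is linear; $\mathrm{ispositive}(\pi)$, $\mathrm{zpp}(\pi)$. |
| Filtering | `SELECT * FROM I WHERE p(I.c)` | $O = \mathrm{distinct}(\sigma_P(I))$ | $\sigma_P(m)[x] \stackrel{\text{def}}{=} m[x]$ if $P(x)$, and $0$ otherwise; $P: A \to \mathbb{B}$ is a predicate. $\sigma_P$ is linear; $\mathrm{ispositive}(\sigma_P)$, $\mathrm{zpp}(\sigma_P)$. |
| Selection | `SELECT DISTINCT f(I.c, ...) FROM I` | $O = \mathrm{distinct}(\mathrm{map}(f)(I))$ | For a function $f$, $\mathrm{map}(f)$ is linear; $\mathrm{ispositive}(\mathrm{map}(f))$, $\mathrm{zpp}(\mathrm{map}(f))$. |
| Cartesian product | `SELECT I1.*, I2.* FROM I1, I2` | $O = I_1 \times I_2$ | $(a \times b)((x, y)) \stackrel{\text{def}}{=} a[x] \times b[y]$. $\times$ is bilinear; $\mathrm{ispositive}(\times)$, $\mathrm{zpp}(\times)$. |
| Equi-join | `SELECT I1.*, I2.* FROM I1 JOIN I2 ON I1.c1 = I2.c2` | $O = I_1 \bowtie I_2$ | $(a \bowtie b)((x, y)) \stackrel{\text{def}}{=} a[x] \times b[y]$ if $x.c_1 = y.c_2$, and $0$ otherwise. $\bowtie$ is bilinear; $\mathrm{ispositive}(\bowtie)$, $\mathrm{zpp}(\bowtie)$. |
| Intersection | `(SELECT * FROM I1) INTERSECT (SELECT * FROM I2)` | $O = I_1 \bowtie I_2$ | Special case of equi-join when both relations have the same schema. |
| Difference | `SELECT * FROM I1 EXCEPT SELECT * FROM I2` | $O = \mathrm{distinct}(I_1 - I_2)$ | |

Table 1. Implementation of SQL relational set operators in DBSP. Each query assumes that its inputs I, I1, and I2 are sets, and it produces output sets.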

The translation is fairly straightforward, but many operators require the application of $\mathrm{distinct}$ to produce sets. The correctness of this implementation is predicated on the global circuit inputs being sets as well. For example, $a \cup b = \mathrm{distinct}(a + b)$, $a \setminus b = \mathrm{distinct}(a - b)$, and $(a \times b)((x, y)) = a[x] \times b[y]$.
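A sketch of several rows of Table 1 over dict-based $\mathbb{Z}$-sets (an assumption for illustration). Rows are Python tuples, and all helper names are hypothetical. The Cartesian product is the special case of `zjoin` whose key functions always agree.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable

ZSet = Dict[Hashable, int]


def zadd(a: ZSet, b: ZSet) -> ZSet:
    out: Dict[Hashable, int] = defaultdict(int)
    for z in (a, b):
        for x, w in z.items():
            out[x] += w
    return {x: w for x, w in out.items() if w != 0}


def zneg(a: ZSet) -> ZSet:
    return {x: -w for x, w in a.items()}


def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}


def zfilter(p: Callable, m: ZSet) -> ZSet:
    """sigma_P (linear): keep rows satisfying P, with their weights."""
    return {x: w for x, w in m.items() if p(x)}


def zmap(f: Callable, m: ZSet) -> ZSet:
    """map(f) and projection (linear): weights of rows with equal images add up."""
    out: Dict[Hashable, int] = defaultdict(int)
    for x, w in m.items():
        out[f(x)] += w
    return {y: w for y, w in out.items() if w != 0}


def zjoin(a: ZSet, b: ZSet, ka: Callable, kb: Callable) -> ZSet:
    """Equi-join (bilinear): pair rows whose keys match, multiplying weights."""
    return {(x, y): wa * wb for x, wa in a.items() for y, wb in b.items() if ka(x) == kb(y)}


def union(a: ZSet, b: ZSet) -> ZSet:
    return distinct(zadd(a, b))


def difference(a: ZSet, b: ZSet) -> ZSet:
    return distinct(zadd(a, zneg(b)))


def intersection(a: ZSet, b: ZSet) -> ZSet:
    """Equi-join on the whole row, then keep one copy of each matched row."""
    def whole(row):
        return row
    return zmap(lambda pair: pair[0], zjoin(a, b, whole, whole))


I1 = {(1, "a"): 1, (2, "b"): 1}
I2 = {(2, "b"): 1, (3, "c"): 1}
print(union(I1, I2))         # {(1, 'a'): 1, (2, 'b'): 1, (3, 'c'): 1}
print(difference(I1, I2))    # {(1, 'a'): 1}
print(intersection(I1, I2))  # {(2, 'b'): 1}
print(zfilter(lambda row: row[0] > 1, I1))  # {(2, 'b'): 1}
# Projecting on the second column merges rows, which is why distinct follows pi.
print(zmap(lambda row: row[1], {(1, "a"): 1, (2, "a"): 1}))  # {'a': 2}
```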

4.2.1. Correctness of the DBSP implementations

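A relational query $Q$ that transforms a set $V$ into a set $U$ will be implemented by a DBSP computation $Q_Z$ on $\mathbb{Z}$-sets. The correctness of the implementation requires that the following diagram commutes, i.e., that going through $V_Z = \mathrm{tozset}(V)$ and $U_Z = Q_Z(V_Z)$ yields the same result as applying $Q$ directly:

$$Q(V) = \mathrm{toset}(Q_Z(\mathrm{tozset}(V)))$$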

The toset and tozset functions convert sets to -sets and vice-versa:

$\mathrm{toset}: \mathbb{Z}[A] \to 2^A$ is defined by $\mathrm{toset}(m) \stackrel{\text{def}}{=} \bigcup_{x \in \mathrm{distinct}(m)} \{x\}$.

$\mathrm{tozset}: 2^A \to \mathbb{Z}[A]$ is defined by $\mathrm{tozset}(s) \stackrel{\text{def}}{=} \sum_{x \in s} \{x \mapsto 1\}$.
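The commuting diagram can be tested exhaustively on a tiny universe. This Python sketch (hypothetical helper names, dict-based $\mathbb{Z}$-sets as an assumption) checks that translating union and difference into $\mathbb{Z}$-set computations, as in Table 1, agrees with Python's own set operators for every pair of subsets of $\{0, 1, 2\}$.

```python
from collections import defaultdict
from itertools import chain, combinations
from typing import Dict, Hashable, List, Set

ZSet = Dict[Hashable, int]


def zadd(a: ZSet, b: ZSet) -> ZSet:
    out: Dict[Hashable, int] = defaultdict(int)
    for z in (a, b):
        for x, w in z.items():
            out[x] += w
    return {x: w for x, w in out.items() if w != 0}


def zneg(a: ZSet) -> ZSet:
    return {x: -w for x, w in a.items()}


def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}


def tozset(s: Set[Hashable]) -> ZSet:
    """Each set element gets weight 1."""
    return {x: 1 for x in s}


def toset(m: ZSet) -> Set[Hashable]:
    """Union of {x} over the elements of distinct(m)."""
    return set(distinct(m))


def subsets(universe: Set[int]) -> List[Set[int]]:
    items = sorted(universe)
    return [set(c) for c in chain.from_iterable(combinations(items, r) for r in range(len(items) + 1))]


# Check the commuting square for union and difference on all 64 pairs of subsets.
for v1 in subsets({0, 1, 2}):
    for v2 in subsets({0, 1, 2}):
        assert toset(distinct(zadd(tozset(v1), tozset(v2)))) == v1 | v2
        assert toset(distinct(zadd(tozset(v1), zneg(tozset(v2))))) == v1 - v2
print("commuting square holds for union and difference")
```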

All standard algebraic properties of the relational operators can be used to optimize circuits (they can even be applied to queries before building the circuits).

Notice that the use of the $\mathrm{distinct}$ operator allows DBSP to model the full relational algebra, including difference (and not just the positive fragment). Most of the operators that appear in the circuits in Table 1 are linear, and thus have very efficient incremental versions. A notable exception is $\mathrm{distinct}$. While we show below that $\mathrm{distinct}$ can be computed efficiently incrementally, it does have a significant cost in terms of memory, so we try to minimize its use. For this we can use a pair of optimizations:

Proposition 4.5:

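Let $Q$ be one of the following $\mathbb{Z}$-set operators: filtering $\sigma$, join $\bowtie$, or Cartesian product $\times$. Then we have $\forall i \in \mathbb{Z}[I],\ \mathrm{ispositive}(i) \Rightarrow Q(\mathrm{distinct}(i)) = \mathrm{distinct}(Q(i))$.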

This rule allows us to delay the application of 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡.

Proposition 4.6:

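Let $Q$ be one of the following $\mathbb{Z}$-set operators: filtering $\sigma$, projection $\pi$, selection ($\mathrm{map}(f)$), addition $+$, join $\bowtie$, or Cartesian product $\times$. Then we have $\forall i \in \mathbb{Z}[I],\ \mathrm{ispositive}(i) \Rightarrow \mathrm{distinct}(Q(\mathrm{distinct}(i))) = \mathrm{distinct}(Q(i))$.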

This is Proposition 6.13 in (green-tcs11, ).

These properties allow us to “consolidate” distinct operators by performing one 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 at the end of a chain of computations.
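A small check of Propositions 4.5 and 4.6 on hand-picked $\mathbb{Z}$-sets, using dict-based $\mathbb{Z}$-sets (an assumption; helpers are hypothetical and redefined so the snippet stands alone). It also shows that projection is not in Proposition 4.5's list, and that the positivity precondition matters.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable

ZSet = Dict[Hashable, int]


def zmap(f: Callable, m: ZSet) -> ZSet:
    """Projection / map(f): weights of rows with the same image add up."""
    out: Dict[Hashable, int] = defaultdict(int)
    for x, w in m.items():
        out[f(x)] += w
    return {y: w for y, w in out.items() if w != 0}


def zfilter(p: Callable, m: ZSet) -> ZSet:
    """Filtering sigma_P: keep rows that satisfy P."""
    return {x: w for x, w in m.items() if p(x)}


def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}


def first(row):
    """Project a row on its first column."""
    return row[0]


def second_is_one(row):
    return row[1] == 1


i = {("a", 1): 2, ("a", 2): 1, ("b", 1): 1}  # a positive Z-set (a bag)

# Proposition 4.6 with Q = projection: the inner distinct can be dropped.
print(distinct(zmap(first, distinct(i))) == distinct(zmap(first, i)))  # True
# Proposition 4.5 with Q = filtering: distinct commutes with the filter.
print(zfilter(second_is_one, distinct(i)) == distinct(zfilter(second_is_one, i)))  # True
# Projection is not covered by Proposition 4.5: merged weights differ.
print(zmap(first, distinct(i)), distinct(zmap(first, i)))  # {'a': 2, 'b': 1} {'a': 1, 'b': 1}

# Without positivity, Proposition 4.6 can fail.
j = {("a", 1): 1, ("a", 2): -1}
print(distinct(zmap(first, distinct(j))), distinct(zmap(first, j)))  # {'a': 1} {}
```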

Finally, the next proposition shows that the incremental version of $\mathrm{distinct}$ can be computed with work proportional to the size of the input change.

Proposition 4.7:

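The following circuit implements $({\uparrow}\mathrm{distinct})^\Delta$, where $i = z^{-1}(\mathcal{I}(d))$ is the input accumulated up to the previous step:

$$({\uparrow}\mathrm{distinct})^\Delta(d) = {\uparrow}H\big(z^{-1}(\mathcal{I}(d)),\ d\big)$$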

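where $H: \mathbb{Z}[A] \times \mathbb{Z}[A] \to \mathbb{Z}[A]$ is defined as:

$$H(i, d)[x] \stackrel{\text{def}}{=} \begin{cases} -1 & \text{if } i[x] > 0 \text{ and } (i + d)[x] \le 0 \\ 1 & \text{if } i[x] \le 0 \text{ and } (i + d)[x] > 0 \\ 0 & \text{otherwise} \end{cases}$$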

The function $H$ detects whether the multiplicity of an element changes from positive to non-positive, or from non-positive to positive, when the change $d$ is added to the accumulated input $i$. Since only the multiplicities of elements that appear in $d$ can change, the work needed to compute $H$ is bounded by the size of $d$, not of $i$.
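The circuit of Proposition 4.7 can be sketched as a stateful loop: the state is the accumulated input $i = z^{-1}(\mathcal{I}(d))$, and each step emits $H(i, d)$ and then adds $d$ to $i$ in place, touching only the keys of $d$. The sketch assumes dict-based $\mathbb{Z}$-sets and hypothetical helper names, and compares the result against recomputing $\mathrm{distinct}$ on full snapshots.

```python
from collections import defaultdict
from typing import Dict, Hashable, List

ZSet = Dict[Hashable, int]


def zadd(a: ZSet, b: ZSet) -> ZSet:
    out: Dict[Hashable, int] = defaultdict(int)
    for z in (a, b):
        for x, w in z.items():
            out[x] += w
    return {x: w for x, w in out.items() if w != 0}


def zneg(a: ZSet) -> ZSet:
    return {x: -w for x, w in a.items()}


def distinct(m: ZSet) -> ZSet:
    return {x: 1 for x, w in m.items() if w > 0}


def h(i: ZSet, d: ZSet) -> ZSet:
    """H(i, d) from Proposition 4.7; only the elements of d are inspected."""
    out: ZSet = {}
    for x, w in d.items():
        before = i.get(x, 0)
        after = before + w
        if before > 0 and after <= 0:
            out[x] = -1
        elif before <= 0 and after > 0:
            out[x] = 1
    return out


def distinct_incremental(changes: List[ZSet]) -> List[ZSet]:
    """(lifted distinct)^Delta: state i = z^{-1}(I(d)), updated in place per change."""
    i: ZSet = {}
    out = []
    for d in changes:
        out.append(h(i, d))
        for x, w in d.items():  # i += d, touching only the keys of d
            i[x] = i.get(x, 0) + w
            if i[x] == 0:
                del i[x]
    return out


def distinct_reference(changes: List[ZSet]) -> List[ZSet]:
    """D(lifted distinct(I(changes))), recomputed from full snapshots."""
    acc: ZSet = {}
    prev: ZSet = {}
    out = []
    for d in changes:
        acc = zadd(acc, d)
        cur = distinct(acc)
        out.append(zadd(cur, zneg(prev)))
        prev = cur
    return out


changes = [{"a": 1, "b": 2}, {"b": -1}, {"b": -1, "c": 1}, {"a": -2, "c": 1}]
print(distinct_incremental(changes))
# [{'a': 1, 'b': 1}, {}, {'b': -1, 'c': 1}, {'a': -1}]
```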

4.3. Incremental view maintenance

Let us consider a relational query Q defining a view. To create a circuit that maintains incrementally the view defined by Q we apply the following mechanical steps; this algorithm is deterministic and its running time is proportional to the complexity of the query (number of operators in the query):

Algorithm 4.8 (incremental view maintenance):
  1. (1)

    Translate $Q$ into a circuit using the rules in Table 1.

  2. (2)

    Apply 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 consolidation until convergence.

  3. (3)

    Lift the whole circuit, by applying Proposition 2.4, converting it to a circuit operating on streams.

  4. (4)

    Incrementalize the whole circuit by “surrounding” it with $\mathcal{I}$ and $\mathcal{D}$.

  5. (5)

    Apply the chain rule and other properties of the Δ operator from Proposition 3.2 recursively on the query structure to optimize the incremental implementation.

Step (3) yields a circuit that consumes a stream of complete database snapshots and outputs a stream of complete view snapshots. Step (4) yields a circuit that consumes a stream of changes to the database and outputs a stream of view changes; however, the internal operation of the circuit is non-incremental, as it computes on the complete state of the database reconstructed by the integration operator. Step (5) incrementalizes the internals of the circuit by rewriting it to compute on changes, avoiding integration when possible (see §3).

4.4. Example

In this section we apply the incremental view maintenance algorithm to a concrete query. Let us consider the following query:

CREATE VIEW v AS
SELECT DISTINCT t1.x, t2.y FROM (
SELECT t.x, t.id
FROM t
WHERE t.a > 2
) t1
JOIN (
SELECT r.id, r.y
FROM r
WHERE r.s > 5
) t2 ON t1.id = t2.id

Step 1: First we create a DBSP circuit to represent this query using the translation rules from Table 4.2:

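$$V = \mathit{distinct}\Big(\pi_{x,y}\big(\mathit{distinct}(\pi_{x,id}(\mathit{distinct}(\sigma_{a>2}(t_1)))) \bowtie_{id=id} \mathit{distinct}(\pi_{y,id}(\mathit{distinct}(\sigma_{s>5}(t_2))))\big)\Big)$$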

Step 2: we apply the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 optimization rules; first the rule from 4.6 gives us the following equivalent circuit:

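$$V = \mathit{distinct}\Big(\pi_{x,y}\big(\mathit{distinct}(\pi_{x,id}(\sigma_{a>2}(t_1))) \bowtie_{id=id} \mathit{distinct}(\pi_{y,id}(\sigma_{s>5}(t_2)))\big)\Big)$$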

Applying the rule from 4.5 we get:

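$$V = \mathit{distinct}\Big(\pi_{x,y}\big(\mathit{distinct}(\pi_{x,id}(\sigma_{a>2}(t_1)) \bowtie_{id=id} \pi_{y,id}(\sigma_{s>5}(t_2)))\big)\Big)$$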

And applying again 4.6 we get:

$$V = \mathit{distinct}\Big(\pi_{x,y}\big(\pi_{x,id}(\sigma_{a>2}(t_1)) \bowtie_{id=id} \pi_{y,id}(\sigma_{s>5}(t_2))\big)\Big)$$

Step 3: we lift the circuit using distributivity of composition over lifting; we obtain a circuit that computes over streams, i.e., for each new input pair of relations t1 and t2 it will produce an output view V:

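$$V = \uparrow\!\mathit{distinct}\Big(\uparrow\!\pi_{x,y}\big(\uparrow\!\pi_{x,id}(\uparrow\!\sigma_{a>2}(t_1)) \mathbin{\uparrow\!\bowtie_{id=id}} \uparrow\!\pi_{y,id}(\uparrow\!\sigma_{s>5}(t_2))\big)\Big)$$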

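Step 4: incrementalize the circuit, obtaining a circuit that computes over changes; this circuit receives changes to relations t1 and t2 and for each such change it produces the corresponding change in the output view V:

$$\Delta V = \mathcal{D}\Big(\uparrow\!\mathit{distinct}\big(\uparrow\!\pi_{x,y}(\uparrow\!\pi_{x,id}(\uparrow\!\sigma_{a>2}(\mathcal{I}(\Delta t_1))) \mathbin{\uparrow\!\bowtie_{id=id}} \uparrow\!\pi_{y,id}(\uparrow\!\sigma_{s>5}(\mathcal{I}(\Delta t_2))))\big)\Big)$$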

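Step 5: apply the chain rule to rewrite the circuit as a composition of incremental operators:

$$\Delta V = (\uparrow\!\mathit{distinct})^\Delta\Big((\uparrow\!\pi_{x,y})^\Delta\big((\uparrow\!\pi_{x,id})^\Delta((\uparrow\!\sigma_{a>2})^\Delta(\Delta t_1)) \mathbin{(\uparrow\!\bowtie_{id=id})^\Delta} (\uparrow\!\pi_{y,id})^\Delta((\uparrow\!\sigma_{s>5})^\Delta(\Delta t_2))\big)\Big)$$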

Use the linearity of σ and π to simplify this circuit:

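$$\Delta V = (\uparrow\!\mathit{distinct})^\Delta\Big(\uparrow\!\pi_{x,y}\big(\uparrow\!\pi_{x,id}(\uparrow\!\sigma_{a>2}(\Delta t_1)) \mathbin{(\uparrow\!\bowtie_{id=id})^\Delta} \uparrow\!\pi_{y,id}(\uparrow\!\sigma_{s>5}(\Delta t_2))\big)\Big)$$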

Finally, replace the incremental join using the formula for bilinear operators (Theorem 3.4), and the incremental 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 (Proposition 4.7), obtaining the circuit from Figure 1.

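$$\begin{aligned}
d_1 &= \uparrow\!\pi_{x,id}(\uparrow\!\sigma_{a>2}(\Delta t_1)), \qquad d_2 = \uparrow\!\pi_{y,id}(\uparrow\!\sigma_{s>5}(\Delta t_2))\\
j &= d_1 \bowtie_{id=id} d_2 + z^{-1}(\mathcal{I}(d_1)) \bowtie_{id=id} d_2 + d_1 \bowtie_{id=id} z^{-1}(\mathcal{I}(d_2))\\
p &= \uparrow\!\pi_{x,y}(j), \qquad \Delta V = \uparrow\!H\big(z^{-1}(\mathcal{I}(p)),\ p\big)
\end{aligned}$$
(where each $\bowtie_{id=id}$ is lifted)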
Figure 1. Final version of the incremental query circuit from §4.4.

Notice that the resulting circuit contains three integration operations: two from the join, and one from the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡. It also contains three join operators. However, the work performed by each operator for each new input is proportional to the size of the change, as we argue in the following section.
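The circuit of Figure 1 can be exercised end to end with a small Python sketch. Assumptions not in the text: ℤ-sets are dictionaries, rows of `t` are `(x, id, a)` tuples and rows of `r` are `(id, y, s)` tuples, and all function names are hypothetical. The linear operators are applied directly to changes, the join uses the three-term formula of Theorem 3.4, and 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 uses `H` from Proposition 4.7; the test compares the integrated output against recomputing the view from scratch.

```python
# Sketch of the Figure 1 circuit. Hypothetical row layouts: t rows are (x, id, a),
# r rows are (id, y, s). Z-sets are dicts from rows to non-zero integer weights.

def zsum(pairs):
    """Build a Z-set from (element, weight) pairs, dropping zero weights."""
    out = {}
    for x, w in pairs:
        out[x] = out.get(x, 0) + w
    return {x: w for x, w in out.items() if w != 0}


def zadd(a, b):
    return zsum(list(a.items()) + list(b.items()))


def distinct(a):
    return {x: 1 for x, w in a.items() if w > 0}


def H(i, d):
    """Incremental distinct (Proposition 4.7); inspects only the keys of d."""
    out = {}
    for x, dw in d.items():
        old = i.get(x, 0)
        if old > 0 and old + dw <= 0:
            out[x] = -1
        elif old <= 0 and old + dw > 0:
            out[x] = 1
    return out


def prep_t(z):  # pi_{x,id}(sigma_{a>2}(z)), linear
    return zsum(((x, i), w) for (x, i, av), w in z.items() if av > 2)


def prep_r(z):  # pi_{id,y}(sigma_{s>5}(z)), linear
    return zsum(((i, y), w) for (i, y, sv), w in z.items() if sv > 5)


def join(a, b):  # bilinear equi-join on id; weights multiply
    return zsum(((x, i1, y), wa * wb)
                for (x, i1), wa in a.items()
                for (i2, y), wb in b.items() if i1 == i2)


def pi_xy(z):  # linear projection onto (x, y)
    return zsum(((x, y), w) for (x, _i, y), w in z.items())


def figure1(dts, drs):
    """Consumes changes to t and r; returns the stream of changes to the view V."""
    ia, ib, ip = {}, {}, {}  # z^{-1}(I(.)) of both join inputs and of the distinct input
    out = []
    for dt, dr in zip(dts, drs):
        a, b = prep_t(dt), prep_r(dr)
        p = pi_xy(zadd(zadd(join(a, b), join(ia, b)), join(a, ib)))  # Theorem 3.4
        out.append(H(ip, p))
        ia, ib, ip = zadd(ia, a), zadd(ib, b), zadd(ip, p)
    return out
```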

4.5. Complexity

Incremental circuits are efficient. The work performed (and the memory used) by a circuit is the sum of the work performed (and memory used) by its operators. We argue that each operator in the incremental version of a circuit is efficient.

For incrementalized circuits the input stream of each operator contains changes to its input relations. Denote by $C[t] \stackrel{\text{def}}{=} |s[t]|$ the size of the value of the stream $s$ of changes at time $t$, and by $R[t] \stackrel{\text{def}}{=} |\mathcal{I}(s)[t]|$ the size of the relation produced by integrating all changes in $s$. An unoptimized incremental operator $Q^\Delta = \mathcal{D} \circ Q \circ \mathcal{I}$ evaluates query $Q$ on the integration of its input streams; hence its time complexity is the same as that of the non-incremental operator, a function of $R[t]$. In addition, because of the $\mathcal{I}$ and $\mathcal{D}$ operators, it uses $O(R[t])$ memory.

The optimizations described in §3 reduce the time complexity of an operator to be a function of $C[t]$. Assuming $C[t] \ll R[t]$, this translates to major performance improvements in practice. For example, Theorem 3.3 allows evaluating $T^\Delta$, where $T$ is a linear operator, in time $O(C[t])$. Interestingly, while the $\mathcal{I}$ operator uses $O(R[t])$ memory, it can be evaluated in $O(C[t])$ time, because all values that appear in the output at time $t$ must be present in the current input change for time $t$. Similarly, while the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 operator is not linear, $(\mathit{distinct})^\Delta$ can also be evaluated in $O(C[t])$ according to Proposition 4.7. Bilinear operators, including join, can be evaluated in time proportional to the product of the sizes of their input changes, $O(C[t]^2)$ (Theorem 3.4).

The space complexity of linear operators is 0 (zero), since they store no data persistently. The space complexity of $(\mathit{distinct})^\Delta$ and of join is $O(R[t])$.

5. Recursive queries

Recursive queries are very useful in many applications. For example, many graph algorithms (such as graph reachability or transitive closure) are naturally expressed using recursive queries.

We introduce two new stream operators that are instrumental in expressing recursive query evaluation. These operators allow us to build circuits implementing looping constructs, which are used to iterate computations until a fixed-point is reached.

Definition 5.1:

We say that a stream $s \in \mathcal{S}_A$ is zero almost-everywhere if it has a finite number of non-zero values, i.e., there exists a time $t_0$ such that $\forall t \geq t_0.\ s[t] = 0$. Denote the set of streams that are zero almost everywhere by $\overline{\mathcal{S}_A}$.

Stream introduction

The delta function (named after the Dirac delta function) $\delta_0: A \to \mathcal{S}_A$ produces a stream from a scalar value:

$$\delta_0(v)[t] \stackrel{\text{def}}{=} \begin{cases} v & \text{if } t = 0 \\ 0_A & \text{otherwise} \end{cases}$$

For example, $\delta_0(5)$ is the stream $[5, 0, 0, 0, 0, \ldots]$.

Stream elimination

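We define the function $\int: \overline{\mathcal{S}_A} \to A$, over streams that are zero almost everywhere, as $\int(s) \stackrel{\text{def}}{=} \sum_{t \geq 0} s[t]$. $\int$ is closely related to $\mathcal{I}$: if $\mathcal{I}$ is the indefinite integral, $\int$ is the definite integral on the interval $[0, \infty)$.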

For many implementation strategies (including the relational and Datalog queries given below) the operator $\int$ can be approximated finitely and precisely by integrating until the first 0 value is encountered, since it can be proven that its input is always the derivative of a monotone stream.

$\int$ is a left inverse of $\delta_0$, i.e., $\int \circ\, \delta_0 = \mathit{id}_A$.
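A short Python sketch of $\delta_0$ and of the finite approximation of $\int$ described above, with streams truncated to lists and integers as the group. `delta0` and `definite_integral` are hypothetical names, and stopping at the first zero is only sound under the assumption stated in the docstring.

```python
def delta0(v, length, zero=0):
    """delta_0(v), truncated to its first `length` values."""
    return [v] + [zero] * (length - 1)


def definite_integral(s, zero=0):
    """Approximate the definite integral of s by summing up to the first zero.

    Assumption: once s produces a zero it stays zero (as for the derivative of a
    fixed-point iteration that has converged); otherwise the result may be wrong.
    """
    total = zero
    for v in s:
        if v == zero:
            break
        total = total + v
    return total
```

The identity $\int \circ\, \delta_0 = \mathit{id}_A$ shows up as `definite_integral(delta0(v, n)) == v`.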

Proposition 5.2:

$\delta_0$ and $\int$ are LTI.

Nested time domains

So far we have tacitly assumed that “time” is common to all streams in a program. For example, when we add two streams, we assume that they use the same “clock” for the time dimension. However, the $\delta_0$ operator creates a stream with a “new”, independent time dimension. We require well-formed circuits to “insulate” such nested time domains by nesting them between a $\delta_0$ and an $\int$ operator:

$$o = \int\big(Q(\delta_0(i))\big)$$
Proposition 5.3:

If $Q$ is time-invariant, the circuit above has the zero-preservation property: $\mathit{zpp}(\int \circ\, Q \circ \delta_0)$.

5.1. Implementing Recursive Datalog

We illustrate the implementation of recursive queries in DBSP for stratified Datalog. Datalog is strictly more expressive than relational algebra since it can express recursive programs, e.g.:

O(v) :- I(v). // base case
O(v) :- I(z), O(x), v = ... . // rec case

In general, a recursive Datalog program defines a set of mutually recursive relations O1,..,On as an equation (O1,..,On)=R(I1,..,Im,O1,..,On), where I1,..,Im are input relations and R is a relational (non-recursive) query.

The following algorithm generates a circuit that computes a solution to this equation. We describe the algorithm informally and for the special case of a single input I and single output O; the general case can be found in the companion technical report (tr, ), and is only slightly more involved.

  1. Implement the non-recursive relational query $R$ as described in §4 and Table 4.2; this produces an acyclic circuit whose inputs and outputs are ℤ-sets (i.e., not streams):

     $$(I, O) \mapsto R(I, O)$$

  2. Lift this circuit to operate on streams and connect the output to the input in a feedback cycle as follows:

     $$o = \uparrow\!\mathit{distinct}\big(\uparrow\!R(\mathcal{I}(\delta_0(I)),\ z^{-1}(o))\big), \qquad O = \int(\mathcal{D}(o))$$

     We construct $\uparrow\!R$ by lifting each operator of the circuit individually according to Proposition 2.4.

The inner loop of the circuit computes the fixed point of $R$. The differentiation operator $\mathcal{D}$ yields the set of new Datalog facts (changes) computed by each iteration of the loop. When the set of new facts becomes empty, the iterations have completed. $\int$ computes the value of the fixed point by aggregating these changes.
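The following Python sketch simulates this naive circuit step by step for a hypothetical recursive body $R(E, X) = E + (E \bowtie X)$ (transitive closure without the reflexive rules), with ℤ-sets as dictionaries. The variable `o_prev` plays the role of $z^{-1}(o)$, `change` is $\mathcal{D}(o)$, and `result` accumulates the definite integral until the first zero change; all names are illustrative assumptions.

```python
# Naive fixed-point sketch for a hypothetical body R(E, X) = E + (E join X).
# Z-sets are dicts from elements to non-zero integer weights.

def zadd(a, b):
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def zneg(a):
    return {x: -w for x, w in a.items()}


def distinct(a):
    return {x: 1 for x, w in a.items() if w > 0}


def compose(e, x):
    """Join edges (h, t) with paths (s, y) on t = s, producing (h, y); weights multiply."""
    out = {}
    for (h, t), we in e.items():
        for (s, y), wx in x.items():
            if t == s:
                out = zadd(out, {(h, y): we * wx})
    return out


def R(edges, reach):
    return zadd(edges, compose(edges, reach))


def naive_fixpoint(edges, max_steps=100):
    o_prev = {}   # z^{-1}(o): the zero Z-set at time 0
    result = {}   # running integral of D(o)
    for _ in range(max_steps):
        o = distinct(R(edges, o_prev))   # left input: the constant stream I(delta_0(E))
        change = zadd(o, zneg(o_prev))   # D(o)[t] = o[t] - o[t-1]
        if not change:                   # first zero change: stop integrating
            return result
        result = zadd(result, change)
        o_prev = o
    raise RuntimeError("no fixed point reached within max_steps")
```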

Theorem 5.4 (Recursion correctness):

If isset(I), the output of the circuit above is the relation O as defined by the Datalog semantics as a function of the input relation I.

Proof.

Let us compute the contents of the o stream, produced at the output of the 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 operator. We will show that this stream is composed of increasing approximations of the value of O.

We define the one-argument function $S \stackrel{\text{def}}{=} \lambda x.\, R(I, x)$. Notice that the left input of the $\uparrow\!R$ block is a constant stream with the value $I$. Due to the stratified nature of the language, we must have $\mathit{ispositive}(S)$, so $\forall x.\, S(x) \geq x$. Also $S$ is time-invariant, so $S(0) = 0$. From §4, the definition of set union, we know that $x \cup y = \mathit{distinct}(x + y)$. We get the following system of equations:

$$o[0] = S(0), \qquad o[t] = S(o[t-1])$$

So, by induction on $t$ we have $o[t] = S^{t+1}(0)$, where by $S^{n}$ we mean $S \circ S \circ \cdots \circ S$ ($n$ times). $S$ is monotone; thus, if there is a time $k$ such that $S^k(0) = S^{k+1}(0)$, we have $\forall j.\, S^{k+j}(0) = S^k(0)$. Applying a derivative to this stream will then produce a stream that is zero almost everywhere, and integrating this derivative will return the last distinct value in the stream $o$.

This is essentially the definition of the semantics of a recursive Datalog relation: $O = \mathit{fix}\, x.\, R(I, x)$. ∎

Note that the use of unbounded data domains (like integers with arithmetic) does not guarantee convergence for all programs.

In fact, this circuit implements the standard naïve evaluation algorithm (e.g., see Algorithm 1 in (greco-sldm15, )). Notice that the inner part of the circuit is the incremental form of another circuit, since it is sandwiched between $\mathcal{I}$ and $\mathcal{D}$ operators. Using the cycle rule of Proposition 3.2 we can rewrite this circuit as:

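$$O = \int(\omega), \qquad \omega = (\uparrow\!\mathit{distinct})^\Delta\Big((\uparrow\!R)^\Delta\big(\delta_0(I),\ z^{-1}(\omega)\big)\Big) \tag{5.1}$$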

This last circuit effectively implements the semi-naïve evaluation algorithm (Algorithm 2 from (greco-sldm15, )). The correctness of semi-naïve evaluation is an immediate consequence of the cycle rule.
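A Python sketch of the semi-naïve circuit (5.1) for a hypothetical body $R(E, X) = E + (E \bowtie X)$ (transitive closure without the reflexive rules): only changes travel around the loop, the join term is expanded with the three-term formula of Theorem 3.4, and 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡 is replaced by its incremental form `H`. ℤ-sets are dictionaries and all names are hypothetical; the result is the same fixed point that naive evaluation computes.

```python
# Semi-naive sketch for a hypothetical body R(E, X) = E + (E join X).
# Z-sets are dicts from elements to non-zero integer weights.

def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def compose(e, x):
    """Bilinear join of edges (h, t) with paths (s, y) on t = s, giving (h, y)."""
    out = {}
    for (h, t), we in e.items():
        for (s, y), wx in x.items():
            if t == s:
                out = zadd(out, {(h, y): we * wx})
    return out


def H(i, d):
    """Incremental distinct (Proposition 4.7)."""
    out = {}
    for x, dw in d.items():
        old = i.get(x, 0)
        if old > 0 and old + dw <= 0:
            out[x] = -1
        elif old <= 0 and old + dw > 0:
            out[x] = 1
    return out


def seminaive_fixpoint(edges, max_steps=100):
    int_e, int_x, int_r = {}, {}, {}  # z^{-1}(I(.)) of E, of the feedback X, and of R's output
    d_e, d_x = edges, {}              # delta_0(E) is E at time 0; z^{-1}(omega) is 0 at time 0
    result = {}                       # running definite integral of omega
    for _ in range(max_steps):
        # R^Delta: the linear term E passes its change through; the join term
        # uses the three-term expansion of Theorem 3.4.
        d_r = zadd(d_e, compose(d_e, d_x))
        d_r = zadd(d_r, zadd(compose(int_e, d_x), compose(d_e, int_x)))
        omega = H(int_r, d_r)
        if not omega:                 # first zero change: the fixed point is reached
            return result
        result = zadd(result, omega)
        int_e, int_x, int_r = zadd(int_e, d_e), zadd(int_x, d_x), zadd(int_r, d_r)
        d_e, d_x = {}, omega          # delta_0 is zero after time 0; omega feeds back via z^{-1}
    raise RuntimeError("no fixed point reached within max_steps")
```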

6. Incremental recursive programs

In §2–§4 we showed how to incrementalize a relational query by compiling it into a circuit, lifting the circuit to compute on streams, and applying the Δ operator to the lifted circuit. In §5.1 we showed how to compile a recursive query into a circuit that employs incremental computation internally to compute the fixed point. Here we combine these results to construct a circuit that evaluates a recursive query incrementally. The circuit receives a stream of updates to input relations, and for every update recomputes the fixed point. To do this incrementally, it preserves the stream of changes to recursive relations produced by the iterative fixed point computation, and adjusts this stream to account for the modified inputs. Thus, every element of the input stream yields a stream of adjustments to the fixed point computation, using nested streams.

Nested streams, or streams of streams, $\mathcal{S}_{\mathcal{S}_A} = \mathbb{N} \to (\mathbb{N} \to A)$, are well defined, since streams form an abelian group. Equivalently, a nested stream is a value in $\mathbb{N} \times \mathbb{N} \to A$, i.e., a matrix with an infinite number of rows, indexed by two-dimensional time $(t_0, t_1)$, where each row is a stream. In §A.1 we show a few example nested stream computations.

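Lifting a stream operator $S: \mathcal{S}_A \to \mathcal{S}_B$ yields an operator over nested streams $\uparrow\!S: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_B}$, such that $(\uparrow\!S)(s) = S \circ s$, or, pointwise: $(\uparrow\!S(s))[t_0][t_1] = S(s[t_0])[t_1],\ \forall t_0, t_1$. In particular, a scalar function $f: A \to B$ can be lifted twice to produce an operator between streams of streams: $\uparrow\uparrow\!f: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_B}$.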

We define a partial order over timestamps: $(i_0, i_1) \leq (t_0, t_1)$ iff $i_0 \leq t_0$ and $i_1 \leq t_1$. We extend the definition of strictness to operators over nested streams: a stream operator $F: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_B}$ is strict if for any $s, s' \in \mathcal{S}_{\mathcal{S}_A}$ and all times $t \in \mathbb{N} \times \mathbb{N}$, $(\forall i < t.\ s[i] = s'[i])$ implies $F(s)[t] = F(s')[t]$. Proposition 2.10 holds for this notion of strictness, i.e., the fixed point operator $\mathit{fix}\,\alpha.F(\alpha)$ is well defined for a strict operator $F$.

Proposition 6.1:

The operator $\uparrow\!z^{-1}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$ is strict.

The operator $z^{-1}$ on nested streams delays “rows” of the matrix, while $\uparrow\!z^{-1}$ delays “columns” (see examples in §A.1).

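The operator $\mathcal{I}$ on $\mathcal{S}_{\mathcal{S}_A}$ operates on rows of the matrix, treating each row as a single value. Lifting a stream operator computing on $\mathcal{S}_A$, such as $\mathcal{I}: \mathcal{S}_A \to \mathcal{S}_A$, also produces an operator on nested streams, but this time computing on the columns of the matrix: $\uparrow\!\mathcal{I}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$.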
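A small Python sketch of the row/column distinction, assuming a finite prefix of a nested stream is a list of rows indexed by $t_0$ with integer entries. `delay_rows` and `integrate_rows` stand for $z^{-1}$ and $\mathcal{I}$ on nested streams, while `lift(delay)` and `lift(integrate)` stand for $\uparrow\!z^{-1}$ and $\uparrow\!\mathcal{I}$; all names are hypothetical.

```python
# Finite prefixes of nested streams: a list of rows, row index = outer time t0.

def delay(s):
    """z^{-1} on a single stream."""
    return [0] + s[:-1]


def integrate(s):
    """I on a single stream."""
    out, acc = [], 0
    for v in s:
        acc += v
        out.append(acc)
    return out


def lift(op):
    """Lift a stream operator to nested streams by applying it to every row."""
    return lambda m: [op(row) for row in m]


def delay_rows(m):
    """z^{-1} on a nested stream: every row moves down one position."""
    return [[0] * len(m[0])] + [list(row) for row in m[:-1]]


def integrate_rows(m):
    """I on a nested stream: running sum of whole rows."""
    out, acc = [], [0] * len(m[0])
    for row in m:
        acc = [a + b for a, b in zip(acc, row)]
        out.append(acc)
    return out
```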

Proposition 6.2 (Lifting cycles):

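For a binary, causal $T$ we have: $\uparrow\!\big(\lambda s.\, \mathit{fix}\,\alpha.\, T(s, z^{-1}(\alpha))\big) = \lambda s.\, \mathit{fix}\,\alpha.\, (\uparrow\!T)(s, (\uparrow\!z^{-1})(\alpha))$, i.e., lifting a circuit containing a “cycle” can be accomplished by lifting all operators independently, including the $z^{-1}$ back-edge.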

This means that lifting a DBSP stream function can be expressed within DBSP itself. For example, we have:

$$\uparrow\!\mathcal{I}(i) = o, \qquad \text{where } o = i + \uparrow\!z^{-1}(o)$$

This proposition gives the ability to lift entire circuits, including circuits computing on streams and having feedback edges, which are well-defined due to Proposition 6.1. With this machinery we can now apply Algorithm 4.8 to arbitrary circuits, even circuits built for recursively-defined relations. Consider the “semi-naive” circuit (5.1), and denote $\mathit{distinct} \circ R$ by $T$, so that $\uparrow\!T = \uparrow\!\mathit{distinct} \circ \uparrow\!R$:

$$O = \int(\omega), \qquad \omega = (\uparrow\!T)^\Delta\big(\delta_0(I),\ z^{-1}(\omega)\big)$$

Lift the entire circuit using Proposition 6.2 and incrementalize it:

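$$O = \mathcal{D}\big(\uparrow\!\textstyle\int(\omega)\big), \qquad \omega = \uparrow\!\big((\uparrow\!T)^\Delta\big)\big(\uparrow\!\delta_0(\mathcal{I}(I)),\ \uparrow\!z^{-1}(\omega)\big)$$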

Now apply the chain rule to this circuit:

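$$O = \uparrow\!\textstyle\int(\omega), \qquad \omega = \Big(\uparrow\!\big((\uparrow\!T)^\Delta\big)\Big)^\Delta\big(\uparrow\!\delta_0(I),\ \uparrow\!z^{-1}(\omega)\big) \tag{6.1}$$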

This is the incremental version of an arbitrary recursive query.

6.1. Example

In this section we derive the incremental version of a circuit containing recursion, by applying Algorithm 4.8. We start with a very simple program, expressed in Datalog, which computes the transitive closure of a directed graph:

// Edge relation with head and tail
input relation E(h: Node, t: Node)
// Reach relation with source s and sink t
output relation R(s: Node, t: Node)
R(x, x) :- E(x, _).
R(x, x) :- E(_, x).
R(x, y) :- E(x, y).
R(x, y) :- E(x, z), R(z, y).

We haven’t explained how Datalog is translated to circuits, but most Datalog operators are relational in nature. Assuming one could write recursive queries in SQL where a view is defined in terms of itself, the above program would be implemented by the following (illegal) SQL query:

CREATE VIEW R AS
(SELECT E.h, E.h FROM E)
UNION
(SELECT E.t, E.t FROM E)
UNION
(SELECT * FROM E)
UNION
(SELECT E.h, R.t
FROM E JOIN R
ON E.t = R.s)

We apply the algorithm from §5.1 to create first the non-recursive circuit, by assuming that R is already computed as a view R1, and using R1 in the definition of R instead of itself:

CREATE VIEW R AS
(SELECT E.h, E.h FROM E)
UNION
(SELECT E.t, E.t FROM E)
UNION
(SELECT * FROM E)
UNION
(SELECT E.h, R1.t
FROM E JOIN R1 ON E.t = R1.s)

Now we implement this query as a DBSP circuit with two inputs E and R1:

$$R = \mathit{distinct}\big(\pi_{h,t}(E \bowtie_{t=s} R_1) + E + \sigma_{h,h}(\pi_h(E)) + \sigma_{t,t}(\pi_t(E))\big)$$

Now lift the circuit by lifting each operator pointwise, connect it in a feedback loop by feeding input R1 from the output R through a $z^{-1}$ operator, and bracket everything between $\delta_0$ and $\int$:

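$$R = \int(\mathcal{D}(r)), \qquad r = \uparrow\!\mathit{distinct}\big(\uparrow\!\pi_{h,t}(e \mathbin{\uparrow\!\bowtie_{t=s}} z^{-1}(r)) + e + \uparrow\!\sigma_{h,h}(\uparrow\!\pi_h(e)) + \uparrow\!\sigma_{t,t}(\uparrow\!\pi_t(e))\big), \qquad e = \mathcal{I}(\delta_0(E))$$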

The above circuit is a complete implementation of the non-streaming recursive query; given an input relation E it will produce its transitive closure R at the output.

Now we use the semi-naïve rewriting (5.1) to transform the circuit:

(To save space in the figures we will omit the indices from π and σ in the subsequent figures, for example by writing just π instead of πh.)

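$$R = \int(\omega), \qquad \omega = (\uparrow\!\mathit{distinct})^\Delta\Big((\uparrow\!\pi)^\Delta\big((\uparrow\!\bowtie)^\Delta(e, z^{-1}(\omega))\big) + e + (\uparrow\!\sigma)^\Delta((\uparrow\!\pi)^\Delta(e)) + (\uparrow\!\sigma)^\Delta((\uparrow\!\pi)^\Delta(e))\Big), \qquad e = \delta_0(E)$$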

Using the linearity of π and σ, this can be rewritten as an equivalent circuit:

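$$R = \int(\omega), \qquad \omega = (\uparrow\!\mathit{distinct})^\Delta\Big(\uparrow\!\pi\big((\uparrow\!\bowtie)^\Delta(e, z^{-1}(\omega))\big) + e + \uparrow\!\sigma(\uparrow\!\pi(e)) + \uparrow\!\sigma(\uparrow\!\pi(e))\Big), \qquad e = \delta_0(E)$$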

To make this circuit into a streaming computation that evaluates a new transitive closure for a stream of inputs E, we lift it entirely, using Proposition 6.2:

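$$R = \uparrow\!\textstyle\int(\omega), \qquad \omega = \uparrow\!\big((\uparrow\!\mathit{distinct})^\Delta\big)\Big(\uparrow\uparrow\!\pi\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)(e, \uparrow\!z^{-1}(\omega))\big) + e + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e)) + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e))\Big), \qquad e = \uparrow\!\delta_0(E)$$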

We convert this circuit into an incremental circuit, which receives in each transaction the changes to relation E and produces the corresponding changes to relation R:

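$$\Delta R = \mathcal{D}\big(\uparrow\!\textstyle\int(\omega)\big), \qquad \omega = \uparrow\!\big((\uparrow\!\mathit{distinct})^\Delta\big)\Big(\uparrow\uparrow\!\pi\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)(e, \uparrow\!z^{-1}(\omega))\big) + e + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e)) + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e))\Big), \qquad e = \uparrow\!\delta_0(\mathcal{I}(\Delta E))$$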

We can now apply again the chain rule to this circuit:

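$$\Delta R = (\uparrow\!\textstyle\int)^\Delta(\omega), \quad \omega = \Big(\uparrow\!\big((\uparrow\!\mathit{distinct})^\Delta\big)\Big)^\Delta\Big((\uparrow\uparrow\!\pi)^\Delta\big(\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)\big)^\Delta(e, (\uparrow\!z^{-1})^\Delta(\omega))\big) + e + (\uparrow\uparrow\!\sigma)^\Delta((\uparrow\uparrow\!\pi)^\Delta(e)) + (\uparrow\uparrow\!\sigma)^\Delta((\uparrow\uparrow\!\pi)^\Delta(e))\Big), \quad e = (\uparrow\!\delta_0)^\Delta(\Delta E)$$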

We now take advantage of the linearity of $\delta_0$, $\int$, $z^{-1}$, $\pi$, and $\sigma$ to simplify the circuit by removing some Δ invocations:

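$$\Delta R = \uparrow\!\textstyle\int(\omega), \quad \omega = \Big(\uparrow\!\big((\uparrow\!\mathit{distinct})^\Delta\big)\Big)^\Delta\Big(\uparrow\uparrow\!\pi\big(\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)\big)^\Delta(e, \uparrow\!z^{-1}(\omega))\big) + e + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e)) + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e))\Big), \quad e = \uparrow\!\delta_0(\Delta E)$$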

There are two applications of Δ left in this circuit: $\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)\big)^\Delta$ and $\big(\uparrow\!((\uparrow\!\mathit{distinct})^\Delta)\big)^\Delta$. We expand their implementations separately, and we stitch them into the global circuit at the end. This ability to reason about sub-circuits independently highlights the modularity of DBSP.

The join is expanded twice, using the bilinearity of $\uparrow\!\bowtie$ and $\uparrow\uparrow\!\bowtie$. Let us start with the inner circuit, implementing $(\uparrow\!\bowtie)^\Delta$, given by Theorem 3.4:

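$$(\uparrow\!\bowtie)^\Delta(a, b) = a \mathbin{\uparrow\!\bowtie} b + z^{-1}(\mathcal{I}(a)) \mathbin{\uparrow\!\bowtie} b + a \mathbin{\uparrow\!\bowtie} z^{-1}(\mathcal{I}(b))$$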

Now we lift and incrementalize to get the circuit for $\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)\big)^\Delta$:

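$$o = \mathcal{D}\Big(A \bowtie B + \uparrow\!z^{-1}(\uparrow\!\mathcal{I}(A)) \bowtie B + A \bowtie \uparrow\!z^{-1}(\uparrow\!\mathcal{I}(B))\Big), \qquad A = \mathcal{I}(a),\ B = \mathcal{I}(b)$$
(here $\bowtie$ stands for $\uparrow\uparrow\!\bowtie$)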

Applying the chain rule and the linearity of $\uparrow\!\mathcal{I}$ and $\uparrow\!z^{-1}$, this becomes:

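$$o = (\uparrow\uparrow\!\bowtie)^\Delta(a, b) + (\uparrow\uparrow\!\bowtie)^\Delta\big(\uparrow\!z^{-1}(\uparrow\!\mathcal{I}(a)),\ b\big) + (\uparrow\uparrow\!\bowtie)^\Delta\big(a,\ \uparrow\!z^{-1}(\uparrow\!\mathcal{I}(b))\big)$$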

We now have three applications of $(\uparrow\uparrow\!\bowtie)^\Delta$. Each of these is the incremental form of a bilinear operator, so it looks like in the end we will have $3 \times 3$ applications of $\uparrow\uparrow\!\bowtie$. In fact, the overall expression can be simplified (see (tr, ) for a precise derivation), and the end result only has 4 terms in $\uparrow\uparrow\!\bowtie$.

Here is the final form of the expanded join circuit:

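$$o = a \bowtie \mathcal{I}(\uparrow\!\mathcal{I}(b)) + z^{-1}(\mathcal{I}(a)) \bowtie \uparrow\!\mathcal{I}(b) + \uparrow\!z^{-1}(\uparrow\!\mathcal{I}(a)) \bowtie \mathcal{I}(b) + z^{-1}(\uparrow\!z^{-1}(\mathcal{I}(\uparrow\!\mathcal{I}(a)))) \bowtie b$$
(again with $\bowtie$ standing for $\uparrow\uparrow\!\bowtie$)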
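A four-term expansion of the nested incremental join, $a \bowtie \mathcal{I}(\uparrow\!\mathcal{I}(b)) + z^{-1}(\mathcal{I}(a)) \bowtie \uparrow\!\mathcal{I}(b) + \uparrow\!z^{-1}(\uparrow\!\mathcal{I}(a)) \bowtie \mathcal{I}(b) + z^{-1}(\uparrow\!z^{-1}(\mathcal{I}(\uparrow\!\mathcal{I}(a)))) \bowtie b$, can be sanity-checked numerically. The Python sketch below rests on stated assumptions: integers replace ℤ-sets, pointwise multiplication (which is also bilinear) replaces the doubly-lifted join, and `I`, `up_I`, `z`, `up_z`, `D`, `up_D`, and `four_term` are hypothetical names. It compares the expansion with $\mathcal{D} \circ \uparrow\!\mathcal{D}$ applied to the product of the fully integrated inputs.

```python
import random
from itertools import accumulate

# 2-D finite prefixes of nested streams; rows are indexed by the outer time t0.
# Integer multiplication stands in for the bilinear doubly-lifted join.

def I(m):      # integration over t0 (whole rows)
    out, acc = [], [0] * len(m[0])
    for row in m:
        acc = [a + b for a, b in zip(acc, row)]
        out.append(acc)
    return out

def up_I(m):   # lifted integration: within each row (over t1)
    return [list(accumulate(row)) for row in m]

def z(m):      # z^{-1}: delay rows
    return [[0] * len(m[0])] + [list(r) for r in m[:-1]]

def up_z(m):   # lifted z^{-1}: delay within each row
    return [[0] + list(r[:-1]) for r in m]

def sub(x, y):
    return [[p - q for p, q in zip(rx, ry)] for rx, ry in zip(x, y)]

def D(m):
    return sub(m, z(m))

def up_D(m):
    return sub(m, up_z(m))

def mul(x, y):
    return [[p * q for p, q in zip(rx, ry)] for rx, ry in zip(x, y)]

def add(*ms):
    return [[sum(vals) for vals in zip(*rows)] for rows in zip(*ms)]

def four_term(a, b):
    """Four-term nested incremental product of change matrices a and b."""
    return add(mul(a, I(up_I(b))),
               mul(z(I(a)), up_I(b)),
               mul(up_z(up_I(a)), I(b)),
               mul(z(up_z(I(up_I(a)))), b))

def reference(a, b):
    """D . up_D applied to the product of the fully integrated inputs."""
    return D(up_D(mul(I(up_I(a)), I(up_I(b)))))
```

The four delays in this form come from applying the two-term product rule $\Delta(xy) = \Delta x \cdot y + z^{-1}(x) \cdot \Delta y$ once per time dimension.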

Returning to $\big(\uparrow\!((\uparrow\!\mathit{distinct})^\Delta)\big)^\Delta$, we can compute its circuit by expanding once using Proposition 4.7:

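$$\big(\uparrow\!((\uparrow\!\mathit{distinct})^\Delta)\big)^\Delta(i) = \mathcal{D}\Big(\uparrow\uparrow\!H\big(\uparrow\!z^{-1}(\uparrow\!\mathcal{I}(\mathcal{I}(i))),\ \mathcal{I}(i)\big)\Big)$$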

Finally, stitching all these pieces together we get the final circuit shown in Figure 2.

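$$\begin{aligned}
e &= \uparrow\!\delta_0(\Delta E), \qquad b = \uparrow\!z^{-1}(\omega)\\
j &= e \bowtie \mathcal{I}(\uparrow\!\mathcal{I}(b)) + z^{-1}(\mathcal{I}(e)) \bowtie \uparrow\!\mathcal{I}(b) + \uparrow\!z^{-1}(\uparrow\!\mathcal{I}(e)) \bowtie \mathcal{I}(b) + z^{-1}(\uparrow\!z^{-1}(\mathcal{I}(\uparrow\!\mathcal{I}(e)))) \bowtie b\\
x &= \uparrow\uparrow\!\pi(j) + e + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e)) + \uparrow\uparrow\!\sigma(\uparrow\uparrow\!\pi(e))\\
\omega &= \mathcal{D}\Big(\uparrow\uparrow\!H\big(\uparrow\!z^{-1}(\uparrow\!\mathcal{I}(\mathcal{I}(x))),\ \mathcal{I}(x)\big)\Big)\\
\Delta R &= \uparrow\!\textstyle\int(\omega)
\end{aligned}$$
(with $\bowtie$ standing for $\uparrow\uparrow\!\bowtie$)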
Figure 2. Final form of circuit from §6.1.

6.2. Complexity of incremental recursive queries

Time complexity

The time complexity of an incremental recursive query can be estimated as a product of the number of fixed point iterations and the complexity of each iteration. The incrementalized circuit (6.1) performs the same number of iterations as the non-incremental circuit (5.1) in the worst case: once the non-incremental circuit reaches the fixed point, its output is constant and so is its derivative computed by the incrementalized circuit.

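Consider a nested stream of changes $s \in \mathcal{S}_{\mathcal{S}_A}$, with elements $s[t_1][t_2]$, where $t_1$ is the input timestamp and $t_2$ is the fixed point iteration number. The unoptimized loop body $\big(\uparrow\!((\uparrow\!T)^\Delta)\big)^\Delta = \mathcal{D} \circ \uparrow\!\mathcal{D} \circ \uparrow\uparrow\!T \circ \uparrow\!\mathcal{I} \circ \mathcal{I}$ has the same time complexity as $\uparrow\uparrow\!T$ applied to the aggregated input of size $R(s)[t_1][t_2] \stackrel{\text{def}}{=} |(\mathcal{I} \circ \uparrow\!\mathcal{I})(s)[t_1][t_2]| = \big|\sum_{(i_1, i_2) \leq (t_1, t_2)} s[i_1][i_2]\big|$. As before, an optimized circuit can be significantly more efficient. For instance, by applying Theorem 3.4 twice, to $\uparrow\!\bowtie$ and $\uparrow\uparrow\!\bowtie$, we obtain a circuit for the nested incremental join $s_1 \mathbin{\big(\uparrow\!((\uparrow\!\bowtie)^\Delta)\big)^\Delta} s_2$ that runs in $O(|\uparrow\!\mathcal{I}(s_1)[t_1][t_2]| \times |\uparrow\!\mathcal{I}(s_2)[t_1][t_2]|) \ll O(R(s_1) \times R(s_2))$ (because each term is correspondingly smaller).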

Space complexity

Integration ($\mathcal{I}$) and differentiation ($\mathcal{D}$) of a stream $s \in \mathcal{S}_{\mathcal{S}_A}$ use memory proportional to $\sum_{t_2} \big|\sum_{t_1} s[t_1][t_2]\big|$, i.e., the total size of changes aggregated over columns of the matrix. The unoptimized circuit integrates and differentiates, respectively, the inputs and outputs of the recursive program fragment. As we move $\mathcal{I}$ and $\mathcal{D}$ inside the circuit using the chain rule, we additionally store changes to intermediate streams. Effectively, we cache the results of fixed point iterations from earlier timestamps in order to update them efficiently as new input changes arrive. Notice that space is proportional to the number of iterations of the inner while loop.

7. Extensions

The DBSP language can express a richer class of streaming computations (both incremental and non-incremental) than those covered so far. In this section we give several examples.

7.1. Multisets and bags

In §4 we have shown how to implement the relational algebra on sets. Some SQL queries, however, produce multisets, e.g., UNION ALL. Since ℤ-sets generalize multisets and bags, it is easy to implement query operators that compute on such structures. For example, while SQL UNION is ℤ-set addition followed by 𝑑𝑖𝑠𝑡𝑖𝑛𝑐𝑡, UNION ALL is just ℤ-set addition.
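A two-function illustration in Python, assuming ℤ-sets are dictionaries from elements to integer weights; `union_all` and `union` are hypothetical names.

```python
def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def distinct(a):
    return {x: 1 for x, w in a.items() if w > 0}


def union_all(a, b):
    """SQL UNION ALL: plain Z-set addition (multiplicities add up)."""
    return zadd(a, b)


def union(a, b):
    """SQL UNION: Z-set addition followed by distinct."""
    return distinct(zadd(a, b))
```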

7.2. Aggregation

Aggregation in SQL applies a function $a$ to a whole set, producing a “scalar” result with some type $R$: $a: 2^A \to R$. We convert such aggregation functions to operate on ℤ-sets, so in DBSP an aggregation function has a signature $a: \mathbb{Z}[A] \to R$. Correctness of the implementation is defined as in §4.2.1.

The SQL COUNT aggregation function is implemented on ℤ-sets by $a_{COUNT}: \mathbb{Z}[A] \to \mathbb{Z}$, which computes the sum of all the element weights: $a_{COUNT}(s) = \sum_{x \in s} s[x]$. The SQL SUM aggregation function is implemented on ℤ-sets by $a_{SUM}: \mathbb{Z}[\mathbb{R}] \to \mathbb{R}$, which performs a weighted sum of all (real) values: $a_{SUM}(s) = \sum_{x \in s} x \times s[x]$.

With this definition the aggregation functions $a_{COUNT}$ and $a_{SUM}$ are in fact linear transformations between the group $\mathbb{Z}[A]$ and the result group ($\mathbb{Z}$ and $\mathbb{R}$, respectively).

If the output of the DBSP circuit can be such a “scalar” value, then aggregation with a linear function is simply function application, and thus it is automatically incremental. However, in general, for composing multiple queries we require the result of an aggregation to be a singleton ℤ-set (containing a single value), and not a scalar value. In this case the aggregation function is implemented in DBSP as the composition of the actual aggregation and the function $\mathit{makeset}: A \to \mathbb{Z}[A]$, which converts a scalar value of type $A$ to a singleton ℤ-set, defined as follows: $\mathit{makeset}(x) \stackrel{\text{def}}{=} \{x \mapsto 1\}$.

In conclusion, the following SQL query: SELECT SUM(c) FROM I is implemented as the following circuit:

$$O = \mathit{makeset}\big(a_{SUM}(\pi_C(I))\big)$$

The lifted incremental version of this circuit is interesting: since $\pi$ and $a_{SUM}$ are linear, they are equivalent to their own incremental versions. Although $(\uparrow\!\mathit{makeset})^\Delta = \mathcal{D} \circ \uparrow\!\mathit{makeset} \circ \mathcal{I}$ cannot be simplified, it is nevertheless efficient, doing only $O(1)$ work per invocation, since its input and output are singleton values.

An aggregation function such as AVG can be written as the composition of a more complex linear function that computes a pair of values using SUM and COUNT, followed by a makeset and a selection operation that divides the two columns.

SELECT AVG(c) FROM I
$$O = \sigma_{/}\Big(\mathit{makeset}\big((a_{SUM}, a_{COUNT})(\pi_C(I))\big)\Big)$$
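A minimal Python sketch of these aggregates on ℤ-sets, assuming dictionaries as ℤ-sets and, hypothetically, that column `c` is the first field of each row. It also checks that $a_{SUM}$ and $a_{COUNT}$ are additive, which is what makes their incremental versions trivial. `avg_circuit` assumes a non-empty input, since SQL AVG of an empty set is NULL rather than a division.

```python
def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def project_c(rel):
    """pi_c: keep only column c (hypothetically field 0), adding weights of merged rows."""
    out = {}
    for row, w in rel.items():
        out = zadd(out, {row[0]: w})
    return out


def a_count(s):
    return sum(s.values())


def a_sum(s):
    return sum(x * w for x, w in s.items())


def makeset(v):
    return {v: 1}


def avg_circuit(rel):
    """pi_c, the linear pair (a_SUM, a_COUNT), makeset, then divide the two columns."""
    col = project_c(rel)
    pair = (a_sum(col), a_count(col))
    return {total / count: w for (total, count), w in makeset(pair).items()}
```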

Finally, some aggregate functions, such as MIN, are not incremental in general, since for handling deletions they may need to know the full set, and not just its changes. The lifted incremental version of such aggregate functions is implemented essentially by “brute force”, using the formula $(\uparrow\!a_{MIN})^\Delta = \mathcal{D} \circ \uparrow\!a_{MIN} \circ \mathcal{I}$. Such functions perform work proportional to $R(s)$ at each invocation.

Note that the SQL ORDER BY directive can be modeled as a non-linear aggregate function that emits a list. However, such an implementation is not efficiently incrementalizable in DBSP. We leave the efficient handling of ORDER BY to future work.

Even when aggregation results do not form a group, they usually form a structure with a zero element. We expect a well-defined aggregation function to map empty ℤ-sets to zeros in the target domain.

7.3. Grouping; indexed relations

Pick an arbitrary set $K$ of “key values.” Consider the mathematical structure of finite maps from $K$ to ℤ-sets over some other domain $A$: $K \to \mathbb{Z}[A] = \mathbb{Z}[A][K]$. We call values $i$ of this structure indexed ℤ-sets: for each key $k \in K$, $i[k]$ is a ℤ-set. Because the codomain $\mathbb{Z}[A]$ is an abelian group, this structure is itself an abelian group.

We use this structure to model the SQL GROUP BY operator in DBSP. Consider a partitioning function $p: A \to K$ that assigns a key to any value in $A$. We define the grouping function $G_p: \mathbb{Z}[A] \to (K \to \mathbb{Z}[A])$ as $G_p(a)[k] \stackrel{\text{def}}{=} \sum_{x \in a,\ p(x) = k} a[x] \cdot x$. When applied to a ℤ-set $a$ this function returns an indexed ℤ-set, where each element is called a grouping (we use “group” for the algebraic structure and “grouping” for the result of GROUP BY): for each key $k$ a grouping is a ℤ-set containing all elements of $a$ that map to $k$ (as in SQL, groupings are multisets, represented by ℤ-sets). Consider our example ℤ-set $R$ from §4, and a key function $p(s)$ that returns the first letter of the string $s$. Then we have that $G_p(R) = \{j \mapsto \{joe \mapsto 1\},\ a \mapsto \{anne \mapsto 1\}\}$, i.e., grouping with this key function produces an indexed ℤ-set with two groupings, each of which contains a ℤ-set with one element.

The grouping function Gp is linear for any p. It follows that the group-by implementation in DBSP is automatically incremental: given some changes to the input relation we can apply the partitioning function to each change separately to compute how each grouping changes.
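The linearity of $G_p$ can be checked with a short Python sketch, assuming indexed ℤ-sets are dictionaries from keys to ℤ-sets (themselves dictionaries); `group_by` and `indexed_add` are hypothetical names. The first assertion reproduces the $G_p(R)$ example of this section.

```python
def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def group_by(p, a):
    """G_p: Z-set -> indexed Z-set (dict from key to Z-set)."""
    out = {}
    for x, w in a.items():
        out.setdefault(p(x), {})[x] = w
    return out


def indexed_add(f, g):
    """Addition of indexed Z-sets, key by key; empty groupings are dropped."""
    out = {}
    for k in set(f) | set(g):
        s = zadd(f.get(k, {}), g.get(k, {}))
        if s:
            out[k] = s
    return out
```

Because `group_by` distributes over `zadd`, a change to the input can be grouped on its own and added to the previous groupings.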

7.4. GROUP BY-AGGREGATE

Grouping in SQL is almost always followed by aggregation. Let us consider an aggregation function $a: (K \times \mathbb{Z}[A]) \to B$ that produces values in some group $B$, and an indexed relation of type $\mathbb{Z}[A][K]$, as defined above in §7.3. The nested relation aggregation operator $\mathit{Agg}_a: \mathbb{Z}[A][K] \to B$ applies $a$ to the contents of each grouping independently and adds the results: $\mathit{Agg}_a(g) \stackrel{\text{def}}{=} \sum_{k \in K} a(k, g[k])$. To apply this to our example, let us compute the equivalent of GROUP BY with COUNT; we use the following aggregation function $\mathit{count}: K \times \mathbb{Z}[A] \to \mathbb{Z}[K \times \mathbb{Z}]$, $\mathit{count}(k, s) = \mathit{makeset}((k, a_{COUNT}(s)))$, using the ℤ-set counting function $a_{COUNT}$ from §7.2; the notation $(a, b)$ is a pair of values $a$ and $b$. Then we have $\mathit{Agg}_{count}(G_p(R)) = \{(j, 1) \mapsto 1,\ (a, 1) \mapsto 1\}$.

Notice that, unlike SQL, DBSP can naturally express computations on indexed ℤ-sets, since they are just an instance of a group structure. One can even implement queries that operate on each grouping in an indexed ℤ-set. However, our definition of incremental computation is only concerned with incrementality in the outermost structures. We leave it to future work to explore an appropriate definition of incremental computation that operates on the inner relations.

A very useful operation on nested relations is flatmap, which is essentially the inverse of partitioning, converting an indexed ℤ-set into a ℤ-set: $\mathit{flatmap}: \mathbb{Z}[A][K] \to \mathbb{Z}[K \times A]$. flatmap is in fact a particular instance of aggregation, using the aggregation function $a: K \times \mathbb{Z}[A] \to \mathbb{Z}[K \times A]$ defined by $a(k, s) = \sum_{x \in s} s[x] \cdot (k, x)$. For our previous example, $\mathit{flatmap}(G_p(R)) = \{(j, joe) \mapsto 1,\ (a, anne) \mapsto 1\}$.
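A Python sketch of $\mathit{Agg}_a$ with the $\mathit{count}$ and flatmap aggregation functions, with ℤ-sets and indexed ℤ-sets encoded as dictionaries (hypothetical names `agg`, `count`, `flatten`). It reproduces the two examples in this section and also shows why $\mathit{count}$ is not linear while the flatmap function is.

```python
def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def agg(a, indexed):
    """Agg_a(g) = sum over keys k of a(k, g[k]), where a returns a Z-set."""
    out = {}
    for k, s in indexed.items():
        out = zadd(out, a(k, s))
    return out


def count(k, s):
    """makeset((k, a_COUNT(s))): not linear because of makeset."""
    return {(k, sum(s.values())): 1}


def flatten(k, s):
    """The aggregation function behind flatmap: linear in s."""
    return {(k, x): w for x, w in s.items()}
```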

If we use an aggregation function $a: K \times \mathbb{Z}[A] \to B$ that is linear in its second argument, then the aggregation operator $\mathit{Agg}_a$ is linear, and thus fully incremental. As a consequence, flatmap is linear. However, many practical aggregation functions for nested relations are in fact not linear; an example is the $\mathit{count}$ function above, which is not linear since it uses the non-linear $\mathit{makeset}$ function. Nevertheless, even though such functions are not fully incremental, their evaluation is at least partly incremental: when applying a change to groupings, the aggregation function only needs to be re-evaluated for groupings that have changed.

7.5. Antijoin

Antijoins arise in the implementation of Datalog programs with stratified negation. Consider the following program:

O(v, z) :- I1(v, z), not I2(v).

The semantics of such a rule is defined in terms of joins and set difference. This rule is equivalent to the following pair of rules:

C(v, z) :- I1(v, z), I2(v).
O(v, z) :- I1(v, z), not C(v, z).

This transformation reduces an antijoin to a join followed by a set difference. This produces the following DBSP circuit:

$$O = \mathit{distinct}\big(I_1 - (I_1 \bowtie I_2)\big)$$
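A Python sketch of this antijoin circuit, assuming ℤ-sets are dictionaries and that $I_2$ holds 1-tuples `(v,)`; `join_keep_left` and `antijoin` are hypothetical names. For set-valued inputs, the subtraction removes exactly the rows of $I_1$ that have a match in $I_2$.

```python
def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def distinct(a):
    return {x: 1 for x, w in a.items() if w > 0}


def join_keep_left(i1, i2):
    """I1(v, z) join I2(v), keeping I1's columns; weights multiply (bilinear)."""
    out = {}
    for (v, z), w1 in i1.items():
        for (v2,), w2 in i2.items():
            if v == v2:
                out = zadd(out, {(v, z): w1 * w2})
    return out


def antijoin(i1, i2):
    """O = distinct(I1 - (I1 join I2))."""
    return distinct(zadd(i1, {x: -w for x, w in join_keep_left(i1, i2).items()}))
```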

7.6. Streaming joins

Consider a binary query $T(s, t) = \mathcal{I}(s) \mathbin{\uparrow\!\bowtie} t$. This is the relation-to-stream join operator supported by streaming databases like ksqlDB (jafarpour-edbt19, ). Stream $s$ carries changes to a relation, while $t$ carries arbitrary data, e.g., logs or telemetry data points. $T$ discards values from $t$ after matching them against the accumulated contents of the relation.
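A Python sketch of $T(s, t) = \mathcal{I}(s) \mathbin{\uparrow\!\bowtie} t$ on finite stream prefixes, assuming dictionaries as ℤ-sets and caller-supplied key functions (`stream_join`, `key_s`, `key_t` are hypothetical). Only the relation side is integrated; each batch of `t` is matched once and then forgotten.

```python
def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def stream_join(s, t, key_s, key_t):
    """Join each batch of t against the integral of the relation changes in s."""
    rel, out = {}, []
    for ds, dt in zip(s, t):
        rel = zadd(rel, ds)  # I(s) at this step, including the current change
        step = {}
        for x, wx in rel.items():
            for y, wy in dt.items():
                if key_s(x) == key_t(y):
                    step = zadd(step, {(x, y): wx * wy})
        out.append(step)
    return out
```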

7.6.1. Streaming Window queries

Streaming databases often organize the contents of streams into windows, which store a subset of data points with a predefined range of timestamps. The circuit below (a convolution filter in DSP) computes a fixed-size sliding-window aggregate over the last four timestamps defined by the $T_i$ functions.

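[Circuit diagram: the input stream $s$ passes through a chain of $z^{-1}$ delays; the functions $T_0$, $T_1$, $T_2$, … are applied to the successive delayed copies of $s$, and their results are summed to produce the output $o$.]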
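Since the exact wiring of the diagram is not fully recoverable, the following Python sketch assumes the common tapped-delay-line form $o[t] = \sum_{k} T_k(s[t-k])$ with zero-preserving taps; `window_aggregate` is a hypothetical name and integers stand in for the stream values.

```python
def window_aggregate(s, taps):
    """o[t] = sum_k taps[k](s[t - k]); values before time 0 are treated as 0.

    Assumes every tap maps 0 to 0, like the linear T_i of a convolution filter.
    """
    out = [0] * len(s)
    delayed = list(s)
    for tap in taps:
        out = [o + tap(v) for o, v in zip(out, delayed)]
        delayed = [0] + delayed[:-1]  # one more z^{-1} along the delay chain
    return out
```

With four identity taps this computes a sliding sum over the last four timestamps, e.g. `[1, 3, 6, 10, 14]` for the input `[1, 2, 3, 4, 5]`.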

In practice, windowing is usually based on physical timestamps attached to stream values rather than logical time. For instance, the CQL (arasu-tr02, ) query “SELECT * FROM events [RANGE 1 hour]” returns all events received within the last hour. The corresponding circuit (on the left) takes an input stream $s \in \mathcal{S}_{\mathbb{Z}[A]}$ and an additional input $\theta \in \mathcal{S}_{\mathbb{R}}$ that carries the value of the current time.

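$$\text{(left)}\quad o = W\big(\mathcal{I}(s),\ \theta\big) \qquad\qquad \text{(right)}\quad o = W\big(s + z^{-1}(o),\ \theta\big)$$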

where the window operator $W$ prunes input ℤ-sets, only keeping values with timestamps less than an hour behind $\theta[t]$. Assuming $ts: A \to \mathbb{R}$ returns the physical timestamp of a value, $W$ is defined as $W(v, \theta)[t] \stackrel{\text{def}}{=} \{x \in v[t] \mid ts(x) \geq \theta[t] - 1\,\mathrm{hr}\}$. Assuming $\theta$ increases monotonically, $W$ can be moved inside integration, resulting in the circuit on the right, which uses bounded memory to compute a window of an unbounded stream. This circuit is a building block of a large family of window queries, including window joins and aggregation. We conjecture that DBSP can express any CQL query.
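A Python sketch comparing the two window circuits, under assumptions not in the text: elements are `(timestamp, payload)` tuples, the window is a hypothetical `WINDOW = 60` time units, and ℤ-sets are dictionaries. With monotonically increasing `thetas`, the bounded version, which only keeps the pruned window as state, produces the same outputs as $W$ applied to the full integral.

```python
WINDOW = 60  # hypothetical window length (e.g., minutes in an hour)


def zadd(a, b):
    """Z-set addition; drops elements whose weight becomes zero."""
    out = dict(a)
    for x, w in b.items():
        out[x] = out.get(x, 0) + w
        if out[x] == 0:
            del out[x]
    return out


def W(v, theta):
    """Keep elements whose timestamp x[0] is at most WINDOW behind theta."""
    return {x: w for x, w in v.items() if x[0] >= theta - WINDOW}


def window_unbounded(s, thetas):
    """Left circuit: W applied to the full integral I(s); memory grows with s."""
    acc, out = {}, []
    for ds, th in zip(s, thetas):
        acc = zadd(acc, ds)
        out.append(W(acc, th))
    return out


def window_bounded(s, thetas):
    """Right circuit: o = W(s + z^{-1}(o)); only the current window is stored."""
    prev, out = {}, []
    for ds, th in zip(s, thetas):
        prev = W(zadd(prev, ds), th)
        out.append(prev)
    return out
```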

7.7. Relational while queries

(See also non-monotonic semantics for Datalog¬ and Datalog¬¬ (Abiteboul-book95, ).) To illustrate the power of DBSP we implement the following “while” program, where Q is an arbitrary relational algebra query:

x := i;
while (x changes)
x := Q(x);

The DBSP implementation of this program is:

$$x = \int(\mathcal{D}(o)), \qquad o = \uparrow\!Q\big(\delta_0(i) + z^{-1}(o)\big)$$

This circuit can be converted to a streaming circuit that computes a stream of values i by lifting it; it can be incrementalized using Algorithm 4.8 to compute on changes of i:

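$$\Delta x = \uparrow\!\textstyle\int(\uparrow\!\mathcal{D}(\omega)), \qquad \omega = (\uparrow\uparrow\!Q)^\Delta\big(\uparrow\!\delta_0(\Delta i) + \uparrow\!z^{-1}(\omega)\big)$$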

Note that at runtime the execution of this circuit is not guaranteed to terminate; however, if the circuit does terminate, it will produce the correct output, i.e., the least fixpoint of Q that includes i.

8. Implementation

We are prototyping an implementation of DBSP as part of an open-source project with an MIT license: https://github.com/vmware/database-stream-processor. The implementation is written in Rust and consists of a library and a runtime. The library provides APIs for basic algebraic data types, such as groups, finite maps, ℤ-sets, and indexed ℤ-sets. A separate circuit construction API allows users to create DBSP circuits by placing operator nodes (corresponding to boxes in our diagrams) and connecting them with streams, which correspond to the arrows in our diagrams. The library provides pre-built generic operators for integration, differentiation, delay, and nested integration and differentiation. The ℤ-set library provides functions for computing the basic ℤ-set operations corresponding to plus, negation, grouping, joining, and aggregation.

For iterative computations we provide the $\delta_0$ operator and an operator that approximates $\int$ by terminating iteration of a loop at a user-specified condition (usually the requirement that a zero appear in a specified stream). The low-level library allows users to construct incremental circuits manually. Our plan is to add a higher-level library (a compiler) that will automatically incrementalize a circuit.

9. Related work

DBSP using non-nested streams is a simplified instance of a Kahn network (kahn-ifip74, ). Johnson (johnson-phd83, ) studies a very similar computational model without nested streams and its expressiveness. The implementation of such streaming models of computation and their relationship to dataflow machines has been studied by Lee (lee-ieee95, ). Lee (lee-ifip93, ) also introduced streams of streams and the $z^{-1}$ operator.

In §7 we have discussed the connections with window and stream database queries (arasu-tr02, ; aurora, ).

Incremental view maintenance (e.g., (gupta-idb93, )) is surveyed in (gupta-idb95, ); a large bibliography is present in (motik-ai19, ). Its most formal aspect is propagating “deltas” through algebraic expressions: $Q(R + \Delta R) = Q(R) + \Delta Q(R, \Delta R)$. This work eventually crystallized in (koch-pods16, ). DBSP incrementalization is both more modular and more fine-grained since it deals with streams of updates. Both (koch-pods10, ) and (green-tcs11, ) use ℤ-sets to uniformly model insertions and deletions.

Picallo et al. (picallo-scop19, ) provide a general solution to IVM for rich languages. DBSP requires a group structure on the values operated on; this assumption has two major practical benefits: it simplifies the mathematics considerably (e.g., Picallo uses monoid actions to model changes), and it provides a general, simple algorithm (Algorithm 4.8) for incrementalizing arbitrary programs. The downside of DBSP is that one has to find a suitable group structure (e.g., ℤ-sets for sets) to “embed” the computation. Picallo’s notion of “derivative” is not unique: they need creativity to choose the right derivative definition, while we need creativity to find the right group structure.

Many heuristic algorithms were published for Datalog-like languages, e.g., counting-based approaches (Dewan-iis92, ; motik-aaai15, ) that maintain the number of derivations, DRed (gupta-sigmod93, ) and its variants (Ceri-VLDB91, ; Wolfson-sigmod91, ; Staudt-vldb96, ; Kotowski-rr11, ; Lu-sigmod95, ; Apt-sigmod87, ), and the backward-forward algorithm and its variants (motik-aaai15, ; Harrison-wdd92, ; motik-ai19, ). DBSP is more general than these approaches. Interestingly, the ℤ-set multiplicities in our relational implementation are related to the approaches that count the number of derivations.

DBSP is closely related to Differential Dataflow (DD) (mcsherry-cidr13, ; murray-sosp13, ) and its theoretical foundations (abadi-fossacs15, ) (and, more recently, (mchserry-vldb20, ; chothia-vldb16, )). All DBSP operators are based on DD operators. DD’s computational model is more powerful than DBSP’s, since it allows past values in a stream to be “updated”. In contrast, our model assumes that the inputs of a computation arrive in time order, while allowing for nested time domains via the modular lifting transformer. However, DBSP can express both incremental and non-incremental computations; in essence, DBSP “deconstructs” DD into simple building blocks. The core Proposition 3.2 and Algorithm 4.8, which builds on it, are new contributions.

10. Conclusions

In this paper we have introduced DBSP, a model of computation based on infinite streams over abelian groups. In this model streams are used to model consecutive snapshots of a database, consecutive changes (or transactions) applied to a database, and consecutive values of loop-carried variables.

We have defined an abstract notion of incremental computation over streams, and defined the incrementalization operator Δ, which transforms a stream computation Q into its incremental version $Q^{\Delta}$. The incrementalization operator has useful algebraic properties (for example, the incremental version of a composition is the composition of the incremental versions, and linear time-invariant operators are their own incremental versions), which yield efficient incremental implementations for linear and bilinear computations.
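As a worked illustration of the bilinear case, the Python sketch below uses integer streams, with pointwise multiplication standing in for a bilinear operator such as a join (using integers instead of ℤ-sets is our simplification). It checks on a finite prefix that $(a \cdot b)^{\Delta} = a \cdot b + z^{-1}(\mathcal{I}(a)) \cdot b + a \cdot z^{-1}(\mathcal{I}(b))$, computed only from the current changes and the delayed integrals, agrees with differentiating the product of the integrated inputs.

```python
def integrate(s):
    """I: running sum of a finite integer stream prefix."""
    out, acc = [], 0
    for x in s:
        acc += x
        out.append(acc)
    return out

def delay(s):
    """z^-1: shift by one step, inserting the group zero at time 0."""
    return [0] + s[:-1]

def differentiate(s):
    """D: s[t] - s[t-1]."""
    return [x - p for x, p in zip(s, delay(s))]

a = [3, -1, 4, 2]  # changes to the first input
b = [1, 5, -2, 0]  # changes to the second input

# Naive: integrate both inputs, multiply the snapshots, differentiate the result.
naive = differentiate([x * y for x, y in zip(integrate(a), integrate(b))])

# Bilinear rule: every term involves at least one current change.
ia, ib = delay(integrate(a)), delay(integrate(b))
incremental = [x * y + pa * y + x * pb for x, y, pa, pb in zip(a, b, ia, ib)]
# both are [3, 9, 12, 8]
```

The identity follows from expanding $(A + a)(B + b) - AB$, where $A$ and $B$ are the previous integrals.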

We have then applied these tools to two domains: relational queries and recursive stratified queries. This gave us a general algorithm for incrementalizing an arbitrary query, including recursive queries. However, we believe that both the incrementalization algorithm and DBSP are even more powerful and can apply to even richer classes of query languages, including languages operating on nested relations and streaming query languages.

References

  • (1) The Aurora project. http://cs.brown.edu/research/aurora/, December 2004.
  • (2) Martín Abadi, Frank McSherry, and Gordon Plotkin. Foundations of differential dataflow. In Foundations of Software Science and Computation Structures (FoSSaCS), London, UK, April 11-18 2015. URL: http://homepages.inf.ed.ac.uk/gdp/publications/differentialweb.pdf.
  • (3) Serge Abiteboul, Richard Hull, and Victor Vianu. Foundations of Databases. Addison-Wesley, 1995. URL: http://webdam.inria.fr/Alice/.
  • (4) Mario Alvarez-Picallo, Alex Eyers-Taylor, Michael Peyton Jones, and C.-H. Luke Ong. Fixing incremental computation. In European Symposium on Programming Languages and Systems (ESOP), pages 525–552, Prague, Czech Republic, April 6–11 2019. URL: https://link.springer.com/chapter/10.1007/978-3-030-17184-1_19.
  • (5) Krzysztof R. Apt and Jean-Marc Pugin. Maintenance of stratified databases viewed as a belief revision system. In Moshe Y. Vardi, editor, ACM SIGMOD International conference on Management of data (SIGMOD), pages 136–145, San Diego, California, March 23-25 1987. doi:10.1145/28659.28674.
  • (6) Arvind Arasu, Shivnath Babu, and Jennifer Widom. An abstract semantics and concrete language for continuous queries over streams and relations. Technical Report 2002-57, Stanford InfoLab, 2002. URL: http://ilpubs.stanford.edu:8090/563/.
  • (7) Mihai Budiu, Frank McSherry, Leonid Ryzhyk, and Val Tannen. DBSP: A language for expressing incremental view maintenance for rich query languages. December 2021. URL: https://github.com/vmware/database-stream-processor/blob/main/doc/spec.pdf.
  • (8) Stefano Ceri and Jennifer Widom. Deriving production rules for incremental view maintenance. In Guy M. Lohman, Amílcar Sernadas, and Rafael Camps, editors, International Conference of Very Large Data Bases (VLDB), pages 577–589, Barcelona, Spain, 1991. URL: http://www.vldb.org/conf/1991/P577.PDF.
  • (9) Zaheer Chothia, John Liagouris, Frank McSherry, and Timothy Roscoe. Explaining outputs in modern data analytics. Proc. VLDB Endow., 9(12):1137–1148, August 2016. URL: https://doi.org/10.14778/2994509.2994530.
  • (10) Hasanat M. Dewan, David Ohsie, Salvatore J. Stolfo, Ouri Wolfson, and Sushil Da Silva. Incremental database rule processing in PARADISER. J. Intell. Inf. Syst., 1(2):177–209, 1992. doi:10.1007/BF00962282.
  • (11) Sergio Greco and Cristian Molinaro. Datalog and logic databases. Synthesis Lectures on Data Management, 7(2):1–169, 2015. URL: https://doi.org/10.2200/S00648ED1V01Y201505DTM041.
  • (12) Todd J Green, Zachary G Ives, and Val Tannen. Reconcilable differences. Theory of Computing Systems, 49(2):460–488, 2011. URL: https://web.cs.ucdavis.edu/~green/papers/tocs11_differences.pdf.
  • (13) Todd J. Green, Grigoris Karvounarakis, and Val Tannen. Provenance semirings. In Symposium on Principles of Database Systems (PODS), pages 31–40, Beijing, China, June 11-14 2007. URL: https://doi.org/10.1145/1265530.1265535.
  • (14) Ashish Gupta, Inderpal Singh Mumick, et al. Maintenance of materialized views: Problems, techniques, and applications. IEEE Data Eng. Bull., 18(2):3–18, 1995.
  • (15) Ashish Gupta, Inderpal Singh Mumick, and V. S. Subrahmanian. Maintaining views incrementally. In Proceedings of the 1993 ACM SIGMOD International Conference on Management of Data, SIGMOD ’93, pages 157–166, Washington, D.C., USA, 1993. URL: https://doi.org/10.1145/170035.170066.
  • (16) Ashish Gupta, Inderpal Singh Mumick, and V. S. Subrahmanian. Maintaining views incrementally. In ACM SIGMOD International conference on Management of data (SIGMOD), pages 157–166, Washington, DC, May 26-28 1993. ACM Press. doi:10.1145/170035.170066.
  • (17) John V. Harrison and Suzanne W. Dietrich. Maintenance of materialized views in a deductive database: An update propagation approach. In Kotagiri Ramamohanarao, James Harland, and Guozhu Dong, editors, Workshop on Deductive Databases, volume CITRI/TR-92-65 of Technical Report, pages 56–65, Washington, D.C., November 14 1992. Department of Computer Science, University of Melbourne.
  • (18) Hojjat Jafarpour, Rohan Desai, and Damian Guy. KSQL: Streaming SQL engine for Apache Kafka. In International Conference on Extending Database Technology (EDBT), pages 524–533, Lisbon, Portugal, March 26-29 2019. URL: http://openproceedings.org/2019/conf/edbt/EDBT19_paper_329.pdf.
  • (19) Steven Dexter Johnson. Synthesis of Digital Designs from Recursion Equations. PhD thesis, Indiana University, May 1983. URL: https://help.luddy.indiana.edu/techreports/TRNNN.cgi?trnum=TR141.
  • (20) Gilles Kahn. The semantics of a simple language for parallel programming. In IFIP Congress on Information Processing, 1974. URL: http://www1.cs.columbia.edu/~sedwards/papers/kahn1974semantics.pdf.
  • (21) Christoph Koch. Incremental query evaluation in a ring of databases. In Symposium on Principles of Database Systems (PODS), pages 87–98, Indianapolis, Indiana, USA, 2010. URL: https://doi.org/10.1145/1807085.1807100.
  • (22) Christoph Koch, Daniel Lupei, and Val Tannen. Incremental view maintenance for collection programming. In Symposium on Principles of Database Systems (PODS), pages 75–90, San Francisco, California, USA, 2016. URL: https://doi.org/10.1145/2902251.2902286.
  • (23) Jakub Kotowski, François Bry, and Simon Brodt. Reasoning as axioms change - incremental view maintenance reconsidered. In Web Reasoning and Rule Systems RR, volume 6902 of Lecture Notes in Computer Science, pages 139–154, Galway, Ireland, August 29-30 2011. Springer. doi:10.1007/978-3-642-23580-1_11.
  • (24) Edward A. Lee. Multidimensional streams rooted in dataflow. In IFIP Working Conference on Architectures and Compilation Techniques for Fine and Medium Grain Parallelism, Orlando, FL, January 20-22 1993. URL: https://ptolemy.berkeley.edu/publications/papers/93/mdsdf/.
  • (25) Edward A. Lee and Thomas M. Parks. Dataflow process networks. Proceedings of the IEEE, pages 773–801, May 1995. URL: https://ptolemy.berkeley.edu/publications/papers/95/processNets/.
  • (26) James J. Lu, Guido Moerkotte, Joachim Schü, and V. S. Subrahmanian. Efficient maintenance of materialized mediated views. In ACM SIGMOD International conference on Management of data (SIGMOD), pages 340–351, San Jose, California, May 22-25 1995. doi:10.1145/223784.223850.
  • (27) Frank McSherry, Andrea Lattuada, Malte Schwarzkopf, and Timothy Roscoe. Shared arrangements: Practical inter-query sharing for streaming dataflows. Proc. VLDB Endow., 13(10):1793–1806, June 2020. URL: https://doi.org/10.14778/3401960.3401974.
  • (28) Frank McSherry, Derek Gordon Murray, Rebecca Isaacs, and Michael Isard. Differential dataflow. In Conference on Innovative Data Systems Research (CIDR), Asilomar, CA, January 6–9 2013. URL: http://cidrdb.org/cidr2013/Papers/CIDR13_Paper111.pdf.
  • (29) Boris Motik, Yavor Nenov, Robert Piro, and Ian Horrocks. Maintenance of Datalog materialisations revisited. Artif. Intell., 269:76–136, 2019. URL: https://doi.org/10.1016/j.artint.2018.12.004.
  • (30) Boris Motik, Yavor Nenov, Robert Edgar Felix Piro, and Ian Horrocks. Incremental update of Datalog materialisation: the backward/forward algorithm. In Conference on Artificial Intelligence (AAAI), pages 1560–1568, Austin, Texas, January 25-30 2015. AAAI Press. URL: http://www.aaai.org/ocs/index.php/AAAI/AAAI15/paper/view/9660.
  • (31) Derek G. Murray, Frank McSherry, Rebecca Isaacs, Michael Isard, Paul Barham, and Martín Abadi. Naiad: A timely dataflow system. In ACM Symposium on Operating Systems Principles (SOSP), pages 439–455, Farmington, Pennsylvania, 2013. URL: https://doi.org/10.1145/2517349.2522738.
  • (32) L. R. Rabiner and B. Gold, editors. Theory and Application of Digital Signal Processing. Prentice-Hall, 1975.
  • (33) Martin Staudt and Matthias Jarke. Incremental maintenance of externally materialized views. In International Conference of Very Large Data Bases (VLDB), pages 75–86, Mumbai (Bombay), India, September 3-6 1996. URL: http://www.vldb.org/conf/1996/P075.PDF.
  • (34) Ouri Wolfson, Hasanat M. Dewan, Salvatore J. Stolfo, and Yechiam Yemini. Incremental evaluation of rules and its relationship to parallelism. In ACM SIGMOD International conference on Management of data (SIGMOD), pages 78–87, Denver, Colorado, May 29-31 1991. ACM Press. doi:10.1145/115790.115799.

Appendix A Supporting material

A.1. Operations on nested streams

If a stream can be thought of as an infinite vector, a stream of streams can be thought of as a “matrix” with an infinite number of rows, where each row is a stream. For example, we can depict the nested stream $i \in \mathcal{S}_{\mathcal{S}_{\mathbb{Z}}}$ defined by $i[t_0][t_1] = 2t_0 + t_1$ as:

$$
i = \begin{bmatrix}
0 & 1 & 2 & 3 & \cdots \\
2 & 3 & 4 & 5 & \cdots \\
4 & 5 & 6 & 7 & \cdots \\
6 & 7 & 8 & 9 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$

($t_0$, the outer time, is the row index, and $t_1$, the inner time, is the column index.) Let us perform some computations on nested streams to get used to them. Lifting a scalar function twice applies it pointwise to the elements of the matrix:

$$
(\uparrow\uparrow(x \mapsto x \bmod 2))(i) = \begin{bmatrix}
0 & 1 & 0 & 1 & \cdots \\
0 & 1 & 0 & 1 & \cdots \\
0 & 1 & 0 & 1 & \cdots \\
0 & 1 & 0 & 1 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$
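A finite prefix of a nested stream can be encoded directly as a list of rows. The following Python sketch (our own illustration; the names `i`, `lift2`, `ROWS`, and `COLS` are hypothetical) builds the 4×4 prefix shown above, using the row index as outer time and the column index as inner time, and applies a doubly lifted scalar function element by element.

```python
ROWS, COLS = 4, 4  # size of the finite prefix of the infinite nested stream

# Row index = outer time t0, column index = inner time t1.
i = [[2 * t0 + t1 for t1 in range(COLS)] for t0 in range(ROWS)]

def lift2(f, nested):
    """Doubly lifted scalar function: apply f to every element independently."""
    return [[f(x) for x in row] for row in nested]

parity = lift2(lambda x: x % 2, i)
```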

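The $\mathcal{I}$ operator on $\mathcal{S}_{\mathcal{S}_A}$ is well-defined, because $\mathcal{S}_A$ is itself an abelian group: it operates on the rows of the matrix, treating each row as a single value:

$$
\mathcal{I}(i) = \begin{bmatrix}
0 & 1 & 2 & 3 & \cdots \\
2 & 4 & 6 & 8 & \cdots \\
6 & 9 & 12 & 15 & \cdots \\
12 & 16 & 20 & 24 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$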

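Lifting a stream operator on $\mathcal{S}_A$, such as $\mathcal{I}: \mathcal{S}_A \to \mathcal{S}_A$, also produces an operator on nested streams, $\uparrow\mathcal{I}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$, but this one computes on the columns of the matrix:

$$
(\uparrow\mathcal{I})(i) = \begin{bmatrix}
0 & 1 & 3 & 6 & \cdots \\
2 & 5 & 9 & 14 & \cdots \\
4 & 9 & 15 & 22 & \cdots \\
6 & 13 & 21 & 30 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$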

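Similarly, we can apply $\mathcal{D}$ to nested streams, $\mathcal{D}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$, computing on the rows of the matrix:

$$
\mathcal{D}(i) = \begin{bmatrix}
0 & 1 & 2 & 3 & \cdots \\
2 & 2 & 2 & 2 & \cdots \\
2 & 2 & 2 & 2 & \cdots \\
2 & 2 & 2 & 2 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$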

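while $\uparrow\mathcal{D}: \mathcal{S}_{\mathcal{S}_A} \to \mathcal{S}_{\mathcal{S}_A}$ computes on the columns:

$$
(\uparrow\mathcal{D})(i) = \begin{bmatrix}
0 & 1 & 1 & 1 & \cdots \\
2 & 1 & 1 & 1 & \cdots \\
4 & 1 & 1 & 1 & \cdots \\
6 & 1 & 1 & 1 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$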
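The row and column behavior of $\mathcal{I}$, $\uparrow\mathcal{I}$, $\mathcal{D}$, and $\uparrow\mathcal{D}$ can be checked mechanically on a finite prefix. In this Python sketch, under a hypothetical list-of-rows encoding (row = outer time, column = inner time) with helper names of our own, the un-lifted operators combine whole rows, which amounts to running the scalar operator down every column, while the lifted ones run the scalar operator along each row. On the 4×4 prefix the results match the four matrices above.

```python
def integrate(s):
    """I on a finite scalar stream prefix: running sum starting from 0."""
    acc, out = 0, []
    for x in s:
        acc += x
        out.append(acc)
    return out

def differentiate(s):
    """D on a finite scalar stream prefix: s[t] - s[t-1], with s[-1] = 0."""
    return [x - p for x, p in zip(s, [0] + s[:-1])]

def transpose(m):
    return [list(col) for col in zip(*m)]

def outer(op, m):
    """Apply op along the outer time: rows are combined as whole values,
    which amounts to running op down every column."""
    return transpose([op(col) for col in transpose(m)])

def lifted(op, m):
    """The lifted operator: apply op to each row, i.e. to each inner stream."""
    return [op(row) for row in m]

# Finite 4x4 prefix of the nested stream; row = outer time, column = inner time.
i = [[2 * t0 + t1 for t1 in range(4)] for t0 in range(4)]

I_i = outer(integrate, i)          # integrates the rows
up_I_i = lifted(integrate, i)      # integrates within each row
D_i = outer(differentiate, i)      # differentiates the rows
up_D_i = lifted(differentiate, i)  # differentiates within each row
```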

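Similarly, $z^{-1}$ and its lifted variant $\uparrow z^{-1}$ have different outcomes: $z^{-1}$ shifts the rows down, while $\uparrow z^{-1}$ shifts the columns to the right:

$$
z^{-1}(i) = \begin{bmatrix}
0 & 0 & 0 & 0 & \cdots \\
0 & 1 & 2 & 3 & \cdots \\
2 & 3 & 4 & 5 & \cdots \\
4 & 5 & 6 & 7 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$

$$
(\uparrow z^{-1})(i) = \begin{bmatrix}
0 & 0 & 1 & 2 & \cdots \\
0 & 2 & 3 & 4 & \cdots \\
0 & 4 & 5 & 6 & \cdots \\
0 & 6 & 7 & 8 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$

$z^{-1}$ commutes with $\uparrow z^{-1}$:

$$
(\uparrow z^{-1})(z^{-1}(i)) = z^{-1}((\uparrow z^{-1})(i)) = \begin{bmatrix}
0 & 0 & 0 & 0 & \cdots \\
0 & 0 & 1 & 2 & \cdots \\
0 & 2 & 3 & 4 & \cdots \\
0 & 4 & 5 & 6 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$

Notice the following commutativity properties for integration and differentiation on nested streams: $\mathcal{I} \circ (\uparrow\mathcal{I}) = (\uparrow\mathcal{I}) \circ \mathcal{I}$ and $\mathcal{D} \circ (\uparrow\mathcal{D}) = (\uparrow\mathcal{D}) \circ \mathcal{D}$.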
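The commutation properties of nested-stream operators can be spot-checked the same way. This Python sketch, again a hypothetical finite-prefix encoding of our own rather than code from a DBSP implementation, verifies on the 4×4 prefix that $z^{-1}$ commutes with $\uparrow z^{-1}$, and that integration and differentiation commute with their lifted versions. Checking on prefixes is sound here because all these operators are causal: the first rows and columns of the output depend only on the first rows and columns of the input.

```python
def integrate(s):
    """I on a finite scalar stream prefix: running sum starting from 0."""
    acc, out = 0, []
    for x in s:
        acc += x
        out.append(acc)
    return out

def differentiate(s):
    """D on a finite scalar stream prefix: s[t] - s[t-1], with s[-1] = 0."""
    return [x - p for x, p in zip(s, [0] + s[:-1])]

def delay(s):
    """z^-1 on a finite scalar stream prefix: shift right, inserting 0 at time 0."""
    return [0] + s[:-1]

def transpose(m):
    return [list(col) for col in zip(*m)]

def outer(op, m):
    """Apply op along the outer time (down every column)."""
    return transpose([op(col) for col in transpose(m)])

def lifted(op, m):
    """The lifted operator: apply op along the inner time (across each row)."""
    return [op(row) for row in m]

i = [[2 * t0 + t1 for t1 in range(4)] for t0 in range(4)]

# (lifted z^-1)(z^-1(i)) versus z^-1((lifted z^-1)(i))
delayed_both = lifted(delay, outer(delay, i))
assert delayed_both == outer(delay, lifted(delay, i))
# I commutes with lifted I, and D with lifted D.
assert outer(integrate, lifted(integrate, i)) == lifted(integrate, outer(integrate, i))
assert outer(differentiate, lifted(differentiate, i)) == lifted(differentiate, outer(differentiate, i))
```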

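$$
\mathcal{D}_{\mathcal{S}\mathcal{S}}(i) = (\mathcal{D} \circ (\uparrow\mathcal{D}))(i) = \begin{bmatrix}
0 & 1 & 1 & 1 & \cdots \\
2 & 0 & 0 & 0 & \cdots \\
2 & 0 & 0 & 0 & \cdots \\
2 & 0 & 0 & 0 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$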

$$
\mathcal{I}_{\mathcal{S}\mathcal{S}}(i) = ((\uparrow\mathcal{I}) \circ \mathcal{I})(i) = \begin{bmatrix}
0 & 1 & 3 & 6 & \cdots \\
2 & 6 & 12 & 20 & \cdots \\
6 & 15 & 27 & 42 & \cdots \\
12 & 28 & 48 & 72 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{bmatrix}
$$
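The nested operators $\mathcal{D}_{\mathcal{S}\mathcal{S}}$ and $\mathcal{I}_{\mathcal{S}\mathcal{S}}$ can be computed by composing the outer and lifted operators. The following Python sketch (hypothetical names, finite prefix only) reproduces the two matrices above and also checks that the two nested operators are inverses of each other on this prefix, which follows from $\mathcal{I}$ and $\mathcal{D}$ being inverses in each time dimension.

```python
def integrate(s):
    """I on a finite scalar stream prefix: running sum starting from 0."""
    acc, out = 0, []
    for x in s:
        acc += x
        out.append(acc)
    return out

def differentiate(s):
    """D on a finite scalar stream prefix: s[t] - s[t-1], with s[-1] = 0."""
    return [x - p for x, p in zip(s, [0] + s[:-1])]

def transpose(m):
    return [list(col) for col in zip(*m)]

def outer(op, m):
    """Apply op along the outer time (down every column)."""
    return transpose([op(col) for col in transpose(m)])

def lifted(op, m):
    """The lifted operator: apply op along the inner time (across each row)."""
    return [op(row) for row in m]

def D_SS(m):
    """Nested differentiation: D o (lifted D)."""
    return outer(differentiate, lifted(differentiate, m))

def I_SS(m):
    """Nested integration: (lifted I) o I."""
    return lifted(integrate, outer(integrate, m))

i = [[2 * t0 + t1 for t1 in range(4)] for t0 in range(4)]
d = D_SS(i)
s = I_SS(i)
```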