24 changes: 15 additions & 9 deletions 2018/state/fs-state-formalism.tex
@@ -3,38 +3,44 @@
\subsection{Reference streaming model}

\begin{definition}{Reference stream processing system}
is a tuple $(\Gamma,D)$, where $\Gamma$ is the set of all possible data flow elements and $D\subseteq{\Gamma\times\Gamma}$ is a binary relation on it. A pair $(x,y)$ is in ${D}$ if $y$ can be generated from $x$ by user-defined operations.
\end{definition}

Basically, $D$ captures the notion of a logical graph whose vertices are pure functions.

Within our model, one can define a streaming system using only data flow elements and business logic. However, without the notion of time, we cannot observe any processing progress. Let $\tau\in{\mathbb{N}}$ be an exact global discrete time. Let $a_\tau\in{\Gamma}$ be the element that enters at time $\tau$, and $b_\tau\in{2^\Gamma}$ be the elements that leave at time $\tau$. Let $A_{\tau}=\{a_i\}_{i=1}^{\tau}$ be the set of all input elements by time $\tau$ and ${B}_\tau=\bigcup\limits_{i=1}^{\tau}{b_i}$ be the set of all output elements.
Review comment: $a_i$ is an element of the set. It is not kosher to take a union of elements; unions are taken over sets.


\begin{definition}{A reference working set}
$\widehat{W}_\tau\subseteq{\Gamma}$ at the time $\tau$ is the set of elements, which are currently in the reference system:

$\widehat{W}_0=\emptyset$,

$\widehat{W}_{\tau+1}=\begin{sqcases}
\widehat{W}_{\tau}\cup{\{a_{\tau+1}\}}, & \text{or}\\
\widehat{W}_{\tau}\setminus{b_{\tau+1}}, & \text{or}\\
(\widehat{W}_{\tau}\setminus{X})\cup{Y}, & \text{where } \forall{x\in{X}}\ \exists{y\in{Y}}:(x,y)\in{D}.
\end{sqcases}$

\end{definition}

We can imagine a stream processing system as a pool, where some elements are poured in and others are poured out. Inside the pool, each element can be substituted by another element according to the logical graph, which can in turn be substituted as well, and so on. Only the surviving elements are poured out of the pool.
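The working-set recurrence above can be sketched as a small simulation. This is a hedged illustration with names of our own choosing (the paper defines no code): at each tick the set either admits $a_{\tau+1}$, releases $b_{\tau+1}$, or substitutes a set $X$ by a set $Y$ generated from it via $D$.

```python
def step(W, *, enter=None, leave=None, substitute=None, D=None):
    """Apply one of the three working-set transitions to W (a set)."""
    if enter is not None:                       # W ∪ {a_{tau+1}}
        return W | {enter}
    if leave is not None:                       # W \ b_{tau+1}
        return W - set(leave)
    if substitute is not None:                  # (W \ X) ∪ Y with X related to Y via D
        X, Y = substitute
        assert all(any((x, y) in D for y in Y) for x in X), \
            "every removed element must generate some produced element"
        return (W - X) | Y
    return W

# Toy run: 'a' enters, is rewritten to 'f(a)' via D, then leaves.
D = {('a', 'f(a)')}
W = set()
W = step(W, enter='a')                          # {'a'}
W = step(W, substitute=({'a'}, {'f(a)'}), D=D)  # {'f(a)'}
W = step(W, leave={'f(a)'})                     # empty again
```

The assertion inside the substitution branch mirrors the side condition $\forall x\in X\ \exists y\in Y:(x,y)\in D$ of the last case of the recurrence.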

\begin{definition}{System state}
$S_\tau$ at the time $\tau$ is $\widehat{W}_\tau^{\infty}$, given that $A_{\infty}=\{a_i\}_{i=1}^{\tau}$, i.e., no new elements enter after $\tau$.
\end{definition}

System state is essentially everything that stays in the system if we block the input and wait for an infinite time.

\begin{definition}{Nullification time}
of an input element $a_\tau$ is the first time $\theta_{a_\tau}$ when every element produced from $a_\tau$ is either in the state or has already left the system: $\theta_{a_\tau}=\inf\{\hat{\tau}>\tau \mid (W_{\hat{\tau}}\setminus{S_{\hat{\tau}}})\cap{Cl(D)(a_\tau)}=\emptyset\}$, where $Cl(D)$ is the transitive closure of the relation $D$.
Review comment: The formula is a heavy one, so I spelled it out in words before giving it.

\end{definition}

Technically, states in existing stream processing systems are not data flow elements, but, as mentioned above, they can be presented this way using the drifting state model. The main purpose of the state is to accumulate information about input items. By definition, data flow elements cannot stay in $W\setminus{S}$ for an infinite time. Hence, for each input element $a_\tau$ there is a nullification time $\theta_{a_\tau}$, after which all elements that depend on $a_\tau$ are in the system state. From the nullification time on, the input element can affect output elements only through the state.
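The two notions used here can be sketched in code. This is a hedged illustration (all names are ours; whether $Cl(D)(a_\tau)$ includes $a_\tau$ itself is our assumption): $Cl(D)$ is computed by saturating the relation, and the nullification time of $a_\tau$ is the first tick at which no descendant of $a_\tau$ remains in $W\setminus S$.

```python
def closure(D):
    """Transitive closure of a binary relation given as a set of pairs."""
    cl = set(D)
    changed = True
    while changed:
        changed = False
        for (x, y) in list(cl):
            for (y2, z) in list(cl):
                if y == y2 and (x, z) not in cl:
                    cl.add((x, z))
                    changed = True
    return cl

def nullification_time(a, trace, D):
    """trace: list of (W, S) snapshots for times hat_tau = tau+1, tau+2, ..."""
    # We include a itself among its descendants -- an assumption on Cl(D)(a).
    descendants = {y for (x, y) in closure(D) if x == a} | {a}
    for t, (W, S) in enumerate(trace, start=1):
        if not ((W - S) & descendants):
            return t
    return None  # never nullified within the observed trace

D = {('a', 'b'), ('b', 'c')}
assert ('a', 'c') in closure(D)  # transitivity: a -> b -> c implies a -> c
```

The saturation loop is the naive $O(|\Gamma|^3)$-style construction; it serves only to make the definition concrete, not to suggest an efficient implementation.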
Review comment: What kind of definition is this, that elements cannot sit in the system forever? Properly, a progress property should be introduced at the very beginning, something like: for every $a_i$ there exists a moment of time when it leaves the system.


In our model, internal transitions cannot be observed directly from outside the system. Moreover, the exact order and values of internal elements are not important; only the output elements are essential for reasoning about consistency guarantees. Therefore, it is convenient to limit the global discrete time to the moments when output elements are released.

\begin{definition}{Time quantization}
$\tau(t)$ is a discrete time restricted to the moments when output elements are released: $\exists{b_{\tau(t)}}$.
\end{definition}

\begin{definition}{Probability of output element in a reference system}
@@ -317,4 +323,4 @@ \subsubsection{Spark streaming}
% A non-deterministic system provides for at most once guarantee only if $\forall{b_{t_o}}:\exists{P^{c}_{t_o}} \wedge \forall{a\in{Cl^{-1}(b_{t_o})}},\theta_a=t_o$.
% \end{theorem}

% Hence, non-deterministic systems that use state snapshots must atomically output elements and take a consistent snapshot that contains their inverted dependencies in order to achieve exactly once. In practice, it means that the lower bound of latency in the worst case in such systems is the time between snapshotting together with the duration of taking a snapshot. There is a trade-off between latency and the frequency of taking snapshots because too frequent snapshotting can cause high extra load, while rare snapshots lead to high latency. We can observe such behavior in all stream processing systems that provide for exactly once and use state snapshots, e.g., in Flink atomicity between state snapshotting and elements releasing is preserved using the modification of 2PC protocol. On the other hand, if a system is deterministic, atomicity between output and snapshotting is not necessary, because in case of replay system releases exactly the same output, that can be somehow deduplicated.
2 changes: 1 addition & 1 deletion 2018/state/fs-state-intro.tex
@@ -4,7 +4,7 @@

% Currently, distributed stream processing is a hot area of research that has a lot of practical applications in IoT, short-term personalization, fraud detection, etc. Unlike batch processing systems like Google's MapReduce~\cite{Dean:2008:MSD:1327452.1327492}, Apache Hadoop~\cite{hadoop2009hadoop}, and Apache Spark~\cite{Zaharia:2016:ASU:3013530.2934664}, streaming systems provide for low latency, early termination, retrieval of approximate results, etc~\cite{Doulkeridis:2014:SLA:2628707.2628782}. Despite the fact that stream processing is mostly used in the fields, where interactive or near real-time processing is required, popular kappa architecture pattern postulates that stream processing fits for any data processing scenario~\cite{lin2017lambda}.

% However, modern stream processing can hardly be claimed to be a silver bullet. Most batch processing systems assure consistent and deterministic results even in case of failures. In state-of-the-art stream processing systems~\cite{carbone2015apache, apache:storm, Zaharia:2016:ASU:3013530.2934664} a contract with the end user regarding~{\em which data} will be eventually processed and released in case of failures is usually described in terms of so-called~{\em delivery guarantees}. These guarantees include {\em at most once}, {\em at least once}, and {\em exactly once}. Unfortunately, these notions are intuitive rather than formal. {\it At most once} states that each input event is processed once or not processed at all. {\it At least once} guarantees that each input item is processed, but possibly multiple times. {\it Exactly once} assures that each input event is processed exactly one time. Only exactly once guarantees the same level of data consistency as batch processing. These definitions are seemingly simple, but have several main pitfalls:

% \begin{itemize}
% \item Any stream processing system has a {\em state}. Each output item depends not only on the corresponding input item but also on the system state. This fact implies that a system that {\em technically} supports at most once or exactly once delivery guarantee, in practice can release completely invalid results, because of inconsistencies in the state.
8 changes: 4 additions & 4 deletions 2018/state/fs-state-preliminaries.tex
@@ -1,15 +1,15 @@
In this section, we recall the main concepts of stream processing that we use further in this paper. Basically, a distributed stream processing system is a shared-nothing distributed runtime that can handle a potentially unlimited sequence of input items. Each item can be transformed several times before the final result is released from the system. Elements can be processed one-by-one or grouped into small input sets, usually called {\em micro-batches}.

An element has {\em entered} the system if the system is aware of it and takes some kind of responsibility for its processing. This concept can be implemented differently in different systems. For example, in Flink an element has entered once it has arrived at the {\em Source} vertex. In Spark Streaming, an element enters when it is read or received by an input agent, also called {\em Source}.

An element has {\em left} the system if it has been released to a consumer. Since that moment, the system cannot observe it anymore. This concept can also be mapped differently onto various systems. For instance, in Spark Streaming an element leaves when it is pushed to an output operation, e.g., written to HDFS or a file. In Flink, an element leaves when it departs from the {\em Sink} vertex.

It is important to note that input and output elements cannot be directly matched to each other due to the possibility of complex transformations within the system. For instance, a single input element can be transformed into multiple ones. After the split, the resulting elements may be processed in completely different ways and even influence each other. Hence, in general, it is impossible to determine which input element triggered an output one.

The way a system transforms input elements is usually defined in the form of a {\em logical graph}. A logical graph is a graph, typically provided by a user, where vertices are {\em business-logic} operations and edges determine the order of processing. Each user-defined operation may be {\em stateless} or {\em stateful}. States of operations are usually managed by the system itself to prevent inconsistencies.

The main difference between the state of a user-defined operation and an ordinary element is that the state is consumed, updated, and produced by the same operation. In~\cite{we2018adbis} it is shown that an operation's state can practically be an ordinary data flow element for any stateful operation. Therefore, we can assume that the states of operations are just special elements in a data flow.
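A minimal sketch of this idea, using a toy counting operation of our own invention (not an example from the cited paper): the state is threaded through the operation as an ordinary element that the same pure function consumes and re-emits.

```python
def counting_op(state, item):
    """Pure function: (old state, input element) -> (new state, output element).

    The state behaves like a special data flow element: it is consumed,
    updated, and produced by the same operation on every invocation.
    """
    new_state = state + 1
    return new_state, (item, new_state)  # output carries a running count

state = 0
outputs = []
for item in ['x', 'y', 'z']:
    state, out = counting_op(state, item)
    outputs.append(out)
# outputs == [('x', 1), ('y', 2), ('z', 3)], state == 3
```

Because `counting_op` is pure, the pair (state in, state out) can be drawn as just another edge of the logical graph, which is exactly what lets the state be treated as a data flow element.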

% In order to make our mathematical model independent of any implementation, we consider streaming consistency guarantees as correspondences between input and output streams and system state. If we formulate guarantees only in these terms, they can be applied to any system. However, the question of what to consider as a system state is a little bit sophisticated. At a very high-level, we can define a state as a {\em information} about input elements, which have been previously entered the system.

% An obvious idea is to say that system state is a state of user-defined operations, i.e., so-called {\em business logic} state. The main purpose of the states of operations is to accumulate the information about input items. It allows a system to not store all previous input elements in order to process a new one in a stateful user-defined operation. However, output items can be affected not only by input ones and operations state but also by other elements, which are currently in the system. For instance, if cycles are allowed in a logical graph, cycling elements can affect output ones, but they do not belong to a state. In-flight elements can also influence the result, being not in the state. These examples demonstrate the evident fact that information about input elements can be contained not only in the states of operations but in data flow elements as well.