What is streaming?

Big data is big

  • Typical processing scenario for big data:

    • Aggregate data in intermediate storage
    • Run batch job overnight, store results in permanent storage
    • Use Spark for interactive exploration of recent data

This workflow assumes that the value of the data is hidden in it (“needle in a haystack”).

Data is NOT static

Running processes generate data continuously, and users need to monitor those processes continuously. That we mostly work with static data is due to legacy constraints.

Static data vs time

Bounded and unbounded datasets

  • Bounded Data: A dataset that can be enumerated and/or iterated upon
    • Students in CSE2520
    • Countries in the world
  • Unbounded Data: A dataset that we can only enumerate given a snapshot. Unbounded data does not have a size property.
    • Natural numbers
    • Facebook/Twitter posts
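
The distinction can be made concrete with a minimal sketch in plain Scala (2.13 or later is assumed for `LazyList`). The object and value names are illustrative only: a `List` is bounded and has a size, while a lazily generated sequence of natural numbers can only be inspected through a finite snapshot.

```scala
object BoundedVsUnbounded {
  // Bounded: fully enumerable, so it has a size.
  val countries: List[String] = List("NL", "GR", "DE")

  // Unbounded: the natural numbers, generated lazily on demand.
  val naturals: LazyList[Int] = LazyList.from(0)

  // A snapshot is a bounded view over the unbounded dataset.
  def snapshot(n: Int): List[Int] = naturals.take(n).toList
}
```

Calling `naturals.size` would never return, which is one way to read "does not have a size property".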

Q: Most datasets are:

You guessed right: unbounded.

Stream and batch processing

  • Batch processing applies an algorithm on a bounded dataset to produce a single result at the end
    • Unix, Map/Reduce and Spark are batch processing systems
  • Stream processing applies an algorithm on continuously updating data and continuously creates results
    • Flink and Storm are stream processing systems
    • Natural fit for unbounded datasets
    • Bounded data are usually a time-restricted view of unbounded data
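
A minimal sketch of the difference, using only the Scala standard library (the function names are our own): the batch version returns one value once the whole input has been read, while the streaming version emits an updated result for every incoming element and also works on an endless iterator.

```scala
object BatchVsStream {
  // Batch: consume the whole bounded dataset, emit one result at the end.
  def batchSum(data: Seq[Int]): Int = data.sum

  // Stream: emit an updated result after every incoming element.
  // scanLeft on an Iterator is lazy, so the input may be unbounded.
  def runningSum(events: Iterator[Int]): Iterator[Int] =
    events.scanLeft(0)(_ + _).drop(1) // drop the initial 0
}
```

For example, `BatchVsStream.runningSum(Iterator.from(1)).take(4)` yields the first four partial sums without ever reaching the "end" of the input.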

Use cases for stream processing

  • Intrusion and fraud detection
  • Algorithmic trading
  • Process monitoring (e.g. production processes, or logs)
  • Traffic monitoring
  • When we can discard raw data and prefer to store aggregates

What changes faster over time: data or code?

If \(\frac{\Delta d}{\Delta t} \gg \frac{\Delta c}{\Delta t}\), this is a streaming problem

If \(\frac{\Delta c}{\Delta t} \gg \frac{\Delta d}{\Delta t}\), this is an exploration problem

— Joe Hellerstein

Data in stream processing

Some real-world examples of such data include:

[11/Oct/2018:09:02:41 +0000] "GET /pub/developer-testing-in-IDE.pdf HTTP/1.1" 200 8232808 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_0 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11A465 Safari/9537.53 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
[11/Oct/2018:09:04:36 +0000] "GET /courses/bigdata/spark.html HTTP/1.1" 200 2145339 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
[11/Oct/2018:09:06:20 +0000] "GET /atom.xml HTTP/1.1" 200 255069 "-" "Gwene/1.0 (The gwene.org rss-to-news gateway)"
[11/Oct/2018:09:08:37 +0000] "GET /pub/eval-quality-of-open-source-software.pdf HTTP/1.1" 200 1306751 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
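
Before such lines can be processed as events, they need to be turned into timestamped records. The following is a sketch only: the `Request` type and its field names are hypothetical, and the regular expression covers just the fields visible in the lines above.

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

object AccessLog {
  // Hypothetical event type; the field names are our own choice.
  final case class Request(time: OffsetDateTime, method: String, path: String,
                           status: Int, bytes: Long)

  // Timestamp layout used in the log lines, e.g. 11/Oct/2018:09:02:41 +0000
  private val fmt = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // [timestamp] "METHOD path protocol" status bytes ...rest ignored
  private val Line = """\[([^\]]+)\] "(\S+) (\S+) [^"]*" (\d{3}) (\d+).*""".r

  // Returns None for lines that do not match the expected layout.
  def parse(line: String): Option[Request] = line match {
    case Line(ts, method, path, status, bytes) =>
      Some(Request(OffsetDateTime.parse(ts, fmt), method, path, status.toInt, bytes.toLong))
    case _ => None
  }
}
```

The parsed `time` field is the event time of the request, which becomes important once windows and event-time processing enter the picture.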

Unix for stream processing

Suppose we want to calculate the total number of users on our system per hour every hour.

tail -f log.txt |               # Monitor interactions
sed -e|               # Extract user from logline
sort | uniq     |               # Unique users
wc -l           |               # Get user count
xargs -I {} echo -n `date` {}   # Print with timestamp

Q: When will the above command finish?

Never! There is no way to tell tail that we want it to aggregate data per hour and make the pipeline recompute when tail emits.

What can we learn from Unix?

Unix streaming

Unix has many components required for stream processing:

  • Streaming data acquisition: tail or pipe
  • Intermediate storage: pipes
  • Ways of applying functions on streaming data

It is missing:

  • Splitting streams into batches (windowing)
  • Recomputing when new batches arrive (triggers)
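
To illustrate what the two missing pieces would add, here is a sketch of the "unique users per hour" computation in plain Scala. The `Hit` type is hypothetical and events are assumed to arrive in timestamp order. Windowing assigns each event to its hour; the trigger fires when the first event of a later hour arrives (or the input ends).

```scala
object HourlyUsers {
  // Hypothetical event: a user name plus an event timestamp in epoch seconds.
  final case class Hit(user: String, epochSec: Long)

  // Emits (hour index, number of distinct users) once per hour of input.
  // Assumes events arrive in timestamp order.
  def uniqueUsersPerHour(hits: Iterator[Hit]): Iterator[(Long, Int)] =
    new Iterator[(Long, Int)] {
      private val in = hits.buffered
      def hasNext: Boolean = in.hasNext
      def next(): (Long, Int) = {
        val hour = in.head.epochSec / 3600          // window of the next event
        var users = Set.empty[String]
        // Collect events until one from a later hour shows up (the trigger).
        while (in.hasNext && in.head.epochSec / 3600 == hour) users += in.next().user
        (hour, users.size)
      }
    }
}
```

Unlike the Unix pipeline, this produces a result per hour and keeps going for as long as the input does.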

Stream processing in a nutshell

Stream processing is a set of techniques and corresponding systems that process timestamped events. For a stream processing system to work, we need two major components:

  • A component that acquires events from producers and forwards them to consumers
  • A component that processes events

To make stream processing viable in the real world, both components must be scalable, distributable and fault-tolerant.

Messaging systems

The fundamental entity that stream processing deals with is an event. Events are produced by continuous processes and in order to be processed they must be consumed.

Messaging systems solve the problem of connecting producers to consumers.

Unix again

tail -f log.txt | wc -l

tail is the producer and wc is the consumer. The messaging system is the pipe. A pipe has the following functionality:

  • Reads data from the producer and buffers it
  • Blocks the producer when the buffer is full
  • Notifies the consumer that data is available

Pipes implement the publish / subscribe model for 1 producer to 1 consumer.
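
The pipe behaviour described above (buffering, blocking a fast producer, waking up the consumer) can be mimicked with a bounded blocking queue. This is a sketch of the idea using `java.util.concurrent.ArrayBlockingQueue`, not of how Unix implements pipes; the object and parameter names are our own.

```scala
import java.util.concurrent.ArrayBlockingQueue

object PipeSketch {
  // Runs a producer thread that writes 1..n into a bounded buffer while
  // the calling thread consumes; returns the sum of everything consumed.
  def run(n: Int, capacity: Int): Long = {
    val pipe = new ArrayBlockingQueue[Int](capacity)
    // put() blocks the producer whenever the buffer is full.
    val producer = new Thread(() => (1 to n).foreach(i => pipe.put(i)))
    producer.start()
    var sum = 0L
    // take() blocks the consumer until data is available.
    for (_ <- 1 to n) sum += pipe.take()
    producer.join()
    sum
  }
}
```

With a small `capacity`, the producer spends most of its time blocked, which is exactly the back-pressure a pipe applies to `tail`.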

Publish / Subscribe

Publish/subscribe systems connect multiple producers to multiple consumers.

  • Direct messaging systems use plain network communication (often UDP multicast; ZeroMQ also runs over TCP) to deliver messages to multiple consumers without an intermediary. They are fast, but require the producers and consumers to deal with data loss. Example: ZeroMQ

  • Message brokers or queues are centralized systems that sit between producers and consumers and deal with the complexities of reliable message delivery.

Broker-based messaging

The producers send messages in any of the following modes:

  • Fire and forget: The broker acks the message immediately.
  • Transaction-based: The broker writes the message to permanent storage prior to ack’ing it.

The broker:

  • Buffers the messages, spilling to disk as necessary
  • Routes the messages to the appropriate queues
  • Notifies consumers when messages have arrived

The consumers:

  • Subscribe to a queue that contains the desired messages
  • Ack the message receipt (or successful processing)

Messaging patterns

Work queue pattern

Competing workers: Multiple consumers read from a single queue, competing for incoming messages

Fan out pattern

Fan out: Each consumer has a queue of its own. Incoming messages are replicated on all queues

Topics pattern

Message routing: The producer attaches a routing key to each message's metadata. A consumer creates a topic queue by specifying the keys for which it wants to receive messages.
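
The three patterns differ only in which consumer queues a message ends up in. The sketch below models that routing decision with pure functions over an in-memory message list. It is not a broker: the `Msg` type and function names are hypothetical, and round-robin assignment stands in for real competition between workers.

```scala
object MessagingPatterns {
  final case class Msg(key: String, body: String)

  // Work queue: each message goes to exactly one consumer.
  // Round-robin assignment stands in for "whoever is free next".
  def workQueue(msgs: Seq[Msg], consumers: Int): Map[Int, Seq[Msg]] =
    msgs.zipWithIndex
      .groupBy { case (_, i) => i % consumers }
      .map { case (c, ms) => c -> ms.map(_._1) }

  // Fan out: every consumer queue receives a copy of every message.
  def fanOut(msgs: Seq[Msg], consumers: Int): Map[Int, Seq[Msg]] =
    (0 until consumers).map(c => c -> msgs).toMap

  // Topics: a consumer's queue only receives messages with a subscribed key.
  def topics(msgs: Seq[Msg], subscriptions: Map[String, Set[String]]): Map[String, Seq[Msg]] =
    subscriptions.map { case (consumer, keys) =>
      consumer -> msgs.filter(m => keys.contains(m.key))
    }
}
```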

Broker-based example: GHTorrent

GHTorrent architecture

GHTorrent uses topic queues to decouple following the GitHub event stream from retrieving the items linked from events. Events are written to the RabbitMQ broker with a routing key according to their event type; a configurable number of data retrieval processes subscribe to those queues.

Drawbacks of broker-based messaging

Broker-based messaging is widely used and well understood. It has, however, one drawback: after a message is received, it disappears. This leads to lost opportunities:

  • We cannot reprocess messages (e.g. when a new application version is installed)
  • We cannot prove that a message was delivered

Q: How can we solve those problems?

A: Instead of just forwarding messages, we can store and forward them.

Log-based messaging systems

A log is an append-only data structure stored on disk. We can exploit logs to implement messaging systems:

  • Producers append messages to the log.
  • All consumers connect to the log and pull messages from it. A new client starts processing from the beginning of the log.
  • To maximize performance, the broker partitions and distributes the log to a cluster of machines.
  • The broker keeps track of the current message offset for each consumer per partition
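
A minimal in-memory sketch of these ideas, assuming a single partition and no persistence (the class and method names are our own, not Kafka's API). Producers append, each consumer pulls from its own offset, and reprocessing amounts to resetting that offset.

```scala
import scala.collection.mutable

object LogBroker {
  // A single-partition, in-memory stand-in for an on-disk log.
  final class Log[A] {
    private val entries = mutable.ArrayBuffer.empty[A]
    // Broker-side bookkeeping: next offset to read, per consumer.
    private val offsets = mutable.Map.empty[String, Int].withDefaultValue(0)

    // Producers only ever append; existing entries are never modified.
    def append(msg: A): Unit = { entries += msg }

    // Consumers pull from their own offset; a new consumer starts at 0.
    def poll(consumer: String, max: Int): Seq[A] = {
      val from = offsets(consumer)
      val batch = entries.slice(from, from + max).toList
      offsets(consumer) = from + batch.size
      batch
    }

    // Reprocessing is just resetting the offset; the messages are still there.
    def rewind(consumer: String): Unit = { offsets(consumer) = 0 }
  }
}
```

Because reading does not remove anything, a second consumer (or a new application version) can replay the full history independently of the first.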

Log-based messaging overview

A generic partitioned log system

Kafka

Kafka use at Uber

Kafka is the best-known log server. It is used both to aggregate and store raw events and as an intermediary between systems.

Programming models for stream processing

Programming models for streams enable the processing of events to derive (some form of) state.

Read also this comprehensive blog post by Martin Kleppmann.

Event sourcing and CQS

Capture all changes to an application state as a sequence of events.

Instead of mutating the application state (e.g. in a database), we store the event that causes the mutation in an immutable log. The application state is generated by processing the events. This enables us to:

  • Use specialized systems for scaling writes (e.g. Kafka) and reads (e.g. Redis), while the application remains stateless.
  • Provide separate, continuously updated views of the application state (e.g. per user, per location etc)
  • Regenerate the application state at any point in time by reprocessing events
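
As a sketch of the idea, consider a hypothetical account-balance domain (the event types and names below are ours). State is never stored directly: it is a left fold over the event log, so replaying a prefix of the log yields the state at that point in time.

```scala
object EventSourcing {
  // Hypothetical domain: balances changed by deposits and withdrawals.
  sealed trait Event { def account: String }
  final case class Deposited(account: String, amount: Long) extends Event
  final case class Withdrawn(account: String, amount: Long) extends Event

  type State = Map[String, Long]

  // Applying one event yields a new state; the log itself is never mutated.
  def applyEvent(state: State, e: Event): State = e match {
    case Deposited(a, n) => state.updated(a, state.getOrElse(a, 0L) + n)
    case Withdrawn(a, n) => state.updated(a, state.getOrElse(a, 0L) - n)
  }

  // State after the first `upTo` events; the full log gives the current state.
  def replay(log: Seq[Event], upTo: Int): State =
    log.take(upTo).foldLeft(Map.empty[String, Long])(applyEvent)
}
```

A per-user or per-location view is then just a different fold over the same log.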

Reactive programming

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change (Wikipedia).

Reactive APIs model event sources as infinite collections on which observers subscribe to receive events.

Observable.from(TwitterSource).      // Stream of tweets
  filter{_.location == "NL"}.        // Keep tweets from the Netherlands
  flatMap{t => GeolocateService(t)}. // Precise geolocation
  groupBy{loc => loc.city}.          // Group results per city
  flatMap{grp => grp.map(v => (grp.key, v))}. // Tag each result with its city
  subscribe(println)

The example uses the Reactive Extensions (Rx) formulation. Rx for .NET and Java 9 (the Flow API) include facilities for reactive programming.

The Dataflow model

The Dataflow model was introduced by Akidau et al. [1] as a generic implementation of the MillWheel system [2]. Flink was heavily inspired by it.

The Dataflow model attempts to explain stream processing in four dimensions:

  • \(\color{orange}{\sf{What}}\) results are being computed
  • \(\color{blue}{\sf{Where}}\) in event time they are being computed
  • \(\color{green}{\sf{When}}\) in processing time they are materialized
  • \(\color{red}{\sf{How}}\) earlier results relate to later refinements

Time in stream processing

In streaming systems, we have two notions of time:

  • Processing time: the time at which events are observed in the system
  • Event time: the time at which events occurred

D: Describe a scenario where those are different.

Applications that calculate streaming aggregates (e.g. avg rainfall per country per hour) don’t care much about the event order.

Applications with precise timing requirements (e.g. bank transactions, fraud detection) care about event time. Events may however enter the system delayed or out of order.

Event Time skew

Event time skew

If processing (wall-clock) time is \(t\):

  • Skew is calculated as \(t - s\), where \(s\) is the timestamp of the latest event processed
  • Lag is calculated as \(t - s\), where \(s\) is the actual timestamp of an event

\(\color{orange}{\sf{What:}}\) operations on streams

  • Element-wise ops apply a function to each individual message. This is the equivalent of map or flatMap in batch systems.

  • Aggregations group multiple events together and apply a reduction (e.g. fold or max) on them.

Element-wise operations: map

\(Stream[A].map(x: A \rightarrow B) : Stream[B]\)

Convert types of stream elements

Map

// Rx
Observable.from(List(1,2,3)).map(x => 10 * x)

// Flink
env.fromCollection(List(1,2,3)).map(x => 10 * x)

Element-wise operations: filter

\(Stream[A].filter(x: A \rightarrow Boolean) : Stream[A]\)

Only keep events that satisfy the predicate.

Filtering

// Rx
Observable.from(List(2,30,22,5,60,1)).filter(x => x > 10)

// Flink
env.fromCollection(List(2,30,22,5,60,1)).filter(x => x > 10)

Element-wise operations: merge

\(Stream[A].merge(b: Stream[B >: A]) : Stream[B]\)

Emit a stream that combines all events from both streams.

Merging streams

// Rx
val a = Observable.from(List(20,40,60,80,100))
val b = Observable.from(List(1,1))
a.merge(b)

// Flink
val a = env.fromCollection(List(20,40,60,80,100))
val b = env.fromCollection(List(1,1))
a.union(b)

Element-wise operations: flatMap

\(Stream[A].flatMap(f: A \rightarrow Stream[B]) : Stream[B]\)

Apply f on all elements of Stream[A] and flatten any nested results to a new stream.

// Rx
Observable.from(List("foo", "bar")).
           flatMap(x => Observable.from(x.toCharArray))

// Flink
env.fromCollection(List("foo", "bar")).
    flatMap(x => x.toCharArray)

Element-wise operations: keyBy

\(Stream[A].keyBy(f: A \rightarrow K): Stream[(K, Stream[A])]\)

Partition a stream using a discriminator function and produce (a stream of) streams that emit the partitioned data.

Stream keyBy

keyBy (or groupBy in Rx) creates partitioned streams that can be processed in parallel. Moreover, keys are required for various operations combining data.
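
To show why keys matter for stateful operations, the following plain-Scala sketch keeps one counter per key and emits the updated count for every incoming event. It runs on a single machine; in a system such as Flink, each key's state could live on a different node. The function name is our own.

```scala
import scala.collection.mutable

object KeyBySketch {
  // For every incoming event, emits its key and how many events
  // with that key have been seen so far (per-key state).
  def runningCountByKey[A, K](events: Iterator[A])(key: A => K): Iterator[(K, Int)] = {
    val counts = mutable.Map.empty[K, Int].withDefaultValue(0)
    events.map { e =>
      val k = key(e)
      counts(k) += 1
      (k, counts(k))
    }
  }
}
```

Since the counters for different keys never interact, the work can be split by key without coordination.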

Element-wise operations: join

Stream[A].join(b: Stream[B],
               kl: A => K,
               kr: B => K,
               rs: (A,B) => R): Stream[R]

Join streams \(A\) and \(B\). Key selector functions kl and kr extract keys of type \(K\), on which the join operation is performed. On each joined pair, the result selector function rs is applied to derive the result type.

Stream join

Stream joining example

Find commits that caused exceptions and notify the authors.

case class StackEntry(file: String, line: Int)
case class Exception(exception: String, entries: Seq[StackEntry])
case class DiffLine(file: String, line: Int, content: ...)
case class Commit(sha: String, author: String, diff: Seq[DiffLine])

// One (exception, stack entry) pair per stack frame.
// Parsing the raw socket text into Exception values is elided here.
val logs : Stream[(Exception, StackEntry)] = env.socketTextStream(host, port).
  flatMap(e => for(s <- e.entries) yield (e, s))
// One (commit, diff line) pair per changed line.
val diffs : Stream[(Commit, DiffLine)] = env.GitRepoSource(...).
  flatMap(c => for(d <- c.diff) yield (c, d))

// Join on the file name shared by a stack entry and a diff line.
logs.join(diffs).
     where(log => log._2.file).
     equalTo(diff => diff._2.file).
     apply((log, diff) => (diff._1.author, diff._1.sha, log._1.exception)).
     map(e => sendEmail(...))

Q: How can a stream processor execute this?

A: Presumably, the stack trace stream is faster than the commit stream. Joining requires all keys seen so far to be kept in memory (per processing node); since the streams are unbounded, this theoretically requires infinite memory.
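
One way to picture the memory problem is a symmetric hash join, sketched below in plain Scala. This illustrates the general technique, not how any particular stream processor implements joins; the two inputs are modelled as one interleaved iterator of `Either` values, and all names are our own.

```scala
import scala.collection.mutable

object StreamJoin {
  // Symmetric hash join over an interleaved input of Left(a) / Right(b) events.
  // Every event is stored under its key and immediately matched against
  // everything seen so far on the other side, so state grows without bound.
  def join[A, B, K, R](events: Iterator[Either[A, B]])
                      (kl: A => K, kr: B => K)
                      (rs: (A, B) => R): Iterator[R] = {
    val lefts  = mutable.Map.empty[K, List[A]].withDefaultValue(Nil)
    val rights = mutable.Map.empty[K, List[B]].withDefaultValue(Nil)
    events.flatMap {
      case Left(a) =>
        val k = kl(a)
        lefts(k) = a :: lefts(k)              // remember a for future matches
        rights(k).reverse.map(b => rs(a, b))  // match against earlier rights
      case Right(b) =>
        val k = kr(b)
        rights(k) = b :: rights(k)            // remember b for future matches
        lefts(k).reverse.map(a => rs(a, b))   // match against earlier lefts
    }
  }
}
```

Nothing is ever evicted from `lefts` or `rights`, which is why practical systems bound the join state, for example by restricting it to a window.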

Aggregations / Reductions

Stream[A].aggregate(f: AggregationFunction[A, T, B]): Stream[B]

trait AggregationFunction[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC // Type conversion IN -> ACC
  def getResult(acc: ACC): OUT      // Type conversion ACC -> OUT
  def merge(a: ACC, b: ACC): ACC
}

Aggregations group multiple events together and apply a reduction on them.
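
As an illustration of how the four methods fit together, here is a sketch of an average implemented against the trait above (repeated so the example is self-contained). The `aggregate` helper is hypothetical and folds a bounded group of events; `merge` is what would allow partial accumulators from parallel workers to be combined.

```scala
object AverageAggregation {
  // Trait repeated from the text so the example is self-contained.
  trait AggregationFunction[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, acc: ACC): ACC
    def getResult(acc: ACC): OUT
    def merge(a: ACC, b: ACC): ACC
  }

  // Average of Ints; the accumulator is a (sum, count) pair.
  object Average extends AggregationFunction[Int, (Long, Long), Double] {
    def createAccumulator(): (Long, Long) = (0L, 0L)
    def add(value: Int, acc: (Long, Long)): (Long, Long) = (acc._1 + value, acc._2 + 1)
    def getResult(acc: (Long, Long)): Double =
      if (acc._2 == 0) 0.0 else acc._1.toDouble / acc._2
    def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
  }

  // Hypothetical helper: folds one bounded group of events through `f`.
  def aggregate[IN, ACC, OUT](events: Seq[IN], f: AggregationFunction[IN, ACC, OUT]): OUT =
    f.getResult(events.foldLeft(f.createAccumulator())((acc, v) => f.add(v, acc)))
}
```

Keeping `(sum, count)` rather than the average itself is what makes `merge` possible: two averages cannot be combined correctly without their counts.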

Q: How can we aggregate events, when our event stream is infinite?

Hint: remember the Unix example from before.

A: We can create event groups by time (e.g. every 2 minutes) or by count (e.g., every 100 events).

\(\color{blue}{\textsf{Where:}}\) Streaming Windows

Windowing in streaming systems

Windows are fixed-size (e.g., 1000 events) or fixed-duration (e.g., 10 secs) “batches” of data.
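
Both kinds of window can be sketched with the Scala standard library. The function names are our own; the time-based version works on a bounded snapshot of `(timestamp, value)` pairs with timestamps in milliseconds, and assigns each event to the window starting at the largest multiple of the window length not exceeding its timestamp.

```scala
object Windows {
  // Count-based window: every `n` events form one batch
  // (the final batch may be smaller).
  def byCount[A](events: Iterator[A], n: Int): Iterator[Seq[A]] =
    events.grouped(n)

  // Time-based window of `lengthMs`, keyed by window start time.
  // Operates on a bounded snapshot of (timestampMs, value) pairs.
  def byTime[A](events: Seq[(Long, A)], lengthMs: Long): Seq[(Long, Seq[A])] =
    events
      .groupBy { case (ts, _) => ts / lengthMs * lengthMs }
      .toSeq
      .sortBy(_._1)
      .map { case (start, es) => start -> es.map(_._2) }
}
```

Each resulting batch is bounded, so any batch-style aggregation can be applied to it.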

Session windows