Typical processing scenario for big data:
Assumes that the value of the data is hidden in it (“needle in haystack”)
Running processes generate data continuously, and users need to monitor those processes continuously. That we mostly work with static data is largely due to legacy constraints.
Static data vs time
Q: Most datasets are:
A: You guessed right: unbounded.
What changes faster over time: data or code?
If \(\frac{\Delta d}{\Delta t} \gg \frac{\Delta c}{\Delta t}\), this is a streaming problem
If \(\frac{\Delta c}{\Delta t} \gg \frac{\Delta d}{\Delta t}\), this is an exploration problem
— Joe Hellerstein
Some real-world examples of such data include:
[11/Oct/2018:09:02:41 +0000] "GET /pub/developer-testing-in-IDE.pdf HTTP/1.1" 200 8232808 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_0 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11A465 Safari/9537.53 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
[11/Oct/2018:09:04:36 +0000] "GET /courses/bigdata/spark.html HTTP/1.1" 200 2145339 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
[11/Oct/2018:09:06:20 +0000] "GET /atom.xml HTTP/1.1" 200 255069 "-" "Gwene/1.0 (The gwene.org rss-to-news gateway)"
[11/Oct/2018:09:08:37 +0000] "GET /pub/eval-quality-of-open-source-software.pdf HTTP/1.1" 200 1306751 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
Suppose we want to count the unique users of our system per hour, every hour.
tail -f log.txt | # Monitor interactions
sed -e … | # Extract user from logline
sort | uniq | # Unique users
wc -l | # Get user count
xargs -I {} echo -n `date` {} # Print with timestamp
Q: When will the above command finish?
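A: Never! sort has to read all of its input before it can output anything, and tail -f never stops producing input, so nothing ever reaches wc. There is also no way to tell tail that we want it to aggregate data per hour and make the pipeline recompute whenever tail emits.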
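The pipeline is missing a way to group events by hour and emit a result whenever an hour closes. Below is a minimal Scala sketch of that step. It assumes events have already been parsed into a hypothetical Event(epochSeconds, user) type and that they arrive in timestamp order. The user-extraction step is elided in the original.

```scala
object HourlyUsers {
  // Hypothetical, already-parsed event: seconds since the epoch and a user id
  final case class Event(epochSeconds: Long, user: String)

  /** Lazily turns an (unbounded) iterator of time-ordered events into an
    * iterator of (hourStartEpochSeconds, uniqueUserCount) results. An hour's
    * result is emitted as soon as the first event of a later hour arrives. */
  def uniqueUsersPerHour(events: Iterator[Event]): Iterator[(Long, Int)] =
    new Iterator[(Long, Int)] {
      private val in = events.buffered

      def hasNext: Boolean = in.hasNext

      def next(): (Long, Int) = {
        val hour  = in.head.epochSeconds / 3600 * 3600 // start of the current hour
        val users = scala.collection.mutable.Set.empty[String]
        // Consume events until one belongs to a later hour
        while (in.hasNext && in.head.epochSeconds / 3600 * 3600 == hour)
          users += in.next().user
        (hour, users.size)
      }
    }
}
```

The sort | uniq | wc -l pipeline waits for the end of its input. This version needs only the events of the current hour, so it also works on an infinite iterator.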
Unix streaming
Unix has many components required for stream processing:
tail or
pipe
It is missing:
Stream processing is a set of techniques and corresponding systems that process timestamped events. For a stream processing system to work, we need two major components:
To make stream processing viable in the real world, both components must be scalable, distributable and fault-tolerant.
The fundamental entity that stream processing deals with is an event. Events are produced by continuous processes and in order to be processed they must be consumed.
Messaging systems solve the problem of connecting producers to consumers.
tail -f log.txt | wc -l
Here, tail is the producer and wc is the consumer. The messaging system is the pipe. A pipe has the following functionality:
Pipes implement the publish/subscribe model for one producer and one consumer.
Publish/subscribe systems connect multiple producers to multiple consumers.
Direct messaging systems use simple network communication (usually UDP) to broadcast messages to multiple consumers. They are fast, but require the producers/consumers to deal with data loss. Example: ZeroMQ
Message brokers or queues are centralized systems that sit between producers and consumers and deal with the complexities of reliable message delivery.
The producers send messages in any of the following modes:
The broker:
The consumers:
Competing workers: Multiple consumers read from a single queue, competing for incoming messages.
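The sketch below makes the competing-workers pattern concrete inside a single process. A java.util.concurrent.LinkedBlockingQueue stands in for the broker queue; a real broker would sit on the network. The message contents and the worker count are arbitrary.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, LinkedBlockingQueue, TimeUnit}
import scala.jdk.CollectionConverters._

object CompetingWorkers {
  /** A producer puts all messages on ONE shared queue; `workers` consumer
    * threads compete for them. Returns (workerId, message) pairs. */
  def run(messages: Seq[String], workers: Int): List[(Int, String)] = {
    val queue = new LinkedBlockingQueue[String]()
    messages.foreach(m => queue.put(m)) // producer side
    val consumed = new ConcurrentLinkedQueue[(Int, String)]()
    val pool = Executors.newFixedThreadPool(workers)
    for (id <- 0 until workers)
      pool.execute(() => {
        // poll() removes the head atomically, so each message is taken by
        // exactly one worker; it returns null once the queue is empty
        var msg = queue.poll()
        while (msg != null) {
          consumed.add((id, msg))
          msg = queue.poll()
        }
      })
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    consumed.asScala.toList
  }
}
```

Which worker receives which message depends on thread scheduling. The sketch only ensures that each message is handled by exactly one worker, and that property is what spreads the load.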
Fan out pattern
Fan out: Each consumer has its own queue. Incoming messages are replicated on all queues.
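Below is a minimal in-memory sketch of fan out. Exchange is a hypothetical class name that loosely borrows broker terminology. It copies each published message onto every bound consumer queue.

```scala
import scala.collection.mutable

object FanOut {
  /** Fan out: every consumer owns a queue; the exchange copies each incoming
    * message onto all of them. */
  final class Exchange[A] {
    private val queues = mutable.LinkedHashMap.empty[String, mutable.Queue[A]]

    /** Creates (or returns) the private queue of the named consumer. */
    def bind(consumer: String): mutable.Queue[A] =
      queues.getOrElseUpdate(consumer, mutable.Queue.empty[A])

    /** Replicates the message on every bound queue. */
    def publish(msg: A): Unit = queues.values.foreach(_.enqueue(msg))
  }
}
```

In this sketch, a consumer that binds after a message was published never sees that message.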
Topics pattern
Message routing: The producer attaches routing keys to message metadata. Consumers create topic queues by specifying the keys they want to receive messages for.
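Topic routing fits the same in-memory approach. Each consumer queue is bound to a set of routing keys, and the exchange delivers a message only to the queues bound to its key. The class names and the GitHub-style event keys used in the test are illustrative assumptions.

```scala
import scala.collection.mutable

object TopicRouting {
  final case class Message(routingKey: String, body: String)

  /** Consumers bind a queue to the routing keys they care about; the
    * exchange delivers each message only to queues bound to its key. */
  final class TopicExchange {
    private val bindings = mutable.Map.empty[String, List[mutable.Queue[Message]]]

    def subscribe(keys: Set[String]): mutable.Queue[Message] = {
      val q = mutable.Queue.empty[Message]
      keys.foreach(k => bindings(k) = q :: bindings.getOrElse(k, Nil))
      q
    }

    /** Messages whose key has no bound queue are dropped. */
    def publish(m: Message): Unit =
      bindings.getOrElse(m.routingKey, Nil).foreach(_.enqueue(m))
  }
}
```

Real brokers such as RabbitMQ also allow wildcard patterns in topic bindings. This sketch only matches keys exactly.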
GHTorrent architecture
GHTorrent uses topic queues to decouple following the GitHub event stream from retrieving the items linked from events. Events are written to the RabbitMQ broker with a routing key according to their event type; a configurable number of data retrieval processes subscribe to those queues.
Broker-based messaging is widely used and well understood. It has, however, one drawback: once a message is received, it disappears. This leads to lost opportunities:
Q: How can we solve those problems?
A: Instead of just forwarding messages, we can store and forward them.
A log is an append-only data structure stored on disk. We can exploit logs to implement messaging systems:
A generic partitioned log system
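Below is a minimal in-memory sketch of a partitioned, append-only log. The Log class and its methods are hypothetical and are not the API of any real log server; a real implementation would also persist partitions to disk.

- Each key is hashed to a partition.
- Appends return an offset.
- Reads never remove records, so every consumer can keep its own offset and replay from any point.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionedLog {
  /** An append-only, partitioned log kept in memory. Records are never
    * deleted on read. */
  final class Log[K, V](partitions: Int) {
    private val parts = Vector.fill(partitions)(ArrayBuffer.empty[(K, V)])

    /** Same key -> same partition, so per-key order is preserved.
      * Returns (partition, offset) of the appended record. */
    def append(key: K, value: V): (Int, Long) = {
      val p = Math.floorMod(key.hashCode, partitions)
      parts(p) += ((key, value))
      (p, parts(p).size - 1L)
    }

    /** Consumers pass their own offset, so they read at their own pace
      * and can re-read (replay) old records. */
    def read(partition: Int, fromOffset: Long, max: Int): Seq[(K, V)] =
      parts(partition).slice(fromOffset.toInt, fromOffset.toInt + max).toSeq
  }
}
```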
Kafka use at Uber
Kafka is the best-known log server. It is used both to aggregate and store raw events and as an intermediary between systems.
Which programming models for streams enable us to process events and derive (some form of) state?
Read also this comprehensive blog post by Martin Kleppmann.
Capture all changes to an application state as a sequence of events.
Instead of mutating the application state (e.g. in a database), we store the event that causes the mutation in an immutable log. The application state is generated by processing the events. This enables us to:
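Below is a minimal event-sourcing sketch using a hypothetical bank-account domain. The log of Deposited and Withdrawn events is never mutated, and the current state is derived by folding over it.

```scala
object EventSourcing {
  // Hypothetical domain: a bank account whose state is derived from events
  sealed trait Event
  final case class Deposited(amount: BigDecimal) extends Event
  final case class Withdrawn(amount: BigDecimal) extends Event

  final case class Account(balance: BigDecimal, txCount: Int)

  val empty: Account = Account(BigDecimal(0), 0)

  /** Applies one event to the current state (pure; the log is untouched). */
  def applyEvent(s: Account, e: Event): Account = e match {
    case Deposited(a) => Account(s.balance + a, s.txCount + 1)
    case Withdrawn(a) => Account(s.balance - a, s.txCount + 1)
  }

  /** Current state = left fold over the immutable event log. */
  def replay(log: Seq[Event]): Account = log.foldLeft(empty)(applyEvent)
}
```

Replaying a prefix of the log reconstructs the state at any earlier point in time. Folding the same log with a different function derives a new view of the data.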
Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change (Wikipedia).
Reactive APIs model event sources as infinite collections on which observers subscribe to receive events.
Observable.from(TwitterSource). // Stream of tweets
filter{_.location == "NL"}. // Do some filtering
flatMap{t => GeolocateService(t)}. // Precise geolocation
groupBy{loc => loc.city}. // Group results per city
flatMap{grp => grp.map(v => (grp.key, v))}.
subscribe(println)
The example is in the Reactive Extensions (Rx) formulation. Both .NET (Rx) and Java 9 (the Flow API) include facilities for reactive programming.
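The Rx example relies on a library, but the core idea fits in a few lines. A source pushes events to subscribed callbacks, and each operator wraps the downstream callback. This is a simplified sketch. MiniRx is a hypothetical name, and real Rx observers also receive completion and error signals, which are omitted here.

```scala
object MiniRx {
  /** A push-based stream: subscribing registers a callback that the source
    * invokes once per event. Operators wrap the downstream callback. */
  trait Observable[A] { self =>
    def subscribe(onNext: A => Unit): Unit

    def map[B](f: A => B): Observable[B] = new Observable[B] {
      def subscribe(onNext: B => Unit): Unit = self.subscribe(a => onNext(f(a)))
    }

    def filter(p: A => Boolean): Observable[A] = new Observable[A] {
      def subscribe(onNext: A => Unit): Unit = self.subscribe(a => if (p(a)) onNext(a))
    }
  }

  /** A source backed by a collection; a real source (e.g. a socket) would
    * keep pushing events indefinitely. */
  def from[A](xs: Iterable[A]): Observable[A] = new Observable[A] {
    def subscribe(onNext: A => Unit): Unit = xs.foreach(onNext)
  }
}
```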
The Dataflow model was introduced by Akidau et al. [1] as a generic implementation of the MillWheel system [2]. Flink was heavily inspired by it.
The Dataflow model attempts to explain stream processing in four dimensions:
In streaming systems, we have two notions of time:
D: Describe a scenario where those are different.
Applications that calculate streaming aggregates (e.g., average rainfall per country per hour) don’t care much about event order.
Applications with precise timing requirements (e.g. bank transactions, fraud detection) care about event time. Events may however enter the system delayed or out of order.
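The sketch below illustrates the two notions of time. It assumes each hypothetical Event carries its own event timestamp in milliseconds. Skew is processing time minus event time. An event is out of order if its event time is earlier than that of an event that was already processed.

```scala
object EventTime {
  /** An event carries the time it happened (milliseconds, hypothetical). */
  final case class Event(id: String, eventTime: Long)

  /** Skew = processing (wall-clock) time minus event time. */
  def skew(e: Event, processingTime: Long): Long = processingTime - e.eventTime

  /** Ids of events, in arrival order, whose event time is earlier than the
    * largest event time seen so far, i.e. events that arrived out of order. */
  def outOfOrder(arrivals: Seq[Event]): Seq[String] =
    arrivals.foldLeft((Long.MinValue, Vector.empty[String])) {
      case ((maxSeen, late), e) =>
        if (e.eventTime < maxSeen) (maxSeen, late :+ e.id)
        else (e.eventTime, late)
    }._2
}
```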
Event time skew
If processing (wall-clock) time is \(t\)
Element-wise ops apply a function to each
individual message. This is the equivalent of map or
flatMap in batch systems.
Aggregations group multiple events together and
apply a reduction (e.g. fold or max) on
them.
map: \(Stream[A].map(f: A \rightarrow B) : Stream[B]\)
Convert types of stream elements
Map
// Rx
Observable.from(List(1,2,3)).map(x => 10 * x)
// Flink
env.fromCollection(List(1,2,3)).map(x => 10 * x)
filter: \(Stream[A].filter(p: A \rightarrow Boolean) : Stream[A]\)
Only keep events that satisfy the predicate.
Filtering
// Rx
Observable.from(List(2,30,22,5,60,1)).filter(x => x > 10)
// Flink
env.fromCollection(List(2,30,22,5,60,1)).filter(x => x > 10)
merge: \(Stream[A].merge(b: Stream[B >: A]) : Stream[B]\)
Emit a stream that combines all events from both streams.
Merging streams
// Rx
val a = Observable.from(List(20,40,60,80,100))
val b = Observable.from(List(1,1))
a.merge(b)
// Flink
val a = env.fromCollection(List(20,40,60,80,100))
val b = env.fromCollection(List(1,1))
a.union(b)
flatMap: \(Stream[A].flatMap(f: A \rightarrow Stream[B]) : Stream[B]\)
Apply f on all elements of Stream[A] and
flatten any nested results to a new stream.
// Rx
Observable.from(List("foo", "bar")).
flatMap(x => Observable.from(x.toCharArray))
// Flink
env.fromCollection(List("foo", "bar")).
flatMap(x => x.toCharArray)
keyBy: \(Stream[A].keyBy(f: A \rightarrow K): Stream[(K, Stream[A])]\)
Partition a stream using a discriminator function and produce (a stream of) streams that emit the partitioned data.
Stream keyBy
keyBy (or groupBy in Rx) creates
partitioned streams that can be processed in parallel. Moreover, keys
are required for various operations combining data.
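One common way to process keyed streams in parallel is to hash each key to one of n partitions. All events for a key then reach the same worker, which can hold that key's state. Below is a minimal sketch over a finite Seq. The partition function is hypothetical and not a library API.

```scala
object KeyBy {
  /** Routes each element to one of `n` parallel partitions by hashing its key.
    * All elements with the same key land in the same partition. */
  def partition[A, K](xs: Seq[A], n: Int)(key: A => K): Map[Int, Seq[(K, A)]] =
    xs.map(a => (key(a), a)).groupBy { case (k, _) => Math.floorMod(k.hashCode, n) }
}
```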
join: Stream[A].join(b: Stream[B],
kl: A => K,
kr: B => K,
rs: (A,B) => R): Stream[R]
Join streams \(A\) and \(B\). Key selector functions kl
and kr extract keys of type \(K\), on which the join operation is
performed. On each joined pair, the result selector function
rs is applied to derive the result type.
Stream join
Find commits that caused exceptions and notify the authors.
case class StackEntry(file: String, line: Int)
case class Exception(exception: String, entries: Seq[StackEntry])
case class DiffLine(file: String, line: Int, content: ...)
case class Commit(sha: String, author: String, diff: Seq[DiffLine])
val logs : Stream[(Exception, StackEntry)] = env.socketTextStream(host, port). // assumes lines are parsed into Exception objects
flatMap(e => for(s <- e.entries) yield (e, s))
val diffs : Stream[(Commit, DiffLine)] = env.GitRepoSource(...).
flatMap(c => for(d <- c.diff) yield(c, d))
logs.join(diffs).
where(log => log._2.file).
equalTo(diff => diff._2.file).
apply((log, diff) => (diff._1.author, diff._1.sha, log._1.exception)).
map(e => sendEmail(...))
Q: How can a stream processor execute this?
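A: Presumably, stack traces arrive much faster than commits. An incoming event may match an event from the other stream that arrived arbitrarily long ago, so the join must keep all keys (and their events) in memory on each processing node. Theoretically, this requires infinite memory.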
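A minimal symmetric hash join shows where the memory goes. To keep the example single-threaded, it assumes both streams arrive interleaved as Left/Right values of one iterator. Each event is stored in its own side's state and then matched against the other side's state. Nothing is ever evicted, so the state grows with the input.

```scala
import scala.collection.mutable

object StreamJoin {
  /** Symmetric hash join over an interleaved stream of Left/Right events.
    * Every arriving event is (1) stored in its side's state and (2) matched
    * against all stored events of the other side. State is never evicted. */
  def join[A, B, K, R](input: Iterator[Either[A, B]])(kl: A => K, kr: B => K)(rs: (A, B) => R): Iterator[R] = {
    val leftState  = mutable.Map.empty[K, List[A]]
    val rightState = mutable.Map.empty[K, List[B]]
    input.flatMap {
      case Left(a) =>
        val k = kl(a)
        leftState(k) = a :: leftState.getOrElse(k, Nil)
        rightState.getOrElse(k, Nil).map(b => rs(a, b))
      case Right(b) =>
        val k = kr(b)
        rightState(k) = b :: rightState.getOrElse(k, Nil)
        leftState.getOrElse(k, Nil).map(a => rs(a, b))
    }
  }
}
```

In the test, (exception, file) pairs play the role of log entries and (author, file) pairs play the role of commit diffs. Both are simplified stand-ins for the case classes above.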
Stream[A].aggregate(f: AggregationFunction[A, T, B]): Stream[B]
trait AggregationFunction[IN, ACC, OUT] {
def createAccumulator(): ACC
def add(value: IN, acc: ACC): ACC // Type conversion IN -> ACC
def getResult(acc: ACC): OUT // Type conversion ACC -> OUT
def merge(a: ACC, b: ACC): ACC
}
Aggregations group multiple events together and apply a reduction on them.
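Below is a sketch of one AggregationFunction instance, which computes an average with a (sum, count) accumulator. The trait is re-declared from the text so the block compiles on its own. Its shape resembles Flink's AggregateFunction interface, but the Average object is only an illustration.

```scala
// Re-declares the AggregationFunction trait from the text
trait AggregationFunction[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC
  def getResult(acc: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Average of Double values; the accumulator is a (sum, count) pair
object Average extends AggregationFunction[Double, (Double, Long), Double] {
  def createAccumulator(): (Double, Long) = (0.0, 0L)
  def add(value: Double, acc: (Double, Long)): (Double, Long) = (acc._1 + value, acc._2 + 1)
  // An empty accumulator has no meaningful average
  def getResult(acc: (Double, Long)): Double = if (acc._2 == 0) Double.NaN else acc._1 / acc._2
  // merge lets partial aggregates (e.g. from different nodes) be combined
  def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = (a._1 + b._1, a._2 + b._2)
}
```

The accumulator stores a sum and a count rather than a running average. That choice makes merge a simple component-wise addition.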
Q: How can we aggregate events, when our event stream is infinite?
Hint: remember the Unix example from before.
A: We can create event groups by time (e.g. every 2 minutes) or by count (e.g., every 100 events).
Windowing in streaming systems
Windows are fixed-size (e.g., 1000 events) or fixed-duration (e.g., 10 secs) “batches” of data.
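Below is a minimal sketch of count-based windows on a (possibly infinite) Scala Iterator. It also shows the arithmetic that assigns an event timestamp to a fixed-length time window. The Windows object is hypothetical; grouped and sliding are standard Iterator methods.

```scala
object Windows {
  /** Tumbling count window: consecutive, non-overlapping batches of `size` events. */
  def tumblingCount[A](events: Iterator[A], size: Int): Iterator[Seq[A]] =
    events.grouped(size)

  /** Sliding count window: batches of `size` events, advancing by `step` events. */
  def slidingCount[A](events: Iterator[A], size: Int, step: Int): Iterator[Seq[A]] =
    events.sliding(size, step)

  /** Tumbling time window: start of the `lengthMs`-long window containing `tsMs`. */
  def timeWindowStart(tsMs: Long, lengthMs: Long): Long =
    tsMs - Math.floorMod(tsMs, lengthMs)
}
```

Both iterator-based windows are lazy, so taking the first window of an infinite stream consumes only that window's events.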