Programming for Big Data

Basic Data Types

In this section, we review the basic data types we use when processing data.

Types of data

  • Unstructured: Data whose format is not known
    • Raw text documents
    • HTML pages
  • Semi-Structured: Data with a known format.
    • Pre-parsed data to standard formats: JSON, CSV, XML
  • Structured: Data with known formats, linked together in graphs or tables
    • SQL or Graph databases
    • Images

D: Which types of data are more convenient to process?

Sequences / Lists

Sequences or Lists or Arrays represent consecutive items in memory

In Python:

a = [1, 2, 3, 4]

In Scala

val l = List(1,2,3,4)

Basic properties:

  • Size is bounded by memory
  • Items can be accessed by an index: a[1] (Python) or l(3) (Scala)
  • Items can only be inserted efficiently at the end (append)
  • Can be sorted

Sets

Sets store values without any particular order and with no repeated values.

scala> val s = Set(1,2,3,4,4)
s: scala.collection.immutable.Set[Int] = Set(1, 2, 3, 4)

Basic properties:

  • Size is bounded by memory
  • Can be queried for containment
  • Set operations: union, intersection, difference, subset (see the sketch below)
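
A short sketch of these operations in Python, on hypothetical values:

s = {1, 2, 3}
t = {3, 4}
print(s | t)        ## {1, 2, 3, 4} (union)
print(s & t)        ## {3} (intersection)
print(s - t)        ## {1, 2} (difference)
print({1, 2} <= s)  ## True (subset)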

Maps or Dictionaries

Maps or Dictionaries or Associative Arrays are collections of \((k,v)\) pairs, such that each \(k\) appears only once.

Some languages have built-in support for dictionaries:

a = {'a' : 1, 'b' : 2}

Basic properties:

  • One key always corresponds to one value.
  • Accessing a value given a key is very fast (\(\approx O(1)\)); see the sketch below
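
For example, in Python:

a = {'a': 1, 'b': 2}
print(a['a'])    ## 1 (constant-time lookup by key)
a['c'] = 3       # insert a new (k,v) pair; re-assigning 'c' would overwrite it
print('c' in a)  ## True (containment is also ~O(1))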

(Figure: a hash table.)

Nested data types: Graphs

A graph data structure consists of a finite set of vertices or nodes, together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph.

  • Nodes can contain attributes
  • Edges can contain weights and directions

Graphs are usually represented as Map[Node, List[Edge]], where

case class Node[A, B](id: Int, attributes: Map[A, B])
case class Edge(a: Node, b: Node, directed: Option[Boolean],
                  weight: Option[Double] )

Nested data types: Trees

Trees are graphs without cycles, in which children are ordered. For example, consider the following JSON document:

a = {"id": "5542101946", "type": "PushEvent",
    "actor": {
      "id": 801183,
      "login": "tvansteenburgh"
    },
    "repo": {
      "id": 42362423,
      "name": "juju-solutions/review-queue"
   }}

If we parse the above JSON in almost any language, we get a series of nested maps

Map("id" -> 5542101946L,
    "type" -> "PushEvent",
    "actor" -> Map("id" -> 801183.0, "login" -> "tvansteenburgh"),
    "repo" -> Map("id" -> 4.2362423E7, "name" -> "juju-solutions/review-queue")
)

Tuples

An \(n\)-tuple is a sequence of \(n\) elements, whose types are known.

val record = Tuple4[Int, String, String, Int]
                   (1, "Georgios", "Mekelweg", 4)

Scala makes it easy to declare and use tuples by automatically inferring the types of the tuple contents.

val a = (1, ("Foo", 2)) // type: Tuple2[Int, Tuple2[String, Int]]

println(a._1) // prints 1
println(a._2._1) // prints Foo

Relations

A relation is a Set of \(n\)-tuples \((d_1, d_2, \ldots, d_n)\) that share the same types; one of the tuple elements denotes the key. Keys cannot be repeated.

Relations are very important for data processing, as they form the theoretical framework (Relational Algebra) for relational (SQL) databases.

Typical operations on relations are insert, remove and join. Join allows us to compute new relations by joining existing ones on common fields.

Relations example

val addr1 = (1, "Gebouw 35", "Mekelweg", 5)
val addr2 = (2, "Gebouw 36", "Drebbelweg", 4)

val addr = Set(addr1, addr2)

val georgios = (1, "Georgios", 1)
val wouter = (2, "Wouter", 2)
val joris = (3, "Joris", 4)

val people = Set(georgios, wouter, joris)

Q: How can we get a list of buildings and the people that work there?

Key/Value pairs

A key/value pair (or K/V) is a generalization of a relation: each key can appear more than once.

// We assume that the first Tuple element represents the key
val a = (1, ("Mekelweg", 4))
val b = (1, ("VMB", 6))

val kv = List(a, b)

Another way to represent K/V pairs is with a Map

val xs : Map[Int, List[(String, Int)]] =
  Map(1 -> List(("Mekelweg", 4), ("VMB", 6)))

K and V are flexible: that’s why the Key/Value abstraction is key to NoSQL databases, including MongoDB, DynamoDB, Redis etc.

Functional programming in a nutshell

In this section, we discuss the basics of functional programming, as those apply to data processing with tools like Hadoop, Spark and Flink.

Functional programming

Functional programming is a programming paradigm that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data (Wikipedia).

Functional programming characteristics:

  • Absence of side effects: A function, given an argument, always returns the same result, irrespective of and without modifying its environment.
  • Higher-order functions: Functions can take functions as arguments to parametrise their behavior
  • Laziness: The art of waiting to compute till you can wait no more

Function signatures

\(foo(x: [A], y: B) \rightarrow C\)

  • \(foo\): function name
  • \(x\) and \(y\): Names of function arguments
  • \([A]\) and \(B\): Types of function arguments.
  • \(\rightarrow\): Denotes the return type
  • \(C\): Type of the returned result
  • \([A]\): Denotes that type \(A\) can be traversed

We read this as: Function foo takes as arguments an array/list of type A and an argument of type B, and returns a result of type C

Q: What does the following function signature mean? \(f(x:[A], y:(z: A) \rightarrow B) \rightarrow [B]\)

Side effects

A function has a side effect if it modifies some state outside its scope or has an observable interaction with its calling functions or the outside world besides returning a value.

max = -1

def ge(a, b):
    global max
    if a >= b:
        max = a ## <- Side effect!
        return True
    else:
        max = b
        return False

As a general rule, any function that returns nothing (void or Unit) performs a side effect!

Examples of side effects

  • Setting a field on an object: OO is not FP!
  • Modifying a data structure in place: In FP, data structures are always persistent.
  • Throwing an exception or halting with an error: In FP, we use types that encapsulate and propagate erroneous behaviour
  • Printing to the console, reading user input, or reading/writing files or the screen: In FP, we encapsulate external resources into Monads.

How can we write code that does something useful given those restrictions?

From OO to FP

Buying coffee in OO

class Cafe {

  def buyCoffee(cc: CreditCard): Coffee = {
    val cup = new Coffee()
    cc.charge(cup.price)
    cup
  }
}

Can you spot any problems with this code?

  • charge() performs a side-effect: we need to contact the credit card company!
  • buyCoffee() is not testable

Breaking external dependencies

class Cafe {
  def buyCoffee(cc: CreditCard, p: Payments): Coffee = {
    val cup = new Coffee()
    p.charge(cc, cup.price)
    cup
  }
}

Slightly better option, but:

  • Payments has to be an interface
  • We still need to perform side effects
  • We need to inspect state within the Payments mock

Q: How can we buy 10 coffees?


Buying 10 coffees

class Cafe {
  def buyCoffee(cc: CreditCard, p: Payments): Coffee = { ... }

  def buyCoffees(cc: CreditCard, p: Payments, num: Int) = {
    for (i <- 1 to num) {
      buyCoffee(cc, p)
    }
  }
}

Seems to be working, but:

  • No way to batch payments
  • No way to batch checkouts

Removing side effects

Idea: how about instead of charging in place, we decouple the action of buying coffee from that of charging for it?

case class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}

class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = {
    val cup = new Coffee()
    (cup, Charge(cc, cup.price))
  }
}

Nice! We can now:

  • Test
  • Combine multiple Charges in one
  • Maintain in flight accounts for all customers

Buying 10 functional coffees

case class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}

class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = { ...   }
  def buyCoffees(cc: CreditCard, num: Int): Seq[(Coffee, Charge)] =
    (1 to num).map(_ => buyCoffee(cc))

  def checkout(cc: CreditCard, charges: Seq[Charge]) : Seq[Charge] = {
    charges.
      filter(charge => charge.cc == cc).
      foldLeft(Charge(cc, 0.0)){(acc, x) => acc.combine(x)}.
      pay  // <- side-effect, but once, in one place.

    charges.filter(charge => charge.cc != cc)
  }
}

This example was adapted from the (awesome) FP in Scala book, by Chiusano and Bjarnason

Higher-Order functions

A higher order function is a function that can take a function as an argument or return a function.

class Array[A] {
  // Return elements that satisfy f
  def filter(f: A => Boolean) : Array[A]
}

In the context of BDP, higher-order functions capture common idioms of processing data as enumerated elements, e.g. going over all elements, selectively removing elements and aggregating them.


Important higher-order functions

map(xs: List[A], f: A => B) : List[B]

Applies f to all elements and returns a new list.

flatMap(xs: List[A], f: A => List[B]) : List[B]

Like map, but flattens the result to a single list.

foldL(xs: List[A], f: (B, A) => B, init: B) : B

Takes f of 2 arguments and an init value and combines the elements by applying f on the result of each previous application. AKA reduce.
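
As an illustration, here are minimal list-based sketches of these functions in Python (for exposition only; the names are hypothetical, to avoid clashing with the built-ins):

def map_(xs, f):
  # apply f to every element, collecting the results
  return [f(x) for x in xs]

def flat_map(xs, f):
  # apply f, which returns a list per element, then flatten
  return [y for x in xs for y in f(x)]

def fold_l(xs, f, init):
  # combine elements left-to-right, threading an accumulator
  acc = init
  for x in xs:
    acc = f(acc, x)
  return acc

print(map_([1, 2, 3], lambda x: x + 1))              ## [2, 3, 4]
print(flat_map([1, 2], lambda x: [x, -x]))           ## [1, -1, 2, -2]
print(fold_l([1, 2, 3], lambda acc, x: acc + x, 0))  ## 6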

Aux higher-order functions

groupBy(xs: List[A], f: A => K): Map[K, List[A]]

Partitions xs into a map of traversable collections according to a discriminator function.

filter(xs: List[A], f: A => Boolean) : List[A]

Takes a predicate and returns all elements that satisfy it

scanL(xs: List[A], f: (B, A) => B, init: B) : List[B]

Like foldL, but returns a list of all intermediate results

zip(xs: List[A], ys: List[B]): List[(A,B)]

Returns an iterable collection of pairs, formed from the corresponding items of xs and ys.
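
In Python, scanL corresponds to itertools.accumulate (the initial argument requires Python ≥ 3.8) and zip is built in; group_by and filter are implemented later in this section.

from itertools import accumulate

xs = [1, 2, 3, 4]
print(list(accumulate(xs, lambda acc, x: acc + x, initial=0)))
## [0, 1, 3, 6, 10]
print(list(zip(xs, ["a", "b", "c", "d"])))
## [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]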

Laziness

Laziness is an evaluation strategy which delays the evaluation of an expression until its value is needed.

  • Separating a pipeline's construction from its evaluation
  • Not requiring datasets to fit in memory: we can process them in lazily loaded batches
  • Generating infinite collections
  • Optimising execution plans

For example, a lazy generator of prime numbers in Python:

def primes(limit):
  # sieve[i] == True marks i as composite
  sieve = [False]*limit

  for j in range(2, limit):
    if sieve[j]: continue
    yield j
    if j*j > limit: continue
    for i in range(j, limit, j):
      sieve[i] = True

print([x for x in primes(100)][1:10])
## [3, 5, 7, 11, 13, 17, 19, 23, 29]

Lazy data pipelines

In tools like Spark and Flink, we always express computations in a lazy manner. This allows for optimizations before the actual computation is executed

# Word count in PySpark
text_file = sc.textFile("words.txt")
counts = text_file \
              .flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("results.txt")

Q: In the code above, when will Spark compute the result?

A: When saveAsTextFile is called!

Monads

Monads are a design pattern that defines how functions can be composed together over generic (wrapped) types. Practically, a monad is a value-wrapping type that:

  • Has an identity (unit) function
  • Has a flatMap function, which allows data to be transferred between monad types

trait Monad[M[_]] {
  def unit[S](a: S) : M[S]
  def flatMap[S, T] (m: M[S])(f: S => M[T]) : M[T]
}

Monads are the tool FP uses to deal with (side-)effects

  • Null pointers: Option[T]
  • Exceptions: Try[T]
  • Latency in asynchronous actions: Future[T]

Combining monads: flatMap

flatMap enables us to sequence computations over arbitrary monadic types.

Example: Futures wrap values that will be eventually available.

def callWebService(): Future[R]
def heavyComputation(r: R): Future[V]

val v = callWebService().flatMap(r => heavyComputation(r))

Example: Options wrap values that may be null.

def getArgument(k: String): Option[Arg]
def process(a: Arg): Option[Result]

getArgument("foo").
  flatMap(processArgument).
  getOrElse(new Result("default"))

Dealing with exceptions: Scala

import scala.util.Try

object Converter extends App {
  def toInt(a: String): Try[Int] = Try{Integer.parseInt(a)}
  def toString(b: Int): Try[String] = Try{b.toString}

  val a = toInt("4").flatMap(x => toString(x))
  println(a)

  val b = toInt("foo").flatMap(x => toString(x))
  println(b)
}

Try is a type that can have 2 cases:

  • Success[T], where T represents the type of the result
  • Failure[T], which wraps the error that occurred, usually an exception

Dealing with exceptions: Java

public class Converter {
  public static Integer toInt(String a) throws NumberFormatException {
    return Integer.parseInt(a);
  }
  public static String toString(Integer a) throws NullPointerException {
    return a.toString();
  }

  public static void main(String[] args) {
    try {
      Integer five = toInt("5");
      try {
        System.out.println(toString(five));
      } catch (NullPointerException e) {
        // baaah
      }
    } catch (NumberFormatException r) {
      // ooofff
    }
  }
}

Mapping effects to the type system

class Amazon {
  def login(login: String, passwd: String): Future[Amazon]
  def search(s: String): Future[Seq[String]]
}

class Main extends App {
  val amazon = new Amazon()
  val result =
    amazon.login("uname", "passwd") // Might fail
          .flatMap(a => a.search("foo"))
}

We now need to encode error handling, so that the compiler checks that we are doing it properly.

D: Which of the following should our new search return?

  • Seq[Future[Try]]
  • Future[Seq[Try]]
  • Future[Try[Seq]]

Enumerating datasets

Before starting to process datasets, we need to be able to go over their contents in a systematic way. The process of visiting all items in a dataset is called traversal.

Big data sets

In a big data system:

  • Client code processes data
  • A data source is a container of data (e.g. array, database, web service)

There are two fundamental techniques for the client to process all available data in the data source

  • Iteration: The client asks the data source whether there are items left and then pulls the next item.

  • Observation: The data source pushes the next available item to a client end point.

D: What are the relative merits of each technique?

Iteration

In the context of BDP, iteration allows us to process finite-sized data sets without loading them in memory at once.

trait Iterator[A] {
  def hasNext: Boolean
  def next(): A
}

Typical usage

val it = Array(1,2,3,4).iterator
while(it.hasNext) {
  val i = it.next + 1
  println(i)
}

The Iterator pattern is supported in all programming languages.

Iteration example

Reading from a file, first in Scala

val data = scala.io.Source.fromFile("/big/data").getLines
while (data.hasNext) {
  println(data.next)
}
// Equivalently...
for (line <- data) {
  println(line)
}

and then in Python

with open("/big/data","r") as data:
  # readlines() returns an object that implements __iter__()
  for line in data.readlines():
      print line

Observation

Observation allows us to process (almost) unbounded size data sets, where the data source controls the processing rate

// Consumer
trait Observer[A] {
  def onNext(a: A): Unit
  def onError(t: Throwable): Unit
  def onComplete(): Unit
}

// Producer
trait Observable[A] {
  def subscribe(obs: Observer[A]): Unit
}

Typical usage

Observable.from(1,2,3,4,5).
  map(x => x + 1).
  subscribe(x => println(x))

Iteration and Observation

  • Iteration and Observation are dual
  • The same set of higher-order functions can be used to process data in both cases

Iterator-based map. “Pulls” data out of array.

Array(1,2,3,4).map(x => x + 1) // Scala
map([1,2,3,4], lambda x: x + 1) # Python

Observation-based (reactive) map. Data is “pushed” to it asynchronously, when new data is available.

Observable.from(1,2,3,4,5).map(x => x + 1) // Scala
Observable.from_([1,2,3,4]).map(lambda x: x + 1) # Python

D: (How) Can we convert between the two types of enumeration?

Traversal

We apply a strategy to visit all individual items in a collection.

for i in [1,2,3]:
  print(i)

for k, v in {"x": 1, "y": 2}.items():
  print(k)

In case of nested data types (e.g. trees/graphs), we need to decide how to traverse. Common strategies include:

  • Breadth-first traversal: From any node A, visit all nodes at the same depth first, then move on to their children.
  • Depth-first traversal: From any node A, follow one child's subtree all the way down before moving to the next child.

Traversal through iteration

In most programming environments, traversal is implemented by iterators.

class Tree(object):

  def __init__(self, title, children=None):
    self.title = title
    self.children = children or []

  def __iter__(self): ## Depth first
    yield self
    for child in self.children:
      for node in child:
        yield node

Then, we can iterate using standard language constructs

t = Tree("a", [Tree("b", [Tree("c"), Tree("d")])])
for node in iter(t):
  print(node.title)
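
The iterator above is depth-first. A breadth-first variant (a sketch reusing the Tree class above) keeps a FIFO queue of nodes to visit:

from collections import deque

def bfs(tree):
  # visit nodes level by level: dequeue a node, then enqueue its children
  queue = deque([tree])
  while queue:
    node = queue.popleft()
    yield node
    queue.extend(node.children)

for node in bfs(t):
  print(node.title)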

Operations

Operations are transformations, aggregations or cross-referencing of data stored in data types. All container data types can be iterated.

We generally have two types of operations:

Element-wise operations

  • Conversion: Convert values of type \(A\) to type \(B\)
    • Celsius to Kelvin
    • € to $
  • Filtering: Only present data items that match a condition
    • All adults from a list of people
    • Remove duplicates
  • Projection: Only present parts of each data item
    • From a list of cars, only display their brand

Our running example

Suppose we have a list of people; each person is identified by a unique identifier, their age, their height in cm, their weight in kg and their gender.

p = {'id': 10, 'age': 50, 'height': 180, 'weight': 75, 'gender': 'Male'}

Now, let’s create 1000 random people!

from random import randint, seed
seed(42)

people = []
genders = ['Male', 'Female', 'Other']
for i in range(1000):
  p = {'id': i,
    'age': randint(10,80),
    'height': randint(60, 200),
    'weight': randint(40, 120),
    'gender': genders[randint(0,2)] }
  people.append(p)
## ('people[0]:', {'gender': 'Male', 'age': 55, 'id': 0, 'weight': 62, 'height': 63})

Conversion or Mapping

To convert values, we traverse a collection and apply a conversion function to each individual element. This is generalized to the \(map\) function:

\(map(xs: [A], f: (A) \rightarrow B) \rightarrow [B]\)

Let’s convert the persons’ heights to meters

def to_m(person):
  # note: this modifies the person record in place (a side effect!)
  person['height'] = person['height'] * 1.0 / 100
  return person

people = list(map(to_m, people))
## ('people[0]:', {'gender': 'Male', 'age': 55, 'id': 0, 'weight': 62, 'height': 0.63})

Projection

Projection allows us to select parts of a Tuple, Relation or other nested data type for further processing. To implement this, we need to iterate over all items of a collection and apply a conversion function.
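
For example, projecting each person in our running example onto just their id and height (a sketch):

def project(person):
  # keep only the fields we are interested in
  return {'id': person['id'], 'height': person['height']}

heights = list(map(project, people))
## heights[0]: {'id': 0, 'height': 0.63}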

Filtering

To filter values from a list, we traverse a collection and apply a predicate function to each individual element.

\(filter(xs: [A], f: (A) \rightarrow Boolean): [A]\)

Q: How can we implement filter?

def filter(xs, pred):
  result = []
  for i in xs:
    if pred(i):
      result.append(i)

  return result

Aggregations

Aggregations apply a combining operator on a traversable sequence to aggregate the individual items into a single result.

Aggregation is implemented using reduction (or folding). Two variants exist: left reduction and right reduction.

  • With left reduction, we traverse items from the first to last
  • With right reduction, we traverse items from the last to first

Example: Calculate total weight

Our task is to calculate the total weight for our people list. To do so, we need to iterate over the list and add the individual weights. First, we do it with an imperative approach:

total_weight = 0
for p in people:
  total_weight = total_weight + p['weight']

print(total_weight)

We notice that iteration and reduction are independent. What if we abstract iteration away?

Functional Reduction: Left

\(reduceL(xs: [A], init: B, f: (acc: B, x: A) \rightarrow B) \rightarrow B\)

Left reduction takes a function f with 2 arguments and an initial value, and applies f to all items starting from the leftmost. f combines the result of its previous application with the current element.

def total_weight(acc, p):  # acc is for accumulator
  return acc + p['weight']

Then we can calculate the total weight of all people in our collection as follows

from functools import reduce  # in Python 3, reduce lives in functools

total = reduce(total_weight, people, 0)
# or equivalently, using an anonymous function
total = reduce(lambda acc, p: acc + p['weight'], people, 0)

Functional Reduction: Right

\(reduceR(xs: [A], init: B, f: (x: A, acc: B) \rightarrow B) \rightarrow B\)

Right reduction takes a function f with 2 arguments and an initial value, and applies f to all items starting from the rightmost. f combines the result of its previous application with the current element.

def reduceR(func, xs, init):
  # reuse reduce: flip the arguments of func and reverse the sequence
  return reduce(lambda acc, x: func(x, acc), reversed(xs), init)

reduceR and reduceL: differences

To see how reduceR and reduceL are evaluated, we define a reduction function that prints the intermediate steps.

def reduce_pp(acc, x):
  return "(%s + %s)" % (acc, x)

How does reduceL work?

print(reduce(reduce_pp, range(1,10), 0))
## (((((((((0 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)

How does reduceR work?

print(reduceR(reduce_pp, range(1,10), 0))
## (1 + (2 + (3 + (4 + (5 + (6 + (7 + (8 + (9 + 0)))))))))

Q: Can we always apply reduceL instead of reduceR?

reduceR != reduceL

The answer to the previous question is: it depends on whether the reduction operation is commutative and associative.

An op \(\circ\) is commutative iff \(x \circ y = y \circ x\).

We can see that if we choose a non-commutative operator, reduceL and reduceR produce different results.

print(reduce(lambda x, y: x * 1.0 / y, range(1,10), 1))
## 2.7557319224e-06
print(reduceR(lambda x, y: x * 1.0 / y, range(1,10), 1))
## 2.4609375

Counting elements

The simplest possible aggregation is counting the number of elements, possibly only those matching a condition

\(count(xs: [A], pred): Integer\)

def count(xs, pred):
  # uses the filter function we defined earlier
  return len(filter(xs, pred))

Q: How can we implement this with a reduction?
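
One possible answer, using a left reduction that adds 1 for every matching element:

from functools import reduce

def count_reduce(xs, pred):
  return reduce(lambda acc, x: acc + 1 if pred(x) else acc, xs, 0)

print(count_reduce([1, 2, 3, 4], lambda x: x % 2 == 0))
## 2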

Distinct elements

Given a sequence of items, produce a new sequence with no duplicates.

\(distinct(xs: [A]): [A]\)

Distinct operations are usually implemented by copying the initial sequence elements to a set data structure.

a = [1,2,2,3]
set(a)
## set([1, 2, 3])

Distinct assumes that items have unique identities; in Python, identity is determined by the __hash__() and __eq__() methods (much like hashCode() and equals() in Java).
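
A sketch of how this works in Python: once __eq__ and __hash__ are defined, equal values are deduplicated by a set.

class Point:
  def __init__(self, x, y):
    self.x, self.y = x, y
  def __eq__(self, other):
    return (self.x, self.y) == (other.x, other.y)
  def __hash__(self):
    return hash((self.x, self.y))

print(len({Point(1, 2), Point(1, 2)}))
## 1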

Numerical aggregations

Aggregation functions have the following generic signature:

\(f: [A] \rightarrow Number\)

Their job is to reduce sequences of elements to a single measurement. Some examples (demonstrated below) are:

  • Mathematical functions: min, max, count
  • Statistical functions: mean, median, stdev
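
For instance, on the weights in our running example (the exact values depend on the generated data):

weights = [p['weight'] for p in people]
print(min(weights), max(weights), sum(weights) / len(weights))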

Grouping

Grouping splits a sequence of items to groups given a classification function.

\(groupBy(xs:[A], f: A \rightarrow K): Map[K, [A]]\)

def group_by(classifier, xs):
  result = dict()
  for x in xs:
    k = classifier(x)
    if k in result:
        result[k].append(x)
    else:
        result[k] = [x]
  return result

def number_classifier(x):
  if x % 2 == 0:
    return "even"
  else:
    return "odd"

a = [1,2,3,4,5,6,7]
print(group_by(number_classifier, a))
## {'even': [2, 4, 6], 'odd': [1, 3, 5, 7]}

Aggregation example

How can we get the average age per gender (adults only) in our list of people?

from numpy import mean

adults = filter(lambda x: x['age'] > 18, people)
adults_per_gender = group_by(lambda x: x['gender'], adults)
avg_age_per_gender = \
  dict(map(lambda kv: (kv[0], mean([y['age'] for y in kv[1]])),
           adults_per_gender.items()))

print(avg_age_per_gender)

The above is equivalent to the following SQL expression

select gender, avg(age)
from people
where age > 18
group by gender

D: What are the relative strengths and weaknesses of each representation?

Key-value pairs

KV databases / systems

KV stores are the most common format for distributed databases.

KV systems enable us to process data locally (e.g. per key) before re-distributing it for further processing. Keys are naturally used to aggregate data before distribution. They also enable (distributed) data joins.

Typical examples of distributed KV stores are Dynamo, MongoDB and Cassandra

Keys and Values

The most common data structure in big data processing is key-value pairs.

  • A key is something that identifies a data record.
  • A value is the data record. Can be a complex data structure.
  • The KV pairs are usually represented as sequences, for example:

[ # Python
  ('EWI', ("Mekelweg", 4)),
  ('TPM', ("Jafaalaan", 5)),
  ('AE',  ("Kluyverweg", 1))
]

List( // Scala
  ("EWI", ("Mekelweg", 4)),
  ("TPM", ("Jafaalaan", 5)),
  ("AE",  ("Kluyverweg", 1))
)

Typical operations for KV collections

mapValues: Transform the values part

\(mapVal(kv: [(K,V)], f: V\rightarrow U): [(K,U)]\)

groupByKey: Group the values for each key into a single sequence.

\(groupByKey(kv: [(K,V)]) : [(K, [V])]\)

reduceByKey: Combine all elements mapped by the same key into one

\(reduceByKey(kv: [(K,V)], f: (V,V) \rightarrow V) : [(K, V)]\)

join: Return a sequence containing all pairs of elements with matching keys

\(join(kv1: [(K,V)], kv2: [(K,W)]) : [(K, (V,W))]\)
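
Hypothetical single-machine sketches of these operations in Python (distributed systems implement the same signatures over partitioned data):

from functools import reduce

def map_values(kv, f):
  return [(k, f(v)) for k, v in kv]

def group_by_key(kv):
  # gather all values mapped to the same key into one list
  result = {}
  for k, v in kv:
    result.setdefault(k, []).append(v)
  return list(result.items())

def reduce_by_key(kv, f):
  # reduce the value list of every key pairwise with f
  return [(k, reduce(f, vs)) for k, vs in group_by_key(kv)]

def join(kv1, kv2):
  # all combinations of values that share a key
  return [(k, (v, w)) for k, v in kv1 for k2, w in kv2 if k == k2]

kv = [(1, 2), (1, 3), (2, 4)]
print(reduce_by_key(kv, lambda a, b: a + b))  ## [(1, 5), (2, 4)]
print(join(kv, [(1, 'a'), (3, 'b')]))         ## [(1, (2, 'a')), (1, (3, 'a'))]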

Common operations on KVs

mapValues: With mapValues we can apply a transformation and keep data local.

def mapValues[U](f: (V) => U): RDD[(K, U)]

reduceByKey: Reducing values by key allows us to avoid moving data among nodes. This is because we can reduce locally and only distribute the results of the reduction for further aggregation. This is also why f is not allowed to change the type of the values.

def reduceByKey(f: (V, V) => V): RDD[(K, V)]

Joining datasets

Suppose we have a dataset of addresses

case class Addr(k: String, street: String, num: Int)
val addr = List(
  Addr("EWI", "Mekelweg", 4),
  Addr("EWI", "Van Mourik Broekmanweg", 6),
  Addr("TPM", "Jafaalaan", 5),
  Addr("AE",  "Kluyverweg", 1)
)

and a dataset of deans

case class Dean(k: String, name: String, surname: String)
val deans = List(
  Dean("EWI", "John", "Schmitz"),
  Dean("TPM", "Hans", "Wamelink")
)

Q: Define a method deanAddresses to retrieve a list of address of each dean.


First attempt

def deanAddresses : List[(Dean, List[Addr])] =
  deans.map {d => (d, addr.filter(a => a.k == d.k))}

In practice, we get the following results

// EEEWWWW
List(
  (Dean(EWI,John,Schmitz), List(
    Addr(EWI,Mekelweg,4),
    Addr(EWI,Van Mourik Broekmanweg,6))
  ),
  (Dean(TPM,Hans,Wamelink),List(
    Addr(TPM,Jafaalaan,5))
  )
)

This is OK, but a more practical result would be a List[(Dean, Addr)]. For this, we need to flatten the internal sequence.

flatMap: Map and flatten in one step

\(flatMap(xs: [A], f: A \rightarrow [B]): [B]\)

flatMap enables us to combine two data collections and return a new collection with flattened values.

def deanAddresses2: Seq[(Dean, Addr)] =
  deans.flatMap(d =>
    addr.filter(a => a.k == d.k).
      map(v => (d, v)))

In Scala, flatMap is special: for-comprehensions are syntactic sugar for flatMap and map

def deanAddresses2: Seq[(Dean, Addr)] = {
  for (
    d <- deans;
    v <- addr.filter(a => a.k == d.k)
  ) yield (d, v)
}

KV pairs and SQL tables

A KV pair is an alternative form of a relation, indexed by a key. We can always convert between the two

val relation : Set[Tuple3[Int, Int, String]] = ???

Convert the above relation to a KV pair

val kvpair : Set[Tuple2[Int, Tuple2[Int, String]]] =
  relation.map(x => (x._1, (x._2, x._3)))

Convert the KV pair back to a relation

val relation2: Set[Tuple3[Int, Int, String]] =
  kvpair.map(x=> (x._1, x._2._1, x._2._2))

This means that any operation we can do on relations, we can also do on KV pairs.

Types of joins

(Figure: types of joins.)

Immutability

Big data is immutable

One of the key characteristics of data processing is that data is never modified in place. Instead, we apply operations that create new versions of the data, without modifying the original version.

Immutability is a general concept that extends across much of the data processing stack.

Immutability across the stack

(Figure: immutability across the data processing stack; image from Helland [1].)

Copy-On-Write (COW)

Copy-On-Write is a general technique that allows us to share memory for read-only access across processes, and to deal with writes only if/when they are performed, by copying the modified resource into a private version.

COW is the basis for many operating system mechanisms, such as process creation (forking), while many new filesystems (e.g. BTRFS, ZFS) use it as their storage format.

COW enables systems with multiple readers and few writers to efficiently share resources.
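
A toy copy-on-write container in Python (a sketch; real COW systems work at the page or block level):

class CowList:
  def __init__(self, items):
    self._items = items        # the backing store, shared between readers

  def get(self, i):
    return self._items[i]      # reads share the same data, no copying

  def set(self, i, value):
    copy = list(self._items)   # the copy happens only on a write
    copy[i] = value
    return CowList(copy)       # a new version; the old one is untouched

a = CowList([1, 2, 3])
b = a.set(0, 42)
print(a.get(0), b.get(0))
## 1 42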

Immutable data structures

Immutable or persistent data structures always preserve the previous version of themselves when they are modified [2].

With immutable data structures, we can:

  • Avoid locking while processing them, so we can process items in parallel
  • Maintain and share old versions of data
  • Seamlessly persist the data on disk
  • Reason about changes in the data

They come at the cost of increased memory usage (old versions of the data are preserved).

Scala has both mutable and immutable versions of many common data structures. If in doubt, use immutable.

Example: immutable tree

(Figures: a tree, and the same tree after an addition; the new version shares the unchanged nodes with the old one.)

Data structures in Scala

ADT      collection.mutable    collection.immutable
Array    ArrayBuffer           Vector
List     LinkedList            List
Map      HashMap               HashMap
Set      HashSet               HashSet
Queue    SynchronizedQueue     Queue
Tree     -                     TreeSet

Bibliography

[1]
P. Helland, “Immutability changes everything,” Queue, vol. 13, no. 9, p. 40, 2015.
[2]
C. Okasaki, Purely functional data structures. Cambridge University Press, 1999.
[3]
G. Hutton, “A tutorial on the universality and expressiveness of fold,” Journal of Functional Programming, vol. 9, no. 4, pp. 355–372, 1999.