Sunday, June 11, 2017

Spark Quick Review: Study Notes

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A driver program's in-memory collection can be converted into a distributed dataset using parallelize:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc.

JavaRDD<String> distFile = sc.textFile("data.txt");

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
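
A minimal illustration of the difference (a sketch, reusing the data.txt file from the examples below): map only defines a new RDD, while count forces the computation and returns a value to the driver.

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lengths = lines.map(s -> s.length());  // transformation: lazily defines a new RDD, nothing runs yet
long numLines = lengths.count();                        // action: triggers the job and returns a value to the driver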


Since RDDs are distributed data structures, aggregating their elements takes some care: closures passed to RDD operations run on the executors, so mutating a driver-side variable inside them does not work.

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
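
The closure above mutates counter, which lives in the driver; each executor works on its own copy, so the driver's value is never updated reliably. A safer sketch is to aggregate with an action such as reduce (or with an accumulator, shown further down), assuming rdd is a JavaRDD<Integer>:

// Correct: let Spark aggregate across partitions and return the result to the driver
int sum = rdd.reduce((a, b) -> a + b);
long count = rdd.count();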

To print elements, bring some (or all) of the data back to the driver first:

rdd.collect().forEach(System.out::println);  // can run out of memory: collect() pulls the whole RDD to the driver
rdd.take(100).forEach(System.out::println);  // safer: only the first 100 elements are fetched


Total Character Count Example

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
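
If lineLengths were needed again later, it could be marked for in-memory reuse before the action runs (a small sketch; requires org.apache.spark.storage.StorageLevel):

lineLengths.persist(StorageLevel.MEMORY_ONLY());  // or simply lineLengths.cache()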


The following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
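
To inspect the result on the driver, the pairs can be sorted and collected (a small follow-up reusing counts; only sensible when the result set is small):

List<Tuple2<String, Integer>> result = counts.sortByKey().collect();  // returns an ordered List of (line, count) pairs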

Filter lines in a file for occurrences of the string "ganesh"

JavaRDD<String> logData = sc.textFile(logFile).cache();
JavaRDD<String> retData = logData.filter(s -> s.contains("ganesh"));
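
A typical follow-up, in the spirit of the Spark quick start, is to count the matching lines:

long numMatches = retData.count();
System.out.println("Lines containing ganesh: " + numMatches);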


The different types of transformations and actions are listed in the Spark programming guide:
https://spark.apache.org/docs/2.1.0/programming-guide.html


Word Count
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
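
To materialize the word counts, either write them out or collect them to the driver (a sketch; the output path is just an example):

counts.saveAsTextFile("wordcount-output");  // writes one part file per partition
counts.collect().forEach(t -> System.out.println(t._1() + " : " + t._2()));  // only for small results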



Shared Variables

Broadcast variables let the driver ship a read-only value to every executor once, rather than with every task:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]
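
The broadcast value is read inside tasks with value(); the array is shipped to each executor once instead of with every task. A small sketch (the index RDD is illustrative):

JavaRDD<Integer> indices = sc.parallelize(Arrays.asList(0, 1, 2));
JavaRDD<Integer> looked = indices.map(i -> broadcastVar.value()[i]);  // each task reads the broadcast copy on its executor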

Accumulators are variables that tasks can only add to; the driver reads the final value after the action completes.

LongAccumulator accum = sc.sc().longAccumulator();  // longAccumulator() is on the underlying SparkContext

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10