Tuesday, September 12, 2017

RDD Functions


When RDDs will be created:
1. When data is loaded from a file using the SparkContext (sc), e.g. val r1 = sc.textFile("FilePATH")
2. When a filter or other transformation is performed on an RDD, e.g. val r2 = r1.flatMap(_.split(" "))
In this transformation phase, the previous RDD is evicted from RAM once the newer RDD is ready.
3. When a Scala/Python/Java collection is parallelized.

In case of a crash, the execution flow restarts from the beginning. When an RDD is marked to persist, then once its computation completes it stays in RAM until unpersist is called. Advantage: if a later step crashes, execution does not restart from the beginning but continues from the persisted RDD.
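A minimal sketch of the idea in the spark-shell (the file path is only a placeholder, as above):

val r1 = sc.textFile("FilePATH")
val r2 = r1.flatMap(_.split(" "))
r2.persist()              // mark r2 to be kept in RAM (MEMORY_ONLY by default)
r2.count()                // first action computes r2 and caches it
r2.map((_, 1)).count()    // reuses the cached r2 instead of recomputing from the file
r2.unpersist()            // release the cached blocks from RAM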

PERSIST OPTIONS:

MEMORY_ONLY (default)
Format: Object (deserialized)
Adv: Speed (at the time of reuse, no need to deserialize)
DisAdv: More memory (RAM) space

MEMORY_ONLY_SER
Format: Bytes (serialized format)
Adv: Less memory space (compact format)
DisAdv: Deserialization is required on reuse
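For example, a sketch of choosing a storage level explicitly (only one of the two persist calls would be used for a given RDD):

import org.apache.spark.storage.StorageLevel

val r2 = sc.textFile("FilePATH").flatMap(_.split(" "))
r2.persist(StorageLevel.MEMORY_ONLY)       // default: stored as deserialized objects, fastest to reuse
// or
r2.persist(StorageLevel.MEMORY_ONLY_SER)   // stored as serialized bytes, smaller but must be deserialized on reuse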

GroupBy - Spark RDD groupBy function returns an RDD of grouped items




Useful Links:
http://backtobazics.com/category/big-data/spark/
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html


Practice:


scala> val names = List("Raj", "Ravi", "Rohan", "Leela Prasad", "Vinay")
names: List[String] = List(Raj, Ravi, Rohan, Leela Prasad, Vinay)

scala> val rdd = sc.parallize(names,2)
<console>:29: error: value parallize is not a member of org.apache.spark.SparkContext
         val rdd = sc.parallize(names,2)
                      ^

scala> val rdd = sc.parallelize(names, 2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:29

scala> val rdd2 = rdd.map(x=> x.length
length          lengthCompare   

scala> val rdd2 = rdd.map(x=> (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[5] at map at <console>:33

scala> val r3 = rdd2.groupBy(t => t._1)
r3: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] = ShuffledRDD[9] at groupBy at <console>:37


scala> r3.foreach(println)
(4,CompactBuffer((4,Ravi)))
(12,CompactBuffer((12,Leela Prasad)))
(3,CompactBuffer((3,Raj)))
(5,CompactBuffer((5,Rohan), (5,Vinay)))


scala> val list1 = List(1,2,3,4,5,6)
list1: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val list2 = List('a', 'b', 'c', 'd', 'e')
list2: List[Char] = List(a, b, c, d, e)

scala> val rdd1 = sc.parallelize(list1,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:29

scala> val rdd2 = rdd1.reduce((a,b) => a+b)
rdd2: Int = 21

scala> val rdd2 = rdd1.reduce((a,b) => a*b)
rdd2: Int = 720                     

scala> val rdd2 = rdd1.reduce((a,b) => if(a>b) a else b)
rdd2: Int = 6

scala> rdd1.fold(0)((a,b) => a+b)
res32: Int = 21

scala> rdd1.fold(0)((a,b) => a*b)
res33: Int = 0

scala> rdd1.fold(1)((a,b) => a*b)
res34: Int = 720

scala> rdd1.reduce((a,b) => a*b)
res35: Int = 720

scala> val  List3 = List("I am going to Hyd", "I am Learning", "I Love spark")
List3: List[String] = List(I am going to Hyd, I am Learning, I Love spark)

scala> val rdd3 = sc.parallelize(List3,2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:29

scala> val rdd4 = rdd3.flatMap
flatMap       flatMapWith   

scala> val rdd4 = rdd3.flatMap(x => x.sp
span      splitAt   

scala> val rdd4 = rdd3.flatMap(x => x.split(" "))
rdd4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at flatMap at <console>:33

scala> rdd4.foreach(println)
I
am
going
to
Hyd
I
am
Learning
I
Love
spark

scala> val rdd4 = rdd3.flatMap(x => x.split(" ")).map(x => (x,1))
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:33

scala> rdd4.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(Hyd,1)
(I,1)
(am,1)
(Learning,1)
(I,1)
(Love,1)
(spark,1)


scala> val rddout = rdd3.flatMap(x => x.split(" ")).map((_,1)).reduceByKey((a,b) => a+b).sortBy(t => t._1)
rddout: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[43] at sortBy at <console>:37


scala> rddout.foreach(println)
(Hyd,1)
(I,3)
(Learning,1)
(Love,1)
(am,2)
(going,1)
(spark,1)
(to,1)


scala> rdd3.flatMap(x => x.split(" ")).map((_,1)).reduce((a,b) => a+b)
<console>:36: error: type mismatch;
 found   : (String, Int)
 required: String                                                                 //This gives an error because a and b are (String, Int) pairs and reduce must return another (String, Int); they cannot simply be added with +.
              rdd3.flatMap(x => x.split(" ")).map((_,1)).reduce((a,b) => a+b)  //However, reduceByKey can be used, which reduces the values of each key.
                                                                           ^

scala> val rddout = rdd3.flatMap(x => x.split(" ")).map((_,1)).reduceByKey((a,b) => a+b).sortByKey().collect.foreach(println)

//collect gathers the data from all partitions to the driver, so the globally sorted order produced by sortByKey() is preserved when printing. Do not use collect on large data in real environments, because the data from all the worker machines is pulled onto the driver node.

///////////countByValue/////////////////////

scala> rdd3.flatMap(x => x.split(" ")).countByValue().foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(spark,1)
(learning,1)
///////////Distinct:///////////////

scala> val rddout = rdd3.flatMap(x => x.split(" ")).foreach(println)
I
am
going
to
Hyd
I
am
Learning
I
Love
spark
rddout: Unit = ()

scala> val rddout = rdd3.flatMap(x => x.split(" ")).distinct().foreach(println)
am
Love
going
Learning
spark
I
to
Hyd
rddout: Unit = ()

scala> prdd1.foreach(println)
(aaa,3)
(bb,2)
(ccc,3)
(ddd,3)

scala> prdd2.foreach(println)
(aa,2)
(bbb,3)
(cc,2)
(dd,2)
                                                                                
scala> prdd1.leftOuterJoin(prdd2).foreach(println)
(ddd,(3,None))
(bb,(2,None))
(ccc,(3,None))
(aaa,(3,None))

scala> prdd1.union(prdd2).foreach(println)
(aaa,3)
(bb,2)
(ccc,3)
(ddd,3)
(aa,2)
(bbb,3)
(cc,2)
(dd,2)


scala> prdd1.foreach(println)
(3,aaa)
(2,bb)
(3,ccc)
(3,ddd)

scala> prdd2.foreach(println)
(2,aa)
(3,bbb)
(2,cc)
(2,dd)
      

scala> prdd1.join(prdd2).foreach(println)
(2,(bb,aa))
(2,(bb,cc))
(2,(bb,dd))
(3,(aaa,bbb))
(3,(ccc,bbb))
(3,(ddd,bbb))

scala> prdd1.foreach(println)
(3,aaa)
(2,bb)
(3,ccc)
(3,ddd)

scala> prdd2.foreach(println)
(2,aa)
(3,bbb)
(2,cc)
(2,dd)

scala> prdd1.cogroup(prdd2).foreach(println)
(2,(CompactBuffer(bb),CompactBuffer(aa, cc, dd)))
(3,(CompactBuffer(aaa, ccc, ddd),CompactBuffer(bbb)))


[cloudera@quickstart ~]$ hadoop fs -mkdir Spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal "/home/cloudera/Desktop/Spark/emp" "hdfs://quickstart.cloudera/user/cloudera/Spark"


scala> val words = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 2)
words: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/Words MapPartitionsRDD[7] at textFile at <console>:27

scala> words.collect()
res3: Array[String] = Array(I                Love Hadoop, I    Like    Spark   Coding, "   I   Love    Combination    of   Both")

scala> val wordcount = words.flatMap(x => x.split(" ").filter(x => x != ""))
wordcount: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29

scala> wordcount.collect()
res8: Array[String] = Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)

///mapPartitions as an Alternative to flatMap////

val allWords = words.mapPartitions { lines =>
  val myList = lines.toList
  myList.map(x => x.split(" ").filter(x => x != "")).iterator
}

scala> allWords.collect()
res14: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))

scala> val allWords = words.mapPartitionsWithIndex{
      (index, lines) => {
      println("called in Partition ->" + index)
      val myList = lines.toList
      myList.map(x => x.split(" ").filter(x => x != "")).iterator
      }
      }

scala> allWords.collect()
called in Partition ->0
called in Partition ->1
res16: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))


def customFunc(lines: Iterator[String]) : Iterator[Array[String]]= {
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}

val allWords = words.mapPartitions(customFunc)

/////aggregateByKey///
val rdd1 = sc.parallelize(List(("cat",5),("cat",10),("mouse",3),("dog",6),("cat", 12),("mouse", 5)),2)

def partWithIndex(index: Int, pairs: Iterator[(String,Int)]) = {
  pairs.toList.map(x => "[Part ID:" + index + "Value:" + x + "]").iterator
}
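The helper above can be used to see how the pairs land in each partition, and aggregateByKey can then combine them. A sketch (taking the max per key within each partition and summing those maxima across partitions is just one illustrative choice of functions):

// Inspect which (key, value) pairs sit in which partition
rdd1.mapPartitionsWithIndex(partWithIndex).collect.foreach(println)

// aggregateByKey(zeroValue)(seqOp, combOp):
// seqOp runs inside each partition, combOp merges the per-partition results
val maxThenSum = rdd1.aggregateByKey(0)((a, b) => math.max(a, b), (a, b) => a + b)
maxThenSum.collect.foreach(println)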


//////contains and toLowerCase////////
val rdd1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")

scala> val rdd2 = rdd1.map(x => x.toLowerCase.contains("hadoop"))
rdd2: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[4] at map at <console>:29

scala> rdd2.collect()
res2: Array[Boolean] = Array(true, false, false)

contains returns true or false; the line is lowercased first so the match is case-insensitive.

/////////Filter/////////////
scala> val lines = List(10,20,30,40)
lines: List[Int] = List(10, 20, 30, 40)

scala> val x = lines.filter(x => (x > 20))
x: List[Int] = List(30, 40)

scala> val x = lines.filter(_ > 20)
x: List[Int] = List(30, 40)

eg2:
val logs = List("Success and built",
     | "ERROR build failed",
     | "Success Packaging Completed",
     | "ERROR Checkout Failed")
logs: List[String] = List(Success and built, ERROR build failed, Success Packaging Completed, ERROR Checkout Failed)

scala> val errs = logs.filter(_.toUpperCase.contains("ERROR"))
errs: List[String] = List(ERROR build failed, ERROR Checkout Failed)

scala> errs.foreach(println)        // or: errs.map(x => println(x))
ERROR build failed
ERROR Checkout Failed

///////////////GroupByKey//////////////
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",3)
emprec: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> emprec.foreach(println)
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13


scala> def sexsalpair(rec: String) = {
     | val itms = rec.split(",")
     | (itms(3),itms(2))
     | }
sexsalpair: (rec: String)(String, String)

scala> val salpair = emprec.map(x => sexsalpair(x))
salpair: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:31

scala> val groupedpair = salpair.groupByKey()
groupedpair: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[3] at groupByKey at <console>:33

scala> groupedpair.foreach(println)
(F,CompactBuffer(30000, 60000))
(M,CompactBuffer(20000, 50000, 15000))

/////////////////Union/////////////////

scala> val l1 = List(10,20,30,40,50)

scala> val l2 = List(10,25,30,45,50)

scala> val r1 = sc.parallelize(l1)

scala> val r2 = sc.parallelize(l2)

scala> val r3 = r1.union(r2)

/////////////countBYValue////////////

scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:27

scala> b.countByValue
res9: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)


////////////////////

Exercise 1: Word count on an RDD in 3 partitions; sort and combine the result, with the sorting applied across all 3 partitions combined.

Sol:
val words = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).sortByKey().reduceByKey(_+_)
words.foreach(println)

///This will not work out in this manner; an aggregate function needs to be used instead, as sketched below///////////
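One possible sketch of that (assuming lines is the input RDD read into 3 partitions): sum the counts per key with aggregateByKey and sort only after the partitions are combined.

val pairs = lines.flatMap(_.split(" ")).filter(_ != "").map((_, 1))
val counts = pairs.aggregateByKey(0)(_ + _, _ + _)   // per-partition sums, then merged across partitions
counts.sortByKey().collect.foreach(println)          // sort once, over the combined result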

Exercise 2: 

file1.txt
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13

Transform to a case class of id, Name, Salary, Gender, DeptNo

val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp")
case class empdata(id: Int, Name: String, Salary: Int, Gender: String, Depno: String)

def MakeEmpClass(record: String) : empdata = {
val data = record.split(",")
val id = data(0).toInt
val Name = data(1)
val Salary = data(2).toInt
val Gender = data(3)
val Depno = data(4)
empdata(id,Name,Salary,Gender,Depno)
}

val empvalues = emp.map(x => MakeEmpClass(x))

val Nms = empvalues.map(x => x.Name)
Nms: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:35

Nms.foreach(println)
Amar
Amala
Rajeev
Leela
Annapurna


Exercise 3: From the above data set, generate following Report

Above Avg salary 2
Below Avg salary 3

Solution:

Approach 1:

scala> def extractSal(emprecs: empdata):  Int = {
      val sal = emprecs.Salary
      sal
      }

val sals = empvalues.map(x => extractSal(x))
sals.persist()
val sum = sals.reduce(_+_)
val avg = sum/sals.count()

def makepair(sal:Int, avg: Long) = {
                  println("sal =" + sal)
                  println("avg =" + avg)
                  var retval = 0
                  if(avg > sal){
                  retval =1
                  }
                  (retval)
                  }

val status = sals.map(x=> makepair(x,avg))       // In this function we pass 'avg', a value computed outside, into the function

scala> status.foreach(println)                  // The value 'avg' is passed in for every input element
sal =20000
avg =35000
1
sal =30000
avg =35000
1
sal =50000
avg =35000
0
sal =15000
avg =35000
1
sal =60000
avg =35000
0

val grp = status.groupBy(x => x)            // Grouping based on the salary flag, either 1 or 0
(0,CompactBuffer(0, 0))
(1,CompactBuffer(1, 1, 1))
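To turn the grouped flags into the final counts for the report (1 was set for below-average salaries, 0 for the rest), one possible sketch:

val report = grp.map { case (flag, values) => (flag, values.size) }
report.collect.foreach(println)
// (0,2)  -> salaries at or above the average
// (1,3)  -> salaries below the average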

Approach 2: Using cartesian. This is a better approach.

scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",2)

scala> val esal = emprec.map{
     | x => x.split(",")
     | }

scala> val esal = emprec.map{ x=> 
     | val sal = x.split(",")
     | val salary = sal(2).toInt
     | salary
     | }

scala> val tot = esal.sum
tot: Double = 175000.0

scala> val avg = tot/esal.count
avg: Double = 35000.0                                                           

scala> val rddavg = sc.parallelize(List(avg))
rddavg: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[21] at parallelize at <console>:35

scala> val trans = esal.cartesian(rddavg)
trans: org.apache.spark.rdd.RDD[(Int, Double)] = CartesianRDD[22] at cartesian at <console>:37

scala> trans.foreach(println)
(20000,35000.0)
(30000,35000.0)
(50000,35000.0)
(15000,35000.0)
(60000,35000.0)

scala> val res = trans.map{ x => 
     | if(x._1 >= x._2) "Above" else "Below"}

scala> res.foreach(println)
Below
Below
Above
Below
Above
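The counts for the report can then be read off from res, for example:

res.countByValue().foreach(println)   // prints (Above,2) and (Below,3), matching the report above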

                                              GroupBykey:


Consider an RDD with 2 partitions; based on the keys a and b, the values in each partition are shuffled to form 2 groups, one per key.

groupByKey shuffles all of the key-value pairs across the network.

This puts more load on the network, because the pairs for each key are shuffled from every data node to a new machine.


                                                       ReducebyKey:

ByKey means the operation is applied automatically to the values of each key.
The values of the same key are passed directly to the function as input.

Eg: reduceBykey((x,y) => (x+y))
Input    Operation

(a,1),(b,1),(a,5),(b,3),(a,4),(b,1),(c,1)

For each key a, b, c the inputs (x,y) would be as below:

a - Input = (1,5) followed by Operation (x+y) = 6
  Now, result becomes input along with next value. So, here Input (x,y) is (6,4). followed by operation (x+y) = 10

Finally, (a,10) is the result of this reduceBykey.

b - Input = (1,3) followed by Operation (x+y) = 4
    Now, Input = (4,1) Operation (4+1) Result = 5
    Finally, (b,5)
c - (c,1) as no other values

reduceByKey - performs the operation on the values within each partition and sends only the partial result per key across the network, where the same operation is applied again to merge them. The load on the network is lower because the operation is first performed on every node, and only one partial result per key per partition has to be shuffled; the merge step then applies the same operation to produce the final result.

Consider,

P1 has,
(a,1)
(b,1)          P1 output: (a,6) and (b,1)
(a,5)

P2 has,
(b,3)
(a,4) P2 output: (a,4), (b,4) and (c,1)
(b,1)
(c,1)

2 partial results from P1 and 3 from P2 would be sent across the network to be merged. So, less network traffic as opposed to groupByKey.

Difference between GroupByKey and ReduceByKey

http://www.ruxizhang.com/blog/spark-difference-between-reducebykey-groupbykey
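A small side-by-side sketch on the example pairs used above makes the difference visible:

val pairs = sc.parallelize(List(("a",1),("b",1),("a",5),("b",3),("a",4),("b",1),("c",1)), 2)

// groupByKey shuffles every (key, value) pair; the values are summed only after the shuffle
pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }.collect.foreach(println)

// reduceByKey combines values inside each partition first, so only one partial sum per key per partition is shuffled
pairs.reduceByKey(_ + _).collect.foreach(println)

Both print (a,10), (b,5), (c,1).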

                                                 foldBykey


fold() is similar to reduce() except that it takes an initial value, the 'zero value'. There is also a difference in syntax and in the kind of inputs.

fold(acc:T)((acc,value) => acc)

It has the following three things:

1. T is the data type of the RDD.
2. acc is an accumulator of type T, which will be the return value of the fold operation.
3. A function, which will be called for each element in the RDD along with the previous accumulator.

List(1,2,3,4)
fold(5)((acc, value) => acc + value)

In the first iteration,

acc = 5 and value = 1 go as input to fold(); here acc + value is applied, so 5 + 1 = 6.

In the second iteration,

acc = 6, which is the result of the 1st iteration, and value is the next item in the list, i.e. 2. Now the input to fold() is (6,2), so 6 + 2 = 8.

In the third iteration,
acc = 8 and value = 3.

foldByKey acts on pair RDDs, yet the concept is the same. Note that on a multi-partition RDD the zero value is applied once per partition (and once more when the partition results are merged), so a zero value that is not neutral for the operation (like 5 for addition) can change the result.
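A small foldByKey sketch on a pair RDD, using 0 as the zero value for addition:

val scores = sc.parallelize(List(("a",1),("a",2),("b",3),("b",4)), 2)
scores.foldByKey(0)(_ + _).collect.foreach(println)
// (a,3)
// (b,7)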

                                     combineByKey



1. Create combiner - called for the first value of a specific key in a specific partition.
i/p: (v)
o/p: (v,1)

2. Merge value - called for the second and subsequent values of a specific key in a specific partition; the (v,1) produced by the create combiner is carried in as the accumulator, i.e. (accum: (Int,Int), v).

i/p: (accum : (Int,Int),v)
o/p: (key,(total,count))

3. Merge combiners - merges the per-partition accumulators for each key across the partitions in the executors; the merged result reaches the driver only when an action such as collect is called.

i/p: (accum1: (Int,Int), accum2: (Int,Int))
o/p: (key, (totalAcrossAllPartitions, countAcrossAllPartitions)).

code:


val marks = List(("a",30),("a",20),("b",15),("c",10),("b",5),("a",40),("b",30),("c",15),("a",10),("b",20),("c",10))


val rddmarks = sc.parallelize(marks,2)

val createScorecombiner = (score: Int) => { 
(score,1)
}

val createmergecombiner = (accum: (Int,Int),newmarks: Int)  => {
val (totalscores, countScores) = accum
(totalscores + newmarks, countScores +1)
}

val createmergevalues = (accum1: (Int,Int), accum2: (Int,Int)) => {
      val (totalscores1, countScores1) = accum1
      val (totalscores2, countScores2) = accum2
      (totalscores1 + totalscores2, countScores1 + countScores2)
}

val totalvalscount = rddmarks.combineByKey(createScorecombiner,createmergecombiner,createmergevalues)


def calculateAvg(collector: (String, (Int, Int))) =
      {
      val (keyname, (totalmarks, count)) = collector
      val avg = totalmarks/count
      (keyname,avg)
      }

val out = totalvalscount.map(calculateAvg)

out.collect

                                   

                                 aggregateByKey


In aggregateByKey, the initial value could be 0. In this case:

P1
(foo, A)
(foo, A)
(bar, C)
(foo, A)

P2
(foo, B)
(bar, C)
(bar, D)
(foo, A)

aggregateByKey(0)((accum, value) => accum + 1, (acc1, acc2) => acc1 + acc2)     // In this case the values A, A, C, A, B, C, D, A are ignored completely.

Step 1: The initial accum value is 0, so

(accum, value) => accum + 1 gives accum = 0 and value = A. As the value is ignored, accum + 1 = 0 + 1 = 1.

Step 2: Now accum = 1, and the operation applied is accum + 1 = 1 + 1 = 2.

Step 3: The initial accum value is 0 again, because a new key, bar, is encountered:

(accum, value) => accum + 1 gives accum = 0 and value = C. As the value is ignored, accum + 1 = 0 + 1 = 1.


The difference between aggregateByKey and combineByKey is in how the first value of a key is handled: aggregateByKey starts from a supplied zero value, whereas combineByKey builds the initial accumulator from the first value with createCombiner; the mergeValue and mergeCombiner functions play similar roles in both.
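A sketch of the counting example described above (the values A, B, C, D are ignored; only the number of occurrences per key is counted):

val kvs = sc.parallelize(List(("foo","A"),("foo","A"),("bar","C"),("foo","A"),
                              ("foo","B"),("bar","C"),("bar","D"),("foo","A")), 2)

val counts = kvs.aggregateByKey(0)((accum, value) => accum + 1,     // within a partition: add 1 per value, ignoring the value itself
                                   (acc1, acc2) => acc1 + acc2)     // across partitions: add the partial counts
counts.collect.foreach(println)
// (foo,5)
// (bar,3)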
