When RDDs will be created:
1. When data is loaded from a file using the SparkContext (sc), e.g. val r1 = sc.textFile("FilePATH")
2. When a filter or any other transformation is applied on an existing RDD, e.g. val r2 = r1.flatMap(_.split(" "))
In this transformation phase, the previous RDD comes out of RAM whenever a newer RDD is ready.
3. When a Scala/Python/Java collection is parallelized.
In case of a crash, the execution flow starts again from the beginning. When an RDD is set to persist, then once its computation is completed, that RDD stays in RAM until unpersist is called. Advantage: in case of a crash during execution, the execution will not begin from the start but will continue from the persisted RDD.
PERSIST OPTIONS:
MEMORY_ONLY (default)
Format: Object (deserialized)
Adv: Speed (at the time of reuse, no deserialization is needed)
DisAdv: More memory (RAM) space
MEMORY_ONLY_SER
Format: Byte (serialized format)
Adv: Less memory space (compact format)
DisAdv: Deserialization is required at the time of reuse
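A minimal sketch of choosing a storage level when persisting (assuming the Words file used later in these notes; cache()/persist() with no argument default to MEMORY_ONLY):
import org.apache.spark.storage.StorageLevel
val r1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")
r1.persist(StorageLevel.MEMORY_ONLY)        // keep deserialized objects in RAM
// r1.persist(StorageLevel.MEMORY_ONLY_SER) // alternative: keep serialized bytes (less RAM, deserialization on reuse)
r1.count()       // the first action materializes and caches the RDD
r1.unpersist()   // release it from RAM when no longer needed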
GroupBy - Spark RDD groupBy function returns an RDD of grouped items
Useful Links:
http://backtobazics.com/category/big-data/spark/
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Practice:
RDD functions description is under https://drive.google.com/open?id=0BzG0wQkWbKpLYUgwRHNRRkFia2M
scala> val names = List("Raj", "Ravi", "Rohan", "Leela Prasad", "Vinay")
names: List[String] = List(Raj, Ravi, Rohan, Leela Prasad, Vinay)
scala> val rdd = sc.parallize(names,2)
<console>:29: error: value parallize is not a member of org.apache.spark.SparkContext
val rdd = sc.parallize(names,2)
^
scala> val rdd = sc.parallelize(names, 2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:29
scala> val rdd2 = rdd.map(x=> (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[5] at map at <console>:33
scala> val r3 = rdd2.groupBy(t => t._1)
r3: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] = ShuffledRDD[9] at groupBy at <console>:37
scala> r3.foreach(println)
(4,CompactBuffer((4,Ravi)))
(12,CompactBuffer((12,Leela Prasad)))
(3,CompactBuffer((3,Raj)))
(5,CompactBuffer((5,Rohan), (5,Vinay)))
scala> val list1 = List(1,2,3,4,5,6)
list1: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val list2 = List('a', 'b', 'c', 'd', 'e')
list2: List[Char] = List(a, b, c, d, e)
scala> val rdd1 = sc.parallelize(list1,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:29
scala> val rdd2 = rdd1.reduce((a,b) => a+b)
rdd2: Int = 21
scala> val rdd2 = rdd1.reduce((a,b) => a*b)
rdd2: Int = 720
scala> val rdd2 = rdd1.reduce((a,b) => if(a>b) a else b)
rdd2: Int = 6
scala> rdd1.fold(0)((a,b) => a+b)
res32: Int = 21
scala> rdd1.fold(0)((a,b) => a*b)
res33: Int = 0
scala> rdd1.fold(1)((a,b) => a*b)
res34: Int = 720
scala> rdd1.reduce((a,b) => a*b)
res35: Int = 720
scala> val List3 = List("I am going to Hyd", "I am Learning", "I Love spark")
List3: List[String] = List(I am going to Hyd, I am Learning, I Love spark)
scala> val rdd3 = sc.parallelize(List3,2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:29
scala> val rdd4 = rdd3.flatMap(x => x.split(" "))
rdd4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at flatMap at <console>:33
scala> rdd4.foreach(println)
I
am
going
to
Hyd
I
am
Learning
I
Love
spark
scala> val rdd4 = rdd3.flatMap(x => x.split(" ")).map(x => (x,1))
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:33
scala> rdd4.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(Hyd,1)
(I,1)
(am,1)
(Learning,1)
(I,1)
(Love,1)
(spark,1)
scala> val rddout = rdd3.flatMap(x => x.split(" ")).map((_,1)).reduceByKey((a,b) => a+b).sortBy(t => t._1)
rddout: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[43] at sortBy at <console>:37
scala> rddout.foreach(println)
(Hyd,1)
(I,3)
(Learning,1)
(Love,1)
(am,2)
(going,1)
(spark,1)
(to,1)
scala> rdd3.flatMap(x => x.split(" ")).map((_,1)).reduce((a,b) => a+b)
<console>:36: error: type mismatch;
found : (String, Int)
required: String //This gives an error because reduce operates on whole (String, Int) tuples; + is not defined for tuples, so the compiler falls back to string concatenation and expects a String.
rdd3.flatMap(x => x.split(" ")).map((_,1)).reduce((a,b) => a+b) //However, reduceByKey can be used, which reduces only the Int values per key.
^
scala> val rddout = rdd3.flatMap(x => x.split(" ")).map((_,1)).reduceByKey((a,b) => a+b).sortByKey().collect.foreach(println)
//collect brings the data from all the partitions to the driver, so the sorted order from sortByKey() is preserved when printing (a distributed foreach prints in whatever order the partitions finish). Do not use collect on large data in real time, as the data from all the worker machines is pulled back to the driver.
///////////countByValue/////////////////////
scala> rdd3.flatMap(x => x.split(" ")).countByValue().foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(spark,1)
(learning,1)
///////////Distinct:///////////////
scala> val rddout = rdd3.flatMap(x => x.split(" ")).foreach(println)
I
am
going
to
Hyd
I
am
Learning
I
Love
spark
rddout: Unit = ()
scala> val rddout = rdd3.flatMap(x => x.split(" ")).distinct().foreach(println)
am
Love
going
Learning
spark
I
to
Hyd
rddout: Unit = ()
scala> prdd1.foreach(println)
(aaa,3)
(bb,2)
(ccc,3)
(ddd,3)
scala> prdd2.foreach(println)
(aa,2)
(bbb,3)
(cc,2)
(dd,2)
scala> prdd1.leftOuterJoin(prdd2).foreach(println)
(ddd,(3,None))
(bb,(2,None))
(ccc,(3,None))
(aaa,(3,None))
scala> prdd1.union(prdd2).foreach(println)
(aaa,3)
(bb,2)
(ccc,3)
(ddd,3)
(aa,2)
(bbb,3)
(cc,2)
(dd,2)
scala> prdd1.foreach(println)
(3,aaa)
(2,bb)
(3,ccc)
(3,ddd)
scala> prdd2.foreach(println)
(2,aa)
(3,bbb)
(2,cc)
(2,dd)
scala> prdd1.join(prdd2).foreach(println)
(2,(bb,aa))
(2,(bb,cc))
(2,(bb,dd))
(3,(aaa,bbb))
(3,(ccc,bbb))
(3,(ddd,bbb))
scala> prdd1.foreach(println)
(3,aaa)
(2,bb)
(3,ccc)
(3,ddd)
scala> prdd2.foreach(println)
(2,aa)
(3,bbb)
(2,cc)
(2,dd)
scala> prdd1.cogroup(prdd2).foreach(println)
(2,(CompactBuffer(bb),CompactBuffer(aa, cc, dd)))
(3,(CompactBuffer(aaa, ccc, ddd),CompactBuffer(bbb)))
[cloudera@quickstart ~]$ hadoop fs -mkdir Spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal "/home/cloudera/Desktop/Spark/emp" "hdfs://quickstart.cloudera/user/cloudera/Spark"
scala> val words = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 2)
words: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/Words MapPartitionsRDD[7] at textFile at <console>:27
scala> words.collect()
res3: Array[String] = Array(I Love Hadoop, I Like Spark Coding, " I Love Combination of Both")
scala> val wordcount = words.flatMap(x => x.split(" ").filter(x => x != ""))
wordcount: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29
scala> wordcount.collect()
res8: Array[String] = Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)
///mapPartitions as an Alternative to flatMap////
val allWords = words.mapPartitions{ lines =>
{
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}
}
scala> allWords.collect()
res14: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))
scala> val allWords = words.mapPartitionsWithIndex{
(index, lines) => {
println("called in Partition ->" + index)
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}
}
scala> allWords.collect()
called in Partition ->0
called in Partition ->1
res16: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))
def customFunc(lines: Iterator[String]) : Iterator[Array[String]]= {
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}
val allWords = words.mapPartitions(customFunc)
/////aggregateByKey///
val rdd1 = sc.parallelize(List(("cat",5),("cat",10),("mouse",3),("dog",6),("cat", 12),("mouse", 5)),2)
def partWithIndex(index: Int, pairs: Iterator[(String,Int)]) = {
| pairs.toList.map(x => "[Part ID:" + index + "Value:" + x + "]").iterator
| }
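The aggregateByKey call itself is sketched below, using the rdd1 and partWithIndex defined above (taking the max within a partition and summing across partitions is just one illustrative choice):
rdd1.mapPartitionsWithIndex(partWithIndex).collect.foreach(println)  // inspect how the pairs landed in the two partitions
// aggregateByKey(zeroValue)(seqOp, combOp):
//   seqOp combines the accumulator with each value inside a partition,
//   combOp merges the per-partition accumulators of the same key.
val aggr = rdd1.aggregateByKey(0)(math.max(_, _), _ + _)
aggr.collect.foreach(println)  // e.g. (dog,6), (cat,22), (mouse,8) -- exact figures depend on how the pairs split across partitions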
//////contains and toLowerCase////////
val rdd1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")
scala> val rdd2 = rdd1.map(x => x.toLowerCase.contains("hadoop"))
rdd2: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[4] at map at <console>:29
scala> rdd2.collect()
res2: Array[Boolean] = Array(true, false, false)
contains returns true or false
/////////Filter/////////////
scala> val lines = List(10,20,30,40)
lines: List[Int] = List(10, 20, 30, 40)
scala> val x = lines.filter(x => (x > 20))
x: List[Int] = List(30, 40)
scala> val x = lines.filter(_ > 20)
x: List[Int] = List(30, 40)
eg2:
val logs = List("Success and built",
| "ERROR build failed",
| "Success Packaging Completed",
| "ERROR Checkout Failed")
logs: List[String] = List(Success and built, ERROR build failed, Success Packaging Completed, ERROR Checkout Failed)
scala> val errs = logs.filter(_.toUpperCase.contains("ERROR"))
errs: List[String] = List(ERROR build failed, ERROR Checkout Failed)
scala> errs.foreach(println)     // errs.map(x => println(x)) prints the same lines, but foreach is the usual choice for side effects
ERROR build failed
ERROR Checkout Failed
///////////////GroupByKey//////////////
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",3)
emprec: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/emp MapPartitionsRDD[1] at textFile at <console>:27
scala> emprec.foreach(println)
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13
scala> def sexsalpair(rec: String) = {
| val itms = rec.split(",")
| (itms(3),itms(2))
| }
sexsalpair: (rec: String)(String, String)
scala> val salpair = emprec.map(x => sexsalpair(x))
salpair: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:31
scala> val groupedpair = salpair.groupByKey()
groupedpair: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[3] at groupByKey at <console>:33
scala> groupedpair.foreach(println)
(F,CompactBuffer(30000, 60000))
(M,CompactBuffer(20000, 50000, 15000))
/////////////////Union/////////////////
scala> val l1 = List(10,20,30,40,50)
scala> val l2 = List(10,25,30,45,50)
scala> val r1 = sc.parallelize(l1)
scala> val r2 = sc.parallelize(l2)
scala> val r3 = r1.union(r2)
/////////////countByValue////////////
scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:27
scala> b.countByValue
res9: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
////////////////////
Exercise 1: Word count with 3 partitions; sort and combine the results so that sorting happens over all 3 partitions together.
Sol:
val words = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).sortByKey().reduceByKey(_+_)
words.foreach(println)
///This will not work out in this manner, because the reduceByKey after sortByKey reshuffles the data and loses the sorted order; an aggregate function is needed (or see the sketch below)///////////
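A working sketch under the assumption that lines is an RDD[String] read with sc.textFile(..., 3): reduce first, then sort, so the final ordering covers the combined result of all 3 partitions.
val counts = lines.flatMap(_.split(" "))
  .filter(_ != "")
  .map((_, 1))
  .reduceByKey(_ + _)   // combine the counts per word across partitions
  .sortByKey()          // sort the combined result by word
counts.collect.foreach(println)  // collect preserves the sorted order when printing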
Exercise 2:
file1.txt
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13
Transform to case class of id, Name, salary, Gender, DeptNO
val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp")
case class empdata(id: Int, Name: String, Salary: Int, Gender: String, Depno: String)
def MakeEmpClass(record: String) : empdata = {
val data = record.split(",")
val id = data(0).toInt
val Name = data(1)
val Salary = data(2).toInt
val Gender = data(3)
val Depno = data(4)
empdata(id,Name,Salary,Gender,Depno)
}
val empvalues = emp.map(x => MakeEmpClass(x))
val Nms = empvalues.map(x => x.Name)
Nms: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:35
Nms.foreach(println)
Amar
Amala
Rajeev
Leela
Annapurna
Exercise 3: From the above data set, generate the following report
Above Avg salary 2
Below Avg salary 3
Solution:
Approach 1:
scala> def extractSal(emprecs: empdata): Int = {
val sal = emprecs.Salary
sal
}
val sals = empvalues.map(x => extractSal(x))
sals.persist()
val sum = sals.reduce(_+_)
val avg = sum/sals.count()
def makepair(sal:Int, avg: Long) = {
println("sal =" + sal)
println("avg =" + avg)
var retval = 0
if(avg > sal){
retval =1
}
(retval)
}
val status = sals.map(x => makepair(x,avg)) //Here we pass 'avg', a value computed outside, into the function
scala> status.foreach(println) //The value 'avg' is passed along for every input element
sal =20000
avg =35000
1
sal =30000
avg =35000
1
sal =50000
avg =35000
0
sal =15000
avg =35000
1
sal =60000
avg =35000
0
val grp = status.groupBy(x => x) // Grouping based on the salary flag: 1 = below average, 0 = above (or equal to) average
grp.foreach(println)
(0,CompactBuffer(0, 0))
(1,CompactBuffer(1, 1, 1))
Approach 2: Using cartesian. This is the better approach.
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",2)
scala> val esal = emprec.map{
| x => x.split(",")
| }
scala> val esal = emprec.map{ x=>
| val sal = x.split(",")
| val salary = sal(2).toInt
| salary
| }
scala> val tot = esal.sum
tot: Double = 175000.0
scala> val avg = tot/esal.count
avg: Double = 35000.0
scala> val rddavg = sc.parallelize(List(avg))
rddavg: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[21] at parallelize at <console>:35
scala> val trans = esal.cartesian(rddavg)
trans: org.apache.spark.rdd.RDD[(Int, Double)] = CartesianRDD[22] at cartesian at <console>:37
scala> trans.foreach(println)
(20000,35000.0)
(30000,35000.0)
(50000,35000.0)
(15000,35000.0)
(60000,35000.0)
scala> val res = trans.map{ x =>
| if(x._1 >= x._2) "Above" else "Below"}
scala> res.foreach(println)
Below
Below
Above
Below
Above
GroupByKey:
Consider an RDD in 2 partitions. Based on the keys a and b, the values in each partition are shuffled to form 2 groups, one per key.
groupByKey shuffles every key-value pair across the network.
ReduceByKey:
'ByKey' means the operation is applied per key automatically: values of the same key are passed directly to the function as input.
Eg: reduceByKey((x,y) => x+y)
Input / Operation
(a,1),(b,1),(a,5),(b,3),(a,4),(b,1),(c,1)
For each key a, b, c the inputs (x,y) would be as below:
a - Input = (1,5), followed by the operation (x+y) = 6.
Now the result becomes the input along with the next value, so the input (x,y) is (6,4), followed by the operation (x+y) = 10.
Finally, (a,10) is the result of this reduceByKey for key a.
b - Input = (1,3), followed by the operation (x+y) = 4.
Now the input = (4,1), the operation is (4+1), and the result = 5.
Finally, (b,5).
c - (c,1), as there are no other values.
reduceByKey - performs the operation on the values within each partition first (a map-side combine) and then shuffles only those partial results to the reducer tasks, where the same operation is applied again to merge them. The load on the network is less, because each partition sends only one partial result per key instead of every raw value.
Consider,
P1 has,
(a,1)
(b,1) P1 output: (a,6) and (b,1)
(a,5)
P2 has,
(b,3)
(a,4) P2 output: (a,4), (b,4) and (c,1)
(b,1)
(c,1)
Only 2 values from P1 and 3 values from P2 are shuffled across the network. So there is much less network traffic as opposed to groupByKey, which would shuffle all 7 raw pairs.
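The same data in code, a minimal sketch (the exact partition split shown above is an assumption; parallelize decides the actual split):
val pairs = sc.parallelize(Seq(("a",1),("b",1),("a",5),("b",3),("a",4),("b",1),("c",1)), 2)
pairs.groupByKey().mapValues(_.sum).collect.foreach(println)  // groupByKey shuffles every raw pair, then we sum: (a,10), (b,5), (c,1)
pairs.reduceByKey(_ + _).collect.foreach(println)             // reduceByKey combines inside each partition first, then shuffles the partial sums: (a,10), (b,5), (c,1)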
Difference between GroupByKey and ReduceByKey
https://medium.com/@sujathamudadla1213/explain-the-difference-between-groupbykey-and-reducebykey-bf0171e985ac
foldByKey
fold() is similar to reduce() except that it takes an initial 'zero value'. There is also a difference in the syntax and the kind of inputs.
fold(acc:T)((acc,value) => acc)
It has the following three things:
1. T is the data type of the RDD.
2. acc is an accumulator of type T which will be the return value of the fold operation.
3. A function which is called for each element in the RDD together with the previous accumulator.
List (1,2,3,4)
fold(5)((acc,val) => acc + val)
In the first iteration,
acc = 5 and val = 1 go as input to the function; acc + val is applied, so 5 + 1 = 6.
In the second iteration,
acc = 6, which is the result of the 1st iteration, and val is the next item in the list, i.e. 2. Now the input is (6,2), so 6 + 2 = 8.
In the third iteration,
acc = 8 and val = 3, giving 11; the final iteration adds 4, giving 15.
foldByKey acts on pair RDDs, but the concept is the same: the zero value and the function are applied per key.
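A short sketch of both (note that on an RDD, fold applies the zero value once per partition and once more when merging the partition results, so a non-neutral zero like 5 would be counted more than once):
List(1, 2, 3, 4).fold(5)((acc, v) => acc + v)        // 15: the zero value 5 is used exactly once on a List
val sales = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)), 2)  // hypothetical pair RDD for illustration
sales.foldByKey(0)(_ + _).collect.foreach(println)   // (a,4), (b,6): the zero value and the function are applied per key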
combineByKey
refer https://www.youtube.com/watch?v=vJZup4dRL6s , http://codingjunkie.net/spark-combine-by-key/ and https://www.edureka.co/blog/apache-spark-combinebykey-explained
1. Create combiner - called for the first value of a specific key in a specific partition.
i/p: (v)
o/p: (v,1)
2. Merge value - called for the second and subsequent values of the same key in the same partition; the (v,1) from the create combiner is carried in as the accumulator.
i/p: (accum : (Int,Int), v)
o/p: (total, count) for that key within the partition
3. Merge combiner - merges the per-partition accumulators of the same key across partitions in the executors; the result reaches the driver only when an action such as collect is called.
i/p: (accum1: (Int,Int), accum2: (Int,Int))
o/p: (key, (totalAcrossAllPartitions, countAcrossAllPartitions)).
code:
val marks = List(("a",30),("a",20),("b",15),("c",10),("b",5),("a",40),("b",30),("c",15),("a",10),("b",20),("c",10))
val rddmarks = sc.parallelize(marks,2)
val createScorecombiner = (score: Int) => {
(score,1)
}
val mergeScore = (accum: (Int,Int), newMarks: Int) => {   // merge value: runs for every further value of a key within a partition
val (totalScores, countScores) = accum
(totalScores + newMarks, countScores + 1)
}
val mergeCombiners = (accum1: (Int,Int), accum2: (Int,Int)) => {   // merge combiner: merges per-partition accumulators of the same key
val (totalScores1, countScores1) = accum1
val (totalScores2, countScores2) = accum2
(totalScores1 + totalScores2, countScores1 + countScores2)
}
val totalvalscount = rddmarks.combineByKey(createScorecombiner, mergeScore, mergeCombiners)
def calculateAvg(collector: (String, (Int, Int))) =
{
val (keyname, (totalmarks, count)) = collector
val avg = totalmarks/count
(keyname,avg)
}
val out = totalvalscount.map(calculateAvg)
out.collect
aggregateByKey
P1
(foo, A)
(foo, A)
(bar, C)
(foo, A)
P2
(foo, B)
(bar, C)
(bar, D)
(foo, A)
aggregateByKey(0)((accum, value) => accum + 1, (acc1, acc2) => acc1 + acc2) //In this case we are completely ignoring the values A,A,C,A,B,C,D,A and only counting occurrences.
Step 1: The initial accum value is 0, so (accum, value) => accum + 1 is called with accum = 0 and value = A. As we are ignoring the value, accum + 1 = 0 + 1 = 1.
Step 2: Now accum = 1 and the operation accum + 1 gives 1 + 1 = 2.
Step 3: The initial accum value is 0 again, because a new key bar is encountered: accum = 0 and value = C, so accum + 1 = 0 + 1 = 1.
The difference between aggregateByKey and combineByKey lies in how the initial accumulator for a key is created: aggregateByKey starts from a zero value, while combineByKey builds it from the first value itself via the create combiner.
In aggregateByKey the initial value can simply be 0, as in this counting example.
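A runnable sketch of this counting example (the partition layout shown above is an assumption):
val kv = sc.parallelize(Seq(("foo","A"),("foo","A"),("bar","C"),("foo","A"),("foo","B"),("bar","C"),("bar","D"),("foo","A")), 2)
val counts = kv.aggregateByKey(0)(
  (accum, _) => accum + 1,     // seqOp: within a partition, just count, ignoring the value
  (acc1, acc2) => acc1 + acc2  // combOp: add the per-partition counts of the same key
)
counts.collect.foreach(println)   // (foo,5), (bar,3)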