Attached are some of the Core RDD functions
https://drive.google.com/open?id=0BzG0wQkWbKpLVGF6NGZucnpmR00
Practice Code:
[cloudera@quickstart ~]$ hadoop fs -mkdir Spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal "/home/cloudera/Desktop/Spark/emp" "hdfs://quickstart.cloudera/user/cloudera/Spark"
scala> val words = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 2)
words: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/Words MapPartitionsRDD[7] at textFile at <console>:27
scala> words.collect()
res3: Array[String] = Array(I Love Hadoop, I Like Spark Coding, " I Love Combination of Both")
scala> val wordcount = words.flatMap(x => x.split(" ").filter(x => x != ""))
wordcount: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29
scala> wordcount.collect()
res8: Array[String] = Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)
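The wordcount RDD above still holds the individual words, not counts. A minimal sketch (reusing the names above) of how the actual (word, count) pairs could be produced:
val counts = wordcount.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect()   // e.g. Array((I,3), (Love,2), (Hadoop,1), ...) -- ordering depends on partitioning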
///mapPartitions as an alternative to flatMap (note: this version returns one Array of words per line, not a single flat list)////
val allWords = words.mapPartitions { lines =>
  lines.toList.map(x => x.split(" ").filter(_ != "")).iterator
}
scala> allWords.collect()
res14: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))
scala> val allWords = words.mapPartitionsWithIndex{
(index, lines) => {
println("called in Partition ->" + index)
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}
}
scala> allWords.collect()
called in Partition ->0
called in Partition ->1
res16: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))
def customFunc(lines: Iterator[String]) : Iterator[Array[String]]= {
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}
val allWords = words.mapPartitions(customFunc)
/////aggregateByKey///
val rdd1 = sc.parallelize(List(("cat",5),("cat",10),("mouse",3),("dog",6),("cat", 12),("mouse", 5)),2)
def partWithIndex(index: Int, pairs: Iterator[(String,Int)]) = {
| pairs.toList.map(x => "[Part ID: " + index + ", Value: " + x + "]").iterator
| }
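The section above sets up rdd1 and partWithIndex but does not show aggregateByKey itself. A minimal sketch of how both could be used; the seqOp/combOp pair chosen here (per-partition max, then sum of the maxima) is only an illustration, not part of the original notes:
rdd1.mapPartitionsWithIndex(partWithIndex).collect()   // shows which (key, value) pairs fall in which partition
val agg = rdd1.aggregateByKey(0)(math.max(_, _), _ + _)   // zero value 0, max within each partition, sum across partitions
agg.collect()   // with the 2-partition split above this gives roughly Array((dog,6), (cat,22), (mouse,8))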
//////contains and toLowerCase////////
val rdd1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")
scala> val rdd2 = rdd1.map(x => x.toLowerCase.contains("hadoop"))
rdd2: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[4] at map at <console>:29
scala> rdd2.collect()
res2: Array[Boolean] = Array(true, false, false)
contains returns true or false for each line, so the result is one Boolean per record.
/////////Filter/////////////
scala> val lines = List(10,20,30,40)
lines: List[Int] = List(10, 20, 30, 40)
scala> val x = lines.filter(x => (x > 20))
x: List[Int] = List(30, 40)
scala> val x = lines.filter(_ > 20)
x: List[Int] = List(30, 40)
eg2:
val logs = List("Success and built",
| "ERROR build failed",
| "Success Packaging Completed",
| "ERROR Checkout Failed")
logs: List[String] = List(Success and built, ERROR build failed, Success Packaging Completed, ERROR Checkout Failed)
scala> val errs = logs.filter(_.toUpperCase.contains("ERROR"))
errs: List[String] = List(ERROR build failed, ERROR Checkout Failed)
scala> errs.foreach(println)   // equivalently: errs.map(x => println(x))
ERROR build failed
ERROR Checkout Failed
val r5 = r3.filter(x => x._1 != "")   // for key-value pairs: keep only pairs with a non-empty key
///////////////GroupByKey//////////////
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",3)
emprec: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/emp MapPartitionsRDD[1] at textFile at <console>:27
scala> emprec.foreach(println)
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13
scala> def sexsalpair(rec: String) = {
| val itms = rec.split(",")
| (itms(3),itms(2))
| }
sexsalpair: (rec: String)(String, String)
scala> val salpair = emprec.map(x => sexsalpair(x))
salpair: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:31
scala> val groupedpair = salpair.groupByKey()
groupedpair: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[3] at groupByKey at <console>:33
scala> groupedpair.foreach(println)
(F,CompactBuffer(30000, 60000))
(M,CompactBuffer(20000, 50000, 15000))
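Once grouped, the Iterable of salary strings can be consumed with mapValues. A minimal sketch (reusing groupedpair from above) that computes the average salary per gender:
val avgByGender = groupedpair.mapValues(sals => sals.map(_.toInt).sum / sals.size)
avgByGender.foreach(println)   // (F,45000) and (M,28333) with integer division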
/////////////////Union/////////////////
scala> val l1 = List(10,20,30,40,50)
scala> val l2 = List(10,25,30,45,50)
scala> val r1 = sc.parallelize(l1)
scala> val r2 = sc.parallelize(l2)
scala> val r3 = r1.union(r2)
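union keeps duplicates. A minimal sketch (reusing r3 from above) to inspect and de-duplicate the result:
r3.collect()            // Array(10, 20, 30, 40, 50, 10, 25, 30, 45, 50)
r3.distinct.collect()   // removes the duplicated 10, 30 and 50; ordering is not guaranteed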
/////////////countByValue////////////
scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:27
scala> b.countByValue
res9: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
////////////cogroup///////////////////
scala> val rdd1 = sc.parallelize(Seq(("key1",1),("key2",2),("key3",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:27
scala> val rdd2 = sc.parallelize(Seq(("key1",5),("key2",6)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27
scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[32] at cogroup at <console>:31
scala> grouped.collect()
res11: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key2,(CompactBuffer(2),CompactBuffer(6))), (key3,(CompactBuffer(3),CompactBuffer())), (key1,(CompactBuffer(1),CompactBuffer(5))))
////////////////////
Exercise 1: Word count with the input split into 3 partitions; sort and combine the results so that the sort applies across all 3 partitions together.
Sol:
val words = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).sortByKey().reduceByKey(_+_)
words.foreach(println)
///This will not work as written: sortByKey here runs before reduceByKey, so the final counts are not sorted; the sort must happen after the aggregation (reduceByKey or an aggregate function), as in the sketch below///
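A minimal sketch of a working order of operations, assuming lines is the Words RDD read with 3 partitions (an aggregateByKey with a zero of 0 would behave the same as reduceByKey here):
val counts = lines.flatMap(_.split(" ")).filter(_ != "").map((_, 1)).reduceByKey(_ + _).sortByKey()
counts.collect().foreach(println)   // counts are combined across all 3 partitions before the global sort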
Exercise 2:
file1.txt
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13
Transform each record into a case class with the fields id, Name, Salary, Gender, and DeptNo.
val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp")
case class empdata(id: Int, Name: String, Salary: Int, Gender: String, Depno: String)
def MakeEmpClass(record: String) : empdata = {
val data = record.split(",")
val id = data(0).toInt
val Name = data(1)
val Salary = data(2).toInt
val Gender = data(3)
val Depno = data(4)
empdata(id,Name,Salary,Gender,Depno)
}
val empvalues = emp.map(x => MakeEmpClass(x))
val Nms = empvalues.map(x => x.Name)
Nms: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:35
Nms.foreach(println)
Amar
Amala
Rajeev
Leela
Annapurna
Exercise 3: From the above data set, generate the following report:
Above-average salaries: 2
Below-average salaries: 3
Solution:
Approach 1:
scala> def extractSal(emprecs: empdata): Int = {
val sal = emprecs.Salary
sal
}
val sals = empvalues.map(x => extractSal(x))
sals.persist()
val sum = sals.reduce(_+_)
val avg = sum/sals.count()
def makepair(sal:Int, avg: Long) = {
println("sal =" + sal)
println("avg =" + avg)
var retval = 0
if(avg > sal){
retval =1
}
(retval)
}
val status = sals.map(x => makepair(x, avg))   // 'avg' is a driver-side value captured by the closure and shipped to each task
scala> status.foreach(println)   // makepair prints sal and avg for every element; the flag is 1 when the salary is below average, 0 otherwise
sal =20000
avg =35000
1
sal =30000
avg =35000
1
sal =50000
avg =35000
0
sal =15000
avg =35000
1
sal =60000
avg =35000
0
val grp = status.groupBy(x => x)   // grouping by the flag: 1 = below average, 0 = above or equal
grp.foreach(println)
(0,CompactBuffer(0, 0))
(1,CompactBuffer(1, 1, 1))
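To finish the report, the two groups only need to be counted. A minimal sketch reusing grp from above (flag 1 = below average, 0 = above or equal):
grp.mapValues(_.size).collect()   // Array((0,2), (1,3)) -> 2 above-average and 3 below-average salaries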
Approach 2: Using cartesian. This is a cleaner approach.
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",2)
scala> val esal = emprec.map{
| x => x.split(",")
| }
scala> val esal = emprec.map{ x=>
| val sal = x.split(",")
| val salary = sal(2).toInt
| salary
| }
scala> val tot = esal.sum
tot: Double = 175000.0
scala> val avg = tot/esal.count
avg: Double = 35000.0
scala> val rddavg = sc.parallelize(List(avg))
rddavg: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[21] at parallelize at <console>:35
scala> val trans = esal.cartesian(rddavg)
trans: org.apache.spark.rdd.RDD[(Int, Double)] = CartesianRDD[22] at cartesian at <console>:37
scala> trans.foreach(println)
(20000,35000.0)
(30000,35000.0)
(50000,35000.0)
(15000,35000.0)
(60000,35000.0)
scala> val res = trans.map{ x =>
| if(x._1 >= x._2) "Above" else "Below"}
scala> res.foreach(println)
Below
Below
Above
Below
Above
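The final report can be produced directly from res. A minimal sketch using countByValue (shown earlier in these notes):
res.countByValue()   // Map(Above -> 2, Below -> 3)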