Thursday, February 23, 2017

Spark Core Functions

Attached are notes on some of the core RDD functions:

https://drive.google.com/open?id=0BzG0wQkWbKpLVGF6NGZucnpmR00

Practice Code:

[cloudera@quickstart ~]$ hadoop fs -mkdir Spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal "/home/cloudera/Desktop/Spark/emp" "hdfs://quickstart.cloudera/user/cloudera/Spark"


scala> val words = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 2)
words: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/Words MapPartitionsRDD[7] at textFile at <console>:27

scala> words.collect()
res3: Array[String] = Array(I                Love Hadoop, I    Like    Spark   Coding, "   I   Love    Combination    of   Both")

scala> val wordcount = words.flatMap(x => x.split(" ").filter(x => x != ""))
wordcount: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29

scala> wordcount.collect()
res8: Array[String] = Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)

///mapPartitions as an Alternative to flatMap////

val allWords = words.mapPartitions { lines =>
  val myList = lines.toList
  myList.map(x => x.split(" ").filter(x => x != "")).iterator
}

scala> allWords.collect()
res14: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))
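
Note that this returns one array per input line, so the result is Array[Array[String]] rather than the flat list flatMap produced. A minimal sketch, assuming the same words RDD, of getting a flat result from mapPartitions by flattening inside each partition:

// Flat result (matching flatMap) by flattening inside each partition;
// flatMap over the partition's iterator keeps the processing lazy.
val flatWords = words.mapPartitions { lines =>
  lines.flatMap(line => line.split(" ").filter(_ != "").iterator)
}
flatWords.collect()
// expected: Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)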

scala> val allWords = words.mapPartitionsWithIndex { (index, lines) =>
         println("called in Partition ->" + index)
         val myList = lines.toList
         myList.map(x => x.split(" ").filter(x => x != "")).iterator
       }

scala> allWords.collect()
called in Partition ->0
called in Partition ->1
res16: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))


// The same logic as a named function passed to mapPartitions.
def customFunc(lines: Iterator[String]): Iterator[Array[String]] = {
  val myList = lines.toList
  myList.map(x => x.split(" ").filter(x => x != "")).iterator
}

val allWords = words.mapPartitions(customFunc)

/////AggregateByKey/////
val rdd1 = sc.parallelize(List(("cat",5),("cat",10),("mouse",3),("dog",6),("cat", 12),("mouse", 5)),2)

def partWithIndex(index: Int, pairs: Iterator[(String, Int)]) = {
  pairs.toList.map(x => "[Part ID: " + index + ", Value: " + x + "]").iterator
}
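
The transcript stops before the calls themselves, so here is a minimal sketch of how partWithIndex and aggregateByKey could be used with the rdd1 defined above (the per-partition results in the comments assume the 2-partition split created by parallelize(..., 2)):

// Inspect how the pairs are distributed across the two partitions.
rdd1.mapPartitionsWithIndex(partWithIndex).collect().foreach(println)

// aggregateByKey: zero value 0, max of the values within each partition,
// then sum of the per-partition maxima.
val aggregated = rdd1.aggregateByKey(0)(math.max(_, _), _ + _)
aggregated.collect()
// with this layout: (cat, 10 + 12 = 22), (mouse, 3 + 5 = 8), (dog, 6)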


//////contains and toLowerCase////////
val rdd1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")

scala> val rdd2 = rdd1.map(x => x.toLowerCase.contains("hadoop"))
rdd2: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[4] at map at <console>:29

scala> rdd2.collect()
res2: Array[Boolean] = Array(true, false, false)

contains returns true or false.

/////////Filter/////////////
scala> val lines = List(10,20,30,40)
lines: List[Int] = List(10, 20, 30, 40)

scala> val x = lines.filter(x => (x > 20))
x: List[Int] = List(30, 40)

scala> val x = lines.filter(_ > 20)
x: List[Int] = List(30, 40)

Example 2:
val logs = List("Success and built",
     | "ERROR build failed",
     | "Success Packaging Completed",
     | "ERROR Checkout Failed")
logs: List[String] = List(Success and built, ERROR build failed, Success Packaging Completed, ERROR Checkout Failed)

scala> val errs = logs.filter(_.toUpperCase.contains("ERROR"))
errs: List[String] = List(ERROR build failed, ERROR Checkout Failed)

scala> errs.foreach(println)        // or equivalently: errs.map(x => println(x))
ERROR build failed
ERROR Checkout Failed

val r5 = r3.filter(x => x._1 != "")    // for key-value pairs, filter on the key (x._1)
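
As a small self-contained illustration (the pair RDD below is made up for the example), filtering works the same way on keys or values:

// Hypothetical pair RDD, just to show key-based filtering.
val pairs = sc.parallelize(List(("", 1), ("a", 2), ("b", 3)))
pairs.filter(x => x._1 != "").collect()    // drops the empty key -> Array((a,2), (b,3))
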
///////////////GroupByKey//////////////
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",3)
emprec: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> emprec.foreach(println)
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13


scala> def sexsalpair(rec: String) = {
     | val itms = rec.split(",")
     | (itms(3),itms(2))
     | }
sexsalpair: (rec: String)(String, String)

scala> val salpair = emprec.map(x => sexsalpair(x))
salpair: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:31

scala> val groupedpair = salpair.groupByKey()
groupedpair: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[3] at groupByKey at <console>:33

scala> groupedpair.foreach(println)
(F,CompactBuffer(30000, 60000))
(M,CompactBuffer(20000, 50000, 15000))
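
groupByKey shuffles every value; when the goal is an aggregate per gender (say, the average salary), mapValues plus reduceByKey avoids materialising the full buffers. A sketch, assuming the salaries should be treated as numbers:

// Sketch: average salary per gender without groupByKey.
val avgByGender = salpair
  .mapValues(sal => (sal.toInt, 1))                     // (salary, count of 1)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))    // add salaries and counts
  .mapValues { case (total, cnt) => total.toDouble / cnt }
avgByGender.collect()
// e.g. Array((F,45000.0), (M,28333.33...))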

/////////////////Union/////////////////

scala> val l1 = List(10,20,30,40,50)

scala> val l2 = List(10,25,30,45,50)

scala> val r1 = sc.parallelize(l1)

scala> val r2 = sc.parallelize(l2)

scala> val r3 = r1.union(r2)
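
r3 is a plain concatenation of r1 and r2, so the common elements appear twice; chain distinct if a set-style union is wanted:

r3.collect()            // Array(10, 20, 30, 40, 50, 10, 25, 30, 45, 50)
r3.distinct.collect()   // duplicates removed; ordering is not guaranteed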

/////////////countByValue////////////

scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:27

scala> b.countByValue
res9: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
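
countByValue is an action that brings the counts back to the driver as a Map; if the counts should stay distributed (for example when there are many distinct values), map plus reduceByKey gives the same numbers as an RDD. A sketch:

val counts = b.map(x => (x, 1)).reduceByKey(_ + _)
counts.collect()    // same counts as res9, but kept as an RDD of (value, count) pairs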


////////////cogroup///////////////////
scala> val rdd1 = sc.parallelize(Seq(("key1",1),("key2",2),("key3",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:27

scala> val rdd2 = sc.parallelize(Seq(("key1",5),("key2",6)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27

scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[32] at cogroup at <console>:31

scala> grouped.collect()
res11: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key2,(CompactBuffer(2),CompactBuffer(6))), (key3,(CompactBuffer(3),CompactBuffer())), (key1,(CompactBuffer(1),CompactBuffer(5))))
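
cogroup keeps every key from either side (key3 shows up with an empty buffer); a plain join would drop it. For comparison, a sketch on the same RDDs:

rdd1.join(rdd2).collect()
// e.g. Array((key1,(1,5)), (key2,(2,6)))   // key3 has no match in rdd2, so it is dropped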

////////////////////

Exercise 1: Word count across 3 partitions; sort and combine the results, with the sorting applied to all 3 partitions together.

Solution (first attempt):
val words = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).sortByKey().reduceByKey(_+_)
words.foreach(println)

///This will not work out in this manner (the sort runs before the counts are combined), so an aggregation step is needed; see the sketch below///
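
One possible working version, as a sketch (reduceByKey plays the role of the aggregation step, and the sort is applied to the combined result; the path and partition count mirror the earlier examples):

val lines = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 3)
val counts = lines.flatMap(_.split(" "))
                  .filter(_ != "")
                  .map((_, 1))
                  .reduceByKey(_ + _)    // combine the counts across all 3 partitions
                  .sortByKey()           // then sort the combined result globally
counts.collect().foreach(println)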

Exercise 2:

file1.txt
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13

Transform each record into a case class with id, Name, Salary, Gender and DeptNo fields.

val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp")
case class empdata(id: Int, Name: String, Salary: Int, Gender: String, Depno: String)

def MakeEmpClass(record: String): empdata = {
  val data = record.split(",")
  val id = data(0).toInt
  val Name = data(1)
  val Salary = data(2).toInt
  val Gender = data(3)
  val Depno = data(4)
  empdata(id, Name, Salary, Gender, Depno)
}

val empvalues = emp.map(x => MakeEmpClass(x))

val Nms = empvalues.map(x => x.Name)
Nms: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:35

Nms.foreach(println)
Amar
Amala
Rajeev
Leela
Annapurna


Exercise 3: From the above data set, generate the following report:

Above Avg salary   2
Below Avg salary   3

Solution:

Approach 1:

scala> def extractSal(emprecs: empdata): Int = {
         val sal = emprecs.Salary
         sal
       }

val sals = empvalues.map(x => extractSal(x))
sals.persist()
val sum = sals.reduce(_+_)
val avg = sum/sals.count()

def makepair(sal: Int, avg: Long) = {
  println("sal =" + sal)
  println("avg =" + avg)
  var retval = 0
  if (avg > sal) {    // 1 = below the average salary, 0 = at or above it
    retval = 1
  }
  retval
}

val status = sals.map(x => makepair(x, avg))    // 'avg' is a driver-side value captured by the closure and used for each element

scala> status.foreach(println)                  // triggers the map; makepair's printlns are interleaved with the results
sal =20000
avg =35000
1
sal =30000
avg =35000
1
sal =50000
avg =35000
0
sal =15000
avg =35000
1
sal =60000
avg =35000
0

val grp = status.groupBy(x => x)            // grouping by salary grade, either 1 (below avg) or 0 (at or above avg)

scala> grp.foreach(println)
(0,CompactBuffer(0, 0))
(1,CompactBuffer(1, 1, 1))
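
To turn grp into the report, count the members of each group; a sketch (remembering that in makepair 1 marks a below-average salary and 0 an at-or-above-average one):

val report = grp.mapValues(_.size)
report.foreach(println)
// (0,2)   -> 2 employees at or above the average
// (1,3)   -> 3 employees below the average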

Approach 2: Using cartesian. This is a better approach, since the average is brought in as a single-element RDD rather than captured in a closure.

scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",2)

scala> val esal = emprec.map{
     | x => x.split(",")
     | }

scala> val esal = emprec.map{ x=>
     | val sal = x.split(",")
     | val salary = sal(2).toInt
     | salary
     | }

scala> val tot = esal.sum
tot: Double = 175000.0

scala> val avg = tot/esal.count
avg: Double = 35000.0                                                          

scala> val rddavg = sc.parallelize(List(avg))
rddavg: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[21] at parallelize at <console>:35

scala> val trans = esal.cartesian(rddavg)
trans: org.apache.spark.rdd.RDD[(Int, Double)] = CartesianRDD[22] at cartesian at <console>:37

scala> trans.foreach(println)
(20000,35000.0)
(30000,35000.0)
(50000,35000.0)
(15000,35000.0)
(60000,35000.0)

scala> val res = trans.map{ x =>
     | if(x._1 >= x._2) "Above" else "Below"}

scala> res.foreach(println)
Below
Below
Above
Below
Above
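
countByValue (seen earlier) condenses this directly into the report:

res.countByValue()
// e.g. Map(Above -> 2, Below -> 3)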
