Thursday, September 21, 2017

Scala

Scala:

Scala is a hybrid programming language that combines functional programming and object-oriented programming.

Big data engineers are particularly drawn to Scala's functional programming features.

Functional programming generally deals with applying functions to data sets. Inputs are immutable: a function does not modify the values passed to it and returns a new value instead.

Fields:

 - Each object has its own set of instance variables, called fields. In the example below, numToppings, size and maxNumToppings are fields. Both abstract (declared but not assigned) and concrete (assigned) fields are shown.
Eg:
trait PizzaTrait {
    var numToppings: Int     // abstract
    var size = 14            // concrete
    val maxNumToppings = 10  // concrete
}

Closure

 − A closure is a function whose behavior or return value depends on one or more variables declared outside the function.

Eg: In the code below, sayHie is a closure because it uses 'm_hi', which is declared and assigned outside the function.

scala> var m_hi = "Hie"
m_hi: String = Hie

scala> def sayHie(name: String): Unit =
     | {
     | println(m_hi + " " + name)
     | }
sayHie: (name: String)Unit

scala> sayHie("Leela3")
Hie Leela3
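
A closure reads the captured variable each time it runs, not only when it is defined. The sketch below illustrates this; the names ClosureDemo and greeting are hypothetical and not part of the original example, and the closure returns the text instead of printing it so the result is easy to inspect.

```scala
object ClosureDemo {
  // Free variable captured by the closure below (a var, so it can be reassigned).
  var greeting = "Hie"

  // sayHie closes over `greeting`: the variable is looked up on every call.
  val sayHie: String => String = name => greeting + " " + name
}
```

Calling ClosureDemo.sayHie("Leela3") gives "Hie Leela3". After ClosureDemo.greeting = "Hello", the same call gives "Hello Leela3".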

Traits 

− A trait encapsulates method and field definitions, which can then be reused by mixing them into classes. Traits are used to define object types by specifying the signature of the supported methods. These are like interfaces in Java.

Functions with Variable names:

A function can be assigned to a variable. In the examples below, addVals holds a function that adds two values, and square holds a function that squares its input and is passed to map() to square the values in a List.
val addVals = { (a: Int, b: Int) => a + b }

OR

scala> val square = (a: Int) => a * a
square: Int => Int = $$Lambda$1323/1080359903@3feedaec

scala> square(3)
res14: Int = 9

scala> val nums = List(1,2,3)
nums: List[Int] = List(1, 2, 3)

scala> nums.map(square)
res16: List[Int] = List(1, 4, 9)

Variables as functions: 

Functions that can be assigned to variables and passed around like any other value are called first-class functions.

Higher Order Function:

A function is a higher-order function if it does at least one of the following:

1. Takes one or more functions as input
2. Returns a function


Eg: In the example below, exec is a higher-order function because it takes another function, sayHi, as input.
scala> def sayHi(name1: String, name2: String): Unit =
     | {
     | println("Hi " + name1 + name2)
     | }
sayHi: (name1: String, name2: String)Unit

scala> sayHi("Leela","PRASAD")
Hi LeelaPRASAD

scala> def exec(f: (String,String) => Unit,name: String) =
     | {
     | f(name,name)
     | }
exec: (f: (String, String) => Unit, name: String)Unit

scala> exec(sayHi,"Leela")
Hi LeelaLeela

Anonymous Function:

A regular function definition: def doubled(i: Int): Int = { i*2}

The equivalent anonymous function is (i: Int) => {i*2}
This is also called a function literal.

The difference is that the body follows => instead of the = used in a regular definition.

Eg:
scala> val twiceVal = (i: Int) => {i * 2}
twiceVal: Int => Int = $$Lambda$1053/961628534@7bdb4f17

scala> twiceVal(3)
res1: Int = 6

Why use an anonymous function in place of a normal function?

When a small function is needed in only one place, it can be written inline as an anonymous function instead of defining a named function.

Another example: a function that returns a function, using anonymous functions.

This function returns the doubled value if the argument passed to multiplier is positive and the tripled value otherwise.

Signature for returning a function: the body of the method is an anonymous function (literal), and that literal is what gets returned.

def multiplier(c: Int) = (i: Int) => { body of the returned function }

The literal (i: Int) => { ... } says that the returned function takes an Int as input. It is applied through map(), which passes each Int of the collection to it.

def multiplier(c: Int) = (i: Int) => {
     val doubler = (x: Int) => { x * 2}         //a variable as function that takes Int as input
     val tripler = (x: Int) => { x * 3}         //a variable as function that takes Int as input
     if(c > 0)
        doubler(i)
     else
        tripler(i)
}

Here, multiplier is a higher-order function that returns a function (behaving as either doubler or tripler), which is then applied to each value of the collection inside map().

To call this,

val a  = 1 to 5
scala> a.map(multiplier(-1))
res17: scala.collection.immutable.IndexedSeq[Int] = Vector(3, 6, 9, 12, 15)
                           OR
scala> a.map(multiplier(1))
res18: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)

Lambdas:

 A lambda (anonymous function) can be passed directly to a function without first being assigned to a variable. In the second form below, the expression x => x*x is passed directly to map().
Eg:
scala> val square = (a: Int) => a * a

scala> val nums = List(1,2,3)
nums: List[Int] = List(1, 2, 3)

scala> nums.map(square)
res16: List[Int] = List(1, 4, 9)

OR
scala> nums.map(x => x*x)
res16: List[Int] = List(1, 4, 9)

Source: http://michaelpnash.github.io/scala-lambdas/

Partially Applied function: 

If only some of a function's arguments are supplied, you get back a partially applied function. This lets you bind some arguments now and leave the rest to be filled in later. Here, when nme_partial is created, only the first argument is passed and the second is left as _: String to be supplied later.

scala> def nameMsg(name: String, msg: String) = {
     | println(name + msg)
     | }
nameMsg: (name: String, msg: String)Unit

scala> val nme = nameMsg("Leela ", "Evening")
Leela Evening
nme: Unit = ()

scala> val nme_partial = nameMsg("Leela ", _: String)
nme_partial: String => Unit = $$Lambda$1326/167993803@569fb30d

scala> nme_partial("Morning")
Leela Morning
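
The placeholder can sit at any argument position, not only the last one. A minimal sketch, assuming a hypothetical three-argument method decorate (not part of the original notes) in which the middle argument is left open:

```scala
object PartialDemo {
  // Hypothetical three-argument method used only for illustration.
  def decorate(prefix: String, name: String, suffix: String): String =
    prefix + name + suffix

  // Bind the first and last arguments now; the middle one is supplied later.
  val greet: String => String = decorate("Hello, ", _: String, "!")
}
```

PartialDemo.greet("Leela") then returns "Hello, Leela!".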

Traits: 

A trait encapsulates method and field definitions, which can then be reused by mixing them into classes. Unlike class inheritance, in which each class must inherit from just one superclass, a class can mix in any number of traits.

A trait definition looks just like a class definition except that it uses the keyword trait. The following is the basic example syntax of trait.

trait Equal {
   def isEqual(x: Any): Boolean
   def isNotEqual(x: Any): Boolean = !isEqual(x)
}

Trait Equal contains two methods, isEqual() and isNotEqual(). isEqual() is abstract, while isNotEqual() is implemented in terms of it, so when a user-defined class such as Point extends the trait Equal, it must provide an implementation of isEqual().
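
A minimal sketch of such a class follows. The Point class and its fields x and y are assumptions made for illustration; only the Equal trait comes from the text above.

```scala
trait Equal {
  def isEqual(x: Any): Boolean                   // abstract: must be implemented
  def isNotEqual(x: Any): Boolean = !isEqual(x)  // concrete: inherited as-is
}

// Hypothetical Point class mixing in Equal.
class Point(val x: Int, val y: Int) extends Equal {
  // Two points are equal when both coordinates match; anything else is unequal.
  def isEqual(obj: Any): Boolean = obj match {
    case p: Point => p.x == x && p.y == y
    case _        => false
  }
}
```

Point only implements isEqual(); isNotEqual() comes from the trait, so new Point(2, 3).isNotEqual(new Point(2, 4)) is true.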

Implicit:

See the below code snippet

Create a function that takes last argument as implicit
def addHello(name: String)(implicit ss: String): Unit = println(name + " " + ss)

Create an implicit value of the required type. In the line above the implicit parameter is a String, so an implicit String value is created.
implicit val qq = new String("Hello")

Now, call the function that takes implicit argument as its last argument.
addHello("Leela")

Output: Leela Hello

Background: When a parameter is declared implicit in a function's signature, the compiler looks for a value marked implicit of the matching type in the current scope and passes it automatically. Implicit parameters go in the last parameter list. In the snippet above, an implicit String value is declared, so addHello picks it up for ss because the type matches. "Leela" is passed explicitly and "Hello" is passed implicitly.

Only one implicit value of a given type may be in scope. If more than one is declared, the call fails with an error like the one below:

scala> addHello("Leela")
<console>:34: error: ambiguous implicit values:
 both value qq of type => String
 and value qq2 of type => String
 match expected type String
              addHello("Leela")
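
One common way to reduce the risk of this ambiguity is to give the implicit value its own small type instead of a plain String. The sketch below assumes a hypothetical wrapper type Greeting; it is one possible approach, not the only one.

```scala
object ImplicitDemo {
  // Hypothetical wrapper type: a dedicated type makes accidental clashes
  // with other implicit Strings in scope much less likely.
  final case class Greeting(text: String)

  def addHello(name: String)(implicit g: Greeting): String = name + " " + g.text

  implicit val defaultGreeting: Greeting = Greeting("Hello")

  // The compiler fills in defaultGreeting for the implicit parameter.
  val viaImplicit: String = addHello("Leela")

  // The implicit parameter can still be passed explicitly when needed.
  val viaExplicit: String = addHello("Leela")(Greeting("Welcome"))
}
```

Here viaImplicit is "Leela Hello" and viaExplicit is "Leela Welcome".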

Fold:

Takes 2 arguments: the start value and a function. The function itself takes two arguments: the accumulated value and the current item in the list.

Eg:

val s = List(3,5,7,9)

s.fold(0) { _+_}
res10: Int = 24

s.fold(5) { _+_}
res10: Int = 29
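
To make the accumulator visible, the sketch below names both arguments of the function and uses scanLeft to keep every intermediate value. The names FoldDemo, total, steps and joined are illustrative only.

```scala
object FoldDemo {
  val s = List(3, 5, 7, 9)

  // Same sum as s.fold(0)(_ + _), with the accumulator and the item named.
  val total: Int = s.fold(0)((acc, item) => acc + item)

  // scanLeft keeps every intermediate accumulator value, start value included.
  val steps: List[Int] = s.scanLeft(0)(_ + _)

  // foldLeft lets the accumulator type differ from the element type.
  val joined: String = s.foldLeft("")((acc, item) => acc + item)
}
```

total is 24, steps is List(0, 3, 8, 15, 24) and joined is "3579".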

Currying:


Consider below code,

scala> def curriedSum(x: Int)(y: Int) = x+y

scala> curriedSum(1)(2)
res11: Int = 3

When curriedSum is invoked, two function invocations happen back to back. The first takes a single Int parameter named x and returns a function value for the second. The intermediate function can be captured explicitly with the code below.

scala> val onePlus = curriedSum(1)_

scala> onePlus(2)
res12: Int = 3

fold() and foldLeft() are examples of methods with curried parameter lists, e.g. s.fold(0)(_ + _).
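
An ordinary two-argument function value can also be turned into a curried one with the standard curried method. A small sketch (the names CurryDemo, plainSum and curriedFromPlain are illustrative):

```scala
object CurryDemo {
  def curriedSum(x: Int)(y: Int): Int = x + y

  // Supplying only the first parameter list yields a function of the second.
  val onePlus: Int => Int = curriedSum(1)

  // A plain two-argument function value...
  val plainSum: (Int, Int) => Int = (x, y) => x + y

  // ...converted to curried form with Function2.curried.
  val curriedFromPlain: Int => Int => Int = plainSum.curried
}
```

Both CurryDemo.onePlus(2) and CurryDemo.curriedFromPlain(1)(2) evaluate to 3.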

for comprehension:


basic structure: for(seq) { expr }
                 
seq: the generator (and any conditions) that controls the iterations
expr: the functionality that has to be executed on each iteration.

Eg:
val nums = 1 to 5

for( i <- nums){ println(i) }
          OR
for( i <- 1 to 5){ println(i) }

Eg:

for(country <- List("India","USA","China","Japan")) {
           country match {
                    case "India"   => println("Delhi")
                    case "USA"   => println("WT D.C")
                    case "Japan"   => println("Tokyo")
                    case _   => println("Not sure")
}
}


foreach syntax: foreach is a higher-order control abstraction

rdd.foreach { country =>
            country match {
                    case "India"   => println("Delhi")
                    case "USA"   => println("WT D.C")
                    case "Japan"   => println("Tokyo")
                    case _   => println("Not sure")
            }
}

The difference between a for loop and the map function is that map returns a value, whereas a plain for loop does not.

For Comprehension: To make a for loop return a value, add yield. This makes the for loop behave like the map function.
Eg:
for(country <- List("India","USA","China","Japan")) yield {
           country match {
                    case "India"   => println("Delhi")
                    case "USA"   => println("WT D.C")
                    case "Japan"   => println("Tokyo")
                    case _   => println("Not sure")
}
}
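
The yield example above prints inside the loop, so the collected values are all Unit. A variant that yields the capital names themselves is sketched below; the helper capitalOf and the object name are assumptions for illustration.

```scala
object ForYieldDemo {
  val countries = List("India", "USA", "China", "Japan")

  // Hypothetical helper that returns the capital instead of printing it.
  def capitalOf(country: String): String = country match {
    case "India" => "Delhi"
    case "USA"   => "Washington, D.C."
    case "Japan" => "Tokyo"
    case _       => "Not sure"
  }

  // for ... yield collects one result per element, exactly like map.
  val capitals: List[String] = for (c <- countries) yield capitalOf(c)
  val sameWithMap: List[String] = countries.map(capitalOf)

  // A guard (if ...) inside the for filters elements before yielding.
  val known: List[String] =
    for (c <- countries if capitalOf(c) != "Not sure") yield c
}
```

capitals and sameWithMap are the same list, and known keeps only India, USA and Japan.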


Lambda functions:


Functions stored in variables are written as lambda functions.

3 Ways of using lambda functions:

1. val squareDouble = (a: Double) => a*a;
Only the input type (a: Double) is specified; the expression follows the =>.

2. val c = (_: Int) * 2;

3. To specify the function type (input and return types) explicitly:
val squarelong:(Long) => Long = a => a*a;
"squarelong:(Long) => Long" specifies that squarelong takes a Long as input and returns a Long as output.
This is followed by the expression "a => a*a;" that is assigned to squarelong. Here the parameter type need not be repeated, as it is already given in the declared type.

Another way, using _ instead of (x,y) => x * y:

val squarelng2:(Long , Long) => Long = _ * _;


Passing functions as input to another function

///////////Passing functions as arguments/////

Consider a function that takes no arguments and returns Unit (void):

def sayHi(): Unit =
{
println("Hi All")
}

scala> sayHi()
Hi All

Now we will write a function that takes this function as an input argument.

def invokeUnitFun(callback :() => Unit) : Int =
{
callback()
(5)
}

This function takes as input a function that takes no arguments and returns nothing; invokeUnitFun itself returns an Int. The passed function is referred to as callback, but any name can be used.

Calling the invokeUnitFun function:

scala> invokeUnitFun(sayHi)
Hi All
res5: Int = 5

In place of invokeUnitFun(sayHi), we can pass any function that takes no input and returns nothing, e.g. invokeUnitFun(sayBye).


So far we have seen passed functions that take no input and return nothing.

Next, imagine that you want to create a different version of callback, and this one should take two Int parameters and return an Int. Its signature would look like this:

def invokeFun(f: (Int,Int) => Int): Long =
{
val b = f(5,3)
b+10
}

def sum(a: Int,b:Int) =
{
(a+b)
}

scala> invokeFun(sum)
res0: Long = 18

This function takes as input a function that takes two Int values and returns an Int; invokeFun itself returns a Long.

Few other examples:

foo(f:(String) => Int)
bar(f:(Int, Int) => Int)

Note that in invokeFun the call f(5,3) is hard-coded with the values 5 and 3. In general usage, a function is passed in along with the arguments on which it operates.

Eg:
def executeAndPrint(f: (Int, Int) => Int, x: Int, y: Int): Unit = {
    val result = f(x, y)
    println(result)
}

While calling (multiply is assumed to be defined like sum, e.g. def multiply(a: Int, b: Int) = a * b),
executeAndPrint(sum, 3, 11)       // prints 14
executeAndPrint(multiply, 3, 9)   // prints 27
 

Reference: https://alvinalexander.com/scala/fp-book/how-write-functions-take-function-input-parameters


Function calls can also be nested, so that the return value of one function call is passed as an argument to another.

scala> object add{
     | def addition(a: Long, b: Long) : Long =
     | {
     |    (a+b)
     | }
     | }
defined module add

scala> add.addition(5,6)
res0: Long = 11

scala> object sub{
     | def subtraction(a:Long, b:Long) =
     | {
     |    (a-b)
     | }
     | }
defined module sub

scala> sub.subtraction(10,5)
res1: Long = 5

scala> add.addition(add.addition(2,sub.subtraction(10,7)),sub.subtraction(12,6))
res2: Long = 11

Using functions as variables (values) in Scala



You may want to pass a Scala function around like a variable, just as you pass String, Int, and other values around in an object-oriented programming language.

The following Scala code defines a function literal that takes an Int parameter and returns a value that is twice the amount of the Int that is passed in:

(i: Int) => { i * 2 }

Think of the => symbol as a transformer. In this case, the function transforms the Int value i to an Int value that is twice the value of i.

Assign that function literal to a variable:

val double = (i: Int) => { i * 2 }

You can now invoke double just like you'd call a method:

double(2)   // 4

double(3)   // 6



You can pass the double function into the map method of an Int sequence:

scala> val list = List.range(1, 5)
list: List[Int] = List(1, 2, 3, 4)

scala> list.map(double)
res0: List[Int] = List(2, 4, 6, 8)


Explicitly declaring the return type of a function

These functions all take two Int parameters and return a single Int value, which is the sum of the two input values:

// implicit approach   - No Return type specified
val add = (x: Int, y: Int) => { x + y }
val add = (x: Int, y: Int) => x + y

// explicit approach  - Return type Int specified
val add: (Int, Int) => Int = (x,y) => { x + y }
val add: (Int, Int) => Int = (x,y) => x + y

The curly braces around the body of the function in these simple examples are optional, but they are required when the function body grows to more than one expression:

val addThenDouble: (Int, Int) => Int = (x,y) => {
    val a = x + y
    2 * a
}


Reference:

Scala - https://danielwestheide.com/scala/neophytes.html. If you would like, please get a paid pdf version as well. He did an amazing job.



Option: In Scala, Option is used to handle missing (null) values. An Option is either Some(value) or None. A simple example is below,

val a: Option[String] = Some("Leela")

This statement says that val a is of type Option holding a String. Wrap a present value in Some(). When there is no value, the result is None (for example, Option(x) returns None when x is null).

Eg:
scala> val lst = Map(1 -> "Leela", 2 -> "Prasad", 3 -> "G")
lst: scala.collection.immutable.Map[Int,String] = Map(1 -> Leela, 2 -> Prasad, 3 -> G)

scala> lst.get(2)
res7: Option[String] = Some(Prasad)

scala> lst.get(5)
res8: Option[String] = None

Instead of returning None, a default can be supplied using getOrElse():

scala> lst.get(5).getOrElse("Found Nothing")
res9: String = Found Nothing
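
Besides getOrElse(), an Option can be built from a possibly-null reference and transformed with map without unwrapping it first. A short sketch, with illustrative names only:

```scala
object OptionDemo {
  val names = Map(1 -> "Leela", 2 -> "Prasad", 3 -> "G")

  // Option(...) converts a possibly-null reference: null becomes None.
  val fromNull: Option[String] = Option(null: String)
  val fromValue: Option[String] = Option("Leela")

  // map transforms the value inside Some and leaves None untouched.
  val upper: Option[String] = names.get(2).map(_.toUpperCase)
  val missing: Option[String] = names.get(5).map(_.toUpperCase)

  // Chaining map and getOrElse gives a plain value with a fallback.
  val missingLength: Int = names.get(5).map(_.length).getOrElse(0)
}
```

Here fromNull is None, upper is Some("PRASAD"), missing is None and missingLength is 0.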

Pattern Matching

scala> case class user (id: Int, name: String, age: Int, gender: Option[String]);
defined class user

scala> val a1 = user(1, "Leela", 33, Some("Male"));
a1: user = user(1,Leela,33,Some(Male))

scala> val a2 = user(2,"some",31,None);
a2: user = user(2,some,31,None)

scala> a1.gender match {
     | case Some(gender) =>{ println("Gender is:" + a1.gender) }
     | case None => { println("Gender Not specified") }
     | }
Gender is:Some(Male)

scala> a2.gender match {
     | case Some(gender) =>{ println("Gender is:" + a2.gender) }
     | case None => { println("Gender Not specified") }
     | }
Gender Not specified

In the above pattern matching, the gender (a1.gender or a2.gender) is matched against either Some() or None and handled accordingly.

Writing a Scala method that takes a simple generic type (similar to a template in C++)

Below is a function that takes a sequence of strings as input,

def randomName(names: Seq[String]): String = {
  val randomNum = util.Random.nextInt(names.length)
  names(randomNum)
}

To make this function accept a sequence of any data type, modify the code to,

def randomElement[T](input: Seq[T]): T = {
  val randomNum = util.Random.nextInt(input.length)
  input(randomNum)
}


With this change, the method can now be called on a variety of types:



randomElement(Seq("Aleka", "Christina", "Tyler", "Molly"))

randomElement(List(1,2,3))

randomElement(List(1.0,2.0,3.0))

randomElement(Vector.range('a', 'z'))



Source: https://alvinalexander.com/scala/how-to-write-scala-methods-generic-types-parameters-syntax

To assign values to multiple variables from an if-else expression, use the below example,


    val(jdbcSqlConnStr,driver) = {
      if (condition == 'Y') {
        val sqlConnStr = JdbcConnectionUtility.constructTypeListJDBCConnStr(propertyConfigs)
        val jdbcDriver = JdbcConnectionUtility.getTLJDBCDriverName(propertyConfigs)
        (sqlConnStr,jdbcDriver)
      }
      else {
        val sqlConnStr  = JdbcConnectionUtility.constructJDBCConnStr(propertyConfigs)
        val jdbcDriver  = JdbcConnectionUtility.getJDBCDriverName(propertyConfigs)
        (sqlConnStr,jdbcDriver)
      }

    }


lazy val and @transient

lazy val - In Scala, lazy val denotes a field that is computed only when it is accessed for the first time and is then stored for future reference.

@transient - on the other hand, marks a field that should not be serialized.

Eg:
import org.apache.log4j.Logger

object Holder extends Serializable {
  @transient lazy val log = Logger.getLogger(getClass.getName)
}


The variable log is computed only once per deserialization. In the above case the logger does not need to be serialized along with the object, as it is constant across the application; it is simply recreated on first access after each deserialization.
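
The "computed once, on first access" behavior of lazy val can be observed with a counter. This is a small sketch; LazyDemo, initCount and expensive are hypothetical names.

```scala
object LazyDemo {
  // Counts how many times the lazy initializer actually runs.
  var initCount = 0

  // The block runs on first access only; the result is then cached.
  lazy val expensive: String = {
    initCount += 1
    "computed"
  }
}
```

Before LazyDemo.expensive is read, initCount is 0. After reading it any number of times, initCount is 1.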

Using ._1 and ._2 in map()

See the below example,

val lstKV: List[(String, String)]  = List(("Leela","USA"),("Karthik","India"))

val names = lstKV.map(x => println((x._1)))

O/P: Leela
Karthik
names: List[Unit] = List((), ())

val country = lstKV.map(x => println((x._2)))

USA
India
country: List[Unit] = List((), ())

Here, the names in the List are retrieved using ._1 and the country names using ._2.

Options and Match

val opStr: Option[String] = Some("Leela")
val opStr2: Option[String] = Some("Leela2")

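def optionInput(a: Option[String]): Unit = {
   a match {
     case Some("Leela") => println("Found " + a.getOrElse(""))
     // Without this case, any other Some(...) such as Some("Leela2") throws a MatchError
     case Some(other) => println("Found a different value: " + other)
     case None => println("Not found")
   }
}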

optionInput(opStr)
optionInput(opStr2)

Variables as Functions vs Variables

see the below example,
val name = "Leela"
val a = 2
val defName : String = if(a==2) "Leela" else "Karthik"
name
defName

O/P: res0: String = Leela
res1: String = Leela


val name = "Leela"
val a = 3
val defName : String = if(a==2) "Leela" else "Karthik"
name
defName

O/P: res0: String = Leela
res1: String = Karthik

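In the above code, name is assigned a literal and never changes. defName is computed from the if expression, so its value depends on a at the time it is evaluated. Note that a val is evaluated only once, when it is defined; to have the expression re-executed on every access, declare it with def instead of val.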
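
The difference between a value computed once and an expression recomputed on every access can be sketched with val and def side by side. The names ValVsDefDemo and valName are illustrative, and a is a var here only so that it can be changed after definition.

```scala
object ValVsDefDemo {
  var a = 2

  // val: the if expression is evaluated once, when the object is initialized.
  val valName: String = if (a == 2) "Leela" else "Karthik"

  // def: the if expression is re-evaluated on every access.
  def defName: String = if (a == 2) "Leela" else "Karthik"
}
```

After ValVsDefDemo.a = 3, valName is still "Leela" while defName now returns "Karthik".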

Exception handling in Scala:

Just like in other languages, exceptions in Scala can be handled with try{} and catch{} blocks. Below is a simple example from https://alvinalexander.com/scala/scala-try-catch-finally-syntax-examples-exceptions-wildcard

def runAppleScriptCommand(c: AppleScriptCommand) {
  val scriptEngineManager = new ScriptEngineManager
  val appleScriptEngine = scriptEngineManager.getEngineByName("AppleScript")
  try {
    appleScriptEngine.eval(c.command)
  } catch {
    case e: ScriptException => e.printStackTrace
  }
}

e.printStackTrace or e.getStackTrace will fetch the exception trace information.

When an exception is caught and handled, the job does not break, so the failure can be dealt with gracefully and the job can proceed with its next set of steps. Below is sample code for this using scala.util.Try.

import scala.util.{Try, Success, Failure}

Try {
  functionf1(names)
} match {
  case Success(_) => Holder.log.info("Success")
  case Failure(exception) => Failure(new Throwable("Exception occurred " + exception.getMessage + exception.getStackTrace.mkString("\n")))
}

getStackTrace is important as it gives information about the exception, which can be logged to an audit table when multiple jobs run as a framework of an application. Another point of interest is that catch{} uses the same case patterns as match{}.
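
A self-contained sketch of the Try / Success / Failure pattern is shown below. The helper parseAge and the messages are hypothetical; only scala.util.Try from the standard library is used.

```scala
import scala.util.{Try, Success, Failure}

object TryDemo {
  // Hypothetical helper: wraps a conversion that may throw NumberFormatException.
  def parseAge(text: String): Try[Int] = Try(text.trim.toInt)

  // Pattern match on the outcome instead of letting the exception propagate.
  def describe(text: String): String = parseAge(text) match {
    case Success(age)                      => s"age=$age"
    case Failure(_: NumberFormatException) => "not a number"
    case Failure(e)                        => "failed: " + e.getMessage
  }

  // getOrElse supplies a fallback value when the computation failed.
  val withDefault: Int = parseAge("abc").getOrElse(-1)
}
```

TryDemo.describe(" 33 ") returns "age=33", TryDemo.describe("abc") returns "not a number", and withDefault is -1.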


Exit code from Spark program:


Like a C++ program, a Spark job can return an exit code if needed.

usage,

System.exit(0)


Factory Pattern implementation in Scala:


Below is a Factory pattern implementation. In the example below a companion object is created, and its apply method is the one that creates the appropriate object.

trait Animal {
  def speak: Unit
}

object Animal {

  private class dog extends Animal {
    override def speak: Unit = {
      println("woow")
    }
    def speak2(): Unit = {
      println("woow2")
    }
  }

  private class cat extends Animal {
    override def speak: Unit = {
      println("meawoo")
    }
  }

  // Factory method: picks the implementation; any name other than "dog" gives a cat
  def apply(name: String): Animal = {
    if (name == "dog") new dog()
    else new cat()
  }
}

Creating the object

Animal.apply("dog").speak
OR
Animal("dog").speak          //calling Animal("dog") is no different than calling Animal.apply("dog")


Note that speak2 cannot be called on the returned object: apply returns the type Animal, and the Animal trait declares only speak.


Some Scala functions:


def getCurrentTimeStampasString: String = {
  import java.text.SimpleDateFormat
  import java.util.Calendar

  //create date/time formatters
  val today = Calendar.getInstance().getTime()
  val minuteFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  minuteFormat.format(today)
}

import java.text.SimpleDateFormat
import java.util.Date

def convertToDate(datetime: String, format: String): Date = {
  val formatter = new SimpleDateFormat(format)
  // parse throws java.text.ParseException if datetime does not match format
  formatter.parse(datetime)
}

// Uses Joda-Time (org.joda.time.DateTime)
def getLastdatePreviousMonth(): String = {
  // minusMonths handles the January -> December rollover, which month - 1 does not
  val prevMonth = DateTime.now().minusMonths(1)
  prevMonth.dayOfMonth().withMaximumValue().toString("yyyy-MM-dd")
}

def calcByteCount(filePath: String): Long = {
  val someFile = new File(filePath)   // java.io.File
  someFile.length
}

def getFirstdatePreviousMonth(): String = {
  // minusMonths also covers January; the pattern zero-pads the month (e.g. 2017-08-01)
  val prevMonth = DateTime.now().minusMonths(1)
  prevMonth.withDayOfMonth(1).toString("yyyy-MM-dd")
}

def addNullColumns(df: DataFrame, columnsList: List[String]): DataFrame = {
  // foldLeft threads the DataFrame through each withColumn call, avoiding the mutable var
  columnsList.foldLeft(df)((acc, col) => acc.withColumn(col, lit(null).cast(StringType)))
}

def convertTimeFormat(date: String, fromFormat: String, toFormat: String): String = {
val format = new SimpleDateFormat(fromFormat)
val format2 = new SimpleDateFormat(toFormat)
val date1 = format.parse(date)
val date_new = format2.format(date1)
(date_new)
}

def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
dir.listFiles.filter(_.isFile).toList.filter { file =>
    extensions.exists(file.getName.endsWith(_))
}
}

def deleteDirectory(file: File): Unit = {
if(file.isDirectory)
Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(deleteDirectory(_))
file.delete
}


Tuesday, September 12, 2017

RDD Functions


When RDDs are created:
1. When data is loaded from a file using the SparkContext sc, eg: val r1 = sc.textFile("FilePATH")
2. When a filter or other transformation is performed on an RDD, eg: val r2 = r1.flatMap(_.split(" "))
In this transformation phase, the previous RDD is released from RAM once the newer RDD is ready.
3. When a Scala/Python/Java collection is parallelized.

In case of a crash, the execution flow starts again from the beginning. When an RDD is marked with persist and its computation has completed, it stays available in RAM until unpersist is called. Advantage: after a crash, execution does not have to restart from the beginning and can continue from the persisted RDD.

PERSIST OPTIONS:

MEMORY_ONLY (default)
Format: Object (deserialized)
Adv: Speed (no deserialization needed at the time of reuse)
DisAdv: More memory (RAM) space

MEMORY_ONLY_SER
Format: Bytes (serialized format)
Adv: Less memory space (compact serialized format)
DisAdv: Deserialization is required

GroupBy - Spark RDD groupBy function returns an RDD of grouped items




Useful Links:
http://backtobazics.com/category/big-data/spark/
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html


Practice:


scala> val names = List("Raj", "Ravi", "Rohan", "Leela Prasad", "Vinay")
names: List[String] = List(Raj, Ravi, Rohan, Leela Prasad, Vinay)

scala> val rdd = sc.parallize(names,2)
<console>:29: error: value parallize is not a member of org.apache.spark.SparkContext
         val rdd = sc.parallize(names,2)
                      ^

scala> val rdd = sc.parallelize(names, 2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:29

scala> val rdd2 = rdd.map(x=> (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[5] at map at <console>:33

scala> val r3 = rdd2.groupBy(t => t._1)
r3: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] = ShuffledRDD[9] at groupBy at <console>:37


scala> r3.foreach(println)
(4,CompactBuffer((4,Ravi)))
(12,CompactBuffer((12,Leela Prasad)))
(3,CompactBuffer((3,Raj)))
(5,CompactBuffer((5,Rohan), (5,Vinay)))


scala> val list1 = List(1,2,3,4,5,6)
list1: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val list2 = List('a', 'b', 'c', 'd', 'e')
list2: List[Char] = List(a, b, c, d, e)

scala> val rdd1 = sc.parallelize(list1,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:29

scala> val rdd2 = rdd1.reduce((a,b) => a+b)
rdd2: Int = 21

scala> val rdd2 = rdd1.reduce((a,b) => a*b)
rdd2: Int = 720                     

scala> val rdd2 = rdd1.reduce((a,b) => if(a>b) a else b)
rdd2: Int = 6

scala> rdd1.fold(0)((a,b) => a+b)
res32: Int = 21

scala> rdd1.fold(0)((a,b) => a*b)
res33: Int = 0

scala> rdd1.fold(1)((a,b) => a*b)
res34: Int = 720

scala> rdd1.reduce((a,b) => a*b)
res35: Int = 720
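
The start value passed to fold matters more on an RDD than on a local List. As I understand Spark's behavior, the zero value is applied once per partition and once more when the partition results are merged. The sketch below only simulates this with plain Scala lists (no Spark involved); partitionedFold is a hypothetical helper.

```scala
object PartitionFoldDemo {
  val data = List(1, 2, 3, 4, 5, 6)

  // Simulates two partitions: List(List(1, 2, 3), List(4, 5, 6)).
  val twoPartitions: List[List[Int]] = data.grouped(3).toList

  // Fold each partition with the zero value, then fold the partial results
  // with the zero value again (a local stand-in for what rdd.fold does).
  def partitionedFold(partitions: List[List[Int]], zero: Int)(op: (Int, Int) => Int): Int =
    partitions.map(_.fold(zero)(op)).fold(zero)(op)
}
```

With a neutral zero value the result matches reduce: 21 for addition with 0 and 720 for multiplication with 1. With a non-neutral value such as 5 and addition, the simulated result is 36 rather than 26, because 5 is added three times.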

scala> val  List3 = List("I am going to Hyd", "I am Learning", "I Love spark")
List3: List[String] = List(I am going to Hyd, I am Learning, I Love spark)

scala> val rdd3 = sc.parallelize(List3,2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:29

scala> val rdd4 = rdd3.flatMap(x => x.split(" "))
rdd4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[22] at flatMap at <console>:33

scala> rdd4.foreach(println)
I
am
going
to
Hyd
I
am
Learning
I
Love
spark

scala> val rdd4 = rdd3.flatMap(x => x.split(" ")).map(x => (x,1))
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:33

scala> rdd4.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(Hyd,1)
(I,1)
(am,1)
(Learning,1)
(I,1)
(Love,1)
(spark,1)


scala> val rddout = rdd3.flatMap(x => x.split(" ")).map((_,1)).reduceByKey((a,b) => a+b).sortBy(t => t._1)
rddout: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[43] at sortBy at <console>:37


scala> rddout.foreach(println)
(Hyd,1)
(I,3)
(Learning,1)
(Love,1)
(am,2)
(going,1)
(spark,1)
(to,1)
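
For comparison, the same word count can be sketched on a plain Scala List without Spark. groupBy stands in for reduceByKey here, so this is a local analogue rather than the Spark pipeline itself; WordCountDemo is an illustrative name.

```scala
object WordCountDemo {
  val lines = List("I am going to Hyd", "I am Learning", "I Love spark")

  val counts: List[(String, Int)] =
    lines
      .flatMap(_.split(" "))        // split every line into words
      .groupBy(identity)            // word -> all occurrences of that word
      .map { case (word, occurrences) => (word, occurrences.size) }
      .toList
      .sortBy(_._1)                 // uppercase letters sort before lowercase
}
```

counts comes out in the same order as the sorted RDD output above, starting with (Hyd,1) and (I,3).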


scala> rdd3.flatMap(x => x.split(" ")).map((_,1)).reduce((a,b) => a+b)
<console>:36: error: type mismatch;
 found   : (String, Int)
 required: String                                                                 //This gives error because reduce will add a+b where a is string and b is int.
              rdd3.flatMap(x => x.split(" ")).map((_,1)).reduce((a,b) => a+b)  //However reduceByKey can be used which reduces based on key.
                                                                           ^

scala> val rddout = rdd3.flatMap(x => x.split(" ")).map((_,1)).reduceByKey((a,b) => a+b).sortByKey().collect.foreach(println)

//collect brings the data from all partitions to the driver, so the output is printed in sorted order (foreach on the RDD itself prints from each partition independently). Do not use collect on large data in production, as the data from all the worker machines is collected on the driver node.

///////////countByValue/////////////////////

scala> rdd3.flatMap(x => x.split(" ")).countByValue().foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(spark,1)
(learning,1)
///////////Distinct:///////////////

scala> val rddout = rdd3.flatMap(x => x.split(" ")).foreach(println)
I
am
going
to
Hyd
I
am
Learning
I
Love
spark
rddout: Unit = ()

scala> val rddout = rdd3.flatMap(x => x.split(" ")).distinct().foreach(println)
am
Love
going
Learning
spark
I
to
Hyd
rddout: Unit = ()

scala> prdd1.foreach(println)
(aaa,3)
(bb,2)
(ccc,3)
(ddd,3)

scala> prdd2.foreach(println)
[Stage 85:>                                                         (0 + 0) / 2](aa,2)
(bbb,3)
(cc,2)
(dd,2)
                                                                                
scala> prdd1.leftOuterJoin(prdd2).foreach(println)
(ddd,(3,None))
(bb,(2,None))
(ccc,(3,None))
(aaa,(3,None))

scala> prdd1.union(prdd2).foreach(println)
(aaa,3)
(bb,2)
(ccc,3)
(ddd,3)
(aa,2)
(bbb,3)
(cc,2)
(dd,2)


scala> prdd1.foreach(println)
(3,aaa)
(2,bb)
(3,ccc)
(3,ddd)

scala> prdd2.foreach(println)
(2,aa)
(3,bbb)
(2,cc)
(2,dd)
      

scala> prdd1.join(prdd2).foreach(println)
(2,(bb,aa))
(2,(bb,cc))
(2,(bb,dd))
(3,(aaa,bbb))
(3,(ccc,bbb))
(3,(ddd,bbb))

scala> prdd1.foreach(println)
(3,aaa)
(2,bb)
(3,ccc)
(3,ddd)

scala> prdd2.foreach(println)
(2,aa)
(3,bbb)
(2,cc)
(2,dd)

scala> prdd1.cogroup(prdd2).foreach(println)
(2,(CompactBuffer(bb),CompactBuffer(aa, cc, dd)))
(3,(CompactBuffer(aaa, ccc, ddd),CompactBuffer(bbb)))


[cloudera@quickstart ~]$ hadoop fs -mkdir Spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal "/home/cloudera/Desktop/Spark/emp" "hdfs://quickstart.cloudera/user/cloudera/Spark"


scala> val words = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 2)
words: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/Words MapPartitionsRDD[7] at textFile at <console>:27

scala> words.collect()
res3: Array[String] = Array(I                Love Hadoop, I    Like    Spark   Coding, "   I   Love    Combination    of   Both")

scala> val wordcount = words.flatMap(x => x.split(" ").filter(x => x != ""))
wordcount: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29

scala> wordcount.collect()
res8: Array[String] = Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)

///mapPartitions as an Alternative to flatMap////

val allWords = words.mapPartitions{ lines => 
     {
      val myList = lines.toList
      myList.map(x => x.split(" ").filter(x => x != "")).iterator
      }
      }

scala> allWords.collect()
res14: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))

scala> val allWords = words.mapPartitionsWithIndex{
      (index, lines) => {
      println("called in Partition ->" + index)
      val myList = lines.toList
      myList.map(x => x.split(" ").filter(x => x != "")).iterator
      }
      }

scala> allWords.collect()
called in Partition ->0
called in Partition ->1
res16: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))


def customFunc(lines: Iterator[String]) : Iterator[Array[String]]= {
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}

val allWords = words.mapPartitions(customFunc)
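
The mapPartitions calls above return one Array per line. To get the same flat result as flatMap, the per-partition function can flatten the iterator itself. A minimal sketch, assuming a hypothetical helper named splitPartition (not part of the original notes); it is exercised here on a plain Scala Iterator, which is the type Spark hands to mapPartitions:

```scala
// Hypothetical helper: same Iterator-in, Iterator-out shape that mapPartitions expects,
// but it emits individual words instead of one Array per line.
def splitPartition(lines: Iterator[String]): Iterator[String] =
  lines.flatMap(line => line.split(" ").filter(_ != ""))

// Local stand-in for one partition of the Words file.
val partition = Iterator("I                Love Hadoop", "I    Like    Spark   Coding")
val flatWords = splitPartition(partition).toList
println(flatWords)
```

On the RDD this would presumably be passed the same way as customFunc, i.e. words.mapPartitions(splitPartition).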

/////AggregateByKey///
val rdd1 = sc.parallelize(List(("cat",5),("cat",10),("mouse",3),("dog",6),("cat", 12),("mouse", 5)),2)

def partWithIndex(index: Int, pairs: Iterator[(String,Int)]) = {
     | pairs.toList.map(x => "[Part ID:" + index + "Value:" + x + "]").iterator
     | }


//////contains and toLowerCase////////
val rdd1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")

scala> val rdd2 = rdd1.map(x => x.toLowerCase.contains("hadoop"))
rdd2: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[4] at map at <console>:29

scala> rdd2.collect()
res2: Array[Boolean] = Array(true, false, false)

contains returns true or false for each line, so rdd2 is an RDD[Boolean].

/////////Filter/////////////
scala> val lines = List(10,20,30,40)
lines: List[Int] = List(10, 20, 30, 40)

scala> val x = lines.filter(x => (x > 20))
x: List[Int] = List(30, 40)

scala> val x = lines.filter(_ > 20)
x: List[Int] = List(30, 40)

eg2:
val logs = List("Success and built",
     | "ERROR build failed",
     | "Success Packaging Completed",
     | "ERROR Checkout Failed")
logs: List[String] = List(Success and built, ERROR build failed, Success Packaging Completed, ERROR Checkout Failed)

scala> val errs = logs.filter(_.toUpperCase.contains("ERROR"))
errs: List[String] = List(ERROR build failed, ERROR Checkout Failed)

scala> errs.map(x => println(x))
ERROR build failed
ERROR Checkout Failed

OR

scala> errs.foreach(println)
ERROR build failed
ERROR Checkout Failed

///////////////GroupByKey//////////////
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",3)
emprec: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> emprec.foreach(println)
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13


scala> def sexsalpair(rec: String) = {
     | val itms = rec.split(",")
     | (itms(3),itms(2))
     | }
sexsalpair: (rec: String)(String, String)

scala> val salpair = emprec.map(x => sexsalpair(x))
salpair: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:31

scala> val groupedpair = salpair.groupByKey()
groupedpair: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[3] at groupByKey at <console>:33

scala> groupedpair.foreach(println)
(F,CompactBuffer(30000, 60000))
(M,CompactBuffer(20000, 50000, 15000))

/////////////////Union/////////////////

scala> val l1 = List(10,20,30,40,50)

scala> val l2 = List(10,25,30,45,50)

scala> val r1 = sc.parallelize(l1)

scala> val r2 = sc.parallelize(l2)

scala> val r3 = r1.union(r2)

/////////////countByValue////////////

scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:27

scala> b.countByValue
res9: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)


////////////////////

Exercise 1: Word count over 3 partitions. Sort and combine the results; the sorting must apply to the combined output of all 3 partitions, not to each partition separately.

Sol:
val words = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).sortByKey().reduceByKey(_+_)  
words.foreach(println)

///This will not work as written; an aggregate function is needed instead///////////
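
One way to get a globally sorted word count is to aggregate first and sort afterwards. The sketch below models that order of operations on plain Scala collections, with three lists standing in for the three partitions (the sample lines are the ones from the Words file above). It is an illustration under those assumptions, not the Spark job itself.

```scala
// Three local lists stand in for the three partitions of the input.
val partitions: List[List[String]] = List(
  List("I                Love Hadoop"),
  List("I    Like    Spark   Coding"),
  List("   I   Love    Combination    of   Both")
)

// Step 1: count words inside each partition independently.
val partialCounts: List[Map[String, Int]] = partitions.map { lines =>
  lines.flatMap(_.split(" ")).filter(_ != "")
    .groupBy(identity).map { case (w, ws) => w -> ws.size }
}

// Step 2: merge the partial counts of all partitions.
val merged: Map[String, Int] = partialCounts.flatten
  .groupBy(_._1).map { case (w, kvs) => w -> kvs.map(_._2).sum }

// Step 3: sort the combined result once, by word.
val sortedCounts: List[(String, Int)] = merged.toList.sortBy(_._1)

sortedCounts.foreach(println)
```

On the RDD the analogous order would be reduceByKey first and sortByKey second, followed by collect so that the driver prints the rows in sorted order.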

Exercise 2: 

file1.txt
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13

Transform to case class of id, Name, salary, Gender, DeptNO

val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp")
case class empdata(id: Int, Name: String, Salary: Int, Gender: String, Depno: String)

def MakeEmpClass(record: String) : empdata = {
val data = record.split(",")
val id = data(0).toInt
val Name = data(1)
val Salary = data(2).toInt
val Gender = data(3)
val Depno = data(4)
empdata(id,Name,Salary,Gender,Depno)
}

val empvalues = emp.map(x => MakeEmpClass(x))

val Nms = empvalues.map(x => x.Name)
Nms: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:35

Nms.foreach(println)
Amar
Amala
Rajeev
Leela
Annapurna


Exercise 3: From the above data set, generate following Report

Above Avg salary 2
Below Avg salary 3

Solution:

Approach 1:

scala> def extractSal(emprecs: empdata):  Int = {
      val sal = emprecs.Salary
      sal
      }

val sals = empvalues.map(x => extractSal(x))
sals.persist()
val sum = sals.reduce(_+_)
val avg = sum/sals.count()

def makepair(sal:Int, avg: Long) = {
                  println("sal =" + sal)
                  println("avg =" + avg)
                  var retval = 0
                  if(avg > sal){
                  retval =1
                  }
                  (retval)
                  }

val status = sals.map(x=> makepair(x,avg))       // 'avg' is defined outside the function and captured by it (a closure)

scala> status.foreach(println)                  // the same 'avg' value is passed along with every input element
sal =20000
avg =35000
1
sal =30000
avg =35000
1
sal =50000
avg =35000
0
sal =15000
avg =35000
1
sal =60000
avg =35000
0

val grp = status.groupBy(x => x)            // Grouping by salary grade: 1 = below average, 0 = at or above average

scala> grp.foreach(println)
(0,CompactBuffer(0, 0))
(1,CompactBuffer(1, 1, 1))

Approach 2: Using cartesian. This is a better approach.

scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",2)

scala> val esal = emprec.map{
     | x => x.split(",")
     | }

scala> val esal = emprec.map{ x=> 
     | val sal = x.split(",")
     | val salary = sal(2).toInt
     | salary
     | }

scala> val tot = esal.sum
tot: Double = 175000.0

scala> val avg = tot/esal.count
avg: Double = 35000.0                                                           

scala> val rddavg = sc.parallelize(List(avg))
rddavg: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[21] at parallelize at <console>:35

scala> val trans = esal.cartesian(rddavg)
trans: org.apache.spark.rdd.RDD[(Int, Double)] = CartesianRDD[22] at cartesian at <console>:37

scala> trans.foreach(println)
(20000,35000.0)
(30000,35000.0)
(50000,35000.0)
(15000,35000.0)
(60000,35000.0)

scala> val res = trans.map{ x => 
     | if(x._1 >= x._2) "Above" else "Below"}

scala> res.foreach(println)
Below
Below
Above
Below
Above
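
Approach 2 stops at a list of labels, while the exercise asks for a count per label. A minimal sketch of that last step on a plain Scala List, using the five salaries from the emp file; the names avgSalary, labels and report are illustrative only:

```scala
// Salaries from the emp file above.
val salaries = List(20000, 30000, 50000, 15000, 60000)
val avgSalary = salaries.sum.toDouble / salaries.size

// Label each salary, then count how many fall under each label.
val labels = salaries.map(s => if (s >= avgSalary) "Above" else "Below")
val report = labels.groupBy(identity).map { case (label, xs) => label -> xs.size }

println("Above Avg salary " + report.getOrElse("Above", 0))
println("Below Avg salary " + report.getOrElse("Below", 0))
```

On the RDD, res.countByValue should produce a similar label-to-count map.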

groupByKey:


Consider an RDD in 2 partitions with keys a and b. The values in each partition are shuffled so that all values of the same key end up together, forming 2 sets, one per key.

groupByKey shuffles all the key-value pairs, as the diagrams show.

This puts more load on the network, because every key-value pair on each data node is shuffled to the machine that owns its key.


reduceByKey:

"ByKey" means the function is applied separately to the values of each key.
The values that share a key are passed to the function as its inputs.

Eg: reduceByKey((x,y) => (x+y))
Input:

(a,1),(b,1),(a,5),(b,3),(a,4),(b,1),(c,1)

For each of the keys a, b and c, the inputs (x,y) would be as below:

a - Input = (1,5), followed by operation (x+y) = 6
    Now the result becomes an input along with the next value, so the input (x,y) is (6,4), followed by operation (x+y) = 10
    Finally, (a,10) is the result of this reduceByKey.

b - Input = (1,3), followed by operation (x+y) = 4
    Now, input = (4,1), operation (4+1), result = 5
    Finally, (b,5)

c - (c,1), as there are no other values
reduceByKey first performs the operation on the values of each key within each partition, then shuffles only these partial results to the partition that owns the key, where the same operation is applied again to get the final result. The load on the network is lower because each node sends at most one value per key rather than every key-value pair. The merge happens on the executors; nothing is sent to the driver until an action such as collect is called.

Consider,

P1 has,
(a,1)
(b,1)
(a,5)
P1 output: (a,6) and (b,1)

P2 has,
(b,3)
(a,4)
(b,1)
(c,1)
P2 output: (a,4), (b,4) and (c,1)

Only 2 values from P1 and 3 values from P2 are shuffled across the network, instead of all 7 pairs. So there is less network traffic as opposed to groupByKey.
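
The two-stage behaviour described for P1 and P2 can be modelled on plain Scala collections. This is a sketch under stated assumptions: reduceLocally is a hypothetical helper, and the lists only imitate partitions; Spark itself is not involved.

```scala
// P1 and P2 from the text, modelled as local lists.
val p1 = List(("a", 1), ("b", 1), ("a", 5))
val p2 = List(("b", 3), ("a", 4), ("b", 1), ("c", 1))

// Hypothetical helper: apply the reduce function to the values of each key.
def reduceLocally(pairs: List[(String, Int)], f: (Int, Int) => Int): Map[String, Int] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

val add = (x: Int, y: Int) => x + y

// Stage 1: combine inside each partition.
val p1Out = reduceLocally(p1, add)
val p2Out = reduceLocally(p2, add)

// Stage 2: only the partial results are merged, using the same function.
val finalResult = reduceLocally(p1Out.toList ++ p2Out.toList, add)
println(finalResult.toList.sortBy(_._1))
```

Only five partial pairs take part in the second stage, which is the point of the comparison with groupByKey.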

Difference between GroupByKey and ReduceByKey

https://medium.com/@sujathamudadla1213/explain-the-difference-between-groupbykey-and-reducebykey-bf0171e985ac

foldByKey


fold() is similar to reduce(), except that it takes an initial value, the 'zero value'. There is also a difference in the syntax and in the kind of inputs.

fold(acc:T)((acc,value) => acc)

It has the following three parts:

1. T is the data type of the RDD elements.
2. acc is an accumulator of type T, which becomes the return value of the fold operation.
3. A function, which is called for each element in the RDD together with the previous accumulator.

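List (1,2,3,4)
fold(5)((acc,value) => acc + value)

In the first iteration,

acc = 5 and value = 1, which go as input to the function. Here acc + value is applied, so 5 + 1 = 6.

In the second iteration,

acc = 6, which is the result of the first iteration, and value is the next item in the list, i.e. 2. Now the input is (6,2), so 6 + 2 = 8.

In the third iteration,
acc = 8 and value = 3, so 8 + 3 = 11.

In the fourth iteration,
acc = 11 and value = 4, so the final result is 11 + 4 = 15.

foldByKey acts on pair RDDs, but the concept is the same: the fold is applied to the values of each key. In Spark the zero value may be applied more than once (once per partition), so it should be neutral for the operation, e.g. 0 for addition.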
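
The iterations above can be checked on a plain Scala List. The sketch uses scanLeft to expose each intermediate accumulator, and then imitates the foldByKey idea by grouping local pairs by key; the sample pairs and the name foldedByKey are illustrative only.

```scala
val nums = List(1, 2, 3, 4)

// scanLeft keeps every intermediate accumulator, starting with the zero value.
val steps = nums.scanLeft(5)((acc, value) => acc + value)
val total = nums.fold(5)((acc, value) => acc + value)
println(steps)
println(total)

// foldByKey idea on local pairs: fold the values of each key separately.
val pairs = List(("a", 1), ("b", 1), ("a", 5), ("b", 3))
val foldedByKey = pairs.groupBy(_._1).map { case (k, kvs) =>
  k -> kvs.map(_._2).fold(0)(_ + _)
}
println(foldedByKey.toList.sortBy(_._1))
```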

combineByKey

1. Create combiner - called for the first value of a specific key in a specific partition.
i/p: (v)
o/p: (v,1)

2. Merge value - called for the second and every later value of that key in the same partition. The (v,1) from create combiner is carried into this step as the accumulator.

i/p: (accum: (Int,Int), v)
o/p: (total, count) for that key within the partition

3. Merge combiners - merges the accumulators of the same key across partitions. This runs on the executors; the result is returned to the driver only when an action such as collect is called.

i/p: (accum1: (Int,Int), accum2: (Int,Int))
o/p: (totalAcrossAllPartitions, countAcrossAllPartitions) for each key.

code:


val marks = List(("a",30),("a",20),("b",15),("c",10),("b",5),("a",40),("b",30),("c",15),("a",10),("b",20),("c",10))


val rddmarks = sc.parallelize(marks,2)

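// createCombiner: the first value seen for a key in a partition becomes (score, 1)
val createScorecombiner = (score: Int) => { 
(score,1)
}

// mergeValue (despite the name): folds a further value of the same key into the partition-local accumulator
val createmergecombiner = (accum: (Int,Int),newmarks: Int)  => {
val (totalscores, countScores) = accum
(totalscores + newmarks, countScores +1)
}

// mergeCombiners (despite the name): merges the accumulators of the same key from different partitions
val createmergevalues = (accum1: (Int,Int), accum2: (Int,Int)) => {
      val (totalscores1, countScores1) = accum1
      val (totalscores2, countScores2) = accum2
      (totalscores1 + totalscores2, countScores1 + countScores2)
}

val totalvalscount = rddmarks.combineByKey(createScorecombiner,createmergecombiner,createmergevalues)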


def calculateAvg(collector: (String, (Int, Int))) =
      {
      val (keyname, (totalmarks, count)) = collector
      val avg = totalmarks/count      // integer division: the fractional part is dropped
      (keyname,avg)
      }

val out = totalvalscount.map(calculateAvg)

out.collect
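
To see what the three combineByKey functions compute, the same marks can be pushed through a local model. This is only a sketch: the split into two partitions (first 5 elements, remaining 6) is an assumption about how the list might be divided, and combinePartition and mergeAccs are hypothetical helpers, not Spark APIs.

```scala
val marks = List(("a",30),("a",20),("b",15),("c",10),("b",5),("a",40),("b",30),("c",15),("a",10),("b",20),("c",10))

// Assumed split into two partitions.
val (part1, part2) = marks.splitAt(5)

type Acc = (Int, Int) // (total, count)

// Steps 1 and 2 inside one partition: the first value of a key creates (v, 1),
// every later value of that key is merged into the existing accumulator.
def combinePartition(part: List[(String, Int)]): Map[String, Acc] =
  part.foldLeft(Map.empty[String, Acc]) { case (accs, (key, v)) =>
    accs.get(key) match {
      case None                 => accs + (key -> ((v, 1)))
      case Some((total, count)) => accs + (key -> ((total + v, count + 1)))
    }
  }

// Step 3: merge the per-partition accumulators key by key.
def mergeAccs(m1: Map[String, Acc], m2: Map[String, Acc]): Map[String, Acc] =
  m2.foldLeft(m1) { case (merged, (key, (t2, c2))) =>
    merged.get(key) match {
      case None           => merged + (key -> ((t2, c2)))
      case Some((t1, c1)) => merged + (key -> ((t1 + t2, c1 + c2)))
    }
  }

val totals = mergeAccs(combinePartition(part1), combinePartition(part2))
println(totals.toList.sortBy(_._1))
```

The totals per key do not depend on where the split falls, since addition is associative and commutative.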

                                   

                                 aggregateByKey


In aggregateByKey, the initial (zero) value of the accumulator could be 0. In this case, consider the two partitions below:

P1
(foo, A)
(foo, A)
(bar, C)
(foo, A)

P2
(foo, B)
(bar, C)
(bar, D)
(foo, A)

aggregateByKey(0)((accum: Int, value: String) => accum + 1, (acc1: Int, acc2: Int) => acc1 + acc2)     // In this case we completely ignore the values A,A,C,A,B,C,D,A and only count them.

Step 1: The initial accum value is 0 for key foo, so

(accum, value) => accum + 1 is called with accum = 0 and value = A. As the value is ignored, accum + 1 = 0 + 1 = 1

Step 2: Now (accum, value) => accum + 1 is called with accum = 1, so accum + 1 = 1 + 1 = 2

Step 3: A new key, bar, is encountered, so the accum value starts again at 0.

(accum, value) => accum + 1 is called with accum = 0 and value = C. As the value is ignored, accum + 1 = 0 + 1 = 1


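The difference between aggregateByKey and combineByKey lies in how the accumulator starts: aggregateByKey takes a fixed zero value, whereas combineByKey takes a createCombiner function that builds the initial accumulator from the first value of each key. The mergeValue and mergeCombiners functions play the same roles in both.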
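
The counting example with P1 and P2 can be traced on plain Scala collections. This is a sketch under assumptions: the merge function acc1 + acc2 is assumed from the counting context, and aggregatePartition, seqOp and combOp are illustrative names rather than Spark APIs.

```scala
// P1 and P2 from the text, modelled as local lists.
val p1 = List(("foo", "A"), ("foo", "A"), ("bar", "C"), ("foo", "A"))
val p2 = List(("foo", "B"), ("bar", "C"), ("bar", "D"), ("foo", "A"))

val zero = 0
val seqOp  = (accum: Int, value: String) => accum + 1   // the value itself is ignored
val combOp = (acc1: Int, acc2: Int) => acc1 + acc2      // assumed merge step

// Within a partition, each key starts from the zero value and seqOp is applied per record.
def aggregatePartition(part: List[(String, String)]): Map[String, Int] =
  part.foldLeft(Map.empty[String, Int]) { case (accs, (key, value)) =>
    accs + (key -> seqOp(accs.getOrElse(key, zero), value))
  }

val a1 = aggregatePartition(p1)
val a2 = aggregatePartition(p2)

// Across partitions, the per-key accumulators are merged with combOp.
val counts = a2.foldLeft(a1) { case (merged, (k, c)) =>
  merged + (k -> merged.get(k).map(combOp(_, c)).getOrElse(c))
}
println(counts.toList.sortBy(_._1))
```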