Thursday, November 30, 2017

ELK

·         MySQL => Databases => Tables => Columns/Rows
·         Elasticsearch => Indices => Types => Documents with Properties

Elasticsearch has to store its data somewhere. Data is stored in shards, each of which is either a primary or a replica.

ELK Stack Installation:
ELK stack components being used are:
·         filebeat version 5.5.2
·         logstash 5.5.2
·         elasticsearch 5.5.2
·         kibana 5.5.2
filebeat
Beats needs to be installed on all the host machines from which you want to read your logs.
To get a specific version of the ELK components, browse to https://www.elastic.co/downloads/past-releases
Select the appropriate product and version and download the RPM. From the download directory, run sudo yum install filebeat on every host machine.
sudo chmod 755 filebeat
Logstash
Needs to be installed on the host machine / edge node. Download the RPM and run
sudo yum install logstash
To test your installation,
cd /usr/share/logstash/
sudo /usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
# After starting Logstash, wait until you see "Pipeline main started" and then enter hello world at the command prompt

ElasticSearch
Needs to be installed on the machine that will host the Elasticsearch data. Download the RPM and run
sudo yum install elasticsearch
To test your installation
curl -XGET 'localhost:9200/?pretty'

Kibana

sudo yum install kibana

vi /etc/kibana/kibana.yml
 edit and enable the server.port: and server.host: entries

sudo service kibana start

To test your installation
Use a browser to open http://[hostname]:5601

Configuration

filebeat
Edit filebeat config file to add the log files to be scanned and shipped to logstash.

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    #- /home/sraja005/flume.log
    - /var/log/flume-ng/flume-ng-agent.log
  fields:
     log_type: flumeLog

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["tsb1.devlab.motive.com:5044"]


Logstash
Create a logstash configuration file and place it in the folder mentioned below
cd /etc/logstash/conf.d/

#Here is a sample conf file.

vi flumetest.conf
input {
  beats {
    port => "5044"
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^(%{MONTHDAY} %{MONTH} %{YEAR} %{TIME}|%{YEAR}-%{MONTHNUM})"
      negate => true
      what => "previous"
    }
  }

  }

filter {
  if ([fields][log_type] == "flumeLog") {
    grok {
      match => { "message" => "%{MONTHDAY:logDate} %{MONTH:logMonth} %{YEAR:logYear} %{TIME:logTime} %{LOGLEVEL:logLevel} %{GREEDYDATA:message}"}
    }
  }
}
output {
        elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}


Issues and Points:

1.     The source location and index of each message can be viewed by expanding the message in Kibana.
2.     For a log whose lines start with content such as,

[12/Oct/2017 09:05:51 ] supervisor   ERROR    Exception in supervisor main loop

In the config file under /etc/logstash/conf.d, add a grok filter as below, where \[ and \] are the escaped [ and ] characters:

if ([fields][log_type] == "hueLog") {
        grok {
                match => { "message" => "\[%{MONTHDAY:logDate}/%{MONTH:logMonth}/%{YEAR:logYear} %{TIME:logTime} \] %{LOGLEVEL:logLevel} %{GREEDYDATA:message}"}
              }

Also add | \[ to the multiline pattern:

pattern => "^(%{MONTHDAY} %{MONTH} %{YEAR} %{TIME}|%{YEAR}-%{MONTHNUM}|\[| )"

Filter for logs such as: 17/10/26 13:37:59 ERROR TaskSchedulerImpl: Lost an executor driver (already removed): Executor heartbeat timed out after 239118 ms

if ([fields][log_type] == "sparkLog") {
        grok {
                match => { "message" => "%{YEAR:logYear}/%{MONTHNUM:logMonth}/%{MONTHDAY:logDate} %{TIME:logTime} %{LOGLEVEL:logLevel} %{GREEDYDATA:message}"}
              }
                }

3.     By default, an index is created for every day (logstash-YYYY.MM.DD). To put all the data into a single index, add an index setting to the elasticsearch output in the Logstash config file:
        elasticsearch {
        index => "blogs2"
        hosts => [ "localhost:9200" ]
    }

To have output in multiple indexes,
output {
        elasticsearch {
        index => "blogs2"
        hosts => [ "localhost:9200" ]
    }
        elasticsearch {
        index => "blogs"
        hosts => [ "localhost:9200" ]
    }
}

4.     To list the indexes,
curl 'localhost:9200/_cat/indices?v'

5.     To get info of a particular Index ‘blogs2’
curl -XGET localhost:9200/blogs2

6.     To test a grok filter against sample text, use https://grokdebug.herokuapp.com/

7.     For combining timestamp and date value of the log follow https://stackoverflow.com/questions/40385107/logstash-grok-how-to-parse-timestamp-field-using-httpderror-date-pattern
8.     To see the health of cluster
curl 'localhost:9200/_cluster/health?pretty'

9.     To create new Index, in kibana -> Dev Tools execute the command to create blogs index,
PUT /blogs
{
   "settings" : {
      "number_of_shards" : 3,
      "number_of_replicas" : 1
   }
}
10.  Create Index Patterns,


In Kibana -> Management -> Index Patterns -> Create Index pattern and provide the index name or pattern as ‘blogs*’ -> Create.


Extract values from an existing field to create a new field in Logstash.

There are 2 approaches for this:

1. Copy the source field into a temporary field, then split it:
 if ([fields][log_type] == "yarnHive2kafkaLog") {
    grok {
            match => { "message" => "%{YEAR:logYear}-%{MONTHNUM:logMonth}-%{MONTHDAY:logDate} %{TIME:logTime} \!%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\! %{GREEDYDATA:message}"}
         }
    mutate {
            copy => { "source" => "source_tmp" }
           }
    mutate {
            split => ["source_tmp", "/"]
            add_field => { "applicationID" => "%{source_tmp[4]}" }
           }                       
            }  
2. Run a grok filter on the source field:
 if ([fields][log_type] == "yarnHive2kafkaLog") {
    grok {
      match => { "message" => "%{YEAR:logYear}-%{MONTHNUM:logMonth}-%{MONTHDAY:logDate} %{TIME:logTime} \!%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}\! %{GREEDYDATA:message}"}
    }
    grok {
      match => { "source" => "/%{GREEDYDATA:primaryDir}/%{GREEDYDATA:subDir1}/%{GREEDYDATA:subDir2}/%{GREEDYDATA:subDir3}/%{GREEDYDATA:containerID}/%{GREEDYDATA:fileName}"}
    }
    mutate {
      add_field => { "applicationID" => "%{subDir3}" }
    }
 }


2017-11-15 09:21:06,578 ! ERROR ! [Driver] ! imps.CuratorFrameworkImpl ! Background 
2017-11-20 03:35:17,730 !  WARN ! [Reporter] ! impl.AMRMClientImpl ! ApplicationMaster 

In the above 2 logs the whitespace before ERROR and WARN is not the same. To handle this, use %{SPACE}, which matches zero or more spaces:

grok {
match => { "message" => "%{YEAR:logYear}-%{MONTHNUM:logMonth}-%{MONTHDAY:logDate} %{TIME:logTime} !%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}! %{GREEDYDATA:message}"}
}

1.     Remove trailing white space in logstash filter


Approach 1: using something like NOTSPACE instead of GREEDYDATA.

For log,
[24/Oct/2017 15:04:53 ] cluster       WARNING Picking RM HA: ha

[%{MONTHDAY:logDate}/%{MONTH:logMonth}/%{YEAR:logYear} %{TIME:logTime} ]%{SPACE}%{GREEDYDATA:platformType} +\s %{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{GREEDYDATA:message}

The above filter leaves trailing white space in the captured platformType field (e.g. "cluster   ").

\[%{MONTHDAY:logDate}/%{MONTH:logMonth}/%{YEAR:logYear} %{TIME:logTime} \]%{SPACE}%{NOTSPACE:platformType}%{SPACE}%{LOGLEVEL:logLevel}%{SPACE}%{GREEDYDATA:message}

Replacing GREEDYDATA with NOTSPACE resolves the issue.

Approach 2: place a mutate filter after the grok to strip whitespace:

mutate {
  strip => ["platformType"]
}

Monday, November 20, 2017

Kafka - Zookeeper configuration

https://www.youtube.com/watch?v=SxHsnNYxcww

Basically there are 2 types of zookeeper configurations:

1. Single Node - Only 1 Zookeeper server - Single point of failure
2. Zookeeper Ensemble - cluster of zookeeper nodes - more robust and no single point of failure.

In the ZooKeeper Ensemble case, even if one of the ZooKeeper nodes goes down, the ensemble can still maintain the cluster state, because a majority (quorum) of ZooKeeper servers keeps running on the other nodes.

Setting up a ZooKeeper Ensemble - a Kafka 3-broker setup.

Changes on the ZooKeeper side:

mach1 - in /usr/lib/zookeeper/conf/zoo1.cfg

server.1=mach1:2888:3888
server.2=mach2:2889:3889
server.3=mach3:2890:3890

The above 3 entries specify the servers in the ZooKeeper ensemble.

Start the ZooKeeper server with this config file:
zookeeper-server-start.sh config/zoo1.cfg

Use the same numbering sequence for each server in its dataDir (the myid file there holds the server's number), as this is what identifies each member of the ZooKeeper ensemble.

clientPort=2181 is the port that clients use to connect to this ZooKeeper server.

The entries in the config files on the other 2 machines would be the same apart from clientPort; they might have clientPort=2182 and clientPort=2183 respectively.

Changes on the Kafka side:

On mach1, in /opt/kafka-2.11-0.10.1.1/config/server.properties, specify the client connection ports of all 3 ZooKeeper servers in the ensemble:

zookeeper.connect=mach1IP:2181,mach2IP:2182,mach3IP:2183

Configure the server.properties file on all the Kafka brokers, and remember to use a different port and broker.id for each of them.

Start Kafka with this server.properties file. The Kafka cluster state is now watched by the 3-node ZooKeeper ensemble.

Eg: bin/kafka-topics.sh --list --zookeeper xvzw160.xdev.motive.com:2181,xvzw161.xdev.motive.com:2181,xvzw162.xdev.motive.com:2181

/bin/kafka-console-producer --broker-list kafka02.example.com:9092,kafka03.example.com:9092 --topic t

Note: in the case of a single-node ZooKeeper, zookeeper.connect will have only 1 ZooKeeper server entry.

Thursday, September 21, 2017

Scala

Scala:

Scala is a hybrid programming language: a combination of functional programming and object-oriented programming.

Big data engineers are mostly drawn to Scala's functional programming side.

Functional programming generally deals with applying functions to data sets; inputs are immutable, and there is no pass-by-reference.
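A tiny illustration of the idea: transformations return new collections instead of mutating their input.

val nums = List(1, 2, 3)
val doubled = nums.map(_ * 2)   // a new List(2, 4, 6)
// nums itself is unchanged: List(1, 2, 3)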

Fields:

 - Each object has its own unique set of instance variables. In the example below, numToppings, size, and maxNumToppings are fields; both abstract and concrete declarations are shown.
Eg:
trait PizzaTrait {
    var numToppings: Int     // abstract
    var size = 14            // concrete
    val maxNumToppings = 10  // concrete
}

Closure

 − A closure is a function whose result depends on the value of one or more variables declared outside the function.

Eg: In the code below, the function is a closure because it uses m_hi, which is declared and assigned outside the function.

scala> var m_hi = "Hie"
m_hi: String = Hie

scala> def sayHie(name: String): Unit =
     | {
     | println(m_hi + " " + name)
     | }
sayHie: (name: String)Unit

scala> sayHie("Leela3")
Hie Leela3

Traits 

− A trait encapsulates method and field definitions, which can then be reused by mixing them into classes. Traits are used to define object types by specifying the signature of the supported methods. These are like interfaces in Java.

Functions with Variable names:

In the examples below, addVals and square are variables holding functions; square is used in map() to square the values in the List.
val addVals = { (a: Int, b: Int) => a + b }

OR

scala> val square = (a: Int) => a * a
square: Int => Int = $$Lambda$1323/1080359903@3feedaec

scala> square(3)
res14: Int = 9

scala> val nums = List(1,2,3)
nums: List[Int] = List(1, 2, 3)

scala> nums.map(square)
res16: List[Int] = List(1, 4, 9)

Variables as functions: 

Functions assigned to variables, as above, are also called first-class functions.

Higher Order Function:

A function is a higher-order function if it does either of the following:

1. Takes 1 or more functions as input
2. returns a function


Eg: In the below example, the exec function is a higher-order function because it takes another function, sayHi, as input.
scala> def sayHi(name1: String, name2: String): Unit =
     | {
     | println("Hi " + name1 + name2)
     | }
sayHi: (name1: String, name2: String)Unit

scala> sayHi("Leela","PRASAD")
Hi LeelaPRASAD

scala> def exec(f: (String,String) => Unit,name: String) =
     | {
     | f(name,name)
     | }
exec: (f: (String, String) => Unit, name: String)Unit

scala> exec(sayHi,"Leela")
Hi LeelaLeela

Anonymous Function:

A general function signature: def doubled(i: Int): Int = { i*2}

The equivalent anonymous function is (i: Int) => { i * 2 }
This is also called a function literal.

The difference lies in => instead of = in a regular function definition.

Eg:
scala> val twiceVal = (i: Int) => {i * 2}
twiceVal: Int => Int = $$Lambda$1053/961628534@7bdb4f17

scala> twiceVal(3)
res1: Int = 6

Why use an anonymous function in place of a normal function?

For scenarios where a function is needed inline, in only one place, an anonymous function saves defining a named one, as shown below.
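For example, doubling every element of a list with an inline anonymous function:

List(1, 2, 3).map((i: Int) => i * 2)   // List(2, 4, 6)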

Another example, where a function returns a function, using anonymous functions.

This function returns a doubler if the input passed is positive and a tripler otherwise.

Signature for returning a function. An anonymous function/literal is defined as the returned value:

def multiplier(c: Int) = (i: Int) => { body of the function to be returned }

The literal (i: Int) => { ... } says that the returned function takes an Int as input. This returned function is applied in map(), which loops through the List of integers that are the inputs to the literal.

def multiplier(c: Int) = (i: Int) => {
     val doubler = (x: Int) => { x * 2}         //a variable as function that takes Int as input
     val tripler = (x: Int) => { x * 3}         //a variable as function that takes Int as input
     if(c > 0)
        doubler(i)
     else
        tripler(i)
}

Here, multiplier is a higher-order function that returns a function (either doubler or tripler), which map() then applies to the list values.

To call this,

val a  = 1 to 5
scala> a.map(multiplier(-1))
res17: scala.collection.immutable.IndexedSeq[Int] = Vector(3, 6, 9, 12, 15)
                           OR
scala> a.map(multiplier(1))
res18: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)

Lambdas:

 A function literal can be passed directly to another function; below, the expression x => x*x is passed directly to map().
Eg:
scala> val square = (a: Int) => a * a

scala> val nums = List(1,2,3)
nums: List[Int] = List(1, 2, 3)

scala> nums.map(square)
res16: List[Int] = List(1, 4, 9)

OR
scala> nums.map(x => x*x)
res16: List[Int] = List(1, 4, 9)

Source: http://michaelpnash.github.io/scala-lambdas/

Partially Applied function: 

If only some of the arguments are supplied, you get back a partially applied function. This gives you the convenience of binding some arguments and leaving the rest to be filled in later. Here, when nme_partial is created, only the first argument is passed; the second is left as _: String to be supplied later.

scala> def nameMsg(name: String, msg: String) = {
     | println(name + msg)
     | }
nameMsg: (name: String, msg: String)Unit

scala> val nme = nameMsg("Leela ", "Evening")
Leela Evening
nme: Unit = ()

scala> val nme_partial = nameMsg("Leela ", _: String)
nme_partial: String => Unit = $$Lambda$1326/167993803@569fb30d

scala> nme_partial("Morning")
Leela Morning

Traits: 

A trait encapsulates method and field definitions, which can then be reused by mixing them into classes. Unlike class inheritance, in which each class must inherit from just one superclass, a class can mix in any number of traits.

A trait definition looks just like a class definition except that it uses the keyword trait. The following is the basic example syntax of trait.

trait Equal {
   def isEqual(x: Any): Boolean
   def isNotEqual(x: Any): Boolean = !isEqual(x)
}

Trait Equal contains two methods, isEqual() and isNotEqual(). Only isNotEqual() is implemented, so when a user-defined class Point extends the trait Equal, the Point class must provide an implementation of isEqual().
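A minimal sketch of such a Point class (the Point implementation is not shown in the source, so this is one possible version):

class Point(val x: Int, val y: Int) extends Equal {
  // only isEqual must be implemented; isNotEqual comes from the trait
  def isEqual(obj: Any): Boolean = obj match {
    case p: Point => p.x == x && p.y == y
    case _        => false
  }
}

new Point(1, 2).isNotEqual(new Point(3, 4))   // true, via the trait's default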

Implicit:

See the below code snippet

Create a function that takes last argument as implicit
def addHello(name: String)(implicit ss: String): Unit = println(name + " " + ss)

Create an object of implicit type, in the above line implicit argument type is String. So created String type object.
implicit val qq = new String("Hello")

Now, call the function that takes implicit argument as its last argument.
addHello("Leela")

Output: Leela Hello

Background: whenever an argument is declared implicit in a function's signature, the compiler looks for a value declared implicit, of the matching type, in the current scope. Usually the last argument is the implicit one. In the 2nd line of the code snippet above, a String value is declared implicit, so addHello picks it up as its last argument since the type matches: "Leela" is passed explicitly and "Hello" is passed implicitly.

Only one implicit value of the type should be in scope. If more than one implicit value is declared, it leads to an ambiguity error as below:

scala> addHello("Leela")
<console>:34: error: ambiguous implicit values:
 both value qq of type => String
 and value qq2 of type => String
 match expected type String
              addHello("Leela")

Fold:

Takes two argument lists: a start value and a function. The function itself takes two arguments: the accumulated value and the current item in the list (an explicit version follows the examples below).

Eg:

val s = List(3,5,7,9)

s.fold(0) { _+_}
res10: Int = 24

s.fold(5) { _+_}
res11: Int = 29
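The _+_ shorthand hides the two parameters; written out explicitly, the same fold is:

s.fold(0) { (acc, item) => acc + item }   // 24: acc is the accumulated value, item the current element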

Currying:


Consider below code,

scala> def curriedSum(x: Int)(y: Int) = x+y

scala> curriedSum(1)(2)
res11: Int = 3

When curriedSum is invoked, two function invocations are made back to back. The first invocation takes a single Int parameter named x and returns a function value for the second invocation. The same functionality can be achieved with the code below.

scala> val onePlus = curriedSum(1)_

scala> onePlus(2)
res12: Int = 3

fold() and mapPartitionsWithIndex() are a few examples of curried functions.

for comprehension:


basic structure: for(seq) { expr }
                 
seq: the generator and conditions that control the iterations
expr: the functionality to be executed.

Eg:
val nums = 1 to 5

for( i <- nums){ println(i) }
          OR
for( i <- 1 to 5){ println(i) }

Eg:

for(country <- List("India","USA","China","Japan")) {
           country match {
                    case "India"   => println("Delhi")
                    case "USA"   => println("WT D.C")
                    case "Japan"   => println("Tokyo")
                    case _   => println("Not sure")
}
}


foreach syntax: foreach is a higher-order control abstraction

rdd.foreach { country =>
            country match {
                    case "India"   => println("Delhi")
                    case "USA"   => println("WT D.C")
                    case "Japan"   => println("Tokyo")
                    case _   => println("Not sure")
            }
}

The difference between a for loop and the map function is that map returns a value, whereas a plain for loop does not.

For Comprehension: to make a for loop return a value, add yield. This makes the for loop behave like the map function. (The cases below yield values rather than calling println; with println the result would be a List[Unit].)
Eg:
for(country <- List("India","USA","China","Japan")) yield {
           country match {
                    case "India"   => "Delhi"
                    case "USA"     => "WT D.C"
                    case "Japan"   => "Tokyo"
                    case _         => "Not sure"
           }
}


Lambda functions:


Functions stored in variables are lambda functions.

3 Ways of using lambda functions:

1. val squareDouble = (a: Double) => a*a;
Only the method's input type (a: Double) is specified; the body follows the =>.

2. val c = (_: Int) * 2;

3. To specify the return type:
val squarelong: (Long) => Long = a => a*a;
"squarelong: (Long) => Long" specifies that squarelong takes a (Long) as input and returns a Long as output.
It is followed by the expression "a => a*a;" assigned to squarelong. The parameter type need not be mentioned here, as it is already declared.

Another way, using _ instead of (x, y) => x * y:

val squarelng2:(Long , Long) => Long = _ * _;


Passing functions as input to another function

///////////Passing functions as arguments/////

Consider a function that doesn't take any arguments and returns Unit (void):

def sayHi(): Unit =
{
println("Hi All")
}

scala> sayHi()
Hi All

Now we will write a function that takes this function as an input argument.

def invokeUnitFun(callback :() => Unit) : Int =
{
callback()
(5)
}

invokeUnitFun takes as input a function that takes no input and returns nothing, and itself returns an Int. The function passed in is referred to as callback, but this can be any name.

calling the invokeUnitFun function.

scala> invokeUnitFun(sayHi)
Hi All
res5: Int = 5

In place of invokeUnitFun(sayHi), we can pass any function that takes no input and returns nothing, e.g. invokeUnitFun(sayBye).


So far, the functions we have passed take no input and return nothing.

Next, imagine that you want to create a different version of callback, and this one should take two Int parameters and return an Int. Its signature would look like this:

def invokeFun(f: (Int,Int) => Int): Long =
{
val b = f(5,3)
b+10
}

def sum(a: Int,b:Int) =
{
(a+b)
}

scala> invokeFun(sum)
res0: Long = 18

invokeFun takes as input a function that takes 2 Int values as input and returns an Int; invokeFun itself returns a Long.

Few other examples:

foo(f:(String) => Int)
bar(f:(Int, Int) => Int)

Notice that in invokeFun the call f(5,3) is hardcoded with the values 5 and 3. In general usage, the function is passed in together with the arguments it should operate on.

Eg:
def executeAndPrint(f: (Int, Int) => Int, x: Int, y: Int): Unit = {
    val result = f(x, y)
    println(result)
}

While calling,
executeAndPrint(sum, 3, 11)       // prints 14
executeAndPrint(multiply, 3, 9)   // prints 27
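multiply is not defined above; a definition matching the expected (Int, Int) => Int shape would be:

def multiply(a: Int, b: Int) = a * b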
 

Reference: https://alvinalexander.com/scala/fp-book/how-write-functions-take-function-input-parameters


Function calls can also be nested, so that one function's return value acts as another function's input.

scala> object add{
     | def addition(a: Long, b: Long) : Long =
     | {
     |    (a+b)
     | }
     | }
defined module add

scala> add.addition(5,6)
res0: Long = 11

scala> object sub{
     | def subtraction(a:Long, b:Long) =
     | {
     |    (a-b)
     | }
     | }
defined module sub

scala> sub.subtraction(10,5)
res1: Long = 5

scala> add.addition(add.addition(2,sub.subtraction(10,7)),sub.subtraction(12,6))
res2: Long = 11

Closures

Use functions as variables (values) in Scala.

You want to pass a Scala function around like a variable, just like you pass String, Int, and other variables around in an object-oriented programming language.

The following Scala code defines a function literal that takes an Int parameter and returns a value that is twice the amount of the Int that is passed in:

(i: Int) => { i * 2 }

Think of the => symbol as a transformer. In this case, the function transforms the Int value i to an Int value that is twice the value of i.

Assign that function literal to a variable:

val double = (i: Int) => { i * 2 }

You can now invoke double just like you'd call a method:

double(2)   // 4

double(3)   // 6



You can pass the double function into the map method of an Int sequence:

scala> val list = List.range(1, 5)
list: List[Int] = List(1, 2, 3, 4)

scala> list.map(double)
res0: List[Int] = List(2, 4, 6, 8)


Explicitly declaring the return type of a function

These functions all take two Int parameters and return a single Int value, which is the sum of the two input values:

// implicit approach   - No Return type specified
val add = (x: Int, y: Int) => { x + y }
val add = (x: Int, y: Int) => x + y

// explicit approach  - Return type Int specified
val add: (Int, Int) => Int = (x,y) => { x + y }
val add: (Int, Int) => Int = (x,y) => x + y

The curly braces around the body of the function in these simple examples are optional, but they are required when the function body grows to more than one expression:

val addThenDouble: (Int, Int) => Int = (x,y) => {
    val a = x + y
    2 * a
}


Reference:

Scala - https://danielwestheide.com/scala/neophytes.html. If you would like, please get a paid pdf version as well. He did an amazing job.



Option: In Scala, Option is used to handle null values. An Option wraps its value in Some(). A simple example:

val a: Option[String] = Some("Leela")

This statement says that val a is of type Option holding a String. When assigning a value to an Option, wrap it in Some(). If no value is present for a lookup, None is returned.

Eg:
scala> val lst = Map(1 -> "Leela", 2 -> "Prasad", 3 -> "G")
lst: scala.collection.immutable.Map[Int,String] = Map(1 -> Leela, 2 -> Prasad, 3 -> G)

scala> lst.get(2)
res7: Option[String] = Some(Prasad)

scala> lst.get(5)
res8: Option[String] = None

Instead of None, this can be enhanced using getOrElse():

scala> lst.get(5).getOrElse("Found Nothing")
res9: String = Found Nothing

Pattern Matching

scala> case class user(id: Int, name: String, age: Int, gender: Option[String])
defined class user

scala> val a1 = user(1, "Leela", 33, Some("Male"))
a1: user = user(1,Leela,33,Some(Male))

scala> val a2 = user(2, "some", 31, None)
a2: user = user(2,some,31,None)

scala> a1.gender match {
     |   case Some(gender) => println("Gender is:" + a1.gender)
     |   case None => println("Gender Not specified")
     | }
Gender is:Some(Male)

scala> a2.gender match {
     |   case Some(gender) => println("Gender is:" + a2.gender)
     |   case None => println("Gender Not specified")
     | }
Gender Not specified

In the above pattern match, the gender (a1.gender or a2.gender) is matched against either Some() or None and handled accordingly.

Write a Scala method that takes a simple generic type (similar to a template in C++).

Below is a function that takes strings as input:

def randomElement(names: Seq[String]): String = {
  val randomNum = util.Random.nextInt(names.length)
  names(randomNum)
}

To make the function accept any datatype, modify the code to:

def randomElement[T](input: Seq[T]): T = {
  val randomNum = util.Random.nextInt(input.length)
  input(randomNum)
}


With this change, the method can now be called on a variety of types:



randomElement(Seq("Aleka", "Christina", "Tyler", "Molly"))

randomElement(List(1,2,3))

randomElement(List(1.0,2.0,3.0))

randomElement(Vector.range('a', 'z'))



Source: https://alvinalexander.com/scala/how-to-write-scala-methods-generic-types-parameters-syntax

To assign values to multiple variables from an if-else expression, use the below example:


    val(jdbcSqlConnStr,driver) = {
      if (condition == "Y") {
        val sqlConnStr = JdbcConnectionUtility.constructTypeListJDBCConnStr(propertyConfigs)
        val jdbcDriver = JdbcConnectionUtility.getTLJDBCDriverName(propertyConfigs)
        (sqlConnStr,jdbcDriver)
      }
      else {
        val sqlConnStr  = JdbcConnectionUtility.constructJDBCConnStr(propertyConfigs)
        val jdbcDriver  = JdbcConnectionUtility.getJDBCDriverName(propertyConfigs)
        (sqlConnStr,jdbcDriver)
      }

    }
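A self-contained sketch of the same pattern (the names below are made up for illustration): the if-else is a single expression returning a tuple, which is destructured into two vals.

val useDev = true
val (dbHost, dbPort) =
  if (useDev) ("dev-db.example.com", 5432)
  else ("prod-db.example.com", 6432)
// dbHost: String = dev-db.example.com, dbPort: Int = 5432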


lazy val and @transient

lazy val - in Scala, lazy val denotes a field that is calculated only when it is first accessed; the result is then stored for future reference.

@transient, on the other hand, denotes a field that shall not be serialized.

Eg:
import org.apache.log4j.Logger

object Holder extends Serializable {
  @transient lazy val log = Logger.getLogger(getClass.getName)
}


The variable log will be calculated only once per deserialization. The Logger is constant across the application, so it does not need to be recreated each time; being @transient lazy, it is simply rebuilt once, on first access, after each deserialization.
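A sketch of why this matters in Spark (rdd here is an assumed RDD, not from the original note): the closure below is serialized and shipped to executors. Because log is @transient it is not shipped with the closure, and because it is lazy, each executor constructs its own Logger on first use instead of failing on a non-serializable field.

rdd.foreach { record =>
  Holder.log.info("processing " + record)   // Logger created lazily on each executor
}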

Using _ in map()

See the below example,

val lstKV: List[(String, String)]  = List(("Leela","USA"),("Karthik","India"))

val names = lstKV.map(x => println((x._1)))

O/P: Leela
Karthik
names: List[Unit] = List((), ())

val country = lstKV.map(x => println((x._2)))

USA
India
country: List[Unit] = List((), ())

Here, the names in the list are retrieved using ._1 and the country names using ._2.

Options and Match

val opStr: Option[String] = Some("Leela")
val opStr2: Option[String] = Some("Leela2")

def optionInput(a: Option[String]) = {
   a match {
     case Some("Leela") => println("Found " + a.getOrElse(""))
     case Some(other)   => println("Found other: " + other)
     case None          => println("Not found")
   }
}

(The Some(other) case is needed: without it, optionInput(opStr2) would throw a MatchError, because Some("Leela2") matches neither Some("Leela") nor None.)

optionInput(opStr)
optionInput(opStr2)

Variables as Functions vs Variables

See the below example,
val name = "Leela"
val a = 2
val defName : String = if(a==2) "Leela" else "Karthik"
name
defName

O/P: res0: String = Leela
res1: String = Leela


val name = "Leela"
val a = 3
val defName : String = if(a==2) "Leela" else "Karthik"
name
defName

O/P: res0: String = Leela
res1: String = Karthik

In the above code, the value of name never changes. defName, however, is the result of evaluating the if expression when it is defined, so its value depends on a at that point.

Exception handling in Scala:

Just like in any other language, exceptions in Scala can be handled with try {} and catch {} blocks. Below is a simple example from https://alvinalexander.com/scala/scala-try-catch-finally-syntax-examples-exceptions-wildcard

def runAppleScriptCommand(c: AppleScriptCommand) {
  val scriptEngineManager = new ScriptEngineManager
  val appleScriptEngine = scriptEngineManager.getEngineByName("AppleScript")
  try {
    appleScriptEngine.eval(c.command)
  } catch {
    case e: ScriptException => e.printStackTrace
  }
}

e.printStackTrace or e.getStackTrace will fetch the exception trace information. When an exception is handled, the job does not break; the exception is caught gracefully and the job can proceed with its next set of execution. Below is sample code for it:

Try {
  functionf1(names)
} match {
  case Success(_) => Holder.log("Success")
  case Failure(exception) => Failure(new Throwable("Exception occurred: " + exception.getMessage + exception.getStackTrace))
}

getStackTrace is important as it gives info about the exception, which can be logged to an audit table where multiple jobs run as a framework of applications. Another point of interest is that match {} can be used on a Try {} result in place of catch {}.


Exit code from Spark program:


Like a C++ program, a Spark job can also return an exit code if needed.

usage,

System.exit(0)
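A minimal sketch of a Spark driver main that reports success or failure through its exit code (MyJob and the job body are placeholders, not from the original notes):

object MyJob {
  def main(args: Array[String]): Unit = {
    try {
      // ... build the SparkSession and run the job here ...
      System.exit(0)    // 0 signals success to the caller/scheduler
    } catch {
      case e: Exception =>
        e.printStackTrace()
        System.exit(1)  // non-zero signals failure
    }
  }
}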


Factory Pattern implementation in Scala:


Below is a Factory pattern implementation. In the example a companion object is created, and its apply method is the one that creates the appropriate object.

trait Animal {
  def speak
}

object Animal {

  private class dog extends Animal {
    override def speak: Unit = {
      println("woow")
    }
    def speak2(): Unit = {
      println("woow2")
    }
  }

  private class cat extends Animal {
    override def speak = {
      println("meawoo")
    }
  }

  def apply(name: String): Animal = {
    if (name == "dog") new dog()
    else if (name == "cat") new cat()
    else new cat()
  }
}

Creating the object

Animal.apply("dog").speak
OR
Animal("dog").speak          //calling Animal("dog") is no different that calling Animal.apply("dog") 


Note that the method speak2 cannot be called this way: apply returns the trait type Animal, and the Animal trait declares only speak(). See the illustration below.
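Illustration (the second call would not compile if uncommented):

Animal("dog").speak       // OK: speak is declared on the Animal trait
// Animal("dog").speak2   // error: value speak2 is not a member of Animal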


Some Scala functions:


def getCurrentTimeStampasString: String = {
import java.text.SimpleDateFormat
import java.util.Calendar

//create date/time formatters
val today = Calendar.getInstance().getTime()
val minuteFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val transactionTimestamp = minuteFormat.format(today)
transactionTimestamp
}

def convertToDate(datetime: String, format: String): Date = {
import java.text.SimpleDateFormat
import java.util.Date
val minuteFormat = new SimpleDateFormat(format)
val transactionTimestamp = minuteFormat.parse(datetime)
transactionTimestamp
}

def getLastdatePreviousMonth(): String = {
// minusMonths(1) also handles January correctly (month - 1 would give 0)
val lastMonth = DateTime.now().minusMonths(1)
val lastDay = lastMonth.dayOfMonth.getMaximumValue
val formatter = new DecimalFormat("00")
val formattedPrevMonth = formatter.format(lastMonth.getMonthOfYear)
val lastDate = s"${lastMonth.getYear}-${formattedPrevMonth}-${lastDay}"
(lastDate)
}

def calcByteCount(filePath: String): Long = {
val someFile = new File(filePath)
(someFile.length.toLong)
}

def getFirstdatePreviousMonth(): String = {
// minusMonths(1) handles January; DecimalFormat pads the month to two digits
val prevMonth = DateTime.now().minusMonths(1)
val formatter = new DecimalFormat("00")
val firstDate = s"${prevMonth.getYear}-${formatter.format(prevMonth.getMonthOfYear)}-01"
(firstDate)
}

def addNullColumns(df: DataFrame, columnsList: List[String]): DataFrame = {
// fold over the column names, adding each as a null string column
columnsList.foldLeft(df)((acc, colName) => acc.withColumn(colName, lit(null).cast(StringType)))
}

def convertTimeFormat(date: String, fromFormat: String, toFormat: String): String = {
val format = new SimpleDateFormat(fromFormat)
val format2 = new SimpleDateFormat(toFormat)
val date1 = format.parse(date)
val date_new = format2.format(date1)
(date_new)
}

def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
dir.listFiles.filter(_.isFile).toList.filter { file =>
    extensions.exists(file.getName.endsWith(_))
}
}

def deleteDirectory(file: File) {
if(file.isDirectory)
Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(deleteDirectory(_))
file.delete
}