Friday, February 24, 2017

Building Maven Project

Below are the steps to create a Maven project.

1. Get the Scala IDE. Open Eclipse -> Help -> Eclipse Marketplace and search for Scala. In the list select 'Scala IDE 4.2.X' and click Install.

2. Select the Scala Perspective. Right-click the left pane -> New -> Project -> Maven and create a new project with GroupId "org.practice.Leela" and ArtifactId "Projs".

3. Wait 3-5 minutes for the project to get updated. Right-click the project -> Configure -> Add Scala Nature.

Add the Scala source directory:

Right-click Project -> Java Build Path -> Source -> Add Folder -> select 'main' -> Add New Folder 'scala' -> under Inclusion patterns give **/*.scala -> Finish -> OK

4. Right-click the package -> New Scala Object -> "SparkWordcount"


Below is the latest pom.xml (from Narendra's machine).

Make sure the groupId, artifactId and version match your own project.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.tek</groupId>
  <artifactId>Sparkprojs</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>hydraulic</name>
  <url>http://maven.apache.org</url>

   <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <scala.version>2.11.8</scala.version>
  <java.version>1.7</java.version>
 </properties>

  <dependencies>
  <dependency>
   <artifactId>scala-library</artifactId>
   <groupId>org.scala-lang</groupId>
   <version>${scala.version}</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.11</artifactId>
   <version>2.1.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.11</artifactId>
   <version>2.1.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.11</artifactId>
   <version>2.1.1</version>
  </dependency>
  <dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-avro_2.11</artifactId>
   <version>3.2.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
   <version>2.1.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.11</artifactId>
   <version>0.8.2.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.8.2.1</version>
  </dependency>
  <dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-csv_2.11</artifactId>
   <version>1.5.0</version>
  </dependency>
  <dependency>
   <groupId>com.google.code.gson</groupId>
   <artifactId>gson</artifactId>
   <version>2.5</version>
  </dependency>
  <!-- tests -->
  <dependency>
   <groupId>org.scalatest</groupId>
   <artifactId>scalatest_2.11</artifactId>
   <version>2.2.2</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <artifactId>junit</artifactId>
   <groupId>junit</groupId>
   <version>4.10</version>
   <scope>test</scope>
  </dependency>
<!-- https://mvnrepository.com/artifact/net.alchim31.maven/scala-maven-plugin -->
<dependency>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.1.6</version>
</dependency>
 </dependencies>

 <build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
   <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.1.6</version>
    <executions>
     <execution>
      <phase>compile</phase>
      <goals>
       <goal>compile</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.0.2</version>
    <configuration>
     <source>${java.version}</source>
     <target>${java.version}</target>
    </configuration>
    <executions>
     <execution>
      <phase>compile</phase>
      <goals>
       <goal>compile</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
  </plugins>
 </build>
</project>



5. In the pom.xml file, add the above dependencies between the <dependencies></dependencies> tags and save. The dependencies will now be downloaded.

Note: In some cases we come across scala compatibility issues like https://stackoverflow.com/questions/34507966/spark-build-path-is-cross-compiled-with-an-incompatible-version-of-scala-2-10-0

Errors:
"akka-actor_2.10-2.3.11.jar of Sparkexamples build path is cross-compiled with an incompatible version of Scala (2.10.0). In case this report is mistaken, this check can be disabled in the compiler preference page.    Sparkexamples        Unknown    Scala Version Problem"

To fix this, set the correct Scala version: right-click Project -> Scala -> Set the Scala Installation -> select Fixed Installation 2.10.x, since the version specified in Maven is 2.10.5.

In case of an alchim31 error:

Copy http://alchim31.free.fr/m2e-scala/update-site/
In Eclipse -> Help -> Install New Software, paste it in 'Work with', select all and install.

                                  OR

6. Right-click the project -> Properties -> Java Build Path -> select the existing 2.11 Scala library -> Edit and select 2.10.5, the version declared in the pom.xml. In our case the right version is 2.10.5.

7. To add newer dependencies, search for the Maven dependency and add it to the pom.xml file, like:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>


8. Right-click the project -> Maven -> Update Project -> check the Force option and click OK.

9. Right-click the project -> New -> Scala Object
and write the code below for testing.

package org.practice.Leela.Projs

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Wordcount {

  def main(args: Array[String]): Unit = {

    // Run locally with a single thread; use local[*] to use all cores
    val conf = new SparkConf()
      .setAppName("wordCount")
      .setMaster("local")

    val sc = new SparkContext(conf)
    val lst = List(1, 2, 3, 4)
    val rdd1 = sc.parallelize(lst)   // distribute the list as an RDD
    rdd1.foreach(println)            // print the elements (order not guaranteed)
  }
}

Export Jar

Maven jar:

Add the lines below after the dependencies in pom.xml. (The jar-with-dependencies descriptor belongs to the maven-assembly-plugin, not the compiler plugin, so the plugin below is the assembly plugin.)

   <build>
    <plugins>
        <plugin>
            <inherited>true</inherited>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
   </build>



Right-click the project -> Run As -> Maven install. This generates the Maven JAR (with the configuration above, a *-jar-with-dependencies.jar also lands in the target folder). The JAR name can be specified in the pom.xml.

Eclipse JAR (not recommended):
Right-click the driver class, e.g. SparkWC.scala -> Export -> JAR File -> specify the JAR file path and click Finish.

Run the command: to run on the local machine with the JAR, with input and output in local directories:

spark-submit --class "Sparkjobs.Wordcount" --master local[*] /home/hadoop/Leela_out/Sparkprojs-1.0.jar /home/hadoop/Leela/WC/inpath /home/hadoop/Leela/WC/Outdir/

The input and output paths are passed as arguments; these paths are local file system paths. The JAR file is also on the local file system.

 http://10.188.193.152:4040 - Spark UI. This UI will appear only while the Spark job is running.


To run the Spark job on YARN with HDFS paths:

spark-submit --class "Sparkjobs.Wordcount" --master yarn --deploy-mode client /home/hadoop/Leela_out/Sparkprojs-1.0.jar /user/hadoop/Leela/WC/Leela_out /user/hadoop/Leela/WC/Outdir

The input and output paths are passed as arguments; these paths are HDFS paths. The JAR file is on the local file system.

How to submit the Job with the built JAR file?

# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

Latest pom.xml as of July 2017 (dependencies and build sections):

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
 <dependency>
  <groupId>org.apache.spark</groupId> 
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>   
  <version>2.1.0</version> 
  </dependency>
   <dependency>
  <groupId>org.apache.kafka</groupId> 
  <artifactId>kafka_2.11</artifactId> 
  <version>0.8.2.1</version> 
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
 <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
</dependency>
  </dependencies>
<build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

Note: If the dependency in Maven is,
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0-cdh5.8.0</version>
</dependency>

then change the version to <version>1.1.0</version> by removing the -cdhxxxx suffix; the CDH-specific version leads to a resolution error because it is not available in Maven Central.
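Alternatively, the -cdh version can be kept by pointing Maven at Cloudera's repository (a sketch; verify the repository URL for your CDH release):

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>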


Eg:
----------------------------- Hive Code ----------
The JAR below reads a CSV file and saves the contents as an Avro file. A Hive table is then created on top of it.

/Data/sathwik_r_and_d/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class com.hydraulics.HydraulicsCode --master local[*] --packages "com.databricks:spark-avro_2.11:3.2.0,com.databricks:spark-csv_2.11:1.5.0" /Data/sathwik_r_and_d/hptest/hydraulics/hydra_jar_file.jar /Data/sathwik_r_and_d/hptest/hydraulics/history_export_files/data-export-2017-01-01toCurrent.csv /user/new_hydra_data_second_time/

SET hive.mapred.supports.subdirectories=TRUE;
SET mapred.input.dir.recursive=TRUE;

set hive.execution.engine=spark;
CREATE TABLE hydraulics_temp_reload_v3034567891011131416171819
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/new_hydra_data_second_time/'
TBLPROPERTIES ('avro.schema.url'='/user/eoil_v_5.avsc',"hive.input.dir.recursive" = "TRUE", 
    "hive.mapred.supports.subdirectories" = "TRUE",
    "hive.supports.subdirectories" = "TRUE", 
    "mapred.input.dir.recursive" = "TRUE");

select count(*) from hydraulics_temp_reload_v3034567891011131416171819;

Thursday, February 23, 2017

Spark Core Functions

Attached are some of the Core RDD functions

https://drive.google.com/open?id=0BzG0wQkWbKpLVGF6NGZucnpmR00

Practice Code:

[cloudera@quickstart ~]$ hadoop fs -mkdir Spark
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal "/home/cloudera/Desktop/Spark/emp" "hdfs://quickstart.cloudera/user/cloudera/Spark"


scala> val words = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words", 2)
words: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/Words MapPartitionsRDD[7] at textFile at <console>:27

scala> words.collect()
res3: Array[String] = Array(I                Love Hadoop, I    Like    Spark   Coding, "   I   Love    Combination    of   Both")

scala> val wordcount = words.flatMap(x => x.split(" ").filter(x => x != ""))
wordcount: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:29

scala> wordcount.collect()
res8: Array[String] = Array(I, Love, Hadoop, I, Like, Spark, Coding, I, Love, Combination, of, Both)
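The wordcount RDD above only holds the split words; to get actual counts, a small follow-up sketch (collect order may vary):

val counts = wordcount.map(word => (word, 1)).reduceByKey(_ + _)
counts.collect()
// e.g. Array((I,3), (Love,2), (Hadoop,1), (Like,1), (Spark,1), (Coding,1), (Combination,1), (of,1), (Both,1))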

///mapPartitions as an Alternative to flatMap////

val allWords = words.mapPartitions{ lines =>
     {
      val myList = lines.toList
      myList.map(x => x.split(" ").filter(x => x != "")).iterator
      }
      }

scala> allWords.collect()
res14: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))

scala> val allWords = words.mapPartitionsWithIndex{
      (index, lines) => {
      println("called in Partition ->" + index)
      val myList = lines.toList
      myList.map(x => x.split(" ").filter(x => x != "")).iterator
      }
      }

scala> allWords.collect()
called in Partition ->0
called in Partition ->1
res16: Array[Array[String]] = Array(Array(I, Love, Hadoop), Array(I, Like, Spark, Coding), Array(I, Love, Combination, of, Both))


def customFunc(lines: Iterator[String]) : Iterator[Array[String]]= {
val myList = lines.toList
myList.map(x => x.split(" ").filter(x => x != "")).iterator
}

val allWOrds = words.mapPartitions(customFunc)

/////AggregateByKey///
val rdd1 = sc.parallelize(List(("cat",5),("cat",10),("mouse",3),("dog",6),("cat", 12),("mouse", 5)),2)

def partWithIndex(index: Int, pairs: Iterator[(String,Int)]) = {
     | pairs.toList.map(x => "[Part ID:" + index + "Value:" + x + "]").iterator
     | }
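The snippet above stops before the aggregateByKey call itself; a minimal sketch on rdd1 (zero value 0, the seqOp adds a value into the per-partition accumulator, the combOp merges accumulators across partitions):

// inspect which pairs landed in which partition, using partWithIndex from above
rdd1.mapPartitionsWithIndex(partWithIndex).collect()

// sum the values per key
val summed = rdd1.aggregateByKey(0)((acc, v) => acc + v, (a, b) => a + b)
summed.collect()
// e.g. Array((dog,6), (cat,27), (mouse,8)) - order may vary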


//////contains and toLowerCase////////
val rdd1 = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/Words")

scala> val rdd2 = rdd1.map(x => x.toLowerCase.contains("hadoop"))
rdd2: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[4] at map at <console>:29

scala> rdd2.collect()
res2: Array[Boolean] = Array(true, false, false)

contains returns true or false.

/////////Filter/////////////
scala> val lines = List(10,20,30,40)
lines: List[Int] = List(10, 20, 30, 40)

scala> val x = lines.filter(x => (x > 20))
x: List[Int] = List(30, 40)

scala> val x = lines.filter(_ > 20)
x: List[Int] = List(30, 40)

eg2:
val logs = List("Success and built",
     | "ERROR build failed",
     | "Success Packaging Completed",
     | "ERROR Checkout Failed")
logs: List[String] = List(Success and built, ERROR build failed, Success Packaging Completed, ERROR Checkout Failed)

scala> val errs = logs.filter(_.toUpperCase.contains("ERROR"))
errs: List[String] = List(ERROR build failed, ERROR Checkout Failed)

errs.map(x => println(x))           OR          errs.foreach(println)

Both print:
ERROR build failed
ERROR Checkout Failed

val r5 = r3.filter(x => x._1 != "");    //For Key value pairs
///////////////GroupByKey//////////////
scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",3)
emprec: org.apache.spark.rdd.RDD[String] = hdfs://quickstart.cloudera/user/cloudera/Spark/emp MapPartitionsRDD[1] at textFile at <console>:27

scala> emprec.foreach(println)
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13


scala> def sexsalpair(rec: String) = {
     | val itms = rec.split(",")
     | (itms(3),itms(2))
     | }
sexsalpair: (rec: String)(String, String)

scala> val salpair = emprec.map(x => sexsalpair(x))
salpair: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:31

scala> val groupedpair = salpair.groupByKey()
groupedpair: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[3] at groupByKey at <console>:33

scala> groupedpair.foreach(println)
(F,CompactBuffer(30000, 60000))
(M,CompactBuffer(20000, 50000, 15000))

/////////////////Union/////////////////

scala> val l1 = List(10,20,30,40,50)

scala> val l2 = List(10,25,30,45,50)

scala> val r1 = sc.parallelize(l1)

scala> val r2 = sc.parallelize(l2)

scala> val r3 = r1.union(r2)

/////////countByValue////////

scala> val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:27

scala> b.countByValue
res9: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)


////////////cogroup///////////////////
scala> val rdd1 = sc.parallelize(Seq(("key1",1),("key2",2),("key3",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:27

scala> val rdd2 = sc.parallelize(Seq(("key1",5),("key2",6)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27

scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[32] at cogroup at <console>:31

scala> grouped.collect()
res11: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key2,(CompactBuffer(2),CompactBuffer(6))), (key3,(CompactBuffer(3),CompactBuffer())), (key1,(CompactBuffer(1),CompactBuffer(5))))

////////////////////

Exercise 1: Word count in 3 partitions, then sort and combine the result; the sorting should happen across all 3 partitions together.

Solution:
val words = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).sortByKey().reduceByKey(_+_)
words.foreach(println)

///This will not work out in this manner; an aggregation step is needed before the sort - see the sketch below///////
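One straightforward variant (a sketch, assuming lines is an RDD of text lines such as the one loaded with sc.textFile above): reduce first, then sort, so the ordering is global across all partitions:

val counts = lines.flatMap(_.split(" "))
  .filter(_ != "")
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // combine counts from all partitions
  .sortByKey()          // range-partitioned sort, ordered across the whole result
counts.collect().foreach(println)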

Exercise 2:

file1.txt
101,Amar,20000,M,11
102,Amala,30000,F,11
103,Rajeev,50000,M,13
104,Leela,15000,M,12
105,Annapurna,60000,F,13

Transform to case class of id, Name, salary, Gender, DeptNO

val emp = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp")
case class empdata(id: Int, Name: String, Salary: Int, Gender: String, Depno: String)

def MakeEmpClass(record: String) : empdata = {
val data = record.split(",")
val id = data(0).toInt
val Name = data(1)
val Salary = data(2).toInt
val Gender = data(3)
val Depno = data(4)
empdata(id,Name,Salary,Gender,Depno)
}

val empvalues = emp.map(x => MakeEmpClass(x))

val Nms = empvalues.map(x => x.Name)
Nms: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:35

Nms.foreach(println)
Amar
Amala
Rajeev
Leela
Annapurna


Exercise 3: From the above data set, generate the following report

Above Avg salary 2
Below Avg salary 3

Solution:

Approach 1:

scala> def extractSal(emprecs: empdata):  Int = {
      val sal = emprecs.Salary
      sal
      }

val sals = empvalues.map(x => extractSal(x))
sals.persist()
val sum = sals.reduce(_+_)
val avg = sum/sals.count()

def makepair(sal:Int, avg: Long) = {
                  println("sal =" + sal)
                  println("avg =" + avg)
                  var retval = 0
                  if(avg > sal){
                  retval =1
                  }
                  (retval)
                  }

val status = sals.map(x => makepair(x, avg))       //In this function we pass 'avg', a value computed outside, into the function

scala> status.foreach(println)                  //The value 'avg' is passed in for every input element
sal =20000
avg =35000
1
sal =30000
avg =35000
1
sal =50000
avg =35000
0
sal =15000
avg =35000
1
sal =60000
avg =35000
0

val grp = status.groupBy(x => x)            // Grouping based on salary grade, either 1 or 0
(0,CompactBuffer(0, 0))
(1,CompactBuffer(1, 1, 1))
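To turn the grouped flags into the report counts, a small follow-up sketch (makepair above sets 1 for below-average salaries):

val report = grp.map { case (flag, values) =>
  (if (flag == 1) "Below Avg salary" else "Above Avg salary", values.size)
}
report.foreach(println)
// (Above Avg salary,2)
// (Below Avg salary,3)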

Approach 2: Using cartesian. This is a better approach.

scala> val emprec = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/Spark/emp",2)

scala> val esal = emprec.map{
     | x => x.split(",")
     | }

scala> val esal = emprec.map{ x=>
     | val sal = x.split(",")
     | val salary = sal(2).toInt
     | salary
     | }

scala> val tot = esal.sum
tot: Double = 175000.0

scala> val avg = tot/esal.count
avg: Double = 35000.0                                                          

scala> val rddavg = sc.parallelize(List(avg))
rddavg: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[21] at parallelize at <console>:35

scala> val trans = esal.cartesian(rddavg)
trans: org.apache.spark.rdd.RDD[(Int, Double)] = CartesianRDD[22] at cartesian at <console>:37

scala> trans.foreach(println)
(20000,35000.0)
(30000,35000.0)
(50000,35000.0)
(15000,35000.0)
(60000,35000.0)

scala> val res = trans.map{ x =>
     | if(x._1 >= x._2) "Above" else "Below"}

scala> res.foreach(println)
Below
Below
Above
Below
Above
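To finish the report, countByValue can be applied to res (a small sketch):

val report = res.countByValue()   // e.g. Map(Above -> 2, Below -> 3)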

Sunday, February 19, 2017

PIG

1. Filtering records in PIG and running PIG Script.
Consider data as below:
7|Ron|ron@abc.com
8|Rina
9|Don|dmes@xyz.com
9|Don|dmes@xyz.com
10|Maya|maya@cnn.com

11|marry|mary@abc.com

Loading data in PIG,
data = LOAD 'file:///home/cloudera/Desktop/Spark/Pig/demo.txt' using PigStorage('|');
dump data;
(7,Ron,ron@abc.com)
(8,Rina)
(9,Don,dmes@xyz.com)
(9,Don,dmes@xyz.com)
(10,Maya,maya@cnn.com)
()
(11,marry,mary@abc.com)


Now we need to filter out the empty tuple and the duplicate one.

Use,
filterdata = FILTER data BY (($0 is not NULL) OR ($1 is not NULL) OR($2 is not NULL));

fltr_new = DISTINCT filterdata;

dump fltr_new;
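With the sample data above, the deduplicated dump would be expected to look like this (tuple order may vary):

(7,Ron,ron@abc.com)
(8,Rina)
(9,Don,dmes@xyz.com)
(10,Maya,maya@cnn.com)
(11,marry,mary@abc.com)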

To run a set of commands in one go, use a Pig script.

In local mode, browse to the location where the 1.pig file is available and use the command:
pig -x local 1.pig

In MapReduce mode

As this is MapReduce mode, the file has to be available in HDFS, so load the data file to HDFS:
 hadoop fs -put file:///home/cloudera/Desktop/Spark/Pig/demo.txt hdfs://quickstart.cloudera:8020/user/Pig/demo.txt

This will copy the 1.pig script file to HDFS for execution
hadoop fs -put file:///home/cloudera/Desktop/Spark/Pig/1.pig hdfs://quickstart.cloudera:8020/user/Pig/1.pig

Running from terminal
pig hdfs://quickstart.cloudera:8020/user/Pig/1.pig

Custom UDF in pig

Steps:

Step 1. Open Eclipse and create a new Java project: Right-click -> New -> Project -> Java Project, let's say CustomUDF.
Step 2. Now add a new package: right-click the Java project -> New -> Package, let's say HadoopUDF.
Step 3. To this package add a Java class.
Step 4. Add the Hadoop and Pig JARs to this project.
Right-click the project -> Build Path -> Configure Build Path -> Libraries -> Add External Jars -> select the JAR files from the Hadoop and Pig lib folders, add the other JAR files in the Hadoop folder -> click OK.
Step 5. The key step in a UDF is to extend the EvalFunc class, so the added Java class should extend EvalFunc<String> (the type parameter is the return type of the UDF, String here).

Step 6. Override exec function.

Below is the code:

package HadoopUDF;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class UcFirst extends EvalFunc<String>{

    @Override
    public String exec(Tuple input) throws IOException {
        // TODO Auto-generated method stub
        if(input == null || input.size() == 0){
        return null;
        }
        try
        {
            String str = input.get(1).toString();
            return str.toUpperCase();
        }
        catch(Exception ex)
        {
            throw new IOException("Exception in func");
        }
    }
}

The class extends EvalFunc<T>, where T is the data type the UDF returns (String here). Each record of the input is passed to exec() as a Tuple; the method first checks whether the input is null or empty and returns null in that case, otherwise it reads field $1 from the tuple and returns it upper-cased.

Step 7: Execute this code as a Pig UDF.
Export the JAR: right-click the project -> Export -> Java -> JAR File -> select the project for JAR export and specify the JAR path -> Finish.

In the Pig terminal, register the JAR file:
register file:///home/cloudera/Desktop/Spark/Pig/UDefFuncs.jar 

result = foreach fltr_new generate HadoopUDF.UcFirst($0, $1);

dump result; 

Note: Additionally, to give it an alias name:

 define toUpper HadoopUDF.UcFirst();

result = foreach fltr_new generate toUpper($0, $1); 


Pig Script for this is, 

data = LOAD 'hdfs://quickstart.cloudera:8020/user/Pig/demo.txt' using PigStorage('|');

filterdata = FILTER data BY (($0 is not NULL) OR ($1 is not NULL) OR($2 is not NULL));

fltr_new = DISTINCT filterdata;

register /home/cloudera/Desktop/Spark/Pig/UDefFuncs.jar;

define toUpper HadoopUDF.UcFirst();

result = foreach fltr_new generate toUpper($0, $1);

dump result; 
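With the sample demo.txt above, the UDF upper-cases field $1 of each remaining record, so the dump would be expected to produce something like (tuple order may vary):

(RON)
(RINA)
(DON)
(MAYA)
(MARRY)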




Some Pig Examples:



1. Use REGEX_EXTRACT_ALL to extract values from a given input

A = LOAD 'filename' AS (line:chararray);
B = FOREACH A GENERATE FLATTEN(REGEX_EXTRACT_ALL(line, 'expression')) AS (f0, f1, f2);

2. A = LOAD 'filename' USING PigStorage(':');

3. To save a file

STORE B INTO 'file path';

4. To filter records which have more than 3 fields.

C = FILTER A BY SIZE(TOTUPLE(*)) > 3;  -- the whole record is formed as a tuple and its size must be greater than 3

http://axbigdata.blogspot.in/2014/05/pig.html

5. Eliminating NULL values

corrupt_records = FILTER records BY temperature is null;

6. FOREACH usage,

A = load '/home/hadoop/work/pig_inputs/foreach_A' AS (f0:chararray, f1:chararray, f2:int);
dump A;
(Joe,cherry,2)
(Ali,apple,3)
(Joe,banana,2)
(Eve,apple,7)
B = foreach A generate $0, $2+1;         -- here we take only the 0th and 2nd fields, which is why FOREACH ... GENERATE is used
dump B;
(Joe,3)
(Ali,4)
(Joe,3)
(Eve,8)


Few Pig examples:

1949 76 1 3
1941 78 1 3 5

1. Counting the size of each individual atom (field) in a row

A = LOAD 'file:///home/cloudera/Desktop/Hadoop/pig_0.15.0/pig_inputs/data' as (f1:chararray, f2:chararray, f3:chararray);
X = FOREACH A GENERATE SIZE(f1);
DUMP X;

Eg2: a = load 'file:///home/cloudera/Desktop/Hadoop/pig_0.15.0/pig_inputs/sample.txt';
B = FOREACH a generate SIZE($1);
DUMP B;

(1)
(2)
(3)
(3)
(2)

2. Saving output to a directory,

STORE a into 'file:///home/cloudera/Desktop/Hadoop/pig_0.15.0/pig_inputs/sample_out.txt' USING PigStorage(':');

3. GROUP - The GROUP operator is used to group the data in one or more relations. It collects the data having the same key.

group_data = GROUP student_details by age;

Grouping by Multiple Columns

grunt> group_multiple = GROUP student_details by (age, city);

grunt> Dump group_multiple;

((21,Pune),{(4,Preethi,Agarwal,21,9848022330,Pune)})
((21,Hyderabad),{(1,Rajiv,Reddy,21,9848022337,Hyderabad)})
((22,Delhi),{(3,Rajesh,Khanna,22,9848022339,Delhi)})
((22,Kolkata),{(2,siddarth,Battacharya,22,9848022338,Kolkata)})
((23,Chennai),{(6,Archana,Mishra,23,9848022335,Chennai)})
((23,Bhuwaneshwar),{(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)})
((24,Chennai),{(8,Bharathi,Nambiayar,24,9848022333,Chennai)})
((24,trivendram),{(7,Komal,Nayak,24,9848022334,trivendram)})

Group All

You can group a relation by all the columns as shown below.

grunt> group_all = GROUP student_details All;

grunt> Dump group_all;

(all,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334 ,trivendram),
(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar),
(4,Preethi,Agarwal,21,9848022330,Pune),(3,Rajesh,Khanna,22,9848022339,Delhi),
(2,siddarth,Battacharya,22,9848022338,Kolkata),(1,Rajiv,Reddy,21,9848022337,Hyderabad)})

All the values will be assigned to a key 'all'.

4. COGROUP

5. JOIN

6. UNION

7. FILTER

8. Counting the number of unique users (distinct values in the third column)

A B user1
C D user2
A D user3
A D user1

A = LOAD 'file:///home/cloudera/Desktop/Hadoop/pig_0.15.0/pig_inputs/Mytestdata' USING PigStorage(' ') AS (a1,a2,a3);
A_UNIQUE = FOREACH A GENERATE $2;
A_UNIQUE = DISTINCT A_UNIQUE;
A_UNIQUE_GROUP = GROUP A_UNIQUE ALL;
u_count = FOREACH A_UNIQUE_GROUP GENERATE COUNT(A_UNIQUE);
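With the four sample rows above there are three distinct users (user1, user2, user3), so dumping the result would be expected to give:

DUMP u_count;
(3)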

9. counting number of elements in a row
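A sketch, reusing SIZE and TOTUPLE from example 4 above (emits the number of fields in each record of a relation A):

B = FOREACH A GENERATE SIZE(TOTUPLE(*));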

10. Total number of records

X = load 'file:///home/cloudera/Desktop/Hadoop/pig_0.15.0/pig_inputs/data2' AS (f1:chararray);
X_GRP = GROUP X All;

X_CNT = FOREACH X_GRP GENERATE COUNT(X);
DUMP X_CNT;
(3)

Monday, February 6, 2017

Loading JSON file

Loading a multi-line JSON file as a DataFrame, ready to insert as a table - example


Our JSON File 1.json:


{
"user": "gT35Hhhre9m",
"dates": ["2016-01-29", "2016-01-28"],
"status": "OK",
"reason": "some reason",
"content": [{
"foo": 123,
"bar": "val1"
}, {
"foo": 456,
"bar": "val2"
}, {
"foo": 789,
"bar": "val3"
}, {
"foo": 124,
"bar": "val4"
}, {
"foo": 126,
"bar": "val5"
}]
}

Expected output is:


+-----------+------+-----------+----------+---+----+
|user       |status|reason     |dates     |foo|bar |
+-----------+------+-----------+----------+---+----+
|gT35Hhhre9m|OK    |some reason|2016-01-29|123|val1|
|gT35Hhhre9m|OK    |some reason|2016-01-28|123|val1|
|gT35Hhhre9m|OK    |some reason|2016-01-29|456|val2|
|gT35Hhhre9m|OK    |some reason|2016-01-28|456|val2|
|gT35Hhhre9m|OK    |some reason|2016-01-29|789|val3|
|gT35Hhhre9m|OK    |some reason|2016-01-28|789|val3|
|gT35Hhhre9m|OK    |some reason|2016-01-29|124|val4|
|gT35Hhhre9m|OK    |some reason|2016-01-28|124|val4|
|gT35Hhhre9m|OK    |some reason|2016-01-29|126|val5|
|gT35Hhhre9m|OK    |some reason|2016-01-28|126|val5|
+-----------+------+-----------+----------+---+----+

Code:


import org.apache.spark.sql.functions.explode

val df1 = spark.read.option("multiline", true).json("file:///Leela/Filesparsing/1.json")
val df2 = df1.withColumn("contents", explode(df1("content"))).withColumn("dates", explode(df1("dates")))
df2.select("user","status","reason","dates","contents.foo","contents.bar").show


Let us consider the JSON File as below.
[{
 "Year": "2013",
 "First Name": "DAVID",
 "County": "KINGS",
 "Sex": "M",
 "Count": "272"
}, {
 "Year": "2013",
 "First Name": "JAYDEN",
 "County": "KINGS",
 "Sex": "M",
 "Count": "268"
}, {
 "Year": "2013",
 "First Name": "JAYDEN",
 "County": "QUEENS",
 "Sex": "M",
 "Count": "219"
}, {
 "Year": "2013",
 "First Name": "MOSHE",
 "County": "KINGS",
 "Sex": "M",
 "Count": "219"
}, {
 "Year": "2013",
 "First Name": "ETHAN",
 "County": "QUEENS",
 "Sex": "M",
 "Count": "216"
}]

Note: sqlContext.read.json cannot read the above JSON file because it is a multi-line JSON file. The Apache documentation clearly states that "Each line must contain a separate, self-contained valid JSON object."; only then is the following valid.
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
The file Babynames.json is a multi-line JSON file, so we cannot use the above command. The reason for the failure is that Spark does parallel processing by splitting the file into partitions; here the JSON file itself gets split, which makes each piece invalid JSON for parsing.

We need to use wholeTextFiles(JSONFileName) so that a key-value pair is created with the key as the file name and the value as the complete file content. This ensures the JSON file is not split and can be parsed successfully.

scala> val jsonRDD = sc.wholeTextFiles("file:///home/cloudera/Desktop/Spark/RSJSON/Babynames.JSON",2).map(x => x._2)
jsonRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at map at <console>:27

scala> val namesJSON = sqlContext.read.json(jsonRDD)
namesJSON: org.apache.spark.sql.DataFrame = [Count: string, County: string, First Name: string, Sex: string, Year: string]

scala> namesJSON.printSchema
root
 |-- Count: string (nullable = true)
 |-- County: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Year: string (nullable = true)

scala> namesJSON.filter(namesJSON("count") > 250).show()
+-----+------+----------+---+----+
|Count|County|First Name|Sex|Year|
+-----+------+----------+---+----+
|  272| KINGS|     DAVID|  M|2013|
|  268| KINGS|    JAYDEN|  M|2013|
+-----+------+----------+---+----+
scala> namesJSON.select("First Name").show()
+----------+
|First Name|
+----------+
|     DAVID|
|    JAYDEN|
|    JAYDEN|
|     MOSHE|
|     ETHAN|
+----------+


SQOOP Commands

Sqoop imports and exports data between an RDBMS and HDFS.

2 types of connectivity:

1. ODBC - need to specify the RDBMS server IP and port number for data transfer.
2. JDBC - a Java JDBC driver acts as the interface for data transfer; need to specify the JDBC driver class (see the sketch below).
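A minimal import sketch (the host, database, credentials, and table names here are placeholders, not values from this environment):

sqoop import \
  --connect jdbc:mysql://dbhost:3306/retail_db \
  --username retail_user \
  -P \
  --table customers \
  --target-dir /user/cloudera/sqoop/customers \
  --num-mappers 2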

HIVE-HBASE Integration



HIVE and HBASE integration

In Cloudera, the Hive library files can be accessed via cd /usr/lib/hive/lib/
To open hive-site.xml:
cd /usr/lib/hive/conf
cat hive-site.xml

To allow Hive scripts to use HBase, add the following statements to the top of each script,
OR
add these JARs by listing them in the hive.aux.jars.path property in hive-site.xml and restart Hive.

ADD JAR /usr/lib/hive/lib/zookeeper.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler-0.13.1-cdh5.3.0.jar;
ADD JAR /usr/lib/hive/lib/guava-11.0.2.jar;
ADD JAR /usr/lib/hbase/hbase-client-0.98.6-cdh5.3.0.jar;
ADD JAR /usr/lib/hbase/lib/hbase-common-0.98.6-cdh5.3.0.jar;
ADD JAR /usr/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar;
ADD JAR /usr/lib/hbase/lib/hbase-server-0.98.6-cdh5.3.0.jar;
ADD JAR /usr/lib/hbase/lib/hbase-shell-0.98.6-cdh5.3.0.jar;
ADD JAR /usr/lib/hbase/lib/hbase-thrift-0.98.6-cdh5.3.0.jar;

Data,
arun 1 cse 1
sunil 2 cse 1
raj 3 cse 1
naveen 4 cse 1
venki 5 cse 2
prasad 6 cse 2
sudha 7 cse 2
ravi 1 mech 1
raju 2 mech 1
roja 3 mech 1
anil 4 mech 2
rani 5 mech 2
anvith 6 mech 2
madhu 7 mech 2

In general HBase expects a row key to be specified. In our data there is no id column, but the command below declares key int as the first column; because of this, the data in the file cannot be loaded into the Hive table.
CREATE TABLE IF NOT EXISTS student_hive (key int, name String, id int, course String, year int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES ("hbase.columns.mapping" = ":key, Course_Details: hname,Course_Details: hbid, Course_Details: hbcourse, Course_Details: hbyear") TBLPROPERTIES ("hbase.table.name" = "Studentdata_hbase");
//The above command is INVALID

So, create the Hive table with the command below, where name String acts as the key for HBase. Here ":key" is specified at the beginning of the 'hbase.columns.mapping' property, which automatically maps to the first column (name String) in Hive. This also automatically creates an HBase table named 'Studentdata_hbase' with the column family 'Course_Details'. Refer to http://hadooptutorial.info/hbase-integration-with-hive/

CREATE TABLE IF NOT EXISTS student_hive (name String, id int, course String, year int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with SERDEPROPERTIES ("hbase.columns.mapping" = ":key, Course_Details: hbid, Course_Details: hbcourse, Course_Details: hbyear") TBLPROPERTIES ("hbase.table.name" = "Studentdata_hbase");

Now load data into the Hive table. Either:
From a local file - load data local inpath (note: LOAD DATA is not allowed for storage-handler-backed tables, so for this HBase-backed table use the insert route)
OR
Insert from a table - INSERT into student_hive SELECT student_partition2.name, student_partition2.id, student_partition2.course FROM student_partition2;

Note: Initially, data already exists in the HBase table. After the Hive table creation, the same data in the HBase table is reflected in the newly created Hive table. Up to this point everything is against the existing table; consider the existing data to be 10,000 records.

Our need is to insert new data of 5,000 records that sits in an ORC table, student_partition2. After the 'INSERT INTO ...' statement, the new 5,000 records from the ORC table are appended to the HBase table, giving 15,000 records in total. This is a simple append.