Monday, December 31, 2018

CICD Process

Continuous Integration and Continuous Deployment

Git is the source repository. Below are some of the points.

1. For an enhancement, a feature branch is created from Master. The developer checks out the source code, makes the changes for that feature in the feature branch, and pushes them.

To get changes to the remote Git repository, they must first be committed and then pushed.
commit - records changes in the local repository.
push - transfers the local commits to the remote Git server.

What is the difference between commit and push in Git?
Because Git is a distributed version control system, every developer has a full local repository. git commit records your changes in that local repository only, whereas git push updates the remote repository with the commits you have made locally.

2. Jenkins is the build tool: it compiles the source code, generates the JAR, and copies the JAR to the specified location in the EMR cluster.

3. This JAR is used for UAT testing in the L3 environment. Dev testing is done in the L1 environment using a JAR compiled on the developer's machine.

4. Once UAT has passed, the same JAR is deployed to L4. After the 15-day DEV warranty period, the DevOps team merges the code from the feature branch into the Master branch.



Basic Structure and usage:
Initially Master is created and holds version 1.0 of the code. Two more branches, Staging and Dev, are created from it.
Development process:
1.       Each developer creates a feature branch named “feature/JIRATicketNumber” from Dev. At this point the branch exists only on the local machine, not on the remote server. The steps are:
-> Clone the repository, which brings down all 3 branches (Master, Staging and Dev). Command: git clone
-> Check out Dev so that the new branch is created from it. Command: git checkout Dev
-> Create the feature branch on the local machine. Command: git checkout -b feature/JIRATicketNumber

2.       Each developer makes changes in the local feature branch and saves them to that branch with a commit. Command: git commit. To see which files have changed, use git status; to compare the actual changes, use git diff.

3.       Pushing the changes from the feature branch to the Remote Repository.
At this stage the code changes made by each developer exist only in the local repository. Here Local means the developer's laptop and Remote means the Git server. The branches at this point are:
Remote Server - 3 branches: Master, Staging and Dev.
Local Repository - 4 branches: Master, Staging, Dev and the feature branch.
To push the local changes, the feature branch first has to be created on the remote server; after that, every later push goes to it.
Commands: git push --set-upstream origin <name>
                      git push (for subsequent pushes)
4.       Once the changes and unit testing are complete on the developer's side, the feature branch is ready to be merged into the Dev branch. The steps are:

Pull the latest changes from Dev on the remote server into the local Dev branch. This brings in the latest code from peer developers. For example, if a peer has added 2 new source files to Dev in the meantime, they are not yet in the feature branch or in the local Dev branch, so a pull has to be made and the changes have to be merged.
Commands: git pull   (remote Dev branch to local Dev)
                      git push (from local feature branch to remote feature branch)
                      Pull request (raised on the Git server, not a git command; this includes the code review process)
                      git merge (once code review is complete, the code is merged into the Dev branch)

Anatomy of a Pull Request
When you file a pull request, all you’re doing is requesting that another developer (e.g., the project maintainer) pulls a branch from your repository into their repository.

Sources: https://www.atlassian.com/git/tutorials/making-a-pull-request


GIT Commands in Git Bash:

cd dirname
git clone https://github.ent.srccode.com/leel0002/DataUpgrade
cd DataUpgrade/
git branch          // Shows the local branches and marks the current one
git branch -a       // Lists all local and remote-tracking branches
git checkout unit    // Switches to the unit branch
git pull origin unit           // Pulls the latest unit branch from the remote into the current branch
Make changes in the code
git status          // Shows which files have been modified
git add .
git commit -a -m "adding new updates"   // Commits the changes to the local branch on the laptop
git checkout master  // Optional: the push below works from any branch
git push origin unit           // Pushes the commits on the local unit branch to the remote server

To create a feature branch from development branch.

1. In the UI, select the develop branch and type the new branch name as feature/JIRANUMBER. Confirm that the branch is being created from 'develop'.




Wednesday, November 7, 2018

OOZIE


OOZIE Schedule


frequency=0 09,15,22 * * *

This frequency triggers the job every day at 09:00 a.m., 3:00 p.m. and 10:00 p.m.

0 indicates minutes
09,15,22 indicates hours
1st * indicates day of the month
2nd * indicates month
3rd * indicates day of the week

To run every 24 hours instead, specify
frequency=1440

Here the single value is a number of minutes.
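
As a rough illustration of how the first two cron fields combine, the sketch below expands the minute and hour fields of the expression above into daily trigger times. It is a plain-Java toy, not Oozie's own parser: the class name CronHours and the method dailyTriggerTimes are made up for this example, and only single values and comma lists are handled.

```java
import java.util.ArrayList;
import java.util.List;

class CronHours {
    // Expands the minute and hour fields of a five-field cron expression
    // into daily trigger times. Ranges, steps and the other three fields
    // are ignored in this sketch.
    static List<String> dailyTriggerTimes(String cron) {
        String[] fields = cron.trim().split("\\s+");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 fields: " + cron);
        }
        List<String> times = new ArrayList<>();
        for (String hour : fields[1].split(",")) {
            for (String minute : fields[0].split(",")) {
                times.add(String.format("%02d:%02d",
                        Integer.parseInt(hour), Integer.parseInt(minute)));
            }
        }
        return times;
    }

    public static void main(String[] args) {
        // Same expression as the frequency shown in the notes
        System.out.println(dailyTriggerTimes("0 09,15,22 * * *"));
    }
}
```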

Source: http://blog.cloudera.com/blog/2014/04/how-to-use-cron-like-scheduling-in-apache-oozie/


OOZIE Commands:

1] To submit a job - Go to the directory containing job.properties and run the following command.
oozie job -oozie http://HOSTNAME:11000/oozie -config job.properties -submit

2] To kill a job
oozie job -oozie http://HOSTNAME:11000/oozie -kill [coord_jobid]

3] To suspend a job
oozie job -oozie http://HOSTNAME:11000/oozie -suspend [coord_jobid]

4] To resume a suspended job (coord_jobid is the id of the job that was suspended)
oozie job -oozie http://HOSTNAME:11000/oozie -resume [coord_jobid]

5] To restart a failed workflow.
oozie job -rerun [parent_workflow_jobid] -Doozie.wf.rerun.failnodes=true


Configuring YARN Capacity Scheduler Queues in AWS EMR

Follow https://mitylytics.com/2017/11/configuring-multiple-queues-aws-emr-yarn/


Saturday, August 4, 2018

AWS


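Command to copy .gz files from the local Linux file system to S3. All files are included by default, so --include only takes effect after an --exclude.

aws s3 sync ${v_input_path}/ s3://${output_path}/ --exclude "*" --include "*.gz"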



Typical Prod cluster is,

1 - r4.16xlarge

12- r4.4xlarge

r4.16xlarge - 64 vCore, 488 GiB memory
r4.4xlarge - 16 vCore, 122 GiB memory

Dev
m4.4xlarge - 32 vCore, 64 GiB memory

1- m4.4xlarge Master

10 - m4.4xlarge Datanodes


IAM Roles

IAM roles are created using avenue, a command-line tool.

eg command: avenue policy create-p <ENV> --policy-name <NEWIAM POLICYNAME>

After creating the IAM role, we attach policies that grant access to the required AWS services,

eg command: avenue policy attach ..

Tuesday, July 17, 2018

SQL Server commands


Get the table name and the row count in a DB:


SELECT
    t.NAME AS TableName,
    SUM(p.rows) AS [RowCount]
FROM
    sys.tables t
INNER JOIN   
    sys.indexes i ON t.OBJECT_ID = i.object_id
INNER JOIN
    sys.partitions p ON i.object_id = p.OBJECT_ID AND i.index_id = p.index_id
WHERE 
    i.index_id <= 1
GROUP BY
    t.NAME, i.object_id, i.index_id, i.name
ORDER BY
    SUM(p.rows) DESC

Source: https://stackoverflow.com/questions/3980622/sql-server-2008-i-have-1000-tables-i-need-to-know-which-tables-have-data

Get system time

SELECT SYSDATETIME()

Sunday, June 10, 2018

Amazon Kinesis

Kinesis:


Applications that read from Kinesis can run on EC2 instances.
Similar to Kafka.

Records in a stream are accessible for up to 24 hours by default; this can be extended up to 7 days by enabling extended data retention.

The maximum size of a data blob (the data payload before Base64-encoding) in one record is 1 megabyte (MB).

Kinesis storage:


1. Streams

2. Record - The unit of data in a Kinesis data stream, composed of a sequence number, a partition key, and a data blob.

The data blob is, in simple terms, the actual message.

3. Shards - Data is stored in shards and replicated across Availability Zones, where it is available for applications to consume.

4. Streams are made of multiple shards. Each shard can ingest up to 1 MB/sec and up to 1000 records per second.

Stream = shard1 + shard2 + shard3 ...

As the stream's data volume grows the number of shards can be increased, and it can be reduced again when inflow drops.

5. Partitioning is also available: if customer_id is chosen as the partition key, the key is hashed, so records with the same customer id always go to the same shard.

6. APIs are available to read from Kinesis. The client libraries handle the complexity of reading from multiple shards in distributed mode, so the experience is like reading from a single source.

7. Auto scaling can be enabled, which can spin up a new EC2 instance when the number of shards increases.
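
The per-shard limits and the partition-key hashing in the list above can be sketched in plain Java. This is an illustration only, not the Kinesis SDK: the class ShardMath and its methods are made up, and the hash space is assumed to be split evenly across shards, which a real stream may not do after shards have been split or merged. Kinesis hashes partition keys with MD5 into a 128-bit integer, which is what shardFor imitates.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ShardMath {
    // Per-shard write limits quoted in the notes above
    static final double MB_PER_SEC_PER_SHARD = 1.0;
    static final int RECORDS_PER_SEC_PER_SHARD = 1000;

    // Smallest shard count that satisfies both the byte and the record limit
    static int shardsNeeded(double mbPerSec, int recordsPerSec) {
        int byBytes = (int) Math.ceil(mbPerSec / MB_PER_SEC_PER_SHARD);
        int byRecords = (int) Math.ceil(recordsPerSec / (double) RECORDS_PER_SEC_PER_SHARD);
        return Math.max(1, Math.max(byBytes, byRecords));
    }

    // MD5(partition key) -> 128-bit integer -> index of the shard whose
    // slice of the hash space contains it (even slices are an assumption)
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, digest);       // 0 .. 2^128 - 1
            BigInteger space = BigInteger.ONE.shiftLeft(128);  // 2^128
            return hash.multiply(BigInteger.valueOf(shardCount)).divide(space).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(shardsNeeded(3.5, 2500));
        // The same key always lands on the same shard
        System.out.println(shardFor("customer-42", 4) == shardFor("customer-42", 4));
    }
}
```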

Features of Amazon Kinesis

Real-time processing − It allows information such as stock trade prices to be collected and analyzed in real time, instead of waiting for a data-out report.

Easy to use − Using Amazon Kinesis, we can create a new stream, set its requirements, and start streaming data quickly.

High throughput, elastic − Throughput scales with the number of shards, which can be increased or reduced as the stream's inflow changes.

Integrate with other Amazon services − It can be integrated with Amazon Redshift, Amazon S3 and Amazon DynamoDB.

Build kinesis applications − Amazon Kinesis provides the developers with client libraries that enable the design and operation of real-time data processing applications. Add the Amazon Kinesis Client Library to Java application and it will notify when new data is available for processing.

Cost-efficient − Amazon Kinesis is cost-efficient for workloads of any scale. Pay as we go for the resources used and pay hourly for the throughput required.

Source: https://www.youtube.com/watch?v=ZROcwFis7wI


Queries:

1. How can the number of shards be increased programmatically, or is it automatic?

Monday, April 23, 2018

HBase Architecture and inserting data

                                            HBase

Architecture:

Master Slave Architecture

3 Major components:

-> Region Servers - Responsible for serving data to clients. Equivalent to DataNodes in HDFS.
-> ZooKeeper - Maintains cluster state. A ZooKeeper ensemble is usually configured for this.
-> HMaster - The master in the cluster. It handles:
Assigning regions
Load balancing
Fault tolerance
Health monitoring


Region Servers, which run on the DataNode machines, send heartbeats to the ZooKeeper nodes. HMaster listens for these Region Server heartbeats through ZooKeeper. If a heartbeat is not received for 3 seconds, HMaster treats the Region Server as down.

Only one HMaster is active at a time; if the active HMaster goes down, an inactive HMaster becomes active.


  •  HBase tables are divided horizontally into regions. Default region size = 1 GB
  • A single Region Server can host multiple regions of the same table or of different tables.
  • Max number of regions for a Region Server = 1000
  • Regions of the same table can be on the same Region Server or on different Region Servers.
  • Initially the regions are allocated on the same Region Server; later, for better load balancing, newly allocated regions are moved to other Region Servers.

Writing Process to HBase:

Key components

  1. WAL (Write Ahead Log) - A client write goes to the WAL first. The WAL is not where the data is finally stored; it exists for fault tolerance, so that if an error occurs before the data is persisted, HBase can replay the WAL.
  2. MemStore - After the WAL, the record is written to the MemStore, an in-memory write buffer that holds all new and edited records. Once the MemStore limit is reached, its contents are flushed to a new HFile and the MemStore is emptied. Every flush creates another HFile, so multiple HFiles accumulate. A single region can have multiple MemStores.
  3. HFile - The actual data is stored in HFiles, which live in HDFS.


With multiple MemStores, each one flushes into its own HFiles, which results in numerous small HFiles.
Hadoop handles small files poorly, which is where minor compaction comes into the picture.

Minor Compaction: merges small HFiles into fewer, larger files.
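
The write path described above (log first, then an in-memory buffer, then a flush to files, then compaction) can be modelled with a small toy class. This is a sketch for intuition only and does not use any HBase API: ToyRegion, its fields and the flush threshold are all invented for this example, and real HBase tracks far more (timestamps, column families, WAL truncation).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class ToyRegion {
    final List<String> wal = new ArrayList<>();                      // append-only log
    final TreeMap<String, String> memStore = new TreeMap<>();        // sorted in-memory buffer
    final List<TreeMap<String, String>> hFiles = new ArrayList<>();  // flushed, sorted files
    final int flushThreshold;                                        // rows held before a flush

    ToyRegion(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void put(String rowKey, String value) {
        wal.add(rowKey + "=" + value);   // 1. log the edit first, for recovery
        memStore.put(rowKey, value);     // 2. then buffer it in memory
        if (memStore.size() >= flushThreshold) {
            flush();
        }
    }

    // 3. A full buffer becomes a new file and the buffer is emptied
    void flush() {
        hFiles.add(new TreeMap<>(memStore));
        memStore.clear();
    }

    // Merges the flushed files into one; values from newer files win
    void compact() {
        if (hFiles.size() < 2) {
            return;
        }
        TreeMap<String, String> merged = new TreeMap<>();
        for (TreeMap<String, String> file : hFiles) {
            merged.putAll(file);
        }
        hFiles.clear();
        hFiles.add(merged);
    }

    // Reads check the buffer first, then files from newest to oldest
    String get(String rowKey) {
        if (memStore.containsKey(rowKey)) {
            return memStore.get(rowKey);
        }
        for (int i = hFiles.size() - 1; i >= 0; i--) {
            String value = hFiles.get(i).get(rowKey);
            if (value != null) {
                return value;
            }
        }
        return null;
    }
}
```

With a threshold of 2, four puts produce two small files; compact() then merges them into one, which is the effect compaction has on the many small HFiles.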

https://www.edureka.co/blog/hbase-architecture/

Writing data to HBase via


1. Inserting data to HBase table via hbase shell


Put command: put '<table name>','row1','<colfamily:colname>','<value>'
Eg:
hbase(main):005:0> put 'emp','1','personal data:name','raju'
0 row(s) in 0.6600 seconds
hbase(main):006:0> put 'emp','1','personal data:city','hyderabad'
0 row(s) in 0.0410 seconds
hbase(main):007:0> put 'emp','1','professional data:designation','manager'
0 row(s) in 0.0240 seconds
hbase(main):007:0> put 'emp','1','professional data:salary','50000'
0 row(s) in 0.0240 seconds

Read data:

get command: get '<table name>','row1'

eg:
hbase(main):012:0> get 'emp', '1'

Read specific column: get 'table name', 'rowid', {COLUMN => 'column family:column name'}

eg:
hbase(main):015:0> get 'emp', 'row1', {COLUMN => 'personal:name'}

Read complete table data: scan 'emp'

Update Data: update an existing cell value using the put command

put 'table name','row','column family:column name','new value'
Eg:
hbase(main):002:0> put 'emp','row1','personal:city','Delhi'

Delete using delete command

Drop HBase table: disable it  and then drop

hbase(main):018:0> disable 'emp'
0 row(s) in 1.4580 seconds

hbase(main):019:0> drop 'emp'


2. Spark and JAVA APIs.


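The API class used to insert data into HBase is Put.

Put sample code:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.TableName;
      import org.apache.hadoop.hbase.client.Connection;
      import org.apache.hadoop.hbase.client.ConnectionFactory;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Table;
      import org.apache.hadoop.hbase.util.Bytes;

      // Load the HBase configuration (hbase-site.xml on the classpath)
      Configuration config = HBaseConfiguration.create();

      // Open a connection and get a handle to the "emp" table;
      // try-with-resources closes both when done
      try (Connection connection = ConnectionFactory.createConnection(config);
           Table table = connection.getTable(TableName.valueOf("emp"))) {

        // A Put is constructed with the row key
        Put p = new Put(Bytes.toBytes("row2"));

        // addColumn() accepts column family, column qualifier, value
        p.addColumn(Bytes.toBytes("personal"),
            Bytes.toBytes("name"), Bytes.toBytes("raju2"));

        p.addColumn(Bytes.toBytes("personal"),
            Bytes.toBytes("city"), Bytes.toBytes("hyderabad2"));

        p.addColumn(Bytes.toBytes("professional"), Bytes.toBytes("designation"),
            Bytes.toBytes("manager2"));

        p.addColumn(Bytes.toBytes("professional"), Bytes.toBytes("salary"),
            Bytes.toBytes("60000"));

        // Send the Put to the table
        table.put(p);
      }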

bulkPut() is also available through JavaHBaseContext for bulk insertion of data from Spark:

      List<String> list= new ArrayList<String>();
      list.add("1," + columnFamily + ",a,1");
      list.add("2," + columnFamily + ",a,2");
      list.add("3," + columnFamily + ",a,3");
      list.add("4," + columnFamily + ",a,4");
      list.add("5," + columnFamily + ",a,5");

      JavaRDD<String> rdd = jsc.parallelize(list);
      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      hbaseContext.bulkPut(rdd,
              TableName.valueOf(tableName),
              new PutFunction());

  public static class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;

    public Put call(String v) throws Exception {
      String[] cells = v.split(",");
      Put put = new Put(Bytes.toBytes(cells[0]));

      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
              Bytes.toBytes(cells[3]));
      return put;
    }

  }

3. Bulk upload

using the ImportTsv command-line tool.

Wednesday, March 28, 2018

Kafka Connect


Kafka Connect- https://www.confluent.io/product/connectors/

Kafka Connect
Kafka Connect is a framework included in Apache Kafka that integrates Kafka with other systems. Its purpose is to make it easy to add new systems to your scalable and secure stream data pipelines.

To copy data between Kafka and another system, users instantiate Kafka Connectors for the systems they want to pull data from or push data to. Source Connectors import data from another system (e.g. a relational database into Kafka) and Sink Connectors export data (e.g. the contents of a Kafka topic to an HDFS file).


https://kafka.apache.org/documentation/#connectapi - Look under Transformations

https://docs.confluent.io/current/connect/connect-hdfs/docs/hdfs_connector.html
https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect

A few links for customizing the connectors or writing our own connectors.

JAR files would be under /home/gorrepat/confluent/confluent-4.0.0/share/java/kafka-connect-hdfs


https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java


https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/json/JsonFormat.java

Need to add Custom logic(fingerprint removal) in write() of https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/json/JsonRecordWriterProvider.java

Friday, March 9, 2018

Apache Phoenix



Apache phoenix is installed under /opt/mas/phoenix/

Launch Phoenix CLI with zookeeper

bin/sqlline.py localhost:2181
OR
bin/sqlline.py xvzw160.xdev.motive.com,xvzw161.xdev.motive.com,xvzw162.xdev.motive.com


Create Table:
CREATE TABLE IF NOT EXISTS STOCK_SYMBOL (SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY VARCHAR);

List tables in Phoenix
select DISTINCT("TABLE_NAME") from SYSTEM.CATALOG;

Insert Data into table.
UPSERT INTO STOCK_SYMBOL VALUES ('CRM','SalesForce.com');

Bulk upload data to table STOCK_SYMBOL:
bin/psql.py -t STOCK_SYMBOL xvzw160.xdev.motive.com,xvzw161.xdev.motive.com,xvzw162.xdev.motive.com /opt/mas/phoenix/examples/STOCK_SYMBOL.csv

To bulk upload data to Phoenix:
./psql.py xvzw160.xdev.motive.com,xvzw161.xdev.motive.com,xvzw162.xdev.motive.com /opt/mas/phoenix/examples/WEB_STAT.sql /opt/mas/phoenix/examples/WEB_STAT.csv /opt/mas/phoenix/examples/WEB_STAT_QUERIES.sql

./sqlline.py xvzw160.xdev.motive.com,xvzw161.xdev.motive.com,xvzw162.xdev.motive.com

Importing Java project and integrating with MAVEN



Point 1: Local Maven Repository:
Usually there is a local repository in which all the JARs are placed. By default these JARs are downloaded from the central Maven repository. To override this and download the JARs from our own repository, update settings.xml under Window -> Preferences -> Maven -> User Settings.
The Jenkins build also picks up the JARs from this local repository.

JARs can also be added to the project externally via Classpath -> Add External JARs. With Maven, they can be added in pom.xml by adding <scope> and <systemPath> to the dependency, as below,

    <dependency>
        <groupId>com.loopj.android.http</groupId>
        <artifactId>android-async-http</artifactId>
        <version>1.3.2</version>
        <type>jar</type>
        <scope>system</scope>
        <systemPath>${project.basedir}/libs/android-async-http-1.3.2.jar</systemPath>
    </dependency>

Point 2: If any JAR fails to download, open the repository URL in a browser and check whether the JAR exists there.

Point 3: To add a new repository from which to download JARs, add an entry for it in settings.xml.

Case: a sample project was found in an article. Follow these steps.

1. Read the code to see whether it fits the need.
2. Import it into the workspace and try to build it with Maven from the command prompt.
Before this, verify that Maven is installed properly and that the Maven home path is set correctly in the command prompt. Try Maven 3.2.2, which has proved stable.

C:\Users\gorrepat>mvn -version
Apache Maven 3.2.2 (45f7c06d68e745d05611f7fd14efb6594181933e; 2014-06-17T19:21:42+05:30)
Maven home: C:\SWSetup\apache-maven-3.2.2-bin\apache-maven-3.2.2
Java version: 1.8.0_151, vendor: Oracle Corporation
Java home: C:\Program Files\Java\jdk1.8.0_151\jre
Default locale: en_IN, platform encoding: Cp1252
OS name: "windows 10", version: "10.0", arch: "amd64", family: "dos"

3. From the command prompt, go to the workspace location and run "mvn clean install".
4. Resolve any JARs that failed to download using Points 1 and 2.
5. Upgrade to the latest JAR versions and try to build again.
6. Try to include the same versions that are available in the cluster.

Case: we have a sample class and need to create a project from it. Create a new project with the skeleton below and search for the JARs based on package names.
Eg: search Google for "<PACKAGENAME> jar", for example "org.apache.hadoop.hbase.tablename jar".

That identifies the JAR file, which can then be included in pom.xml.

A sample pom.xml that bundles all the dependencies is below; the key is "<descriptorRef>jar-with-dependencies</descriptorRef>".

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.spark.streaming</groupId>
  <artifactId>JavaSparkStr</artifactId>
  <version>0.0.1-SNAPSHOT</version>
 <dependencies>
  <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
</dependencies>
<build>
    <plugins>   
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
      <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>     
    </plugins>
</build>
</project>

To make a runnable JAR, either create a manifest file or add the entries below to pom.xml,

          <manifest>
            <addClasspath>true</addClasspath>
            <classpathPrefix>lib/</classpathPrefix>
            <mainClass>consumerTest.KafkaConsumer</mainClass>
          </manifest>

The latest Maven projects compiled with Java 1.8 are under https://drive.google.com/open?id=1yp3Q6jucpGW4bCZ8P9MkDpy7q-x0j7hK



To Run a Java standalone program,

java -cp samples-0.0.1-SNAPSHOT-jar-with-dependencies.jar producer.SimpleStringProducer

Wednesday, February 28, 2018

JDBC connection from Spark


JDBC connection from Spark Steps:


Both Read and Write.

1. Create JDBC URL,
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

////Sets username and password////
  import java.util.Properties
  val connectionProperties = new Properties()

  connectionProperties.put("user", s"${jdbcUsername}")
  connectionProperties.put("password", s"${jdbcPassword}")

2. Read data from JDBC

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

3. Write data to JDBC

New table creation,
spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Insertion to existing table,
import org.apache.spark.sql.SaveMode

spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Append) // <--- Append to the existing table
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Source: https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Second Approach:

val jdbcSqlConnStr = "jdbc:sqlserver://sn1:1433;database=test;user=user1;password=pwd123#;"

val query = """(select c.Number, c.Num_Ext, s.ID, cast(snapshotdate as date) as snapshotdate
     FROM Tbl1 C
     join Tbl2 P on P.ID = C.ID
     join Tbl3 s on c.ID = s.LocID
     WHERE p.Location = 'NY') aliasName"""

val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

val dfJDBC = spark.sqlContext.read.format("jdbc").options(Map("url"->jdbcSqlConnStr, "Driver"->driver,"dbTable"->query)).load()

source: https://mapr.com/support/s/article/How-do-I-connect-from-JDBC-to-MySQL-using-Spark-SQL-datasource?language=en_US

Parallelism while pulling data from RDBMS


By default, when the call below is executed, a single task opens only 1 connection to the RDBMS to pull the data.

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

Multiple connections can be established by increasing numPartitions; the parallel connections pull the data faster.
Eg:

val employees_table = spark.read.jdbc(jdbcUrl, "employees", "Employee_ID", 1, 200000, 10, connectionProperties);

Here we have specified 10 partitions, so up to 10 connections are used for the data pull.

10 tasks run in this case; how many run at the same time depends on the executor cores available.

The Spark UI shows the 10 tasks running in parallel.




While writing or reading from RDBMS. 


Reading:
Eg:
val numPartitions = 5
val min = 1
val max = 45000000
val fetch = 10000
var df = spark.read.format("jdbc").
option("url", s"${url}${config("schema")}").
option("driver", "com.mysql.jdbc.Driver").
option("lowerBound", min).
option("upperBound", max).
option("numPartitions", numPartitions).
option("partitionColumn", someprimaryKey).
option("dbtable", config("table")).
option("user", user).
option("fetchsize",fetch).
option("password", password).load()

Here we have given numPartitions = 5, partitionColumn = someprimaryKey, lowerBound = 1 and upperBound = 45000000.
So, while pulling the data, 5 connections are established to the DB by 5 tasks, each covering a key range of about 45000000/5 = 9,000,000. The bounds only decide the partition stride; they do not filter rows, so rows outside the range land in the first or last partition.
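
To make the arithmetic concrete, the sketch below computes the per-partition WHERE clauses for the example values. It is modelled on the stride logic Spark's JDBC source has used, but it is a standalone approximation: the class JdbcPartitions and the column name "id" are made up, and the exact clauses can differ between Spark versions.

```java
import java.util.ArrayList;
import java.util.List;

class JdbcPartitions {
    // Splits [lower, upper] into numPartitions key ranges and returns one
    // WHERE clause per range. The first and last ranges are left open, so
    // no row is filtered out. This sketch covers numPartitions >= 2 only.
    static List<String> whereClauses(String column, long lower, long upper, int numPartitions) {
        if (numPartitions < 2) {
            throw new IllegalArgumentException("sketch covers numPartitions >= 2");
        }
        long stride = upper / numPartitions - lower / numPartitions;
        List<String> clauses = new ArrayList<>();
        long current = lower;
        for (int i = 0; i < numPartitions; i++) {
            String lowerBound = (i == 0) ? null : column + " >= " + current;
            current += stride;
            String upperBound = (i == numPartitions - 1) ? null : column + " < " + current;
            if (upperBound == null) {
                clauses.add(lowerBound);                                  // last: open-ended
            } else if (lowerBound == null) {
                clauses.add(upperBound + " or " + column + " is null");   // first: takes NULLs
            } else {
                clauses.add(lowerBound + " AND " + upperBound);
            }
        }
        return clauses;
    }

    public static void main(String[] args) {
        // Values from the example: lowerBound 1, upperBound 45000000, 5 partitions
        for (String clause : whereClauses("id", 1L, 45000000L, 5)) {
            System.out.println(clause);
        }
    }
}
```

Each clause becomes one query on its own connection, which is why a skewed key distribution leads to unevenly sized partitions.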

val fetch = 10000 specifies the JDBC fetch size, which determines how many rows to fetch per round trip.

Options available for the JDBC connection are listed under https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Writing:
Same is the case while writing to RDBMS, numPartitions = Number of connections established to the db by same number of executors.
Note:
1. No need to specify lowerBound and upperBound values while writing to RDBMS.
2. Instead of fetch size, it is batchsize.

option("batchsize",batchcount).

Eg:
df.write
  .mode(SaveMode.Append)
  .option("batchsize",batchcount)
  .option("numPartitions", 8)
  .option(JDBCOptions.JDBC_DRIVER_CLASS, "org.postgresql.Driver")
  .jdbc(url, dbTable, connectionProperties)

Monday, February 19, 2018

Flume



Wrote a custom interceptor for Flume.


Problem: Messages in Kafka are in Avro format, each prefixed with an 8-byte fingerprint. Each message has to be extracted by removing the first 8 bytes and then sent to an HDFS location that is a Hive external table location.

Solution: The interceptor removes the fingerprint: it decodes the message while skipping the first 8 bytes, converts the data to Avro format, and passes it to the sink.
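
The fingerprint-stripping step on its own is just a byte-array slice. The sketch below shows it in isolation with no Flume or Avro dependency; the class FingerprintStripper and its method are hypothetical names, and the 8-byte length is taken from the problem statement above.

```java
import java.util.Arrays;

class FingerprintStripper {
    static final int FINGERPRINT_BYTES = 8;

    // Returns the message body without its leading fingerprint
    static byte[] stripFingerprint(byte[] body) {
        if (body == null || body.length < FINGERPRINT_BYTES) {
            throw new IllegalArgumentException("body is shorter than the fingerprint");
        }
        return Arrays.copyOfRange(body, FINGERPRINT_BYTES, body.length);
    }

    public static void main(String[] args) {
        byte[] body = {0, 1, 2, 3, 4, 5, 6, 7, 42, 43};
        // Only the two payload bytes remain
        System.out.println(Arrays.toString(stripFingerprint(body)));
    }
}
```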

Below is the approach:
1. create a class that implements Interceptor.
2. Override public Event intercept(Event event) method.
3. Below is the logic in intercept method,

byte[] eventBody = event.getBody();

    try
    {
    // Avro schema shipped on the classpath
    Schema SCHEMA = new Schema.Parser().parse(CustomInterceptor.class.getClassLoader().getResourceAsStream("djr.avsc"));
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(SCHEMA);
    // Skip the 8-byte fingerprint and decode the rest of the body
    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(eventBody, 8, eventBody.length - 8, null);
    GenericRecord record = reader.read(null, binaryDecoder);

    ////////// Re-encode the record as an Avro container //////////
    // Assumption: the event body should carry the Avro bytes, so they are
    // written to memory (java.io.ByteArrayOutputStream) rather than to a local file
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(SCHEMA);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
        dataFileWriter.create(SCHEMA, out);
        dataFileWriter.append(record);
    }
    event.setBody(out.toByteArray());
    }
    catch (IOException e)
    {
    // The message could not be decoded or re-encoded
    throw new RuntimeException("Failed to process event body", e);
    }
    return event;

4. Flume configuration,

agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = hdfs-sink

agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.topic = cdpTopic
agent.sources.kafka-source.groupId = flume-id
agent.sources.kafka-source.channels = memory-channel
agent.sources.kafka-source.interceptors = etpr-parameters-interceptor custominterceptor

#getting serialized data from CustomInterceptor class
agent.sources.kafka-source.interceptors.custominterceptor.type= com.djr.interceptor.CustomInterceptor$Builder

agent.sinks.hdfs-sink.hdfs.path = hdfs://mas:8020/var/lib/hadoop-hdfs/mas_test_data/scripts/json_test/CustomInterceptorJson/date=%y-%m-%d
# For the CDP topic, data is written directly to the HDFS location, into a partitioned directory date=. The %y, %m and %d values are filled in by Flume.

# The sink below is for the CRM plugin, which uses a Kite dataset.
sinks:
  crm-plugin-kite-sink-hive-1:
    channels:
      - crm-plugin-mem-channel-hive
    config:
      type: org.apache.flume.sink.kite.DatasetSink
      # Kite dataset url for hive.
      kite.repo.uri: 'repo:hive'
      # Kite dataset name for hive.
      kite.dataset.name: inv_all
      # Batch commit size.
      kite.batchSize: 100000
      # Batch timeout in seconds.
      kite.rollInterval: 30

Note: 


  • The custom interceptor JAR has to be copied into the Flume lib dir (/usr/lib/flume-ng/lib), and the class name of the custom interceptor has to be specified under agent.sources.kafka-source.interceptors.custominterceptor.type



  • The Kite dataset plug-in is used in the sink to write data to Hive tables. To use it, copy kite-data-hive.jar to the Flume lib dir (/usr/lib/flume-ng/lib) and specify type: org.apache.flume.sink.kite.DatasetSink.

         A Kite dataset can also be used to write data to HBase, using kite-data-hbase.jar. Kite datasets offer many other options as well.

Tuesday, February 6, 2018

Spark

RDD

An RDD (Resilient Distributed Dataset) is an immutable, distributed collection of data that is computed in memory and in parallel on different nodes in the cluster.

Fault Tolerance in Spark: Self-recovery property in RDD
Each RDD remembers the transformations that produced it; this logical execution plan is known as the lineage graph. If a machine fails and part of an RDD is lost, Spark applies the same computation again to rebuild the lost part. Resilient means that the failed part of the RDD is re-executed from its lineage.

In case of a data node failure - Data can also be replicated on another node, so that if a failure occurs it can be retrieved from the replica for further use.
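
The idea of rebuilding lost data from lineage can be shown with a toy model in plain Java. This is not Spark code: LineageSketch, map and compute are invented names, and the model only supports element-wise transformations on integer partitions. The point it illustrates is that the transformations are stored, so any partition can be recomputed from its input on demand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class LineageSketch {
    private final List<List<Integer>> sourcePartitions;   // stable input data
    private final List<Function<Integer, Integer>> lineage = new ArrayList<>();

    LineageSketch(List<List<Integer>> sourcePartitions) {
        this.sourcePartitions = sourcePartitions;
    }

    // Records a transformation instead of running it straight away
    LineageSketch map(Function<Integer, Integer> step) {
        lineage.add(step);
        return this;
    }

    // Builds one partition by replaying every recorded step on its input
    List<Integer> compute(int partition) {
        List<Integer> result = new ArrayList<>();
        for (Integer value : sourcePartitions.get(partition)) {
            Integer current = value;
            for (Function<Integer, Integer> step : lineage) {
                current = step.apply(current);
            }
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        LineageSketch data = new LineageSketch(
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)))
                .map(x -> x * 10)
                .map(x -> x + 1);
        List<Integer> held = data.compute(1);
        held = null;                              // pretend the node holding it failed
        System.out.println(data.compute(1));      // rebuilt by replaying the lineage
    }
}
```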

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
The object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable.

The default table size threshold for an automatic Broadcast Join is 10 MB.

It can be increased by setting the parameter spark.sql.autoBroadcastJoinThreshold. So far I have used up to 300 MB.

Difference between Sort merge JOIN and Broadcast JOIN

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance counter) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table.

Accumulators in the Spark UI
Tracking accumulators in the UI can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).

A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

The code below shows an accumulator being used to add up the elements of an array:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

Another real-world example where I have used accumulators:

Case: I wrote an API that upserts records into an RDBMS using the DriverManager class, where the records are parsed and inserted with statements like insert into ...
If an exception occurs during this process because of improper data in one of the records, we need to capture the exception and mark the function output as a failed upsert. Increment the counter and record the message in the exception block:
retVal += 1
exceptionReturnedFrmExecutor += sqlExp.getMessage()
and return both values after the try/catch block (in Scala, the value of an expression placed inside finally is discarded):
(retVal, exceptionReturnedFrmExecutor)

Here, both retVal and exceptionReturnedFrmExecutor are accumulators.

Spark SQL vs Spark Session


Prior to Spark 2.0, you had to create a SparkConf and a SparkContext to interact with Spark, and then a SQLContext.

From Spark 2.0 onwards the same can be achieved through SparkSession, without explicitly creating SparkConf, SparkContext or SQLContext, as they are encapsulated within the SparkSession. Using the builder design pattern, it instantiates a SparkSession object if one does not already exist, along with its associated underlying contexts.

Eg:        val sparkSession = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

Difference between spark.jars and spark.driver.extraClassPath

Use --jars (configuration key spark.jars) if you want to make these jars available on both the driver and executor classpaths. If the required jar is only used by driver code, use the option --driver-class-path (configuration key spark.driver.extraClassPath).

Launching spark-shell with external jars.

spark-shell --jars /usr/lib/hive/lib/json-udf-1.3.7-jar-with-dependencies.jar,/usr/lib/hive/lib/json-serde-1.3.7.3.jar

In Post, http://leelaprasadhadoop.blogspot.in/2017/07/hive-functions.html

a) A JAR has been added to hive
b) Created function to use as UDF.

hive> add jar hdfs:////user/HiveHbase/hive-contrib-1.1.0.jar;

hive> CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

The same can be achieved in Spark with the commands below.

Launch Spark-shell along with the external JAR

spark-shell --jars /home/gorrepat/hive-contrib-1.1.0.jar

scala> import org.apache.spark.sql.hive.HiveContext
scala> val hq = new HiveContext(sc);
scala> hq.sql("""CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'""")
scala> hq.setConf("hive.mapred.mode","nonstrict")    //same as hive> set hive.mapred.mode=nonstrict;

To see the JARs loaded on the classpath,

In Scala,
val cl = ClassLoader.getSystemClassLoader
cl.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(println)

Inserting data in Hive table from Spark SQL

val data = hq.sql("select \"leela\" as name")
data.write.mode("append").saveAsTable("emp")

OR

hq.sql("insert into table emp select name from student_hive")


Writing data to HDFS without using saveAsTextFile. This helps in cases where that API can't be used.

import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

  // Writes content as a UTF-8 file at hdfsPath, replacing any existing file.
  def writeAsString(hdfsPath: String, content: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    val path: Path = new Path(hdfsPath)
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
    val dataOutputStream: FSDataOutputStream = fs.create(path)
    val bw = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
    try bw.write(content)
    finally bw.close() // always release the stream, even if the write fails
  }

rdd conversion to DataFrame

There are a few options available to convert an RDD to a DataFrame.

1. Create a case class, map the RDD values to the case class fields and use toDF().
Note: Spark 1.6 and below (built on Scala 2.10) could not use case classes with more than 22 fields.

2. StructType & StructField is a good way, however the data needs to be in the form of Rows and not Arrays.

3. Some good information and another approach is mentioned under http://markhneedham.com/blog/2015/08/06/spark-convert-rdd-to-dataframe/


Spark Client mode and Cluster mode


Client mode: The driver resides on the client machine from which the application is launched via spark-submit.

Cluster mode: The Spark driver resides in the Application Master. Even if we shut down the machine from which spark-submit was triggered, the application keeps running because the driver is in the Application Master.

Source: https://www.youtube.com/watch?v=vJ0eUZxF80s

Executors,Stage, partitions and Tasks.


Take a case where the driver has submitted a job to 2 executors (assume one core each), and the data is in 5 partitions.

With 5 partitions a maximum of 5 tasks can be executed, but since there are only 2 executors, only 2 tasks run in parallel; the other 3 wait for a running task to complete before they are processed.

Stage: A stage is a set of transformations executed together. Whenever a wide transformation is encountered (eg. reduceByKey or join), data shuffles across the partitions and a new stage is created. Stages are composed of tasks.
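
The arithmetic behind the 2 executors / 5 partitions case can be sketched in plain Scala. This is only an illustration: the object and function names are made up for this post, and it assumes every task takes about the same time, which real jobs rarely do.

```scala
object TaskWaves {
  // Tasks that can run at the same time = executors * cores per executor.
  def parallelSlots(executors: Int, coresPerExecutor: Int): Int =
    executors * coresPerExecutor

  // Sequential "waves" needed to run one task per partition on the given slots.
  def waves(partitions: Int, slots: Int): Int =
    math.ceil(partitions.toDouble / slots).toInt

  def main(args: Array[String]): Unit = {
    val slots = parallelSlots(executors = 2, coresPerExecutor = 1)
    println(s"slots = $slots, waves for 5 partitions = ${waves(5, slots)}")
  }
}
```

With 2 slots, the 5 tasks run as 2 + 2 + 1, i.e. three waves.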

Sources:
https://www.youtube.com/watch?v=qctfDbrvQ8s
https://www.youtube.com/watch?v=fyTiJLKEzME

Parallelism in Spark using Futures

follow: http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/#concurrency-in-spark

Sample code for simple reading 2 DF's, Join and Writing the result to a New DB


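import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.expressions.Window.partitionBy
import org.apache.spark.sql.functions.{col, desc, row_number}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration.Inf
import scala.io.Source

object stgIngestion {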
  def main(args: Array[String]): Unit = {

    println("Say hello")

    val spark = SparkSession.builder
      .appName("stgingIngestion")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition","true")
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .config("spark.sql.parquet.writeLegacyFormat","true")
      .getOrCreate()

    spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    spark.sparkContext.hadoopConfiguration.set("speculation", "false")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)


    val filename = args(0)
    val srcPathTblsPath = args(1)             
    val targetHiveDB = args(2)           
    val targethivePath = args(3)         


   val tableList = Source.fromFile(filename).getLines.toList

    val tableIterator = tableList.toIterator.map(table => Future{
      val df = spark.read.format("orc").load(s"${srcPathTblsPath}/${table}/")
      val df_id = spark.read.format("orc").load(s"${srcPathTblsPath}/${table}_id/")
      df.withColumn("row_num", row_number() over(partitionBy(col("id")).orderBy(desc("updatetime")))).
        filter(col("row_num") === "1").
        join((df_id),Seq("id"),"inner").
        write.format("parquet").
        options(Map("path" -> s"${targethivePath}/${table}")).
        mode(SaveMode.Overwrite).partitionBy("ingestiondt").
        saveAsTable(targetHiveDB + "." + table)
    })
    val timeout: Duration =Inf
//    Await.result(Future.sequence(tableIterator),timeout)        //Without Sliding window

    def awaitSliding[T](it: Iterator[Future[T]], batchSize: Int = 2, timeout: Duration = Inf): Iterator[T] = {
        val slidingIterator = it.sliding(batchSize - 1).withPartial(true)
        val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
        initIterator.map(futureBatch => Await.result(futureBatch.head, timeout)) ++
        tailIterator.flatMap(lastBatch => Await.result(Future.sequence(lastBatch), timeout))
}

   awaitSliding(tableIterator).foreach(x => println("Done"))
  }
}
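
The sliding-window helper can be tried without Spark. The sketch below copies awaitSliding as written above and feeds it plain Futures; the object name SlidingDemo and the run() method are only for illustration. Because the iterator creates each Future lazily, only about batchSize of them are in flight at any time, and the results come back in input order.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration.Inf

object SlidingDemo {
  // Same helper as in the job above: awaits the head of each window while the
  // look-ahead (hasNext) pulls, and thereby starts, the next future.
  def awaitSliding[T](it: Iterator[Future[T]], batchSize: Int = 2, timeout: Duration = Inf): Iterator[T] = {
    val slidingIterator = it.sliding(batchSize - 1).withPartial(true)
    val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
    initIterator.map(futureBatch => Await.result(futureBatch.head, timeout)) ++
      tailIterator.flatMap(lastBatch => Await.result(Future.sequence(lastBatch), timeout))
  }

  // Stand-in for the per-table work: each "job" just doubles its input.
  def run(): List[Int] = {
    val jobs = Iterator(1, 2, 3, 4, 5).map(i => Future { i * 2 })
    awaitSliding(jobs).toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```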

Sample spark-submit command,

spark-submit --class com.practice.stgIngestion  --master yarn --deploy-mode client --conf spark.shuffle.spill=true --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=1024m --conf spark.sql.planner.externalSort=true --conf spark.shuffle.manager=sort  --conf spark.ui.port=8088 --conf spark.executor.memoryOverhead=12096  --conf spark.driver.memoryOverhead=12096 --conf spark.rpc.message.maxSize=1024 --conf spark.file.transferTo=false --conf spark.driver.maxResultSize=10g --conf spark.rdd.compress=true --conf spark.executor.cores=5 --conf spark.executor.memory=10g --conf spark.driver.memory=20g  --conf spark.executor.instances=2 --jars /home/hadoop/data/sqljdbc42.jar /home/hadoop/data/demo_2.11-0.1.jar

Using more executors on big tables might lead to a Java heap space error, so spark.executor.instances=2 is specified.

Note: The TPS for this job is about 300k per second. About 1 billion records were pulled across all the 10 parallel threads.

pseudo code for executing functionality with future execution context, handling exceptions, logging status in Hive


function call from main(),

val viewIterator = readWriteToHDFS(spark)

awaitSliding(viewIterator, threadsCount).foreach(x => println("Done"))

Functions Definition,

import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def readWriteToHDFS(spark: SparkSession): Iterator[Future[Unit]] = {
  spark.read ...

  val viewlist = spark.read.format("csv").option("header", true).option("delimiter", ",").load(view_path).collect()
  // One Future per view; failures are caught and recorded in the audit table.
  val viewIterator = viewlist.toIterator.map { a =>
    Future {
      try {
        ....
        ....
        writeAuditEntry(spark, viewName, 0, audithive_tableName, "Success", "Successfully inserted the records")
      }
      catch {
        case ex: FileNotFoundException => {
          println("File not Found")
        }
        case e: Exception => {
          println(s"Unknown exception occurred for ${viewName} : $e")
          val stackTraceStr = e.getStackTrace.mkString("\n")
          logger.error(s"Exception occurred for ${viewName} and stackTraceStr is:" + stackTraceStr)
          writeAuditEntry(spark, viewName, 0, audithive_tableName, "Failed", e.getMessage)
        }
      }
    }
  }
  viewIterator
}

def writeAuditEntry(spark: SparkSession, tableName: String, recordCount: Long, hiveTable: String, auditStatus: String, auditMessage: String): Unit = {
  val newauditMsg = removeEscapeCharacters(auditMessage)
  // Insert into the table passed as hiveTable rather than a global table name.
  val insertCmd = s"insert into ${hiveTable} values('${tableName}', ${recordCount},'${auditStatus}','${newauditMsg}')"
  println("insertCmd = " + insertCmd)
  spark.sql(insertCmd)
}

Idiom of dataframe PartitionBy and Coalesce

Consider a use case where the dataframe is written to a location partitioned by a few columns, and each partition directory should contain a single parquet file.

This is possible with the line of code below, however it takes a lot of time and on a large data set the job may fail as well.
df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"${location}")

partitionBy - creates the partition directories and saves the data accordingly.
coalesce - the problem with coalesce(1) is that your parallelism drops to 1, so it can be slow at best and error out at worst. Increasing that number doesn't help either: with coalesce(10) you get more parallelism, but end up with 10 files per partition.

To get one file per partition without using coalesce(), use repartition() with the same columns that the output is partitioned by:

df.repartition("entity", "year", "month", "day", "status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")


Coalesce() vs Repartition()

The repartition algorithm does a full shuffle of the data and creates roughly equal sized partitions. coalesce combines existing partitions to avoid a full shuffle.

Coalesce:
Avoids full shuffle.
Results in unequal partitions

Repartition:
Full shuffle and altogether creates new partitions.
Almost equal sized partitions.


colorDf = peopleDf.repartition(5)

OR

colorDf = peopleDf.repartition($"color")        // This is called repartition by column; it ensures that
                                                // all rows with the same column value land in the same partition.
Eg: Blue, 1
      Blue, 2
      Red, 1
      Blue,1

This places all the Blue rows in one partition and the Red row (typically) in another, so 2 partitions hold data.

colorDf = peopleDf.repartition($"color",$"code")  // Here multiple columns are passed to repartition; this ensures that
                                                  // rows with the same combination of these 2 column values land in the same partition.


In total 3 partitions hold data:
Blue,1 - Count =2 and both these rows will be in 1 Partition.
Red, 1 - Count =1 present in 1 Partition
Blue, 2 - Count =1 present in 1 Partition


When repartitioning by a column, Spark creates 200 partitions by default (the value of spark.sql.shuffle.partitions), so most of them stay empty in this example.






Select dataframe columns from an Array

val cols = "id,name,address"
val colsArr = cols.split(",")
df.select(colsArr.head, colsArr.tail: _*)

Overwrite specific partitions in spark dataframe write method

Directly writing to HDFS or S3 Location

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

Source: https://stackoverflow.com/questions/38487667/overwrite-specific-partitions-in-spark-dataframe-write-method


Point 1: When Spark writes parquet file to HDFS/S3 Location, Hive is unable to read the data written by Spark. 

Root Cause:

This issue is caused because of different parquet conventions used in Hive and Spark. In Hive, the decimal datatype is represented as fixed bytes (INT 32). In Spark 1.4 or later the default convention is to use the Standard Parquet representation for decimal data type. As per the Standard Parquet representation based on the precision of the column datatype, the underlying representation changes.
eg: DECIMAL can be used to annotate the following types:
int32: for 1 <= precision <= 9
int64: for 1 <= precision <= 18; precision < 10 will produce a warning

Hence this issue happens only with the usage of datatypes which have different representations in the different Parquet conventions. If the datatype is DECIMAL (10,3), both the conventions represent it as INT32, hence we won't face an issue. If you are not aware of the internal representation of the datatypes it is safe to use the same convention used for writing while reading. With Hive, you do not have the flexibility to choose the Parquet convention. But with Spark, you do.

Solution: The convention used by Spark to write Parquet data is configurable. This is determined by the property spark.sql.parquet.writeLegacyFormat. The default value is false. If set to "true", Spark will use the same convention as Hive for writing the Parquet data, which solves the issue.

Use the configuration below when submitting the Spark job; in other words, pass it as a parameter to spark-submit.

--conf "spark.sql.parquet.writeLegacyFormat=true" \

Point 2: While reading data from an RDBMS in Spark via val df1 = spark.sqlContext.read.format("jdbc"),
a partition column can be specified for parallelism. The partition column has to be numeric (newer Spark versions also accept date or timestamp columns) and cannot be a string. Preferably it should be the primary key.

Point 3: If the column names in a DataFrame contain a space or . (eg: 'Account Number'), these characters have to be replaced with _. Below is the code:

   // Returns the column names with spaces and dots replaced by underscores.
   def modifyschema(schema: StructType) =
    {
      val fields = schema.fields
      val fieldnames = fields.map(f => f.name)
      fieldnames.map(x => x.replace(" ", "_").replace(".", "_"))
    }

    val colNames = modifyschema(dfraw2.schema)
    val dfraw2_renamed = dfraw2.toDF(colNames: _*)
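
The renaming rule from Point 3 can be checked on plain strings before applying it to a DataFrame. This is a minimal sketch; the object name ColumnNames and the sample column names are made up for illustration.

```scala
object ColumnNames {
  // Replace every space or dot in a column name with an underscore.
  // Inside a character class the dot is a literal dot, not "any character".
  def sanitize(name: String): String = name.replaceAll("[ .]", "_")

  def main(args: Array[String]): Unit = {
    val raw = Seq("Account Number", "cust.id", "first name.v2", "status")
    println(raw.map(sanitize).mkString(", "))
  }
}
```

A name that contains both a space and a dot, like "first name.v2", gets both replaced, which a chain of if / else if checks would miss.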


Executors, Cores and Executor Memory


--num-executors - Executors are JVM processes running on the nodes, and multiple executors can run at the same time on a single node. https://community.hortonworks.com/questions/56240/spark-num-executors-setting.html

--executor-cores - Number of cores = number of concurrent tasks an executor can run. A value of 5 is commonly recommended.

--executor-memory - Memory for each executor = allocation of RAM for each executor


Creating UDF's in Spark

UDFs transform values from a single row within a table to produce a single corresponding output value per row.  For example, most SQL environments provide an UPPER function returning an uppercase version of the string provided as input.

val df = Seq(("ID1",1),("ID2",2),("ID3",3)).toDF("Name","value")
val udfFunc = (value: Int) => {value *value}

spark.udf.register("squareUDF",udfFunc)
df.select(col("Name"),callUDF("squareUDF",(col("value")))).show

Note: This UDF is available only in this session.

Source: https://blog.cloudera.com/working-with-udfs-in-apache-spark/

Parallelism in Spark while reading from RDBMS.


As in Sqoop, specify the column name, lowerBound, upperBound and numPartitions values.

if columnname = id,
lowerbound = 1
Upperbound = 1000000
numPartitions = 100

Then the read operation is split on the specified column, between lowerbound and upperbound, into 100 ranges, each read over its own connection.

To run all 100 reads at the same time, Spark needs 100 task slots.
Eg:
spark.executor.instances=10
spark.executor.cores=10

However, keep resource allocation in mind: 10 executors with 10 cores each will eat up the lion's share of the cluster.

So make it

spark.executor.instances=5
spark.executor.cores=5

In this case 25 parallel reads will happen, which is not bad and a reasonable utilization of cluster resources.

Similarly, while writing, the number of parallel writers can be specified using repartition().

Source: https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism
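
To get a feel for how the id range is divided, here is a simplified sketch in plain Scala. It is not Spark's own code: the object JdbcRangeSketch is hypothetical, the boundaries Spark actually computes can differ slightly, and Spark also lets the first and last partitions pick up rows outside the bounds.

```scala
object JdbcRangeSketch {
  // Split [lower, upper) into numPartitions contiguous (start, end) id ranges.
  def ranges(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val stride = (upper - lower) / numPartitions
    (0 until numPartitions).map { i =>
      val start = lower + i * stride
      // The last range absorbs the remainder left by the integer division.
      val end = if (i == numPartitions - 1) upper else start + stride
      (start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    val r = ranges(lower = 1L, upper = 1000000L, numPartitions = 100)
    println(s"${r.size} ranges, first = ${r.head}, last = ${r.last}")
  }
}
```

Each range becomes one query with a WHERE clause on id, so a badly chosen lowerBound or upperBound shows up as skewed partitions rather than as missing rows.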

Using Log4j


1. Create a log4j.properties file with below contents,

# Root logger option
log4j.rootLogger=INFO, file, stdout

# Direct log messages to a log file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/u/<LINUXPATH>/logging.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

log4j.appender.file.Append=false
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Threshold=INFO

2. In the application code, create the logger and load the configuration:
@transient lazy val logger = Logger.getLogger("AppName")
PropertyConfigurator.configure("log4j.properties")

Now we can use

logger.warn("Warning msg")
logger.error("Error msg")
logger.info("Info msg")

3. In spark submit command

spark-submit --class xyz --files /<PATH>/log4j.properties --driver-java-options "-Dlog4j.configuration=file:/<PATH>/log4j.properties"

Spark Submit command to launch job in cluster mode


spark-submit --class namespace.XYZClass --files /<PATH>/log4j.properties#log4j.properties,/<CONFFilePath>/unit_AppConffile.conf#unit_AppConffile.conf /<PATH>/App.jar unit_AppConffile.conf


How Spark Shuffle works:


Example of word count:

rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey((x, y) => x + y).filter(_._2 > 100)

After the initial map stages complete, depending on your shuffle manager, each row is either hashed by the key or sorted and put into a file on disk, on the machine that it was sourced from. That executor then lets the ShuffleManager know that it currently has a block of data corresponding to the given key. The ShuffleManager keeps track of all keys/locations. Once all of the map side work is done, the next stage starts, and the executors each reach out to the shuffle manager to figure out where the blocks for each of their keys live. Once they know where those blocks live, each executor will reach out to the corresponding executor to fetch the data and pull it down to be processed locally. To enable this, all the executors run a Netty server which can serve blocks that are requested from that specific executor.

So to recap, it proceeds as follows:

1. Map operations pipelined and executed
2. Map side shuffle operations performed (map side reduce, etc)
3. Map side blocks written to disk and tracked within the ShuffleManager/BlockManager
4. Reduce stage begins to fetch keys and blocks from the ShuffleManager/BlockManager
5. Reduce side aggregate takes place.
6. Next set of map/shuffle stages takes place, repeating from step #1.

Source:
http://hydronitrogen.com/apache-spark-shuffles-explained-in-depth.html
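
The map / shuffle / reduce sequence of the word count can be imitated on a local collection. This is only an analogy in plain Scala, with a made-up object name: groupBy stands in for the shuffle that brings equal keys together, and nothing is written to disk or moved between machines.

```scala
object ShuffleSketch {
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    // Map side: split lines into words and emit (word, 1) pairs.
    val pairs = lines.flatMap(_.split(' ')).map(word => (word, 1))
    // "Shuffle": bring all pairs with the same key together.
    val grouped = pairs.groupBy(_._1)
    // Reduce side: sum the counts for each key.
    grouped.map { case (word, ps) => (word, ps.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("a b a", "b a")))
}
```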

Map() vs flatMap()

Both map() and flatMap() process one input element at a time.
When map() is applied to an input RDD, the output contains the same number of elements as the input RDD.
flatMap() can produce 0, 1 or multiple output elements for each input element, so the output size need not match the input RDD. The results are flattened into a single collection, hence the name flattened map.
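
The same difference shows up on ordinary Scala collections, whose map and flatMap behave like the RDD versions in this respect. A small sketch with made-up sample data:

```scala
object MapVsFlatMap {
  val lines: Seq[String] = Seq("hello world", "", "spark")

  // map: exactly one output element per input element.
  val lengths: Seq[Int] = lines.map(_.length)

  // flatMap: each input yields zero, one or many outputs, flattened into one Seq.
  val words: Seq[String] = lines.flatMap(_.split(" ").filter(_.nonEmpty))

  def main(args: Array[String]): Unit = {
    println(lengths) // 3 inputs -> 3 outputs
    println(words)   // 3 inputs -> 3 words here, but only because "" contributes none
  }
}
```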

Spark Architecture or Spark Execution Model:




Partition size in spark
By default, when reading from HDFS, the Spark partition size is the HDFS block size.

https://drive.google.com/open?id=1zr2YHqYCf7t2_3F2BMwrhpn-KqKbM0tG

DataFrame Join function and Selecting few Columns


val srcDf = spark.read.option("header",true).csv("file1")
val sinkDf = spark.read.option("header",true).csv("file2")

val df_asSrc = srcDf.as("dfSrc")
val df_asSink = sinkDf.as("dfSink")

val joined_df = df_asSrc.join(df_asSink, col("dfSrc.id") === col("dfSink.id") && col("dfSrc.subject") === col("dfSink.subject"), "inner")

joined_df.show
joined_df.select(col("dfSrc.id"), col("dfSrc.subject"), col("dfSink.subject")).show
joined_df.select(col("dfSrc.*")).show

Source: https://stackoverflow.com/questions/40343625/joining-spark-dataframes-on-the-key

UDF to convert column values in String to TimeStamp

def convertTimeStamp(ts: String): Timestamp ={
val timestamp = new Timestamp(new SimpleDateFormat("yyyyMMddHH:mm:ss").parse(ts).getTime());
timestamp
}

def applyTimeStampFormat(srcDf: DataFrame, columnName: String): DataFrame = {
val convertTimeStamp = udf[Timestamp, String](this.convertTimeStamp)
val srcDF = srcDf.withColumn(s"${columnName}_Tmp", convertTimeStamp(col(columnName)))
                    .drop(columnName).withColumnRenamed(s"${columnName}_Tmp", columnName)
srcDF
}

calling function:
val df = applyTimeStampFormat(srcDf, "TS_COLNAME")
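
The parsing logic can be checked outside Spark before wrapping it in a UDF. The sketch below repeats convertTimeStamp in a standalone object (the name TimestampParse and the sample value are only for illustration). Note that the pattern expects the date and hour to run together, as in 2018020610:15:30.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

object TimestampParse {
  // Parses strings such as "2018020610:15:30" (yyyyMMdd immediately followed by HH:mm:ss).
  // A new SimpleDateFormat is created per call because the class is not thread-safe.
  def convertTimeStamp(ts: String): Timestamp =
    new Timestamp(new SimpleDateFormat("yyyyMMddHH:mm:ss").parse(ts).getTime)

  def main(args: Array[String]): Unit =
    println(convertTimeStamp("2018020610:15:30"))
}
```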

Concatenating 2 Columns:

If we want to concatenate the data of multiple columns, separated by a pipe, into a new column, use the code below.
srcdf has columns id and name
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types._
val selection = srcdf.columns.map(col)
val finaldf = srcdf.withColumn("row_key", concat_ws("|", selection: _*).cast(StringType))

This will create a new column row_key with values such as 1|Leela.

RepartitionByRange


Consider a case where the data is skewed: for example, a record set of 10,000 records is stored in 3 partitions, where partition 1 has 7,000 records and the other 2 partitions share the remaining 3,000 records. In other words, the data is not evenly distributed among the partitions.

To distribute data evenly among all the partitions, use the repartitionByRange API. This function takes 2 input parameters: the number of partitions and the column name(s) on which the data has to be divided.

Eg: repartitionByRange ( 5, col("AccountId"))


Same can be achieved via bucketBy(), but repartitionByRange gives better results


Difference between RepartitionByRange vs Repartition

RepartitionByRange repartitions the data such that each partition holds a contiguous range of values. The data in the partitions would look as below:

("10,11,12,13,14", "15,16,17,18", "19,20,21,22")

With repartition it could be,

("10,14,17,18,21,22", "13,19,20", "11,12,15,16")
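
The two layouts can be imitated on a plain list of numbers. This is a simplified sketch, not what Spark runs: Spark samples the data to choose range boundaries and uses its own hash function, while here the range version simply sorts and cuts, and the hash version uses value % numPartitions (non-negative values assumed).

```scala
object PartitioningSketch {
  // Range style: sort, then cut into contiguous chunks.
  def byRange(values: Seq[Int], numPartitions: Int): Seq[Seq[Int]] = {
    val chunk = math.ceil(values.size.toDouble / numPartitions).toInt
    values.sorted.grouped(chunk).toList
  }

  // Hash style: partition index = value % numPartitions.
  def byHash(values: Seq[Int], numPartitions: Int): Seq[Seq[Int]] =
    (0 until numPartitions).map(p => values.filter(_ % numPartitions == p))

  def main(args: Array[String]): Unit = {
    val ids = 10 to 22
    println(byRange(ids, 3)) // contiguous ranges
    println(byHash(ids, 3))  // values interleaved across partitions
  }
}
```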


Bucketing:

Before we discuss Bucketing, let's first see how a Join happens in Spark. Below are the 3 stages that the records of table 1 go through when it joins with table 2. The same activity happens for table 2 as well.

1. Shuffle
2. Sort
3. Merge

Of the above, Shuffle is the costly operation as the records need to move between the nodes.
Bucketing helps in minimizing Shuffle.
The bucketing column should be a column that is used in joins but is not suitable for partitioning, because partitioning on it would result in numerous partitions and a lot of small files. Eg customerId

val numBuckets = 3
salesData.write.bucketBy(numBuckets, "customerId").sortBy("transactionId").saveAsTable("Orders")

Above statement creates 3 buckets and would distribute records on hash(customerId)%numBuckets.

Note: Bucketed data can only be saved as a table and not as files in HDFS or S3. So, use only .saveAsTable or an existing Hive table. The reason is that the table keeps the information that these 3 files were generated by the bucketing operation on this column. While reading the data, the bucketing information is fetched from the table.

Consider that in table 2 the column to be joined is also bucketed. Here is the key: when the join happens, the 2 bucketed files that hold the same keys are processed by the same executor. So if there are 3 executors, each executor gets the bucketed files of both tables that have the same joining Ids. This eliminates the shuffle operation and the join happens within the executor.

Another case: If the 2nd table is not bucketed, the executor has the bucketed file of the 1st table on its node and only the matching keys of the 2nd table are shuffled. At least in this case a full shuffle can be avoided.
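
Why matching keys meet in the same bucket can be shown with a toy version in plain Scala. The names below are made up, and floorMod on hashCode is only a stand-in for the hash Spark applies to the bucketing column. The point is that both tables use the same function and the same bucket count, so each bucket can be joined on its own.

```scala
object BucketSketch {
  // Stand-in bucket function: non-negative hashCode modulo the bucket count.
  def bucketFor(customerId: Int, numBuckets: Int): Int =
    Math.floorMod(customerId.hashCode, numBuckets)

  // Joins two "tables" bucket by bucket; rows never cross bucket boundaries.
  def bucketedJoin(orders: Seq[(Int, String)],
                   customers: Seq[(Int, String)],
                   numBuckets: Int): Seq[(Int, String, String)] = {
    val orderBuckets = orders.groupBy(o => bucketFor(o._1, numBuckets))
    val customerBuckets = customers.groupBy(c => bucketFor(c._1, numBuckets))
    (0 until numBuckets).flatMap { b =>
      for {
        (id, item) <- orderBuckets.getOrElse(b, Seq.empty)
        (cid, name) <- customerBuckets.getOrElse(b, Seq.empty)
        if id == cid
      } yield (id, item, name)
    }
  }

  def main(args: Array[String]): Unit = {
    val orders = Seq((1, "pen"), (2, "book"), (4, "ink"))
    val customers = Seq((1, "Ann"), (2, "Bob"), (3, "Cy"))
    println(bucketedJoin(orders, customers, numBuckets = 3))
  }
}
```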


withColumn, when, otherwise usage (column comparison)

We can use the function when to validate a condition between 2 columns, followed by otherwise. Please see the code below:

Basic Code:


Input Set:  
                                      

Output: