Thursday, March 4, 2021

Spark Scala vs pySpark

Performance: Many articles claim that "Spark Scala is 10 times faster than PySpark", but from Spark 2.x onwards this is no longer true in practice. PySpark used to be buggy and poorly supported, but it has improved considerably in recent releases. However, for batch jobs where the data magnitude is larger, Spark Scala still tends to give better performance.


Library Stack:

Pandas interoperability in PySpark is an advantage.

Python's visualization libraries complement PySpark well; equivalents are not readily available in Scala.

Python also comes with libraries that are well known for data analysis, including several for machine learning and natural language processing.


Learning Python is generally considered easier than learning Scala.


Scala supports powerful concurrency through primitives like Akka's actors, and also has Futures with an execution context.
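As a minimal sketch (plain Scala, names and workloads are illustrative), two tasks running in parallel on the global execution context:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val task1 = Future { (1 to 1000000).map(_.toLong).sum }          // runs asynchronously
val task2 = Future { (1 to 1000000).map(x => (x * 2).toLong).sum }
val combined = for { a <- task1; b <- task2 } yield a + b        // combine without blocking
println(Await.result(combined, 30.seconds))                      // block only at the very end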


Tuesday, March 2, 2021

Reconciliation in Spark

Take the input configuration as a CSV that provides, for each table, its primary keys and the Updated_date column; a sketch of reading it is shown below.
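A hedged sketch of reading such a configuration CSV with Spark (the column names tableName, primaryKeys and updateDateColumn are assumptions, not a fixed format):

// Assumed layout: tableName,primaryKeys,updateDateColumn
val configDf = spark.read.option("header", "true").csv("/path/to/recon_config.csv")
val tableConfigs = configDf.collect().map { row =>
  (row.getAs[String]("tableName"),
   row.getAs[String]("primaryKeys"),        // e.g. "PrimaryKEY1,PrimaryKEY2"
   row.getAs[String]("updateDateColumn"))   // e.g. "Updated_date"
}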

Source Unload - Select primarykeys, Updated_date from srcTbl1 where Updated_date between X and Y

Sink Unload - Select primarykeys, Updated_date from sinkTbl1 where Updated_date between X and Y
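Assuming the source and sink are reachable over JDBC, the unload step could look roughly like the following; the URL, credentials and the date window (X and Y) are placeholders to adjust to your environment:

val primarykeys = "PrimaryKEY1,PrimaryKEY2"          // from the config CSV
val (startDate, endDate) = ("2021-03-01", "2021-03-02")
val unloadQuery =
  s"(select $primarykeys, Updated_date from srcTbl1 where Updated_date between '$startDate' and '$endDate') q"

val srcDf = spark.read.format("jdbc")
  .option("url", "jdbc:sqlserver://<host>:1433;database=<db>")   // placeholder URL
  .option("dbtable", unloadQuery)
  .option("user", "<user>").option("password", "<password>")
  .load()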


Recon Comparison Process:

Get the max Updated_date from srcTbl - val srcMaxUpdatedDate = srcDf.agg(max("Updated_date")).rdd.map(x => x.mkString).collect()(0)


From Sink Table get only the columns whose Updated_date is less than Max Updated_date of Source.


val sinkDf = spark.sql(s"select * from sinkTbl where Updated_date <= '$srcMaxUpdatedDate'")


// The function below builds the comparison SQL between the source and sink tables and identifies any records that failed to get inserted into the sink table.

keyCompare(spark, srcDf, sinkDf, primarykeys.toString.split(","))


def keyCompare(spark: SparkSession, srcDf: DataFrame, sinkDf: DataFrame, key: Array[String]): DataFrame = {
  srcDf.createOrReplaceTempView("srcTbl")
  sinkDf.createOrReplaceTempView("sinkTbl")
  val keycomp = key.map(x => "trim(src." + x + ") = " + "trim(sink." + x + ")").mkString(" AND ")
  val keyfilter = key.map(x => "sink." + x + " is NULL").mkString(" AND ")
  val compareSQL = s"Select src.* from srcTbl src LEFT JOIN sinkTbl sink on $keycomp where $keyfilter"
  println(compareSQL)
  val keyDiffDf = spark.sql(compareSQL)
  keyDiffDf
}

A sample insert-compare query would look like:

select src.* from srcTbl src left join sinkTbl sink on trim(src.PrimaryKEY1) = trim(sink.PrimaryKEY1) AND trim(src.PrimaryKEY2) = trim(sink.PrimaryKEY2) where sink.PrimaryKEY1 is null and sink.PrimaryKEY2 is null


Along similar lines, we can identify the records that were not updated during the delta process:

select src.* from srcTbl src inner join sinkTbl sink on trim(src.PrimaryKEY1) = trim(sink.PrimaryKEY1) AND trim(src.PrimaryKEY2) = trim(sink.PrimaryKEY2) where src.Updated_date != sink.Updated_date


def dateCompare(spark: SparkSession, srcDf: DataFrame, sinkDf: DataFrame, key: Array[String], deltaCol: String): DataFrame = {
  srcDf.createOrReplaceTempView("srcTbl")
  sinkDf.createOrReplaceTempView("sinkTbl")
  val keycomp = key.map(x => "trim(src." + x + ") = " + "trim(sink." + x + ")").mkString(" AND ")
  val compareSQL = s"Select src.* from srcTbl src INNER JOIN sinkTbl sink on $keycomp where src.$deltaCol != sink.$deltaCol"
  println(compareSQL)
  val keyDiffDf = spark.sql(compareSQL)
  keyDiffDf
}
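A sample invocation, assuming primarykeys is a comma-separated string such as "PrimaryKEY1,PrimaryKEY2" and Updated_date is the delta column:

val missedInserts = keyCompare(spark, srcDf, sinkDf, primarykeys.split(","))
val missedUpdates = dateCompare(spark, srcDf, sinkDf, primarykeys.split(","), "Updated_date")
println(s"Missing in sink: ${missedInserts.count}, not updated in sink: ${missedUpdates.count}")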

Thursday, May 7, 2020

Mongo Spark Connector

This article explains how to write, read and update data in MongoDB using Spark.

A friend of mine, Thomas, has written a nice article in which he explains this in an awesome manner. Please follow the link https://medium.com/@thomaspt748/how-to-load-millions-of-data-into-mongo-db-using-apache-spark-3-0-8bcf089bd6ed

I would like to add 3 points apart from the ones explained by my friend.

1. Dealing with Nested JSON.


import spark.implicits._
import org.apache.spark.sql.functions.{struct, sort_array, collect_list}

val foodDf = Seq((123,"food2",false,"Italian",2),
             (123,"food3",true,"American",3),
             (123,"food1",true,"Mediterranean",1))
        .toDF("userId","foodName","isFavFood","cuisine","score")

val foodGrpDf = foodDf
  .select($"userId", struct("score", "foodName", "isFavFood", "cuisine").as("UserFoodFavourites"))
  .groupBy("userId")
  .agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))

groupBy and collect_list() are used together to form the nested structure.
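For reference, foodGrpDf.printSchema should produce roughly the following nested structure (exact nullability flags may differ):

root
 |-- userId: integer (nullable = false)
 |-- UserFoodFavourites: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- score: integer (nullable = false)
 |    |    |-- foodName: string (nullable = true)
 |    |    |-- isFavFood: boolean (nullable = false)
 |    |    |-- cuisine: string (nullable = true)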
    
Reference: https://stackoverflow.com/questions/53200529/spark-dataframe-to-nested-json

Later the Dataframe can be written to Mongo Collection using Save API.

Eg:

MongoSpark.save(foodGrpDf.write.option("collection", "foodMongoCollection").mode("Append"), writeConfig)

Using the above function, the DataFrame (foodGrpDf) can be written directly to the Mongo collection (foodMongoCollection).

2. Creating spark session when Kerberos LDAP Authentication is enabled for mongodb.

authSource=$external&authMechanism=PLAIN has to be included in the URI.

Eg:

val mongoURL = s"mongodb://${mongouser}:${mongopwd}@${mongoHost}:${mongoPort}/${mongoDBName}.foodMongoCollection/"

val spark = SparkSession.builder
  .appName("MongoSample")
  .config("spark.mongodb.output.uri", mongoURL + "?authSource=$external&authMechanism=PLAIN")
  .config("spark.mongodb.input.uri", mongoURL + "?authSource=$external&authMechanism=PLAIN")
  .getOrCreate()

3. Update existing Record in Mongo Collection

save() acts as an update as well. See the code snippet below: read -> update a value in the DataFrame -> save.

val df4 = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "foodDB").option("collection", "foodMongoCollection").load()

val df5 = df4.filter(col("foodName") === "food3")
val df5Updated = df5.drop("isFavFood").withColumn("isFavFood", lit("American_Updated"))

df5Updated.show

MongoSpark.save(df5Updated.write.option("collection","foodMongoCollection").mode("Append"), writeConfig)

Tuesday, November 26, 2019

Updating data in a Hive table


Updating can be achieved without the ORC file format and with transactions disabled (transactional = false), but only when the table is a partitioned table. This is a 2-step process:

1. Create a data set with the updated entries, using a union of the non-updated records and the new records in the partition.

select tbl2.street,tbl2.city,tbl2.zip,tbl2.state,tbl2.beds,tbl2.baths,tbl2.sq__ft,tbl2.sale_date,tbl2.price,tbl2.latitude,tbl2.longitude,tbl2.type from (select * from samp_tbl_part where type = "Multi-Family") tbl1 JOIN (select * from samp_tbl where type = "Multi-Family") tbl2 ON tbl1.zip=tbl2.zip         -- New record
UNION ALL
select tbl1.* from (select * from samp_tbl_part where type = "Multi-Family") tbl1 LEFT JOIN (select * from samp_tbl where type = "Multi-Family") tbl2 on tbl1.zip=tbl2.zip where tbl2.zip is NULL;       -- Non-updated records

2. Insert overwrite the partition.

Eg:
CREATE EXTERNAL TABLE `samp_tbl_part`(
  `street` string, 
  `city` string, 
  `zip` string, 
  `state` string, 
  `beds` string, 
  `baths` string, 
  `sq__ft` string, 
  `sale_date` string, 
  `price` string, 
  `latitude` string, 
  `longitude` string)
PARTITIONED BY ( 
  `type` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://quickstart.cloudera:8020/user/hive/sampledata/realestate_part';
  
220 OLD AIRPORT RD AUBURN 95603 CA 2 2 960 Mon May 19 00:00:00 EDT 2008 285000 38.939802 -121.054575 Multi-Family
398 LINDLEY DR SACRAMENTO 95815 CA 4 2 1744 Mon May 19 00:00:00 EDT 2008 416767 38.622359 -121.457582 Multi-Family
8198 STEVENSON AVE SACRAMENTO 95828 CA 6 4 2475 Fri May 16 00:00:00 EDT 2008 159900 38.465271 -121.40426 Multi-Family
1139 CLINTON RD SACRAMENTO 95825 CA 4 2 1776 Fri May 16 00:00:00 EDT 2008 221250 38.585291 -121.406824 Multi-Family
7351 GIGI PL SACRAMENTO 95828 CA 4 2 1859 Thu May 15 00:00:00 EDT 2008 170000 38.490606 -121.410173 Multi-Family

CREATE EXTERNAL TABLE `samp_tbl`(
  `street` string, 
  `city` string, 
  `zip` string, 
  `state` string, 
  `beds` string, 
  `baths` string, 
  `sq__ft` string, 
  `type` string, 
  `sale_date` string, 
  `price` string, 
  `latitude` string, 
  `longitude` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
WITH SERDEPROPERTIES ( 
  'field.delim'=',', 
  'line.delim'='\n', 
  'serialization.format'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://quickstart.cloudera:8020/user/hive/sampledata/realestate'

1139 CLINTON RD SACRAMENTO 95825 FL 4 2 1776 Multi-Family Fri May 16 00:00:00 EDT 2008 221250 38.585291 -121.406824
  
Complete Insert statement:
INSERT OVERWRITE TABLE samp_tbl_part partition (type) select tbl2.street,tbl2.city,tbl2.zip,tbl2.state,tbl2.beds,tbl2.baths,tbl2.sq__ft,tbl2.sale_date,tbl2.price,tbl2.latitude,tbl2.longitude,tbl2.type from (select * from samp_tbl_part where type = "Multi-Family") tbl1 JOIN (select * from samp_tbl where type = "Multi-Family") tbl2 ON tbl1.zip=tbl2.zip 
UNION ALL 
select tbl1.* from (select * from samp_tbl_part where type = "Multi-Family") tbl1 LEFT JOIN (select * from samp_tbl where type = "Multi-Family") tbl2 on tbl1.zip=tbl2.zip where tbl2.zip is NULL;
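Since the insert uses a dynamic partition (partition (type) with no literal value), Hive typically needs dynamic partitioning enabled before running it:

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;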

Monday, February 25, 2019

Configuring a Spark-submit Job


Configuring Spark-submit parameters

Before going further, let's discuss the parameters below, which were set for a job.
spark.executor.cores=5 
spark.executor.instances=3
spark.executor.memory=20g
spark.driver.memory=5g 
spark.dynamicAllocation.enabled=true 
spark.dynamicAllocation.maxExecutors=10 

spark.executor.cores - specifies the number of cores for an executor. 5 is generally the optimum level of parallelism per executor; a higher number of cores per executor can lead to poor HDFS I/O throughput.

spark.executor.instances - specifies the number of executors to run, so 3 executors x 5 cores = 15 parallel tasks.

spark.executor.memory - the amount of memory each executor gets, 20g as per the above configuration. The cores share this 20 GB, and as there are 5 cores, each core/task gets 20/5 = 4 GB.
As a rough rule of thumb, allocate about 300 MB for processing 100,000 records.

spark.driver.memory - usually this can be less, as the driver manages the job and doesn't process the data. The driver also stores local variables and needs more space in case any Map/Array collection is populated with a large amount of data. It mainly handles job allocation to executor nodes, DAG creation, writing logs, displaying output on the console, etc.

spark.dynamicAllocation.enabled - by default this is enabled, and when there is scope to use more resources than specified (when the cluster is free), the job will ramp up with the extra resources. The DominantResourceCalculator needs to be enabled to get the full power of the capacity scheduler: it allocates executors and cores as we specified, whereas if it is not enabled the default (memory-based DefaultResourceCalculator) allocates 1 core per executor.

spark.dynamicAllocation.maxExecutors - specifies the maximum number of executors when dynamic allocation is enabled.

When Dynamic allocation is enabled, the conf parameters look as below:
--conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=3 --conf spark.executor.memory=20g --conf spark.driver.memory=5g
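For reference, a full spark-submit invocation carrying the settings from the top of this post might look roughly like the following (the master, class name and jar are placeholders):

spark-submit --master yarn --deploy-mode cluster \
  --class com.example.SampleJob \
  --conf spark.executor.cores=5 \
  --conf spark.executor.instances=3 \
  --conf spark.executor.memory=20g \
  --conf spark.driver.memory=5g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=10 \
  sample-job_2.11-0.1.jar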

spark.yarn.executor.memoryOverhead - The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

Off-heap refers to objects (serialised to byte array) that are managed by the operating system but stored outside the process heap in native memory (therefore, they are not processed by the garbage collector)
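As a rough example, with spark.executor.memory=20g and ~10% overhead, the setting would be on the order of 2 GB:

--conf spark.yarn.executor.memoryOverhead=2048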

Coming to the actual story of assigning the parameters. 

Consider a case where data needs to be read from a partitioned table with each partition containing many small/medium files. In this case, use good executor memory, more executors and, as usual, 5 cores.

In similar cases but without many small/medium files at the source, fewer executors will do, each with good memory.

In the case of an incremental load where the data pull is small but data needs to be pulled from multiple tables in parallel (Futures), use more executors, a little less executor memory and, as usual, 5 cores.

You also need to consider the amount of data being processed, the way joins are applied, the stages in the job, and whether broadcasting is used or not. Basically this comes down to trial and error after analyzing the above factors, and the best configuration can be chalked out in the UAT environment.


spark.driver.maxResultSize - Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.

Get the Unravel report and tune accordingly.
There is also a good monitoring tool called sparklens, which can monitor a job and provide its analysis on the first run. It is open source.
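As described in the sparklens documentation, it can typically be attached to a spark-submit run with options along these lines (the package version/Scala suffix may vary for your cluster):

--packages qubole:sparklens:0.3.2-s_2.11 \
--conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener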

Note: In case you want a Spark job to load particular dependency jars on the driver and executors, you can specify them in the extraClassPath properties, e.g. --conf spark.driver.extraClassPath=<PATH>/phoenix5-spark-shaded.jar --conf spark.executor.extraClassPath=<PATH>/phoenix5-spark-shaded.jar. Alternatively, --jars can be used to pass dependency jars explicitly to the spark-submit command.



sparklens : https://docs.qubole.com/en/latest/user-guide/spark/sparklens.html

Sunday, January 13, 2019

Creating Spark Scala SBT Project in Intellij


Below are the steps for creating a Spark Scala SBT project in IntelliJ:


1. Open IntelliJ (Run as Administrator) and create a new project of type Scala with sbt.
If this option is not available, open IntelliJ, go to Settings -> Plugins, search for the Scala plugin and install it.
Also install the sbt plugin from the plugins window.

2. Select a Scala version that is compatible with Spark. For example, if the Spark version is 2.3, select Scala 2.11 and not 2.12, as Spark 2.3 is built against Scala 2.11. So 2.11.8 was selected.

Sample is available under https://drive.google.com/open?id=19YpCwLzuFZSqBReaceVOFS-BwlArOEpf

Debugging Spark Application

Remote Debugging

http://www.bigendiandata.com/2016-08-26-How-to-debug-remote-spark-jobs-with-IntelliJ/

1. Generate JAR file and copy it to a location in cluster.

2. Execute the command,
export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4000

3. In Intellij Run -> Edit Configurations -> Select Remote and Create a configuration with port number 4000 and the Host name of the machine in which JAR is copied.

4. Submit the Spark application, e.g.: spark-submit --class com.practice.SayHello demo_2.11-0.1.jar

5. Click on Debug in IntelliJ for the configuration created in step 3, and this will connect to the Spark application.


To write data to Hive tables from a Spark DataFrame, below are the 2 steps:

1. In spark-submit, add the hive-site.xml file as --files /etc/spark/conf/hive-site.xml
2. Enable Hive support in the Spark session with enableHiveSupport(), e.g.:
val spark = SparkSession.builder.appName("Demo App").enableHiveSupport().getOrCreate()

Sample Code:

import java.text.SimpleDateFormat
import java.sql.Date
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.udf
import spark.implicits._

val date_add = udf((x: String) => {
  val sdf = new SimpleDateFormat("yyyy-MM-dd")
  val result = new Date(sdf.parse(x).getTime())
  sdf.format(result)
})

val dfraw2 = dfraw.withColumn("ingestiondt", date_add($"current_date"))

dfraw2.write.format("parquet").mode(SaveMode.Append).partitionBy("ingestiondt").option("path", "s3://ed-raw/cdr/table1").saveAsTable("db1.table1")

Sunday, January 6, 2019

XML Parsing

XML Parsing:


Source: https://medium.com/@tennysusanto/use-databricks-spark-xml-to-parse-nested-xml-d7d7cf797c28


Description:

This is a cool example and can be taken as a reference for most of the business scenarios.

In the code below, rowTag is set to 'Transaction'. So the contents between <Transaction> and </Transaction> are read and formed into a structure with the sub-elements under it.

val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")

df.printSchema  =>

root
 |-- BusinessDayDate: string (nullable = true)
 |-- ControlTransaction: struct (nullable = true)
 |    |-- OperatorSignOff: struct (nullable = true)
 |    |    |-- CloseBusinessDayDate: string (nullable = true)
 |    |    |-- CloseTransactionSequenceNumber: long (nullable = true)
 |    |    |-- EndDateTimestamp: string (nullable = true)
 |    |    |-- OpenBusinessDayDate: string (nullable = true)
 |    |    |-- OpenTransactionSequenceNumber: long (nullable = true)
 |    |    |-- StartDateTimestamp: string (nullable = true)
 |    |-- ReasonCode: string (nullable = true)
 |    |-- _Version: double (nullable = true)
 |-- CurrencyCode: string (nullable = true)
 |-- EndDateTime: string (nullable = true)
 |-- OperatorID: struct (nullable = true)
 |    |-- _OperatorName: string (nullable = true)
 |    |-- _VALUE: long (nullable = true)
 |-- RetailStoreID: long (nullable = true)
 |-- RetailTransaction: struct (nullable = true)
 |    |-- ItemCount: long (nullable = true)
 |    |-- LineItem: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Sale: struct (nullable = true)
 |    |    |    |    |-- Description: string (nullable = true)
 |    |    |    |    |-- DiscountAmount: double (nullable = true)


Looking at the schema that was formed, we can see that the element structure follows the XML format. For sub-elements like 'LineItem', the datatype is an array of structs, with elements like Sale (struct), Tax (struct) and SequenceNumber (long).

Now, to flatten the contents of LineItem, explode is the function to use. withColumn adds a new column to the existing dataframe 'df'.
 val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

With this 'flattened' dataframe, the needed values can be extracted much like in an SQL query. See $"LineItem.SequenceNumber" and $"LineItem.Tax.TaxableAmount" in the function below as the way to extract the values to form a table.

 val selectedData = flattened.select($"RetailStoreID",$"WorkstationID",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber",$"LineItem.Tax.TaxableAmount")

Explode function: explode creates a new row for each element in the given array or map column of a DataFrame. In simple terms, it is used to un-nest data held in a structure.

After Explode, data in XML can be accessed via Tagname or _Tagname
<LineItem EntryMethod="Auto">
            <SequenceNumber>1</SequenceNumber>
            <Tax TaxID="1" TaxDescription="TaxID1">
               <TaxableAmount>5.04</TaxableAmount>
               <Amount>0.30</Amount>


The point to note is that content within tags can be accessed directly with the tag name.
Eg: for <TaxableAmount>5.04</TaxableAmount>
xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax.TaxableAmount")).show(5)

For attributes such as TaxID="1" on the Tax tag, the _ prefix needs to be used:

xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax._TaxID")).show(5)


Extract XML from an RDBMS column where the data is compressed in GZIP format


import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import org.apache.spark.sql.functions.{udf, unbase64}
import spark.implicits._

val gzipBinaryToString = udf((payload: Array[Byte]) => {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(inputStream).mkString
})

val data = xmldf.withColumn("xmlcolumn", gzipBinaryToString(unbase64($"coldata")))

Here, coldata is the column that contains the XML in GZIP format, xmldf is the dataframe, and xmlcolumn is the new column into which the XML is extracted.

To read the XML as row values from the above data (as a DataFrame):

import com.databricks.spark.xml.XmlReader

val xmlmodified = data.map(x => x.toString)

val reader = new XmlReader()

val xml_parsed = reader.withRowTag("Object").xmlRdd(spark.sqlContext, xmlmodified.rdd).select($"object")

Saturday, January 5, 2019

StreamSets

StreamSets is a data pipeline tool with multiple built-in, ready-to-use processors through which pipelines can be built. A few examples are below:

1. Incrementally ingest records from RDBMS to S3 location with lookups applied.
Select the origin 'JDBC Query Consumer' -> in the 'JDBC' configuration, provide the JDBC URL:
jdbc:sqlserver://Databasehostname:1433;database=retaildb

 SQL Query:
SELECT A.*
,B1.TYPECODE as pctl_TypeCode,B1.NAME as pctl_Name,B1.DESCRIPTION as pctl_Description
FROM retail_contact A
LEFT OUTER JOIN pctl_typelist B1 ON (A.Subtype=B1.ID)
where A.UpdateTime > '${OFFSET}' ORDER BY A.UpdateTime

InitialOffset: 1970-01-01
Offset Column: UpdateTime

Destination: Amazon S3

Bucket: poc-bucket
Note: Don't provide s3://poc-bucket, as this throws an error. Just provide poc-bucket.

Common prefix: retaildb          //A directory name that can have the table data in this root directory.

Partition Prefix: Table1/ingestiondt=${YYYY()}-${MM()}-${DD()}     //This would ingest data as per date partition

Data Format: Delimited
Default Format: CSV
Header Line: With Header Line.

2. Incrementally ingest data from multiple tables into independent directories per table name, without lookups.

Select Origin as 'JDBC Multitable Consumer' and provide 'JDBC connection String'
Schema: %
Table Pattern: retailtables_%
Offset Columns: UpdateTime


Select Destination as 'Amazon S3'
Bucket: poc-bucket
Common prefix: retaildb
Partition Prefix: ${record:attribute('jdbc.tables')}/ingestiondt=${YYYY()}-${MM()}-${DD()}   //This would create directories with table names and insert as per date Partition. 'jdbc.tables' is a variable through which table name is set.

If there is a need to pull data from a certain date(non-historical) then set,

Initial Offset: UpdateTime : ${time:dateTimeToMilliseconds(time:extractDateFromString('2018-10-01 00:00:00.000','yyyy-MM-dd HH:mm:ss.SSS'))}

Monday, December 31, 2018

CICD Process

CICD Process:

Continuous Integration and Continuous Deployment

Git is the source repository. Below are some of the points.

1. For enhancements, a feature branch is created out of master. The changes for the feature are made in the feature branch by checking out the source code and pushing the changes.

For pushing changes to the remote Git repository, they first need to be committed and then pushed.
commit - adds changes to the local repository.
push - transfers the latest commits to the remote Git server.

What is the difference between commit and push in Git?
Since Git is a distributed version control system, the difference is that commit records your changes in the local repository, whereas push updates the remote repository with your local changes.

2. Jenkins is the build tool in which the source code is compiled, the JAR is generated, and the JAR is copied to the specified location on the EMR cluster.

3. This JAR is used for UAT testing in the L3 environment. Dev testing is done using a JAR compiled on the developer's machine in the L1 environment.

4. Once UAT is passed, deployment is done in L4 with the same JAR, and after 15 days of dev warranty the code from the feature branch is pushed to the master branch by the DevOps team.



Basic structure and usage:
Initially, master is created and holds version 1.0 of the code. Two more branches are created out of it, namely staging and dev.
Development process:
1. Each developer creates their own feature branch, named "feature/JIRATicketNumber", from dev. This branch is created only on the local machine and not on the remote server at this point. Below are the steps for this process:
- Clone the code from all 3 branches: master, staging and dev. Command: git clone
- Check out dev to create a branch from it. Command: git checkout dev
- Create a feature branch on the local machine. Command: git checkout -b feature/JIRATicketNumber

2. Each developer makes changes in their local feature branch, and the changes can be saved on the local machine, in that branch, using commit. Command: git commit. To see which files have changed, use: git status.

3. Pushing the changes from the feature branch to the remote repository.
Currently the code changes made by each developer are in the local repository. Here, local refers to the laptop and the remote server is the Git server. At this point, the branches are as follows:
Remote server - 3 branches: master, staging and dev.
Local repository - 4 branches: master, staging and dev, plus the feature branch.
The changes in the local branch have to be pushed to the remote server; for this, the feature branch first has to be created on the remote server, and subsequently the code is pushed each time.
Commands: git push --set-upstream origin <name>
          git push (for subsequent pushes)
4. Once the changes are done at the developer's end, along with unit testing, and they are ready to be pushed from the feature branch to the dev branch, below are the steps to follow:

Pull the latest changes from dev on the remote server into the local dev repository. On doing this, the latest code changes from peer developers are pulled in; as an example, consider that 2 new source files were added to the dev branch by a peer developer during this phase and are not yet available in the feature branch. These also have to be updated in the local dev branch, so a pull has to be made and the changes merged.
Commands: git pull (remote dev branch to local dev)
          git push (from local feature branch to remote feature branch)
          Pull request (raised on the Git server; this includes the code review process)
          git merge (once code review is completed, the code is merged into the dev branch)

Anatomy of a Pull Request
When you file a pull request, all you’re doing is requesting that another developer (e.g., the project maintainer) pulls a branch from your repository into their repository.

Sources: https://www.atlassian.com/git/tutorials/making-a-pull-request


GIT Commands in Git Bash:

cd dirname
git clone https://github.ent.srccode.com/leel0002/DataUpgrade
cd DataUpgrade/
git branch          //Gives the current branch we are pointing to
git branch -a       // Lists all the branches in the repository
git checkout unit    //Switching to the branch
git pull origin unit           // Updates the local unit branch with the latest code from the remote
Make changes in the code
git status          // gives the status of the updated files
git add .
git commit -a -m "adding new updates"   // commit the code changes in the local branch in laptop
git checkout master  // Need to come out of the branch, and only then can we check in
git push origin unit           // pushes the code which is committed in the local branch in the repository to the remote server

To create a feature branch from development branch.

1. In the UI, select the develop branch and type the new branch name as feature/JIRANUMBER, as below. Observe that the branch is created from 'develop'.




Wednesday, November 7, 2018

OOZIE


OOZIE Schedule


frequency=0 09,15,22 * * *

This frequency indicates that the job gets triggered every day at 9:00 a.m., 3:00 p.m. and 10:00 p.m.

0 indicates the minute
09,15,22 indicates the hours
1st * indicates the day of the month
2nd * indicates the month
3rd * indicates the day of the week

To run every 24 hours, specify:
frequency=1440

Here, the frequency value is in minutes.
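For instance, using the cron-like form, a daily 2:30 a.m. trigger would look like:

frequency=30 2 * * *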

Source: http://blog.cloudera.com/blog/2014/04/how-to-use-cron-like-scheduling-in-apache-oozie/


OOZIE Commands:

1] To submit a job - go to the directory containing job.properties and run the following command.
oozie job --oozie http://HOSTNAME:11000/oozie --config job.properties -submit

2] To kill a job
oozie job --oozie http://HOSTNAME:11000/oozie --kill [coord_jobid]

3]To suspend a job
oozie job --oozie http://HOSTNAME:11000/oozie --suspend [coord_jobid]

4]To resume suspended job(coord_jobid is the one used which is suspended)
oozie job --oozie http://HOSTNAME:11000/oozie --resume [coord_jobid]

5] To restart a failed workflow.
oozie job -rerun [parent_workflow_jobid] -Doozie.wf.rerun.failnodes=true


Configuring YARN Capacity Scheduler Queues in AWS EMR

Follow https://mitylytics.com/2017/11/configuring-multiple-queues-aws-emr-yarn/


Saturday, August 4, 2018

AWS


Command to sync (copy) files from the local Linux file system to S3:

aws s3 sync ${v_input_path}/ s3://${output_path}/ --include "*.gz"



Typical Prod cluster is,

1 - r4.16xlarge

12- r4.4xlarge

r4.16xlarge - 64 vCore, 488 GiB memory
r4.4xlarge - 16 vCore, 122 GiB memory

Dev
m4.4xlarge - 32 vCore, 64 GiB memory

1- m4.4xlarge Master

10 - m4.4xlarge Datanodes


IAM Roles

Using a command-line tool called avenue, we create IAM roles.

eg command: avenue policy create-p <ENV> --policy-name <NEWIAM POLICYNAME>

After creating the IAM role, we need to attach policies that enable access to multiple AWS services,

eg command: avenue policy attach ..

Tuesday, July 17, 2018

SQL Server commands


Get the table name and the row count in a DB:


SELECT
    t.NAME AS TableName,
    SUM(p.rows) AS [RowCount]
FROM
    sys.tables t
INNER JOIN   
    sys.indexes i ON t.OBJECT_ID = i.object_id
INNER JOIN
    sys.partitions p ON i.object_id = p.OBJECT_ID AND i.index_id = p.index_id
WHERE 
    i.index_id <= 1
GROUP BY
    t.NAME, i.object_id, i.index_id, i.name
ORDER BY
    SUM(p.rows) DESC

Source: https://stackoverflow.com/questions/3980622/sql-server-2008-i-have-1000-tables-i-need-to-know-which-tables-have-data

Get System time

select GETDATE()