Thursday, March 4, 2021

Spark Scala vs PySpark

Performance: Many articles claim that "Spark Scala is 10 times faster than PySpark", but from Spark 2.x onwards this is no longer true. PySpark used to be buggy and poorly supported, but it has improved considerably in recent releases. However, for batch jobs that move large volumes of data, Spark Scala still tends to perform better.


Library Stack:

Pandas interoperability in PySpark is an advantage: a Spark DataFrame can be pulled down to a pandas DataFrame (and back) for local analysis.

Python's visualization libraries complement PySpark; comparable libraries are not available in Scala.

Python also ships with libraries that are well known for data analysis, and many more are available for machine learning and natural language processing.


Learning Python is generally considered easier than learning Scala.


Scala supports powerful concurrency through primitives like Akka's actors, and it also has Futures with an ExecutionContext; a minimal sketch follows.
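Below is a minimal, illustrative sketch of Scala Futures running two tasks concurrently on the global ExecutionContext (the task bodies and values are made up for illustration):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Two independent tasks run concurrently on the global ExecutionContext
val fetchSrcCount: Future[Long]  = Future { 100L }  // e.g. count source rows
val fetchSinkCount: Future[Long] = Future { 98L }   // e.g. count sink rows

// Combine the results without blocking until the very end
val diff: Future[Long] = for {
  s <- fetchSrcCount
  k <- fetchSinkCount
} yield s - k

println(Await.result(diff, 10.seconds))  // prints 2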


Tuesday, March 2, 2021

Reconciliation in Spark

Take the input configuration as a CSV and, from it, get the primary keys and the Updated_date (delta) column for the respective tables.

Source Unload - Select primarykeys, Updated_date from srcTbl1 where Updated_date between X and Y

Sink Unload - Select primarykeys, Updated_date from sinkTbl1 where Updated_date between X and Y
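A hedged sketch of driving these unloads from the config CSV; the config column names (table_name, primary_keys, delta_column), the file path, and the date range values are assumptions for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("recon").getOrCreate()

// Assumed config layout: table_name, primary_keys (comma separated), delta_column
val configDf = spark.read.option("header", "true").csv("/path/to/recon_config.csv")

val startDate = "2021-03-01"  // X
val endDate   = "2021-03-02"  // Y

configDf.collect().foreach { row =>
  val table    = row.getAs[String]("table_name")
  val keys     = row.getAs[String]("primary_keys")
  val deltaCol = row.getAs[String]("delta_column")

  val unloadSql = s"select $keys, $deltaCol from $table where $deltaCol between '$startDate' and '$endDate'"
  println(unloadSql)  // run this against the source and the sink to produce srcDf / sinkDf
}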


Recon Comparison Process:

Get the max Updated_date from srcTbl (requires import org.apache.spark.sql.functions.max):

val srcMaxUpdatedDate = srcDf.agg(max("Updated_date")).rdd.map(x => x.mkString).collect().head
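An equivalent way to pull out the same scalar without going through the RDD API (assuming the value renders cleanly as a string):

val srcMaxUpdatedDate = srcDf.agg(max("Updated_date")).first().get(0).toString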


From the sink table, get only the records whose Updated_date is less than or equal to the max Updated_date of the source.


val sinkDf = spark.sql(s"select * from sinkTbl where Updated_date <= '${srcMaxUpdatedDate}'")


// The function below builds the comparison SQL between the source and sink tables and identifies any records that failed to get inserted into the sink table

keyCompare(spark, srcDf, sinkDf, primarykeys.toString.split(","))


import org.apache.spark.sql.{DataFrame, SparkSession}

def keyCompare(spark: SparkSession, srcDf: DataFrame, sinkDf: DataFrame, key: Array[String]): DataFrame = {
  srcDf.createOrReplaceTempView("srcTbl")
  sinkDf.createOrReplaceTempView("sinkTbl")
  // Join condition over every primary key column: trim(src.K) = trim(sink.K) AND ...
  val keycomp = key.map(x => "trim(src." + x + ") = trim(sink." + x + ")").mkString(" AND ")
  // Anti-join filter: keys with no match on the sink side come back as NULL
  val keyfilter = key.map(x => "sink." + x + " is NULL").mkString(" AND ")
  val compareSQL = s"select src.* from srcTbl src LEFT JOIN sinkTbl sink on $keycomp where $keyfilter"
  println(compareSQL)
  val keyDiffDf = spark.sql(compareSQL)
  keyDiffDf
}
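A quick usage sketch (the key list here is illustrative; in practice it comes from the recon config):

val missingInSink = keyCompare(spark, srcDf, sinkDf, "PrimaryKEY1,PrimaryKEY2".split(","))
println(s"Records missing in sink: ${missingInSink.count()}")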

A sample insert-comparison query would look like:

select src.* from srcTbl src left join sinkTbl sink on trim(src.PrimaryKEY1) = trim(sink.PrimaryKEY1) AND trim(src.PrimaryKEY2) = trim(sink.PrimaryKEY2) where sink.PrimaryKEY1 is null and sink.PrimaryKEY2 is null


Along the same lines, we can identify the records that were not updated during the delta process:

select src.* from srcTbl src inner join sinkTbl sink on trim(src.PrimaryKEY1) = trim(sink.PrimaryKEY1) AND trim(src.PrimaryKEY2) = trim(sink.PrimaryKEY2) where src.Updated_date != sink.Updated_date


def dateCompare(spark: SparkSession, srcDf: DataFrame, sinkDf: DataFrame, key: Array[String], deltaCol: String): DataFrame = {
  srcDf.createOrReplaceTempView("srcTbl")
  sinkDf.createOrReplaceTempView("sinkTbl")
  // Join condition over every primary key column
  val keycomp = key.map(x => "trim(src." + x + ") = trim(sink." + x + ")").mkString(" AND ")
  // Matching keys but differing delta-column values => the record was not updated in the sink (inner join, as in the sample above)
  val compareSQL = s"select src.* from srcTbl src INNER JOIN sinkTbl sink on $keycomp where src.$deltaCol != sink.$deltaCol"
  println(compareSQL)
  val keyDiffDf = spark.sql(compareSQL)
  keyDiffDf
}
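A matching usage sketch for the update check (same illustrative key list, with Updated_date as the delta column):

val notUpdated = dateCompare(spark, srcDf, sinkDf, "PrimaryKEY1,PrimaryKEY2".split(","), "Updated_date")
println(s"Records not updated in sink: ${notUpdated.count()}")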