Tuesday, March 2, 2021

Reconciliation in Spark

Read the input configuration from a CSV file to get the primary keys and the Updated_date column for the respective tables

Source Unload - Select primarykeys, Updated_date from srcTbl1 where Updated_date between X and Y

Sink Unload - Select primarykeys, Updated_date from sinkTbl1 where Updated_date between X and Y


Recon Comparison Process:

Get Max Updated_date from srcTbl - val srcMaxUpdatedDate = srcDf.agg(max("Updated_date")).rdd.map(x => x.mkString).collect.head


From the sink table, get only the rows whose Updated_date is less than or equal to the max Updated_date of the source.


// The s-interpolator is required for srcMaxUpdatedDate to be substituted into the query; the value is
// single-quoted so Spark SQL reads it as a date/timestamp literal instead of an arithmetic expression.
val sinkDf = spark.sql(s"select * from sinkTbl where Updated_date <= '${srcMaxUpdatedDate}'")


// The function below builds the SQL that compares the source and sink tables and identifies any records that failed to get inserted into the sink table

keyCompare(spark,srcDf,sinkDf, primarykeys.toString.split(","))


/**
 * Returns the source rows that have no matching row in the sink, matching on the given key columns.
 *
 * Registers (or replaces) the temp views `srcTbl` and `sinkTbl` on the session, left-joins them on the
 * trimmed key columns and keeps only the rows where every sink key column is NULL.
 *
 * @param spark  session used to run the generated SQL
 * @param srcDf  source data; registered as temp view `srcTbl`
 * @param sinkDf sink data; registered as temp view `sinkTbl`
 * @param key    key column names, present in both data frames; must not be empty
 * @return the rows of `srcDf` without a key match in `sinkDf`
 */
def keyCompare(spark: SparkSession, srcDf: DataFrame, sinkDf: DataFrame, key: Array[String]): DataFrame = {
  // An empty key list would produce an ON/WHERE clause with no predicate; fail early with a clear message.
  require(key.nonEmpty, "keyCompare requires at least one key column")

  srcDf.createOrReplaceTempView("srcTbl")
  sinkDf.createOrReplaceTempView("sinkTbl")

  val keycomp = key.map(x => s"trim(src.$x) = trim(sink.$x)").mkString(" AND ")

  // The space before "is NULL" matters: without it the predicate reads `sink.<col>is NULL`.
  val keyfilter = key.map(x => s"sink.$x is NULL").mkString(" AND ")

  val compareSQL = s" Select src.* from srcTbl src Left JOIN sinkTbl sink on $keycomp where $keyfilter"
  println(compareSQL)

  spark.sql(compareSQL)
}

Sample Insert Compare would look like,

select src.* from srcTbl src left join sinkTbl sink on trim(src.PrimaryKEY1) = trim(sink.PrimaryKEY1) AND trim(src.PrimaryKEY2) = trim(sink.PrimaryKEY2) where sink.PrimaryKEY1 is null and sink.PrimaryKEY2 is null


Along similar lines, we can identify the records that were not updated during the delta process

select src.* from srcTbl src inner join sinkTbl sink on trim(src.PrimaryKEY1) = trim(sink.PrimaryKEY1) AND trim(src.PrimaryKEY2) = trim(sink.PrimaryKEY2) where src.Updated_date != sink.Updated_date


/**
 * Returns the source rows whose key exists in the sink but whose delta column value differs.
 *
 * Registers (or replaces) the temp views `srcTbl` and `sinkTbl` on the session, joins them on the
 * trimmed key columns and keeps the rows where `src.<deltaCol> != sink.<deltaCol>`.
 * Because `!=` evaluates to NULL when either side is NULL, rows with a NULL delta value on either
 * side are not reported.
 *
 * @param spark    session used to run the generated SQL
 * @param srcDf    source data; registered as temp view `srcTbl`
 * @param sinkDf   sink data; registered as temp view `sinkTbl`
 * @param key      key column names, present in both data frames; must not be empty
 * @param deltaCol name of the column compared between source and sink (e.g. the update timestamp)
 * @return the rows of `srcDf` that match a sink row on the keys but differ on `deltaCol`
 */
def dateCompare(spark: SparkSession, srcDf: DataFrame, sinkDf: DataFrame, key: Array[String], deltaCol: String): DataFrame = {
  // An empty key list would produce an ON clause with no predicate; fail early with a clear message.
  require(key.nonEmpty, "dateCompare requires at least one key column")

  srcDf.createOrReplaceTempView("srcTbl")
  sinkDf.createOrReplaceTempView("sinkTbl")

  val keycomp = key.map(x => s"trim(src.$x) = trim(sink.$x)").mkString(" AND ")

  // Both sides must interpolate deltaCol; a bare `sink.deltaCol` would look for a column literally
  // named "deltaCol". INNER JOIN is used because the != filter discards unmatched (NULL) sink rows
  // anyway, so a left join returns the same rows.
  val compareSQL = s" Select src.* from srcTbl src INNER JOIN sinkTbl sink on $keycomp where src.$deltaCol != sink.$deltaCol"
  println(compareSQL)

  spark.sql(compareSQL)
}

1 comment: