Sunday, January 13, 2019

Creating Spark Scala SBT Project in Intellij


Below are the steps to create a Spark Scala SBT project in IntelliJ:


1. Open IntelliJ via Run as Administrator and create a new project of type Scala with sbt.
If this option is not available, open IntelliJ, go to Settings -> Plugins, search for the Scala plugin, and install it.
Also install the sbt plugin from the same Plugins window.

2. Select a Scala version that is compatible with your Spark version. For example, Spark 2.3 is compatible with Scala 2.11 and not 2.12, so Scala 2.11.8 was selected here.
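
The version pairing from step 2 ends up in the project's build.sbt. A minimal sketch, assuming Spark 2.3.0 artifacts and an illustrative project name and version (none of these values come from the sample project):

```scala
// build.sbt: a minimal sketch; name and version are illustrative assumptions
name := "demo"
version := "0.1"
scalaVersion := "2.11.8"

// %% appends the Scala binary version (_2.11) to the artifact name.
// "provided" keeps Spark out of the packaged JAR because the cluster supplies it at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.3.0" % "provided"
)
```

With the "provided" scope, running the application directly inside IntelliJ usually requires including provided dependencies in the run configuration, or removing the scope while developing locally.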

A sample project is available at https://drive.google.com/open?id=19YpCwLzuFZSqBReaceVOFS-BwlArOEpf

Debugging Spark Application

Remote Debugging

http://www.bigendiandata.com/2016-08-26-How-to-debug-remote-spark-jobs-with-IntelliJ/

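1. Generate the JAR file and copy it to a location on the cluster.

2. On that machine, run the following command so that the JVM started by spark-submit listens for a debugger on port 4000 and waits for it to attach (suspend=y):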
export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4000

3. In IntelliJ, go to Run -> Edit Configurations, add a Remote configuration, and set the port to 4000 and the host to the machine where the JAR was copied.

4. Submit the Spark application, e.g.: spark-submit --class com.practice.SayHello demo_2.11-0.1.jar

5. Click Debug in IntelliJ for the configuration created in step 3. The debugger then connects to the waiting Spark application.
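
For reference, an application of the shape submitted in step 4 could look like the sketch below. Only the class name com.practice.SayHello comes from the command above; the body is a hypothetical example, not the contents of the sample project.

```scala
package com.practice

import org.apache.spark.sql.SparkSession

object SayHello {
  // Small pure helper, kept separate so it can be stepped into
  def greet(name: String): String = s"Hello, $name"

  def main(args: Array[String]): Unit = {
    // No master() here: spark-submit supplies it
    val spark = SparkSession.builder.appName("SayHello").getOrCreate()
    import spark.implicits._

    val names = Seq("spark", "scala", "sbt").toDS()

    // The lambda runs on executors; collect() brings the results back to the driver
    val greetings = names.map(n => greet(n)).collect()

    // A breakpoint on this line is in driver code
    greetings.foreach(println)

    spark.stop()
  }
}
```

As far as I understand, SPARK_SUBMIT_OPTS applies to the JVM that spark-submit launches, which in client mode is the driver. Breakpoints in driver code (such as the last lines of main) are therefore the ones to expect hits on; code running on remote executors would need its own debug options.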


To write data to Hive tables from a Spark DataFrame, two steps are needed:

1. In spark-submit, pass the Hive site file with --files /etc/spark/conf/hive-site.xml
2. Enable Hive support on the Spark session with enableHiveSupport(), e.g.:
val spark = SparkSession.builder.appName("Demo App").enableHiveSupport().getOrCreate()

Sample Code:

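    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    // Normalizes a date string to yyyy-MM-dd. Named normalizeDate to avoid clashing with Spark's built-in date_add.
    val normalizeDate = udf((x: String) => {
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val result = new Date(sdf.parse(x).getTime())
      sdf.format(result)
    })

    // current_date here is a string column of dfraw
    val dfraw2 = dfraw.withColumn("ingestiondt", normalizeDate($"current_date"))

    // Append as Parquet, partitioned by ingestiondt, and register the table in the Hive metastore
    dfraw2.write.format("parquet").mode(SaveMode.Append).partitionBy("ingestiondt").option("path", "s3://ed-raw/cdr/table1").saveAsTable("db1.table1")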
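
The ingestiondt column can probably also be derived without a UDF, using Spark's built-in to_date and date_format functions (the two-argument to_date needs Spark 2.2 or later). The sketch below is an alternative, not the method used in the sample above; the input rows and the local master are assumptions made only so the example runs on its own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, to_date}

object IngestionDateSketch {
  def main(args: Array[String]): Unit = {
    // local[*] is only for trying this out; drop master() when using spark-submit
    val spark = SparkSession.builder
      .appName("IngestionDateSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: a string column named current_date, as in the sample above
    val dfraw = Seq(("r1", "2019-01-13"), ("r2", "2019-01-06")).toDF("id", "current_date")

    // Parse the string as a date, then format it back to a yyyy-MM-dd string
    val dfraw2 = dfraw.withColumn(
      "ingestiondt",
      date_format(to_date($"current_date", "yyyy-MM-dd"), "yyyy-MM-dd"))

    dfraw2.show()
    spark.stop()
  }
}
```

One behavioural difference to keep in mind: in Spark 2.x, to_date returns null for a string it cannot parse, whereas SimpleDateFormat.parse inside a UDF throws and fails the task.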

Sunday, January 6, 2019

XML Parsing



Source: https://medium.com/@tennysusanto/use-databricks-spark-xml-to-parse-nested-xml-d7d7cf797c28


Description:

This is a good example of parsing nested XML and can serve as a reference for most business scenarios.

In the code below, rowTag is set to 'Transaction', so the contents between <Transaction> and </Transaction> are read as one row, with the sub-elements forming a nested structure under it.

val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")

df.printSchema  =>

root
 |-- BusinessDayDate: string (nullable = true)
 |-- ControlTransaction: struct (nullable = true)
 |    |-- OperatorSignOff: struct (nullable = true)
 |    |    |-- CloseBusinessDayDate: string (nullable = true)
 |    |    |-- CloseTransactionSequenceNumber: long (nullable = true)
 |    |    |-- EndDateTimestamp: string (nullable = true)
 |    |    |-- OpenBusinessDayDate: string (nullable = true)
 |    |    |-- OpenTransactionSequenceNumber: long (nullable = true)
 |    |    |-- StartDateTimestamp: string (nullable = true)
 |    |-- ReasonCode: string (nullable = true)
 |    |-- _Version: double (nullable = true)
 |-- CurrencyCode: string (nullable = true)
 |-- EndDateTime: string (nullable = true)
 |-- OperatorID: struct (nullable = true)
 |    |-- _OperatorName: string (nullable = true)
 |    |-- _VALUE: long (nullable = true)
 |-- RetailStoreID: long (nullable = true)
 |-- RetailTransaction: struct (nullable = true)
 |    |-- ItemCount: long (nullable = true)
 |    |-- LineItem: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Sale: struct (nullable = true)
 |    |    |    |    |-- Description: string (nullable = true)
 |    |    |    |    |-- DiscountAmount: double (nullable = true)


Looking at the resulting schema, the element structure follows the XML format. Sub-elements such as 'LineItem' have the datatype array of struct, with elements like Sale (struct), Tax (struct), and SequenceNumber (long).

Next, flatten the contents of LineItem with the explode function. withColumn adds a new column, 'LineItem', to the existing dataframe 'df'.
 val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

With the 'flattened' dataframe, the needed values can be extracted much like in an SQL query. Note how $"LineItem.SequenceNumber" and $"LineItem.Tax.TaxableAmount" in the select below reach into the nested structure to form a table.

 val selectedData = flattened.select($"RetailStoreID",$"WorkstationID",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber",$"LineItem.Tax.TaxableAmount")

Explode function: explode creates a new row for each element in the given array or map column of a DataFrame. In simple terms, it turns one row holding a collection into one row per element.
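
To see explode in isolation, here is a minimal sketch on a small in-memory DataFrame. The case classes and values are hypothetical stand-ins for the nested XML structure, and the local master is assumed only so the example runs without a cluster.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodeSketch {
  // Hypothetical stand-ins for the nested structure read from XML
  case class Sale(Description: String, DiscountAmount: Double)
  case class Transaction(RetailStoreID: Long, LineItem: Seq[Sale])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ExplodeSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Two transactions holding two and one line items respectively
    val df = Seq(
      Transaction(48L, Seq(Sale("Coffee", 0.5), Sale("Bagel", 0.0))),
      Transaction(49L, Seq(Sale("Tea", 0.25)))
    ).toDF()

    // One output row per array element; LineItem changes from array<struct> to struct
    val flattened = df.withColumn("LineItem", explode($"LineItem"))

    // Nested struct fields are reached with dot notation
    flattened
      .select($"RetailStoreID", $"LineItem.Description", $"LineItem.DiscountAmount")
      .show()

    spark.stop()
  }
}
```

The two input rows become three output rows, one per line item, with RetailStoreID repeated for each.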

After explode, data in the XML can be accessed via the tag name (for element content) or the attribute name prefixed with _ (for attributes). Take this fragment:
<LineItem EntryMethod="Auto">
            <SequenceNumber>1</SequenceNumber>
            <Tax TaxID="1" TaxDescription="TaxID1">
               <TaxableAmount>5.04</TaxableAmount>
               <Amount>0.30</Amount>


Note that contents within tags can be accessed directly by tag name. Here xmlflattened is the exploded dataframe.
E.g., for <TaxableAmount>5.04</TaxableAmount>:
xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax.TaxableAmount")).show(5)

For an attribute, such as TaxID="1" on the Tax tag, prefix the attribute name with _:

xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax._TaxID")).show(5)


Extract XML from an RDBMS column where the data is compressed in GZIP format


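import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import org.apache.spark.sql.functions.{udf, unbase64}

// Decompresses a GZIP payload and returns its contents as a string
val gzipBinaryToString = udf((payload: Array[Byte]) => {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(inputStream).mkString
})

// unbase64 decodes the Base64 text in coldata to bytes before decompression
val data = xmldf.withColumn("xmlcolumn", gzipBinaryToString(unbase64($"coldata")))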

Here, xmldf is the dataframe, coldata is the column that contains the Base64-encoded, GZIP-compressed XML, and xmlcolumn is the new column into which the XML is extracted.
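
The decode path can be checked without Spark or a database. The sketch below compresses a string with GZIP, Base64-encodes it to mimic the stored value, and reverses both steps. The object name and the sample XML are made up for illustration, and UTF-8 is an assumption about the stored payload.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipRoundTrip {
  // Compress a string with GZIP and Base64-encode it, mimicking the stored column value
  def encode(xml: String): String = {
    val buffer = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(buffer)
    gzip.write(xml.getBytes(StandardCharsets.UTF_8))
    gzip.close() // close() writes the GZIP trailer into the buffer
    Base64.getEncoder.encodeToString(buffer.toByteArray)
  }

  // Reverse of encode: Base64-decode, then decompress back to a string
  def decode(stored: String): String = {
    val bytes = Base64.getDecoder.decode(stored)
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val xml = "<Object><Id>1</Id></Object>"
    // Prints true when the round trip returns the original text
    println(decode(encode(xml)) == xml)
  }
}
```

If decode fails on a real column value, the data is probably not Base64 text in the first place (for example, a raw binary column), in which case the unbase64 step would not apply.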

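To parse the XML held in a row value, starting from the dataframe 'data' above:

import com.databricks.spark.xml.XmlReader

val xmlmodified = data.map(x => x.toString)

val reader = new XmlReader()

// xmlRdd expects an RDD[String], so the Dataset is converted with .rdd
val xml_parsed = reader.withRowTag("Object").xmlRdd(spark.sqlContext, xmlmodified.rdd).select($"object")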

Saturday, January 5, 2019

StreamSets

StreamSets is a data pipeline tool with multiple built-in, ready-to-use processors from which pipelines can be built. A few examples are below:

1. Incrementally ingest records from an RDBMS to an S3 location with lookups applied.
Select the origin 'JDBC Query Consumer' and, under the 'JDBC' configuration, provide the JDBC URL:
jdbc:sqlserver://Databasehostname:1433;database=retaildb

 SQL Query:
SELECT A.*
,B1.TYPECODE as pctl_TypeCode,B1.NAME as pctl_Name,B1.DESCRIPTION as pctl_Description
FROM retail_contact A
LEFT OUTER JOIN pctl_typelist B1 ON (A.Subtype=B1.ID)
where A.UpdateTime > '${OFFSET}' ORDER BY A.UpdateTime

Initial Offset: 1970-01-01
Offset Column: UpdateTime

Destination: Amazon S3

Bucket: poc-bucket
Note: Provide only the bucket name, poc-bucket. A value with the scheme prefix, such as s3://poc-bucket, throws an error.

Common prefix: retaildb          //Root directory under which the table data is stored.

Partition Prefix: Table1/ingestiondt=${YYYY()}-${MM()}-${DD()}     //Writes data into one partition directory per ingestion date.

Data Format: Delimited
Default Format: CSV
Header Line: With Header Line.

2. Incrementally ingest data from multiple tables into independent directories named after the tables, without lookups.

Select the origin 'JDBC Multitable Consumer' and provide the JDBC connection string.
Schema: %
Table Pattern: retailtables_%
Offset Columns: UpdateTime


Select Destination as 'Amazon S3'
Bucket: poc-bucket
Common prefix: retaildb
Partition Prefix: ${record:attribute('jdbc.tables')}/ingestiondt=${YYYY()}-${MM()}-${DD()}   //Creates one directory per table and writes data by date partition. 'jdbc.tables' is the record header attribute that carries the table name.

To pull data starting from a certain date rather than the full history, set:

Initial Offset: UpdateTime : ${time:dateTimeToMilliseconds(time:extractDateFromString('2018-10-01 00:00:00.000','yyyy-MM-dd HH:mm:ss.SSS'))}
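
The expression above converts a timestamp string to epoch milliseconds. To check which value a given date corresponds to, the same conversion can be reproduced in plain Scala. This is a sketch: the helper name is made up, and UTC is an assumption, since the post does not say which time zone the StreamSets expression uses.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object InitialOffset {
  // Parse a timestamp string and return epoch milliseconds for the given time zone
  def toMillis(text: String, pattern: String, zone: String): Long = {
    val sdf = new SimpleDateFormat(pattern)
    sdf.setTimeZone(TimeZone.getTimeZone(zone))
    sdf.parse(text).getTime
  }

  def main(args: Array[String]): Unit = {
    // Same date string and pattern as in the Initial Offset expression; UTC is assumed
    val millis = toMillis("2018-10-01 00:00:00.000", "yyyy-MM-dd HH:mm:ss.SSS", "UTC")
    println(millis)
  }
}
```

If the pipeline runs in a different time zone, the resulting offset shifts by that zone's UTC offset, which matters when the first rows to ingest fall close to midnight.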