Sunday, January 6, 2019

XML Parsing



Source: https://medium.com/@tennysusanto/use-databricks-spark-xml-to-parse-nested-xml-d7d7cf797c28


Description:

This example parses a nested XML file with the Databricks spark-xml package and can serve as a reference for most business scenarios.

In the code below, rowTag is set to 'Transaction', so the contents between each <Transaction> and </Transaction> pair are read as one row, with the sub-elements forming a structure under it.

val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")

df.printSchema  =>

root
 |-- BusinessDayDate: string (nullable = true)
 |-- ControlTransaction: struct (nullable = true)
 |    |-- OperatorSignOff: struct (nullable = true)
 |    |    |-- CloseBusinessDayDate: string (nullable = true)
 |    |    |-- CloseTransactionSequenceNumber: long (nullable = true)
 |    |    |-- EndDateTimestamp: string (nullable = true)
 |    |    |-- OpenBusinessDayDate: string (nullable = true)
 |    |    |-- OpenTransactionSequenceNumber: long (nullable = true)
 |    |    |-- StartDateTimestamp: string (nullable = true)
 |    |-- ReasonCode: string (nullable = true)
 |    |-- _Version: double (nullable = true)
 |-- CurrencyCode: string (nullable = true)
 |-- EndDateTime: string (nullable = true)
 |-- OperatorID: struct (nullable = true)
 |    |-- _OperatorName: string (nullable = true)
 |    |-- _VALUE: long (nullable = true)
 |-- RetailStoreID: long (nullable = true)
 |-- RetailTransaction: struct (nullable = true)
 |    |-- ItemCount: long (nullable = true)
 |    |-- LineItem: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Sale: struct (nullable = true)
 |    |    |    |    |-- Description: string (nullable = true)
 |    |    |    |    |-- DiscountAmount: double (nullable = true)


Looking at the resulting schema, the element structure follows the XML format. Sub-elements such as 'LineItem' have the datatype array of struct, with elements such as Sale (struct), Tax (struct), and SequenceNumber (long).

Now flatten the contents of LineItem with the explode function. withColumn returns a new DataFrame based on 'df' with the added column 'LineItem', which holds one array element per row.
import org.apache.spark.sql.functions.explode
val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

With the 'flattened' DataFrame, the required values can be selected much as in an SQL query. In the select below, $"LineItem.SequenceNumber" and $"LineItem.Tax.TaxableAmount" show how nested values are extracted to form a table.

 val selectedData = flattened.select($"RetailStoreID",$"WorkstationID",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber",$"LineItem.Tax.TaxableAmount")

Explode function: explode creates a new row for each element in the given array or map column of a DataFrame. In simple terms, it turns one row holding a collection into one row per element.
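
As a rough illustration of the behaviour described above, here is a minimal sketch that runs without any XML input. The object name ExplodeSketch, the column name SequenceNumbers, and the sample values are made up for this example, and the local SparkSession is only there so the snippet stands alone (in spark-shell, `spark` already exists).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object ExplodeSketch {
  // One output row per element of the (hypothetical) SequenceNumbers array;
  // the remaining columns are repeated on every output row.
  def flatten(df: DataFrame): DataFrame =
    df.withColumn("SequenceNumber", explode(col("SequenceNumbers")))
      .drop("SequenceNumbers")

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("explode-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up data: two store rows, with three and one line items.
    val df = Seq(
      (1001L, Seq(1L, 2L, 3L)),
      (1002L, Seq(4L))
    ).toDF("RetailStoreID", "SequenceNumbers")

    // Two input rows become four output rows (3 + 1).
    flatten(df).show()
    spark.stop()
  }
}
```

Note that explode produces no output row for an input row whose array is null or empty, which matters when some transactions have no line items.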

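After explode, data in the XML can be accessed by tag name (for element content) or by the attribute name prefixed with an underscore (for attributes). Consider this fragment: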
<LineItem EntryMethod="Auto">
            <SequenceNumber>1</SequenceNumber>
            <Tax TaxID="1" TaxDescription="TaxID1">
               <TaxableAmount>5.04</TaxableAmount>
               <Amount>0.30</Amount>


Note that the content within a tag can be accessed directly by the tag name.
E.g., for <TaxableAmount>5.04</TaxableAmount>:
xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax.TaxableAmount")).show(5)

For an attribute, such as TaxID="1" on the Tax tag, prefix the attribute name with an underscore (_):

xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax._TaxID")).show(5)
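
The two access patterns can be tried without an XML file by building the same nested shape by hand. This is only a sketch: the object name AttributeAccessSketch and the helper sampleLineItems are hypothetical, and the struct is assembled manually as a stand-in for what spark-xml would infer from the fragment above. The underscore is spark-xml's default attribute prefix.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}

object AttributeAccessSketch {
  // Hand-built stand-in for an exploded LineItem: element content keeps the
  // tag name, while the TaxID attribute is stored as _TaxID.
  def sampleLineItems(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1L, 1L, 5.04, 0.30))
      .toDF("seq", "taxId", "taxable", "amount")
      .select(
        struct(
          col("seq").as("SequenceNumber"),
          struct(
            col("taxId").as("_TaxID"),
            col("taxable").as("TaxableAmount"),
            col("amount").as("Amount")
          ).as("Tax")
        ).as("LineItem")
      )
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("attribute-access-sketch")
      .getOrCreate()

    val xmlflattened = sampleLineItems(spark)

    // Element content by tag name, attribute by underscore-prefixed name.
    xmlflattened.select(
      col("LineItem.SequenceNumber"),
      col("LineItem.Tax.TaxableAmount"),
      col("LineItem.Tax._TaxID")
    ).show()

    spark.stop()
  }
}
```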


Extract XML from an RDBMS column where the data is compressed in GZIP format


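import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import org.apache.spark.sql.functions.{udf, unbase64}
// UDF that decompresses a GZIP byte array into its string content
val gzipBinaryToString = udf((payload: Array[Byte]) => {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(inputStream).mkString
})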

val data = xmldf.withColumn("xmlcolumn", gzipBinaryToString(unbase64($"coldata")))

Here, coldata is the column containing the Base64-encoded, GZIP-compressed XML, xmldf is the source DataFrame, and xmlcolumn is the new column that receives the extracted XML.
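
The decode and decompress steps can be checked outside Spark with a small round trip on the plain JVM. This is a sketch under assumptions: the object name GzipRoundTrip, the sample XML string, and the UTF-8 encoding are all chosen for the example, and the real column may use a different character set.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipRoundTrip {
  // Compress a string to GZIP bytes (UTF-8 is an assumption for this sketch).
  def gzip(text: String): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(buffer)
    out.write(text.getBytes(StandardCharsets.UTF_8))
    out.close() // closing writes the GZIP trailer
    buffer.toByteArray
  }

  // Same logic as the UDF body: GZIP bytes back to a string.
  def gunzip(payload: Array[Byte]): String = {
    val in = new GZIPInputStream(new ByteArrayInputStream(payload))
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val xml = "<Object><Id>1</Id></Object>" // made-up sample payload

    // What the database column would hold: Base64 text of the GZIP bytes.
    val stored = Base64.getEncoder.encodeToString(gzip(xml))

    // What unbase64 plus the UDF do together.
    val restored = gunzip(Base64.getDecoder.decode(stored))

    assert(restored == xml)
    println(restored)
  }
}
```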

To parse XML that is held as a row value in the DataFrame above:

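import com.databricks.spark.xml.XmlReader

// XmlReader expects an RDD[String], so take the XML text from xmlcolumn
val xmlmodified = data.select($"xmlcolumn").rdd.map(_.getString(0))

val reader = new XmlReader()

val xml_parsed = reader.withRowTag("Object").xmlRdd(spark.sqlContext, xmlmodified).select($"object")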

7 comments:

  1. Hi leela,

    What if the array of struct contains null values and this has to be done in Spark 2.1?

  2. Its value will be read as null.
    E.g., if,



    xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax.TaxableAmount")).show(5)

    TaxableAmount will have the value NULL.

    I believe it will work this way, but I haven't tried it. I'd appreciate it if you could try it and post the result, as that would help others as well.

  3. How do I handle an XML file that has multiple row tags?
