XML Parsing:
Source: https://medium.com/@tennysusanto/use-databricks-spark-xml-to-parse-nested-xml-d7d7cf797c28
Description:
This is a good example and can serve as a reference for most business scenarios. In the code below, rowTag is set to 'Transaction', so the contents between <Transaction> and </Transaction> are read and formed as a structure with sub-elements under it.
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("/user/tsusanto/POSLog-201409300635-21.xml")
df.printSchema =>
root
|-- BusinessDayDate: string (nullable = true)
|-- ControlTransaction: struct (nullable = true)
| |-- OperatorSignOff: struct (nullable = true)
| | |-- CloseBusinessDayDate: string (nullable = true)
| | |-- CloseTransactionSequenceNumber: long (nullable = true)
| | |-- EndDateTimestamp: string (nullable = true)
| | |-- OpenBusinessDayDate: string (nullable = true)
| | |-- OpenTransactionSequenceNumber: long (nullable = true)
| | |-- StartDateTimestamp: string (nullable = true)
| |-- ReasonCode: string (nullable = true)
| |-- _Version: double (nullable = true)
|-- CurrencyCode: string (nullable = true)
|-- EndDateTime: string (nullable = true)
|-- OperatorID: struct (nullable = true)
| |-- _OperatorName: string (nullable = true)
| |-- _VALUE: long (nullable = true)
|-- RetailStoreID: long (nullable = true)
|-- RetailTransaction: struct (nullable = true)
| |-- ItemCount: long (nullable = true)
| |-- LineItem: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Sale: struct (nullable = true)
| | | | |-- Description: string (nullable = true)
| | | | |-- DiscountAmount: double (nullable = true)
Looking at the resulting schema, we can see that the element structure follows the XML format. For sub-elements like 'LineItem', the datatype is an array of structs, and each element contains fields such as Sale (struct), Tax (struct), and SequenceNumber (long).
Now, to flatten the contents of LineItem, the explode function can be used. withColumn adds a new column to the existing DataFrame 'df'.
val flattened = df.withColumn("LineItem", explode($"RetailTransaction.LineItem"))
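As a minimal aside, the effect of explode can be seen on a tiny made-up DataFrame (the data below is illustrative only, not from the POSLog file):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Toy data, not from the POSLog file: one row holding a 3-element array
val spark = SparkSession.builder.master("local[*]").appName("explode-demo").getOrCreate()
import spark.implicits._

val toy = Seq((1, Seq("a", "b", "c"))).toDF("id", "items")

// explode produces one output row per array element, so one input row becomes three
val exploded = toy.withColumn("item", explode($"items"))
exploded.show()
```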
With this 'flattened' DataFrame, the needed values can be extracted much like in an SQL query. Note $"LineItem.SequenceNumber" and $"LineItem.Tax.TaxableAmount" in the function below as the way to extract the values to form a table.
val selectedData = flattened.select($"RetailStoreID",$"WorkstationID",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber",$"LineItem.Tax.TaxableAmount")
Explode function: explode creates a new row for each element in a given array or map column of a DataFrame. In simple terms, explode is used to flatten nested data into rows.
After explode, data in the XML can be accessed via the tag name, or via _Tagname for attributes.
<LineItem EntryMethod="Auto">
  <SequenceNumber>1</SequenceNumber>
  <Tax TaxID="1" TaxDescription="TaxID1">
    <TaxableAmount>5.04</TaxableAmount>
    <Amount>0.30</Amount>
  </Tax>
</LineItem>
The point to be noted is that content within tags can be accessed directly with the tag name.
Eg: for <TaxableAmount>5.04</TaxableAmount>
xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax.TaxableAmount")).show(5)
For attributes such as TaxID="1", the underscore prefix is needed:
xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax._TaxID")).show(5)
Extracting XML from an RDBMS column where the data is compressed in GZIP format:
import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import org.apache.spark.sql.functions.{udf, unbase64}

// UDF that gunzips a binary payload and returns the XML as a string
val gzipBinaryToString = udf((payload: Array[Byte]) => {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(inputStream).mkString
})
val data = xmldf.withColumn("xmlcolumn", gzipBinaryToString(unbase64($"coldata")))
Here, coldata is the column containing the GZIP-compressed XML, xmldf is the DataFrame, and xmlcolumn is the new column into which we extract the XML.
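The decompression logic inside the UDF can be sanity-checked without Spark. This round-trip sketch (the sample XML string is made up for illustration) compresses a string with GZIPOutputStream and decompresses it the same way the UDF does:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Compress a string to GZIP bytes (only needed here to build test input)
def gzip(s: String): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val gz = new GZIPOutputStream(bos)
  gz.write(s.getBytes("UTF-8"))
  gz.close()
  bos.toByteArray
}

// Same decompression logic as the UDF above
def gunzip(payload: Array[Byte]): String = {
  val in = new GZIPInputStream(new ByteArrayInputStream(payload))
  scala.io.Source.fromInputStream(in).mkString
}

val xml = "<Object><Id>1</Id></Object>" // made-up sample payload
assert(gunzip(gzip(xml)) == xml)        // round-trip succeeds
```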
To read the XML as a row value from the DataFrame above:
val xmlmodified = data.map(x => x.toString)
val reader = new XmlReader()
val xml_parsed = reader.withRowTag("Object").xmlRdd(spark.sqlContext, xmlmodified.rdd).select($"object")
Comment: Hi Leela, what about having null values in your array of structs when you have to do this in Spark 2.1? The value will be read as null. Eg: in
xmlflattened.select(col("LineItem.SequenceNumber"),col("LineItem.Tax.TaxableAmount")).show(5)
TaxableAmount will have the value NULL.
Reply: I believe it will work this way; however, I haven't tried it. I would appreciate it if you could try it and post the result, as this would be helpful for others as well.
Comment: How to handle an XML file having multiple row tags?
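One possible answer, sketched under the assumption that spark-xml accepts a single rowTag per read: read the same file once per row tag and process each resulting DataFrame, combining them afterwards if the schemas align. The file path and tag names below are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("multi-rowtag").getOrCreate()

// spark-xml takes one rowTag per read, so read the same file once per tag.
// Path and tag names here are made up for illustration.
val orders = spark.read.format("com.databricks.spark.xml")
  .option("rowTag", "Order").load("/data/feed.xml")
val refunds = spark.read.format("com.databricks.spark.xml")
  .option("rowTag", "Refund").load("/data/feed.xml")

// Process each DataFrame separately, or, if the schemas align, combine them
// (unionByName with allowMissingColumns requires Spark 3.1+):
// orders.unionByName(refunds, allowMissingColumns = true)
```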