Thursday, May 7, 2020

Mongo Spark Connector

This article explains how to write, read and update data in MongoDB using the Mongo Spark Connector.

One of my friends, Thomas, has written a nice article in which he explains the same in an awesome manner. Please follow the link https://medium.com/@thomaspt748/how-to-load-millions-of-data-into-mongo-db-using-apache-spark-3-0-8bcf089bd6ed

I would like to add 3 points apart from the ones explained by my friend.

1. Dealing with Nested JSON.


import org.apache.spark.sql.functions._
import spark.implicits._ // assumes an existing SparkSession named spark (e.g. in spark-shell)

val foodDf = Seq((123, "food2", false, "Italian", 2),
    (123, "food3", true, "American", 3),
    (123, "food1", true, "Mediterranean", 1))
  .toDF("userId", "foodName", "isFavFood", "cuisine", "score")
val foodGrpDf = foodDf
  .select($"userId", struct("score", "foodName", "isFavFood", "cuisine").as("UserFoodFavourites"))
  .groupBy("userId")
  .agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))

groupBy() and collect_list() are used together to form the nested structure.
    
Reference: https://stackoverflow.com/questions/53200529/spark-dataframe-to-nested-json
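For reference, the grouped DataFrame ends up with roughly the following nested schema (output of printSchema; nullability flags may differ slightly on your setup):

foodGrpDf.printSchema
// root
//  |-- userId: integer (nullable = false)
//  |-- UserFoodFavourites: array (nullable = true)
//  |    |-- element: struct
//  |    |    |-- score: integer
//  |    |    |-- foodName: string
//  |    |    |-- isFavFood: boolean
//  |    |    |-- cuisine: string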

The DataFrame can then be written to a Mongo collection using the save API.

Eg:

MongoSpark.save(foodGrpDf.write.option("collection", "foodMongoCollection").mode("append"), writeConfig)

Using the above call, the DataFrame (foodGrpDf) is written directly to the Mongo collection (foodMongoCollection).
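The writeConfig passed to MongoSpark.save() is not shown above; a minimal sketch of one way to build it, assuming spark.mongodb.output.uri has already been set on the session, is:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig

// Collection-level overrides layered on top of the defaults taken from the session's output URI.
val writeConfig = WriteConfig(
  Map("collection" -> "foodMongoCollection", "writeConcern.w" -> "majority"),
  Some(WriteConfig(spark)))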

2. Creating a Spark session when Kerberos/LDAP authentication is enabled for MongoDB.

authSource=$external&authMechanism=PLAIN has to be included in the URI.

Eg:

import org.apache.spark.sql.SparkSession

val mongoURL = s"mongodb://${mongouser}:${mongopwd}@${mongoHost}:${mongoPort}/${mongoDBName}.foodMongoCollection"

val spark = SparkSession.builder
  .appName("MongoSample")
  .config("spark.mongodb.output.uri", mongoURL + "?authSource=$external&authMechanism=PLAIN")
  .config("spark.mongodb.input.uri", mongoURL + "?authSource=$external&authMechanism=PLAIN")
  .getOrCreate()
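One thing to watch out for with LDAP-style credentials: usernames such as DOMAIN\user or passwords containing @, : or / must be percent-encoded before being embedded in the connection string, otherwise the URI will not parse. A small sketch using java.net.URLEncoder (encUser/encPwd are just placeholder names; URLEncoder is a close-enough approximation for typical credentials, though it encodes spaces as '+'):

import java.net.URLEncoder

// Encode the raw credentials, then plug encUser/encPwd into the URI shown above.
val encUser = URLEncoder.encode(mongouser, "UTF-8")
val encPwd = URLEncoder.encode(mongopwd, "UTF-8")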

3. Updating an existing record in a Mongo collection

save() acts as an update as well: when the DataFrame contains an _id field, documents with a matching _id are replaced on save. See the code snippet below to read the collection, update a value in the DataFrame, and save it back.

val df4 = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "foodDB")
  .option("collection", "foodMongoCollection")
  .load()

val df5 = df4.filter(col("foodName") === "food3")
val df5Updated = df5.withColumn("cuisine", lit("American_Updated"))

df5Updated.show()

MongoSpark.save(df5Updated.write.option("collection", "foodMongoCollection").mode("append"), writeConfig)
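To sanity-check the update, the collection can be read back and filtered on the same key (the same read pattern as above, repeated here only as a quick verification sketch):

val verifyDf = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "foodDB")
  .option("collection", "foodMongoCollection")
  .load()

verifyDf.filter(col("foodName") === "food3").show()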