Thursday, May 7, 2020
Mongo Spark Connector
This article explains how to write, read, and update data in MongoDB using the MongoDB Spark Connector.
One of my friends, Thomas, has written a nice article explaining the same topic very well. Please follow the link https://medium.com/@thomaspt748/how-to-load-millions-of-data-into-mongo-db-using-apache-spark-3-0-8bcf089bd6ed
I would like to add three points apart from the ones explained by my friend.
1. Dealing with Nested JSON.
import spark.implicits._
import org.apache.spark.sql.functions.{collect_list, sort_array, struct}
val foodDf = Seq((123, "food2", false, "Italian", 2),
  (123, "food3", true, "American", 3),
  (123, "food1", true, "Mediterranean", 1))
  .toDF("userId", "foodName", "isFavFood", "cuisine", "score")
// Nest the per-food columns into a struct, then collect one sorted array per user
val foodGrpDf = foodDf.select($"userId", struct("score", "foodName", "isFavFood", "cuisine").as("UserFoodFavourites"))
  .groupBy("userId")
  .agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))
groupBy and collect_list() are used together to form the nested structure.
Reference: https://stackoverflow.com/questions/53200529/spark-dataframe-to-nested-json
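For illustration, printing the schema of the grouped DataFrame should show the nested layout, roughly as below (a sketch; nullability flags are omitted and the exact output may vary by Spark version):
foodGrpDf.printSchema()
// root
//  |-- userId: integer
//  |-- UserFoodFavourites: array
//  |    |-- element: struct
//  |    |    |-- score: integer
//  |    |    |-- foodName: string
//  |    |    |-- isFavFood: boolean
//  |    |    |-- cuisine: string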
The DataFrame can then be written to the Mongo collection using the save API, e.g.:
import com.mongodb.spark.MongoSpark
MongoSpark.save(foodGrpDf.write.option("collection", "foodMongoCollection").mode("append"), writeConfig)
Using the above call, the DataFrame (foodGrpDf) is written directly to the Mongo collection (foodMongoCollection).
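Note that the writeConfig referenced above has to be defined beforehand. A minimal sketch, assuming the output URI is already set on the Spark session and using the connector's WriteConfig helper (the collection name and write concern here are illustrative):
import com.mongodb.spark.config.WriteConfig
// Inherit spark.mongodb.output.uri from the SparkContext and override the target collection
val writeConfig = WriteConfig(
  Map("collection" -> "foodMongoCollection", "writeConcern.w" -> "majority"),
  Some(WriteConfig(spark.sparkContext)))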
2. Creating a Spark session when LDAP (PLAIN) authentication is enabled for MongoDB.
authSource=$external&authMechanism=PLAIN has to be included in the URI, e.g.:
import org.apache.spark.sql.SparkSession
val mongoURL = s"mongodb://${mongouser}:${mongopwd}@${mongoHost}:${mongoPort}/${mongoDBName}.foodMongoCollection"
val authOptions = "?authSource=$external&authMechanism=PLAIN"
val spark = SparkSession.builder
  .appName("MongoSample")
  .config("spark.mongodb.output.uri", mongoURL + authOptions)
  .config("spark.mongodb.input.uri", mongoURL + authOptions)
  .getOrCreate()
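Since spark.mongodb.input.uri is set on the session, the same collection can be read back without repeating the connection details. A minimal sketch using the connector's MongoSpark.load helper:
import com.mongodb.spark.MongoSpark
// Reads from the collection configured in spark.mongodb.input.uri
val foodReadDf = MongoSpark.load(spark)
foodReadDf.show()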
3. Updating an existing record in a Mongo collection.
save() acts as an update as well, provided the DataFrame contains the _id field (the connector then replaces the matching documents). See the code snippet below: read -> update a value in the DataFrame -> save.
import org.apache.spark.sql.functions.{col, lit}
val df4 = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "foodDB").option("collection", "foodMongoCollection").load()
val df5 = df4.filter(col("foodName") === "food3")
// Update the cuisine value for the matched records
val df5Updated = df5.withColumn("cuisine", lit("American_Updated"))
df5Updated.show()
MongoSpark.save(df5Updated.write.option("collection", "foodMongoCollection").mode("append"), writeConfig)
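To confirm the change, the collection can be read back and filtered again (a quick verification step, reusing the same read options as above):
val verifyDf = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "foodDB").option("collection", "foodMongoCollection").load()
verifyDf.filter(col("foodName") === "food3").show()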