Wednesday, February 28, 2018

JDBC connection from Spark


Steps for connecting to an RDBMS over JDBC from Spark:


These steps cover both reads and writes.

1. Create the JDBC URL,
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

  // Set the username and password
  import java.util.Properties
  val connectionProperties = new Properties()

  connectionProperties.put("user", s"${jdbcUsername}")
  connectionProperties.put("password", s"${jdbcPassword}")
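
The variables used above for host, port, database, and credentials are assumed to be defined elsewhere. A minimal sketch with placeholder values (all of these are assumptions, substitute your own):

// Placeholder connection details -- substitute your own values
val jdbcHostname = "dbhost.example.com"   // assumed hostname
val jdbcPort     = 3306                   // default MySQL port
val jdbcDatabase = "employees_db"         // assumed database name
val jdbcUsername = "user1"                // assumed credentials
val jdbcPassword = "pwd123"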

2. Read data from JDBC

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

3. Write data to JDBC

New table creation,
spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Insertion to existing table,
import org.apache.spark.sql.SaveMode

spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Append) // <--- Append to the existing table
     .jdbc(jdbcUrl, "diamonds", connectionProperties)
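
For completeness, overwriting an existing table works the same way with SaveMode.Overwrite. A minimal sketch (by default Spark drops and recreates the target table; setting the truncate option makes it truncate instead, preserving the existing schema):

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .mode(SaveMode.Overwrite)      // drops and recreates the table by default
     .option("truncate", "true")    // truncate instead, preserving the existing schema
     .jdbc(jdbcUrl, "diamonds", connectionProperties)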

Source: https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Second Approach:

val jdbcSqlConnStr = "jdbc:sqlserver://sn1:1433;database=test;user=user1;password=pwd123#;"

val query = """(select c.Number, c.Num_Ext, s.ID, cast(snapshotdate as date) as snapshotdate
     FROM Tbl1 C
     join Tbl2 P on P.ID = C.ID
     join Tbl3 s on c.ID = s.LocID
     WHERE p.Location = 'NY') aliasName"""

val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

val dfJDBC = spark.read.format("jdbc")
  .options(Map("url" -> jdbcSqlConnStr, "driver" -> driver, "dbtable" -> query))
  .load()

Note that a query passed as dbtable must be wrapped in parentheses and given an alias (aliasName above), because Spark embeds it as a subquery.

Source: https://mapr.com/support/s/article/How-do-I-connect-from-JDBC-to-MySQL-using-Spark-SQL-datasource?language=en_US

Parallelism while pulling data from RDBMS


By default, when the function below is executed, only one connection to the RDBMS is established, and a single task on one executor pulls all of the data.

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

Multiple connections can be established by increasing numPartitions, which opens parallel connections to the RDBMS for a faster data pull.
Eg:

// jdbc(url, table, partitionColumn, lowerBound, upperBound, numPartitions, properties)
val employees_table = spark.read.jdbc(jdbcUrl, "employees", "Employee_ID", 1, 200000, 10, connectionProperties)

Here, we have specified 10 partitions, so up to 10 parallel connections are opened for the data pull.

10 tasks will run in parallel in this case, spread across the available executors.
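
To confirm on the Spark side that the read produced 10 partitions (one per parallel task), a quick check:

// One DataFrame partition per parallel JDBC task
println(employees_table.rdd.getNumPartitions)  // expect 10 here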

[Screenshot: Spark UI showing the 10 tasks running in parallel]




Tuning parallelism while reading from or writing to an RDBMS:


Reading:
Eg:
val numPartitions = 5
val min = 1
val max = 45000000
val fetch = 10000

val df = spark.read.format("jdbc")
  .option("url", s"${url}${config("schema")}")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", config("table"))
  .option("partitionColumn", someprimaryKey)
  .option("lowerBound", min)
  .option("upperBound", max)
  .option("numPartitions", numPartitions)
  .option("fetchsize", fetch)
  .option("user", user)
  .option("password", password)
  .load()

Here, we have given numPartitions = 5, partitionColumn = someprimaryKey, lowerBound = 1, upperBound = 45000000.
So, while pulling the data, 5 connections are established to the database, and each of the 5 partitions covers roughly 45000000 / 5 = 9,000,000 values of the partition column.
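
Conceptually, Spark turns these settings into one range query per partition on the partition column. A rough sketch of the ranges (Spark's exact stride arithmetic in its JDBC relation differs slightly at the boundaries):

// Rough sketch of per-partition ranges, not Spark's exact internals
val stride = (45000000L - 1L) / 5   // ~9,000,000 values per partition
(0 until 5).foreach { p =>
  val start = 1L + p * stride
  val end   = if (p == 4) 45000000L else start + stride
  println(s"partition $p covers someprimaryKey in [$start, $end)")
}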

val fetch = 10000 specifies the JDBC fetch size, which determines how many rows to fetch per round trip.
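
fetchsize can also be passed through the connection properties when using the simpler read overload; a minimal sketch:

// Per the Spark scaladoc, "fetchsize" is honored as a connection property
connectionProperties.put("fetchsize", "10000")
val employees = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)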

The options available for JDBC connections are documented at https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Writing:
The same applies while writing to an RDBMS: numPartitions is the number of connections established to the database, by the same number of parallel tasks. If the DataFrame has more partitions than numPartitions, Spark coalesces it down to that limit before writing.
Note:
1. There is no need to specify lowerBound and upperBound values while writing to an RDBMS.
2. Instead of fetchsize, the write-side option is batchsize, which controls how many rows are sent per round trip.

option("batchsize",batchcount).

Eg:
import org.apache.spark.sql.SaveMode

df.write
  .mode(SaveMode.Append)
  .option("batchsize", batchcount)
  .option("numPartitions", 8)
  .option("driver", "org.postgresql.Driver")  // plain option key instead of the internal JDBCOptions constant
  .jdbc(url, dbTable, connectionProperties)
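
If the source DataFrame has far more partitions than the database can comfortably absorb, repartitioning before the write is another way to cap the number of concurrent connections; a minimal sketch:

df.repartition(8)   // at most 8 concurrent JDBC connections
  .write
  .mode(SaveMode.Append)
  .jdbc(url, dbTable, connectionProperties)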
