Configuring Spark-submit parameters
Before going further, let's discuss the parameters below, which I have set for a sample job.
spark.executor.cores=5
spark.executor.instances=3
spark.executor.memory=20g
spark.driver.memory=5g
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.maxExecutors=10
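Put together, and assuming YARN as the resource manager, these settings would be passed to spark-submit roughly as below (a sketch; the main class and application jar are placeholders, not from the original):
spark-submit --master yarn --class <main class> --conf spark.executor.cores=5 --conf spark.executor.instances=3 --conf spark.executor.memory=20g --conf spark.driver.memory=5g --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=10 <application jar>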
spark.executor.cores - Specifies the number of cores per executor. 5 is generally the sweet spot; going much beyond that per executor tends to hurt HDFS I/O throughput. The more cores an executor has, the more tasks it can run in parallel.
spark.executor.instances - Specifies the number of executors to run; with the configuration above, 3 executors x 5 cores = 15 parallel tasks.
spark.executor.memory - The amount of memory each executor gets, 20g as per the configuration above. The cores share this 20 GB, so with 5 cores each core/task gets roughly 20/5 = 4 GB.
A rough rule of thumb is to allocate about 300 MB for every 100,000 records processed.
spark.driver.memory - This can usually be modest, since the driver coordinates the job rather than processing the data. The driver does hold local variables, though, so it needs more space if any large Map/Array collection is allocated on it. Its usual work is allocating tasks to executor nodes, building the DAG, writing logs, displaying output on the console, etc.
spark.dynamicAllocation.enabled - On many clusters this is enabled by default; when there is scope to use more resources than requested (i.e. when the cluster is free), the job will scale up with those resources. On YARN, the Capacity Scheduler's dominant resource calculator needs to be enabled to get the full benefit: it allocates executors and cores exactly as we specify, whereas the default memory-based calculator considers only memory and gives 1 core per executor (see the snippet after this list).
spark.dynamicAllocation.maxExecutors - Sets the upper bound on the number of executors when dynamic allocation is enabled.
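As a pointer (this is a YARN Capacity Scheduler setting, not a Spark one, shown here only as an illustration): the dominant resource calculator is typically switched on in capacity-scheduler.xml via
yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator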
When Dynamic allocation is enabled, the conf parameters look as below:
--conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=3 --conf spark.executor.memory=20g --conf spark.driver.memory=5g
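One companion setting worth noting (a general Spark-on-YARN requirement, not part of the original list): dynamic allocation normally also needs the external shuffle service enabled so that executors can be removed without losing their shuffle files:
--conf spark.shuffle.service.enabled=true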
spark.yarn.executor.memoryOverhead - The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
Off-heap refers to objects (serialised to byte arrays) stored outside the JVM process heap in native memory, and therefore not managed by the garbage collector; that memory is managed by the operating system instead.
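For the 20g executors above, the overhead would come to roughly 7-10% of executor memory, i.e. about 2 GB; a hedged sketch (the 2048 value is illustrative, not from the original configuration):
--conf spark.yarn.executor.memoryOverhead=2048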
Coming to the actual story of assigning the parameters.
Consider a case where data needs to be read from a partitioned table, with each partition containing multiple small/medium files. In this case go for good executor memory, more executors, and the usual 5 cores.
Similar to the case above, but without multiple small/medium files at the source: here fewer executors are fine, each with good memory.
In the case of an incremental load, where the data pull is small but needs to happen from multiple tables in parallel (Futures): here more executors, a little less executor memory each, and the usual 5 cores work well (an illustrative set of flags follows this list).
You also need to consider the amount of data being processed, how joins are applied, the stages in the job, and whether broadcast is used. Basically this comes down to trial and error after analysing the above factors, and the best combination can be chalked out in a UAT environment.
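As a rough sketch of that incremental-load case (the numbers are illustrative assumptions, not taken from the post):
--num-executors 10 --executor-cores 5 --executor-memory 8g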
Consider a 5-node cluster with 12 cores and 48 GB RAM in each node, and a case where the complete cluster is allocated to the job. Leave out 1 core and 1 GB per node for operational purposes (OS and Hadoop daemons).
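Working that out:
Usable per node: 12 - 1 = 11 cores and 48 - 1 = 47 GB
Cluster total: 5 x 11 = 55 cores and 5 x 47 = 235 GB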
FAT Executors:
--num-executors 5
--executor-cores 11
--executor-memory 47g
Advantages:
-> Improved application performance in cases where each task needs a significant amount of data to be processed.
-> With fewer, larger executors, the chance of data being processed on the node where it is stored increases, improving data locality and thereby reducing network traffic.
Disadvantages: high possibility of resource under-utilisation, and very large heaps tend to suffer from long GC pauses.
Thin Executors:
--num-executors 55
--executor-cores 1
--executor-memory 4g ## per node: 47 GB / 11 single-core executors = ~4 GB each
Advantages:
-> Increases parallelism as there are more executors handling smaller tasks
-> Beneficial when the tasks are lightweight.
-> Suits cases such as incremental loads executed via a Future execution context, streaming jobs, etc.
Disadvantages: increased network traffic and reduced data locality.
Optimum Executors:
--executor-cores 5 ## beyond ~5 cores per executor, HDFS throughput deteriorates and larger heaps lead to GC issues
--num-executors 11
--executor-memory 20g
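How these numbers fall out of the 5-node example above (in practice one of these executors is often given up to the YARN Application Master, leaving 10 for the job):
55 usable cores / 5 cores per executor = 11 executors
11 usable cores per node / 5 cores per executor = 2 executors per node
47 GB per node / 2 executors = ~23 GB, less ~7-10% memory overhead = roughly 20-21 GB heap per executor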
spark.driver.maxResultSize - Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
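For example, with a 5g driver one might cap the result size at a couple of gigabytes to guard against a runaway collect (the 2g value is an illustrative assumption):
--conf spark.driver.maxResultSize=2g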
You can also get an Unravel report and tune accordingly. There is also a good open-source monitoring tool called Sparklens, which can profile a job and provide its analysis from the first run.
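As a rough sketch of attaching Sparklens to a job (the package coordinates are an assumption; check the Sparklens documentation linked in the references for the exact version):
--packages qubole:sparklens:0.3.2-s_2.11 --conf spark.extraListeners=com.qubole.sparklens.QuboleJobListener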
Note: In case you want all your Spark jobs to load a particular dependency jar on the driver and executors, you can specify it via the extraClassPath properties, e.g. --conf spark.driver.extraClassPath=<PATH>/phoenix5-spark-shaded.jar --conf spark.executor.extraClassPath=<PATH>/phoenix5-spark-shaded.jar. Alternatively, --jars can be used to pass a dependency jar explicitly to a single spark-submit invocation.
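A sketch of the --jars variant (the main class and application jar are placeholders, and the path is the same placeholder used above):
spark-submit --jars <PATH>/phoenix5-spark-shaded.jar --class <main class> <application jar>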
Reference: https://youtu.be/mA96gUESVZc?si=EZSEl25hp9eH8PJ9
sparklens : https://docs.qubole.com/en/latest/user-guide/spark/sparklens.html