Sunday, July 2, 2017

Practical Issues and Solutions

Issue 1: 

GC (Garbage Collector) overhead limit exceeded - http://stackoverflow.com/questions/33341515/hadoop-streaming-gc-overhead-limit-exceeded

21625 GB for HIVE 

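Three ways to overcome this:

1. Increase the container size.

If the engine is Tez:
set hive.execution.engine=tez;
set hive.tez.container.size=<size in MB>;
If not (MapReduce):
set mapreduce.map.memory.mb=<size in MB>;
set mapreduce.reduce.memory.mb=<size in MB>;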

2. Vectorization
This processes rows in batches of 1024 instead of one row at a time.
set hive.vectorized.execution.enabled=true;

3. Convert joins to map joins.
This is a performance tuning technique.


Issue 2: 

Vertex issue (Tez) - sorted out by increasing the container size.

Issue 3: 

In Kafka 0.8 we hit a 4 MB upper limit on message size; larger messages failed with MessageSizeTooLargeException -
https://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message

Workarounds:
1. Compress the message while writing.
2. Split the message into 2 messages.
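
The two workarounds can be sketched in plain Python. This is only an illustration: it uses no Kafka client, the 4 MB limit is the figure quoted in this post, and the function names are made up for the example. The consumer side would have to decompress or reassemble the pieces.

```python
import zlib
from typing import List

# Limit quoted in this post; an assumption, adjust to your broker settings.
MAX_MESSAGE_BYTES = 4 * 1024 * 1024


def compress_message(payload: bytes) -> bytes:
    """Workaround 1: compress the payload before sending it."""
    return zlib.compress(payload)


def split_message(payload: bytes, limit: int = MAX_MESSAGE_BYTES) -> List[bytes]:
    """Workaround 2: cut the payload into chunks no larger than the limit."""
    return [payload[i:i + limit] for i in range(0, len(payload), limit)]


# A 6 MB payload that would not fit into a single 4 MB message.
payload = b"a" * (6 * 1024 * 1024)

compressed = compress_message(payload)
chunks = split_message(payload)

print(len(payload), len(compressed), len(chunks))
```

How well compression helps depends entirely on the data; the repeated bytes used here compress far better than real messages usually do.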

In the later 0.10 version the maximum message size can be configured.
Set the following properties:
Broker configs ($KAFKA_HOME/config/server.properties)
1. replica.fetch.max.bytes
2. message.max.bytes

Consumer configs ($KAFKA_HOME/config/consumer.properties)
1. fetch.message.max.bytes

Setting this in consumer.properties did not work for me. Adding it to the consumer application's own configuration worked fine.

Issue 4: 

If a Hive query takes a long time to process, that particular query needs to be optimized.

1. Enable vectorization.
2. Use map-side joins where possible.
3. Functional optimization (not specific to Hadoop): if the joined dataset already exists, skip joining the same set again and join only the newer data.

Issue 5:

While migrating from an RDBMS, we identified some jobs that did not depend on each other but were scheduled sequentially.
After discussing the business logic to confirm they could run in parallel, we implemented these Hive jobs to run in parallel
via Oozie fork nodes.
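
The real implementation was an Oozie workflow, which is not reproduced here. As a rough analogy of the fork/join idea only, the sketch below runs independent jobs concurrently and waits for all of them. The job names and the run_job function are hypothetical stand-ins for submitting Hive jobs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def run_job(name: str) -> str:
    # Hypothetical placeholder for submitting one Hive job.
    # Here it only reports completion.
    return f"{name}: done"


# Hypothetical jobs with no dependencies between them.
independent_jobs = ["load_customers", "load_orders", "load_products"]


def fork_join(jobs: List[str]) -> List[str]:
    """Start all jobs at once (fork) and wait for every result (join)."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        # map() runs the jobs concurrently and returns results in input order.
        return list(pool.map(run_job, jobs))


results = fork_join(independent_jobs)
print(results)
```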

Issue 6:

In Spark, a long sequence of transformations was defined up front, and because evaluation is lazy, execution started only
on encountering an action. Because of this the DAG became very long and the job threw an exception (a thread pool exception).
To overcome this, the DFs were persisted at regular stages to avoid a big DAG. Persisting also saved the intermediate DF/RDD, which
avoided re-execution from the beginning in case of failure. This happened in the early days of Spark.

Issue 7:

In the initial versions of Hive there was no DELETE option. So we created a new table and inserted the rows to keep into it using a WHERE condition.
That data was then written back to the original table in overwrite mode (INSERT OVERWRITE). This removes the records that
did not match the WHERE condition. Partitioned tables caused issues, so we ran MSCK REPAIR and a refresh prior
to the insertion.

Issue 8: 

Out of memory problem in the reducer - To overcome this, increase the heap size via mapred.child.java.opts (mapreduce.reduce.java.opts on newer versions).

Issue 9: 

In Spark, if a machine has 32 GB of RAM, the data to be computed should be at most 12-16 GB, with 6-8 GB kept for output and the remainder left free for computation.

Issue 10: 

ORC with gzip (ZLIB) compression gave better performance than ORC with Snappy.


Issue 11:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist (s3). This points to partitions that are not available in the specified S3 directory.

Workaround: change the execution engine from tez to mr.

set hive.execution.engine=mr;

Issue 12:
Dirty data: the data was imported using Sqoop with the default 4 mappers. In some files a column held bigint values, while in others it held values such as ** or 546*. Athena therefore created a separate table for each file, as it could not identify a common schema across the files.

Sol: Imported with a single mapper, which produces a single output file.

Issue 13: Token expiry during Hive execution.
Error occurred: This token is expired. current time is 1454617494914 found 1454598336617 Note: System times on machines may be out of sync. Check system time and time zones.
Basically, the default token interval in YARN is 10 minutes. If the clock on any data node differs by more than 10 minutes, a query processed on that machine throws the above error.
To resolve this, the system time on all data nodes should be the same if possible, or at least the difference should not exceed 10 minutes (the default token interval). System times on data nodes are prone to drift, so as a monthly maintenance activity it is better to check and adjust them to keep them aligned.
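
The two timestamps in the error message are epoch milliseconds, so the clock difference can be worked out directly. A small sketch, assuming the 10-minute interval described above (the function name is just for illustration):

```python
# 10-minute token interval as described in this post, in milliseconds.
TOKEN_INTERVAL_MS = 10 * 60 * 1000


def clock_skew_ms(current_ms: int, found_ms: int) -> int:
    """Absolute difference between the two timestamps from the error message."""
    return abs(current_ms - found_ms)


# Values copied from the error message above.
skew_ms = clock_skew_ms(1454617494914, 1454598336617)
skew_minutes = skew_ms / 60000

print(f"skew: {skew_ms} ms, about {skew_minutes:.0f} minutes")
print("exceeds token interval:", skew_ms > TOKEN_INTERVAL_MS)
```

For the message above the difference comes to roughly 319 minutes, far beyond the 10-minute interval.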

Memory Issues:


1. Can be resolved by increasing the memory and CPU settings in the Hive properties.

Eg:

SET mapred.compress.map.output=true;
SET mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapreduce.map.memory.mb=4096;
SET mapreduce.reduce.memory.mb=4096;
-- Java heap size configuration; keep -Xmx below the matching memory.mb
SET mapreduce.map.java.opts=-Xmx3072m;
SET mapreduce.reduce.java.opts=-Xmx3072m;
SET hive.auto.convert.join=false;
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

When you set java.opts, note two important points. Otherwise you will run into GC overhead, out of memory, or Java heap space errors.

1. java.opts depends on memory.mb, so set the java.opts heap to at most about 80% of memory.mb.

2. java.opts takes a JVM flag in the "-Xmx4g" format, whereas memory.mb takes a plain number of megabytes.
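
The 80% rule is simple arithmetic. A minimal sketch that derives the heap flag from the container size and prints matching SET lines (the helper names are illustrative, not part of Hive):

```python
from typing import List


def heap_opts(memory_mb: int, percent: int = 80) -> str:
    """Return a -Xmx flag sized at the given percentage of the container memory."""
    heap_mb = memory_mb * percent // 100
    return f"-Xmx{heap_mb}m"


def hive_memory_settings(map_mb: int, reduce_mb: int) -> List[str]:
    """Build SET statements where each heap stays below its container size."""
    return [
        f"SET mapreduce.map.memory.mb={map_mb};",
        f"SET mapreduce.map.java.opts={heap_opts(map_mb)};",
        f"SET mapreduce.reduce.memory.mb={reduce_mb};",
        f"SET mapreduce.reduce.java.opts={heap_opts(reduce_mb)};",
    ]


for line in hive_memory_settings(4096, 8192):
    print(line)
```

The remaining 20% or so is left for JVM overhead outside the heap, which is why setting -Xmx equal to (or above) memory.mb leads to containers being killed.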

SET hive.auto.convert.join=false;
This disables the automatic conversion of joins to map-side joins.

For more info on all the Hive properties, access https://svn.apache.org/repos/asf/hive/tags/release-0.8.1/conf/hive-default.xml.template

2. If the log shows 0 bytes written, the issue is in the INSERT statement.

3. Observe Job and Task counters for debugging

4. In general, administrators provide each team with a queue for running Hadoop jobs. When running a job, specify proper resources such as memory and CPU cores so that it completes successfully without memory issues.

Job failure issues and steps to overcome them:


A few scenarios and resolutions:

Scenario – A particular task is using a lot of memory, which causes slowness or failure. Look for ways to reduce the memory usage.


  • Make sure the joins are written in an optimal way with memory usage in mind. In joins, the LEFT hand side tables are sent to the reducer first and held in memory, and the RIGHT most table is streamed to the reducer. So make sure the RIGHT most table is the largest of the datasets in the join.
  • We can also increase the memory available to the map and reduce tasks by setting mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.

Scenario – Understanding the data helps a lot in optimizing the way we use the datasets in PIG and HIVE scripts.


  • If you have smaller tables in join, they can be sent to distributed cache and loaded in memory on the Map side and the entire join can be done on the Map side thereby avoiding the shuffle and reduce phase altogether. 
  • If the data is already sorted, in Pig you can use USING 'merge', which will do a map-only join.
  • If the data is bucketed in Hive, you may use hive.optimize.bucketmapjoin or hive.optimize.bucketmapjoin.sortedmerge depending on the characteristics of the data.
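
The map-side join idea from the first point can be shown in a few lines of plain Python. This is a conceptual sketch only, not how Hive or Pig implement it: the small table is loaded into an in-memory lookup (the role of the distributed cache) and the large table is streamed past it, so no shuffle or reduce step is needed. The table contents are made up.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]


def map_side_join(small_table: Iterable[Row],
                  large_table: Iterable[Row]) -> Iterator[Tuple[int, str, str]]:
    """Inner join: hold the small table in memory, stream the large one."""
    lookup: Dict[int, List[str]] = {}
    for key, value in small_table:
        lookup.setdefault(key, []).append(value)

    # Each large-table row is matched locally; rows without a match are dropped.
    for key, value in large_table:
        for small_value in lookup.get(key, []):
            yield key, value, small_value


# Hypothetical data: a small dimension table and a larger fact table.
countries = [(1, "US"), (2, "IN")]
orders = [(1, "order-a"), (2, "order-b"), (1, "order-c"), (3, "order-d")]

joined = list(map_side_join(countries, orders))
print(joined)
```

The approach only works while the small table fits in the memory of every mapper, which is why it is tied to table size.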


http://hadoopinrealworld.com/how-do-you-debug-a-performance-issue-or-a-long-running-job/

When a job takes a long time to complete, it could be because of a lack of resources (memory, CPU etc.). Increasing the container size could resolve the issue.


A job does not stay stuck forever. It might take a long time, and a task is attempted up to 3 times. If the task fails on all 3 attempts, the job fails.

Logs for each job can be seen in the YARN UI. Consider a failed Oozie job that has 3 actions in sequential order, where the 1st action succeeded and the 2nd failed.
In the logs in the YARN UI we can clearly see the start of the Oozie job, the successful completion of the 1st action, the failure of the 2nd action, and the return to the Oozie job because of this failure.

Partial data Issue:

When a Sqoop job fails, partial data can be left behind, because the data is transferred from or to the RDBMS in parallel by several mappers. The solution is to specify a staging table via the --staging-table option (an option of Sqoop export).
https://stackoverflow.com/questions/24568256/sqoop-import-job-failure-between-data-import

Hive job failures will not leave partial data if the job failed at the mapper stage, because mappers do not write the output data; the reducers write data to HDFS.

RDD execution will not lead to a partial data issue, because task execution starts only when the data is loaded in the RAM of the data nodes. In case of insufficient memory, where the data to be computed does not fit in memory, the execution will not proceed in the first place.
The "resilient" in Resilient Distributed Datasets refers to this good fault tolerance: lost partitions can be recomputed.

This is a difference between Impala and Spark: fault tolerance in Impala is not as good as in Spark.

During Spark execution, checkpoints can be enabled at intermediate stages so that in case of job failure the DAG engine restarts from the checkpoint.

To troubleshoot failures in a production environment, follow these steps:

In Hue, open the execution logs, get the YARN UI link, and go to that job.

This link gives the job details such as failed mappers, failed reducers, the Application Master node etc.

If a mapper has failed, click on the failed mapper link. This shows the node on which the mapper was executed and, beside it, a link for 'logs'. This log gives detailed information about the failure.

A job failure can be debugged by going through the failed job's logs in the YARN UI.
If a failed Oozie job has multiple actions, those actions can be seen in Hue, where the failed action can be identified.

For the logging mechanism in Spark, see https://mapr.com/blog/how-log-apache-spark/

Any Automation?



1. In the AE Console, column names are specified as XML parameters, and these column names are applied to template HQL files to produce the actual HQL file for the operation. Along with this, the job scheduling parameters are taken from the UI to schedule Oozie jobs. Business Analysts can use this to schedule Oozie jobs without any assistance from the development team.

2. A use case where the past few days of data have to be deleted from a Hive table and empty partitions have to be created for about 30 days. This was made configurable for the Business Analysts by exposing an option in the XML file used when creating the job.
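
The template approach from point 1 can be sketched with the Python standard library. The XML layout, the template text and the table and column names below are all invented for illustration; the actual AE Console format is not shown in this post.

```python
import xml.etree.ElementTree as ET
from string import Template

# Hypothetical XML parameters, as an analyst might supply them.
PARAMS_XML = """
<job>
  <table>sales</table>
  <columns>
    <column>order_id</column>
    <column>amount</column>
  </columns>
</job>
"""

# Hypothetical template HQL file content with placeholders.
HQL_TEMPLATE = Template("SELECT ${columns} FROM ${table};")


def render_hql(params_xml: str, template: Template) -> str:
    """Fill the HQL template with the table and column names from the XML."""
    root = ET.fromstring(params_xml.strip())
    columns = [col.text.strip() for col in root.iter("column")]
    table = root.findtext("table").strip()
    return template.substitute(columns=", ".join(columns), table=table)


hql = render_hql(PARAMS_XML, HQL_TEMPLATE)
print(hql)
```

Because substitute() raises an error on a missing placeholder, an incomplete parameter file fails early instead of producing a broken query.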

Issue encountered: "FAILED: RuntimeException MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found)".

This means that the JSON SerDe is not available in Hive. The Hive table's data has to be deserialized via JsonSerDe, so the SerDe jar needs to be part of the Hive libs.

Solution: Download json-serde-1.3-SNAPSHOT-jar-with-dependencies.jar and place it under Hive's lib directory (/usr/lib/hive/lib).
