Monday, February 6, 2017

Common Issues and topics of Interest

1. Where is Pig used?


2. Commonly used Hive and Pig operations?
3. What exactly does data cleansing mean?
4. In which circumstances are UDFs needed? Are they used often?
5. HIVE and HBase Integration

Use the HBaseStorageHandler and specify the column mapping in SERDEPROPERTIES.
Follow http://leelaprasadhadoop.blogspot.in/2017/02/hive-hbase-integration.html

6. HIVE and MongoDB integration
7. HCatalog
8. Besides plain UDFs, Hive user-defined functions come in two further types: UDAF (user-defined aggregate functions) and UDTF (user-defined table-generating functions).

https://www.linkedin.com/pulse/hive-functions-udfudaf-udtf-examples-gaurav-singh


9. Runtime memory Issues faced.



GC(Garbage Collector) Overhead Issue - http://stackoverflow.com/questions/33341515/hadoop-streaming-gc-overhead-limit-exceeded

21625 GB for HIVE 

3 ways to overcome this.

1. Increase container size. 

If the execution engine is Tez:
set hive.execution.engine=tez;
set hive.tez.container.size=<size in MB>;
If running on MapReduce:
set mapreduce.map.memory.mb=<size in MB>;
set mapreduce.reduce.memory.mb=<size in MB>;

2. Vectorization
Vectorized execution processes a batch of 1024 rows at a time instead of one row at a time.
set hive.vectorized.execution.enabled=true;

3. Convert common joins to map joins.
This is a performance-tuning technique.


Vertex failure issue - resolved by increasing the container size.

10. What is TEZ?

Tez – Hindi for “speed” – provides a general-purpose, highly customizable framework that simplifies data-processing tasks across both small-scale (low-latency) and large-scale (high-throughput) workloads in Hadoop. It generalizes the MapReduce paradigm to a more powerful framework by providing the ability to execute a complex DAG (directed acyclic graph) of tasks for a single job, so that projects in the Apache Hadoop ecosystem such as Apache Hive, Apache Pig and Cascading can meet requirements for human-interactive response times and extreme throughput.
With Tez, we introduce a more expressive DAG of tasks, within a single application or job, that is better aligned with the required processing task – thus, for example, any given SQL query can be expressed as a single job using Tez.

11. HIVE File formats
RCFile (Record Columnar File), ORC (Optimized Row Columnar), Avro, and Parquet. The choice of file format matters significantly for Hive's performance and big-data processing.

12. How to Run JAR file?
        1. Input and output paths should be in HDFS.
        2. Use the command: hadoop jar <JARFile_PATH> <MAIN_JOB.Class> <HDFS_INPUT> <HDFS_OUTPUT>

JAVA Questions:

1. Is String immutable?

2. Scenarios for writing custom UDFs?

3. What do you understand by mutable and immutable objects?
Mutable objects can have their fields changed after construction. Immutable objects cannot. Immutable objects are simply objects whose state (the object's data) cannot change after construction. Examples of immutable objects from the JDK include String and Integer

4. What is difference between Path and Classpath?
Path and Classpath are operating-system-level environment variables. Path is used to define where the system can find executable (.exe) files, while Classpath is used to specify the location of .class files.

5. What is the advantage of HASH code?
hashCode() is used for bucketing in Hash implementations like HashMap, HashTable, HashSet, etc.

The value received from hashCode() is used as the bucket number for storing elements of the set/map. This bucket number is the address of the element inside the set/map.


When you call contains(), it takes the hash code of the element and looks up the bucket that the hash code points to. If more than one element is found in the same bucket (multiple objects can have the same hash code), the equals() method is used to check whether the objects are equal, and from that the set/map decides whether contains() is true or false, or whether the element can be added.
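
A minimal sketch of that bucket-then-equals lookup, written in Scala since the same JVM hashCode/equals contract applies; the Point class is purely illustrative:

case class Point(x: Int, y: Int)                         // case classes get hashCode/equals generated
val points = scala.collection.mutable.HashSet(Point(1, 2), Point(3, 4))
// contains() first narrows the search to the bucket chosen by hashCode(),
// then confirms the match with equals()
println(Point(1, 2).hashCode == Point(1, 2).hashCode)    // true: equal objects give equal hash codes
println(points.contains(Point(1, 2)))                    // true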

HADOOP Questions:

1. What is Inputsplit?


2. Can the JobTracker exist without the NameNode?
Answer is no. The JobTracker process can stay up, but it cannot execute any tasks.

NameNode And DataNode

Technical Sense: The NameNode stores metadata (number of blocks, which DataNode on which rack holds the data, and other details) about the data being stored in DataNodes, whereas the DataNodes store the actual data.

Physical Sense: In a multi-node cluster the NameNode and DataNodes are usually on different machines. There is only one NameNode in a cluster and many DataNodes; that's why we call the NameNode a single point of failure. There is also a Secondary NameNode (SNN), which can run on a different machine; it doesn't actually act as a NameNode but stores the image of the primary NameNode at certain checkpoints and is used as a backup to restore the NameNode.

In a single node cluster (which is referred to as a cluster in pseudo-distributed mode), the NameNode and DataNode can be in a single machine as well.

JobTracker And TaskTracker

Technical Sense: JobTracker is a master which creates and runs the job. JobTracker which can run on the NameNode allocates the job to TaskTrackers which run on DataNodes; TaskTrackers run the tasks and report the status of task to JobTracker.

Physical Sense: The JobTracker runs on MasterNode aka NameNode whereas TaskTrackers run on DataNodes.


3. Hadoop architecture?

4. Difference between Hadoop 1.x and 2.x

http://stackoverflow.com/questions/33494697/secondary-namenode-usage-and-high-availability-in-hadoop-2-x

5. Indexing in HADOOP?

6. What is Namenode?

7. Replication factor?

8. Heart beat?

9. Rack awareness?

Each DataNode sends a Heartbeat message to the NameNode periodically. A network partition can cause a subset of DataNodes to lose connectivity with the NameNode. The NameNode detects this condition by the absence of a Heartbeat message. The NameNode marks DataNodes without recent Heartbeats as dead and does not forward any new IO requests to them. Any data that was registered to a dead DataNode is not available to HDFS any more. DataNode death may cause the replication factor of some blocks to fall below their specified value. The NameNode constantly tracks which blocks need to be replicated and initiates replication whenever necessary. The necessity for re-replication may arise due to many reasons: a DataNode may become unavailable, a replica may become corrupted, a hard disk on a DataNode may fail, or the replication factor of a file may be increased.

10. In Hadoop 1.x

The FsImage and the EditLog are central data structures of HDFS. A corruption of these files can cause the HDFS instance to be non-functional. For this reason, the NameNode can be configured to support maintaining multiple copies of the FsImage and EditLog. Any update to either the FsImage or EditLog causes each of the FsImages and EditLogs to get updated synchronously. This synchronous updating of multiple copies of the FsImage and EditLog may degrade the rate of namespace transactions per second that a NameNode can support. However, this degradation is acceptable because even though HDFS applications are very data intensive in nature, they are not metadata intensive. When a NameNode restarts, it selects the latest consistent FsImage and EditLog to use.
The NameNode machine is a single point of failure for an HDFS cluster. If the NameNode machine fails, manual intervention is necessary. Currently, automatic restart and failover of the NameNode software to another machine is not supported.
11. Secondary Namenode?
The NameNode stores modifications to the file system as a log appended to a native file system file, edits. When a NameNode starts up, it reads HDFS state from an image file, fsimage, and then applies edits from the edits log file. It then writes new HDFS state to the fsimage and starts normal operation with an empty edits file. Since NameNode merges fsimage and edits files only during start up, the edits log file could get very large over time on a busy cluster. Another side effect of a larger edits file is that next restart of NameNode takes longer.
The secondary NameNode merges the fsimage and the edits log files periodically and keeps edits log size within a limit. It is usually run on a different machine than the primary NameNode since its memory requirements are on the same order as the primary NameNode.
The start of the checkpoint process on the secondary NameNode is controlled by two configuration parameters.
  • dfs.namenode.checkpoint.period, set to 1 hour by default, specifies the maximum delay between two consecutive checkpoints, and
  • dfs.namenode.checkpoint.txns, set to 1 million by default, defines the number of uncheckpointed transactions on the NameNode which will force an urgent checkpoint, even if the checkpoint period has not been reached.
The Secondary NameNode is used when HA is not enabled; in Hadoop 2.x HA deployments the Standby NameNode takes over the checkpointing role.

12. StandBy Namenode?

Prior to Hadoop 2.0.0, the NameNode was a single point of failure (SPOF) in an HDFS cluster. Each cluster had a single NameNode, and if that machine or process became unavailable, the cluster as a whole would be unavailable until the NameNode was either restarted or brought up on a separate machine.
This impacted the total availability of the HDFS cluster in two major ways:
In the case of an unplanned event such as a machine crash, the cluster would be unavailable until an operator restarted the NameNode. Planned maintenance events such as software or hardware upgrades on the NameNode machine would result in windows of cluster downtime. The HDFS High Availability feature addresses the above problems by providing the option of running two redundant NameNodes in the same cluster in an Active/Passive configuration with a hot standby. This allows a fast failover to a new NameNode in the case that a machine crashes, or a graceful administrator-initiated failover for the purpose of planned maintenance.

Two separate machines are configured as NameNodes. At any point in time, exactly one of the NameNodes is in an Active state, and the other is in a Standby state. The Active NameNode is responsible for all client operations in the cluster, while the Standby is simply acting as a slave, maintaining enough state to provide a fast failover if necessary.
In order for the Standby node to keep its state synchronized with the Active node, both nodes communicate with a group of separate daemons called “JournalNodes” (JNs).
When any namespace modification is performed by the Active node, it durably logs a record of the modification to a majority of these JNs. The Standby node reads these edits from the JNs and applies them to its own namespace.
In the event of a failover, the Standby will ensure that it has read all of the edits from the JournalNodes before promoting itself to the Active state. This ensures that the namespace state is fully synchronized before a failover occurs.
It is vital for an HA cluster that only one of the NameNodes be Active at a time.

https://www.dezyre.com/article/what-is-hadoop-2-0-high-availability/87
13. Zookeeper Role in Hadoop cluster?

14. YARN 2.0?

Major Components are - 
Resource Manager
  • This daemon process runs on master node (may run on the same machine as name node for smaller clusters)
  • It is responsible for getting job submitted from client and schedule it on cluster, monitoring running jobs on cluster and allocating proper resources on the slave node
  • It communicates with Node Manager daemon process on the slave node to track the resource utilization
  • It uses two other processes named Application Manager and Scheduler for MapReduce task and resource management
  • Resource Manager should be on a high end machine. There is standby RM as well.
  • RM allocates Application master on any one of the datanodes where the job related data is stored.
  • The jar files for job execution would be stored in temp location(local filesystem) in the Application master.
Node Manager
  • This daemon process runs on slave nodes (normally on HDFS Data node machines)
  • It is responsible for coordinating with Resource Manager for task scheduling and tracking the resource utilization on the slave node
  • It also reports the resource utilization back to the Resource Manager
  • It uses other daemon process like Application Master and Container for MapReduce task scheduling and execution on the slave node
Container
It is considered to be a small unit of resources (like CPU, memory, disk) belonging to the slave node.

http://backtobazics.com/big-data/yarn-architecture-and-components/


  1. A client program submits the application, including the necessary specifications to launch the application-specific ApplicationMaster itself.
  2. The ResourceManager assumes the responsibility to negotiate a specified container in which to start the ApplicationMaster and then launches the ApplicationMaster.
  3. The ApplicationMaster, on boot-up, registers with the ResourceManager – the registration allows the client program to query the ResourceManager for details, which allow it to  directly communicate with its own ApplicationMaster.
  4. During normal operation the ApplicationMaster negotiates appropriate resource containers via the resource-request protocol.
  5. On successful container allocations, the ApplicationMaster launches the container by providing the container launch specification to the NodeManager. The launch specification, typically, includes the necessary information to allow the container to communicate with the ApplicationMaster itself.
  6. The application code executing within the container then provides necessary information (progress, status etc.) to its ApplicationMaster via an application-specific protocol.
  7. During the application execution, the client that submitted the program communicates directly with the ApplicationMaster to get status, progress updates etc. via an application-specific protocol.
  8. Once the application is complete, and all necessary work has been finished, the ApplicationMaster deregisters with the ResourceManager and shuts down, allowing its own container to be repurposed.
What is Distributed Cache?

How to configure queues using YARN capacity-scheduler.xml

What is Memcache? 

Difference between HBASE and cassandra?

How to skip MapReduce execution for a Job?

How to submit a spark JOB?

How to improve performance?

            Mapside Joins
            Increase Mappers ( based on need and if feasible)
            Vectorization
            TEZ Engine
            Impala
            SparkSQL
            Partitioning and Bucketing
            Query optimization
            Hive Query tuning: https://www.sisense.com/blog/8-ways-fine-tune-sql-queries-production-databases/

1.      Create Joins with INNER JOIN Rather than WHERE

SELECT Customers.CustomerID, Customers.Name, Sales.LastSaleDate
FROM Customers, Sales
WHERE Customers.CustomerID = Sales.CustomerID
        //Inefficient Way
This type of join creates a Cartesian Join, also called a Cartesian Product or CROSS JOIN. In a Cartesian Join, all possible combinations of the variables are created. In this example, if we had 1,000 customers with 1,000 total sales, the query would first generate 1,000,000 results, then filter for the 1,000 records where CustomerID is correctly joined.
To prevent creating a Cartesian Join, INNER JOIN should be used instead:
SELECT Customers.CustomerID, Customers.Name, Sales.LastSaleDate
FROM Customers
   INNER JOIN Sales
   ON Customers.CustomerID = Sales.CustomerID
      //Efficient Way
The database would generate the 1,000 desired records where CustomerID is equal.

2.      Use WHERE instead of HAVING to Define Filters.


3.      Limit JOIN operations to where they are needed. Remove flaws like duplicated joins, etc.


How is file information written to the Active and Standby NameNodes?

How to copy file from Windows machine to Linux?
Winscp

What is Edge Node?
A gateway/client node in the cluster from which users access the cluster and submit jobs; it runs client tools and configurations but typically no DataNode/worker daemons.

How to access Edge Node of cluster?
Putty

Which version of Oracle you have used?
Oracle 11g

Delete and update a row from Hive table.

This is possible from Hive 0.14 onwards.
--------------------------------------------------------------
ACID functionality in HIVE
--------------------------------------------------------------
Add the below properties to the "hive-site.xml" file and restart the Hive server.

<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>

<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>2</value>
</property>




CREATE HIVE TABLE WITH CLUSTERED BY, ORC, TBLPROPERTIES
--------------------------------------------------------------
CREATE TABLE IF NOT EXISTS student_acid
( name string, id int, course string, year int )
CLUSTERED BY (name) INTO 4 BUCKETS
STORED AS ORC
LOCATION '/hive/kalyan/student_acid'
TBLPROPERTIES ('transactional' = 'true')
;


INSERT INTO TABLE student_acid VALUES
('arun', 1, 'mca', 1),
('anil', 2, 'mca', 1),
('sudheer', 3, 'mca', 2),
('santosh', 4, 'mca', 2)
;

UPDATE student_acid
SET year = 3, course = 'mech'
WHERE id = 4 ;

DELETE FROM student_acid WHERE name = 'anil';

Note: In cloudera this is restricted and gives the below error.

FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

The Cloudera documentation states that Hive ACID is not supported:


Hive ACID is an experimental feature and Cloudera does not currently support it.

How to select number of buckets


1. Every bucket creates a file.
2. Make sure the file size is neither much smaller than the block size nor very large.
While reading, each file is processed by a separate task; if there are many small files, the performance is not worth it.
A very large bucket is also not recommended, as it may fail to fit in memory. The optimum is approximately the size of one block.


Explain 4 datatypes in Pig.

Id, product_name
-----------------------
10, iphone
20, samsung
30, Nokia
Field: A field is a piece of data. In the above data set, product_name is a field.
Tuple: A tuple is a set of fields. Here Id and product_name form a tuple. Tuples are represented by parentheses. Example: (10, iphone).
Bag: A bag is a collection of tuples. A bag is represented by curly braces. Example: {(10, iphone), (20, samsung), (30, Nokia)}.
Relation: A relation represents the complete data set. A relation is a bag; to be precise, it is an outer bag. We can call a relation a bag of tuples.
To compare with an RDBMS, a relation is a table, whereas the tuples in the bag correspond to the rows in the table.

7. How to pass parameters in Pig
         pig -f <filename> -param input=__
pig -param date=20130326 -param date2=20130426 -f myfile.pig        //Passing 2 Parameters

8. InputSplit

  • Assume we have a 400 MB file consisting of 4 records (e.g., a 400 MB CSV file with 4 rows of 100 MB each).
  • If the HDFS block size is configured as 128 MB, the 4 records will not be distributed evenly among the blocks:
  • Block 1 contains the entire first record and a 28MB chunk of the second record.
  • If a mapper is to be run on Block 1, the mapper cannot process since it won't have the entire second record.
  • This is the exact problem that input splits solve. Input splits respects logical record boundaries.
  • Let's assume the input split size is 200 MB.
  • Therefore the input split 1 should have both the record 1 and record 2. And input split 2 will not start with the record 2 since record 2 has been assigned to input split 1. Input split 2 will start with record 3.
  • This is why an input split is only a logical chunk of data. It points to start and end locations with in blocks.

MapReduce reads data via InputSplits. The default input format is TextInputFormat.


Split is logical split of your data, basically used during data processing using Map/Reduce program or other processing techniques. Split size is user defined and you can choose your split size based on your data(How much data you are processing).
Split is basically used to control number of Mapper in Map/Reduce program. If you have not defined any input split size in Map/Reduce program then default HDFS block split will be considered as input split.
Example:
Suppose you have a file of 100 MB and the HDFS default block size is 64 MB; then the file is chopped into 2 parts and occupies 2 blocks. Now you have a Map/Reduce program to process this data, but you have not specified any input split size; then, based on the number of blocks (2), 2 input splits will be considered for the Map/Reduce processing and 2 mappers will be assigned to this job.
But suppose you have specified a split size (say 100 MB) in your Map/Reduce program; then both blocks (2 blocks) will be considered as a single split and 1 mapper will be assigned to this job.
Suppose you have specified a split size (say 25 MB) in your Map/Reduce program; then there will be 4 input splits and 4 mappers will be assigned to the job.
Conclusion:
Split is a logical division of the input data while block is a physical division of data.
HDFS default block size is default split size if input split is not specified.
Split is user defined and user can control split size in his Map/Reduce program.
One split can be mapping to multiple blocks and there can be multiple split of one block.
The number of map tasks (Mapper) are equal to the number of splits.
If your resources are limited and you want to limit the number of maps, you can increase the split size. For example: if we have 640 MB in 10 blocks, i.e., each block of 64 MB, and resources are limited, you can set the split size to 128 MB; a logical grouping of 128 MB is formed and only 5 maps will be executed, each working on 128 MB.
If the input is marked as not splittable, the whole file forms one input split and is processed by one map task, which takes much more time when the file is big.

By default,
splitSize = Block Size

Block concept is while writing data to HDFS.
Split concept is while reading data from HDFS

number of splits = number of mappers
number of splits = number of blocks [default, not always]


During execution, 2 mappers will run in parallel and the 3rd mapper will wait for one of them to complete.


How to increase Mappers in Hive execution?

Reduce the input split size from the default value; the number of mappers will then increase.

SET mapreduce.input.fileinputformat.split.maxsize=<smaller value in bytes>;

10. How to load 10 lines at a time in mapper instead of record by record

You may have to define your own RecordReader, InputSplit, and InputFormat. Depending on exactly what you are trying to do, you will be able to reuse some of the already existing ones of the three above. You will likely have to write your own RecordReader to define the key/value pair, and you will likely have to write your own InputSplit to help define the boundary.

Extend NLineInputFormat.

11. What is Record Reader?

The RecordReader is the interface between the InputSplit and the Mapper.

The Mapper works only with key-value pairs; the RecordReader converts the input split data into key-value pairs and sends them to the Mapper.

By default, the RecordReader passes one line at a time to the mapper. This can be overridden, e.g., by extending NLineInputFormat and reimplementing the relevant method.

Identity Mapper is the default Mapper class provided by hadoop and this will be picked automatically when no mapper is specified in Mapreduce driver class. Identity Mapper class implements the identity function, which directly writes all its input key/value pairs into output. It is a generic mapper class and it can be used with any key/value data types. Doesn't perform any processing of data


Identity Reducer It is the default reducer class provided by Hadoop and this class will be picked up by mapreduce job automatically when no other reducer class is specified in the driver class. Similar to Identity Mapper, this class also doesn’t perform any processing on the data and it simply writes all its input data into output.

12. Rack Awareness

Usually Hadoop clusters have more than 30-40 nodes and are configured across multiple racks. Communication between 2 DataNodes on the same rack is more efficient compared to DataNodes on different racks.

The NameNode obtains this rack information by maintaining the rack IDs of each DataNode. This concept of choosing closer DataNodes based on rack information is called rack awareness in Hadoop.

On a multi-rack cluster, block replicas are maintained with a policy that no more than one replica is placed on one DataNode and no more than 2 replicas are placed in the same rack, with the constraint that the number of racks used for block replication should always be less than the total number of block replicas.

1. When a block is created, the first replica is placed on the local node, the second one is placed on a node in a different rack, and the third one is placed on a different node in the same rack as the second.

2. For reading, the NameNode first checks if the client's machine is located in the cluster. If yes, block locations are returned from the DataNodes closest to the client.

13. HBase commands

HBASE Architecture

Hierarchy of each component in HBase.


https://www.edureka.co/blog/hbase-architecture/

https://mapr.com/blog/in-depth-look-hbase-architecture/

CREATE TABLE IN HBASE


create 'tbl_name', 'column_family1', 'column_family2', ...
eg: create 'sample', 'cf1', 'cf2'

INSERT / UPDATE DATA IN HBASE

put 'tbl_name', 'row_key', 'column_family1:qualifier', 'value'
eg: put 'test', 'row1', 'cf:a', 'value1'

READ DATA FROM HBASE

scan 'tbl_name'
scan 'tbl_name', {COLUMNS => 'column_family:qualifier'}
get 'tbl_name', 'row_key', {COLUMNS => 'column_family:qualifier'}

eg: scan 'test'
scan 'test',{COLUMNS => ['cf:a','cf:b']}
get 'test', 'row1', {COLUMNS => 'cf:a'}

DELETE DATA FROM HBASE

delete 'tbl_name', 'row_key', 'column_family1:qualifier'

eg:
delete 'test', 'row4', 'cf:d'

Show Hbase tables

List all tables in hbase. Optional regular expression parameter could
be used to filter the output. Examples:

  hbase> list
  hbase> list 'abc.*'
  hbase> list 'ns:abc.*'
  hbase> list 'ns:.*'

Delete HBase table

disable 'tbl_name'
drop 'tbl_name'

Bulk upload to HBASE

Use the following command from a normal terminal:

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.1.2.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,cf:c1,cf:c2 mytable5 hdfs://localhost:8020/hbase_inputs/import/sample1.tsv


20. Incremental load option in SQOOP (--incremental append and --incremental lastmodified)

https://dataandstats.wordpress.com/2014/12/04/incremental-load-in-sqoop/

Delta data imports: the ideal process in a real-time scenario is synchronizing the delta data (modified or updated data) from the RDBMS to Hadoop. Sqoop has incremental load options to facilitate this.
Append mode in the import command is for tables where rows only get inserted.
Last-modified mode in the import command is for rows that get inserted as well as updated.


Importing incremental data with Last-modified mode option

Workaround for delta data import
Sqoop imports and saves data in HDFS under a directory named after the RDBMS table. Last-modified mode imports the delta data and tries to save it under the same name already present on the HDFS side, and it throws an error since HDFS does not allow a file with the same name.

Here is a workaround to get the complete updated data on the HDFS side:
1. Move the existing HDFS data to a temporary folder.
2. Run a fresh import in last-modified mode.
3. Merge this fresh import with the old data saved in the temporary folder.

21. Import/Export Sqoop from JAVA code

Follow, http://www.programcreek.com/java-api-examples/index.php?api=com.cloudera.sqoop.tool.SqoopTool

Use a SqoopOptions object to set options for the operation.

Create a Sqoop object and call the runSqoop() method.

22. Distribute By
Hive uses the columns in Distribute By to distribute the rows among reducers. All rows with the same Distribute By columns will go to the same reducer.
It ensures each of N reducers gets non-overlapping ranges of column, but doesn’t sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.
Example ( taken directly from Hive wiki ):-
We are distributing by x on the following 5 rows to 2 reducers:
x1
x2
x4
x3
x1
Reducer 1 got
x1
x2
x1
Reducer 2 got
x4
x3

Difference between Cluster By and Distribute By.

Cluster By is a short-cut for both Distribute By and Sort By.
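
A minimal sketch of the syntax (run here through spark.sql, which accepts the same clauses; table and column names are illustrative, and an existing SparkSession named spark is assumed):

// same x always goes to the same reducer; output within a reducer is not sorted
spark.sql("SELECT x FROM t DISTRIBUTE BY x")
// same routing, plus sorting within each reducer
spark.sql("SELECT x FROM t DISTRIBUTE BY x SORT BY x")
// CLUSTER BY x is shorthand for DISTRIBUTE BY x SORT BY x
spark.sql("SELECT x FROM t CLUSTER BY x")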

How do you do unit testing?

We test some random samples from the development samples and make sure that all the combinations of datasets are covered as part of the unit test cases.

Difference between an Oozie job whose frequency is set to a short duration and Spark Streaming

Oozie is a job scheduler; it has to trigger a new job every time and go through the overhead of the ApplicationMaster, ResourceManager, etc.

This overhead is not there for a Spark Streaming job.

Checkpoint Node:

Checkpoint node in hadoop is a new implementation of the Secondary NameNode to solve the drawbacks of Secondary NameNode.


The main function of the Checkpoint Node in Hadoop is to create periodic checkpoints of the file system metadata by merging the edits file with the fsimage file. The new fsimage produced by the merge operation is usually called a checkpoint.

The Checkpoint Node periodically downloads the fsimage and edits log files from the primary NameNode, merges them locally, and stores the result in a directory structure similar to that of the primary NameNode, so that the primary NameNode can easily access the latest checkpoint if necessary in case of any NameNode failure.
 It usually runs on a different machine than the primary NameNode since its memory requirements are same as the primary NameNode.

SPARK Questions

1. How to save an RDD?
Use persist()/cache() to keep it in memory across actions, or saveAsTextFile()/saveAsObjectFile() to write it out to storage.

2. How to change the number of RDD partitions at runtime?
repartition can be used to increase or decrease the number of partitions; coalesce is best for decreasing the number of partitions.

Difference between Repartition and Coalesce

The repartition algorithm does a full shuffle and creates new partitions with the data distributed evenly across them.

Coalesce avoids a full shuffle. If it's known that the number of partitions is decreasing, the executors can safely keep data on the minimum number of partitions, only moving the data off the extra nodes onto the nodes that are kept.
So, it would go something like this:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
Then coalesce down to 2 partitions:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Notice that Node 1 and Node 3 did not require its original data to move.
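
A minimal Scala sketch of the two calls (partition counts are illustrative; an existing SparkContext sc is assumed):

val rdd = sc.parallelize(1 to 12, 4)     // 4 partitions, as in the node layout above
val more  = rdd.repartition(8)           // full shuffle into 8 roughly even partitions
val fewer = rdd.coalesce(2)              // no full shuffle; merges down to 2 partitions
println(fewer.getNumPartitions)          // 2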

3. How to calculate an average?
Use groupByKey on a paired RDD (or aggregateByKey for large data); see the sketch below.
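
A minimal sketch of a per-key average (keys and values are illustrative); for large data, aggregateByKey or reduceByKey over (sum, count) pairs avoids pulling all values for a key into memory:

val scores = sc.parallelize(Seq(("math", 80.0), ("math", 90.0), ("eng", 70.0)))
val averages = scores.groupByKey()               // (key, Iterable[Double])
  .mapValues(vs => vs.sum / vs.size)             // average per key
averages.collect().foreach(println)              // (math,85.0), (eng,70.0)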

4. Consider that data arrives every hour in the form of different categories, and each category has some values. Get the max value for each hour. Implement it in Spark (see the sketch below).
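
A hedged sketch for the question above, assuming the records arrive as (hour, category, value) tuples:

val data = sc.parallelize(Seq(
  ("09", "catA", 10), ("09", "catB", 25),
  ("10", "catA", 12), ("10", "catB", 30)))
val maxPerHour = data
  .map { case (hour, _, value) => (hour, value) }   // key by hour, drop the category
  .reduceByKey((a, b) => math.max(a, b))            // keep the max value per hour
maxPerHour.collect().foreach(println)               // (09,25), (10,30)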

5. Functions and Closure

6. DAG and RDD Lineage

DAG is the engine and Lineage is the graph.

Lineage Graph: This is a graph of all parent RDDs of an RDD. It is built as a result of applying transformations to the RDD and creates a logical execution plan.

This graph is at RDD transformations level.

DAG - This term is not specific to Spark; it is a generalized software-engineering term which represents an execution flow. The flow must not be cyclic, to avoid an infinite loop.


Computations on RDDs are represented as a lineage graph, a DAG representing the computations done on the RDD. 

7. Difference between map and flatmap
flatMap does a similar job to map: transforming one collection into another, or in Spark terms, one RDD into another RDD.
However, the output of flatMap is flattened: although the function passed to flatMap returns a list of element(s) for each input element, the output of flatMap is an RDD with all the elements flattened into a single collection.
Let’s see this with an example.
Say you have a text file as follows
  1. Hello World
  2. Who are you
Now, if you run a flatMap on the textFile rdd,
  words = linesRDD.flatMap(x => x.split(" "))
And the value in the words RDD would be ["Hello", "World", "Who", "are", "you"]
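
A minimal sketch contrasting the two on the same input:

val linesRDD = sc.parallelize(Seq("Hello World", "Who are you"))
val mapped    = linesRDD.map(_.split(" "))      // RDD[Array[String]]: [[Hello, World], [Who, are, you]]
val flattened = linesRDD.flatMap(_.split(" "))  // RDD[String]: [Hello, World, Who, are, you]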

8. Spark Execution Model

9. What are the 2 types of Deploy modes in Spark?

There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

 A common deployment strategy is to submit your application from a gateway machine that is physically co-located with your worker machines (e.g. Master node in a standalone EC2 cluster). In this setup, client mode is appropriate. In client mode, the driver is launched directly within the spark-submit process which acts as a client to the cluster. The input and output of the application is attached to the console. Thus, this mode is especially suitable for applications that involve the REPL (e.g. Spark shell).

Alternatively, if your application is submitted from a machine far from the worker machines (e.g. locally on your laptop), it is common to use cluster mode to minimize network latency between the drivers and the executors.

The key difference between Hadoop MapReduce and Spark

To make the comparison fair, here we will contrast Spark with Hadoop MapReduce, as both are responsible for data processing. In fact, the key difference between them lies in the approach to processing: Spark can do it in-memory, while Hadoop MapReduce has to read from and write to a disk. As a result, the speed of processing differs significantly – Spark may be up to 100 times faster. However, the volume of data processed also differs: Hadoop MapReduce is able to work with far larger data sets than Spark.
Now, let’s take a closer look at the tasks each framework is good for.

Tasks Hadoop MapReduce is good for:

  • Linear processing of huge data sets. Hadoop MapReduce allows parallel processing of huge amounts of data. It breaks a large chunk into smaller ones to be processed separately on different data nodes and automatically gathers the results across the multiple nodes to return a single result. In case the resulting dataset is larger than available RAM, Hadoop MapReduce may outperform Spark.
  • Economical solution, if no immediate results are expected. Our Hadoop team considers MapReduce a good solution if the speed of processing is not critical. For instance, if data processing can be done during night hours, it makes sense to consider using Hadoop MapReduce.

Tasks Spark is good for:

  • Fast data processing. In-memory processing makes Spark faster than Hadoop MapReduce – up to 100 times for data in RAM and up to 10 times for data in storage.
  • Iterative processing. If the task is to process data again and again – Spark defeats Hadoop MapReduce. Spark’s Resilient Distributed Datasets (RDDs) enable multiple map operations in memory, while Hadoop MapReduce has to write interim results to a disk.
  • Near real-time processing. If a business needs immediate insights, then they should opt for Spark and its in-memory processing.
  • Graph processing. Spark’s computational model is good for iterative computations that are typical in graph processing. And Apache Spark has GraphX – an API for graph computation.
  • Machine learning. Spark has MLlib – a built-in machine learning library, while Hadoop needs a third-party to provide it. MLlib has out-of-the-box algorithms that also run in memory. Besides, there is a possibility of tuning and adjusting them.
  • Joining datasets. Due to its speed, Spark can create all combinations faster, though Hadoop may be better if joining of very large data sets that requires a lot of shuffling and sorting is needed.

Examples of practical applications

We analyzed several examples of practical applications and made a conclusion that Spark is likely to outperform MapReduce in all applications below, thanks to fast or even near real-time processing. Let’s look at the examples.
  • Customer segmentation. Analyzing customer behavior and identifying segments of customers that demonstrate similar behavior patterns will help businesses to understand customer preferences and create a unique customer experience.
  • Risk management. Forecasting different possible scenarios can help managers to make right decisions by choosing non-risky options.
  • Real-time fraud detection. After the system is trained on historical data with the help of machine-learning algorithms, it can use these findings to identify or predict an anomaly in real time that may signal of a possible fraud.
  • Industrial big data analysis. It’s also about detecting and predicting anomalies, but in this case, these anomalies are related to machinery breakdowns. A properly configured system collects the data from sensors to detect pre-failure conditions.

Which framework to choose?

It’s your particular business needs that should determine the choice of a framework. Linear processing of huge datasets is the advantage of Hadoop MapReduce, while Spark delivers fast performance, iterative processing, real-time analytics, graph processing, machine learning and more. In many cases Spark may outperform Hadoop MapReduce. The great news is that Spark is fully compatible with the Hadoop ecosystem and works smoothly with the Hadoop Distributed File System, Apache Hive, etc.

Compress in snappy and save in Parquet format


sqlContext.setConf("spark.sql.parquet.compression.codec.", "snappy")
dataDF.write.parquet("/user/cloudera/problem1/result4a-snappy")


Compress in gzip and save in Parquet format


sqlContext.setConf("spark.sql.parquet.compression.codec.", "gzip")
dataDF.write.parquet("/user/cloudera/problem1/result4a-snappy")


1. How do you identify duplicate records read from Kafka? -- aggregateByKey

Consider below records in 2 partitions
partition 1
1 Hyderabad
4 Chennai
2 Bangalore
1 Hyderabad

partition 2
3 Mumbai
4 Chennai
Solution: Use aggregateByKey: count records within each partition in the first (seqOp) function, and merge the counts across partitions in the second (combOp) function to identify the duplicate records; see the sketch below.
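
A minimal sketch of that approach, using the sample records above (an existing SparkContext sc is assumed):

val records = sc.parallelize(Seq(
  (1, "Hyderabad"), (4, "Chennai"), (2, "Bangalore"), (1, "Hyderabad"),   // partition 1
  (3, "Mumbai"), (4, "Chennai")),                                          // partition 2
  2)
val counts = records
  .map(rec => (rec, 1))
  .aggregateByKey(0)(
    (countInPartition, one) => countInPartition + one,   // seqOp: count within a partition
    (c1, c2) => c1 + c2)                                  // combOp: merge counts across partitions
val duplicates = counts.filter { case (_, count) => count > 1 }.keys
duplicates.collect().foreach(println)                     // (1,Hyderabad) and (4,Chennai)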

2. A function needs configuration passed to it without modifying the source code.
Solution: Below are some options:
1. Pass a configuration flag using an external config file like JSON/XML.
2. Broadcast Variables.
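
A hedged sketch of the broadcast option: configuration is read from an external key=value file (the path and the "factor" key are hypothetical) and shipped to every task, so behaviour changes without touching the task code.

import scala.io.Source
val configMap = Source.fromFile("/path/to/job.conf").getLines()   // hypothetical config file
  .map(_.split("=", 2))
  .collect { case Array(k, v) => k.trim -> v.trim }
  .toMap
val configBc = sc.broadcast(configMap)
val input = sc.parallelize(Seq(1, 2, 3))
val result = input.map { n =>
  // every task reads the same broadcast copy of the configuration
  val factor = configBc.value.getOrElse("factor", "1").toInt
  n * factor
}
result.collect().foreach(println)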

3. Initially, addition is performed by reading the '+' operation from an external source. Now, without modifying the Kafka/Spark Streaming job, the '-' operation has to be applied ('+' is already cached). How?
Solution: Send a control message on Kafka to change the operation from '+' to '-'. Spark Streaming reads the message and applies '-' from then on.

4. What is the use of versioning in HBase, and why is the data type a byte array?
Sol:
1. Versioning keeps track of historical data, i.e., of recent updates to a column.
2. We implemented it for a use case where there could be multiple values for a 'status' column in a Hive table (not more than 5 statuses). It is an array data type in Hive. In order to transform this same table's data to HBase, we took advantage of versioning. If versioning were not there, we would have had to create another table with a unique key and all 5 status values corresponding to it.

Why the datatype is Bytearray in HBase:

It allows us to store any kind of data without much fuss. For example, imagine you have to store product-related data in your HBase table, say ID, make, country, price, etc. To store each of these parameters you would have to take care of the individual data type of each parameter in advance, which definitely adds some overhead. Unlike RDBMSs, HBase doesn't ask for all this at the time of table creation. So even if the data types of these parameters change tomorrow, or you decide to add some parameters (with new data types), all you have to do is wrap the value in Bytes.toBytes() and push it into your table. No change is required at the database end. All this makes insertions faster.

Also, sometimes storing a value in a serialized byte[] form saves a few bytes as compared to storing the same values in their native format. And this minor saving becomes quite significant when you deal with BigData.


Long story short, Hbase does this to make things faster and to make storage more efficient, keeping the overhead of internal data structures to a minimum..

5 Vs of Bigdata

Volume
Velocity - Speed
Variety - Different forms of sources
Veracity refers to the messiness or trustworthiness of the data. With many forms of big data, quality and accuracy are less controllable (just think of Twitter posts with hash tags, abbreviations, typos and colloquial speech as well as the reliability and accuracy of content) but big data and analytics technology now allows us to work with these type of data.
Value - having access to big data is useless unless we can turn it into value.


Some Interview Questions:

Hadoop

                - What are part-m-* and part-r-* file in hadoop
part-m-* for mapper generated output and part-r-* for reducer generated output
If there are no Reducers in your job - then one part-m-* file for one Mapper. There is one Mapper for one InputSplit (usually - unless you use custom InputFormat implementation, there is one InputSplit for one HDFS block of your input data).

                - While copying a 130 MB file to HDFS, the copy fails at 120 MB (say 128 MB block size). How much data can I read? X
No data would be readable at the destination (assumption). Copying happens by writing to a temporary file, which is later renamed.
If the copy does not fully succeed, the temporary file is removed, so not even a byte of data gets copied.

Hive

                - File size as small as 10 MB - Hive query still spawning 10 or more mappers -> X
The input split size is less than 1 MB; the input split size determines the number of mappers.
Each input split is assigned to a mapper; the default split size is the block size, and if it is reduced, the number of mappers increases.
SET mapreduce.input.fileinputformat.split.maxsize=<smaller value in bytes>;

                - Hive bucketing - bucketing 4 records into 8 buckets - how is the data distributed? X

                - Spark partition vs Hive partition
Spark partition - a chunk of an RDD or DataFrame distributed across the cluster.
Hive partition - a subdirectory under the table's HDFS location that holds the rows for one value of the partition column(s).

                - Zookeeper and nimbus - what is it ? X
Apache Storm has two types of nodes, Nimbus (master node) and Supervisor (worker node). Nimbus is the central component of Apache Storm.
Nimbus is stateless, so it depends on ZooKeeper to monitor the working node status.
                - When reading the first 5 rows from Hive, does it load all the data from the file?

               No

Spark

                - Having data size as 10 GB, how to decide number of executors and memory given to executors.

Lets consider that there are 10 files with 1 GB each.

Case 1: Just read 10 GB of data in these 10 files and write to a new location.

We can give 2 executors, 5 cores each, and executor memory = 15 GB. This gives each core 3 GB for reading and writing - best case.

If there is a memory constraint, then specify the executor memory as 5 GB; in this case each core will adjust its allocated 1 GB between read and write operations by spilling.


Case 2: Read 10 GB of data in 10 files, Join with another table of same size and write to a new location.

It is better to specify 1 executor, 10 cores, and executor memory = 40 GB. The reason for specifying 1 executor is to prevent data shuffling between executors, as we are applying a join; shuffling within an executor is faster than shuffling between executors. As we are joining, the executor memory has been increased to 40 GB.


                - Having a Spark job running on 10 MB of data, I see lots of partition files in my output. What is wrong, and why?
df.write.partitionBy(column) was used on a column whose data has so many distinct values that it created too many partitions.
Kafka

                - How do we use offset in kafka ?
Messages in Kafka can also be read by specifying the offset using seek()
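
A hedged sketch using the plain Kafka consumer API (kafka-clients 2.0+ assumed; topic name, partition, group id and offset are illustrative):

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "offset-demo")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val partition = new TopicPartition("persons", 0)
consumer.assign(Collections.singletonList(partition))
consumer.seek(partition, 42L)                          // start reading from offset 42
val records = consumer.poll(Duration.ofMillis(1000))   // returns messages from that offset onwards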

                - What is max message size in Kafka ? X
The default is 1 MB, but this can be increased with message.max.bytes on the broker (max.message.bytes at topic level) and a few other related parameters.
                - Spout and Streams in Kafka
A spout is an Apache Storm concept (a source of streams in a Storm topology), not a Kafka one; in Kafka, "streams" usually refers to the Kafka Streams API.
               

Spark streaming  -

                - What is ssc - spark streaming context ?

                - What is ssc.awaitTermination(), why do we use it ?
awaitTermination() blocks the driver's main thread until the streaming context is stopped; internally it uses a condition variable that checks whether stop() was invoked explicitly in code or the application was terminated (Ctrl+C).

                - what is sliding window ?

Scala

                - Currying - example - fold() or foldbykey()

                - Option-Some-None
Bucketing in Spark
Similar to Hive; the DataFrameWriter function used is bucketBy(4, "id").
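
A minimal sketch, assuming a DataFrame df with an "id" column and a Hive-compatible metastore; bucketed output must be written with saveAsTable:

df.write
  .bucketBy(4, "id")              // 4 buckets on the id column
  .sortBy("id")                   // optional: sort within each bucket
  .saveAsTable("bucketed_tbl")    // table name is illustrative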

Hbase vs Cassandra

                - Difference between HBase and Cassandra

                - What bucketing strategy Cassandra uses

                - Hbase minor and major compaction - how frequently it is done?

                - Hbase give me frequently access data from memory, what if that memory gets full?

                - What is flushing in Hbase?
Flushing writes the in-memory MemStore contents out to HFiles on disk.

                - Will HBase take longer when accessed via a non-key column value?
Yes, it will take longer when accessed via a non-key column value, because it scans the entire table.
If you use the index in your query, you read only required records, else you scan entire table.
If you are not using a filter against rowkey column in your query, your rowkey design may be wrong. 
The row key should be designed to contain the information you need to find specific subsets of data.


Easy

MR Programming Paradigm

What is the MR paradigm? Why is it better than existing/legacy processing frameworks?

Data Locality, Partitioner, Combiner (When not to use), Identity mapper and Identity reducer

Distributed Cache, Hadoop modes (Standalone, Pseudo-Distributed(single-node), Distributed(multi-node))

HDFS

Namenode and datanodes - significance,  Editlog and FSImage
Secondary Namenode reads HDFS state from an image file, fsimage, and then applies edits from the edits log file

splits vs blocks, SAFEMODE, replication strategy

SAFEMODE
 During start-up, the NameNode loads the filesystem state from the fsimage and edits log files. It then waits for DataNodes to report their blocks, so that it does not prematurely start replicating blocks even though enough replicas may already exist in the cluster.

During this time, NameNode stays in safe mode. A safe mode for NameNode is essentially a read-only mode for the HDFS cluster, it does not allow any modifications to file system or blocks.

YARN

Why YARN ? Can we run other applications over YARN other than MapReduce?

YARN Components - (NodeManager,  ResourceManager, ApplicationManager/AppMaster, Container) and their significance

Hive

Different file formats supported by Hive(text, ORC, Parquet etc), their performance advantages wrt each other

Hive Metastore- why it is used

Hive Partitioning, Bucketing - when to use them , RDBMS vs Hive

NoSQL - HBase,Cassandra

What is NoSQL ?   CAP (Consistency Availability and Partition Tolerance) Theorem

Hbase ( Expected answer : Key-Value based column-oriented database, millions of columns * billions of rows)

When should we use HBase?, According to CAP theorem, what HBase /Cassandra obeys ?
Consistency, Availability and Partition Tolerance

Hbase vs Cassandra

Sqoop

Sqoop features -import, export, merge, incremental imports, file formats

Controlling parallelism in sqoop import
Increase the number of mappers (--num-mappers / -m).


Advance

MR Programming Paradigm

MR job submission flow, different phases/stages of the job flow

Speculative job execution - advantages, disadvantages, failure handling

The slowness of a node could be due to hardware/software failure, network failure, executing a non-local task, or the node simply being busy. In such cases the Hadoop platform schedules redundant copies of the slower tasks on other nodes in the cluster. This process is known as speculative execution in Hadoop.
Disadvantage: it takes a while for the system to recognise the slowness of a node and shift the processing to another node; this time lag (and the duplicated work) is the drawback.

Factors to consider while writing a custom InputFormat; MR job chaining


HDFS

Why the Secondary NameNode - its significance, what exactly it does; ask to explain in detail

Reading  and writing to HDFS file with detailed steps, Checkpointing in HDFS and how it is done

 Checkpointing is a process of reconciling fsimage with edits to produce a new version of fsimage.
 fs.checkpoint.period controls how often this reconciliation will be triggered.  3600 means that every hour fsimage will be updated and edit log truncated. 

YARN

Schedulers in YARN (FIFO, Capacity, Fair)

Fair scheduling is a method of assigning resources to jobs such that all jobs get, on average, an equal share of resources over time. 
When there is a single job running, that job uses the entire cluster. When other jobs are submitted, task slots that free up are assigned 
to the new jobs, so that each job gets roughly the same amount of CPU time.

Capacity Scheduler: allocates resources to queues, with FIFO scheduling within each queue.

Hive

Factors to be considered while partitioning and bucketing Hive tables

Managed (internal) tables and external tables

Dynamic-Partitioning

NoSQL - Hbase,Cassandra

According to CAP theorem, what HBase /Cassandra obeys ? Why (Detailed explaination) ?
HBase - CP (compromises on Availability), Cassandra - AP (compromises on Consistency)

HFile, Commit Log (WAL) and Memstore - Significance, Major and Minor compactions

Minor compaction runs periodically and merges small store files into larger files.
Major compaction is an administrator-triggered activity that merges a region's store files into one larger file. Over time, when records are deleted or versions expire, that cleanup has to be performed; major compaction cleans up these records and
writes the data back out so that it is local to the region server hosting the region.

Catalog Tables (Meta tables), Co-Processors

Factors to consider while Designing HBase Row Key

Sqoop

how to handle database UPDATE/DELETE using sqoop

Deciding the split column in a Sqoop import; Sqoop-to-Hive data type handling, e.g., handling date/time data types

Mixin in Scala
A mixin is a trait that is used to compose a class.

Eg:

abstract class A {
  val message: String
}

trait C extends A {
  def loudMessage = message.toUpperCase()
}
Here, C is a Mixin.
Serialization in spark


You don't need to make classes that are only accessed on the driver implement Serializable.
Only the classes that are shipped to the executors (e.g., captured in task closures) need to be serializable.


- Dataframe Vs Dataset ?
A Dataset provides the type-safe, object-oriented programming interface of the RDD API together with the performance benefits of the Catalyst query optimizer; a DataFrame is an untyped Dataset[Row].
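
A minimal sketch of the difference (an existing SparkSession named spark is assumed; the Person class is illustrative):

case class Person(name: String, age: Int)
import spark.implicits._
val df = Seq(("arun", 30), ("anil", 25)).toDF("name", "age")   // untyped rows
val ds = Seq(Person("arun", 30), Person("anil", 25)).toDS()    // typed objects
df.filter($"age" > 26).show()     // column name checked only at runtime
ds.filter(_.age > 26).show()      // field access checked by the compiler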
Read & Write to RDBMS from spark
Read:
spark.read.format("jdbc")
Write:
df.write.jdbc
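
A fuller hedged sketch of the two calls; the JDBC URL, table names and credentials are placeholders:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/sales")   // placeholder URL
  .option("dbtable", "customers")
  .option("user", "dbuser")
  .option("password", "dbpass")
  .load()

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/sales")
  .option("dbtable", "customers_copy")
  .option("user", "dbuser")
  .option("password", "dbpass")
  .save()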

- Data Locality
Moving computation closer to the data rather than moving data to the computation.

- Block Caching in HDFS
Normally a datanode reads blocks from disk, but for frequently accessed files the blocks may be explicitly cached in the datanode’s memory, in an off-heap block cache.

- Failover and fencing in HDFS
The transition from the active namenode to the standby is managed by a new entity in the system called the failover controller.

The HA implementation goes to great lengths to ensure that the previously active namenode is prevented from doing any damage and causing corruption — a method known as fencing.

- Parallel Copying in HDFS
Hadoop comes with a useful program called distcp for copying data to and from Hadoop filesystems in parallel. distcp is implemented as a MapReduce job whose maps run in parallel across the cluster; there are no reducers.

- Accumulators
A shared variable that tasks can only add to; the driver reads its aggregated value (typically after an action has run).
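
A minimal sketch counting bad records with a long accumulator (input data is illustrative):

val badRecords = sc.longAccumulator("badRecords")
val parsed = sc.parallelize(Seq("a,1", "b,2", "broken")).flatMap { line =>
  val fields = line.split(",")
  if (fields.length == 2) Some((fields(0), fields(1).toInt))
  else { badRecords.add(1); None }             // tasks can only add to the accumulator
}
parsed.count()                                 // an action must run before the value is complete
println(s"bad records: ${badRecords.value}")   // the driver reads the aggregated value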

- Broadcast variable
It ships a read-only (immutable) copy of the variable to every executor once, instead of sending it with every task.

- How to stop a running Spark-streaming application

Pass a message on Kafka indicating that the streaming job should stop, then call ssc.stop() so that awaitTermination() returns (similar to what happens on Ctrl+C).

- Union DataFrames
Seq(df1, df2, ...).reduce(_ union _)

- pattern matching in scala

- How to increase Hive Query performance

- tail recursion

- Use existing  sparksession to launch a new sparkcontext

- Structured Streaming

Spark can read streaming data and return it as a DataFrame; no StreamingContext is needed for this.

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "persons").load()
df.printSchema

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  
val value_df = selectExpr("CAST(value AS STRING)")

- Spark-streaming Sliding Window 

As the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream.

Basically, any Spark window operation requires specifying two parameters.

Window length – it defines the duration of the window.
Sliding interval – it defines the interval at which the window operation is performed (see the sketch after the list of operations below).

Common Spark Window Operations
window
countByWindow
reduceByWindow
reduceByKeyAndWindow
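
A hedged sketch of reduceByKeyAndWindow: word counts over a 30-second window, sliding every 10 seconds (batch interval, source, window length and slide interval are all illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))                  // 10-second batches
val lines = ssc.socketTextStream("localhost", 9999)              // illustrative source
val windowedCounts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))  // 30s window, 10s slide
windowedCounts.print()
ssc.start()
ssc.awaitTermination()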

- When to use HBASE


https://techvidvan.com/tutorials/spark-streaming-window-operations/


Pattern matching with case class.
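
A minimal sketch of pattern matching on case classes (the Event hierarchy is illustrative):

sealed trait Event
case class Click(page: String) extends Event
case class Purchase(item: String, amount: Double) extends Event

def describe(e: Event): String = e match {
  case Click(page)                      => s"clicked $page"
  case Purchase(item, amt) if amt > 100 => s"big purchase of $item"
  case Purchase(item, _)                => s"purchase of $item"
}
println(describe(Purchase("laptop", 900.0)))   // big purchase of laptop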

----- Hadoop -----
- What is the difference between Hadoop v1 and Hadoop v2?
----- HBase -----
- https://mapr.com/blog/in-depth-look-hbase-architecture/
- What is flushing in Hbase
- What is a difference between major and minor compactions?
- Why do hbase need major compaction as we already have minor ?
- What is the role of Zookeeper in HBase?
----- HDFS  - MR -----
- What is an HDFS block and what is a split?
- Can we tune split size ?
- In the HDFS architecture, you must be aware of where file metadata is stored.
 
----- Hive ----
- Consider a scenario: I have 4 rows in my Hive table. I want to bucket it into 8 buckets on one of the columns.
- how the data in buckets would be distributed?
- I have approx. 10 MB of data in size in my hive table. 
- If the data is 10 MB, it fits in one HDFS block, it should have spawned only one mapper ? 
- When I fire a query, I see 4-5 mappers being spawned. Why ?
- What is HiveServer ?
- What is dynamic-partitioning in Hive ?
- Consider I have 1 million records in a file and a Hive table over it. I want to read the first 10 records using Hive. Does Hive bring the entire file/all records into memory to get the first few records?

---- Scala -----
- Val vs Var in Scala
- What is an Object in scala ?
- What are case classes and where should they be used?
- What is currying? Example ?
- Are you aware of Mixin ? 
If yes, how can it be used in Scala? Example?
- What are implicit conversions in Scala ?
- Are you aware of the Option-Some-None clause in Scala? How should it be used?
----- Spark -----
- Disadvantages of spark ?
- What spark and scala version you have used ?
- Dataframe Vs Dataset ?
A Dataset provides the type-safe, object-oriented programming interface of the RDD API together with the performance benefits of the Catalyst query optimizer; a DataFrame is an untyped Dataset[Row].
- Why are RDDs said to be fault tolerant?
- When should you use broadcast variable ? Any usecase/scenario you can tell us ?
- Can we read Accumulators value in executor code ? if no then why?
- Do you know what is Closure in Spark ?
- What is a paired RDD? Can you tell us a few transformations and actions that can only be performed on a paired RDD?
- How can you produce pairedRDD? any example/api ?
- What is the difference between the reduceByKey (reduce), foldByKey, and combineByKey operations on an RDD?
- What is a Job, Task and Stage in spark runtime ?
# Performance Tuning a Spark Application
- How can I control the level of parallelism when running a Spark job?
- You have data with size 10 GB, how would you estimate how many numbers of executors and memory per executor to specify? 
----- Spark Streaming -----
- What is DStream?
- How can we make sure data/streams of data are not lost in case the Spark Streaming application fails for some reason?
- What is a sliding interval in spark streaming, and window length?
- With Spark Streaming, can we have a window based on the number of input messages/records instead of a time interval?
- What operations/transformations we can do on spark streaming window?
