Friday, April 14, 2017

MapReduce

A Few Points:

1. Serialization and de-serialization are needed to transfer the data stream in the form of objects over the network, for reading and writing data.
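To make this concrete, here is a minimal sketch of Hadoop's serialization contract: a custom value type implements Writable so the framework can serialize it for the network and de-serialize it back. CustInfo and its fields are hypothetical names, not from any library.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical custom value type; Writable is Hadoop's serialization interface.
public class CustInfo implements Writable {
    private String name;
    private double amount;

    @Override
    public void write(DataOutput out) throws IOException { // serialization
        out.writeUTF(name);
        out.writeDouble(amount);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // de-serialization
        name = in.readUTF();
        amount = in.readDouble();
    }
}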

2. Default input format - TextInputFormat
Default Mapper - Identity Mapper


Mapper Phase --- Sort & Shuffle   --- Reducer Phase

3. Deployment of a MapReduce program:

a. Prepare the jar file.
b. Input and output paths must be in HDFS.
c. hadoop jar <JarFilePath> <MainJobClass> <HDFSInputPath> <HDFSOutputPath>
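For reference, a minimal, self-contained sketch of what such a main job class can look like - a hypothetical WordCount-style job; every class name here is illustrative:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

    // Mapper: emits (word, 1) for every token in the input line.
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reducer: sums the counts for each word (the aggregate operation of point 6 below).
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountJob.class);
        job.setMapperClass(TokenMapper.class);
        // job.setCombinerClass(SumReducer.class); // optional Combiner, see point 8 below
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // HDFS input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HDFS output path (must not exist)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}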

4. In a MapReduce project, import only the new org.apache.hadoop.mapreduce API, not the old org.apache.hadoop.mapred API.

5. The Mapper is mandatory; the Reducer is optional.

6. The Reducer is for aggregate operations.

7. Sort & Shuffle will not run if there is no Reducer.

8. The Combiner is called a mini-reducer: it pre-aggregates map output on the map side before the shuffle.

9. By default the number of reducers is 1; the number of mappers is determined by the number of input splits (the old mapred.map.tasks hint defaults to 2).

10. To increase the reducer count, call job.setNumReduceTasks(n); in the driver.

11. The Partitioner decides which reducer each map output record goes to, so the final output is divided into one file per reducer (a sketch follows point 14 below).

12. If there is no Reducer, then Sort & Shuffle and the Partitioner will not get called.

13. If a Reducer is present, then both of these will be called.

By default,

Mapper - Identity Mapper
Reducer - Identity Reducer
Combiner - No default
Partitioner - HashPartitioner

14. The Combiner is optional, so there is no default Combiner.
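To make points 10-14 concrete, a minimal custom Partitioner sketch; the class name is illustrative, and the getPartition() formula shown is the same one the default HashPartitioner uses:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustIdPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        // Route by key hash; & Integer.MAX_VALUE clears the sign bit.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

// In the driver, pair it with the reducer count from point 10:
// job.setNumReduceTasks(4);
// job.setPartitionerClass(CustIdPartitioner.class);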

15. A file can be added to the Distributed Cache in MapReduce via (note the second argument is the job's Configuration):
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
                              job.getConfiguration());

16. How to increase the number of reducers dynamically at submit time?
Use the generic command-line options, e.g. -D mapred.reduce.tasks=N when the driver goes through ToolRunner; see also the -config option. A driver sketch is below.
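A minimal sketch of a driver that supports such submit-time overrides (MyJob and the paths are placeholders); ToolRunner parses the generic options into the Configuration before run() is called, so the job can be submitted as:

hadoop jar myjob.jar MyJob -D mapred.reduce.tasks=4 <HDFSInputPath> <HDFSOutputPath>

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already carries any -D overrides (mapred.reduce.tasks,
        // or mapreduce.job.reduces on newer releases).
        Job job = Job.getInstance(getConf(), "my job");
        job.setJarByClass(MyJob.class);
        // With no Mapper/Reducer set, the Identity defaults from above apply.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MyJob(), args));
    }
}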

17. Joins

2 types: Map-side join and Reduce-side join.

A few points before the implementation:

If one table is big and the other is small, use the Distributed Cache and cache the smaller one - Map-side join.

Reduce-side join is helpful when both tables are big. Consider the data set in the 2 tables below.

a. Map-side Join

Steps:
1. Call setNumReduceTasks(0); there is no need to write a Reducer class.
2. Add the small file to the Distributed Cache by calling DistributedCache.addCacheFile(new URI(args[1]), mapjoinjob.getConfiguration());
3. This cached file has to be read in the Mapper and loaded into a HashMap object as key-value pairs.
This has to happen only once for the whole class, not on every map() call. So, override setup() in the Mapper class.
Sample code is below:

    // Needs: java.io.BufferedReader, java.io.FileReader, java.io.IOException,
    // java.util.HashMap, org.apache.hadoop.filecache.DistributedCache,
    // org.apache.hadoop.fs.Path, org.apache.hadoop.io.LongWritable,
    // org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Mapper

    private HashMap<String, String> cust_data;

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        cust_data = new HashMap<String, String>(); // must be initialized before use

        // Local paths of the files placed in the Distributed Cache.
        Path[] cachedFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        for (Path fileName : cachedFiles) {
            if (fileName.getName().equals("cust_info")) {
                BufferedReader brReader = new BufferedReader(new FileReader(fileName.toString()));
                String line;
                while ((line = brReader.readLine()) != null) {
                    String[] values = line.split("\t"); // custId <TAB> custName
                    cust_data.put(values[0], values[1]);
                }
                brReader.close();
            }
        }
    }
4. Now in the map(), use the HashMap object (cust_data in the above snippet) to create the joined key-value pair as output, as in the sketch below.
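A minimal sketch of that map(), assuming transaction input lines shaped like the Trans_data sample further down (transId, custId, amount, city, tab-separated); the field positions and output format are assumptions:

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t"); // transId, custId, amount, city
        String custName = cust_data.get(fields[1]);     // look up the cached customer table
        if (custName != null) {                         // emit only matching (inner-join) rows
            context.write(new Text(custName), new Text(fields[2])); // name -> amount
        }
    }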

https://www.youtube.com/watch?v=t32UQxLMkQ0

b. Reduce-side Join:

Cust_data (custId, name)
1001 leela
1002 Annapurna
1003 Karthik

Trans_data (transId, custId, amount, city)
57868 1001 5.9 Hyderabad
89899 1002 8.6 Vijayawada
89875 1002 2.6 Vizag

The output should be (name, total amount, transaction count):

leela 5.9 1
Annapurna 11.2 2

Steps:

1. Write 2 Mapper classes, CustMapper and TransMapper, both extending Mapper.
2. CustMapper's output should be 1002 as key and "cust Annapurna" as value. Prepend "cust" to identify, in the Reducer phase, key-value pairs that came from CustMapper.
3. TransMapper's output should be 1002 as key and "trans 8.6" and "trans 2.6" as values. The key is single; the values get grouped into an Iterable (list).
4. After Sort & Shuffle, the Reducer's input would be the key and the Iterable list:
1002 -> "cust Annapurna", "trans 8.6", "trans 2.6"
Note: The framework guarantees that all the values for one key will go to a single Reducer.
5. The Reducer logic reads each value, extracting the name from the "cust" value, and the transaction count and amount spent from the remaining "trans" values. A sketch of all three classes is below.
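A minimal sketch under the assumptions above (tab-separated input, field positions taken from the sample data; JoinReducer is an illustrative name; all three are shown as static nested classes of the driver). Note the reducer does not assume the "cust" value arrives first in the iteration:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// CustMapper: "1001<TAB>leela" -> (1001, "cust leela")
public static class CustMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = value.toString().split("\t");
        context.write(new Text(f[0]), new Text("cust " + f[1]));
    }
}

// TransMapper: "57868<TAB>1001<TAB>5.9<TAB>Hyderabad" -> (1001, "trans 5.9")
public static class TransMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] f = value.toString().split("\t");
        context.write(new Text(f[1]), new Text("trans " + f[2]));
    }
}

// JoinReducer: per custId, take the name from the "cust" value and total
// the "trans" values, emitting name -> "totalAmount<TAB>count".
public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = "";
        double total = 0.0;
        int count = 0;
        for (Text v : values) {
            String s = v.toString();
            if (s.startsWith("cust ")) {
                name = s.substring(5);
            } else if (s.startsWith("trans ")) {
                total += Double.parseDouble(s.substring(6));
                count++;
            }
        }
        if (count > 0) {
            context.write(new Text(name), new Text(total + "\t" + count));
        }
    }
}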

In the driver code:
Instead of setInputFormatClass(TextInputFormat.class); use MultipleInputs.

e.g.: MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);  // specifies the file path, format type and Mapper class name
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);
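Putting it together, a sketch of the full driver (ReduceJoinDriver is an illustrative name; args[0] = customer file, args[1] = transaction file, args[2] = output directory):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoinDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "reduce side join");
        job.setJarByClass(ReduceJoinDriver.class);
        // One input path + Mapper per table, instead of setInputFormatClass().
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TransMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}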

Follow: https://www.youtube.com/watch?v=fR-Z2r8gx7I

