Friday, June 9, 2017

Hive

To skip the header row while loading a file in Hive.

Data:

RecordId,FirstName,LastName
1,"John","Doe"
2,"Jane","Doe"

Command:

[cloudera@quickstart ~]$ hadoop fs -mkdir /user/hive/warehouse/names
[cloudera@quickstart ~]$ hadoop fs -put file:///home/cloudera/Desktop/Spark/Hive/test.csv /user/hive/warehouse/names
Make sure that the file is placed inside a folder, and specify that folder as the external table's LOCATION.

Hive> create external table names(RecordId string, FirstName string, LastName string)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > LINES TERMINATED BY '\n'
    > LOCATION 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/names'
    > tblproperties("skip.header.line.count" = "1");

Quotes in the data file: consider a case where a data file has values enclosed in double quotes ("").

Data: 
1,"pc:61254","2017-03-10 17:41:05.091","1",,"1200105"
2,"pc:61255","2017-03-10 18:41:05.091","1",,"1200106"
3,"pc:61256","2017-03-10 19:41:05.091","1",,"1200107"
4,"pc:61257","2017-03-10 20:41:05.091","1",,"1200108"


CREATE EXTERNAL TABLE `test`(
  sno bigint,
  id string,
  dte timestamp,
  status string,
  val int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "quoteChar"     = "\"",
  "separatorChar" = ",")
LOCATION
  's3a://l1-emr-ed-raw/Test';
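
One thing to keep in mind is that OpenCSVSerde treats every column as a string, so the types declared above effectively behave as strings. If typed columns are needed downstream, one option is to cast into a second table; this is only a sketch, and the table name test_typed is just an illustration:

CREATE TABLE test_typed STORED AS PARQUET AS
SELECT CAST(sno AS bigint)    AS sno,
       id,
       CAST(dte AS timestamp) AS dte,
       status,
       CAST(val AS int)       AS val
FROM test;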

HCatalog

HCatalog opens up the Hive metadata to other MapReduce tools. Every MapReduce tool has its own notion of HDFS data (for example, Pig sees HDFS data as a set of files, while Hive sees it as tables). With a table-based abstraction, HCatalog-enabled MapReduce tools do not need to care about where the data is stored, in which format, or in which storage system (HBase or HDFS).

If WebHCat is configured along with HCatalog, jobs can also be submitted in a RESTful way.

A very basic example of how to use HCatalog:

LOADING HIVE table to PIG

I have a table in Hive named STUDENT, stored in one of the HDFS locations:
neethu   90
malini   90
sunitha  98
mrinal   56
ravi     90
joshua   8
Now suppose I want to load this table into Pig for further transformation of the data. In this scenario I can use HCatalog.
When using table information from the Hive metastore with Pig, add the -useHCatalog option when invoking pig:
pig -useHCatalog
(you may want to export HCAT_HOME, e.g. HCAT_HOME=/usr/lib/hive-hcatalog/)
Now load this table into Pig:
A = LOAD 'student' USING org.apache.hcatalog.pig.HCatLoader();
Now you have loaded the table into Pig. To check the schema, just do a DESCRIBE on the relation:
DESCRIBE A;

LOADING PIG Data to HIVE Table

There are two approaches, explained below with an 'Employee' table example, to store Pig output into a Hive table. (Prerequisite: the Hive table should already be created.)

A =  LOAD 'EMPLOYEE.txt' USING PigStorage(',') AS(EMP_NUM:int,EMP_NAME:chararray,EMP_PHONE:int);
Approach 1: Using Hcatalog

// dump pig result to Hive using Hcatalog
store A into 'Empdb.employee' using org.apache.hive.hcatalog.pig.HCatStorer();
(or)

Approach 2: Using HDFS physical location

// dump pig result to external hive warehouse location
STORE A INTO 'hdfs://<<nmhost>>:<<port>>/user/hive/warehouse/Empdb/employee/' USING PigStorage(',');

Write Custom UDF Functions in Hive

3 types of UDFs can be written:

Source: https://www.linkedin.com/pulse/hive-functions-udfudaf-udtf-examples-gaurav-singh

https://dzone.com/articles/writing-custom-hive-udf-andudaf

UDF: 2 types, Simple and Complex

UDTF: A user defined tabular function works on one row as input and returns multiple rows as output, so the relation is one to many, e.g. Hive's built-in EXPLODE() function. Now let's take an array column USER_IDS with values array(10,12,5,45); then SELECT EXPLODE(USER_IDS) AS ID FROM T_USER will give 10, 12, 5 and 45 as four different rows in the output. A UDTF can also be used to split a column into multiple columns. Here the alias ("AS") clause is mandatory.
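
As a quick illustration of EXPLODE, here is a sketch assuming a hypothetical table T_USER with an array<int> column USER_IDS; the USER_NAME column is also just an assumption. LATERAL VIEW is needed when other columns are selected alongside the exploded values:

-- explode the array into one row per element
SELECT EXPLODE(USER_IDS) AS ID FROM T_USER;

-- same thing, but keeping another column from each row
SELECT USER_NAME, ID
FROM T_USER
LATERAL VIEW EXPLODE(USER_IDS) ids AS ID;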

UDAF: User defined aggregate functions work on more than one row and give a single row as output, e.g. Hive's built-in MAX() or COUNT() functions; here the relation is many to one. Say you have a table with student name, id and total marks: if the table has 10 rows and I have to find the student who got the maximum marks, the query needs to check all 10 rows to find the maximum, but ultimately we get only one output, the maximum.
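
A minimal sketch of this many-to-one behaviour, assuming a hypothetical students table with columns name and total_marks:

-- single aggregated value from many rows
SELECT MAX(total_marks) FROM students;

-- the student who scored it (one of them, if there are ties)
SELECT name, total_marks FROM students ORDER BY total_marks DESC LIMIT 1;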

UDF: Simple API

The Simple API

Building a UDF with the simpler UDF API involves little more than writing a class with one function (evaluate).

1. Extend the UDF class.
2. Override evaluate().
3. In Hive, ADD JAR (like REGISTER in Pig).
4. CREATE TEMPORARY FUNCTION.
5. Use it.


Here is an example:

package com.matthewrathbone.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SimpleUDFExample extends UDF {

  // Returns the input text prefixed with "Hello "
  public Text evaluate(Text input) {
    return new Text("Hello " + input.toString());
  }
}
 
hive> ADD JAR target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
hive> CREATE TEMPORARY FUNCTION helloworld as 'com.matthewrathbone.example.SimpleUDFExample';
hive> select helloworld(name) from people limit 1000;

In Hive you can overload the evaluate method, the same as in Java. But in a UDF you have to use Hadoop data types like IntWritable, FloatWritable, etc.
Please find the code below.
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class ToUpper extends UDF {

    // Text input: convert to upper case
    public String evaluate(Text word) {
        return word.toString().toUpperCase();
    }

    // Overloads for non-text types simply return an error message
    public String evaluate(IntWritable word) {
        return "Error : Input type is Integer. Cannot convert to UpperCase";
    }

    public String evaluate(FloatWritable word) {
        return "Error : Input type is Float. Cannot convert to UpperCase";
    }

    public String evaluate(LongWritable word) {
        return "Error : Input type is Long. Cannot convert to UpperCase";
    }
}
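
Registering and calling it from Hive might look like the sketch below; the JAR path reuses the customudf1 JAR mentioned later in this post, and the people table and default-package class name are assumptions:

hive> ADD JAR /home/cloudera/workspace/customudf1/target/customudf1-0.0.1-SNAPSHOT.jar;
hive> CREATE TEMPORARY FUNCTION to_upper AS 'ToUpper';
hive> select to_upper(name) from people limit 10;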

A use case of UDF:

For calculating insights from multiple column values by applying a formula.

Function signature:

public float evaluate(FloatWritable param_1, IntWritable param_2, ... param_3)

The calculation is applied inside this function.

Calling UDF:

select calculateRecommendation(param_1,param_4,param_6,param_7) from tr69data;

To delete the added JAR:

DELETE JAR /home/cloudera/workspace/customudf1/target/customudf1-0.0.1-SNAPSHOT.jar;
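
Before deleting, you can check which JARs are currently registered in the session:

hive> LIST JARS;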

Creating Parquet File in Hive

First create an external table (temp):

hive> create External table studentdatatmp(name string, deptnum int, branch string, year int)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY '\t'
    > LINES TERMINATED BY '\n'
    > LOCATION 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Student';
OK
Time taken: 0.244 seconds
hive> select * from studentdatatmp;

Create a table whose file storage format is Parquet:

hive> create table studentdata(name string, deptnum int)
    > PARTITIONED BY(branch string, year int)
    > STORED AS PARQUET
    > LOCATION 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Student_parquet';
INSERT OVERWRITE TABLE studentdata PARTITION(branch='cse', year=1) SELECT name, deptnum FROM studentdatatmp WHERE branch='cse' AND year=1;
 

To add data for another partition to the existing table, use:

INSERT OVERWRITE TABLE studentdata PARTITION(branch='cse', year=2) SELECT name, deptnum FROM studentdatatmp WHERE branch='cse' AND year=2;

Here OVERWRITE is the keyword that makes the difference: INSERT OVERWRITE replaces the data in the target partition, while INSERT INTO appends to it.

INSERT INTO TABLE country_times_part PARTITION(time) SELECT country,continent,time FROM country_times;

Note: In the case of a pure dynamic-partition insert, the dynamic partition columns must be specified last among the columns in the SELECT statement, and in the same order in which they appear in the PARTITION() clause (see the sketch below). Follow https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions for more info
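
A sketch of the settings usually needed before a fully dynamic-partition insert like the country_times_part example above (these are standard Hive properties; defaults vary by version):

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE country_times_part PARTITION(time)
SELECT country, continent, time FROM country_times;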


-> In older versions of Hive, even for 1 KB of data a SELECT * would run MapReduce. In newer versions, a simple SELECT * is answered by a fetch task and MapReduce will not get executed, even for 1 GB of data.
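
This behaviour is governed by the fetch-task conversion setting; a hedged sketch (the exact default depends on the Hive version):

-- let simple SELECT/filter/LIMIT queries run as a fetch task instead of MapReduce
set hive.fetch.task.conversion=more;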

 

What is the Distribute By clause?

Hive uses the columns in Distribute By to distribute the rows among reducers. All rows with the same Distribute By columns will go to the same reducer.

It ensures that each of the N reducers gets non-overlapping ranges of the column, but it doesn't sort the output of each reducer, so you end up with N or more unsorted files with non-overlapping ranges.

http://saurzcode.in/2015/01/hive-sort-vs-order-vs-distribute-vs-cluster/
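
A small sketch, assuming a hypothetical employee table with department_id and salary columns: rows for the same department land on the same reducer, and SORT BY orders rows only within each reducer's output:

SELECT employee_id, department_id, salary
FROM employee
DISTRIBUTE BY department_id
SORT BY salary DESC;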


Creating a new table like an existing one

create table tabl_new as select * from tabl_old;
               
                   OR

create table tabl_new as select ip, name from tabl_old;
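
Note that CREATE TABLE ... AS SELECT copies data as well; if only the schema of the existing table is needed, Hive also supports LIKE:

create table tabl_new LIKE tabl_old;   -- copies the schema only, no rows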


Inserting data into a Hive table:

We can insert new data into a table in two ways (concrete examples follow this list).

  1. Load the data of a file into the table using the LOAD command.
    LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename;

  2. Insert new data into the table using a SELECT query.
    INSERT INTO TABLE tablename1 SELECT columnlist FROM secondtable;
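
For illustration, hedged concrete versions of both forms; the local file path is just an assumption, and tablename1/secondtable are the placeholder names from the syntax above:

LOAD DATA LOCAL INPATH '/home/cloudera/Desktop/data.csv' INTO TABLE tablename1;

INSERT INTO TABLE tablename1 SELECT col1, col2 FROM secondtable;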

Creating Views

As in SQL.
Example: generate a query to retrieve the details of employees who earn a salary of more than Rs 30000, and store the result in a view named emp_30000.


hive> CREATE VIEW emp_30000 AS
SELECT * FROM employee
WHERE salary>30000;
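
The view can then be queried like any other table:

hive> SELECT * FROM emp_30000;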

hive> DROP VIEW emp_30000;

Creating an Index

An index is nothing but a pointer to a particular column of a table; creating an index means creating a pointer on that column. For example, to create an index named index_salary on the salary column of the employee table:
hive> CREATE INDEX index_salary ON TABLE employee(salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler';
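
After the base table's data changes, the index data is refreshed with a rebuild; a sketch using the names above:

hive> ALTER INDEX index_salary ON employee REBUILD;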

2 Types of Indexes in Hive

  • Compact Indexing - Compact indexing stores the pair of indexed column’s value and its blockid.
  • Bitmap Indexing - Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.

Dropping an Index

The following query drops an index named index_salary:
hive> DROP INDEX index_salary ON employee;

Can we have different indexes for the same table?

Yes! We can have any number of indexes on a particular table, and any type of index as well.
Note: With different types (compact, bitmap) of indexes on the same columns of the same table, the index which is created first is taken as the index for that table on the specified columns.
Indexing creates another table containing the indexed details of the table being indexed. So when you execute a query on an indexed table, Hive first queries the index table, and based on the data in the index it then queries the original table directly. It is just like the index of a text book.
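
To see the indexes that exist on a table, Hive provides a SHOW INDEX statement:

hive> SHOW FORMATTED INDEX ON employee;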


How to identify records deleted at the source (SQL Server)?

Take a case where data is being ingested from an RDBMS into Hive in an incremental fashion. After the ingestion, some of the records were deleted in the RDBMS. Now, how do we identify in Hive the records that were deleted at the source?

Steps:
1. Create a view or temp table in Hive that holds a snapshot of the records at the source (RDBMS).
2. Use the query below, where src is the snapshot of the source and dest is the Hive table that still has the entries of the deleted records.
select did from (select D.id AS did, S.id as sid from dest D LEFT JOIN src S ON D.id = S.id) sq where sid is NULL;

Updating Records in Hive.

CREATE TABLE base_table (id int,field1 STRING,modified_date timestamp);
insert into base_table values(1,"abcd", "2014-02-01 09:22:55");
insert into base_table values(2,"abcde", "2014-02-01 09:22:55");
insert into base_table values(3,"zxvh", "2014-02-01 09:22:55");

CREATE TABLE incremental_table (id int,field1 STRING,modified_date timestamp);
insert into incremental_table values(1,"abcdddddddd", "2017-02-01 09:22:55");
insert into incremental_table values(2,"abcdedddddd", "2017-02-01 09:22:55");

select t1.* from (select * from base_table UNION select * from incremental_table) t1;

1       abcd    2014-02-01 09:22:55
1       abcdddddddd     2017-02-01 09:22:55
2       abcde   2014-02-01 09:22:55
2       abcdedddddd     2017-02-01 09:22:55
3       zxvh    2014-02-01 09:22:55


select az.* from (
select * from base_table 
UNION 
select * from incremental_table) az
JOIN
(select id,MAX(t1.modified_date) md_date from (select * from base_table UNION select * from incremental_table) t1 group by t1.id) cz
on az.id=cz.id and az.modified_date=cz.md_date;

O/P:
1       abcdddddddd     2017-02-01 09:22:55
3       zxvh    2014-02-01 09:22:55
2       abcdedddddd     2017-02-01 09:22:55
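
An equivalent way to keep only the latest record per id is a window function (available in Hive 0.11 and later); a sketch against the same two tables:

select id, field1, modified_date
from (
  select t1.*,
         row_number() over (partition by id order by modified_date desc) rn
  from (select * from base_table UNION select * from incremental_table) t1
) t2
where rn = 1;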
