Wednesday, July 26, 2017

Hive JDBC Connection

package org.tek.Java.Javaprojects;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class hiveJdbcClient {

    private static String driverName = "org.apache.hive.jdbc.HiveDriver";

    public static void main(String[] args) throws SQLException {
        // Load the Hive JDBC driver
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }

        // Connection string, user and password are passed as command-line arguments,
        // e.g. jdbc:hive2://10.188.193.152:10000/leela hadoop W3b!6xfMd
        String hiveConn_Str = args[0];
        String ssh_User = args[1];
        String ssh_Pwd = args[2];
        Connection con = DriverManager.getConnection(hiveConn_Str, ssh_User, ssh_Pwd);
        System.out.println("Connected to Hive");

        Statement stmt = con.createStatement();
        String sql = "show databases";
        System.out.println("Running: " + sql);
        ResultSet res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(res.getString(1));
        }

        res.close();
        stmt.close();
        con.close();
    }

}

Monday, July 24, 2017

pyspark

Few points:

1. By default, paths passed to PySpark resolve against HDFS (the cluster's default filesystem), not the local file system; to read a local file you must use the file:// scheme and the file has to be readable on every node. A sketch of both cases follows.
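A minimal sketch of the two cases, assuming a running SparkContext and example paths that exist in your environment:

from pyspark import SparkContext

sc = SparkContext(appName="read_paths_demo")

# Default filesystem (usually HDFS on a cluster)
hdfs_rdd = sc.textFile("hdfs:///user/hadoop/input/data.txt")

# Local file system: the file:// scheme is required and the file must be
# readable at this path on the driver and on every worker node
local_rdd = sc.textFile("file:///tmp/data.txt")

print(hdfs_rdd.count(), local_rdd.count())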

PySpark Notes

To install Packages

pip Installation
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3 get-pip.py

Packages Installation
pip3 install <PACKAGENAME>
pip3 install requests --user
pip3 install pandas


Requests Example:
The requests package is used when working with REST APIs.

import requests

# url, client_id_value and client_secret_value are assumed to be defined earlier
payload = f"grant_type=client_credentials&client_id={client_id_value}&client_secret={client_secret_value}"
headers = {
    'Content-Type': "application/x-www-form-urlencoded",
    'Accept': "application/json;v=1"
}

response = requests.request("POST", url, data=payload, headers=headers, verify=False)
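A small follow-up sketch for consuming the response; the access_token field name is an assumption based on typical OAuth responses:

if response.status_code == 200:
    token = response.json().get("access_token")   # assumed field name
    print("token length:", len(token) if token else 0)
else:
    print("Request failed:", response.status_code, response.text)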

Pandas Examples:

import pandas as pd

pdf = pd.DataFrame(df_list)   # df_list is an existing list of rows/records
pdf.to_csv(PATH, sep='\u0001', index=False, header=False)

SnowFlake connection:

import snowflake.connector
import pandas as pd

# user, pwd, account, warehouse, dbname, schema and role are assumed to be defined
sfconnec = snowflake.connector.connect(user=user,
                                       password=pwd,
                                       account=account,
                                       warehouse=warehouse,
                                       database=dbname,
                                       schema=schema,
                                       role=role)
sf_cursor = sfconnec.cursor()

sqldata = sf_cursor.execute("SELECT * FROM TBL1").fetchall()
tableschema = ['Name', 'Address', 'Id']
panda_df = pd.DataFrame(sqldata, columns=tableschema)   # Converts to a pandas DataFrame


Transpose/Pivot  using Pandas:

The advantage of using Pandas is that it is powerful and ships with numerous functions. One such example is the transpose/unpivot operation, where columns are converted into rows. No complex logic has to be written for this; in pandas, 'melt' is the function that does the job.
Eg:
import pandas as pd

tablecols = ['Name', 'Address', 'Id']
# Keeps tablecols as identifier columns and unpivots the remaining columns
# into 'variable'/'value' pairs
melted_df = pd.melt(csv_df, id_vars=tablecols)

Later, rename the columns:
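A minimal sketch of that rename step, assuming melt's default 'variable'/'value' output columns and the melted_df from above (the new names are placeholders):

melted_df = melted_df.rename(columns={'variable': 'attribute_name',
                                      'value': 'attribute_value'})
print(melted_df.head())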

source: https://www.journaldev.com/33398/pandas-melt-unmelt-pivot-function


Steps for AWS Lambda creation:

1. Create Function
2. Add Tags.
3. Add VPC settings
4. Create Layer
5. Start writing function

To create a layer

pip3 install -U pandas -t ./

Now the pandas library is saved into the current directory; zip it up and upload the zip as a layer for the Lambda function.
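A minimal handler sketch, assuming the layer zip exposes pandas on the function's path (for Python layers AWS expects the libraries under a python/ directory inside the zip):

import json
import pandas as pd   # resolved from the uploaded layer

def lambda_handler(event, context):
    # Build a tiny DataFrame from the incoming event just to prove pandas imports
    df = pd.DataFrame([event]) if isinstance(event, dict) else pd.DataFrame()
    return {
        "statusCode": 200,
        "body": json.dumps({"columns": list(df.columns)})
    }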

Python Iteration through list

numbers = (1, 2, 3, 4)

def addition_op(n):
    return n + n

out_num = map(addition_op, numbers)

out_nums2 = map(lambda x: x + x, numbers)
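In Python 3, map() returns a lazy iterator, so the results only materialize once consumed, for example:

print(list(out_num))     # [2, 4, 6, 8]
print(list(out_nums2))   # [2, 4, 6, 8]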


Parallelism in Python (alternative to Futures in Scala)

Use ThreadPoolExecutor.

Eg:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def get_Tbl1_data():
    run_snow(conn, Tbl1_sql)   # conn and Tbl1_sql are assumed to exist
# get_Tbl2_data ... get_Tbl5_data are defined the same way

with ThreadPoolExecutor(max_workers=4) as exe:
    exe.submit(get_Tbl1_data)
    exe.submit(get_Tbl2_data)
    exe.submit(get_Tbl3_data)
    exe.submit(get_Tbl4_data)
    exe.submit(get_Tbl5_data)
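When the per-table functions return data, the submitted futures can be collected and their results gathered as they complete; a small sketch reusing the assumed run_snow helper and connection, with the *_sql variables as placeholders:

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_data(sql):
    return run_snow(conn, sql)   # run_snow, conn and the SQL strings are assumed to exist

with ThreadPoolExecutor(max_workers=4) as exe:
    futures = [exe.submit(get_data, sql) for sql in (Tbl1_sql, Tbl2_sql, Tbl3_sql)]
    for fut in as_completed(futures):
        print(fut.result())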

Snowflake connection via sqlalchemy:
sqlalchemy can also be used for connecting to Snowflake from Python.

eg:
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(account="hostname", user='user', password='password',
                           warehouse='warehousename', database='databasename',
                           schema='schemaname'))
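A quick usage sketch with the engine, pulling a query result straight into pandas (the table name is a placeholder):

import pandas as pd

with engine.connect() as connection:
    df = pd.read_sql("SELECT * FROM TBL1", connection)
print(df.head())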

Install multiple packages in Python

pip3 install --upgrade -r /path/requirements.txt --user

requirements.txt
pandas==1.1.2
snowflake-sqlalchemy==1.2.3

Friday, July 21, 2017

Hadoop cluster creation in Microsoft Azure


Step 1:
In the Marketplace -> Data + Analytics -> HDInsight. The default option in HDInsight is Quick Create; change it to Custom (size, settings, apps).




Step 2: Providing details and selecting configuration

Cluster name: Provide a unique name. eg: hadoophortonWks-2

cluster type: Hadoop

cluster login username: this is ambari username eg: admin
cluster password: W3b!6xfMdhaad

We can create one SSH user (e.g. hadoop_1) on this page; after the cluster is created, more users can be added via Ambari.

click Next.

Step 3: 
Create a storage account if one does not exist, or choose an existing one.

Step 4: Choose the data nodes and the head node. In "Number of Worker nodes" we can specify the data node count.



Click Next -> Next through the Advanced settings and complete the cluster creation. This takes about 10 minutes.


Wednesday, July 19, 2017

HIVE Functions

show functions;

DESCRIBE FUNCTION <function_name>;
describe function month         or            describe function extended month;

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inOperators

Eg:
SELECT round(cgp) from student_record where student_id=15;


Date and time functions:


set hive.cli.print.header=true;

select *, from_unixtime(CAST(tmp_time.time/1000 as BIGINT), 'yyyy-MM-dd') as created_timestamp from tmp_time;
OK
tmp_time.time tmp_time.name created_timestamp


1406185200000 Leela 2014-07-24

select unix_timestamp('2018-06-04 11:06:38', 'yyyy-MM-dd HH:mm:ss') as s from tmp_time LIMIT 1;

1528110398

Note: This conversion works only down to seconds; millisecond precision is not possible because the unix_* functions only support seconds. Refer https://stackoverflow.com/questions/31701847/hives-unix-timestamp-and-from-unixtime-functions
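For completeness, millisecond-precision epoch values can still be formatted outside Hive; a small Python sketch using the sample value from above:

from datetime import datetime, timezone

millis = 1406185200000
ts = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
print(ts.strftime('%Y-%m-%d %H:%M:%S.%f'))   # keeps the sub-second part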

describe country_times;


select month(country_times.time) from country_times;
select day(country_times.time) as Day from country_times;

country_times.time      country_times.country   country_times.continent
12/14/2016 0:00 India   Asia
7/26/2016 0:00  Pak     Asia
10/18/2016 0:00 Jamica  South America
7/15/2016 0:00  USA     North America
8/23/2016 0:00  France  Europe
2009-07-30 12:58:59     Sri Lanka       Asia

hive> select * from time_tmp;;
OK
1 2014-08-01 09:45:22
Time taken: 0.101 seconds, Fetched: 1 row(s)
hive> select *,from_unixtime(unix_timestamp(time_tmp.tme ,'yyyy-mm-dd'), 'dd-mm-yyyy hh:mm:ss') from time_tmp;
OK
1 2014-08-01 09:45:22 01-08-2014 12:08:00
Time taken: 0.085 seconds, Fetched: 1 row(s)


select from_unixtime(unix_timestamp(country_times.time ,'MM/dd/yyyy'), 'yyyy-MM-dd') from country_times;

select from_unixtime(unix_timestamp(country_times.time ,'MM/dd/yyyy'), 'yyyy-MMM-dd') from country_times;

select from_unixtime(unix_timestamp(country_times.time ,'MM/dd/yyyy'), 'dd-MMM-yyyy') from country_times;

select from_unixtime(unix_timestamp(country_times.time ,'MM/dd/yyyy'), 'DD-MMM-yyyy') from country_times;

select from_unixtime(unix_timestamp(country_times.time ,'MM/dd/yyyy'), 'EEE, MMM d, ''yy') from country_times;



Custom Functions:

UDF - User Defined Function, applied on a single row, e.g. day()
UDAF - User Defined Aggregate Function, applied to a set of rows, e.g. min()

UDTF - User Defined Table-generating Function - transforms a single input row into multiple output rows, e.g. json_tuple(), explode()


JSON file parsing


{"country":"US","page":227,"data":{"ad":{"impressions":{"s":10,"o":10}}}}

CREATE TABLE hive_parsing_json_table ( json string );

LOAD DATA LOCAL INPATH  '/tmp/hive-parsing-json.json' INTO TABLE hive_parsing_json_table;

LATERAL VIEW - forms a virtual table having the supplied table alias

select v1.Country, v1.Page, v4.impressions_s, v4.impressions_o 
from hive_parsing_json_table hpjp
     LATERAL VIEW json_tuple(hpjp.json, 'country', 'page', 'data') v1
     as Country, Page, data
     LATERAL VIEW json_tuple(v1.data, 'ad') v2
     as Ad
     LATERAL VIEW json_tuple(v2.Ad, 'impressions') v3
     as Impressions
     LATERAL VIEW json_tuple(v3.Impressions, 's' , 'o') v4
     as impressions_s,impressions_o; 

To flatten a multi-line JSON document into single-line JSON:


CREATE EXTERNAL TABLE StudentsRaw (textcol string) STORED AS TEXTFILE LOCATION '/user/hadoop/Leela/Json';


CREATE TABLE StudentsOneLine(json_body string);

INSERT OVERWRITE TABLE StudentsOneLine
    SELECT CONCAT_WS(' ', COLLECT_LIST(textcol)) AS singlelineJSON
    FROM (SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, textcol
          FROM StudentsRaw
          DISTRIBUTE BY INPUT__FILE__NAME SORT BY BLOCK__OFFSET__INSIDE__FILE) x
    GROUP BY INPUT__FILE__NAME;     -- this statement does the flattening job
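The same flattening can also be done before loading, with a short Python sketch (paths are placeholders), assuming each input file holds a single multi-line JSON document:

import json

with open('/tmp/students.json') as src:
    record = json.load(src)            # parse the multi-line JSON document

with open('/tmp/students-oneline.json', 'w') as dst:
    dst.write(json.dumps(record))      # re-serialize it on a single line
    dst.write('\n')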

Generate the create statement for an existing hive table


SHOW CREATE TABLE shows the CREATE TABLE statement that creates a given table.

Eg: SHOW CREATE TABLE myTable;

COALESCE function:

Given fields (field1, field2, ..., fieldn), COALESCE returns, for each record, the value of the first listed field that is not NULL. If all of the fields are NULL, it returns NULL.


For example, let's say one had 3 date fields, datefield1, datefield2, and datefield3, in the table tblDates.

TBLDATES
PRIMARY_KEY   DATEFIELD1   DATEFIELD2   DATEFIELD3
1             NULL         NULL         1993-06-04
The code:
SELECT COALESCE(datefield1, datefield2, datefield3) as first_date_found
FROM
tblDates
WHERE
primary_key = 1

will return ‘1993-06-04’


Difference between COALESCE and NVL functions:

NVL

Syntax - NVL(arg1, arg2)

This returns arg2 when arg1 is NULL, otherwise it returns arg1.

Example -

NVL(value, default value)  
Returns default value if value is null else returns value 

COALESCE

Syntax - coalesce(value1, value2, ..., valuen)

This returns the first value that is not NULL, or NULL if all the values are NULL

Example

coalesce(email,phonenumber,address)  
 
If the customer's primary contact medium is email, use it; if email is NULL, fall back to phonenumber; and if phonenumber is also NULL, use address.
So, to put it in context: NVL takes only two arguments, while COALESCE gives the flexibility to fall back through several fields in order of preference when earlier ones are NULL.

Create Sequence numbering for rows for a Hive table / Primary key / unique key creation in Hive.


add jar hdfs:////user/HiveHbase/hive-contrib-1.1.0.jar;

CREATE TEMPORARY FUNCTION row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

CREATE TABLE IF NOT EXISTS tbl1
(
ID int,
name  String
) row format delimited fields terminated by ' ' stored as textfile;

CREATE TABLE IF NOT EXISTS tbl2
(
RowID BigInt,
ID int,
name  String
) row format delimited fields terminated by ' ' stored as textfile;

INSERT INTO TABLE tbl1 values(56,"Name1");
INSERT INTO TABLE tbl1 values(57,"Name2");
INSERT INTO TABLE tbl1 values(58,"Name3");
INSERT INTO TABLE tbl1 values(59,"Name4");

INSERT OVERWRITE TABLE tbl2 SELECT row_sequence(), * FROM tbl1;

INSERT INTO TABLE tbl1 values(60,"Name5");
INSERT INTO TABLE tbl1 values(61,"Name6");

To continue the sequence for the newly added rows, the current maximum RowID from tbl2 has to be joined in:

insert into table tbl2
select m.max+row_sequence() as inc , t.*
from (select * from tbl1 where ID > 59) t
join
(select max(RowID) as max from tbl2) m;


Source: http://kiran-javaworld.blogspot.in/2016/11/hive-auto-increment-column-incrementing.html

Alternate ways are: 
1. Use the reflect UDF to generate UUIDs.
eg: reflect("java.util.UUID", "randomUUID")

2. Use row_number(), Hive's built-in windowing function. It can be used like row_sequence(); however, max() cannot be applied to it directly, so get the current maximum with:
select row_number() over() as colvals from tbl2 order by colvals desc LIMIT 1;


insert into table tbl2
select m.colvals +row_sequence() as inc , t.*
from (select * from tbl1 where ID > 65) t
join
(select row_number() over() as colvals from tbl2 order by colvals desc LIMIT 1) m;

A few queries along these lines:
Insert INTO TABLE tbl2 select row_number() over(), * from tbl1 WHERE ID > 63;

select row_number() over() as colvals from tbl1 order by colvals desc LIMIT 1;

Insert INTO TABLE tbl2 select 9+row_number() over(), * from tbl1 WHERE ID > 64;

Row_Number() and Rank() functions

ROW_NUMBER: This function assigns a unique number to each row in the result set based on the ORDER BY clause within the PARTITION. For example, if we want to assign a row number to each fname, ordered by IP address, in the sales dataset, the query would be:

hive> select fname,ip,ROW_NUMBER() OVER (ORDER BY ip ) as rownum from sales;



RANK: It is similar to ROW_NUMBER, but equal rows receive the same rank. For example, using RANK in the previous query instead of ROW_NUMBER:

hive> select fname,ip,RANK() OVER (ORDER BY ip) as ranknum, RANK() OVER (PARTITION BY ip order by fname ) from sales ;





Source: https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781782161080/6/ch06lvl1sec75/analytics-functions-in-hive
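The same ranking logic can also be expressed in PySpark, which these notes cover elsewhere; a hedged sketch assuming a SparkSession named spark and a sales table with fname and ip columns:

from pyspark.sql import Window
from pyspark.sql import functions as F

w_by_ip = Window.orderBy("ip")                        # no partition: one global window
w_per_ip = Window.partitionBy("ip").orderBy("fname")

sales_df = spark.table("sales")
result = (sales_df
          .withColumn("rownum", F.row_number().over(w_by_ip))
          .withColumn("ranknum", F.rank().over(w_by_ip))
          .withColumn("rank_in_ip", F.rank().over(w_per_ip)))
result.show()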

Accessing data from complex data types in Hive:

Complex data types like Array, Map, Struct, Union are also known as collections in Hive.

{"requestId": null, "device": {"id": 112, "deviceId": "356211002789588, ""subscriberId": "5472789588, ""networkId": "501823551207399, ""make": "Default", "model": "DefaultClient"}, "job": {"id": 573055, "type": "SINGLE", "name": "Job--741417212", "primitive": "UpdateUnknownDeviceMakeAndModel"}, "startedOn": null, "queuedOn": 1518793352301, "endedOn": 1518793352320, "retries": 0, "status": "SUCCESS", "subStatus": "SUCCESS", "faultString": null, "response": [{"name": "[status]", "value": "SUCCESS"},{"name": "[status2]", "value": "SUCCESS2"},{"name": "[status3]", "value": "SUCCESS3"}]}

Fetching individual values from an array of structs (key/value pairs):

The complex column types are:

`device` struct<id:bigint,deviceid:string,make:string,model:string>
and
`response` array<struct<name:string,value:string>>

select abc_exttbl.device.id from abc_exttbl;
select abc_exttbl.response from abc_exttbl;

select abc_exttbl.response[0].name from abc_exttbl;

select abc_exttbl.response[0].value from abc_exttbl;

select abc_exttbl.response[1].value from abc_exttbl;

Get the length (number of elements) of an array:
select size(abc_exttbl.response) from abc_exttbl;


Analytical functions in Hive

Analytics functions
  • RANK
  • ROW_NUMBER
  • DENSE_RANK
  • CUME_DIST
  • PERCENT_RANK
  • NTILE
Eg:
select pat_id, 
dept_id, 
ins_amt, 
row_number() over (order by ins_amt) as rn, 
rank() over (order by ins_amt ) as rk, 
dense_rank() over (order by ins_amt ) as dense_rk 
from patient;
OVER with standard aggregates:
  • COUNT
  • SUM
  • MIN
  • MAX
  • AVG
select pat_id, 
dept_id, 
count(*) over (partition by dept_id order by dept_id asc) as pat_cnt 
from patient;
Windowing functions
    LEAD
    LAG
    FIRST_VALUE
    LAST_VALUE
    Eg:
    select pat_id, 
    dept_id, 
    ins_amt, 
    lead(ins_amt,1,0) over (partition by dept_id order by dept_id asc ) as lead_ins_amt, 
    lag(ins_amt,1,0) over (partition by dept_id order by dept_id asc ) as lag_ins_amt 
    from patient;

    PARTITION BY creates a window per distinct value of the specified column; in the above case a window is created for each dept_id value.

    Reference:https://dwgeek.com/hadoop-hive-analytic-functions-examples.html/

    Conditional Functions
    
    
    http://dwgeek.com/hadoop-hive-conditional-functions-if-case-coalesce-nvl-decode.html/
    
    
    
    A few scenarios involving NULL:

    1. select c1 + c2 from tbl1;   -- adds the NUMBER columns c1 and c2

       If c2 is NULL for a row, the result for that row is NULL.

    2. CASE WHEN NULL = NULL THEN 'yes' ELSE 'No' END

       returns 'No', because NULL = NULL evaluates to NULL rather than true.
    
    

    Tuesday, July 18, 2017

    JAVA and Scala Errors

    1. For maven build errors:

    In POM.xml add
     <build>
        <sourceDirectory>src</sourceDirectory>
        <resources>
          <resource>
            <directory>src</directory>
            <excludes>
              <exclude>**/*.java</exclude>
            </excludes>
          </resource>
        </resources>
        <plugins>
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>

    2. For the error:

    Exception in thread "main" java.lang.UnsupportedClassVersionError: Sparkjobs/Wordcount : Unsupported major.minor version 52.0


    In Eclipse,
    Window -> Preferences -> Scala -> Compiler -> lower the target to jvm-1.7

    3. In case of class not found exception for scala JAR,

    In Eclipse,
    Window -> Preferences -> Java -> Compiler -> set the compiler compliance level to 1.7

    4. In case of,

    Unable to locate the Javac Compiler in:   C:\Program Files\Java\jre1.8.0_101\..\lib\tools.jar Please ensure you are using JDK 1.4 or above and not a JRE (the com.sun.tools.javac.Main class is required)

    Check whether JDK 1.8.0 is installed; if not, install it.

    Try updating the JDK Eclipse is using, as follows:
    Add and set the JRE in Window->Preferences...->Java->Installed JREs:
    JRE type: Standard VM JRE 
    Name: jdk1.6.0_18
    JRE home directory: C:\Program Files (x86)\Java\jdk1.6.0_18
    https://stackoverflow.com/questions/2222560/maven-build-failed-unable-to-locate-the-javac-compiler-in-jre-or-jdk-issue

    5. In spark streaming job, if encountered the below error:

    ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
    org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

    Then modify metadata.broker.list to "bootstrap.servers"

    in case of similar errors like "value.deserializer" or "key.deserializer" 

        val kafkaParams = Map("bootstrap.servers" -> brokers,
                            "key.deserializer" -> classOf[StringDeserializer],
                            "value.deserializer" -> classOf[StringDeserializer])

    6. To run a JAR:
    2 Options:
    1. Add main class in Manifest file
    2. Include Manifest in POM.xml
    https://stackoverflow.com/questions/9689793/cant-execute-jar-file-no-main-manifest-attribute

    Run using command:
    java -cp consumerTest-0.0.1-SNAPSHOT.jar consumerTest.KafkaConsumer

    7. To compile with Maven from the command prompt, configure Java for Windows (https://stackoverflow.com/questions/1672281/environment-variables-for-java-installation) and run:
    mvn clean install

    8. To include all the dependencies in the JAR file:
    In case of error,
     Exception in thread "PollableSourceRunner-KafkaSource-kafka-source" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

    This occurred because of missing dependencies on the machine where the JAR is being executed.
    Package all the jars into the deployed jar.
    https://stackoverflow.com/questions/1729054/including-dependencies-in-a-jar-with-maven. This packages all the dependent jars inside a single JAR.


    9. If a function call errors out because of mismatched input arguments, check for the appropriate imports.

    10. A Java decompiler can be installed in Eclipse; follow: http://jd.benow.ca/

    Monday, July 17, 2017

    Data Modelling

    An ER diagram is the first step in building a data warehouse model.

    Entity Relationship model

    Data modelling: an architect's job.


    The tables that make up the warehouse are listed, the relationships between the tables are described, and the primary and foreign keys are specified.

    3 Types of data models:


    1. Conceptual data model:
    The first step; a high-level overview of the data warehouse.
    Primarily for business consumers, with few technical details, so not very useful for developers.
    Just gives information about the related tables; no primary or foreign key specification.

    2. Logical data model: an extension of the conceptual data model.
    Specifies the keys/attributes in the table relationships.
    Primary and foreign keys are specified.
    Columns involved in each table.

    3. Physical data model: datatypes for each column, NOT NULL constraints.
    Views, procedures and indexes are also specified.



    Types of datawarehouse designs:

    Agile Datawarehouse design

    Dimensional modelling

    Sunday, July 16, 2017

    Database Denormalization

    Denormalization:


    Constraints: constraints in a DBMS enforce limits on the data, or the type of data, that can be inserted/updated/deleted in a table. Their whole purpose is to maintain data integrity during inserts, updates and deletes.

    Denormalization techniques:

    1. Materialized views

    2. Star schema:

    Fact and Dimension tables: https://www.youtube.com/watch?v=6k3nwXXpnMY

    Fact tables:
    Defined by related dimensions
    Resolves many to many relations
    Does not contain granular information.
    Provides Foreign keys to relate to Dimension tables to pull granular information.

    Dimension tables:
    Table containing actual business elements
    Fields contain element descriptions
    Referenced by multiple fact tables


    Example: 

    3. Snowflake schema: Extension to Star schema - https://en.wikipedia.org/wiki/Snowflake_schema
    The snowflake schema is similar to the star schema. However, in the snowflake schema, dimensions are normalized into multiple related tables, whereas the star schema's dimensions are denormalized with each dimension represented by a single table. A complex snowflake shape emerges when the dimensions of a snowflake schema are elaborate, having multiple levels of relationships, and the child tables have multiple parent tables ("forks in the road").

    Data Normalization

    or simply normalization, is the process of organizing the columns (attributes) and tables (relations) of a relational database to reduce data redundancy and improve data integrity.

    Questions:

    1. Differences between views and materialized views?

    Materialized views are disk based and are updated periodically based upon the query definition.

    Views are virtual only and run the query definition each time they are accessed.

    A materialized view takes a different approach: the query result is cached as a concrete ("materialized") table (rather than a view as such) that may be updated from the original base tables from time to time. This enables much more efficient access, at the cost of extra storage and of some data being potentially out-of-date. Materialized views find use especially in data warehousing scenarios, where frequent queries of the actual base tables can be expensive.