Wednesday, July 26, 2017

Hive JDBC Connection

package org.tek.Java.Javaprojects;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class hiveJdbcClient {

    private static String driverName = "org.apache.hive.jdbc.HiveDriver";

    public static void main(String[] args) throws SQLException {
        // Load the Hive JDBC driver
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }

        // Connection string, user and password are passed as program arguments
        String hiveConn_Str = args[0];
        String ssh_User = args[1];
        String ssh_Pwd = args[2];
        // Connection con = DriverManager.getConnection("jdbc:hive2://10.188.193.152:10000/leela", "hadoop", "W3b!6xfMd");
        Connection con = DriverManager.getConnection(hiveConn_Str, ssh_User, ssh_Pwd);
        System.out.println("!!!!!!!!!!!!!!!!Connected!!!!!!!!!!!!");

        Statement stmt = con.createStatement();
        String sql = "show databases";
        System.out.println("Running: " + sql);
        ResultSet res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(res.getString(1));
        }

        // Close resources
        res.close();
        stmt.close();
        con.close();
    }
}

Monday, July 24, 2017

pyspark

Few points:

1. By default, pyspark resolves input paths against HDFS (the cluster's default filesystem), not the local file system; to read a local file, use an explicit file:// URI (see the sketch below).
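A minimal sketch of the difference (the paths and the SparkSession name spark are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("path-demo").getOrCreate()

# A bare path (or an hdfs:// URI) is resolved against the default filesystem, usually HDFS
hdfs_df = spark.read.text("/user/hadoop/input/sample.txt")

# An explicit file:// URI forces the local file system
local_df = spark.read.text("file:///tmp/sample.txt")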

PySpark Notes

To install Packages

pip Installation (if pip is not already available)
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3 get-pip.py

Packages Installation
pip3 install <PACKAGENAME>
pip3 install requests --user
pip3 install pandas


Requests Example:
The requests package is used when working with REST APIs.

import requests

payload = f"grant_type=client_credentials&client_id={client_id_value}&client_secret={client_secret_value}"
headers = {
    'Content-Type': "application/x-www-form-urlencoded",   # typical for a form-encoded client_credentials payload
    'Accept': "application/json;v=1"
}

response = requests.request("POST", url, data=payload, headers=headers, verify=False)
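A hedged follow-up, assuming the endpoint returns a JSON body with an access_token field (the field name is illustrative):

if response.status_code == 200:
    token = response.json().get("access_token")   # assumed response field; adjust to the actual API
else:
    print("Request failed:", response.status_code, response.text)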

Pandas Examples:

import pandas as pd
pdf = pd.DataFrame(df_list)
pdf.to_csv(PATH, sep='\u0001', index=False, header = False)
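For context, a self-contained sketch of the same pattern with made-up rows (df_list and the output path are placeholders):

import pandas as pd

df_list = [("Leela", "Hyderabad", 1), ("Ravi", "Chennai", 2)]          # illustrative data
pdf = pd.DataFrame(df_list, columns=["Name", "Address", "Id"])
pdf.to_csv("/tmp/out.dat", sep="\u0001", index=False, header=False)    # \u0001 is Hive's default field delimiter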

SnowFlake connection:

import snowflake.connector
import pandas as pd

# snowflake.connector.connect expects keyword arguments
sfconnec = snowflake.connector.connect(user=user,
                                       password=pwd,
                                       account=account,
                                       warehouse=warehouse,
                                       database=dbname,
                                       schema=schema,
                                       role=role)
sfJdbcConnection = sfconnec.cursor()

sqldata = sfJdbcConnection.execute("SELECT * FROM TBL1").fetchall()
tableschema = ['Name', 'Address', 'Id']
panda_df = pd.DataFrame(sqldata, columns=tableschema)   # Converts to Pandas DF


Transpose/Pivot  using Pandas:

The advantage of using Pandas is that it has numerous libraries and is powerful. One such example is the transpose operation, where columns are to be converted into rows.
For this we do not need to write any complex logic; in pandas, 'melt' is the function that does this job for us.
Eg:
import pandas as pd
tablecols = ['Name', 'Address', 'Id']
pd.melt(csv_df, id_vars=tablecols)   # remaining columns are unpivoted into variable/value rows

Later rename the columns

source: https://www.journaldev.com/33398/pandas-melt-unmelt-pivot-function
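A small worked sketch (column names and values are made up) showing how melt keeps the id columns and unpivots the rest into variable/value rows:

import pandas as pd

csv_df = pd.DataFrame({
    "Name": ["A", "B"],
    "Address": ["Addr1", "Addr2"],
    "Id": [1, 2],
    "Jan": [10, 20],
    "Feb": [30, 40],
})
tablecols = ["Name", "Address", "Id"]

melted = pd.melt(csv_df, id_vars=tablecols)                               # 'Jan'/'Feb' become rows
melted = melted.rename(columns={"variable": "Month", "value": "Amount"})  # rename the generated columns
print(melted)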


Steps for AWS Lambda creation:

1. Create Function
2. Add Tags.
3. Add VPC Settings
4. Create Layer
5. Start writing function

To create a layer

pip3 install -U pandas -t ./

Now the pandas library is saved in the current directory. Zip it, upload the zip as a layer, and attach the layer to the Lambda function. (Note: for Python layers, the packages typically need to sit under a top-level python/ directory inside the zip so they are importable at runtime.)
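A minimal handler sketch (the event shape and names are hypothetical) that relies on the attached pandas layer at runtime:

import json
import pandas as pd   # resolved from the Lambda layer, not the deployment package

def lambda_handler(event, context):
    # Illustrative only: turn an assumed 'records' list from the event into a DataFrame
    df = pd.DataFrame(event.get("records", []))
    return {
        "statusCode": 200,
        "body": json.dumps({"row_count": len(df)})
    }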

Python Iteration through list

numbers = [1, 2, 3, 4]

def addition_op(n):
    return n + n

out_num = map(addition_op, numbers)

out_nums2 = map(lambda x: x + x, numbers)
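In Python 3, map() returns a lazy iterator, so wrap it in list() to materialize the results:

list(out_num)     # [2, 4, 6, 8]
list(out_nums2)   # [2, 4, 6, 8]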

Python alternative of map(x => x) like in Scala

df.select("text").show()
df.select("text").rdd.map(lambda x: str(x).upper()).collect()

+-----+
| text|
+-----+
|hello|
|world|
+-----+

["ROW(TEXT='HELLO')", "ROW(TEXT='WORLD')"]

Example2:
df.show()

+-----+----------+
| text|upper_text|
+-----+----------+
|hello|     HELLO|
|world|     WORLD|
+-----+----------+

dd = df.rdd.map(lambda x: (x.text, x.upper_text))

dd.collect()
[('hello', 'HELLO'), ('world', 'WORLD')]
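For comparison, the same uppercase transform can stay in the DataFrame API instead of dropping to the RDD; a sketch assuming df has a 'text' column:

from pyspark.sql import functions as F

df.withColumn("upper_text", F.upper(F.col("text"))).show()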


Parallelism in Python (an alternative to Futures in Scala)

use ThreadPoolExecutor

Eg:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def get_Tbl1_data():
    run_snow(conn, Tbl1_sql)

with ThreadPoolExecutor(max_workers=4) as exe:
    exe.submit(get_Tbl1_data)
    exe.submit(get_Tbl2_data)
    exe.submit(get_Tbl3_data)
    exe.submit(get_Tbl4_data)
    exe.submit(get_Tbl5_data)
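A slightly fuller sketch that collects results and surfaces worker exceptions through the returned futures (run_snow, conn and the *_sql variables are the placeholders used above):

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_tbl_data(table_sql):
    return run_snow(conn, table_sql)   # placeholder query runner from above

with ThreadPoolExecutor(max_workers=4) as exe:
    futures = [exe.submit(get_tbl_data, sql) for sql in (Tbl1_sql, Tbl2_sql, Tbl3_sql)]
    for fut in as_completed(futures):
        result = fut.result()          # re-raises any exception from the worker thread
        print(result)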

Snowflake Connection:
Used sqlalchemy for connecting to Snowflake from Python.

eg:
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL   # URL helper comes from the snowflake-sqlalchemy package

create_engine(URL(account="accountname", user='user', password='password', warehouse='warehousename', database='databasename', schema='schemaname'))
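A hedged usage sketch of the engine (credentials and the table name TBL1 are placeholders):

import pandas as pd

engine = create_engine(URL(account="accountname", user="user", password="password",
                           warehouse="warehousename", database="databasename", schema="schemaname"))
pdf = pd.read_sql("SELECT * FROM TBL1", engine)   # query result straight into a pandas DataFrame
engine.dispose()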

Install multiple packages in Python

pip3 install --upgrade -r /path/requirements.txt --user

requirements.txt
pandas==1.1.2
snowflake-sqlalchemy==1.2.3

pyspark snowflake connectivity

Need to install the following:

  • Snowflake Connector for Spark
  • Snowflake JDBC driver

# Installing the Snowflake Python connector (for plain Python access)
pip install snowflake-connector-python

# The Spark connector and the JDBC driver are typically supplied as Maven coordinates via
# spark.jars.packages (see the SparkSession config below) or via --packages/--jars, rather than via pip.


from pyspark.sql import SparkSession

# Initialize your Spark session
spark = SparkSession.builder \
    .appName("PySpark with Snowflake") \
    .config("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.10.0,net.snowflake:snowflake-jdbc:3.13.1") \
    .getOrCreate()

# Snowflake options
sfOptions = {
    "sfURL": "<your_snowflake_account_url>",
    "sfUser": "<your_username>",
    "sfPassword": "<your_password>",
    "sfDatabase": "<your_database>",
    "sfSchema": "<your_schema>",
    "sfWarehouse": "<your_warehouse>",
    "sfRole": "<your_role>"
}

# Read from Snowflake
df = spark.read \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "<your_table_name>") \
    .load()

# Show the DataFrame
df.show()
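Writing back to Snowflake uses the same options object; a hedged sketch (the target table name is a placeholder):

# Write the DataFrame back to Snowflake, appending to an existing table
df.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "<your_target_table>") \
    .mode("append") \
    .save()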


Pass --jars to pyspark

Jars can be passed to PySpark using --jars; the classes they contain become available to the JVM side of the application (for example as data source formats or JDBC drivers), much like adding them to a Scala job's classpath.


To pass the --jars option when running a PySpark job, you can use it to include external libraries (such as .jar files) that your Spark application needs at runtime. This is commonly used to include JDBC drivers, connectors (e.g., Snowflake, Kafka, etc.), or custom jars.

pyspark --jars /path/to/spark-snowflake_2.12-2.10.0.jar,/path/to/snowflake-jdbc-3.13.1.jar

If you're running a PySpark job using spark-submit, you can include the --jars option the same way:

spark-submit \
  --master <your-cluster-master-url> \
  --deploy-mode <deploy-mode> \
  --jars /path/to/spark-snowflake_2.12-2.10.0.jar,/path/to/snowflake-jdbc-3.13.1.jar \
  your_pyspark_script.py

Common Use Cases:

  1. Including JDBC Drivers: When you need to connect Spark to databases like Snowflake, MySQL, or Postgres.
  2. Custom UDF Libraries: For adding custom transformations or functions.
  3. Spark Connectors: Like Kafka, ElasticSearch, or Snowflake connectors.

Note: In PySpark, the --files option allows you to pass external files to be distributed across the cluster and accessible to the tasks running on worker nodes. This can include configuration files, data files, or even compressed archives like .zip files.

spark-submit --files /path/to/myfiles.zip my_pyspark_script.py

import zipfile
import os
from pyspark import SparkFiles

# Get the path to the .zip file
zip_file_path = SparkFiles.get("myfiles.zip")

# Define the extraction directory (you can define any directory name)
extraction_dir = "/tmp/extracted_files"

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extraction_dir)

# Now you can access files in the extraction_dir
for root, dirs, files in os.walk(extraction_dir):
    for file in files:
        print(f"File: {file}")

# Continue your PySpark operations using the extracted files


zipWithIndex


Zips this RDD with its element indices.

rdd1 = sc.parallelize(['A','B','C','D']).zipWithIndex()
rdd1.collect()

[('A', 0), ('B', 1), ('C', 2), ('D', 3)]
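One common use is attaching a sequence number to DataFrame rows; a sketch assuming a SparkSession named spark and an existing DataFrame df:

# Append a 0-based index column by going through the underlying RDD
indexed_rdd = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
indexed_df = spark.createDataFrame(indexed_rdd, df.columns + ["row_idx"])
indexed_df.show()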

Friday, July 21, 2017

Hadoop cluster creation in Microsoft Azure


Step 1:
In market place -> Data + Analytics -> HDInsight. In HDInsight default option is Quick Create, change it to Custom(size,settings,apps)




Step 2: Providing details and selecting configuration

Cluster name: Provide a unique name. eg: hadoophortonWks-2

cluster type: Hadoop

cluster login username: this is the Ambari username, e.g. admin
cluster password: W3b!6xfMdhaad

We can create one SSH user (e.g. hadoop_1) on this page; after the cluster is created, more users can be added via Ambari.

click Next.

Step 3: 
Create a storage account if not exists or choose any one if exists.

Step 4: Choosing data nodes and Head node. In "Number of Worker nodes" we can specify the data nodes count.



Click Next -> Next for Advanced settings and complete the cluster creation. This takes about 10 minutes.


Wednesday, July 19, 2017

HIVE Functions

show functions;

DESCRIBE FUNCTION <function_name>;
describe function month         or            describe function extended month;

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inOperators

Eg:
SELECT round(cgp) from student_record where student_id=15;


Date and time functions:


set hive.cli.print.header=true;

select  pay_date,date_format(to_date(pay_date), 'yyyy-MM') from department_avg_sal;

pay_date     date_format(to_date(pay_date), 'yyyy-MM')
2017-03-31   2017-03
2017-02-28   2017-02
2017-03-31   2017-03
2017-02-28   2017-02

select *, from_unixtime(CAST(tmp_time.time/1000 as BIGINT), 'yyyy-MM-dd') as created_timestamp from tmp_time;
OK
tmp_time.time    tmp_time.name    created_timestamp
1406185200000    Leela            2014-07-24

select unix_timestamp('2018-06-04 11:06:38', 'yyyy-MM-dd HH:mm:ss') as s from tmp_time LIMIT 1;

1528110398



Custom Functions:

UDF - applied on a single row eg: day()
UDAF - User Defined Aggregate Functions - applied to a set of rows, eg: min()

UDTF - User Defined Table-generating Functions - transform a single input row into multiple output rows - Eg: json_tuple()


JSON file parsing


{"country":"US","page":227,"data":{"ad":{"impressions":{"s":10,"o":10}}}}

CREATE TABLE hive_parsing_json_table ( json string );

LOAD DATA LOCAL INPATH  '/tmp/hive-parsing-json.json' INTO TABLE hive_parsing_json_table;

LATERAL VIEW - forms a virtual table having the supplied table alias

select v1.Country, v1.Page, v4.impressions_s, v4.impressions_o 
from hive_parsing_json_table hpjp
     LATERAL VIEW json_tuple(hpjp.json, 'country', 'page', 'data') v1
     as Country, Page, data
     LATERAL VIEW json_tuple(v1.data, 'ad') v2
     as Ad
     LATERAL VIEW json_tuple(v2.Ad, 'impressions') v3
     as Impressions
     LATERAL VIEW json_tuple(v3.Impressions, 's' , 'o') v4
     as impressions_s,impressions_o; 

To flatten the multiline json to a single line JSON


CREATE EXTERNAL TABLE StudentsRaw (textcol string) STORED AS TEXTFILE LOCATION '/user/hadoop/Leela/Json';


CREATE TABLE StudentsOneLine(json_body string);

INSERT OVERWRITE TABLE StudentsOneLine
    SELECT CONCAT_WS(' ',COLLECT_LIST(textcol)) AS singlelineJSON
           FROM (SELECT INPUT__FILE__NAME,BLOCK__OFFSET__INSIDE__FILE, textcol FROM StudentsRaw DISTRIBUTE BY INPUT__FILE__NAME SORT BY BLOCK__OFFSET__INSIDE__FILE) x
           GROUP BY INPUT__FILE__NAME;     //This statement does the flattening job

Generate the create statement for an existing hive table


SHOW CREATE TABLE shows the CREATE TABLE statement that creates a given table.

Eg: SHOW CREATE TABLE myTable;

COALESCE function:

Given a list of fields (field1, field2, …, fieldn), COALESCE returns, for each record, the value of the first listed field that is not NULL. If all of the fields are NULL, it returns NULL.


For example, let's say one had 3 date fields, datefield1, datefield2, and datefield3 from the table tblDates.

TBLDATES
PRIMARY_KEY   DATEFIELD1   DATEFIELD2   DATEFIELD3
1             NULL         NULL         1993-06-04
The code:
SELECT COALESCE(datefield1, datefield2, datefield3) as first_date_found
FROM
tblDates
WHERE
primary_key = 1

will return ‘1993-06-04’


Difference between COALESCE and NVL functions:

NVL

Syntax - NVL(arg1, arg2)

This will replace arg1 with arg2 if arg1 value is NULL

Example -

NVL(value, default value)  
Returns default value if value is null else returns value 

COALESCE

Syntax - coalesce(value1, value2, …, valuen)

This will return the first value that is not NULL, or NULL if all values are NULL

Example

coalesce(email,phonenumber,address)  
 
If the customer's primary contact medium is email, use email; if email is null then use phonenumber, and if phonenumber is also null then use address.
So, to summarize: NVL takes only two arguments, while COALESCE gives more flexibility to check several fields in an order of preference when some are NULL.

Create Sequence numbering for rows for a Hive table / Primary key / unique key creation in Hive.


add jar hdfs:////user/HiveHbase/hive-contrib-1.1.0.jar;

CREATE TEMPORARY FUNCTION row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

CREATE TABLE IF NOT EXISTS tbl1
(
ID int,
name  String
) row format delimited fields terminated by ' ' stored as textfile;

CREATE TABLE IF NOT EXISTS tbl2
(
RowID BigInt,
ID int,
name  String
) row format delimited fields terminated by ' ' stored as textfile;

INSERT INTO TABLE tbl1 values(56,"Name1");
INSERT INTO TABLE tbl1 values(57,"Name2");
INSERT INTO TABLE tbl1 values(58,"Name3");
INSERT INTO TABLE tbl1 values(59,"Name4");

INSERT OVERWRITE TABLE tbl2 SELECT row_sequence(), * FROM tbl1;

INSERT INTO TABLE tbl1 values(60,"Name5");
INSERT INTO TABLE tbl1 values(61,"Name6");

INSERT INTO TABLE tbl2 SELECT m.max+row_sequence(), * FROM tbl1 where ID > 59;   -- m.max must come from a join with max(RowID) of tbl2, as in the full query below

insert into table tbl2
select m.max+row_sequence() as inc , t.*
from (select * from tbl1 where ID > 59) t
join
(select max(RowID) as max from tbl2) m;


Source: http://kiran-javaworld.blogspot.in/2016/11/hive-auto-increment-column-incrementing.html

Alternate ways are: 
1. Use the reflect UDF to generate UUIDs.
eg: reflect("java.util.UUID", "randomUUID")

2. Using row_number(). This is Hive's built-in function. It can be used like row_sequence(); however, max() is not available for it, so use
select row_number() over() as colvals from tbl2 order by colvals desc LIMIT 1;


insert into table tbl2
select m.colvals +row_sequence() as inc , t.*
from (select * from tbl1 where ID > 65) t
join
(select row_number() over() as colvals from tbl2 order by colvals desc LIMIT 1) m;

Few queries of this.
Insert INTO TABLE tbl2 select row_number() over(), * from tbl1 WHERE ID > 63;

select row_number() over() as colvals from tbl1 order by colvals desc LIMIT 1;

Insert INTO TABLE tbl2 select 9+row_number() over(), * from tbl1 WHERE ID > 64;

Row_Number() and Rank() functions

ROW_NUMBER: This function assigns a unique number to each row in the result set based on the ORDER BY clause within the PARTITION. For example, if we want to assign a row number to each fname, also partitioned by IP address, in the sales dataset, the query would be:

hive> select fname,ip,ROW_NUMBER() OVER (ORDER BY ip ) as rownum from sales;



RANK: It is similar to ROW_NUMBER, but equal rows are ranked with the same number. For example, using RANK in the previous query instead of ROW_NUMBER:

hive> select fname,ip,RANK() OVER (ORDER BY ip) as ranknum, RANK() OVER (PARTITION BY ip order by fname ) from sales ;





Source: https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781782161080/6/ch06lvl1sec75/analytics-functions-in-hive

Accessing data from complex data types in Hive:

Complex data types like Array, Map, Struct, Union are also known as collections in Hive.

{"requestId": null, "device": {"id": 112, "deviceId": "356211002789588, ""subscriberId": "5472789588, ""networkId": "501823551207399, ""make": "Default", "model": "DefaultClient"}, "job": {"id": 573055, "type": "SINGLE", "name": "Job--741417212", "primitive": "UpdateUnknownDeviceMakeAndModel"}, "startedOn": null, "queuedOn": 1518793352301, "endedOn": 1518793352320, "retries": 0, "status": "SUCCESS", "subStatus": "SUCCESS", "faultString": null, "response": [{"name": "[status]", "value": "SUCCESS"},{"name": "[status2]", "value": "SUCCESS2"},{"name": "[status3]", "value": "SUCCESS3"}]}

Fetching individual values from an array of strings in key value pairs:

Complex column types are:

`device` struct<id:bigint,deviceid:string,make:string,model:string>
and
`response` array<struct<name:string,value:string>>

select abc_exttbl.device.id from abc_exttbl;
select abc_exttbl.response from abc_exttbl;

select abc_exttbl.response[0].name from abc_exttbl;

select abc_exttbl.response[0].value from abc_exttbl;

select abc_exttbl.response[1].value from abc_exttbl;

Get the length (number of elements) of an array:
select size(abc_exttbl.response) from abc_exttbl;


Analytical functions in Hive

Analytics functions
  • RANK
  • ROW_NUMBER
  • DENSE_RANK
  • CUME_DIST
  • PERCENT_RANK
  • NTILE
Eg:
select pat_id, 
dept_id, 
ins_amt, 
row_number() over (order by ins_amt) as rn, 
rank() over (order by ins_amt ) as rk, 
dense_rank() over (order by ins_amt ) as dense_rk 
from patient;
OVER with standard aggregates:
  • COUNT
  • SUM
  • MIN
  • MAX
  • AVG
select pat_id, 
dept_id, 
count(*) over (partition by dept_id order by dept_id asc) as pat_cnt 
from patient;
Windowing functions
    LEAD
    LAG
    FIRST_VALUE
    LAST_VALUE
    Eg:
    select pat_id, 
    dept_id, 
    ins_amt, 
    lead(ins_amt,1,0) over (partition by dept_id order by dept_id asc ) as lead_ins_amt, 
    lag(ins_amt,1,0) over (partition by dept_id order by dept_id asc ) as lag_ins_amt 
    from patient;

    partition by creates a window over the specified column. In the above case, a window is created for each distinct value of dept_id.

    Reference: https://dwgeek.com/hadoop-hive-analytic-functions-examples.html/

Conditional Functions

http://dwgeek.com/hadoop-hive-conditional-functions-if-case-coalesce-nvl-decode.html/

Few scenarios on NULL:

1. select c1 + c2 from tbl1;  //This adds the NUMBERS in columns c1 and c2

If c2 is NULL for a row, the result for that row is NULL.

2. SELECT CASE WHEN NULL = NULL THEN 'yes' ELSE 'No' END;

Returns 'No', because NULL = NULL evaluates to NULL rather than true, so the CASE falls through to the ELSE branch.

Tuesday, July 18, 2017

JAVA and Scala Errors

1. For maven build errors:

In POM.xml add
     <build>
        <sourceDirectory>src</sourceDirectory>
        <resources>
          <resource>
            <directory>src</directory>
            <excludes>
              <exclude>**/*.java</exclude>
            </excludes>
          </resource>
        </resources>
        <plugins>
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>

2. For the error:

Exception in thread "main" java.lang.UnsupportedClassVersionError: Sparkjobs/Wordcount : Unsupported major.minor version 52.0

In Eclipse,
Window -> Preferences -> Scala -> Compiler -> lower the target to jvm-1.7

3. In case of a class not found exception for a Scala JAR,

In Eclipse,
Window -> Preferences -> Java -> Compiler -> set the compiler compliance level to 1.7

4. In case of,

Unable to locate the Javac Compiler in: C:\Program Files\Java\jre1.8.0_101\..\lib\tools.jar Please ensure you are using JDK 1.4 or above and not a JRE (the com.sun.tools.javac.Main class is required)

Check whether JDK 1.8.0 is installed; if not, install it.

Then try updating the JDK Eclipse is using, as follows:
Add and set the JRE in Window -> Preferences... -> Java -> Installed JREs:
JRE type: Standard VM
Name: jdk1.6.0_18
JRE home directory: C:\Program Files (x86)\Java\jdk1.6.0_18
https://stackoverflow.com/questions/2222560/maven-build-failed-unable-to-locate-the-javac-compiler-in-jre-or-jdk-issue

5. In a Spark Streaming job, if you encounter the error below:

ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

then change metadata.broker.list to "bootstrap.servers".

In case of similar errors for "value.deserializer" or "key.deserializer":

    val kafkaParams = Map("bootstrap.servers" -> brokers,
                          "key.deserializer" -> classOf[StringDeserializer],
                          "value.deserializer" -> classOf[StringDeserializer])

6. To run a JAR, there are 2 options:
1. Add the main class in the Manifest file
2. Include the Manifest configuration in POM.xml
https://stackoverflow.com/questions/9689793/cant-execute-jar-file-no-main-manifest-attribute

Run using the command:
java -cp consumerTest-0.0.1-SNAPSHOT.jar consumerTest.KafkaConsumer

7. To configure settings for compiling Maven via the command prompt:
Configure Java for Windows: https://stackoverflow.com/questions/1672281/environment-variables-for-java-installation
mvn clean install

8. To include all the dependencies in the JAR file:
In case of the error,
Exception in thread "PollableSourceRunner-KafkaSource-kafka-source" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

this occurs because of dependencies missing on the machine where the JAR is executed.
Package all the jars into the deployed jar:
https://stackoverflow.com/questions/1729054/including-dependencies-in-a-jar-with-maven. This packages all the dependent jars inside a single JAR.

9. If any function call gives an error because of mismatched input arguments, look for the appropriate imports.

10. The Java Decompiler can be installed in Eclipse; follow: http://jd.benow.ca/