Problem scenarios from Arun teaches u
http://arun-teaches-u-tech.blogspot.in/p/cca-175-prep-problem-scenario-1.html
Problem Scenario 1:
- Using Sqoop, import the orders table into HDFS under /user/cloudera/problem1/orders. Files should be loaded as Avro files with Snappy compression.
- Using Sqoop, import the order_items table into HDFS under /user/cloudera/problem1/order-items. Files should be loaded as Avro files with Snappy compression. (A sketch of these two Sqoop imports follows the Spark snippet below.)
- Using Spark Scala, load the data at /user/cloudera/problem1/orders and /user/cloudera/problem1/order-items as dataframes.
import com.databricks.spark.avro._
val orders_df = sqlContext.read.avro("problem1/orders")
val order_items_df = sqlContext.read.avro("problem1/order-items")
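For steps 1 and 2, a minimal Sqoop sketch (assuming the same quickstart connection string and root/cloudera credentials used throughout these notes):
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table orders --target-dir /user/cloudera/problem1/orders --as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec -m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table order_items --target-dir /user/cloudera/problem1/order-items --as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec -m 1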
4. Expected intermediate result: Order_Date, Order_Status, total_orders, total_amount. In plain English: find the total orders and total amount per status per day. The result should be sorted by order date descending, order status ascending, total amount descending and total orders ascending. Perform the aggregation in each of the following ways (the sorting can be done with either a dataframe or an RDD):
- a) Just by using the DataFrames API -- here order_date should be in YYYY-MM-DD format
- b) Using Spark SQL -- here order_date should be in YYYY-MM-DD format
- c) By using the combineByKey function on RDDs -- no need to format order_date or total_amount
Summary of this task:
a. Join the 2 dataframes and group by Order_Date and Order_Status.
b. Aggregate the sum for total_orders and total_amount.
c. Display the order date in YYYY-MM-DD format.
d. Sort the result by order date descending, order status ascending, total amount descending and total orders ascending.
Note: Perform tasks b-d using 1. DataFrames 2. Spark SQL 3. RDDs (no need to format order_date for the RDD version).
Solution:
//Joining 2 DFs
val joinedorders_df = orders_df.join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id"))
//Group By and Sum of column values
val grouped_df = joinedorders_df.groupBy(from_unixtime(joinedorders_df("order_date").divide(1000), "yyyy-MM-dd"),joinedorders_df("order_status")).agg(sum(joinedorders_df("order_item_quantity")),sum(joinedorders_df("order_item_subtotal")))
grouped_df.show
// Giving new column names
val grouped_df = joinedorders_df.groupBy(from_unixtime(col("order_date").divide(1000), "yyyy-MM-dd").alias("Order Date"),col("order_status").alias("Order Status")).agg(countDistinct("order_id").alias("Total Orders"),sum(col("order_item_subtotal")).alias("Total Amount"))
//Sorting by order date descending, order status ascending, total amount descending and total orders ascending
joinedorders_df.groupBy(from_unixtime(col("order_date").divide(1000), "yyyy-MM-dd").alias("Order Date"),col("order_status").alias("Order Status")).agg(countDistinct("order_id").alias("Total Orders"),sum(col("order_item_subtotal")).alias("Total Amount")).orderBy(col("Order Date").desc,col("Order Status"),col("Total Amount").desc,col("Total Orders")).show
//A few miscellaneous filter examples:
orders_df.join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id")).filter(orders_df("order_date") === "1379660400000").show
orders_df.join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id")).filter(orders_df("order_date") === "1379660400000" && orders_df("order_status") === "").show
Using HQL:
import com.databricks.spark.avro._
import org.apache.spark.sql.hive.HiveContext
val hq = new HiveContext(sc)
val orders_df = hq.read.avro("problem1/orders")
val order_items_df = hq.read.avro("problem1/order-items")
orders_df.registerTempTable("Orders")
order_items_df.registerTempTable("OrderItems")
val res_df = hq.sql("select from_unixtime(CAST(Orders.Order_Date/1000 as BIGINT), 'yyyy-MM-dd') AS OrderDate, Orders.Order_status AS OrderStatus, count(order_id) AS TotalOrders,sum(order_item_subtotal) AS TotalAmount from Orders JOIN OrderItems ON Orders.order_id = OrderItems.order_item_id GROUP BY Orders.Order_Date,Orders.Order_status ORDER BY OrderDate Desc, OrderStatus, TotalAmount Desc, TotalOrders")
res_df: org.apache.spark.sql.DataFrame = [OrderDate: string, OrderStatus: string, TotalOrders: bigint, TotalAmount: double]
val res_df = hq.sql("select to_date(from_unixtime(CAST(Orders.Order_Date/1000 as BIGINT), 'yyyy-MM-dd')) AS OrderDate, Orders.Order_status AS OrderStatus, count(order_id) AS TotalOrders,sum(order_item_subtotal) AS TotalAmount from Orders JOIN OrderItems ON Orders.order_id = OrderItems.order_item_id GROUP BY Orders.Order_Date,Orders.Order_status ORDER BY OrderDate Desc, OrderStatus, TotalAmount Desc, TotalOrders")
res_df: org.apache.spark.sql.DataFrame = [OrderDate: date, OrderStatus: string, TotalOrders: bigint, TotalAmount: double]
Note: to_date() converts the formatted string to the date datatype.
Using CombineByKey:
// key: (order_date, order_status), value: (order_item_subtotal, order_id)
val newrdd = joinedorders_df.map(x => ((x(1).toString, x(3).toString), (x(8).toString.toFloat, x(0).toString.toInt)))
type itemprice = (Float, Int)
val createtotalcombiner = (collec1: itemprice ) => {
(collec1._1,1)
}
val mergetotalcombiner = (collec1: itemprice, collec2: itemprice) => {
val (price1,item1) = collec1
val (price2,item2) = collec2
(price1+price2, item1 + 1)
}
val mergevals = (collec1: itemprice, collec2: itemprice) => {
val (price1,item1) = collec1
val (price2,item2) = collec2
(price1+price2, item1 + item2)
}
val combinedres = newrdd.combineByKey(createtotalcombiner,mergetotalcombiner,mergevals)
val res = combinedres.map(x => (x._1._1,x._1._2,x._2._1,x._2._2)).toDF.orderBy(desc("_1"),asc("_2"),desc("_3"),asc("_4"))
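Optionally (a small variation, not in the original notes), the tuple columns can be given readable names before sorting:
val res_named = combinedres.map{ case ((date, status), (amount, orders)) => (date, status, amount, orders) }.toDF("order_date", "order_status", "total_amount", "total_orders").orderBy(desc("order_date"), asc("order_status"), desc("total_amount"), asc("total_orders"))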
Problem Scenario 2:
- Using Sqoop, copy the data available in the MySQL products table to the folder /user/cloudera/products on HDFS as a text file. Columns should be delimited by the pipe character '|'.
Sol: sqoop import --connect jdbc:mysql://localhost/retail_db --username root --password cloudera --table products --fields-terminated-by "|" --target-dir "/user/cloudera/products"
2. move all the files from /user/cloudera/products folder to /user/cloudera/problem2/products folder
Sol: hdfs dfs -mkdir /user/cloudera/problem2
hdfs dfs -mv /user/cloudera/products /user/cloudera/problem2/products
3. Change permissions of all the files under /user/cloudera/problem2/products such that the owner has read, write and execute permissions, the group has read and write permissions, and others have just read and execute permissions (i.e. mode 765: 7 = rwx, 6 = rw-, 5 = r-x).
Sol: hdfs dfs -getfacl /user/cloudera/problem2/products
hdfs dfs -chmod -R 765 /user/cloudera/problem2/products
OR
hadoop fs -chmod 765 /user/cloudera/problem2/products/*
-R applies the change recursively to all the files in the directory.
4. Read the data in /user/cloudera/problem2/products and do the following operations using a) the dataframes API, b) Spark SQL, c) the RDD aggregateByKey method. Your solution should have three sets of steps. Sort the resultant dataset by category id.
- filter such that your RDD/DF has products whose price is less than 100 USD
- on the filtered data set find out the highest value in the product_price column under each category
- on the filtered data set also find out the total products under each category
- on the filtered data set also find out the average price of the product under each category
- on the filtered data set also find out the minimum price of the product under each category
(An aggregateByKey sketch for part c appears after the dataframe approach below.)
Sol:
val products_rdd = sc.textFile("/user/cloudera/problem2/products")
case class products_class(var product_id: Int, var product_category_id: Int, var product_name: String,var product_description: String,var product_price: Float,var product_image: String)
def products_class_fun(x: String) = {
  val x_split = x.split("\\|").map(_.trim.toString)
  val pro = products_class(x_split(0).toInt,x_split(1).toInt,x_split(2),x_split(3),x_split(4).toFloat,x_split(5))
  (pro)
}
OR
def products_class_fun(x: String): products_class = {
val x_split = x.split('|').map(_.trim.toString)
val pro = products_class(x_split(0).toInt,x_split(1).toInt,x_split(2),x_split(3),x_split(4).toFloat,x_split(5))
(pro)
}
val products_df_tmp = products_rdd.map( x => products_class_fun(x))
val products_df = products_df_tmp.toDF
Sql approach:
products_df.registerTempTable("products")
sqlContext.sql("select product_category_id, cast(MAX(product_price) as decimal(10,2)) as maximum_price, CAST(AVG(product_price) as decimal(10,2)) as Average_Price, cast(MIN(product_price) as decimal(10,2)) as minimum_price, count(*) as total_products from products where product_price < 100 GROUP BY product_category_id order by product_category_id desc").show
Dataframe approach:
products_df.filter(col("product_price") < 100).groupBy((col("product_category_id"))).agg(max("product_price").alias("Maximum price"), avg("product_price").alias("Average Price") , min("product_price").alias("Minumum Price"), count("product_price").alias("Total Products")).orderBy(col("product_category_id")).show
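The aggregateByKey variant asked for in part c) is not worked out in the original notes; a minimal sketch over the same products_class RDD, computing (max, average, min, count) of product_price per category for products priced under 100 USD:
// (category_id, price) pairs for products priced under 100
val catPricePairs = products_df_tmp.filter(_.product_price < 100).map(p => (p.product_category_id, p.product_price))
// per-category accumulator: (max, sum, count, min)
val zeroVal = (Float.MinValue, 0.0f, 0, Float.MaxValue)
val aggregated = catPricePairs.aggregateByKey(zeroVal)(
  (acc, price) => (math.max(acc._1, price), acc._2 + price, acc._3 + 1, math.min(acc._4, price)),
  (a, b) => (math.max(a._1, b._1), a._2 + b._2, a._3 + b._3, math.min(a._4, b._4)))
// (category_id, max_price, avg_price, min_price, total_products), sorted by category id
val aggregated_res = aggregated.map { case (cat, (mx, total, cnt, mn)) => (cat, mx, total / cnt, mn, cnt) }.sortBy(_._1)
aggregated_res.collect.foreach(println)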
Compress in Snappy and save in Parquet format:
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
dataDF.write.parquet("/user/cloudera/problem1/result4a-snappy")
Compress in gzip and save in Parquet format:
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
dataDF.write.parquet("/user/cloudera/problem1/result4a-gzip")
Problem 3:
Import all database tables (as Avro files with Snappy compression):
sqoop import-all-tables --connect jdbc:mysql://localhost/retail_db --username root --password cloudera --as-avrodatafile --compress --compression-codec snappy --warehouse-dir retail.db
Create a Hive external table on the Avro data without using an avsc file:
create external table orders_3(order_id int, order_date bigint, order_customer_id bigint,order_status String)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
'hdfs://quickstart.cloudera:8020/user/cloudera/retail_stage.db/orders';
Note: order_date is a datetime column in MySQL; while importing, Sqoop converts it to a long (epoch milliseconds), so the Hive column type is bigint.
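To get a readable date back when querying the epoch value (a quick sketch against the orders_3 table above):
select order_id, from_unixtime(CAST(order_date/1000 as BIGINT), 'yyyy-MM-dd') as order_dt, order_status from orders_3 limit 5;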
Problem scenarios from Hadoop Exam
Problem 1:
Import the retail_db.categories table to HDFS:
1. Without specifying a directory name.
2. Specify the directory name as "categories_target".
3. Copy to the warehouse directory "categories_warehouse".
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories -m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --target-dir=categories_target -m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --warehouse-dir=categories_warehouse -m 1
Note: use --fields-terminated-by to change the default field delimiter from ',' to the desired one
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --target-dir=categories_target2 --fields-terminated-by='|' -m 1
Problem 2:
HDFS
hdfs dfs -help
hdfs dfs -help mkdir
mkdir Employee
nano quicktechie.txt
Merge 2 files in the local filesystem:
cat quicktechie.txt hadoopexam.txt > MergedEmployee.txt
echo "" >> file.txt //Adds a newline character at the end
Force replace by overwriting the existing directory:
hdfs dfs -put -f /home/cloudera/Employee/newdir1 /user/cloudera
Merge files in HDFS and save the result in the local filesystem:
hdfs dfs -getmerge /user/cloudera/newdir1 /home/cloudera/Employee/newdir1/MergedEmployee.txt
Change Permissions.
hdfs dfs -chmod 664 /user/cloudera/newdir1/MergedEmployee.txt
Problem 5: Data format (file fields separated by "|"):
Name|Location1,Location2...Location5|Sex,Age|FatherName:No_of_child
Example Record
Anupam|Delhi,Mumbai,Chennai|Male,45|Daulat:4
Write table creation script.
Solution:
create table FamilyHead(Name String, Location ARRAY<String>, sex_age STRUCT<sex: String, age: int>, fatherchild MAP<String, int>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
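A quick way to verify the complex-type mapping (a sketch; the local file path is hypothetical, the record is the example above):
LOAD DATA LOCAL INPATH '/home/cloudera/familyhead.txt' INTO TABLE FamilyHead;
select Name, Location[0], sex_age.age, fatherchild['Daulat'] from FamilyHead;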
Problem 3:
Problem Scenario 3: You have been given a MySQL DB with the following details:
user=retail_dba
password=cloudera
database=retail_db
table=retail_db.categories
jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Please accomplish the following activities:
1. Import data from the categories table where category_id=22 (data should be stored in categories_subset)
2. Import data from the categories table where category_id>22 (data should be stored in categories_subset_2)
3. Import data from the categories table where category_id is between 1 and 22 (data should be stored in categories_subset_3)
4. While importing categories data, change the delimiter to '|' (data should be stored in categories_subset_6)
5. Import data from the categories table, restricting the import to the category_name and category_id columns only, with '|' as the delimiter
6. Add null values in the table using the SQL statements below:
ALTER TABLE categories MODIFY category_department_id int(11);
INSERT INTO categories VALUES (60, NULL, 'TESTING');
7. Import data from the categories table (into the categories_subset_17 directory) using the '|' delimiter and category_id between 1 and 61, and encode null values for both string and non-string columns.
8. Import the entire retail_db schema into a directory categories_subset_all_tables.
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --where "category_id = 22" --target-dir categories_subset -m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --where "category_id > 22" --warehouse-dir categories_subset_2 --m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --where "category_id between 1 and 22" --warehouse-dir categories_subset_3 --m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --where "category_id between 1 and 22" --warehouse-dir categories_subset_6 --fields-terminated-by '|' --columns "category_name,category_id" --m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --where "category_id between 1 and 80" --warehouse-dir categories_subset_17 --fields-terminated-by '|' --null-string 'N' --null-non-string '\\N' --m 1
sqoop import-all-tables --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --warehouse-dir categories_subset_all_tables
Note:
1. Encoding (replacing) NULL column values with a specific string while importing to HDFS:
--null-string <null-string> : the string to be written for a null value in string columns
--null-non-string <null-string> : the string to be written for a null value in non-string columns
2. import-all-tables can import only into a warehouse directory; specifying --target-dir leads to an error (hence --warehouse-dir above).
Problem 4:
Import the table categories into a managed Hive table where category_id is between 1 and 22.
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table categories --where "category_id >= 1 AND category_id <= 22" --target-dir /user/cloudera/categories3 --fields-terminated-by "," --hive-import --create-hive-table --hive-table default.categories1 -m 1;
Problem 5:
1. List all the tables using sqoop command from retail_db
2. Write simple sqoop eval command to check whether you have permission to read database tables or not.
3. Import all the tables as avro files into /user/hive/warehouse/retail_cca174.db
Solution:
sqoop list-tables --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera
sqoop eval --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --query "select * from categories LIMIT 5"
sqoop import-all-tables --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --as-avrodatafile --warehouse-dir /user/hive/warehouse/retail_cca174.db --m 1
Note: sqoop eval evaluates a SQL statement and displays the results.
Problem 6:
Create a Hive database named "Family" with the below details.
Take care if this database already exists.
Comment: "This database will be used for collecting"
Data File location: '/hdfs/family'
Store other properties: 'Database creator'='Vedic', 'Database_Created_On'='2016-01-01'
Give the command to check that the database has been created with the new properties.
Solution:
CREATE DATABASE IF NOT EXISTS Family COMMENT "This database will be used for collecting"
LOCATION '/hdfs/family'
WITH DBPROPERTIES ('Database creator'='Vedic', 'Database_Created_On'='2016-01-01');
DESCRIBE DATABASE EXTENDED FAMILY;
Problem 7:
Import departments table from retail_db using custom boundary query which imports departments between 1 and 7.
2 mappers.
Import only 2 columns department_id and department_name.
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --boundary-query "SELECT 1,7 FROM departments" --table departments --columns "department_id,department_name" --m 2
Problem 8:
From retail_db import joined result of orders and order_items table join on orders.order_id = order_items.order_item_order_id.
2 mappers
use order_id column for boundary conditions
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --split-by orders.order_id --query 'select orders.*,order_items.* from orders JOIN order_items on orders.order_id = order_items.order_item_order_id WHERE $CONDITIONS' --target-dir orders_joined3 --m 2
Problem 8:
A table categories already exists in default schema. Now, create another table named Employee100k in Family schema which has department ids less than 4.
Solution:
create table Family.Employee100k AS SELECT * FROM default.categories where category_department_id < 4;
Note: the Family schema has to exist before running this statement, else it leads to an error.
Problem 9:
Import the departments table with --fields-terminated-by '|' and --lines-terminated-by '\n'. Then perform an incremental append import to pick up newly inserted records.
Solution:
sqoop import --connect=jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --fields-terminated-by '|' --lines-terminated-by '\n' --table departments --m 1;
sqoop import --connect=jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --fields-terminated-by '|' --lines-terminated-by '\n' --incremental append --check-column department_id --last-value 8 --m 1;
Problem 10:
create database hadoopexam and create a table named departments.
Import data from retail_db.departments into the existing Hive table hadoopexam.departments.
Import data into a non-existing table, i.e. while importing, create a Hive table named hadoopexam.departments_new.
Solution:
create database hadoopexam;
create table departments(department_id int, department_name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/hadoopexam.db/departments';
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --table departments --username root --password cloudera --hive-table hadoopexam.departments --hive-import --delete-target-dir --hive-overwrite --split-by department_id;
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --table departments --username root --password cloudera --incremental append --check-column department_id --fields-terminated-by '\t' --target-dir hdfs://quickstart.cloudera:8020/user/hive/warehouse/hadoopexam.db/departments --m 1;
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --table departments --username root --password cloudera --hive-table hadoopexam.departments_new --hive-import --split-by department_id;
Problem 10:
Import entire database retail_db with all 6 tables from Mysql to Hive.
File format: Parquet, compression: Snappy.
Write an Impala query that produces the 5 most popular categories and save the results in HadoopExam/best_categories.csv.
Write an Impala query that produces the top 10 revenue generating products and save the results in HadoopExam/best_products.csv.
Solution:
sqoop import-all-tables --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --as-parquetfile --compression-codec snappy
--hive-import --hive-overwrite --hive-database retail_db -m 1
Note:
The option --hive-database retail_db specifies the database into which these tables are imported; otherwise they would be imported into the default database.
The error below can be encountered when any of the tables was previously imported in a different file format:
Got exception running Sqoop: org.kitesdk.data.UnknownFormatException: Unknown format for serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
No error is thrown when the import is re-run with the same file format.
5 Most popular categories:
select C.category_name, count(C.category_name) from categories C
INNER JOIN products P ON C.category_id=P.product_category_id
INNER JOIN order_items O ON P.product_id=O.order_item_product_id
GROUP BY C.category_name ORDER BY count(C.category_name) LIMIT 5;
Execute this in HUE and save the result in HDFS via HUE UI.
top 10 revenue generating products:
select product_name, total from products P
INNER JOIN (select order_item_product_id,SUM(order_item_subtotal) as total from order_items OT
INNER JOIN orders O ON OT.order_item_order_id=O.order_id
WHERE O.order_status NOT IN ('CLOSED','CANCELED','SUSPECTED_FRAUD') GROUP BY ORDER_ITEM_PRODUCT_ID) SQ ON P.product_id=SQ.order_item_product_id
ORDER BY total DESC LIMIT 10;
Execute this in HUE and save the result in HDFS via HUE UI.
Creating another table from existing table (Source - Parquet & Destination - TEXTFILE)
create table retail_db.best_categories
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/HadoopExam/best_categories.csv'
AS select category_name,count(*) from categories group by category_name order by count(*) DESC LIMIT 5;
Exporting Impala query output to the local file system:
impala-shell -B -q 'select category_name,count(*) from categories group by category_name order by count(*) DESC LIMIT 5' -o best_categories.csv '--output_delimiter=,'
Problem 12:
Import entire retail_db such that all the hive tables must be created in default schema.
each table must contain 3 files e.g: part-00000,part-00002,part-00003.
Store all the java files in a directory called Java_output to evaluate further.
Solution:
sqoop import-all-tables --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --hive-import --hive-overwrite --compress --compression-codec snappy --outdir Java_output -m 3;
Note: Java_output will be created locally and not in HDFS.
Problem 12:
1. In MySQL, create table departments_new(department_id int(11), department_name varchar(45), created_date TIMESTAMP DEFAULT NOW());
2. Insert records from departments to departments_new.
3. import data from departments_new table to hdfs.
4. Insert some records to departments_new table.
5. Do an incremental import based on the created_date column.
Solution:
create table departments_new(department_id int(11), department_name varchar(45), created_date TIMESTAMP DEFAULT NOW());
insert into departments_new(department_id,department_name) select * from departments;
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments_new --split-by department_id;
Insert into departments_new values(110,"Civil",null);
Insert into departments_new values(111,"Mechanical",null);
Insert into departments_new values(112,"Automobile",null);
Insert into departments_new values(113,"Pharma",null);
Insert into departments_new values(114,"Social Engineering",null);
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments_new --target-dir hdfs://quickstart.cloudera:8020/user/cloudera/departments_new --incremental append --check-column created_date --last-value '2017-12-21 01:22:09' --split-by department_id;
Problem 13:
1. Create a table departments_export in retail_db in MySQL.
2. Now export the data from the departments_new directory (imported in the previous problem) into the departments_export table.
Solution:
CREATE table departments_export(department_id int(11), department_name varchar(45), created_date TIMESTAMP DEFAULT NOW());
sqoop export --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --export-dir departments_new --input-fields-terminated-by "," --table departments_export;
Problem 13:
Import the departments table using a boundary query which imports departments with ids between 1 and 6.
2 files are created part-00000, part-00002
only take department_id and department_name columns.
Solution:
sqoop import --connect=jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --hive-import --hive-table departments --hive-overwrite --boundary-query="select 1, 7 from departments" --columns "department_id, department_name" -m 2;
Note: --boundary-query sets the boundaries (here 1 and 7) used to split the data across the 2 mappers specified.
On top of this we can also use --where to specify a filter condition, e.g.:
sqoop import --connect=jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --where "department_id between 1 AND 6" --hive-import --hive-table departments --hive-overwrite --boundary-query="select 1, 7 from departments" --columns "department_id, department_name" -m 2;
--columns specifies the columns to be imported.
Problem 14:
1. Create a CSV file updated_departments.csv with the content below and upload it to HDFS:
2,fitness
3,footwear
12,fathematics
13,fcience
14,engineering
1000,management
2. Export this data from HDFS to the MySQL retail_db.departments table. During the export, update existing records with the new values and insert the new records.
3. Now update the CSV file in HDFS to:
2,Fitness
3,Footwear
12,Fathematics
13,Science
14,Engineering
1000,Management
2000,Quality Check
4. Export this data from HDFS to the MySQL retail_db.departments table. During the export, only update existing records with the new values.
Solution:
hdfs dfs -put /home/cloudera/updated_departments.csv /user/cloudera/leela/updated_departments
sqoop export --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --update-key department_id --export-dir /user/cloudera/leela/updated_departments --input-fields-terminated-by "," --update-mode allowinsert;
sqoop export --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --update-key department_id --export-dir /user/cloudera/leela/updated_departments --input-fields-terminated-by "," --update-mode updateonly;
Problem 16:
1. Create hive table department_hive(department_id int, department_name string);
2. Import data from the MySQL departments table into the created Hive table. Make sure the data is visible using "select * from department_hive".
Solution:
create table department_hive(department_id int, department_name string);
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --hive-import --hive-table department_hive;
Similarly, to import into a table in a database other than the default one:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --hive-import --hive-table retail_db.department_hive --target-dir hdfs://quickstart.cloudera:8020/user/cloudera/departments_hive;
Problem 17:
In hive
create table departments_hive01(department_id int, department_name string, avg_salary int);
In Mysql
create table IF NOT EXISTS departments_hive01(id int, department_name varchar(45), avg_salary int);
insert into departments_hive01 select a.*,null from departments a;
insert into departments_hive01 values(777,"Not known",1000);
insert into departments_hive01 values(8888,null,1000);
insert into departments_hive01 values(666,null,1100);
Import data from the MySQL table departments_hive01; while importing, if a null value is found for department_name replace it with '' (empty string), and replace a NULL id with -999.
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments_hive01 --hive-import --hive-table retail_db.departments_hive01 --null-string '''' --null-non-string -999 --m 1;
Note: --null-string '''' replaces NULLs in string columns with an empty string, and --null-non-string replaces NULLs in non-string columns with -999.
Problem 18:
Export the Hive table departments_hive01 (from Problem 17) into a MySQL table departments_hive02, writing an empty string for NULL string columns and -999 for NULL non-string columns.
Solution:
create table if not exists departments_hive02(id int, department_name varchar(45), avg_salary int);
show create table departments_hive01;
sqoop export --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments_hive02 --export-dir /user/hive/warehouse/retail_db.db/departments_hive01 --input-fields-terminated-by "\t" --input-lines-terminated-by '\n' --input-null-string '''' --input-null-non-string -999 --m 1;
CREATE TABLE departments_hive01_tmp(department_id int, department_name string,avg_salary int);
Problem 19:
Import the departments table in the following formats:
text
sequence file
avro file
parquet
Solution:
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --target-dir /user/cloudera/departments_txt --m 1;
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --target-dir /user/cloudera/departments_seq --as-sequencefile --m 1;
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --target-dir /user/cloudera/departments_avro --as-avrodatafile --m 1;
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --table departments --target-dir /user/cloudera/departments_parq --as-parquetfile --m 1;