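Running Spark Job in OOZIE
1. Running a Spark job that executes Hive queries. If you come across the error "The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------", make the scratch directory writable for other users. Hive expects at least rwx-wx-wx (733), e.g. hdfs dfs -chmod 733 /tmp/hive.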
Watch the OOZIE Spark job submission video at:
https://www.youtube.com/watch?v=w5Be9ubK_Po
Watch the OOZIE Hive job submission video at:
https://www.youtube.com/watch?v=i1QW7NoAiwM
Sources: YouTube and the OOZIE documentation on the OOZIE website.
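-- Note: OpenCSVSerde reads every column as STRING, whatever type is declared below;
-- CAST the columns (e.g. in a view) if typed values are needed.
CREATE EXTERNAL TABLE `test`(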
sno bigint,
id string,
dte timestamp,
status string,
val int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"quoteChar"="\"",
'separatorChar'=",") LOCATION
's3a://l1-emr-ed-raw/Test';
HCatalog
HCatalog opens up the Hive metadata to other MapReduce tools. Every MapReduce tool has its own notion of HDFS data (for example, Pig sees HDFS data as a set of files, while Hive sees it as tables). Because HCatalog provides a table-based abstraction, tools that support it do not need to care where the data is stored, in which format, or in which storage system (HBase or HDFS).
If WebHCat is configured along with HCatalog, we also get a RESTful interface for submitting jobs.
A very basic example of how to use HCatalog:
LOADING HIVE table to PIG
I have a Hive table named STUDENT, which is stored in an HDFS location.
Now suppose I want to load this table into Pig for further transformation of the data. In this scenario I can use HCatalog.
When using table information from the Hive metastore with Pig, add the -useHCatalog option when invoking pig:
pig -useHCatalog
(you may need to set HCAT_HOME first: export HCAT_HOME=/usr/lib/hive-hcatalog/)
Now load this table into Pig (older Hive releases used the deprecated package name org.apache.hcatalog.pig.HCatLoader instead):
A = LOAD 'student' USING org.apache.hive.hcatalog.pig.HCatLoader();
The table is now loaded into Pig. To check the schema, run DESCRIBE on the relation:
DESCRIBE A;
LOADING PIG Data to HIVE Table
There are two approaches, explained below with an 'Employee' table example, for storing Pig output in a Hive table. (Prerequisite: the Hive table must already be created.)
A = LOAD 'EMPLOYEE.txt' USING PigStorage(',') AS (EMP_NUM:int, EMP_NAME:chararray, EMP_PHONE:int);
Approach 1: Using HCatalog
-- store the Pig result in Hive using HCatalog (start pig with -useHCatalog)
STORE A INTO 'Empdb.employee' USING org.apache.hive.hcatalog.pig.HCatStorer();
(or)
Approach 2: Using the HDFS physical location
-- store the Pig result directly in the Hive warehouse location of the table (the table must use ',' as its field delimiter)
STORE A INTO 'hdfs://<<nmhost>>:<<port>>/user/hive/warehouse/Empdb/employee/' USING PigStorage(',');
Write Custom UDF Functions in Hive
UDTF: A user-defined table-generating function takes one row as input and returns multiple rows as output, so the relation is one-to-many. An example is Hive's built-in EXPLODE() function. Take an array column USER_IDS holding [10,12,5,45]: SELECT EXPLODE(USER_IDS) AS ID FROM T_USER; returns 10, 12, 5 and 45 as four separate output rows. A UDTF can also be used to split a column into multiple columns, as we will see in the example below. Here the alias ("AS") clause is mandatory.
UDAF: A user-defined aggregate function works on more than one row and returns a single row as output. Examples are Hive's built-in MAX() and COUNT() functions. Here the relation is many-to-one. Say you have a table with student names, IDs and total marks. If the table has 10 rows and we need the student with the highest marks, the query has to check all 10 rows, but in the end we get only one output row, the maximum.
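To make the one-to-many versus many-to-one distinction concrete, here is a small Scala sketch that mimics EXPLODE() and MAX() on in-memory collections. It only models the semantics and is not Hive's UDTF/UDAF API. The Student type is a hypothetical stand-in for the students table.

```scala
object UdtfVsUdaf {
  // Hypothetical row of the students table described above.
  final case class Student(id: Int, name: String, totalMarks: Int)

  // UDTF-style (one-to-many), like SELECT EXPLODE(USER_IDS) AS ID FROM T_USER:
  // every element of every array value becomes its own output row.
  def explode(userIdArrays: Seq[Seq[Int]]): Seq[Int] =
    userIdArrays.flatten

  // UDAF-style (many-to-one), like SELECT MAX(total_marks) FROM students:
  // every input row is inspected, but a single value comes out.
  // None plays the role of SQL NULL for an empty table.
  def maxMarks(students: Seq[Student]): Option[Int] =
    if (students.isEmpty) None else Some(students.map(_.totalMarks).max)
}
```

For example, UdtfVsUdaf.explode(Seq(Seq(10, 12, 5, 45))) produces four values from one input row. maxMarks collapses any number of rows into a single result.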
UDF: Simple API
Building a UDF with the simpler UDF API involves little more than writing a class with one function (evaluate).
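1. Extend the UDF class (org.apache.hadoop.hive.ql.exec.UDF).
2. Implement evaluate(). Hive finds this method by reflection, so it is defined (and may be overloaded) rather than overridden.
3. In Hive, run ADD JAR (like REGISTER in Pig).
4. Run CREATE TEMPORARY FUNCTION.
5. Use it.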
Here is an example:
Creating Views
Views can be created in Hive, just as in SQL.
The following query retrieves the details of employees who earn a salary of more than Rs 30000 and saves it as a view named emp_30000.
hive> CREATE VIEW emp_30000 AS
SELECT * FROM employee
WHERE salary>30000;
hive> DROP VIEW emp_30000;
Creating an Index
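An index is simply a pointer on a particular column of a table. Creating an index means creating such a pointer on that column. (Note: index support was removed in Hive 3.0. Materialized views and columnar formats such as ORC or Parquet are the suggested alternatives.)
hive> CREATE INDEX index_salary ON TABLE employee(salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;
hive> ALTER INDEX index_salary ON employee REBUILD;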
2 Types of Indexes in Hive
Compact Indexing - Compact indexing stores the pair of indexed column’s value and its blockid.
Bitmap Indexing - Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.
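The two index layouts can be modelled in a few lines of Scala. This is a simplified in-memory sketch that assumes each row carries a hypothetical block id. Hive actually stores index entries in a separate index table, but the idea is the same: look up the index first, then read only the matching blocks.

```scala
import scala.collection.immutable.BitSet

object HiveIndexSketch {
  // Hypothetical employee row, tagged with its row position and the block it lives in.
  final case class Row(rowNum: Int, blockId: Int, salary: Int)

  // Compact index: indexed value -> set of block ids that contain the value.
  def compactIndex(rows: Seq[Row]): Map[Int, Set[Int]] =
    rows.groupBy(_.salary).map { case (v, rs) => v -> rs.map(_.blockId).toSet }

  // Bitmap index: indexed value -> bitmap of the row positions holding the value.
  def bitmapIndex(rows: Seq[Row]): Map[Int, BitSet] =
    rows.groupBy(_.salary).map { case (v, rs) => v -> BitSet(rs.map(_.rowNum): _*) }

  // Query path: consult the compact index, then keep only rows from the blocks
  // it points to (here we filter in memory; Hive would read only those blocks).
  def findBySalary(rows: Seq[Row], index: Map[Int, Set[Int]], salary: Int): Seq[Row] = {
    val blocks = index.getOrElse(salary, Set.empty[Int])
    rows.filter(r => blocks.contains(r.blockId) && r.salary == salary)
  }
}
```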
Dropping an Index
The following query drops an index named index_salary:
hive> DROP INDEX index_salary ON employee;
Can we have different indexes for the same table?
Yes! We can have any number of indexes on a table, and of any type. Note: if indexes of different types (compact, bitmap) exist on the same columns of the same table, the index created first is used for that table on those columns. Indexing creates another table containing the details of the indexed table. When you execute a query on an indexed table, Hive first queries the index table and then, based on the data in the index, reads only the relevant data from the original table. It works just like the index of a textbook.
How to identify records deleted at the source (e.g. SQL Server)?
Take a case where data is ingested from an RDBMS into Hive incrementally. After the insertion, some records were deleted in the RDBMS. How do we identify the records in Hive that have been deleted at the source?
Steps:
1. Create a view or temporary table in Hive that holds a snapshot of the records at the source (RDBMS).
2. Use the query below. Here src is the snapshot of the source and dest is the Hive table that still contains entries for the deleted records.
select did from (select D.id AS did, S.id as sid from dest D LEFT JOIN src S ON D.id = S.id) sq where sid is NULL;
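The query is an anti-join: it keeps the ids that are present in dest but absent from src. Here is a minimal Scala sketch of the same logic on in-memory id lists, assuming ids are unique. The object and method names are hypothetical.

```scala
object DeletedRecords {
  // Equivalent of: dest D LEFT JOIN src S ON D.id = S.id WHERE S.id IS NULL.
  // Keeps every id that exists in the Hive table but no longer exists at the source.
  def deletedIds(dest: Seq[Int], src: Seq[Int]): Seq[Int] = {
    val srcIds = src.toSet // hash set for O(1) membership checks
    dest.filterNot(srcIds.contains)
  }
}
```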
Updating Records in Hive.
CREATE TABLE base_table (id int, field1 STRING, modified_date timestamp);
insert into base_table values (1, "abcd", "2014-02-01 09:22:55");
insert into base_table values (2, "abcde", "2014-02-01 09:22:55");
insert into base_table values (3, "zxvh", "2014-02-01 09:22:55");
CREATE TABLE incremental_table (id int, field1 STRING, modified_date timestamp);
insert into incremental_table values (1, "abcdddddddd", "2017-02-01 09:22:55");
insert into incremental_table values (2, "abcdedddddd", "2017-02-01 09:22:55");
select t1.* from (select * from base_table UNION select * from incremental_table) t1;
O/P:
1 abcd 2014-02-01 09:22:55
1 abcdddddddd 2017-02-01 09:22:55
2 abcde 2014-02-01 09:22:55
2 abcdedddddd 2017-02-01 09:22:55
3 zxvh 2014-02-01 09:22:55
-- keep only the most recent version of each id
select az.* from (select * from base_table UNION select * from incremental_table) az
JOIN (select id, MAX(t1.modified_date) md_date
      from (select * from base_table UNION select * from incremental_table) t1
      group by t1.id) cz
on az.id = cz.id and az.modified_date = cz.md_date;
O/P:
1 abcdddddddd 2017-02-01 09:22:55
3 zxvh 2014-02-01 09:22:55
2 abcdedddddd 2017-02-01 09:22:55
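The same "latest version wins" logic can be expressed on in-memory Scala collections, which is handy for checking the expected output. This is a sketch with a hypothetical Rec type. Timestamps are kept as "yyyy-MM-dd HH:mm:ss" strings, which sort correctly as text. Unlike the SQL join, which returns every row tied for the latest timestamp, maxBy keeps only one of them.

```scala
object ReconcileSketch {
  // Hypothetical record mirroring (id, field1, modified_date).
  final case class Rec(id: Int, field1: String, modifiedDate: String)

  // UNION (distinct) the base and incremental rows, then keep, per id,
  // the row with the greatest modified_date. Sorted by id for stable output.
  def latest(base: Seq[Rec], incremental: Seq[Rec]): Seq[Rec] =
    (base ++ incremental).distinct
      .groupBy(_.id)
      .values
      .map(_.maxBy(_.modifiedDate))
      .toSeq
      .sortBy(_.id)
}
```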
Replicated Join - Similar to the distributed cache: the smaller table is loaded into memory and the join is performed on the map side (in Pig: JOIN big BY key, small BY key USING 'replicated').
Skewed Join - Used when the data is unbalanced, i.e. one reducer gets many more records than the other reducers (in Pig: USING 'skewed').
Eg: There may be far more records for USA than for India. The records are unbalanced, so the reducer processing the USA records takes much longer.
Skewed Join in Pig handles this scenario by first sampling the data to identify keys with skewed data. The skewed keys are then automatically split across multiple reducers.
Merge Join - If both inputs are already sorted on the join key, this kind of join can be used to skip the sort phase (in Pig: USING 'merge'). Sorting is expensive, so skipping it saves time.
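The replicated join can be sketched as an in-memory hash join in Scala. A hash map is built once from the small relation (the part Pig replicates to every map task), and the large relation is streamed past it, so no reduce phase is needed. This is a simplified, single-process model with hypothetical names, not Pig's implementation.

```scala
object ReplicatedJoinSketch {
  // Inner join of a large relation with a small one, keyed by K.
  def replicatedJoin[K, A, B](large: Seq[(K, A)], small: Seq[(K, B)]): Seq[(K, A, B)] = {
    // Build side: the small relation, held entirely in memory.
    val lookup: Map[K, Seq[B]] = small.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Probe side: each large record looks up its matches; unmatched keys are dropped.
    for {
      (k, a) <- large
      b <- lookup.getOrElse(k, Seq.empty)
    } yield (k, a, b)
  }
}
```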
1. The number of partitions and the replication factor are specified when the topic is created.
2. Kafka follows a publish-subscribe mechanism.
3. A consumer group reads the messages in the topic.
4. To achieve parallelism, multiple consumers in a consumer group read the messages.
5. Consumers join a group by using the same group.id.
The maximum parallelism of a group is reached when the number of consumers in the group equals the number of partitions. Any consumers beyond that stay idle.
Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.
Kafka guarantees that a message is only ever read by a single consumer in the group.
Consumers see the messages of a partition in the order they were stored in the log. Ordering is guaranteed within a partition, not across partitions.
Consumers and Consumer Groups
Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case your application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them and writing the results. This may work well for a while, but what if the rate at which producers write messages to the topic exceeds the rate at which your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall farther and farther behind, unable to keep up with the rate of incoming messages. Obviously there is a need to scale consumption from topics. Just like multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, splitting the data between them.
Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.
Let’s take topic T1 with four partitions. Now suppose we create a new consumer, C1, which is the only consumer in group G1, and use it to subscribe to topic T1. Consumer C1 will get all messages from all four T1 partitions. See Figure 4-1.
If we add another consumer, C2, to group G1, each consumer will only get messages from two partitions. Perhaps messages from partition 0 and 2 go to C1 and messages from partitions 1 and 3 go to consumer C2. See Figure 4-2.
If G1 has four consumers, then each will read messages from a single partition. See Figure 4-3.
If we add more consumers to a single group with a single topic than we have partitions, some of the consumers will be idle and get no messages at all. See Figure 4-4.
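The assignments described for Figures 4-1 to 4-4 can be reproduced with a simplified round-robin assignment, sketched below in Scala. This is only an illustration. Kafka's real assignment is done by a pluggable assignor (for example range, round-robin or sticky), so actual partition-to-consumer mappings may differ.

```scala
object ConsumerGroupSketch {
  // Simplified round-robin assignment of a topic's partitions to the
  // consumers of one group. Each partition goes to exactly one consumer.
  // Consumers beyond the partition count get an empty list (idle).
  def assign(partitions: Int, consumers: Seq[String]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "a group needs at least one consumer")
    val byConsumer = (0 until partitions).groupBy(p => consumers(p % consumers.size))
    consumers.map(c => c -> byConsumer.getOrElse(c, IndexedSeq.empty[Int]).toSeq).toMap
  }
}
```

With 4 partitions, one consumer gets all four, two consumers get partitions 0 and 2 versus 1 and 3, and a fifth consumer receives nothing.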
Note: To have multiple consumers in a single consumer group, we need to create multiple Spark Streaming jobs with the same group ID and topic and different consumer names. There is no way to add multiple consumers within the same streaming job.
Example code: https://github.com/eBay/Spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
7. By default, consumption starts from the latest offset in each partition. If you set auto.offset.reset in the Kafka parameters to smallest (earliest with the Kafka 0.10+ consumer API), consumption starts from the smallest offset instead.
Advantages of Direct approach over receiver approach:
Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.
The only disadvantage: this approach does not update offsets in ZooKeeper, so ZooKeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update ZooKeeper yourself.
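The one-to-one mapping between Kafka partitions and batch partitions, with batches defined by offsets rather than ZooKeeper, can be modelled as below. This is a simplified sketch. OffsetRange here is a local case class that only mirrors the fields of the connector's own org.apache.spark.streaming.kafka010.OffsetRange (a real job reads those through rdd.asInstanceOf[HasOffsetRanges].offsetRanges). nextBatch and checkpoint are hypothetical helpers.

```scala
object DirectStreamSketch {
  // Slice of one Kafka partition read by one batch: offsets [fromOffset, untilOffset).
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // One range (and so one RDD partition) per Kafka partition: read from where
  // the previous batch stopped up to the latest offset available now.
  def nextBatch(topic: String, current: Map[Int, Long], latest: Map[Int, Long]): Seq[OffsetRange] =
    current.toSeq.sortBy(_._1).map { case (p, from) =>
      OffsetRange(topic, p, from, latest.getOrElse(p, from))
    }

  // What gets saved after a batch succeeds (Spark keeps this in its checkpoints):
  // the next starting offset for each partition.
  def checkpoint(ranges: Seq[OffsetRange]): Map[Int, Long] =
    ranges.map(r => r.partition -> r.untilOffset).toMap
}
```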
8. When submitting the Spark Streaming job, the ZooKeeper quorum and port, the consumer group ID and the Kafka topic also have to be specified.
Kafka Message Structure:
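Each message is a key-value pair, and the actual message is in the value. With the kafka010 integration, createDirectStream returns a DStream of ConsumerRecord[K, V] objects. Sample code is below:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf, org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
val topicsSet = "kafka_topic_name".split(",").toSet   // comma-separated list of topics
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "<<broker_host>>:<<port>>", "group.id" -> "<<group_id>>",
  "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer])
val ssc = new StreamingContext(new SparkConf().setMaster("yarn").setAppName("Hpstreaming"), Seconds(5))
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
val messages = kafkaStream.map(record => record.value())   // the actual message is in the value
val spark = SparkSession.builder().getOrCreate()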
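Note: Kafka's consumer API provides seek functionality to read a partition starting at a given offset. createDirectStream() does not expose seek directly. Instead, starting offsets can be passed to ConsumerStrategies.Subscribe (or Assign) as a Map[TopicPartition, Long].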
Option 2: Since the streaming job reads messages from a Kafka topic, we can send a special message as a signal telling the job to exit. When this message is read, an if condition in the streaming job can call ssc.stop(stopSparkContext = true, stopGracefully = true). Alternatively, place a marker file in a known location and have the streaming job check for it to decide when to call ssc.stop().
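Here is a minimal sketch of the marker-file variant in plain Scala, without Spark. The helper name waitForMarker and its polling parameters are hypothetical. In a real driver the stop callback would be ssc.stop(stopSparkContext = true, stopGracefully = true). The loop would typically wait with ssc.awaitTerminationOrTimeout(...) instead of Thread.sleep, and the marker would usually live on HDFS rather than the local file system.

```scala
import java.nio.file.{Files, Path}

object GracefulStopSketch {
  // Polls for the marker file up to maxPolls times. When it appears,
  // runs the stop action once and returns true; otherwise returns false.
  def waitForMarker(marker: Path, pollMillis: Long, maxPolls: Int)(stop: () => Unit): Boolean = {
    var polls = 0
    while (polls < maxPolls) {
      if (Files.exists(marker)) {
        stop() // e.g. ssc.stop(stopSparkContext = true, stopGracefully = true)
        return true
      }
      Thread.sleep(pollMillis)
      polls += 1
    }
    false
  }
}
```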
In production (for example on Cloudera clusters), Kafka is installed in the form of parcels.
For more information, see https://kafka.apache.org/quickstart#quickstart_startserver
1. Extract the Kafka archive.
Kafka uses ZooKeeper, so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
Installing or Upgrading Kafka from a Parcel
Required Role
In Cloudera Manager, download the Kafka parcel, distribute the parcel to the hosts in your cluster, and then activate the parcel. See Managing Parcels. After you activate the Kafka parcel, Cloudera Manager prompts you to restart the cluster. You do not need to restart the cluster after installing Kafka. Click Close to ignore this prompt.
Add the Kafka service to your cluster. See Adding a Service.
-> Highly scalable.
-> No single point of failure - peer-to-peer, not a master-slave architecture.
-> Fast writes.
-> Data replication across data centres - commonly used to create a database that is spread across nodes in more than one data centre, for high availability.
The key components of Cassandra are as follows −
Node − It is the place where data is stored.
Data center − It is a collection of related nodes.
Cluster − A cluster is a component that contains one or more data centers.
Commit log − The commit log is a crash-recovery mechanism in Cassandra. Every write operation is written to the commit log.
Mem-table − A mem-table is a memory-resident data structure. After commit log, the data will be written to the mem-table. Sometimes, for a single-column family, there will be multiple mem-tables.
SSTable − It is a disk file to which the data is flushed from the mem-table when its contents reach a threshold value.
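Bloom filter − A space-efficient probabilistic data structure for quickly testing whether an element is a member of a set. It can return false positives but never false negatives. It acts as a special kind of cache: Cassandra checks the Bloom filters on every read to skip SSTables that cannot contain the requested data.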
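A minimal Bloom filter in Scala shows why a "no" answer lets Cassandra skip an SSTable, while a "yes" still requires reading it. This is an illustrative sketch using double hashing over MurmurHash3 and a fixed-size bit array. Cassandra's own implementation and sizing differ, and the class name is hypothetical.

```scala
import scala.util.hashing.MurmurHash3

final class BloomFilter(numBits: Int, numHashes: Int) {
  require(numBits > 0 && numHashes > 0)
  private val bits = new Array[Boolean](numBits)

  // k bit positions per key via double hashing: h1 + i * h2 (mod numBits).
  private def positions(key: String): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(key)
    val h2 = MurmurHash3.stringHash(key, h1)
    (0 until numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
  }

  def add(key: String): Unit = positions(key).foreach(p => bits(p) = true)

  // false => key is definitely absent (the SSTable can be skipped);
  // true  => key may be present (the SSTable must be read to be sure).
  def mightContain(key: String): Boolean = positions(key).forall(bits(_))
}
```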
Cassandra terminology compared with RDBMS:
Database = Keyspace
Table = Table
Write Operations
Every write on a node is first captured in that node's commit log. The data is then stored in the mem-table. Whenever the mem-table is full, its data is flushed to an SSTable data file. All writes are automatically partitioned and replicated throughout the cluster. Cassandra periodically consolidates the SSTables (compaction), discarding unnecessary data.
Read Operations
During read operations, Cassandra must combine results from the active memtable and potentially multiple SSTables.
Cassandra gets values from the mem-table and checks the Bloom filters to find the SSTables that may hold the required data.
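The write and read paths above follow the log-structured merge (LSM) pattern. The toy Scala class below is a simplified model under stated assumptions: a single node, a mem-table flushed after a fixed number of keys, no tombstones, and "newest SSTable wins" in place of Cassandra's per-cell timestamps. ToyLsmStore and its members are hypothetical names.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.mutable.ArrayBuffer

final class ToyLsmStore(memtableLimit: Int) {
  val commitLog = ArrayBuffer.empty[(String, String)]        // append-only, for crash recovery
  private var memtable = TreeMap.empty[String, String]       // in memory, sorted by key
  val sstables = ArrayBuffer.empty[TreeMap[String, String]]  // immutable "files", newest last

  def write(key: String, value: String): Unit = {
    commitLog += (key -> value)            // 1. commit log first
    memtable = memtable.updated(key, value) // 2. then the mem-table
    if (memtable.size >= memtableLimit) {   // 3. flush when the threshold is reached
      sstables += memtable
      memtable = TreeMap.empty[String, String]
    }
  }

  // Read: mem-table first, then SSTables from newest to oldest.
  def read(key: String): Option[String] =
    memtable.get(key).orElse(sstables.reverseIterator.map(_.get(key)).collectFirst { case Some(v) => v })

  // Compaction: merge all SSTables into one; newer entries overwrite older ones.
  def compact(): Unit = {
    val merged = sstables.foldLeft(TreeMap.empty[String, String])(_ ++ _)
    sstables.clear()
    if (merged.nonEmpty) sstables += merged
  }
}
```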
The syntax for creating a keyspace is as follows −
CREATE KEYSPACE <keyspace_name>
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};
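With SimpleStrategy and replication_factor 3, each row is stored on three nodes. The sketch below models the placement on a toy token ring. The first replica goes to the node that owns the partition's token (the first node whose token is greater than or equal to it, wrapping around), and the other replicas go to the next nodes clockwise. It is a simplified illustration with no virtual nodes, racks or data centres (SimpleStrategy ignores racks and data centres anyway). The ring, tokens and node names are hypothetical.

```scala
object SimpleStrategySketch {
  // ring: (token, nodeName) pairs. Returns the replica nodes for a partition token.
  def replicas(ring: Seq[(Long, String)], keyToken: Long, replicationFactor: Int): Seq[String] = {
    require(ring.nonEmpty, "the ring needs at least one node")
    val sorted = ring.sortBy(_._1)
    val start = sorted.indexWhere(_._1 >= keyToken) match {
      case -1 => 0 // past the highest token: wrap around to the first node
      case i  => i
    }
    val n = math.min(replicationFactor, sorted.size) // cannot exceed the node count
    (0 until n).map(i => sorted((start + i) % sorted.size)._2)
  }
}
```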
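CQL (Cassandra Query Language) is the language used to read and write data in Cassandra.
The shell used is cqlsh.
SELECT, INSERT, UPDATE, DELETE, WHERE and ORDER BY look much the same as in normal SQL, although CQL is more restrictive (for example, ORDER BY is only allowed on clustering columns).
A few other additions are: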
Copy
This command copies data between Cassandra and a file. Given below is an example that copies the table named emp to the file myfile.
cqlsh:tutorialspoint> COPY emp (emp_id, emp_city, emp_name, emp_phone, emp_sal) TO 'myfile';
DataStax provides connector JARs that, once imported, enhance the SparkContext so that reading (cassandraTable()) and writing (saveToCassandra()) are available.
There is no direct option for HIVE-Cassandra integration.
We need to use third-party JARs (the DataStax Spark Cassandra Connector) to connect Spark with Cassandra:
1. import com.datastax.spark.connector._ enables Cassandra-specific functions on the SparkContext, SparkSession, RDD, and DataFrame.
2. saveToCassandra() is a Cassandra-specific RDD function that comes from the imported connector JAR.
Saving data from RDD to Cassandra
Write example, adding two more rows to the table: