Monday, June 19, 2017

OOZIE Job configuration from HUE


Check OOZIE Status:

oozie admin -oozie http://quickstart.cloudera:11000/oozie -status

To get the NameNode URL:
cd /etc/hadoop/conf

ls -ltr

view core-site.xml

Property name is,
fs.defaultFS

For the JobTracker URL (on YARN this is the ResourceManager address):

view yarn-site.xml

Property name is
yarn.resourcemanager.address

Eg: quickstart.cloudera:8032  OR localhost:8032
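The lookup above can also be scripted. The following is a minimal sketch, assuming the standard Hadoop configuration layout of property/name/value elements; the sample XML string and the helper name read_hadoop_property are illustrative, not taken from a real cluster.

```python
import xml.etree.ElementTree as ET


def read_hadoop_property(xml_text, name):
    """Return the <value> of the <property> whose <name> matches, or None."""
    root = ET.fromstring(xml_text)
    for prop in root.iter("property"):
        if prop.findtext("name") == name:
            return prop.findtext("value")
    return None


# Hypothetical, trimmed-down core-site.xml content used only for illustration.
SAMPLE_CORE_SITE = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://quickstart.cloudera:8020</value>
  </property>
</configuration>"""

print(read_hadoop_property(SAMPLE_CORE_SITE, "fs.defaultFS"))
```

The same helper would work for yarn.resourcemanager.address if it is given the text of yarn-site.xml.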

The job.properties config file must be on a LOCAL path.

To create a workflow job from HUE:

In HUE go to
Workflows -> Editors -> Workflows and create the workflow.

Few points:

-> Parameters pass job-specific settings such as executor memory.

-> Files specify the input and output files.

-> While creating or editing a workflow, the folder icon beside the Settings button shows the HDFS path of workflow.xml and job.properties.

-> But should job.properties be in a local folder? As this is a single-node setup, the defaults for ${namenode} and ${jobtracker} were used.

Note:
Usually in production the jobs are run from the command line, where job.properties is passed from the local file system using the -config flag.

Job Progress and logs can be seen in HUE under the Job.
Also, Job progress can be seen in Job Tracker.

References:

Running Hive script in OOZIE:

Steps:
Follow the workflow steps from the OOZIE workflow job creation above.
Make sure to set
oozie.use.system.libpath=True in job.properties.

Running Spark Job in OOZIE 

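1. Running a Spark job that executes Hive queries.

If you come across the error "The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx------", the /tmp/hive directory on HDFS needs wider permissions; this is typically fixed with hadoop fs -chmod -R 777 /tmp/hive.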




Watch OOZIE Spark Job submission video under
https://www.youtube.com/watch?v=w5Be9ubK_Po

Watch OOZIE Hive Job submission video under
https://www.youtube.com/watch?v=i1QW7NoAiwM
Sources: YouTube and the OOZIE documentation on the OOZIE page.

Friday, June 9, 2017

Hive

To skip header while loading a file in HIVE.

Data:

RecordId,FirstName,LastName
1,"John","Doe"
2,"Jane","Doe"

Command:

[cloudera@quickstart ~]$ hadoop fs -mkdir /user/hive/warehouse/names
[cloudera@quickstart ~]$ hadoop fs -put file:///home/cloudera/Desktop/Spark/Hive/test.csv /user/hive/warehouse/names
Make sure that the file is inside a folder, and specify that folder as the external table location.

Hive>create external table names(RecordId string, FirstName string, LastName string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/names' tblproperties("skip.header.line.count" = "1");

Quotes in Data file: Consider a case where there is a data file that has data within ""

Data: 
1,"pc:61254","2017-03-10 17:41:05.091","1",,"1200105"
2,"pc:61255","2017-03-10 18:41:05.091","1",,"1200106"
3,"pc:61256","2017-03-10 19:41:05.091","1",,"1200107"
4,"pc:61257","2017-03-10 20:41:05.091","1",,"1200108"


CREATE EXTERNAL TABLE `test`(
  sno bigint,
  id string,
  dte timestamp,
  status string,
  val int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   "quoteChar"="\"",
   'separatorChar'=",") LOCATION
  's3a://l1-emr-ed-raw/Test';
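To see what the quoteChar and separatorChar settings do to such rows, here is a small sketch using Python's csv module as a stand-in for the SerDe. It only illustrates the parsing rules on the sample data above; it is not how Hive implements OpenCSVSerde, and the function name parse_quoted_csv is hypothetical.

```python
import csv
import io

# Two of the sample rows from the data file above.
SAMPLE = (
    '1,"pc:61254","2017-03-10 17:41:05.091","1",,"1200105"\n'
    '2,"pc:61255","2017-03-10 18:41:05.091","1",,"1200106"\n'
)


def parse_quoted_csv(text, separator=",", quote='"'):
    """Split each line on the separator and strip the surrounding quote characters."""
    reader = csv.reader(io.StringIO(text), delimiter=separator, quotechar=quote)
    return [row for row in reader]


rows = parse_quoted_csv(SAMPLE)
for row in rows:
    print(row)
```

Every field comes back as a plain string with the quotes removed, and the empty fifth field becomes an empty string.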

HCatalog

HCatalog opens up the Hive metadata to other MapReduce tools. Every MapReduce tool has its own notion of HDFS data (for example, Pig sees HDFS data as a set of files, while Hive sees it as tables). With a table-based abstraction, tools that support HCatalog do not need to care about where the data is stored, in which format, or in which storage system (HBase or HDFS).

If WebHCat is configured along with HCatalog, jobs can also be submitted in a RESTful way.

A very basic example of how to use HCatalog:

LOADING HIVE table to PIG

I have a table in Hive named STUDENT, stored in an HDFS location:

neethu 90
malini 90
sunitha 98
mrinal 56
ravi 90
joshua 8

Now suppose I want to load this table into Pig for further transformation of the data. In this scenario I can use HCatalog.
When using table information from the Hive metastore with Pig, add the -useHCatalog option when invoking pig:
pig -useHCatalog
(you may want to export HCAT_HOME first: export HCAT_HOME=/usr/lib/hive-hcatalog/)
Now load the table into Pig:
A = LOAD 'student' USING org.apache.hive.hcatalog.pig.HCatLoader();
The table is now loaded into Pig. To check the schema, run DESCRIBE on the relation:
DESCRIBE A;

 LOADING PIG Data to HIVE Table

There are two approaches explained below with 'Employee' table example to store pig output into hive table. (Prerequisite is that hive table should be already created)

A =  LOAD 'EMPLOYEE.txt' USING PigStorage(',') AS(EMP_NUM:int,EMP_NAME:chararray,EMP_PHONE:int);
Approach 1: Using Hcatalog

-- dump pig result to Hive using Hcatalog
store A into 'Empdb.employee' using org.apache.hive.hcatalog.pig.HCatStorer();
(or)

Approach 2: Using HDFS physical location

-- dump pig result to external hive warehouse location
STORE A INTO 'hdfs://<<nmhost>>:<<port>>/user/hive/warehouse/Empdb/employee/' USING PigStorage(',');

Write Custom UDF Functions in Hive

3 Types of UDF can be written

Source: https://www.linkedin.com/pulse/hive-functions-udfudaf-udtf-examples-gaurav-singh

https://dzone.com/articles/writing-custom-hive-udf-andudaf

UDF - 2 Types Simple and Complex

UDTF: A user-defined table function takes one row as input and returns multiple rows as output, so the relation is one to many. An example is Hive's built-in EXPLODE() function. Take an array column USER_IDS with the value ARRAY<10,12,5,45>: SELECT EXPLODE(USER_IDS) AS ID FROM T_USER returns 10, 12, 5 and 45 as four separate rows. A UDTF can also be used to split one column into multiple columns. The "AS" alias clause is mandatory here.

UDAF: A user-defined aggregate function works on more than one row and gives a single row as output, so the relation is many to one. Examples are Hive's built-in MAX() and COUNT() functions. Say a table holds student name, id and total marks, and it has 10 rows. To find the student with the maximum marks, the query has to check all 10 rows, but it returns only one output: the maximum.
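The one-to-many and many-to-one shapes described above can be sketched outside Hive. The snippet below is only an analogy in plain Python, not Hive's UDTF/UDAF API; the function names and sample marks are made up for illustration.

```python
def explode(user_ids):
    """UDTF-like shape: one input row (an array) produces many output rows."""
    for user_id in user_ids:
        yield user_id


def max_marks(rows):
    """UDAF-like shape: many input rows produce a single output value."""
    return max(marks for _name, marks in rows)


exploded = list(explode([10, 12, 5, 45]))        # four separate rows
students = [("asha", 71), ("ravi", 90), ("john", 64)]  # hypothetical data
best = max_marks(students)                        # one aggregated value
print(exploded, best)
```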

UDF: Simple API

Building a UDF with the simpler UDF API involves little more than writing a class with one function (evaluate).

1. Extend the UDF class.
2. Implement evaluate() (Hive finds it by reflection).
3. In Hive, ADD JAR (like REGISTER in Pig).
4. CREATE TEMPORARY FUNCTION.
5. Use it.


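Here is an example:

package com.matthewrathbone.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SimpleUDFExample extends UDF {

  // Hive calls evaluate() once per row; a NULL column value arrives as null.
  public Text evaluate(Text input) {
    if (input == null) return null;
    return new Text("Hello " + input.toString());
  }
}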
 
hive> ADD JAR target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
hive> CREATE TEMPORARY FUNCTION helloworld as 'com.matthewrathbone.example.SimpleUDFExample';
hive> select helloworld(name) from people limit 1000;

In Hive you can overload evaluate() the same way as in Java. But in a UDF you have to use Hadoop datatypes like IntWritable, FloatWritable...
Please find the code below.

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class ToUpper extends UDF {

    // String input: convert to upper case.
    public String evaluate(Text word) {
        if (word == null) {
            return null;
        }
        return word.toString().toUpperCase();
    }

    // The numeric overloads return an error message instead.
    public String evaluate(IntWritable word) {
        return "Error : Input type is Integer. Cannot convert to UpperCase";
    }

    public String evaluate(FloatWritable word) {
        return "Error : Input type is Float. Cannot convert to UpperCase";
    }

    public String evaluate(LongWritable word) {
        return "Error : Input type is Long. Cannot convert to UpperCase";
    }
}

A use case of UDF:

Calculating insights from multiple column values by applying a formula.

Function signature:

public float evaluate(FloatWritable param_1, IntWritable param_2, ...)  // apply the calculation inside this function

Calling UDF:

select calculateRecommendation(param_1,param_4,param_6,param_7) from tr69data;

To delete the added JAR:

DELETE JAR /home/cloudera/workspace/customudf1/target/customudf1-0.0.1-SNAPSHOT.jar;

Creating Parquet File in Hive

First create an External Table(temp)

hive> create External table studentdatatmp(name string, deptnum int, branch string, year int)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY '\t'
    > LINES TERMINATED BY '\n'
    > LOCATION 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Student';
OK
Time taken: 0.244 seconds
hive> select * from studentdatatmp;

Create a table whose file storage format is Parquet

hive> create table studentdata(name string, deptnum int)
    > PARTITIONED BY(branch string, year int)
    > STORED AS PARQUET
    > LOCATION 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Student_parquet';
INSERT OVERWRITE TABLE studentdata PARTITION(branch='cse', year=1) SELECT name, deptnum FROM studentdatatmp WHERE branch='cse' AND year=1;
 

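To load another partition of the same table, use:

INSERT OVERWRITE TABLE studentdata PARTITION(branch='cse', year=2) SELECT name, deptnum FROM studentdatatmp WHERE branch='cse' AND year=2;

The keyword after INSERT decides between replacing and appending: INSERT OVERWRITE replaces whatever is already in the target partition, while INSERT INTO appends to it. An INSERT INTO example with a dynamic partition: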

INSERT INTO TABLE country_times_part PARTITION(time) SELECT country,continent,time FROM country_times;

Note: In a pure dynamic partition insert, the dynamic partition columns must be specified last among the columns in the SELECT statement, in the same order in which they appear in the PARTITION() clause. When hive.exec.dynamic.partition.mode is strict, at least one static partition is required, so set it to nonstrict for a pure dynamic insert. Follow https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions for more info.


-> In older versions of Hive, select * launched a MapReduce job even for 1 KB of data. Newer versions serve a simple select * through a fetch task, so no MapReduce job runs even for 1 GB of data.

 

What is the DISTRIBUTE BY clause?

Hive uses the columns in DISTRIBUTE BY to distribute the rows among reducers. All rows with the same DISTRIBUTE BY column values go to the same reducer.

It ensures that each of the N reducers gets a non-overlapping set of column values, but it does not sort the output of each reducer. You end up with N or more unsorted files whose key sets do not overlap.

http://saurzcode.in/2015/01/hive-sort-vs-order-vs-distribute-vs-cluster/
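The routing behaviour of DISTRIBUTE BY can be sketched as hash partitioning. This is a simplified model under the assumption that the reducer is chosen by hashing the key; Hive's actual hash function differs, and the sample rows are hypothetical.

```python
import zlib
from collections import defaultdict


def distribute_by(rows, key_index, num_reducers):
    """Send each row to a reducer chosen by hashing its key column (no sorting)."""
    buckets = defaultdict(list)
    for row in rows:
        key = str(row[key_index]).encode("utf-8")
        reducer = zlib.crc32(key) % num_reducers  # stand-in for Hive's hash
        buckets[reducer].append(row)
    return dict(buckets)


rows = [("USA", 1), ("India", 2), ("USA", 3), ("UK", 4), ("India", 5)]
buckets = distribute_by(rows, key_index=0, num_reducers=3)
for reducer, assigned in sorted(buckets.items()):
    print(reducer, assigned)
```

Rows with the same key always land in the same bucket, and rows inside a bucket keep their arrival order rather than being sorted.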


Creating a new table from an existing one

create table tabl_new as select * from tabl_old;
               
                   OR

create table tabl_new as select ip, name from tabl_old;


Inserting Data to Hive table:

We can insert new data into a table in two ways.

  1. Load the data of a file into the table using the LOAD command.
    LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename;
    
  2. Insert new data into the table using a SELECT query.
    INSERT INTO TABLE tablename1 SELECT columnlist FROM secondtable;

Creating Views

As in SQL, the query below retrieves the details of employees who earn a salary of more than Rs 30000. The result is stored in a view named emp_30000.


hive> CREATE VIEW emp_30000 AS
SELECT * FROM employee
WHERE salary>30000;

hive> DROP VIEW emp_30000;

Creating an Index

An index is a pointer on a particular column of a table. Creating an index means creating a pointer on that column.
hive> CREATE INDEX index_salary ON TABLE employee(salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler';

2 Types of Indexes in Hive

  • Compact Indexing - Stores pairs of the indexed column's value and its block id.
  • Bitmap Indexing - Stores the indexed column's value together with the list of matching rows as a bitmap.

Dropping an Index

The following query drops an index named index_salary:
hive> DROP INDEX index_salary ON employee;

Can we have different indexes for the same table?

Yes! We can have any number of indexes on a table, of any type.

Note: When indexes of different types (compact, bitmap) exist on the same columns of the same table, the index created first is the one used for those columns.

Indexing creates another table that holds the index details of the table being indexed. When a query runs on an indexed table, Hive first queries the index table and then, based on the data in the index, goes directly to the matching data in the original table. It works just like the index of a text book.

How to identify records deleted at the source (e.g. SQL Server)?

Take a case where data is ingested from an RDBMS into Hive incrementally. After ingestion, some of the records are deleted in the RDBMS. How do we identify the records in Hive that were deleted at the source?

Steps:
1. Create a view or temp table in Hive that holds a snapshot of the records at the source (RDBMS).
2. Use the query below. Here src is the snapshot of the source and dest is the Hive table that still has the entries of the deleted records.
select did from (select D.id AS did, S.id as sid from dest D LEFT JOIN src S ON D.id = S.id) sq where sid is NULL;
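The LEFT JOIN with a NULL check above is an anti-join: keep the ids present in dest that have no match in src. A minimal sketch of the same logic, with made-up ids and a hypothetical function name:

```python
def deleted_ids(dest_ids, src_ids):
    """Return ids that exist in the Hive copy (dest) but no longer at the source (src)."""
    src = set(src_ids)
    return [d for d in dest_ids if d not in src]


dest = [1, 2, 3, 4]   # ids already ingested into Hive
src = [1, 3]          # ids in the current source snapshot
print(deleted_ids(dest, src))
```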

Updating Records in Hive.

CREATE TABLE base_table (id int,field1 STRING,modified_date timestamp);
insert into base_table values(1,"abcd", "2014-02-01 09:22:55");
insert into base_table values(2,"abcde", "2014-02-01 09:22:55");
insert into base_table values(3,"zxvh", "2014-02-01 09:22:55");

CREATE TABLE incremental_table (id int,field1 STRING,modified_date timestamp);
insert into incremental_table values(1,"abcdddddddd", "2017-02-01 09:22:55");
insert into incremental_table values(2,"abcdedddddd", "2017-02-01 09:22:55");

select t1.* from (select * from base_table UNION select * from incremental_table) t1;

1       abcd    2014-02-01 09:22:55
1       abcdddddddd     2017-02-01 09:22:55
2       abcde   2014-02-01 09:22:55
2       abcdedddddd     2017-02-01 09:22:55
3       zxvh    2014-02-01 09:22:55


select az.* from (
select * from base_table 
UNION 
select * from incremental_table) az
JOIN
(select id,MAX(t1.modified_date) md_date from (select * from base_table UNION select * from incremental_table) t1 group by t1.id) cz
on az.id=cz.id and az.modified_date=cz.md_date;

O/P:
1       abcdddddddd     2017-02-01 09:22:55
2       abcdedddddd     2017-02-01 09:22:55
3       zxvh    2014-02-01 09:22:55
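The merge logic of the query (union both tables, then keep the row with the latest modified_date per id) can be sketched in a few lines. This assumes timestamps in the yyyy-MM-dd HH:mm:ss form used above, which compare correctly as strings; the function name is illustrative.

```python
def latest_per_id(base, incremental):
    """Keep, for every id, the row with the greatest modified_date."""
    latest = {}
    for row in base + incremental:
        row_id, _field, modified = row
        if row_id not in latest or modified > latest[row_id][2]:
            latest[row_id] = row
    return [latest[row_id] for row_id in sorted(latest)]


base_table = [
    (1, "abcd", "2014-02-01 09:22:55"),
    (2, "abcde", "2014-02-01 09:22:55"),
    (3, "zxvh", "2014-02-01 09:22:55"),
]
incremental_table = [
    (1, "abcdddddddd", "2017-02-01 09:22:55"),
    (2, "abcdedddddd", "2017-02-01 09:22:55"),
]
merged = latest_per_id(base_table, incremental_table)
for row in merged:
    print(row)
```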

Tuesday, June 6, 2017

Replicated, Skewed and Merge Joins in Pig


Replicated Join - Works like the distributed cache: the smaller table is loaded into RAM and the join is performed on the map side.

Skewed Join - Used when the data is unbalanced, i.e. when one reducer gets far more records than the others.

Eg: Records for USA might be far more numerous than records for India. The records are unbalanced, and the reducer processing the USA records takes more time.

Skewed Join in Pig handles this scenario by pre-sampling the data to identify the keys that are skewed. The skewed data is then automatically split across multiple reducers.

Merge Join - If the data is pre-sorted, this kind of join can be used. Sorting is an expensive operation, and a merge join skips the sort phase.
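A replicated join boils down to building an in-memory lookup from the small table and streaming the large table past it. The sketch below shows that idea in plain Python; it is not Pig's implementation, and the sample relations are hypothetical.

```python
def replicated_join(large_rows, small_rows, large_key=0, small_key=0):
    """Map-side join: hash the small table in memory, then stream the large one."""
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[small_key], []).append(row)
    for row in large_rows:
        for match in lookup.get(row[large_key], []):
            yield row + match


countries = [("US", "United States"), ("IN", "India")]    # small table
orders = [("US", 100), ("IN", 250), ("US", 75), ("FR", 10)]  # large table
joined = list(replicated_join(orders, countries))
for row in joined:
    print(row)
```

No shuffle or sort is needed because every mapper would hold the full small table; rows without a match (FR here) are dropped, as in an inner join.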

Kafka and integration with Spark Streaming


1. The number of partitions and the replication factor have to be specified when the topic is created.
2. Kafka follows a publish-subscribe mechanism.
3. A consumer group reads the messages in a topic.
4. To achieve parallelism, multiple consumers in a consumer group read the messages.
5. Consumers join a group by using the same group.id.

The maximum parallelism of a group is bounded by the partition count: number of consumers in the group <= number of partitions.

Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.

Kafka guarantees that a message is only ever read by a single consumer in the group.

Consumers see the messages in the order they were stored in the log (ordering holds within a partition).

Consumers and Consumer Groups

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case your application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them and writing the results. This may work well for a while, but what if the rate at which producers write messages to the topic exceeds the rate at which your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall farther and farther behind, unable to keep up with the rate of incoming messages. Obviously there is a need to scale consumption from topics. Just like multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, splitting the data between them.
Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.
Let’s take topic T1 with four partitions. Now suppose we created a new consumer, C1, which is the only consumer in group G1, and use it to subscribe to topic T1. Consumer C1 will get all messages from all four T1 partitions. See Figure 4-1.



Figure 4-1. One Consumer group with four partitions

If we add another consumer, C2, to group G1, each consumer will only get messages from two partitions. Perhaps messages from partition 0 and 2 go to C1 and messages from partitions 1 and 3 go to consumer C2. See Figure 4-2.



Figure 4-2. Four partitions split between two consumers in one group

If G1 has four consumers, then each will read messages from a single partition. See Figure 4-3.



Figure 4-3. Four consumers in one group, one partition each

If we add more consumers to a single group with a single topic than we have partitions, some of the consumers will be idle and get no messages at all. See Figure 4-4.
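The four scenarios above can be reproduced with a toy assignment function. This is a simplified round-robin sketch chosen to match the "partitions 0 and 2 go to C1" example; Kafka's real assignors (range, round-robin and others) are more involved, and the function name is hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Give each partition to exactly one consumer, cycling through the group."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2, 3]
print(assign_partitions(partitions, ["C1"]))
print(assign_partitions(partitions, ["C1", "C2"]))
print(assign_partitions(partitions, ["C1", "C2", "C3", "C4"]))
print(assign_partitions(partitions, ["C1", "C2", "C3", "C4", "C5"]))
```

With five consumers and four partitions, the last consumer ends up with an empty list, which is the idle consumer described above.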



Note: To have multiple consumers in a single consumer group, we need to create multiple Spark Streaming jobs with the same group id and topic but different consumer names. There is no way to add multiple consumers in the same streaming job.

Source: https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html

There are 2 approaches for reading data from Kafka topics:

1. Receiver approach (old) - KafkaUtils.createStream()
2. Direct approach - KafkaUtils.createDirectStream()

6. In Spark Streaming, the KafkaUtils.createDirectStream API is used to read messages from Kafka.

This API takes the StreamingContext, a map of Kafka parameters, and the set of topics to read.

Each consumer in the group calls the createDirectStream API to read the stream from partitions of the topic.

https://spark.apache.org/docs/1.6.1/streaming-kafka-integration.html

Note: When building a Maven project for Spark Streaming and Kafka, include the dependency below; otherwise an error is thrown at import org.apache.spark.streaming.kafka._

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
</dependency>

Eg: code is under https://github.com/eBay/Spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

7. By default it starts consuming from the latest offset in each partition. If you set auto.offset.reset in the Kafka parameters to smallest (earliest with the Kafka 0.10 consumer), it starts consuming from the smallest offset.

Advantages of Direct approach over receiver approach:


  • Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.



  • Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
  • Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. 

Only 1 Disadvantage - This approach does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself

8. While submitting a Spark Streaming job, the ZooKeeper quorum port, consumer group ID, and Kafka topic have to be specified additionally.

Kafka Message Structure:

A message is a key-value structure, and the actual message is in the value. createDirectStream returns a DStream. Sample code is below:

package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object streaming {

  def main(args: Array[String]) {

    // Consumer settings for the Kafka 0.10 integration. The default partition
    // assignor (range) is used, so partition.assignment.strategy is not set.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.0.0.14:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "stream")

    val topics = "kafka_topic_name" // comma-separated list of topics
    val sparkConf = new SparkConf().setMaster("yarn").setAppName("Hpstreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topicsSet = topics.split(",").toSet
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topicsSet, kafkaParams))
    val spark = SparkSession.builder().getOrCreate()

    // Each batch arrives as an RDD of ConsumerRecord; the payload is in record.value.
    kafkaStream.foreachRDD(kafka_msg_rdd => {
      kafka_msg_rdd.cache()
      println(kafka_msg_rdd.partitions.size)
      val kafka_msg_df = spark.read.json(kafka_msg_rdd.map(record => record.value))
      if (kafka_msg_df.columns.size != 0) {
        kafka_msg_df.foreach(x => println(x))
      }
      kafka_msg_rdd.unpersist(true)
    })
    ssc.start()
    ssc.awaitTerminationOrTimeout(60 * 1000)
    // Keep the SparkContext alive (false) and stop gracefully (true).
    ssc.stop(false, true)
  }
}

Note: Seek functionality is available to read data in a partition based on offset. However, it is not implemented when using createDirectStream().
Eg:
consumer.seek(<specificPartition>, <myCustomOffset>); or:
consumer.seekToBeginning(<specificPartition>);
consumer.seekToEnd(<specificPartition>);

Sample project in scala implementing Direct Approach is under,


https://drive.google.com/open?id=0BzG0wQkWbKpLOTNHMjNUaVFsd2c

Query:

Dynamic increase of partitions on exceeding partition capacity.


How to gracefully stop a Kafka-Spark streaming job?

Option 1: ssc.awaitTerminationOrTimeout(noofMilliseconds)

Option 2: Send a special message to the Kafka topic as a signal for the streaming job to exit. An if condition in the streaming job detects this message and calls ssc.stop(stopSparkContext = true, stopGracefully = true). Alternatively, place a marker file in a known location; the streaming job checks for this file and calls ssc.stop() when it appears.
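The marker-file variant of Option 2 can be sketched as a loop that checks for the file before each batch. This is a plain-Python stand-in for the control flow only; in the real job the break would be a call to ssc.stop(stopSparkContext = true, stopGracefully = true). The function names and the file name are hypothetical.

```python
import os
import tempfile


def should_stop(marker_path):
    """The job stops once someone has created the marker file."""
    return os.path.exists(marker_path)


def run_batches(batches, marker_path, process):
    """Process batches until they run out or the marker file appears."""
    processed = 0
    for batch in batches:
        if should_stop(marker_path):
            break  # the real job would call ssc.stop(...) here
        process(batch)
        processed += 1
    return processed


workdir = tempfile.mkdtemp()
marker = os.path.join(workdir, "stop_streaming_job")  # hypothetical marker name
seen = []


def handle(batch):
    seen.append(batch)
    if batch == "batch-2":
        open(marker, "w").close()  # an operator drops the marker mid-run


count = run_batches(["batch-1", "batch-2", "batch-3"], marker, handle)
print(count, seen)
```

The batch that is in flight when the marker appears still completes, which is the point of a graceful stop.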


Practicals:

Kafka Installation steps:

Below is the basic approach, suitable only for POCs.

In production, Kafka is installed in the form of parcels.

For more info follow https://kafka.apache.org/quickstart#quickstart_startserver

1. Extract Kafka zip file
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance. 

bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start Broker

[cloudera@quickstart Kafka]$ cd kafka_2.11-0.9.0.0
[cloudera@quickstart kafka_2.11-0.9.0.0]$ bin/kafka-server-start.sh config/server.properties

In the server.properties give the host name of zookeeper in the field, zookeeper.connect

eg:
zookeeper.connect=zk0-nadeem.gcow501a4m5ehgp0vqbg3nk5vb.rx.internal.cloudapp.net:2181

3. Create Topic
In a new terminal:

If port 2181 is occupied, change the port to 2182 and update it accordingly in all the places.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2

To get the list of available topics, use --list

Eg: bin/kafka-topics.sh --list --zookeeper zk0-nadeem.gcow501a4m5ehgp0vqbg3nk5vb.rx.internal.cloudapp.net:2181

4. Start producer to send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic2

5. Start the consumer, in a new terminal

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning

For Installing via Parcels:

http://community.cloudera.com/t5/Data-Ingestion-Integration/Kafka-service-Failed-to-start-in-CDH-5-4-1-Showing-error/m-p/28438?&_ga=1.204331284.545008642.1477911892#M886

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_installing.html

Follow the section "Installing or Upgrading Kafka from a Parcel":

1. In Cloudera Manager, download the Kafka parcel, distribute the parcel to the hosts in your cluster, and then activate the parcel. See Managing Parcels. After you activate the Kafka parcel, Cloudera Manager prompts you to restart the cluster. You do not need to restart the cluster after installing Kafka. Click Close to ignore this prompt.
2. Add the Kafka service to your cluster. See Adding a Service.

To submit a Spark streaming Job,

spark-submit --class "Sparkstreamingjobs.renderkafka" --master yarn --num-executors 12 --executor-cores 12 --executor-memory 4096m --driver-memory 2048m --driver-cores 5 --packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0" /home/app/Leela/Sparkprojs-1.0.jar 


Spark-Python command:

spark-submit --master yarn --num-executors 12 --executor-cores 12 --executor-memory 4096m --driver-memory 2048m --driver-cores 5 --packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0" one_column4.py

pyspark --packages "com.databricks:spark-csv_2.11:1.5.0" --master local[*] csv.py


Note: A Python job can be submitted via spark-submit. Passing a script to pyspark works only before Spark 2.0.


Starting spark-shell with extra packages:


eg:spark-shell --packages "org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0"

Cassandra and Cassandra Spark connector

Cassandra:


Features:

-> Highly scalable.
-> No single point of failure - not a master-slave architecture.
-> Fast writes.
-> Data replication across data centres - commonly used to create a database that is spread across nodes in more than one data centre, for high availability.

The key components of Cassandra are as follows −


Node − It is the place where data is stored.

Data center − It is a collection of related nodes.

Cluster − A cluster is a component that contains one or more data centers.

Commit log − The commit log is a crash-recovery mechanism in Cassandra. Every write operation is written to the commit log.

Mem-table − A mem-table is a memory-resident data structure. After commit log, the data will be written to the mem-table. Sometimes, for a single-column family, there will be multiple mem-tables.

SSTable − It is a disk file to which the data is flushed from the mem-table when its contents reach a threshold value.

Bloom filter − A bloom filter is a quick, probabilistic structure for testing whether an element is a member of a set: it can report false positives but never false negatives. It acts as a special kind of cache and is consulted on reads to skip SSTables that cannot contain the requested key.
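To make the bloom filter idea concrete, here is a minimal sketch. The sizes, the use of SHA-256 for the hash positions, and the class name are illustrative choices; Cassandra's own implementation differs.

```python
import hashlib


class BloomFilter:
    """Set-membership test that may say 'maybe present' wrongly, but never 'absent' wrongly."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits)  # one byte per bit, for simplicity

    def _positions(self, item):
        # Derive num_hashes positions by salting the item with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, item):
        for position in self._positions(item):
            self.bits[position] = 1

    def might_contain(self, item):
        return all(self.bits[position] for position in self._positions(item))


bloom = BloomFilter()
for key in ["row-1", "row-2", "row-3"]:
    bloom.add(key)

print(bloom.might_contain("row-2"))
print(BloomFilter().might_contain("row-2"))
```

A False answer means the key is definitely not in that SSTable, so the disk read can be skipped; a True answer still has to be confirmed by reading the SSTable.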


Cassandra terminology compared with RDBMS:

Database = Keyspace
Tables = Tables

Write Operations

Every write activity of nodes is captured by the commit logs written in the nodes. Later the data will be captured and stored in the mem-table. Whenever the mem-table is full, data will be written into the SStable data file. All writes are automatically partitioned and replicated throughout the cluster. Cassandra periodically consolidates the SSTables, discarding unnecessary data.

Read Operations

During read operations, Cassandra must combine results from the active memtable and potentially multiple SSTables.
Cassandra gets values from the mem-table and checks the bloom filter to find the appropriate SSTable that holds the required data.
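The write and read paths described above can be sketched with a toy single-node store. This is a teaching model under strong simplifications (no partitioning, replication, compaction, or bloom filters), and the class name and flush threshold are hypothetical.

```python
class ToyNode:
    """Commit log -> memtable -> SSTable flush, plus a read that merges them."""

    def __init__(self, memtable_limit=2):
        self.commit_log = []      # append-only record of every write
        self.memtable = {}        # in-memory, most recent writes
        self.sstables = []        # immutable flushed tables, oldest first
        self.memtable_limit = memtable_limit

    def write(self, key, value):
        self.commit_log.append((key, value))   # 1. log for crash recovery
        self.memtable[key] = value             # 2. update the memtable
        if len(self.memtable) >= self.memtable_limit:
            self.sstables.append(dict(self.memtable))  # 3. flush when full
            self.memtable.clear()

    def read(self, key):
        if key in self.memtable:                # newest data first
            return self.memtable[key]
        for sstable in reversed(self.sstables):  # then newest SSTable to oldest
            if key in sstable:
                return sstable[key]
        return None


node = ToyNode(memtable_limit=2)
node.write("a", 1)
node.write("b", 2)   # memtable reaches the limit and is flushed
node.write("a", 3)   # newer value for "a" stays in the memtable
print(node.read("a"), node.read("b"), node.read("missing"))
```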

The syntax of creating a Keyspace is as follows −

CREATE KEYSPACE <keyspace_name>
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};


CQL (Cassandra Query Language) is the language used to read from and write to Cassandra.
The shell prompt used is cqlsh.

SELECT, INSERT, UPDATE, DELETE, WHERE and ORDER BY work much like in normal SQL.

Few other additions are:

Copy
This command copies data between Cassandra and a file, in either direction. Given below is an example that copies the table named emp to the file myfile.

cqlsh:tutorialspoint> COPY emp (emp_id, emp_city, emp_name, emp_phone,emp_sal) TO 'myfile';

Cassandra via Java:

//Creating Cluster object
      Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
   
      //Creating Session object
      Session session = cluster.connect("tp");
 
      //Executing the query
      session.execute(query);

SPARK- Cassandra Connector

DataStax provides JARs that, once imported, extend the SparkContext so that reads (cassandraTable()) and writes (saveToCassandra()) are available.

There is no direct option for Hive-Cassandra integration.

We need to use third-party JARs to connect Spark with Cassandra. Follow

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md

Key points are:

1. Enable Cassandra-specific functions on the SparkContext, SparkSession, RDD, and DataFrame by importing com.datastax.spark.connector._

2. saveToCassandra() is a Cassandra-specific RDD function and comes from the imported connector JAR.

Saving data from RDD to Cassandra

Write example, adding two more rows to the table:
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
Here, test is the keyspace,
kv is the table name, and
the columns being written are "key" and "value".

3. cassandraTable() is a Cassandra-specific SparkContext function and comes from the imported connector JAR.

Read OR Loading and analyzing data from Cassandra

Use the sc.cassandraTable method to view this table as a Spark RDD:
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)