Tuesday, June 6, 2017

Cassandra and Cassandra Spark connector

Cassandra:


Features:

-> Highly scalable.
-> No single point of failure - not a master-slave architecture.
-> Fast writes.
-> Data replication across data centers - commonly used to create a database that is spread across nodes in more than one data center, for high availability.

The key components of Cassandra are as follows −


Node − It is the place where data is stored.

Data center − It is a collection of related nodes.

Cluster − A cluster is a component that contains one or more data centers.

Commit log − The commit log is a crash-recovery mechanism in Cassandra. Every write operation is written to the commit log.

Mem-table − A mem-table is a memory-resident data structure. After the commit log, the data is written to the mem-table. Sometimes, for a single column family, there will be multiple mem-tables.

SSTable − It is a disk file to which the data is flushed from the mem-table when its contents reach a threshold value.

Bloom filter − A bloom filter is a quick, probabilistic structure for testing whether an element is a member of a set; it can return false positives but never false negatives. Cassandra consults a bloom filter on every read to skip SSTables that cannot contain the requested data.


Cassandra terminology compared with an RDBMS:

RDBMS database = Cassandra keyspace
RDBMS table = Cassandra table (historically called a column family)

Write Operations

Every write is first appended to the commit log on the node. The data is then stored in the mem-table. Whenever the mem-table is full, its contents are flushed to an SSTable data file on disk. All writes are automatically partitioned and replicated throughout the cluster. Cassandra periodically consolidates SSTables through compaction, discarding obsolete data.
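
As a toy sketch of this flow (illustration only, not Cassandra's real internals; the threshold, types, and object name are invented for the example):

import scala.collection.mutable

// Simplified model of the write path described above:
// append to the commit log for durability, write into the mem-table,
// and flush the mem-table to an immutable "SSTable" once it fills up.
object WritePathSketch {
  val commitLog = mutable.ArrayBuffer.empty[(String, String)]    // crash-recovery log
  val memTable  = mutable.Map.empty[String, String]              // in-memory writes
  val ssTables  = mutable.ArrayBuffer.empty[Map[String, String]] // flushed, immutable
  val flushThreshold = 3                                         // invented for the demo

  def write(key: String, value: String): Unit = {
    commitLog += (key -> value)            // 1. commit log first
    memTable(key) = value                  // 2. then the mem-table
    if (memTable.size >= flushThreshold) { // 3. flush when the threshold is hit
      ssTables += memTable.toMap
      memTable.clear()
    }
  }
}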

Read Operations

During read operations, Cassandra must combine results from the active mem-table and potentially multiple SSTables.
It reads values from the mem-table and checks the bloom filters to identify which SSTables may hold the required data, so only those files are read.
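
A matching toy sketch of the read side (again illustrative only; a real bloom filter is probabilistic, and a real read merges mem-table and SSTable results with newest-wins semantics):

// Simplified read path: check the mem-table first, then use a per-SSTable
// "bloom filter" (here just a key set) to skip files that cannot contain
// the key, reading only the SSTables that might.
object ReadPathSketch {
  val memTable = Map("k1" -> "v1")
  // each SSTable paired with a crude stand-in for its bloom filter
  val ssTables = Seq(
    (Set("k2"), Map("k2" -> "v2")),
    (Set("k3"), Map("k3" -> "v3"))
  )

  def read(key: String): Option[String] =
    memTable.get(key).orElse(
      ssTables.collectFirst {
        case (filter, table) if filter(key) && table.contains(key) => table(key)
      }
    )
}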

The syntax of creating a Keyspace is as follows −

CREATE KEYSPACE <keyspace_name>
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
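
As a concrete instance, the same statement can be issued programmatically with the DataStax Java driver (a minimal sketch; the keyspace name tutorialspoint and the 127.0.0.1 contact point are assumptions for illustration):

import com.datastax.driver.core.Cluster

// Create a keyspace from code (names and hosts here are illustrative)
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()
session.execute(
  "CREATE KEYSPACE IF NOT EXISTS tutorialspoint " +
  "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
cluster.close()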


CQL (Cassandra Query Language) is the language used to read and write data in Cassandra.
The interactive shell prompt is cqlsh.

SELECT, INSERT, UPDATE, DELETE, WHERE and ORDER BY work the same as in normal SQL.

A few other additions are:

COPY
This command copies data between Cassandra and a file. Given below is an example that copies the table named emp to the file myfile.

cqlsh:tutorialspoint> COPY emp (emp_id, emp_city, emp_name, emp_phone, emp_sal) TO 'myfile';

Cassandra via Java:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

//Creating Cluster object
Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

//Creating Session object, connected to the "tp" keyspace
Session session = cluster.connect("tp");

//Executing a query (any CQL string works here)
String query = "SELECT * FROM emp;";
ResultSet result = session.execute(query);

//Releasing connections when done
cluster.close();

Spark - Cassandra Connector

DataStax provides connector JARs that can be imported to enhance the SparkContext so that reads (cassandraTable()) and writes (saveToCassandra()) become available.

There is no direct option for Hive - Cassandra integration.

We need to use third-party JARs for connecting Spark with Cassandra. Follow:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md

Key points are:

1. Enable Cassandra-specific functions on the SparkContext, SparkSession, RDD, and DataFrame.
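
A minimal setup sketch based on the quick start linked above (the contact point and the local master URL are assumptions for a local test):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._   // brings in cassandraTable() and saveToCassandra()

// Tell the connector which Cassandra node to contact (127.0.0.1 assumed)
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[*]", "cassandra-demo", conf)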

2. saveToCassandra() is a Cassandra-specific RDD function and comes from the imported connector JAR.

Saving data from an RDD to Cassandra

Writing example, adding two more rows to the table:

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))

Here, test is the keyspace, kv is the table name, and the columns to which data is written are "key" and "value".
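
Note: this assumes the test keyspace and kv table already exist; the quick start linked above creates them in cqlsh before running this example.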

3. cassandraTable() is a Cassandra-specific SparkContext method and comes from the imported connector JAR.

Reading - loading and analyzing data from Cassandra

Use the sc.cassandraTable method to view this table as a Spark RDD:
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)     
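
The connector RDD can also push work down to Cassandra, for example selecting only some columns with select() (a small sketch; the column choice is illustrative):

// Fetch only the "value" column instead of whole rows,
// reducing the data pulled from Cassandra into Spark
val values = sc.cassandraTable("test", "kv").select("value")
println(values.map(_.getInt("value")).sum)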
