Tuesday, June 6, 2017

Kafka and integration with Spark Streaming


1. The number of partitions and the replication factor have to be specified during topic creation.
2. Kafka follows a publish-subscribe mechanism.
3. A consumer group reads the messages in a topic.
4. To achieve parallelism, multiple consumers in a consumer group read the messages.
5. Consumers join a group by using the same group.id.

The maximum parallelism of a group is bounded by the number of partitions, i.e. the effective number of consumers in the group ≤ the number of partitions.

Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.

Kafka guarantees that a message is only ever read by a single consumer in the group.

Consumers see messages in the order they were stored in the log.
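To make points 1-5 concrete, here is a minimal sketch of a plain Kafka consumer joining a group (Kafka 0.9+ Java consumer API, written in Scala); the broker address, group.id and topic name are placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
props.put("group.id", "stream")                    // consumers sharing this group.id split the partitions
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("topic2"))  // placeholder topic

while (true) {
  // poll returns records only from the partitions assigned to this member of the group
  val records = consumer.poll(1000)
  for (record <- records.asScala) {
    println(s"partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
  }
}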

Consumers and Consumer Groups

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case your application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them and writing the results. This may work well for a while, but what if the rate at which producers write messages to the topic exceeds the rate at which your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall farther and farther behind, unable to keep up with the rate of incoming messages. Obviously there is a need to scale consumption from topics. Just like multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, splitting the data between them.
Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.
Let’s take topic T1 with four partitions. Now suppose we created a new consumer, C1, which is the only consumer in group G1, and use it to subscribe to topic T1. Consumer C1 will get all messages from all four T1 partitions. See Figure 4-1.



Figure 4-1. One Consumer group with four partitions

If we add another consumer, C2, to group G1, each consumer will only get messages from two partitions. Perhaps messages from partition 0 and 2 go to C1 and messages from partitions 1 and 3 go to consumer C2. See Figure 4-2.



Figure 4-2. Four partitions split between two consumers in a group

If G1 has four consumers, then each will read messages from a single partition. See Figure 4-3.



Figure 4-3. Four consumers in a group, each reading one partition

If we add more consumers to a single group with a single topic than we have partitions, some of the consumers will be idle and get no messages at all. See Figure 4-4.



Figure 4-4. More consumers in a group than partitions (some consumers idle)
Note: In order to have multiple consumers in a single consumer group, we need to create multiple Spark Streaming jobs with the same group.id and topic but a different consumer name. There is no way to add multiple consumers within the same streaming job.

Source: https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html

There are two approaches for reading data from Kafka topics:

1. Receiver-based approach (old) - KafkaUtils.createStream()
2. Direct approach - KafkaUtils.createDirectStream()

6. In Spark Streaming, the KafkaUtils.createDirectStream API is used to read messages from Kafka.

This API takes the streaming context, a map of Kafka parameters, and the set of topics to read, as sketched below.

Each consumer in the group calls the createDirectStream API to read the stream from the partitions of the topic.
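A minimal sketch of this call against the Spark 1.6 / spark-streaming-kafka (Kafka 0.8) API documented at the link below; the broker address and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("DirectStreamSketch")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// For the 0.8 direct API only the broker list is mandatory in the Kafka parameters
val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092") // placeholder broker
val topicsSet = Set("kafka_topic_name")                                         // placeholder topic

// Type parameters: key type, value type, key decoder, value decoder
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

stream.map(_._2).print()   // with this API each record is a (key, value) tuple
ssc.start()
ssc.awaitTermination()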

https://spark.apache.org/docs/1.6.1/streaming-kafka-integration.html

Note: For a Maven project that uses Spark Streaming and Kafka, the following dependency needs to be included; otherwise an error will be thrown at import org.apache.spark.streaming.kafka._

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
</dependency>

E.g., sample code is at https://github.com/eBay/Spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

7. By default, it will start consuming from the latest offset in each partition. If you set the configuration auto.offset.reset in the Kafka parameters to smallest, it will start consuming from the smallest offset.
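For example, with the 0.8 direct API this is just another entry in the Kafka parameters map (the broker address is a placeholder); note that the newer 0.10 consumer API used in the sample code later in this post expects "earliest"/"latest" instead:

// Read each partition from its smallest (oldest) available offset
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092",  // placeholder broker address
  "auto.offset.reset"    -> "smallest")      // "earliest" with the 0.10 consumer API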

Advantages of Direct approach over receiver approach:


  • Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.



  • Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
  • Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. 

The only disadvantage is that this approach does not update offsets in ZooKeeper, hence ZooKeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update ZooKeeper yourself, as sketched below.
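A minimal sketch of reading those offsets per batch with the kafka010 integration used later in this post, assuming kafkaStream is the DStream returned by createDirectStream; writing them back to ZooKeeper would need a ZooKeeper client and is omitted here:

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

kafkaStream.foreachRDD { rdd =>
  // The RDDs of a direct stream expose the Kafka offset ranges they were built from
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
  }
  // Here the offsets could be written to ZooKeeper (or any other store) if monitoring needs them
}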

8. While submitting a Spark Streaming job, the ZooKeeper quorum and port, the consumer group ID, and the Kafka topic have to be mentioned additionally.

Kafka Message Structure:

This is a key-value structure and the actual message is in the value. createDirectStream returns a DStream. Sample code is below:

package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.sql.SparkSession

object streaming {

  def main(args: Array[String]) {

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.0.0.14:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "partition.assignment.strategy" -> "range",
      "group.id" -> "stream")

    val topics = "kafka_topic_name"
    val sparkConf = new SparkConf().setMaster("yarn").setAppName("Hpstreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topicsSet = topics.split(",").toSet
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topicsSet, kafkaParams))
    val spark = SparkSession.builder().getOrCreate()

    kafkaStream.foreachRDD(kafka_msg_rdd => {
      kafka_msg_rdd.cache()
      // With the direct approach there is one RDD partition per Kafka partition
      println(kafka_msg_rdd.partitions.size)
      // The message payload is in record.value; parse the JSON payloads into a DataFrame
      val kafka_msg_df = spark.read.json(kafka_msg_rdd.map(record => record.value))
      if (kafka_msg_df.columns.size != 0) {
        kafka_msg_df.foreach(x => println(x))
      }
      kafka_msg_rdd.unpersist(true)
    })
    ssc.start()
    ssc.awaitTerminationOrTimeout(60 * 1000)
    ssc.stop(false, true)
  }
}

Note: Seek functionality is available on the Kafka consumer to read data in a partition based on an offset. However, it is not exposed when using createDirectStream(); with the kafka010 integration, starting offsets can instead be supplied through the consumer strategy (see the sketch after the example below).
Eg:
consumer.seek(<specificPartition>, <myCustomOffset>); or:
consumer.seekToBeginning(<specificPartition>);
consumer.seekToEnd(<specificPartition>);
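A hedged sketch of supplying per-partition starting offsets to the kafka010 direct stream, assuming an existing StreamingContext ssc, the kafkaParams map from the sample code above, and placeholder topic/partition/offset values:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Start partition 0 of the topic from offset 100 (placeholder values)
val fromOffsets = Map(new TopicPartition("kafka_topic_name", 0) -> 100L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Set("kafka_topic_name"), kafkaParams, fromOffsets))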

A sample project in Scala implementing the direct approach is under:


https://drive.google.com/open?id=0BzG0wQkWbKpLOTNHMjNUaVFsd2c

Query:

Dynamic increase of partitions when partition capacity is exceeded.


How to gracefully stop a Kafka-Spark Streaming job?

Option 1: ssc.awaitTerminationOrTimeout(noOfMilliseconds)

Option 2: Since the streaming job reads messages from the Kafka topic, we can send a special message as a signal for the streaming job to exit. On reading this message, an if condition in the streaming job can call ssc.stop(stopSparkContext = true, stopGracefully = true). Alternatively, place a marker file in a known location and have the streaming job check for this file as the flag for calling ssc.stop(), as sketched below.
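A minimal sketch of the marker-file variant, assuming a local placeholder path /tmp/stop_streaming_job (on a real cluster this would more likely be an HDFS path checked via the Hadoop FileSystem API) and an already constructed StreamingContext ssc:

import java.nio.file.{Files, Paths}

// Placeholder path: the job stops gracefully once this file appears
val markerFile = "/tmp/stop_streaming_job"

ssc.start()
var stopped = false
while (!stopped) {
  // Wait up to 10 seconds for termination, then check for the marker file
  stopped = ssc.awaitTerminationOrTimeout(10 * 1000)
  if (!stopped && Files.exists(Paths.get(markerFile))) {
    // Let in-flight batches finish, then stop the StreamingContext and SparkContext
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
  }
}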


Practicals:

Kafka Installation steps:

Below is the basic approach, intended only for POCs.

In production, Kafka is installed in the form of parcels.

For more info follow https://kafka.apache.org/quickstart#quickstart_startserver

1. Extract the Kafka archive and start ZooKeeper
Kafka uses ZooKeeper, so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.

bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start Broker

[cloudera@quickstart Kafka]$ cd kafka_2.11-0.9.0.0
[cloudera@quickstart kafka_2.11-0.9.0.0]$ bin/kafka-server-start.sh config/server.properties

In server.properties, set the ZooKeeper host name in the zookeeper.connect field, e.g.:

zookeeper.connect=zk0-nadeem.gcow501a4m5ehgp0vqbg3nk5vb.rx.internal.cloudapp.net:2181

3. Create a topic
In a new terminal:

If port 2181 is occupied, change it to 2182 and update it accordingly in all the places.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2

To get the list of available topics, use --list.

Eg: bin/kafka-topics.sh --list --zookeeper zk0-nadeem.gcow501a4m5ehgp0vqbg3nk5vb.rx.internal.cloudapp.net:2181

4. Start producer to send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic2

5. Start a consumer (in a new terminal)

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning

For Installing via Parcels:

http://community.cloudera.com/t5/Data-Ingestion-Integration/Kafka-service-Failed-to-start-in-CDH-5-4-1-Showing-error/m-p/28438?&_ga=1.204331284.545008642.1477911892#M886

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_installing.html

Follow "Installing or Upgrading Kafka from a Parcel":

1. In Cloudera Manager, download the Kafka parcel, distribute the parcel to the hosts in your cluster, and then activate the parcel. See Managing Parcels. After you activate the Kafka parcel, Cloudera Manager prompts you to restart the cluster. You do not need to restart the cluster after installing Kafka. Click Close to ignore this prompt.
2. Add the Kafka service to your cluster. See Adding a Service.

To submit a Spark Streaming job:

spark-submit --class "Sparkstreamingjobs.renderkafka" --master yarn --num-executors 12 --executor-cores 12 --executor-memory 4096m --driver-memory 2048m --driver-cores 5 --packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0" /home/app/Leela/Sparkprojs-1.0.jar 


Spark-Python command:

spark-submit --master yarn --num-executors 12 --executor-cores 12 --executor-memory 4096m --driver-memory 2048m --driver-cores 5 --packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0" one_column4.py

pyspark --packages "com.databricks:spark-csv_2.11:1.5.0" --master local[*] csv.py


Note: A Python job can be submitted via spark-submit or pyspark.


Starting spark-shell with external packages:

E.g.: spark-shell --packages "org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0"
