Thursday, March 16, 2017

Spark Streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordcount")

///"local" runs with 1 thread by default. A receiver-based source such as a socket keeps one thread busy capturing events,
///so with only 1 thread nothing is left to process the buffered batches and the job starves.
///Hence "local[2]": one thread for the receiver and at least one for processing.


val ssc = new StreamingContext(conf, Seconds(5))   ///Batch interval of 5 seconds.


val lines = ssc.socketTextStream("localhost", 9999)

val words = lines.flatMap(_.split(" "))

val pairs = words.map(word => (word,1))

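val wordscounts = pairs.reduceByKey(_+_)

wordscounts.print()   ///An output operation is required; without one ssc.start() fails because nothing is registered to execute.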

ssc.start()        ///The streaming computation starts here.
ssc.awaitTermination()

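///In the above case the source is a socket, so socketTextStream is used.
///If the source is text files, use textFileStream instead. It monitors a directory for newly created files, so the path should point to that directory: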
              val lines = ssc.textFileStream("..//Path of file");
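
To see what the flatMap, map and reduceByKey steps compute for a single batch, here is a minimal sketch on plain Scala collections, without Spark. The object name BatchWordCount and the function countWords are made up for illustration, and groupBy plus a sum stands in for reduceByKey, which exists only on Spark's pair RDDs and DStreams.

```scala
object BatchWordCount {
  // Counts the words in one batch of lines, mirroring the streaming job:
  // flatMap -> map to (word, 1) -> sum the 1s per word.
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))              // split every line into words
      .map(word => (word, 1))             // pair each word with a count of 1
      .groupBy(_._1)                      // collect the pairs of each word (stand-in for reduceByKey)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical batch: the lines received during one 5-second interval.
    val batch = Seq("hello world", "hello spark")
    countWords(batch).foreach { case (word, count) => println(s"$word -> $count") }
  }
}
```

In the streaming job the same computation is repeated for every batch interval, and print() shows the first few results of each batch.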

To run a Spark Streaming job from a JAR file:

spark-submit --class "Sparkstreaming.renderkafka" --master local[2] --deploy-mode client /home/hadoop/Leela/WC/Sparkprojs-1.0.jar 5

To run a Spark job on YARN from the terminal, with extra dependencies fetched through --packages instead of being bundled into the JAR file:

export SPARK_MAJOR_VERSION=2
spark-submit --class com.hp.Code1 --master yarn --num-executors 12 --executor-cores 12 --executor-memory 4096m --driver-memory 2048m --driver-cores 5 --packages "com.databricks:spark-avro_2.11:3.2.0,com.databricks:spark-csv_2.11:1.5.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2" /home/sathwik26782/streaming_jar_file.jar

--packages takes a comma-separated list of Maven coordinates, each in the format groupId:artifactId:version

Note: With the above specification the JARs are downloaded automatically. Occasionally, however, the resolution fails with
"You probably access the destination server through a proxy server that is not well configured." followed by UNRESOLVED DEPENDENCIES. Run the command a second time, as this can be caused by a late response from the host.

For example, the coordinate org.apache.spark:spark-streaming_2.11:2.1.0 corresponds to this Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
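
The mapping from Maven dependency fields to the --packages argument can be sketched in a few lines of Scala. This is only an illustration: PackageCoordinates, Dependency and packagesArg are hypothetical names, not part of Spark or Maven.

```scala
object PackageCoordinates {
  // The three fields of a Maven <dependency> element.
  final case class Dependency(groupId: String, artifactId: String, version: String) {
    // Coordinate in the groupId:artifactId:version format expected by --packages.
    def coordinate: String = s"$groupId:$artifactId:$version"
  }

  // --packages accepts several coordinates separated by commas.
  def packagesArg(deps: Seq[Dependency]): String =
    deps.map(_.coordinate).mkString(",")

  def main(args: Array[String]): Unit = {
    val streaming = Dependency("org.apache.spark", "spark-streaming_2.11", "2.1.0")
    val kafka     = Dependency("org.apache.spark", "spark-streaming-kafka-0-8_2.11", "2.0.2")
    println(packagesArg(Seq(streaming, kafka)))
  }
}
```

The resulting string can be passed as the value of --packages in the spark-submit command.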
