Monday, August 28, 2017

NIFI and OOZIE Flow

Nifi Installation https://nifi.apache.org/docs/nifi-docs/html/getting-started.html

This document describes the data flow for a Spark job in which NiFi acts as the source point for data ingestion from a REST API.

High level data flow:
1.    Fetching data from REST: A URL is passed to the REST API to fetch the data; this call includes the start and end dates.

2.    Copying to HDFS: The fetched file is stored in HDFS so the Spark application can use it as its input file.

3.    Invoking the Spark job from OOZIE.

4.    Backing up the fetched file in HDFS.

Note: The activity described above covers a single run of the flow. If the flow is executed twice in a given day, a second file is generated from REST and is copied to both the Spark input and backup locations in HDFS.

Description of each activity:
1.    Fetching data from REST: NiFi reads data from the source (REST) and writes it to the destination (HDFS).
Below are the sub-activities:
·       Getting start and end dates: These parameters are used in the URL that data is fetched from.


NIFI generates this information in the form of properties, which is implemented in the processor “UpdateAttribute”. A limitation in NiFi is that this processor cannot be the starting point of a flow; it needs an upstream processor. So the ‘ManualGetFileTesting’ processor has been created as the start of the job and acts as the upstream processor for ‘UpdateAttribute’. On every run this processor passes a dummy file from the location \\file1\corporate_open_share\IT\test into the NiFi flow.

·       Reading from REST:
The ‘InvokeHTTP’ processor reads the file from REST (https://XYZ?startDate=${StartDate}&endDate=${EndDate}&language=en). This generates the source file, which is in CSV format.

The ‘GenerateFlowFile’ processor is an automated replacement for ‘ManualGetFileTesting’ and is scheduled every day at 4:00 p.m. UTC.
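For reference, the same REST call can be reproduced from a shell; this is only an illustrative sketch (the real endpoint is the masked URL above, and the YYYY-MM-DD date format is an assumption):

StartDate=$(date -d "yesterday" +%Y-%m-%d)    # assumed date format
EndDate=$(date +%Y-%m-%d)
curl -o oil-data.csv "https://XYZ?startDate=${StartDate}&endDate=${EndDate}&language=en"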

2.    Passing the CSV file to the Spark job:
The ‘PutHDFS’ processor copies the CSV file to HDFS; the file name is ‘oil-data.csv’. The Spark job reads the file from the HDFS location “/user/Leela_1/data/nifi_file” as its input and removes it on successful completion of the job.
Triggering the Spark job:
The Spark job is triggered via OOZIE every day at 4:30 p.m. UTC.
3.    Backing up the data to HDFS:
The same CSV file is backed up to the HDFS location “hdfs://cluster-namenode.com:8020/user/Leela_1//${date.dir}” based on the date attribute that was set as a property in step 1.
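Both copies can be verified from the command line (paths as described above; ${date.dir} is the date attribute set in step 1):

hdfs dfs -ls /user/Leela_1/data/nifi_file
hdfs dfs -ls hdfs://cluster-namenode.com:8020/user/Leela_1/${date.dir}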




4.    Triggering OOZIE Job:

Oozie is a job scheduler for running jobs in Hadoop. There are three kinds of jobs:

Workflow – scheduled on demand
Coordinator – scheduled by time and frequency
Bundle – defines and executes a set of coordinator applications

Coordinator jobs have been implemented for Spark job execution.

Files to be in the local file system:
1.      job.properties
Files to be in HDFS:
1.      job.properties
2.      workflow.xml
3.      spark-coord.xml
4.      Shell script file (myscript.sh)

Job.properties
================================================================
nameNode=hdfs://cluster-namenode.com:8020
jobTracker=dev-jp1.cluster.com:8050
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
ProcessName=vi1
wfroot=${nameNode}/user/Leela_1/jobs/oozie/test
jobStart=2017-08-27T00:00Z
jobEnd=2027-08-27T00:00Z
user.name=Leela_1
myscript=myscript.sh
myscriptPath=${nameNode}/user/Leela_1/jobs/oozie/test/myscript.sh
oozie.coord.application.path=${wfroot}/spark-coord.xml

This job.properties file defines properties such as nameNode, jobTracker, etc.
Variables used in job execution are set here:
wfroot – the HDFS directory that holds all of the above-mentioned files for job execution.
myscript – the shell script file that executes the Spark job.
oozie.coord.application.path – the path of spark-coord.xml (this would be replaced with oozie.wf.application.path in the case of workflow jobs).
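Before submitting, workflow.xml, spark-coord.xml and myscript.sh must be present under ${wfroot} in HDFS; a minimal upload sketch (assuming the files are in the current local directory):

hdfs dfs -mkdir -p /user/Leela_1/jobs/oozie/test
hdfs dfs -put -f job.properties workflow.xml spark-coord.xml myscript.sh /user/Leela_1/jobs/oozie/test/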

Spark-coord.xml
<?xml version="1.0" encoding="UTF-8"?>
<coordinator-app name="spark-coord" frequency="${coord:minutes(10)}" start="${jobStart}" end="${jobEnd}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
  <action>
    <workflow>
      <app-path>${wfroot}/workflow.xml</app-path>
    </workflow>
  </action>
</coordinator-app>

Specifies the frequency, start, end, and the path of the workflow.xml file.
Values for ${jobStart} and ${jobEnd} have been specified in job.properties.

Workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${myscript}</exec>
            <file>${myscriptPath}</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <kill name="fail-output">
        <message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

This uses the variables set in job.properties and spark-coord.xml and executes the shell script stored in HDFS at ${nameNode}/user/Leela_1/jobs/oozie/test/myscript.sh.
The action is of <shell> type; it copies the jar file and executes the Spark job.

myscript.sh
#!/bin/bash
# Command to call Spark: fetch the application jar from HDFS and submit the job

# Remove any stale copy of the jar from /tmp
if [ -e /tmp/xyz-0.0.1-SNAPSHOT.jar ]
then
   rm /tmp/xyz-0.0.1-SNAPSHOT.jar
fi

# Copy the application jar from HDFS to the local /tmp directory
hadoop fs -get hdfs://dev-nn1.cluster-namenode.com:8020/user/Leela_1/xyz-0.0.1-SNAPSHOT.jar /tmp/
chmod 777 /tmp/xyz-0.0.1-SNAPSHOT.jar
export SPARK_MAJOR_VERSION=2
spark-submit --class com.xyz.Code --master yarn --num-executors 12 --executor-cores 12 --executor-memory 4096m --driver-memory 2048m --driver-cores 5 --packages "com.databricks:spark-avro_2.11:3.2.0,com.databricks:spark-csv_2.11:1.5.0" /tmp/xyz-0.0.1-SNAPSHOT.jar hdfs://hddev-nn1.xyzinc.com:8020/user/Leela_1/data.csv abc_data_test


Scheduling the job: the above job can be scheduled via the command below.

Submit and run:
oozie job -oozie http://hddev-namenodeinc.com:11000/oozie -config  job.properties -run


A few other Oozie commands:

Validate an XML file:
oozie  validate -oozie  http://hddev-namenodeinc.com:11000/oozie  hdfs://hddev-nn1.namenodeinc.com:8020/user/narendra26954/jobs/oozie/test/workflow.xml

Job info:
oozie job -oozie http://hddev-namenodeinc.com:11000/oozie -info 0000107-170815214258044-oozie-oozi-W

Kill a job:
oozie job -oozie  http://hddev-sn1.namenodeinc.com:11000/oozie --kill 0000042-170824180706071-oozie-oozi-C


Validating the job status:
All scheduled jobs and their status can be viewed in the Oozie UI: http://hddev-sn1.namenodeinc.com:11000/oozie/

The Oozie job status can also be seen in Hue.

oozie commands


1] To submit a job - go to the directory containing job.properties and run the following command.
oozie job --oozie http://<HOSTNAME>:11000/oozie --config job.properties -submit

2] To kill a job
oozie job --oozie http://<HOSTNAME>:11000/oozie --kill [coord_jobid]

3] To suspend a job
oozie job --oozie http://<HOSTNAME>:11000/oozie --suspend [coord_jobid]

4] To resume a suspended job (coord_jobid is the id of the suspended job)
oozie job --oozie http://<HOSTNAME>:11000/oozie --resume [coord_jobid]

5] To restart a failed workflow.
oozie job -rerun [parent_workflow_jobid] -Doozie.wf.rerun.failnodes=true


Result of the job execution:

The CSV file is taken as the Spark job input and a Hive table “abc_temp” is created after processing.

Hive table schema:

CREATE EXTERNAL TABLE abc_temp
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://hddev-nn1.namenode.com:8020/user/Leela_1/abc_data_test'
TBLPROPERTIES (
  'avro.schema.url'='hdfs://hddev-nn1.namenode.com/user/Leela_1/abc_v_5.avsc',
  'hive.input.dir.recursive'='TRUE',
  'hive.mapred.supports.subdirectories'='TRUE',
  'hive.supports.subdirectories'='TRUE',
  'mapred.input.dir.recursive'='TRUE'
);
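The DDL above can be executed from the command line; a minimal sketch (the create_abc_temp.hql file name is hypothetical):

hive -f create_abc_temp.hql
hive -e "SELECT * FROM abc_temp LIMIT 10;"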


Note: The Avro schema abc_v_5.avsc was generated with Kite from the sample CSV file sent initially.

For example:
Kite Installation:

curl http://central.maven.org/maven2/org/kitesdk/kite-tools/1.1.0/kite-tools-1.1.0-binary.jar -o kite-dataset

A CSV file with a header, like:

id,Name,Location,Desg
1,Leela,Hyd,SW Engg


Command to extract schema:
kite-dataset csv-schema 1.csv --class Sample -o sample.avsc

Use this .avsc file in TBLPROPERTIES when creating the Hive table.
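A minimal end-to-end sketch of the Kite step (the downloaded tool must be made executable first; the HDFS destination mirrors the avro.schema.url used in TBLPROPERTIES above):

chmod +x kite-dataset
./kite-dataset csv-schema 1.csv --class Sample -o sample.avsc
hdfs dfs -put -f sample.avsc /user/Leela_1/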


NIFI Installation in cluster mode

Installation & Configuration of NiFi on AWS-EMR cluster

Author: Devender Chauhan    Date: 14/06/2018    Version: 1

Introduction
This documentation describes the process required to install and configure the NiFi application, helping the user to set up the environment correctly.
Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.
Apache NiFi is based on technology previously called “Niagara Files” that was in development and used at scale within the NSA for the last 8 years and was made available to the Apache Software Foundation through the NSA Technology Transfer Program. Some of the use cases include, but are not limited to:
·         Big Data Ingest– Offers a simple, reliable and secure way to collect data streams.
·         IoAT Optimization– Allows organizations to overcome real world constraints such as limited or expensive bandwidth while ensuring data quality and reliability.
·         Compliance– Enables organizations to understand everything that happens to data in motion from its creation to its final resting place, which is particularly important for regulated industries that must retain and report on chain of custody.
·         Digital Security– Helps organizations collect large volumes of data from many sources and prioritize which data is brought back for analysis first, a critical capability given the time sensitivity of identifying security breaches.

1.  Pre-requisites :

=> Create an EMR cluster with 1 master and 2 datanodes (an example AWS CLI command is sketched after the node list below):


=> Here is the current configuration of our EMR cluster :


1 MasterNode : IP : 172.30.0.240,   Hostname : ip-172-30-0-240.ec2.internal
2 DataNodes :   IP : 172.30.0.67,      Hostname : ip-172-30-0-67.ec2.internal
IP : 172.30.0.10,      Hostname : ip-172-30-0-10.ec2.internal
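For reference, a similar cluster can be created from the AWS CLI; the sketch below is illustrative only (release label, instance type and key name are assumptions, not the values used for this cluster):

aws emr create-cluster \
  --name "nifi-cluster" \
  --release-label emr-5.14.0 \
  --applications Name=Hadoop Name=Spark \
  --ec2-attributes KeyName=my-key-pair \
  --instance-type m4.large \
  --instance-count 3 \
  --use-default-roles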

·        Operating system : 
   Centos/RHEL 6/7 or Amazon Linux

·        Java : 1.7 or higher version.

Run the below command on all nodes to install Java (by default it is installed on EMR machines).
Command :

#yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
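To confirm Java is available on each node (should report 1.8.x, or at least 1.7):

#java -version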

Here we are installing NIFI on 3 machines, namely:

172.30.0.240 ----- ip-172-30-0-240.ec2.internal    ----- Master Node
172.30.0.67  ------ ip-172-30-0-67.ec2.internal     ----- Slave Node1
172.30.0.10  ------ ip-172-30-0-10.ec2.internal      ----- Slave Node2

·        Passwordless SSH on all nodes of NIFI cluster using root user.

Generate a public key and private key using the below command on the nodes of the NIFI cluster.
#ssh-keygen -t dsa (Press Enter)
[root@ip-172-30-0-240 .ssh]# ssh-keygen -t dsa
Generating public/private dsa key pair.
Enter file in which to save the key (/root/.ssh/id_dsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_dsa.
Your public key has been saved in /root/.ssh/id_dsa.pub.
The key fingerprint is:
SHA256:mqpkzsLOUxJomwoqcGOzLAMwW9ccYUB69mScgaeIkdk root@ip-172-30-0-240.ec2.internal
The key's randomart image is:
+---[DSA 1024]----+
| + .oo+.         |
|+ E..ooo         |
|.o..o==.         |
|=oooo+o          |
|o++.  . S        |
|=+=.   o         |
|B+=+  o          |
|OBo  .           |
|o*=..            |
+----[SHA256]-----+

The private and public keys are now generated for the root user in the /root/.ssh directory.

Append the public key of 172.30.0.240 to authorized_keys on 172.30.0.240 (the same node) using the below command.

#cat /root/.ssh/id_dsa.pub >> /root/.ssh/authorized_keys

From node 172.30.0.240, use SSH to append the newly generated public key (id_dsa.pub) to root's .ssh/authorized_keys on the slave nodes
(172.30.0.67 & 172.30.0.10):

#cat .ssh/id_dsa.pub | ssh 172.30.0.67 'cat >> .ssh/authorized_keys'
#cat .ssh/id_dsa.pub | ssh 172.30.0.10 'cat >> .ssh/authorized_keys'
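To verify passwordless SSH from the master, each of the below commands should print the remote hostname without asking for a password:

#ssh 172.30.0.67 hostname
#ssh 172.30.0.10 hostname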

·        Disable Firewall :
           
Do the below changes on all nodes :
#service iptables stop
           
·        Set Hostname :

#hostname ip-172-30-0-240.ec2.internal (Set on masternode1)
#vim /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=ip-172-30-0-240.ec2.internal
NOZEROCONF=yes

#hostname ip-172-30-0-67.ec2.internal (Set on Slavenode1)
#vim /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=ip-172-30-0-67.ec2.internal
NOZEROCONF=yes

#hostname ip-172-30-0-10.ec2.internal (Set on Slavenode2)
#vim /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=ip-172-30-0-10.ec2.internal
NOZEROCONF=yes

·        Set Swappiness

Change default Swappiness on all nodes :
#sysctl vm.swappiness=10
#echo "vm.swappiness = 10" >> /etc/sysctl.conf

·        Set /etc/hosts file :

[root@ip-172-30-0-240 .ssh]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost6 localhost6.localdomain6

172.30.0.240 ip-172-30-0-240.ec2.internal ip-172-30-0-240
172.30.0.67  ip-172-30-0-67.ec2.internal  ip-172-30-0-67
172.30.0.10  ip-172-30-0-10.ec2.internal  ip-172-30-0-10

=> scp /etc/hosts ip-172-30-0-67.ec2.internal:/etc/
=> scp /etc/hosts ip-172-30-0-10.ec2.internal:/etc/
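A quick check that each hostname resolves on every node (each lookup should return the matching private IP from /etc/hosts):

#getent hosts ip-172-30-0-240.ec2.internal ip-172-30-0-67.ec2.internal ip-172-30-0-10.ec2.internal
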
2.  Installation Of NIFI
On all nodes :

To download the tar file we need the wget command; install it if it is not available on the system.
Command:

#yum install -y wget
#cd /opt
#wget http://public-repo-1.hortonworks.com/HDF/2.0.1.0/HDF-2.0.1.0-12.tar.gz

This downloads the HDF-2.0.1.0-12.tar.gz file; now we need to extract it.

#tar -xzf  HDF-2.0.1.0-12.tar.gz
#ls
HDF-2.0.1.0

#cd /opt/HDF-2.0.1.0/nifi/

We need to make the below configuration changes for the NiFi cluster in the nifi.properties file.
#vim conf/nifi.properties (make the below changes in nifi.properties)

Make the below changes in the #web properties# section
# web properties#
nifi.web.http.host=ip-172-30-0-240.ec2.internal
nifi.web.http.port=7070

Make the below changes in the #Cluster node properties# section
# Cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=ip-172-30-0-240.ec2.internal
nifi.cluster.node.protocol.port=12000

Also add the below two properties in the same section:

nifi.cluster.node.unicast.manager.address=ip-172-30-0-240.ec2.internal
nifi.cluster.node.unicast.manager.protocol.port=12001


Add the #Cluster Manager properties# section after this:

# Cluster Manager properties#
nifi.cluster.is.manager=true
nifi.cluster.manager.address=ip-172-30-0-240.ec2.internal
nifi.cluster.manager.protocol.port=12001
nifi.cluster.manager.protocol.threads=10
nifi.cluster.manager.node.event.history.size=25
nifi.cluster.manager.node.api.connection.timeout=5 sec
nifi.cluster.manager.node.api.read.timeout=5 sec
nifi.cluster.manager.node.firewall.file=
nifi.cluster.flow.election.max.wait.time=5 mins
nifi.cluster.flow.election.max.candidates=

Make the below changes in the #zookeeper properties# section

#zookeeper properties, used for cluster management#
nifi.zookeeper.connect.string=ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181


We need to make the below configuration change for the NiFi cluster in the state-management.xml file.
#vim conf/state-management.xml (add the cluster-provider block below to state-management.xml)
<cluster-provider>
        <id>zk-provider</id>
        <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
        <property name="Connect String">ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181</property>
        <property name="Root Node">/nifi</property>
        <property name="Session Timeout">10 seconds</property>
        <property name="Access Control">Open</property>
    </cluster-provider>
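The connect string above assumes ZooKeeper is reachable on port 2181 on all three nodes. A quick check before starting NiFi (requires nc; each node should answer "imok", though some ZooKeeper builds restrict these four-letter commands):

#echo ruok | nc ip-172-30-0-240.ec2.internal 2181
#echo ruok | nc ip-172-30-0-67.ec2.internal 2181
#echo ruok | nc ip-172-30-0-10.ec2.internal 2181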





4.      Configuration of NIFI on All Slavenodes (ip-172-30-0-67.ec2.internal & ip-172-30-0-10.ec2.internal)

Run the below command from the MasterNode to Slavenode1 (ip-172-30-0-67.ec2.internal).

Copy the NiFi directory from ip-172-30-0-240.ec2.internal to ip-172-30-0-67.ec2.internal:
#scp -r /opt/HDF-2.0.1.0/nifi/* ip-172-30-0-67.ec2.internal:/opt/HDF-2.0.1.0/nifi/
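Since NiFi was already extracted on all nodes in section 2, this scp simply overwrites the slave's copy with the master's configuration; the host-specific properties are adjusted next. A quick check that the copy landed:

#ssh ip-172-30-0-67.ec2.internal 'ls -l /opt/HDF-2.0.1.0/nifi/conf/nifi.properties'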

Now log in on Slavenode1 using the root user and make the following NiFi configuration changes.

We need to make the below changes for the NiFi cluster in the nifi.properties file.
#cd  /opt/HDF-2.0.1.0/nifi/
#vim conf/nifi.properties (make the below changes in nifi.properties)

Make the below changes in the #web properties# section
# web properties#
nifi.web.http.host=ip-172-30-0-67.ec2.internal
nifi.web.http.port=7070

Make the below changes in the #Cluster node properties# section
# Cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=ip-172-30-0-67.ec2.internal
nifi.cluster.node.protocol.port=12000
nifi.cluster.node.unicast.manager.address=ip-172-30-0-67.ec2.internal
nifi.cluster.node.unicast.manager.protocol.port=12001

Delete or comment out the #Cluster Manager properties# section on the slave node.

# Cluster Manager properties#
#nifi.cluster.is.manager=true
#nifi.cluster.manager.address=ip-172-30-0-67.ec2.internal
#nifi.cluster.manager.protocol.port=12001
#nifi.cluster.manager.protocol.threads=10
#nifi.cluster.manager.node.event.history.size=25
#nifi.cluster.manager.node.api.connection.timeout=5 sec
#nifi.cluster.manager.node.api.read.timeout=5 sec
#nifi.cluster.manager.node.firewall.file=
#nifi.cluster.flow.election.max.wait.time=5 mins
#nifi.cluster.flow.election.max.candidates=

Make the below changes in the #zookeeper properties# section

#zookeeper properties, used for cluster management#
nifi.zookeeper.connect.string=ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181

===========

Run the below command from the MasterNode to Slavenode2 (ip-172-30-0-10.ec2.internal).

Copy the NiFi directory from ip-172-30-0-240.ec2.internal to ip-172-30-0-10.ec2.internal:
#scp -r /opt/HDF-2.0.1.0/nifi/* ip-172-30-0-10.ec2.internal:/opt/HDF-2.0.1.0/nifi

Now log in on Slavenode2 using the root user and make the following NiFi configuration changes.

We need to make the below changes for the NiFi cluster in the nifi.properties file.
#cd  /opt/HDF-2.0.1.0/nifi/
#vim conf/nifi.properties (make the below changes in nifi.properties)

Make the below changes in the #web properties# section
# web properties#
nifi.web.http.host=ip-172-30-0-10.ec2.internal
nifi.web.http.port=7070

Make the below changes in the #Cluster node properties# section
# Cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=ip-172-30-0-10.ec2.internal
nifi.cluster.node.protocol.port=12000
nifi.cluster.node.unicast.manager.address=ip-172-30-0-10.ec2.internal
nifi.cluster.node.unicast.manager.protocol.port=12001

Delete or comment out the #Cluster Manager properties# section on the slave node.

# Cluster Manager properties#
#nifi.cluster.is.manager=true
#nifi.cluster.manager.address=ip-172-30-0-10.ec2.internal
#nifi.cluster.manager.protocol.port=12001
#nifi.cluster.manager.protocol.threads=10
#nifi.cluster.manager.node.event.history.size=25
#nifi.cluster.manager.node.api.connection.timeout=5 sec
#nifi.cluster.manager.node.api.read.timeout=5 sec
#nifi.cluster.manager.node.firewall.file=
#nifi.cluster.flow.election.max.wait.time=5 mins
#nifi.cluster.flow.election.max.candidates=

Make the below changes in the #zookeeper properties# section

#zookeeper properties, used for cluster management#
nifi.zookeeper.connect.string=ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181

5.      Start & Stop NIFI Services on all Nodes

Run the below command on all nodes to start the NIFI service:

#/opt/HDF-2.0.1.0/nifi/bin/nifi.sh start

Run the below command to check the status of the NIFI service:

#/opt/HDF-2.0.1.0/nifi/bin/nifi.sh status
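To confirm that the nodes have started and joined the cluster, watch the NiFi application log (standard log location under the install directory):

#tail -f /opt/HDF-2.0.1.0/nifi/logs/nifi-app.log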
6.    Login to NIFI Application

Open a web browser:
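Based on the web properties configured above (nifi.web.http.port=7070), the NiFi UI should be reachable at a URL like the following (the host can be the master or any cluster node; /nifi is the default UI path):

http://ip-172-30-0-240.ec2.internal:7070/nifi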