Monday, August 28, 2017
NIFI and OOZIE Flow
NiFi installation guide: https://nifi.apache.org/docs/nifi-docs/html/getting-started.html
This document describes the data flow for a Spark job in which NiFi acts as the source point for data ingestion from a REST API.
High level data flow:
1. Fetching data from REST: Pass the URL to the REST API to fetch the data; this call includes the start and end dates.
2. Copying to HDFS: Store the fetched file in HDFS for the Spark application to use as its input file.
3. Invoking the Spark job from Oozie.
4. Backing up the fetched file in HDFS.
Note: The above describes a single run of the flow. If the flow is executed twice in a given day, a second file is generated from REST and is likewise copied to the Spark input and backup locations in HDFS.
Description of each activity:
1. Fetching data from REST: NiFi reads data from the source (REST) and writes it to HDFS. The sub-activities are:
· Getting start and end dates: These parameters are used in the URL to fetch the data. NiFi generates them as properties in the 'UpdateAttribute' processor. A limitation in NiFi is that this processor cannot be the source of a flow and needs an upstream processor, so a 'ManualGetFileTesting' processor has been created as the start of the job and acts as the upstream processor for 'UpdateAttribute'. On every run this processor passes a dummy file from the location \\file1\corporate_open_share\IT\test into the NiFi flow.
· Reading from REST: The 'InvokeHTTP' processor reads the file from REST (https://XYZ?startDate=${StartDate}&endDate=${EndDate}&language=en). This generates the source file in CSV form. The 'GenerateFlowFile' processor is an automated replacement for 'ManualGetFileTesting' and is scheduled every day at 4:00 p.m. UTC (a curl sketch of this call follows below).
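This is a minimal sketch of exercising the REST call outside NiFi for troubleshooting, assuming the placeholder endpoint above; the real URL, authentication, and exact date format are not covered in this document:
# Hypothetical values standing in for the StartDate/EndDate attributes set in UpdateAttribute
START_DATE=2017-08-28
END_DATE=2017-08-29
# Fetch the CSV the same way the InvokeHTTP processor would, and save it locally
curl -s "https://XYZ?startDate=${START_DATE}&endDate=${END_DATE}&language=en" -o oil-data.csv
# Quick sanity check of the downloaded file
head -5 oil-data.csv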
2. Passing the CSV file to the Spark job:
The Spark job fetches the file from the HDFS location "/user/Leela_1/data/nifi_file" as its input and removes the file on successful completion of the job.
The 'PutHDFS' processor copies the CSV file to HDFS. The file name is 'oil-data.csv'.
Triggering the Spark job: The Spark job is triggered via Oozie every day at 4:30 p.m. UTC.
3. Backing up the data to HDFS:
The same CSV file is backed up to the HDFS location "hdfs://cluster-namenode.com:8020/user/Leela_1//${date.dir}", based on the date attribute that was set as a property in step 1 (both HDFS locations can be listed as sketched below).
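A sketch of checking the Spark input copy and the dated backup; the date directory shown is a hypothetical value of ${date.dir}:
hdfs dfs -ls /user/Leela_1/data/nifi_file
hdfs dfs -ls hdfs://cluster-namenode.com:8020/user/Leela_1/2017-08-28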
4. Triggering the Oozie job:
Oozie is a job scheduler that runs jobs in Hadoop. There are three kinds of jobs:
Workflow – scheduled on demand
Coordinator – scheduled by time and frequency
Bundle – defines and executes a bunch of coordinator applications
Coordinator jobs have been implemented for the Spark job execution.
Files to be in the local file system:
1. job.properties
Files to be in HDFS:
1. job.properties
2. workflow.xml
3. spark-coord.xml
4. Shell script file (myscript.sh)
Job.properties
================================================================
nameNode=hdfs://cluster-namenode.com:8020
jobTracker=dev-jp1.cluster.com:8050
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
ProcessName=vi1
wfroot=${nameNode}/user/Leela_1/jobs/oozie/test
jobStart=2017-08-27T00:00Z
jobEnd=2027-08-27T00:00Z
user.name=Leela_1
myscript=myscript.sh
myscriptPath=${nameNode}/user/Leela_1/jobs/oozie/test/myscript.sh
oozie.coord.application.path=${wfroot}/spark-coord.xml
This job.properties defines properties like nameNode, jobTracker, etc. Variables used in the job execution are also set here:
wfroot – the variable that specifies the HDFS location holding all the above mentioned files for job execution.
myscript – the shell script file that executes the Spark job.
oozie.coord.application.path – the path of spark-coord.xml (this would be replaced with oozie.wf.application.path in the case of workflow jobs).
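Because all of these files must sit under ${wfroot} in HDFS before the job is submitted, they can be staged with a few hdfs commands; a sketch assuming the files are in the current local directory:
hdfs dfs -mkdir -p /user/Leela_1/jobs/oozie/test
hdfs dfs -put -f job.properties workflow.xml spark-coord.xml myscript.sh /user/Leela_1/jobs/oozie/test/
hdfs dfs -ls /user/Leela_1/jobs/oozie/test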
Spark-coord.xml
<?xml version="1.0" encoding="UTF-8"?>
<coordinator-app name="spark-coord" frequency="${coord:minutes(10)}"
                 start="${jobStart}" end="${jobEnd}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <app-path>${wfroot}/workflow.xml</app-path>
        </workflow>
    </action>
</coordinator-app>
This specifies the frequency, start, end, and the path of the workflow.xml file. The values for ${jobStart} and ${jobEnd} are specified in job.properties.
Workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${myscript}</exec>
            <file>${myscriptPath}</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <kill name="fail-output">
        <message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
This uses the variables set in job.properties and spark-coord.xml and executes the shell script stored in HDFS at ${nameNode}/user/Leela_1/jobs/oozie/test/myscript.sh. The action executed is of <shell> type, which copies the jar file and runs the Spark job.
myscript.sh
#!/bin/bash
# Command to call Spark
if [ -e /tmp/xyz-0.0.1-SNAPSHOT.jar ]
then
    rm /tmp/xyz-0.0.1-SNAPSHOT.jar
fi
hadoop fs -get hdfs://dev-nn1.cluster-namenode.com:8020/user/Leela_1/xyz-0.0.1-SNAPSHOT.jar /tmp/
chmod 777 /tmp/xyz-0.0.1-SNAPSHOT.jar
export SPARK_MAJOR_VERSION=2
spark-submit --class com.xyz.Code --master yarn --num-executors 12 --executor-cores 12 \
  --executor-memory 4096m --driver-memory 2048m --driver-cores 5 \
  --packages "com.databricks:spark-avro_2.11:3.2.0,com.databricks:spark-csv_2.11:1.5.0" \
  /tmp/xyz-0.0.1-SNAPSHOT.jar \
  hdfs://hddev-nn1.xyzinc.com:8020/user/Leela_1/data.csv \
  abc_data_test
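If the spark-submit step fails, the driver and executor logs can be pulled from YARN. This is a generic sketch; the application id shown is hypothetical and the real one comes from the spark-submit console output or from the list command:
yarn application -list -appStates RUNNING,FINISHED,FAILED
yarn logs -applicationId application_1503612345678_0001 | less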
Scheduling the job: The above job can be scheduled via the following commands.
Submit and run:
oozie job -oozie http://hddev-namenodeinc.com:11000/oozie -config job.properties -run
A few other Oozie commands:
Validate an XML file:
oozie validate -oozie http://hddev-namenodeinc.com:11000/oozie hdfs://hddev-nn1.namenodeinc.com:8020/user/narendra26954/jobs/oozie/test/workflow.xml
Job info:
oozie job -oozie http://hddev-namenodeinc.com:11000/oozie -info 0000107-170815214258044-oozie-oozi-W
Kill a job:
oozie job -oozie http://hddev-sn1.namenodeinc.com:11000/oozie --kill 0000042-170824180706071-oozie-oozi-C
Validating the job status:
All the scheduled jobs and their status can be viewed in the Oozie UI at http://hddev-sn1.namenodeinc.com:11000/oozie/. The Oozie job status can also be seen in Hue.
Oozie commands
1] To submit a job - go to the directory containing job.properties and run the following command.
oozie job --oozie http://<HOSTNAME>:11000/oozie --config job.properties -submit
2] To kill a job
oozie job --oozie http://<HOSTNAME>:11000/oozie --kill [coord_jobid]
3] To suspend a job
oozie job --oozie http://<HOSTNAME>:11000/oozie --suspend [coord_jobid]
4] To resume a suspended job (coord_jobid is the id of the suspended job)
oozie job --oozie http://<HOSTNAME>:11000/oozie --resume [coord_jobid]
5] To restart a failed workflow.
oozie job -rerun [parent_workflow_jobid] -Doozie.wf.rerun.failnodes=true
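Since the Spark job here is driven by a coordinator, it is also handy to list coordinator jobs from the command line and drill into one of them (the job id below is the coordinator id from the kill example above):
oozie jobs -oozie http://<HOSTNAME>:11000/oozie -jobtype coordinator -len 10
oozie job -oozie http://<HOSTNAME>:11000/oozie -info 0000042-170824180706071-oozie-oozi-C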
Resultant of the job execution:
The CSV file is taken as the Spark job input and a Hive table "abc_temp" is created after processing.
Hive table schema:
CREATE EXTERNAL TABLE abc_temp
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://hddev-nn1.namenode.com:8020/user/Leela_1/abc_data_test'
TBLPROPERTIES (
  'avro.schema.url'='hdfs://hddev-nn1.namenode.com/user/Leela_1/abc_v_5.avsc',
  'hive.input.dir.recursive'='TRUE',
  'hive.mapred.supports.subdirectories'='TRUE',
  'hive.supports.subdirectories'='TRUE',
  'mapred.input.dir.recursive'='TRUE');
Note: abc_v_5.avsc was generated with Kite from the sample CSV file sent initially.
E.g.:
Kite installation:
curl http://central.maven.org/maven2/org/kitesdk/kite-tools/1.1.0/kite-tools-1.1.0-binary.jar -o kite-dataset
A CSV file with a header, for example:
id,Name,Location,Desg
1,Leela,Hyd,SW Engg
Command to extract the schema:
kite-dataset csv-schema 1.csv --class Sample -o sample.avsc
Use this .avsc in TBLPROPERTIES when creating the Hive table.
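The downloaded kite-dataset file is a self-executing jar, so it has to be made executable first, and the generated .avsc has to end up at the HDFS path that avro.schema.url points to. A sketch, with the HDFS target chosen to match the table definition above:
chmod +x kite-dataset
./kite-dataset csv-schema 1.csv --class Sample -o sample.avsc
# Place the schema where the Hive table's avro.schema.url expects it
hdfs dfs -put -f sample.avsc /user/Leela_1/abc_v_5.avsc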
NIFI Installation in cluster mode
Installation & Configuration of NiFi on an AWS EMR cluster
Author | Date | Version
Devender Chauhan | 14/06/2018 | 1
This documentation describes the process required to install and configure the NiFi application, and helps the user to set up the environment correctly.
Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.
Apache NiFi is based on technology previously called "Niagara Files" that was in development and used at scale within the NSA for the last 8 years and was made available to the Apache Software Foundation through the NSA Technology Transfer Program. Some of the use cases include, but are not limited to:
· Big Data Ingest – Offers a simple, reliable and secure way to collect data streams.
· IoAT Optimization – Allows organizations to overcome real world constraints such as limited or expensive bandwidth while ensuring data quality and reliability.
· Compliance – Enables organizations to understand everything that happens to data in motion from its creation to its final resting place, which is particularly important for regulated industries that must retain and report on chain of custody.
· Digital Security – Helps organizations collect large volumes of data from many sources and prioritize which data is brought back for analysis first, a critical capability given the time sensitivity of identifying security breaches.
=> Create an EMR cluster with 1 master and 2 datanodes.
=> Here is the current configuration of our EMR cluster:
1 MasterNode : IP : 172.30.0.240, Hostname : ip-172-30-0-240.ec2.internal
2 DataNodes : IP : 172.30.0.67, Hostname : ip-172-30-0-67.ec2.internal
              IP : 172.30.0.10, Hostname : ip-172-30-0-10.ec2.internal
· Operating system:
◦ CentOS/RHEL 6/7 or Amazon Linux
· Java: 1.7 or a higher version.
Run the below command on all nodes to install Java (by default it is already installed on EMR machines).
Command:
#yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
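After the install (or on stock EMR machines where Java is already present), the version can be confirmed on each node:
# Should report a 1.7+ JVM, e.g. openjdk version "1.8.0_..."
#java -version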
Here we are installing NiFi on 3 machines, namely:
172.30.0.240 ----- ip-172-30-0-240.ec2.internal ----- Master Node
172.30.0.67 ------ ip-172-30-0-67.ec2.internal ----- Slave Node1
172.30.0.10 ------ ip-172-30-0-10.ec2.internal ----- Slave Node2
· Passwordless SSH on all nodes of the NiFi cluster using the root user.
Generate the public key and private key using the below command on the master node of the NiFi cluster.
#ssh-keygen -t dsa (Press Enter)
[root@ip-172-30-0-240 .ssh]# ssh-keygen -t dsa
Generating public/private dsa key pair.
Enter file in which to save the key (/root/.ssh/id_dsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_dsa.
Your public key has been saved in /root/.ssh/id_dsa.pub.
The key fingerprint is:
SHA256:mqpkzsLOUxJomwoqcGOzLAMwW9ccYUB69mScgaeIkdk
root@ip-172-30-0-240.ec2.internal
The key's randomart image is:
+---[DSA 1024]----+
| + .oo+. |
|+ E..ooo |
|.o..o==. |
|=oooo+o |
|o++. . S |
|=+=. o |
|B+=+ o |
|OBo . |
|o*=.. |
+----[SHA256]-----+
Now a private and public key pair has been generated for the root user in the /root/.ssh directory.
Append the public key of 172.30.0.240 into the authorized_keys of 172.30.0.240 (on the same node) using the below command.
#cat /root/.ssh/id_dsa.pub >> /root/.ssh/authorized_keys
From node 172.30.0.240, upload the newly generated public key (id_dsa.pub) to the slave nodes (172.30.0.67 & 172.30.0.10) under root's .ssh directory as a file named authorized_keys:
#cat .ssh/id_dsa.pub | ssh 172.30.0.67 'cat >> .ssh/authorized_keys'
#cat .ssh/id_dsa.pub | ssh 172.30.0.10 'cat >> .ssh/authorized_keys'
· Disable Firewall :
Do the below change on all nodes:
#service iptables stop
· Set Hostname :
#hostname ip-172-30-0-240.ec2.internal (Set on masternode1)
#vim /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=ip-172-30-0-240.ec2.internal
NOZEROCONF=yes
#hostname ip-172-30-0-67.ec2.internal (Set on Slavenode1)
#vim /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=ip-172-30-0-67.ec2.internal
NOZEROCONF=yes
#hostname ip-172-30-0-10.ec2.internal (Set on Slavenode2)
#vim /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=ip-172-30-0-10.ec2.internal
NOZEROCONF=yes
· Set Swappiness
Change the default swappiness on all nodes:
#sysctl vm.swappiness=10
#echo "vm.swappiness = 10" >> /etc/sysctl.conf
· Set the /etc/hosts file :
[root@ip-172-30-0-240 .ssh]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost6 localhost6.localdomain6
172.30.0.240 ip-172-30-0-240.ec2.internal ip-172-30-0-240
172.30.0.67 ip-172-30-0-67.ec2.internal ip-172-30-0-67
172.30.0.10 ip-172-30-0-10.ec2.internal ip-172-30-0-10
Copy the file to the slave nodes:
=> scp /etc/hosts ip-172-30-0-67.ec2.internal:/etc/
=> scp /etc/hosts ip-172-30-0-10.ec2.internal:/etc/
On all nodes:
To download the tar file we need to install the wget command if it's not available on the system.
Command:
#yum install -y wget
#cd /opt
#wget http://public-repo-1.hortonworks.com/HDF/2.0.1.0/HDF-2.0.1.0-12.tar.gz
It will download the HDF-2.0.1.0-12.tar.gz file; now we need to extract this file.
#tar -xzf HDF-2.0.1.0-12.tar.gz
#ls
HDF-2.0.1.0
#cd /opt/HDF-2.0.1.0/nifi/
We need to do the below configuration for the NiFi cluster in the nifi.properties file.
#vim conf/nifi.properties
(Make the below changes in the nifi.properties file.)
The below changes need to be made in the # web properties # section:
# web properties #
nifi.web.http.host=ip-172-30-0-240.ec2.internal
nifi.web.http.port=7070
The below changes need to be made in the # Cluster node properties # section:
# Cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=ip-172-30-0-240.ec2.internal
nifi.cluster.node.protocol.port=12000
Also add the below two properties in the same section:
nifi.cluster.node.unicast.manager.address=ip-172-30-0-240.ec2.internal
nifi.cluster.node.unicast.manager.protocol.port=12001
Add a # Cluster Manager properties # section after this:
# Cluster Manager properties #
nifi.cluster.is.manager=true
nifi.cluster.manager.address=ip-172-30-0-240.ec2.internal
nifi.cluster.manager.protocol.port=12001
nifi.cluster.manager.protocol.threads=10
nifi.cluster.manager.node.event.history.size=25
nifi.cluster.manager.node.api.connection.timeout=5 sec
nifi.cluster.manager.node.api.read.timeout=5 sec
nifi.cluster.manager.node.firewall.file=
nifi.cluster.flow.election.max.wait.time=5 mins
nifi.cluster.flow.election.max.candidates=
The below changes need to be made in the # zookeeper properties # section:
# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181
We need to do the below configuration for the NiFi cluster in the state-management.xml file.
#vim conf/state-management.xml
(Add the following cluster-provider configuration in the state-management.xml file.)
<cluster-provider>
    <id>zk-provider</id>
    <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
    <property name="Connect String">ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181</property>
    <property name="Root Node">/nifi</property>
    <property name="Session Timeout">10 seconds</property>
    <property name="Access Control">Open</property>
</cluster-provider>
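Before copying the configuration to the slave nodes, a quick grep from /opt/HDF-2.0.1.0/nifi/ confirms the cluster-related settings on the master (a sketch):
#grep -E '^nifi\.(web\.http|cluster|zookeeper)' conf/nifi.properties
#grep -A 2 'Connect String' conf/state-management.xml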
4. Configuration of NiFi on all Slavenodes (ip-172-30-0-67.ec2.internal & ip-172-30-0-10.ec2.internal)
Run the below command from the MasterNode to Slavenode1 (ip-172-30-0-67.ec2.internal):
Copy the NiFi directory from ip-172-30-0-240.ec2.internal to ip-172-30-0-67.ec2.internal:
#scp -r /opt/HDF-2.0.1.0/nifi/* ip-172-30-0-67.ec2.internal:/opt/HDF-2.0.1.0/nifi/
Now log in on Slavenode1 using the root user and do the following NiFi configuration.
We need to do the below configuration for the NiFi cluster in the nifi.properties file.
#cd /opt/HDF-2.0.1.0/nifi/
#vim conf/nifi.properties
(Make the below changes in the nifi.properties file.)
The below changes need to be made in the # web properties # section:
# web properties #
nifi.web.http.host=ip-172-30-0-67.ec2.internal
nifi.web.http.port=7070
The below changes need to be made in the # Cluster node properties # section:
# Cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=ip-172-30-0-67.ec2.internal
nifi.cluster.node.protocol.port=12000
nifi.cluster.node.unicast.manager.address=ip-172-30-0-67.ec2.internal
nifi.cluster.node.unicast.manager.protocol.port=12001
Delete or comment out the # Cluster Manager properties # section on the slave node:
# Cluster Manager properties #
#nifi.cluster.is.manager=true
#nifi.cluster.manager.address=ip-172-30-0-67.ec2.internal
#nifi.cluster.manager.protocol.port=12001
#nifi.cluster.manager.protocol.threads=10
#nifi.cluster.manager.node.event.history.size=25
#nifi.cluster.manager.node.api.connection.timeout=5 sec
#nifi.cluster.manager.node.api.read.timeout=5 sec
#nifi.cluster.manager.node.firewall.file=
#nifi.cluster.flow.election.max.wait.time=5 mins
#nifi.cluster.flow.election.max.candidates=
The below changes need to be made in the # zookeeper properties # section:
# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181
===========
Run the below command from the MasterNode to Slavenode2 (ip-172-30-0-10.ec2.internal):
Copy the NiFi directory from ip-172-30-0-240.ec2.internal to ip-172-30-0-10.ec2.internal:
#scp -r /opt/HDF-2.0.1.0/nifi/* ip-172-30-0-10.ec2.internal:/opt/HDF-2.0.1.0/nifi
Now log in on Slavenode2 using the root user and do the following NiFi configuration.
We need to do the below configuration for the NiFi cluster in the nifi.properties file.
#cd /opt/HDF-2.0.1.0/nifi/
#vim conf/nifi.properties
(Make the below changes in the nifi.properties file.)
The below changes need to be made in the # web properties # section:
# web properties #
nifi.web.http.host=ip-172-30-0-10.ec2.internal
nifi.web.http.port=7070
The below changes need to be made in the # Cluster node properties # section:
# Cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=ip-172-30-0-10.ec2.internal
nifi.cluster.node.protocol.port=12000
nifi.cluster.node.unicast.manager.address=ip-172-30-0-10.ec2.internal
nifi.cluster.node.unicast.manager.protocol.port=12001
Delete or comment out the # Cluster Manager properties # section on the slave node:
# Cluster Manager properties #
#nifi.cluster.is.manager=true
#nifi.cluster.manager.address=ip-172-30-0-10.ec2.internal
#nifi.cluster.manager.protocol.port=12001
#nifi.cluster.manager.protocol.threads=10
#nifi.cluster.manager.node.event.history.size=25
#nifi.cluster.manager.node.api.connection.timeout=5 sec
#nifi.cluster.manager.node.api.read.timeout=5 sec
#nifi.cluster.manager.node.firewall.file=
#nifi.cluster.flow.election.max.wait.time=5 mins
#nifi.cluster.flow.election.max.candidates=
The below changes need to be made in the # zookeeper properties # section:
# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=ip-172-30-0-240.ec2.internal:2181,ip-172-30-0-67.ec2.internal:2181,ip-172-30-0-10.ec2.internal:2181
Run the below command on all nodes to start the NiFi service:
#/opt/HDF-2.0.1.0/nifi/bin/nifi.sh start
Run the below command to check the status of the NiFi service:
#/opt/HDF-2.0.1.0/nifi/bin/nifi.sh status
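If a node does not come up or join the cluster, the NiFi application log under the install directory is the first place to look (standard NiFi log location):
#tail -f /opt/HDF-2.0.1.0/nifi/logs/nifi-app.log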
Open a web browser and go to the NiFi UI on the master node using the nifi.web.http.port configured above, e.g. http://ip-172-30-0-240.ec2.internal:7070/nifi