Written custom Interceptor for Flume.
Problem: Messages are in Kafka in Avro format and has been encoded with 8 bytes of text in front. Need to extract each message by removing the first 8 bytes and send this message to HDFS Location which is a Hive external table location.
Solution: This interceptor is required to remove finger print, so the logic has to decode the message and remove first 8 bytes and convert the data to AVRO format and send to sink.
Below is the approach:
1. create a class that implements Interceptor.
2. Override public Event intercept(Event event) method.
3. Below is the logic in intercept method,
byte[] eventBody = event.getBody();
try
{
Schema SCHEMA = new Schema.Parser().parse(CustomInterceptor.class.getClassLoader().getResourceAsStream("djr.avsc"));
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(SCHEMA);
BinaryDecoder binaryDecoder = new DecoderFactory().binaryDecoder(eventBody, 8, eventBody.length - 8, null);
GenericRecord jsonRecord = reader.read(null, binaryDecoder);
//////////JSON conversion to AVRO/////////
File file = new File("/home/gorrepat/record.avro");
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(SCHEMA);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(SCHEMA, file);
dataFileWriter.append(jsonRecord);
///////////////JSON conversion to AVRO///////////
event.setBody(str.getBytes());
}
4. Flume configuration,
agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = hdfs-sink
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.topic = cdpTopic
agent.sources.kafka-source.groupId = flume-id
agent.sources.kafka-source.channels = memory-channel
agent.sources.kafka-source.interceptors = etpr-parameters-interceptor custominterceptor
#getting serialized data from CustomInterceptor class
agent.sources.kafka-source.interceptors.custominterceptor.type= com.djr.interceptor.CustomInterceptor$Builder
agent.sinks.hdfs-sink.hdfs.path = hdfs://mas:8020/var/lib/hadoop-hdfs/mas_test_data/scripts/json_test/CustomInterceptorJson/date=%y-%m-%d
//////For CDP topic data is directly written to HDFS location. This would write to a partitioned directory with date=. %y%m and %d values are passed by flume.///////////
/////Below sink is for CRM plugin which uses kite data set.
sinks:
crm-plugin-kite-sink-hive-1:
channels:
- crm-plugin-mem-channel-hive
config:
type: org.apache.flume.sink.kite.DatasetSink
# Kite dataset url for hive.
kite.repo.uri: 'repo:hive'
# Kite dataset name for hive.
kite.dataset.name: inv_all
# Batch commit size.
kite.batchSize: 100000
# Batch timeout in seconds.
kite.rollInterval: 30
Note:
- Need to copy the custom inceptor JAR file under flume lib dir(/usr/lib/flume-ng/lib) and specify the class name of the custom interceptor under agent.sources.kafka-source.interceptors.custominterceptor.type
- Kite data set plug-in is used in sink to write data to Hive tables. To use Kite data set need to copy kite-data-hive.jar to flume lib dir(/usr/lib/flume-ng/lib) and specify type: org.apache.flume.sink.kite.DatasetSink.
Kite dataset can be used for writing data to HBase as well using kite-data-hbase.jar. There are many options available in Kite data set
Good Post! Thank you so much for sharing this pretty post, it was so good to read and useful to improve my knowledge as updated one, keep blogging.
ReplyDeleteBig Data Hadoop training in electronic city