Saturday, January 5, 2019

StreamSets

StreamSets is a data pipeline tool with many built-in, ready-to-use stages (origins, processors and destinations) from which pipelines can be built. A few examples are below:

1. Incrementally ingest records from an RDBMS to an S3 location with lookups applied.
Select 'JDBC Query Consumer' as the origin and, under the 'JDBC' configuration, provide the JDBC URL:
jdbc:sqlserver://Databasehostname:1433;database=retaildb

SQL Query:
SELECT A.*
,B1.TYPECODE as pctl_TypeCode,B1.NAME as pctl_Name,B1.DESCRIPTION as pctl_Description
FROM retail_contact A
LEFT OUTER JOIN pctl_typelist B1 ON (A.Subtype=B1.ID)
WHERE A.UpdateTime > '${OFFSET}' ORDER BY A.UpdateTime

Initial Offset: 1970-01-01
Offset Column: UpdateTime
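
To illustrate how the offset drives incremental reads, here is a minimal Python sketch using an in-memory SQLite table. It is not how StreamSets is implemented; it only mimics the idea that each run selects rows newer than the stored offset and then remembers the largest UpdateTime it saw. The helper read_increment, the simplified two-column table and the sample rows are assumptions made for illustration, and the sketch binds the offset as a query parameter where the pipeline substitutes ${OFFSET} into the query text.

```python
import sqlite3


def read_increment(conn, offset):
    """Return rows with UpdateTime newer than `offset`, plus the next offset."""
    rows = conn.execute(
        "SELECT ID, UpdateTime FROM retail_contact "
        "WHERE UpdateTime > ? ORDER BY UpdateTime",
        (offset,),
    ).fetchall()
    # Rows are ordered by UpdateTime, so the last row carries the new offset.
    new_offset = rows[-1][1] if rows else offset
    return rows, new_offset


# Hypothetical sample data standing in for the real retail_contact table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE retail_contact (ID INTEGER, UpdateTime TEXT)")
conn.executemany(
    "INSERT INTO retail_contact VALUES (?, ?)",
    [(1, "2018-09-30 10:00:00"), (2, "2018-10-02 08:30:00")],
)

offset = "1970-01-01"  # same role as Initial Offset
first_batch, offset = read_increment(conn, offset)   # picks up both rows
second_batch, offset = read_increment(conn, offset)  # nothing new to read
```

The ORDER BY on the offset column matters: without it, the last row of a batch is not guaranteed to hold the highest UpdateTime, and later runs could skip records.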

Destination: Amazon S3

Bucket: poc-bucket
Note: Don't enter s3://poc-bucket, as this throws an error. Enter just the bucket name, poc-bucket.

Common prefix: retaildb          //Root directory under which the table data is written.

Partition Prefix: Table1/ingestiondt=${YYYY()}-${MM()}-${DD()}     //Writes the data into one partition directory per ingestion date
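
As a rough illustration of the resulting layout, the Python sketch below builds the directory path that the two prefixes would produce for a given day. It assumes the common prefix and partition prefix are joined with '/'; the helper name s3_key_prefix is hypothetical and is not part of StreamSets.

```python
from datetime import date


def s3_key_prefix(common_prefix, table_dir, day):
    """Build '<common prefix>/<table dir>/ingestiondt=YYYY-MM-DD/'."""
    # Mirrors ${YYYY()}-${MM()}-${DD()} with zero-padded month and day.
    partition = f"{table_dir}/ingestiondt={day:%Y-%m-%d}"
    return f"{common_prefix}/{partition}/"


example_prefix = s3_key_prefix("retaildb", "Table1", date(2019, 1, 5))
print(example_prefix)  # retaildb/Table1/ingestiondt=2019-01-05/
```

A layout of the form ingestiondt=YYYY-MM-DD is convenient because tools such as Hive or Athena can treat the directory name as a partition column.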

Data Format: Delimited
Default Format: CSV
Header Line: With Header Line.

2. Incrementally ingest data from multiple tables into separate directories named after the tables, without lookups.

Select 'JDBC Multitable Consumer' as the origin and provide the JDBC connection string.
Schema: %
Table Pattern: retailtables_%
Offset Columns: UpdateTime
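
The Schema and Table Pattern values above use SQL LIKE-style wildcards, where % matches any run of characters. Assuming standard LIKE semantics also apply to the underscore (it matches exactly one character), the Python sketch below shows which table names a pattern such as retailtables_% would select. The helper like_matches and the sample table names are made up for illustration, and case sensitivity in a real database depends on the database itself.

```python
import re


def like_matches(pattern, name):
    """Return True if `name` matches the SQL LIKE-style `pattern`."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")   # % matches any sequence, including empty
        elif ch == "_":
            parts.append(".")    # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    return re.fullmatch("".join(parts), name, flags=re.DOTALL) is not None


# Hypothetical table names.
tables = ["retailtables_contact", "retailtables_policy", "pctl_typelist"]
selected = [t for t in tables if like_matches("retailtables_%", t)]
print(selected)  # ['retailtables_contact', 'retailtables_policy']
```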


Select Destination as 'Amazon S3'
Bucket: poc-bucket
Common prefix: retaildb
Partition Prefix: ${record:attribute('jdbc.tables')}/ingestiondt=${YYYY()}-${MM()}-${DD()}   //Creates one directory per table name and writes the data into a date partition under it. 'jdbc.tables' is a record header attribute that carries the source table name.
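
The effect of this partition prefix can be sketched in Python: each record carries its source table name in the 'jdbc.tables' attribute, and that value decides which directory the record lands in. The record structure and table names below are simplified assumptions for illustration, not the actual StreamSets record API.

```python
from collections import defaultdict
from datetime import date


def partition_prefix(attributes, day):
    """Mimic ${record:attribute('jdbc.tables')}/ingestiondt=YYYY-MM-DD."""
    return f"{attributes['jdbc.tables']}/ingestiondt={day:%Y-%m-%d}"


# Hypothetical records as (header attributes, field values) pairs.
records = [
    ({"jdbc.tables": "retailtables_contact"}, {"ID": 1}),
    ({"jdbc.tables": "retailtables_policy"}, {"ID": 7}),
    ({"jdbc.tables": "retailtables_contact"}, {"ID": 2}),
]

# Group records by the directory they would be written to.
grouped = defaultdict(list)
for attributes, fields in records:
    grouped[partition_prefix(attributes, date(2019, 1, 5))].append(fields)

for prefix in sorted(grouped):
    print(prefix, len(grouped[prefix]))
```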

To pull data starting from a specific date rather than the full history, set:

Initial Offset: UpdateTime : ${time:dateTimeToMilliseconds(time:extractDateFromString('2018-10-01 00:00:00.000','yyyy-MM-dd HH:mm:ss.SSS'))}
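
The expression above converts a date string into milliseconds since the Unix epoch. To sanity-check the value it should produce, the same conversion can be sketched in Python. This sketch assumes the timestamp is interpreted as UTC; a pipeline running in another time zone may parse the string differently, so the number can shift by the zone offset. The helper name to_epoch_millis is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text, fmt="%Y-%m-%d %H:%M:%S.%f"):
    """Parse `text` as a UTC timestamp and return epoch milliseconds."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    # Integer division by a timedelta avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


initial_offset = to_epoch_millis("2018-10-01 00:00:00.000")
print(initial_offset)  # 1538352000000
```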
