StreamSets is a data pipeline tool with multiple built-in, ready-to-use processors through which pipelines can be built. A few examples are below:
1. Incrementally ingest records from an RDBMS to an S3 location, with lookups applied.
Select the origin 'JDBC Query Consumer' -> under the 'JDBC' configuration tab, provide the JDBC connection string:
jdbc:sqlserver://Databasehostname:1433;database=retaildb
SQL Query:
SELECT A.*
,B1.TYPECODE AS pctl_TypeCode, B1.NAME AS pctl_Name, B1.DESCRIPTION AS pctl_Description
FROM retail_contact A
LEFT OUTER JOIN pctl_typelist B1 ON (A.Subtype = B1.ID)
WHERE A.UpdateTime > '${OFFSET}'
ORDER BY A.UpdateTime
Initial Offset: 1970-01-01
Offset Column: UpdateTime
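For reference, on the first run the origin substitutes the Initial Offset for ${OFFSET}, so the query it actually executes should look roughly like this (subsequent runs substitute the last UpdateTime value read):
SELECT A.*
,B1.TYPECODE AS pctl_TypeCode, B1.NAME AS pctl_Name, B1.DESCRIPTION AS pctl_Description
FROM retail_contact A
LEFT OUTER JOIN pctl_typelist B1 ON (A.Subtype = B1.ID)
WHERE A.UpdateTime > '1970-01-01'
ORDER BY A.UpdateTime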
Destination: Amazon S3
Bucket: poc-bucket
Note: Don't provide s3://poc-bucket, as this throws an error. Provide just poc-bucket.
Common prefix: retaildb // A root directory under which the table data is written.
Partition Prefix: Table1/ingestiondt=${YYYY()}-${MM()}-${DD()} // This writes the data into a date-based partition.
Data Format: Delimited
Default Format: CSV
Header Line: With Header Line.
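Putting the bucket, common prefix, and partition prefix together, an object written on a hypothetical run date of 2018-10-05 would land under a key such as the following (the file name itself is generated by the Data Collector):
poc-bucket/retaildb/Table1/ingestiondt=2018-10-05/<generated-file>.csv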
2. Incrementally ingest data from multiple tables into independent directories per table name, without lookups.
Select the origin 'JDBC Multitable Consumer' and provide the JDBC connection string.
Schema: %
Table Pattern: retailtables_%
Offset Columns: UpdateTime
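The table pattern uses SQL LIKE syntax, so retailtables_% matches every table whose name starts with retailtables_; for example (hypothetical table names):
retailtables_orders, retailtables_customers, retailtables_stores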
Select Destination as 'Amazon S3'
Bucket: poc-bucket
Common prefix: retaildb
Partition Prefix: ${record:attribute('jdbc.tables')}/ingestiondt=${YYYY()}-${MM()}-${DD()} // This creates a directory per table name and writes the data under a date partition; 'jdbc.tables' is a record header attribute that carries the source table name.
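With this partition prefix, a record read from the hypothetical table retailtables_orders on a run date of 2018-10-05 would be written under a key such as:
poc-bucket/retaildb/retailtables_orders/ingestiondt=2018-10-05/<generated-file>.csv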
If there is a need to pull data starting from a certain date (rather than the full history), then set:
Initial Offset: UpdateTime : ${time:dateTimeToMilliseconds(time:extractDateFromString('2018-10-01 00:00:00.000','yyyy-MM-dd HH:mm:ss.SSS'))}
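That expression parses the timestamp string and converts it to epoch milliseconds, which is the format this offset expects here; assuming the Data Collector's timezone is UTC, it evaluates to:
UpdateTime : 1538352000000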