Saturday, April 8, 2017

Sqoop Examples

Usecase to Import data

In a table which is have10k as of today trying to implement incremental logic so that when i run tomorrow need to capture only newly inserted and updated records.


[cloudera@quickstart ~]$ sqoop import --connect jdbc:mysql://localhost/Leela_db --username root --password cloudera --query="select * from trans1 where accno > 100 AND \$CONDITIONS" --incremental append --check-column accno --last-value 102 --target-dir 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Transaction_db' --m 1

The newly inserted values would be saved as a new file in the same directory.

To get the complete list of values imported then use,

hadoop fs -cat /user/Leela/Hive/Transaction_db/part-m-*

mysql> select * from trans1;
+-------+--------+
| accno | amount |
+-------+--------+
|   101 |   3000 |
|   102 |   2000 |
+-------+--------+

NOw update the table by adding one more entry and update 101 record.

mysql> select * from trans1;
+-------+--------+
| accno | amount |
+-------+--------+
|   102 |   2000 |
|   106 |   5000 |
|   101 |   5000 |
+-------+--------+

[cloudera@quickstart ~]$ sqoop import --connect jdbc:mysql://localhost/Leela_db --username root --password cloudera --query="select * from trans1 where accno > 100 AND \$CONDITIONS" --incremental append --check-column accno --last-value 102 --target-dir 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Transaction_db' --m 1

Follow
https://www.tutorialspoint.com/sqoop/sqoop_import.htm

Import All tables from a Database:

Imports all the tables from the RDBMS database server to the HDFS. Each table data is stored in a separate directory and the directory name is same as the table name.

sqoop import-all-tables --connect jdbc:mysql://localhost/Leela_db --username root --password cloudera --target-dir 'hdfs://quickstart.cloudera:8020/user/Leela/Hive/Transaction_db2'

Split-by



--split-by is used when there is no primary key in the database and used to divide the rows among multiple mappers equally. Need not be used in case of single mapper.


For splitting data Sqoop fires
SELECT MIN(col1), MAX(col2) FROM TABLE
then divide it as per you number of mappers.
Now take an example of integer as --split-by column
Table has some id column having value 1 to 100 and you using 4 mappers (-m 4 in your sqoop command)
Sqoop get MIN and MAX value using:
SELECT MIN(id), MAX(id) FROM TABLE
OUTPUT:
1,100
Splitting on integer is easy. You will make 4 parts:
  • 1-25
  • 25-50
  • 51-75
  • 76-100
Now string as --split-by column
Table has some name column having value "dev" to "sam" and you using 4 mappers 
n case of Integer example, all the mappers will get balanced load (all will fetch 25 records from RDBMS).
In case of string, there is less probability that data is sorted. So, it's difficult to give similar loads to all the mappers.

Reason to use : Sometimes the primary key doesn't have an even distribution of values between the min and max values(which is used to create the splits if --split-by is not available). In such a situation you can specify some other column which has proper distribution of data to create splits for efficient imports.
Split-by must be numeric because according to the specs: "By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits." The alternative is to use --boundary-query which also requires numeric columns. Otherwise the Sqoop job will fail. If you don't have such a column in your table the only workaround is to use only 1 mapper: "-m 1".

--boundary-query




--boundary-query used in case 


there is no primary key
Like to add a boundary condition similar as query while importing data
--boundary-query 'select min(order_id), max(order_id) from orders where order_id > 9999 AND $CONDITIONS'
This condition would set the boudary condition to import the rows whose orders are above 9999 and divided among the number of mappers defined to pull the data.

Note: Same could be acheived using --where or --query, however we can access the better performance of --boundary-query (or) where condition.


Source: https://www.youtube.com/watch?v=IanqwBAZtvg

--boundary-query : By default sqoop will use query select min(), max() from to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument.
Reason to use : If --split-by is not giving you the optimal performance you can use this to improve the performance further.
eg--boundary-query "SELECT min(id), max(id) from some_table"

sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username root --password cloudera --boundary-query "SELECT 1,7 FROM departments" --table departments --columns "department_id,department_name" --m 2

In the above case SELECT 1,7 is directly mentioned,so primary key of the sql table would be considered as split_by column.

If there are only 7 records and we have mentioned 1,25 where there are no records from 8 to 25 then first mapper file will have all the 7 records.

$CONDITIONS

If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by.

Explanation of the way $CONDITION works:

for the --query "SELECT * FROM foo where id >= 0 AND id < 20000 AND $CONDITIONS"

In this case Sqoop process, will replace with a unique condition expression internally to get the data-set. If you run a parallel import, the map tasks will execute your query with different values substituted in for $CONDITIONS. e.g., one mapper may execute "select bla from foo WHERE (id >=0 AND id < 10000)", and the next mapper may execute "select bla from foo WHERE (id >= 10000 AND id < 20000)" and so on.

Incremental Imports:


Sqoop supports two types of incremental imports: append and lastmodified. You can use the --incremental argument to specify the type of incremental import to perform.

You should specify append mode when importing a table where new rows are continually being added with increasing row id values. You specify the column containing the row’s id with --check-column. Sqoop imports rows where the check column has a value greater than the one specified with --last-value.

An alternate table update strategy supported by Sqoop is called lastmodified mode. You should use this when rows of the source table may be updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value are imported.


At the end of an incremental import, the value which should be specified as --last-value for a subsequent import is printed to the screen. When running a subsequent import, you should specify --last-value in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import. See the section on saved jobs later in this document for more information.

Note: --incremental lastmodified can be applied only to timestamp and date datatypes. --append to be added for --incremental lastmodified.

Eg: Incremental append

sqoop import --connect jdbc:mysql://quickstart:3306/nav --username root --password cloudera --table school1 --target-dir /user/cloudera/school1 --incremental append --check-column id --last-value 3 --m 1;

Eg: Incremental lastmodified

sqoop import --connect jdbc:mysql://quickstart:3306/nav --username root --password cloudera --table tab1 --target-dir /user/cloudera/tab1 --append --incremental lastmodified --check-column tme --last-value '2017-07-05 00:00:00' --m 1;

Follow http://lavnish.blogspot.in/2017/07/sqoop-incremental-imports.html for more info.

Note: For incremental pull in --query a where condition is specified like where Updated_date > "Previouspulledtime" AND Updated_date < "current_time" can be used.

This is used in scenarios where reprocessing required when there are downstream jobs which access the Raw data pulled. This will have more control over the data being pulled to the raw layer. In order to re pull the data for a small time interval, simply the previously pulled file can be removed and the Previouspulledtime time can be updated and executed the sqoop command.

sqoop job --create:

Sqoop jobs can also be created and are useful in case of incremental lastmodified where sqoop automatically saves the lastmodified value.



Syntax: 
sqoop job --create myjob -- import ....

here 'myjob' is the created job.

sqoop job --options-file:

Instead of exposing Username and Password in the command --options-file can be used

Command:
sqoop import --connect jdbc:sqlserver://SERVERNAME:1433 --options-file testQuery --target-dir /user/Leela/ID6 -split-by ID -m 1

Contents in testQuery file are:

 --username
edwuser1

--password
'passWord123'


--query
'SELECT count(*) As cnt FROM TestDB.Table1 where ID > '0' AND ID < '304' AND $CONDITIONS' 


sqoop eval

Sqoop eval will return the result quickly and this doesn't write to HDFS and can see 70% improvement in performance than import. The result can be captured in Linux machine and need to format the result which is a overhead.

Eg:
sqoop eval --connect jdbc:sqlserver://localhost:1433 --username user1 --password 'pwd#15' --query "SELECT TOP 10 * FROM table1"

No comments:

Post a Comment