Friday, July 14, 2017

Redshift Part 3 - Copying and Loading Data, Table Creation, Determining Dist and Sort Keys, and Troubleshooting

To see the tables in the database:


select distinct(tablename) from pg_table_def where schemaname = 'public';


To see all load errors, query the STL_LOAD_ERRORS system table:


select query, substring(filename,22,25) as filename,line_number as line,
substring(colname,0,12) as column, type, position as pos, substring(raw_line,0,30) as line_text,
substring(raw_field_value,0,15) as field_text,
substring(err_reason,0,45) as reason
from stl_load_errors
order by query desc
limit 10;

To delete a table, use:


drop table dwdate;

or

drop table dwdate cascade;    -- the CASCADE keyword also removes this table's relationships with other tables.

To get table schema:


select "column", type, encoding, distkey, sortkey
from pg_table_def where tablename = 'myevent';

Points to remember before copying data from a source:


1. Data source


You can use the COPY command to load data from an Amazon S3 bucket, an Amazon EMR cluster, a remote host using an SSH connection, or an Amazon DynamoDB table. For this tutorial, you will load from data files in an Amazon S3 bucket. When loading from Amazon S3, you must provide the name of the bucket and the location of the data files, by providing either an object path for the data files or the location of a manifest file that explicitly lists each data file and its location.

Key prefix

An object stored in Amazon S3 is uniquely identified by an object key, which includes the bucket name, folder names, if any, and the object name. A key prefix refers to a set of objects with the same prefix. The object path is a key prefix that the COPY command uses to load all objects that share the key prefix. For example, the key prefix custdata.txt can refer to a single file or to a set of files, including custdata.txt.001, custdata.txt.002, and so on.

Manifest file

If you need to load files with different prefixes, for example, from multiple buckets or folders, or if you need to exclude files that share a prefix, you can use a manifest file. A manifest file explicitly lists each load file and its unique object key. A manifest file is used to load the customer table later in this post.
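A manifest is just a JSON file that lists the objects to load; a minimal sketch (the bucket and file names here are placeholders, not actual objects):

{
  "entries": [
    {"url": "s3://<bucket-1>/custdata.txt.001", "mandatory": true},
    {"url": "s3://<bucket-2>/custdata.txt.002", "mandatory": true}
  ]
}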

2. To improve query performance, set the DISTKEY and SORTKEY, especially for tables used in join operations.

3. The DISTSTYLE ALL option places a full copy of the table on every node, which is useful for small, frequently joined tables.

Copying CSV data:

CREATE TABLE part
(
  p_partkey     INTEGER NOT NULL,
  p_name        VARCHAR(22) NOT NULL,
  p_mfgr        VARCHAR(6),
  p_category    VARCHAR(7) NOT NULL,
  p_brand1      VARCHAR(9) NOT NULL,
  p_color       VARCHAR(11) NOT NULL,
  p_type        VARCHAR(25) NOT NULL,
  p_size        INTEGER NOT NULL,
  p_container   VARCHAR(10) NOT NULL
);

copy part from 's3://sample-redshift-data/Sample1/part-csv.tbl'
credentials 'aws_access_key_id=AKIAJR65X2FNTIQPLZVQ;aws_secret_access_key=UACFV4mzBdssys9eJDRdGOSUPO4l6nbMb9fH8/Px'
csv
null as '\000';

Note: when NULL AS '\000' is specified, the table column that receives the NULL value must be configured as nullable; that is, it must not include the NOT NULL constraint in the CREATE TABLE statement.
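To spot-check that NULLs were loaded into the nullable column, a quick query against the part table above is enough (a minimal sketch):

select p_partkey, p_name, p_mfgr
from part
where p_mfgr is null
limit 10;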

Copying data to the dwdate table:


copy dwdate from 's3://sample-redshift-data/Sample1/dwdate-tab.tbl'
credentials 'aws_access_key_id=AKIAJR65X2FNTIQPLZVQ;aws_secret_access_key=UACFV4mzBdssys9eJDRdGOSUPO4l6nbMb9fH8/Px'
gzip
delimiter '\t'
dateformat 'auto';

Note: You can specify only one date format. If the load data contains inconsistent formats, possibly in different columns, or if the format is not known at load time, use DATEFORMAT with the 'auto' argument. When 'auto' is specified, COPY recognizes any valid date or time format and converts it to the default format. The 'auto' option also recognizes several formats that are not supported when using a DATEFORMAT or TIMEFORMAT string.
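If the date format is known and consistent, it can be specified explicitly instead of 'auto'; a sketch only, with the bucket path and credentials shown as placeholders:

copy dwdate from 's3://<your-bucket>/<prefix>/dwdate-tab.tbl'
credentials 'aws_access_key_id=<access-key-id>;aws_secret_access_key=<secret-access-key>'
gzip
delimiter '\t'
dateformat 'YYYY-MM-DD';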

Copying data to the customer table:

copy customer from 's3://sample-redshift-data/Sample1/customer-fw-manifest'
credentials 'aws_access_key_id=AKIAJR65X2FNTIQPLZVQ;aws_secret_access_key=UACFV4mzBdssys9eJDRdGOSUPO4l6nbMb9fH8/Px'
fixedwidth 'c_custkey:10, c_name:25, c_address:25, c_city:10, c_nation:15, c_region:12, c_phone:15, c_mktsegment:10'
maxerror 10
acceptinvchars as '^'
manifest;

Note: The ACCEPTINVCHARS option is usually a better choice for managing invalid characters. ACCEPTINVCHARS instructs COPY to replace each invalid character with a specified valid character and continue with the load operation.
For this step, ACCEPTINVCHARS is added with the replacement character '^'.
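Rows whose characters were replaced by ACCEPTINVCHARS are logged in the STL_REPLACEMENTS system table; a quick way to review them (a minimal sketch):

select query, session, filename, line_number, colname
from stl_replacements
order by query desc
limit 10;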

Copying data to the supplier table:


copy supplier from 's3://awssampledbuswest2/ssbgz/supplier.tbl'
credentials 'aws_access_key_id=AKIAJR65X2FNTIQPLZVQ;aws_secret_access_key=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'
delimiter '|'
gzip
region 'us-west-2';

Note: the GZIP keyword specifies that the source data in S3 is compressed in gzip format.

Copying data to the lineorder table:


copy lineorder from 's3://awssampledb/load/lo/lineorder-multi.tbl'
credentials 'aws_access_key_id=AKIAJR65X2FNTIQPLZVQ;aws_secret_access_key=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'
gzip
compupdate off
region 'us-east-1';

Note: Compare the time taken to load the data from a single large file with the time taken to load the same data split into multiple files.
Loading from multiple files is faster because multiple slices load the split files in parallel.
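One way to compare the two loads is to look at which files each COPY committed and when, using the STL_LOAD_COMMITS system table (a minimal sketch; the filename filter is just an example):

select query, trim(filename) as filename, curtime, status
from stl_load_commits
where filename like '%lineorder%'
order by query;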


Video 31 - Best practices to load data


1. The highest throughput is obtained when loading data from S3; DynamoDB and EMR come next.
2. Use CTAS when copying from an internal table. This is also fast because it makes use of the cluster's internal resources.
   E.g.: "CREATE TABLE ... AS SELECT" or "INSERT INTO ... SELECT".
3. If you need to use INSERT INTO ... VALUES (...), the best approach is multi-row inserts, ideally batched.
4. Redshift maps one core to one slice. The ideal load is one S3 file per slice; however, this is not always achievable because the ideal file size is between 1 MB and 1 GB (after compression).
   For example, with 10 dc1.large nodes of 2 cores each, 10 * 2 = 20 cores (the vCPU column shows the number of cores).
   So 20 files are ideal for even load distribution.
5. Use the Linux split command to break a large data file into multiple files. E.g.: "split -n 500 <file>" splits the file into 500 equal-sized pieces.
6. An even amount of data per split file improves speed.
7. If the sort key is known in advance, pre-sort the data before loading it to S3. This reduces the sorting burden on Redshift.
8. It is recommended that the number of split files be a multiple of the number of slices in the cluster, e.g. 32, 64, 96, ... for a 32-slice cluster.
9. Consider ongoing daily or weekly loads that must be inserted into a main table holding historical data, where this recent data also needs to be accessed frequently. Create another table and insert data according to the timestamp; this allows users to fetch the recent data quickly.
10. There is overhead in using the SORTKEY option because the data has to be ordered by the sort keys. The more sort keys, the longer the copy takes.
11. Use VACUUM when the COPY command was not used, i.e. after a large set of INSERT, UPDATE, and DELETE operations.
12. Workload Management (WLM) can be used to set priorities on queries. wlm_query_slot_count controls memory allocation; a maximum of 50 slots can be configured per queue, and the default concurrency is 5.

E.g.: if by default the memory allocation is 50% each for 2 queues, the first queue's share can be increased to 75% and the second queue then gets 25%; this can be brought back to the default after the first query finishes executing.

This can speed up COPY and VACUUM operations.
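For example, the slot count can be raised for a single session before a heavy operation and reset afterwards (a minimal sketch; the table name is just an example from earlier in this post):

set wlm_query_slot_count to 3;   -- temporarily claim more slots (and memory) in the queue
vacuum lineorder;
set wlm_query_slot_count to 1;   -- return to the default of one slot per query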


A few points on table creation:


1. By default, Redshift applies compression to each column when storing data. Compression is not applied to the columns specified in the SORTKEY.
   The default compression encoding is LZO.
 
Eg: Create table:

create table customer2 (
  c_custkey     integer        not null,
  c_name        varchar(25)    not null,
  c_address     varchar(25)    not null,
  c_city        varchar(10)    not null,
  c_nation      varchar(15)    not null)
compound sortkey (c_custkey, c_city);

Get table schema:

select "column", type, encoding, distkey, sortkey
from pg_table_def where tablename = 'customer2';

column    | type                  | encoding | distkey | sortkey
----------+-----------------------+----------+---------+--------
c_custkey | integer               | none     | false   | 1
c_name    | character varying(25) | lzo      | false   | 0
c_address | character varying(25) | lzo      | false   | 0
c_city    | character varying(10) | none     | false   | 2
c_nation  | character varying(15) | lzo      | false   | 0
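After data has been loaded, Redshift can recommend an encoding per column with ANALYZE COMPRESSION (a quick check, assuming customer2 already contains data; the recommendations depend on the loaded data):

analyze compression customer2;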

2. A foreign key is a column or a combination of columns whose values match a primary key in a different table.

The relationship between two tables matches the primary key in one of the tables with a foreign key in the second table.

REFERENCES is the keyword that links the primary key and the foreign key. Note that Redshift does not enforce primary and foreign key constraints; they are informational and used by the query planner.

Eg:
CUSTOMERS table

CREATE TABLE CUSTOMERS(
   ID   INT              NOT NULL,
   NAME VARCHAR (20)     NOT NULL,
   AGE  INT              NOT NULL,
   ADDRESS  CHAR (25) ,
   SALARY   DECIMAL (18, 2),    
   PRIMARY KEY (ID)
);
ORDERS table

CREATE TABLE ORDERS (
   ID          INT        NOT NULL,
   DATE        DATETIME,
   CUSTOMER_ID INT references CUSTOMERS(ID),
   AMOUNT     integer,
   PRIMARY KEY (ID)
);

3. Two types of sort keys are available: compound and interleaved.

COMPOUND
Specifies that the data is sorted using a compound key made up of all of the listed columns, in the order they are listed. A compound sort key is most useful when a query scans rows according to the order of the sort columns. The performance benefits of sorting with a compound key decrease when queries rely on secondary sort columns. You can define a maximum of 400 COMPOUND SORTKEY columns per table.

INTERLEAVED
Specifies that the data is sorted using an interleaved sort key. A maximum of eight columns can be specified for an interleaved sort key.

An interleaved sort gives equal weight to each column, or subset of columns, in the sort key, so queries do not depend on the order of the columns in the sort key. When a query uses one or more secondary sort columns, interleaved sorting significantly improves query performance. Interleaved sorting carries a small overhead cost for data loading and vacuuming operations.

Data distribution is of three types: EVEN (the default), KEY, and ALL.

Eg: This example shows Interleaved and All distribution.

create table customer_interleaved (
  c_custkey     integer        not null,
  c_name         varchar(25)    not null,
  c_address     varchar(25)    not null,
  c_city         varchar(10)    not null,
  c_nation       varchar(15)    not null,
  c_region       varchar(12)    not null,
  c_phone       varchar(15)    not null,
  c_mktsegment      varchar(10)    not null)
diststyle all
interleaved sortkey (c_custkey, c_city, c_mktsegment);

Eg: Even Distribution

create table myevent(
eventid int,
eventname varchar(200),
eventcity varchar(30));

Eg: Key distribution

create table t1(col1 int distkey, col2 int) diststyle key;

4. Create a Table with an IDENTITY Column

The following example creates a table named VENUE_IDENT, which has an IDENTITY column named VENUEID. This column starts with 0 and increments by 1 for each record. VENUEID is also declared as the primary key of the table.

create table venue_ident(venueid bigint identity(0, 1),
venuename varchar(100),
venuecity varchar(30),
venuestate char(2),
venueseats integer,
primary key(venueid));

5. Create a Table with DEFAULT Column Values

The following example creates a CATEGORYDEF table that declares default values for each column:

create table categorydef(
catid smallint not null default 0,
catgroup varchar(10) default 'Special',
catname varchar(10) default 'Other',
catdesc varchar(50) default 'Special events',
primary key(catid));

insert into categorydef values(default,default,default,default);

select * from categorydef;

catid | catgroup | catname |    catdesc
------+----------+---------+----------------
    0 | Special  | Other   | Special events

References:

http://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
http://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_examples.html



POC on AWS:

1. Created an EMR cluster with EC2 instances; also created S3 buckets and a Redshift cluster.
2. Access key IDs and secret keys are generated for each type of instance.
3. Copied CSV data to an S3 location.
4. Hive tables are created on this S3 location (see the sketch after this list).
5. A few transformations are applied and the data is inserted into a partitioned table.
6. Copied the data from this S3 location to Redshift tables using the COPY command with the S3 location, access key ID, and secret key.
7. Hive tables on S3 can be created only on EMR, not in Cloudera/Hortonworks.
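A minimal, hypothetical sketch of step 4, a Hive external table over the S3 data (the table name, columns, delimiter, and path are placeholders, not the actual POC schema):

CREATE EXTERNAL TABLE customer_staging (
  c_custkey INT,
  c_name    STRING,
  c_city    STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://<bucket>/<prefix>/customer/';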
