Monday, July 24, 2017

pyspark

A few points:

1. By default, paths in PySpark resolve against HDFS, not the local file system. To read a local file, use the file:// prefix (the file must be present on every worker node when running on a cluster); see the example below.
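A minimal sketch of the two path styles, assuming a SparkSession named spark already exists (the file names are made up):

# Default: the path resolves against HDFS
df_hdfs = spark.read.csv("hdfs:///data/input/sample.csv", header=True)

# Explicit local path: the file must be readable on every node
df_local = spark.read.csv("file:///tmp/sample.csv", header=True)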

PySpark Notes

To install Packages

pip Installation
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3 get-pip.py --user

Packages Installation
pip3 install <PACKAGENAME>
pip3 install requests --user
pip3 install pandas


Requests Example:
The requests package is used when working with REST APIs.

import requests

payload = f"grant_type=client_credentials&client_id={client_id_value}&client_secret={client_secret_value}"
headers = {
    'Content-Type': "application/x-www-form-urlencoded",
    'Accept': "application/json;v=1"
}

response = requests.request("POST", url, data=payload, headers=headers, verify=False)
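A quick check of the response, assuming the call above succeeded (the token field name depends on the API):

print(response.status_code)
token = response.json().get("access_token")   # typical field for a client_credentials grant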

Pandas Examples:

import pandas as pd
pdf = pd.DataFrame(df_list)
pdf.to_csv(PATH, sep='\u0001', index=False, header = False)

Snowflake connection:

import snowflake.connector
import pandas as pd

sfconnec = snowflake.connector.connect(user=user,
                                       password=pwd,
                                       account=account,
                                       warehouse=warehouse,
                                       database=dbname,
                                       schema=schema,
                                       role=role)
sfJdbcConnection = sfconnec.cursor()

sqldata = sfJdbcConnection.execute("SELECT * FROM TBL1").fetchall()
tableschema = ['Name', 'Address', 'Id']
panda_df = pd.DataFrame(sqldata, columns=tableschema)   # Converts to a pandas DataFrame


Transpose/Pivot  using Pandas:

The advantage of using pandas is that it is powerful and ships with numerous built-in functions. One such example is the transpose/unpivot operation, where columns need to be converted into rows.
For this we do not need to write any complex logic; in pandas, 'melt' is the function that does this job for us.
Eg:
import pandas as pd
tablecols = ['Name', 'Address', 'Id']
pd.melt(csv_df, id_vars=tablecols)

Later, rename the columns (melt names them 'variable' and 'value' by default) as needed.

source: https://www.journaldev.com/33398/pandas-melt-unmelt-pivot-function
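A small worked sketch of melt (the DataFrame below is made up for illustration):

import pandas as pd

csv_df = pd.DataFrame({
    'Name': ['A', 'B'],
    'Address': ['X', 'Y'],
    'Id': [1, 2],
    'Jan': [10, 20],
    'Feb': [30, 40]
})

melted = pd.melt(csv_df, id_vars=['Name', 'Address', 'Id'])
# The month columns become rows under 'variable'/'value':
#   Name Address  Id variable  value
# 0    A       X   1      Jan     10
# 1    B       Y   2      Jan     20
# 2    A       X   1      Feb     30
# 3    B       Y   2      Feb     40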


Steps for AWS Lambda creation:

1. Create Function
2. Add Tags.
3. Add VPC Settings
4. Create Layer
5. Start writing function

To create a layer

pip3 install -U pandas -t ./

Now, the pandas library will be saved in the current directory. Zip it and upload it as a layer to the Lambda function.
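A minimal sketch of a handler that uses pandas from the layer (the handler body and event keys here are hypothetical):

import json
import pandas as pd

def lambda_handler(event, context):
    # Build a small DataFrame from the incoming records (hypothetical event key)
    records = event.get("records", [])
    df = pd.DataFrame(records)
    return {
        "statusCode": 200,
        "body": json.dumps({"row_count": len(df)})
    }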

Python Iteration through list

numbers = (1, 2, 3, 4)

def addition_op(n):
    return n + n

out_num = list(map(addition_op, numbers))        # [2, 4, 6, 8]

out_nums2 = list(map(lambda x: x + x, numbers))  # [2, 4, 6, 8]

Python alternative of Scala's map(x => x)

df.select("text").show()
df.select("text").rdd.map(lambda x: str(x).upper()).collect()

+-----+
| text|
+-----+
|hello|
|world|
+-----+
["ROW(TEXT='HELLO')", "ROW(TEXT='WORLD')"]

Example2:
df.show()

+-----+----------+
| text|upper_text|
+-----+----------+
|hello|     HELLO|
|world|     WORLD|
+-----+----------+

dd = df.rdd.map(lambda x: (x.text, x.upper_text))

dd.collect()
[('hello', 'HELLO'), ('world', 'WORLD')]


Parallelism in Python (alternative to Futures in Scala)

Use ThreadPoolExecutor.

Eg:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def get_Tbl1_data():
    run_snow(conn, Tbl1_sql)

with ThreadPoolExecutor(max_workers=4) as exe:
    exe.submit(get_Tbl1_data)
    exe.submit(get_Tbl2_data)
    exe.submit(get_Tbl3_data)
    exe.submit(get_Tbl4_data)
    exe.submit(get_Tbl5_data)
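If the return values are needed, submit() returns Future objects whose results can be collected; a sketch assuming the same get_TblN_data functions as above:

from concurrent.futures import ThreadPoolExecutor, as_completed

tasks = [get_Tbl1_data, get_Tbl2_data, get_Tbl3_data]

with ThreadPoolExecutor(max_workers=4) as exe:
    futures = [exe.submit(task) for task in tasks]
    for fut in as_completed(futures):
        print(fut.result())   # re-raises here if the task failed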

Snowflake Connection:
Used SQLAlchemy (with the snowflake-sqlalchemy package) for connecting to Snowflake from Python.

eg:
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(account="hostname", user='user', password='password', warehouse='warehousename', database='databasename', schema='schemaname'))
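A short usage sketch with the engine above (the table name is taken from the earlier example):

import pandas as pd

with engine.connect() as conn:
    df = pd.read_sql("SELECT * FROM TBL1", conn)
print(df.head())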

Install multiple packages in Python

pip3 install --upgrade -r /path/requirements.txt --user

requirements.txt
pandas==1.1.2
snowflake-sqlalchemy==1.2.3

pyspark snowflake connectivity

Need to install the following:

  • Snowflake Connector for Spark
  • Snowflake JDBC driver

# The Snowflake Python connector installs via pip
pip install snowflake-connector-python

# The Spark connector and JDBC driver are supplied to Spark itself,
# e.g. via spark.jars.packages (shown in the example below) or --jars


from pyspark.sql import SparkSession

# Initialize your Spark session
spark = SparkSession.builder \
    .appName("PySpark with Snowflake") \
    .config("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.10.0,net.snowflake:snowflake-jdbc:3.13.1") \
    .getOrCreate()

# Snowflake options
sfOptions = {
    "sfURL": "<your_snowflake_account_url>",
    "sfUser": "<your_username>",
    "sfPassword": "<your_password>",
    "sfDatabase": "<your_database>",
    "sfSchema": "<your_schema>",
    "sfWarehouse": "<your_warehouse>",
    "sfRole": "<your_role>"
}

# Read from Snowflake
df = spark.read \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "<your_table_name>") \
    .load()

# Show the DataFrame
df.show()
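Writing a DataFrame back to Snowflake uses the same options; a minimal sketch assuming the sfOptions above and a hypothetical target table name:

# Write the DataFrame to a Snowflake table
df.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "<your_target_table>") \
    .mode("append") \
    .save()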


Pass --jars to pyspark

Jars can be passed to PySpark using --jars and then used from the .py file (for example through a data source format or the JVM gateway), much like adding dependencies in Scala.


To pass the --jars option when running a PySpark job, you can use it to include external libraries (such as .jar files) that your Spark application needs at runtime. This is commonly used to include JDBC drivers, connectors (e.g., Snowflake, Kafka, etc.), or custom jars.

pyspark --jars /path/to/spark-snowflake_2.12-2.10.0.jar,/path/to/snowflake-jdbc-3.13.1.jar

If you're running a PySpark job using spark-submit, you can include the --jars option the same way:

spark-submit \
  --master <your-cluster-master-url> \
  --deploy-mode <deploy-mode> \
  --jars /path/to/spark-snowflake_2.12-2.10.0.jar,/path/to/snowflake-jdbc-3.13.1.jar \
  your_pyspark_script.py

Common Use Cases:

  1. Including JDBC Drivers: When you need to connect Spark to databases like Snowflake, MySQL, or Postgres.
  2. Custom UDF Libraries: For adding custom transformations or functions.
  3. Spark Connectors: Like Kafka, ElasticSearch, or Snowflake connectors.

Note: In PySpark, the --files option allows you to pass external files to be distributed across the cluster and accessible to the tasks running on worker nodes. This can include configuration files, data files, or even compressed archives like .zip files.

spark-submit --files /path/to/myfiles.zip my_pyspark_script.py

import zipfile
import os
from pyspark import SparkFiles

# Get the path to the .zip file
zip_file_path = SparkFiles.get("myfiles.zip")

# Define the extraction directory (you can define any directory name)
extraction_dir = "/tmp/extracted_files"

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extraction_dir)

# Now you can access files in the extraction_dir
for root, dirs, files in os.walk(extraction_dir):
    for file in files:
        print(f"File: {file}")

# Continue your PySpark operations using the extracted files


zipWithIndex


Zips this RDD with its element indices.

rdd1 = sc.parallelize(['A','B','C','D']).zipWithIndex()
rdd1.collect()

[('A', 0), ('B', 1), ('C', 2), ('D', 3)]
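A common use is attaching a row index to a DataFrame; a minimal sketch, assuming a SparkSession named spark (the column names are just for illustration):

# Attach a sequential index to each row of a DataFrame
df = spark.createDataFrame([('A',), ('B',), ('C',)], ['letter'])
indexed = df.rdd.zipWithIndex().map(lambda pair: (pair[1],) + tuple(pair[0]))
indexed_df = indexed.toDF(['row_id', 'letter'])
indexed_df.show()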
