PySpark Notes
To install packages
pip installation
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3 get-pip.py
Packages Installation
pip3 install <PACKAGENAME>
pip3 install requests --user
pip3 install pandas
Requests Example:
The requests package is used when working with REST APIs.
import requests
payload= f"grant_type=client_credentials&client_id={client_id_value}&client_secret={client_secret_value}&undefined="
headers={
'content-Type': ""
'Accept':"application/json;v=1"
}
response= requests.request("POST",url,data=payload, header,verify=false)
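A small follow-up sketch for handling the response (assuming the url and credential variables above are defined; the access_token field name is just an assumption for an OAuth-style reply):
if response.status_code == 200:
    token_info = response.json()                    # parse the JSON body
    access_token = token_info.get("access_token")   # assumed field name
else:
    print("Request failed:", response.status_code, response.text)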
Pandas Examples:
import pandas as pd
pdf = pd.DataFrame(df_list)
pdf.to_csv(PATH, sep='\u0001', index=False, header = False)
SnowFlake connection:
import snowflake.connector
import pandas as pd

sfconnec = snowflake.connector.connect(user=user,
                                       password=pwd,
                                       account=account,
                                       warehouse=warehouse,
                                       database=dbname,
                                       schema=schema,
                                       role=role)
cursor = sfconnec.cursor()
sqldata = cursor.execute("SELECT * FROM TBL1").fetchall()
tableschema = ['Name', 'Address', 'Id']
panda_df = pd.DataFrame(sqldata, columns=tableschema)  # Converts to Pandas DF
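As a side note, newer versions of snowflake-connector-python installed with the pandas extra can build the DataFrame directly; a minimal sketch reusing the cursor above:
cursor.execute("SELECT * FROM TBL1")
panda_df = cursor.fetch_pandas_all()  # returns the full result set as a pandas DataFrame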
Transpose/Pivot using Pandas:
The advantage of using pandas is that it is powerful and ships with many ready-made operations. One such example is the transpose/unpivot operation, where columns need to be converted into rows.
For this we do not need to write any complex logic: in pandas, 'melt' is the function that does this job for us.
Eg:
import pandas as pd
tablecols = ['Name', 'Address', 'Id']
melted_df = pd.melt(csv_df, id_vars=tablecols)
Later rename the default 'variable' and 'value' columns as needed.
source: https://www.journaldev.com/33398/pandas-melt-unmelt-pivot-function
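A small self-contained sketch of melt with made-up data (the column names below are illustrative only):
import pandas as pd

csv_df = pd.DataFrame({
    'Name': ['A', 'B'],
    'Address': ['addr1', 'addr2'],
    'Id': [1, 2],
    'Jan': [10, 20],
    'Feb': [30, 40],
})
tablecols = ['Name', 'Address', 'Id']

melted = pd.melt(csv_df, id_vars=tablecols)   # 'Jan'/'Feb' columns become rows under 'variable'/'value'
melted = melted.rename(columns={'variable': 'Month', 'value': 'Amount'})
print(melted)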
Steps for AWS Lambda creation:
1. Create Function
2. Add Tags.
3. Add VPC Settings
4. Create Layer
5. Start writing function
To create a layer
pip3 install -U pandas -t ./python
Now the pandas library is saved under the local python/ directory (Lambda expects a Python layer's packages inside a top-level python/ folder). Zip that folder and upload the zip as a layer to the Lambda function.
Python iteration through a list
numbers = (1, 2, 3, 4)

def addition_op(n):
    return n + n

out_num = map(addition_op, numbers)          # map returns a lazy iterator
out_nums2 = map(lambda x: x + x, numbers)
print(list(out_num), list(out_nums2))        # [2, 4, 6, 8] [2, 4, 6, 8]
Python alternative of map(x => x) as in Scala
df.select("text").show()
+-----+
| text|
+-----+
|hello|
|world|
+-----+
df.select("text").rdd.map(lambda x: str(x).upper()).collect()
["ROW(TEXT='HELLO')", "ROW(TEXT='WORLD')"]
Example2:
df.show()
+-----+----------+
| text|upper_text|
+-----+----------+
|hello| HELLO|
|world| WORLD|
+-----+----------+
dd = df.rdd.map(lambda x: (x.text, x.upper_text))
dd.collect()
[('hello', 'HELLO'), ('world', 'WORLD')]
Parallelism in Python (alternative to Futures in Scala)
Use ThreadPoolExecutor.
Eg:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def get_Tbl1_data():
    run_snow(conn, Tbl1_sql)

with ThreadPoolExecutor(max_workers=4) as exe:
    exe.submit(get_Tbl1_data)
    exe.submit(get_Tbl2_data)
    exe.submit(get_Tbl3_data)
    exe.submit(get_Tbl4_data)
    exe.submit(get_Tbl5_data)
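If the submitted functions return data, the returned futures can be collected; a minimal sketch, assuming get_Tbl1_data etc. are defined as above and each return rows:
from concurrent.futures import ThreadPoolExecutor, as_completed

tasks = [get_Tbl1_data, get_Tbl2_data, get_Tbl3_data]

with ThreadPoolExecutor(max_workers=4) as exe:
    futures = [exe.submit(t) for t in tasks]
    for fut in as_completed(futures):    # yields each future as it finishes
        rows = fut.result()              # re-raises any exception from the worker thread
        print(len(rows), "rows fetched")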
Snowflake connection via sqlalchemy:
Use sqlalchemy (with the snowflake-sqlalchemy package) to connect to Snowflake from Python.
eg:
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(account='accountname',
                           user='user',
                           password='password',
                           warehouse='warehousename',
                           database='databasename',
                           schema='schemaname'))
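A short usage sketch with the engine above (the table name is a placeholder):
import pandas as pd

df = pd.read_sql("SELECT * FROM TBL1", engine)   # run a query and load the result into pandas
print(df.head())

engine.dispose()   # release pooled connections when done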
Install multiple packages in Python
pip3 install --upgrade -r /path/requirements.txt --user
requirements.txt
pandas==1.1.2
snowflake-sqlalchemy==1.2.3
pyspark snowflake connectivity
Need to install the following:
- Snowflake Connector for Spark
- Snowflake JDBC driver
# The Snowflake Connector for Spark and the JDBC driver are jars; they are pulled in below
# via spark.jars.packages (they can also be passed with --jars). pip is only needed if you
# also use the Python connector:
pip install snowflake-connector-python
from pyspark.sql import SparkSession
# Initialize your Spark session
spark = SparkSession.builder \
    .appName("PySpark with Snowflake") \
    .config("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.10.0,net.snowflake:snowflake-jdbc:3.13.1") \
    .getOrCreate()

# Snowflake options
sfOptions = {
    "sfURL": "<your_snowflake_account_url>",
    "sfUser": "<your_username>",
    "sfPassword": "<your_password>",
    "sfDatabase": "<your_database>",
    "sfSchema": "<your_schema>",
    "sfWarehouse": "<your_warehouse>",
    "sfRole": "<your_role>"
}

# Read from Snowflake
df = spark.read \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "<your_table_name>") \
    .load()
# Show the DataFrame
df.show()
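Writing a DataFrame back to Snowflake follows the same pattern; a minimal sketch reusing sfOptions (the target table name is a placeholder):
# Append the DataFrame to a Snowflake table
df.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "<your_target_table>") \
    .mode("append") \
    .save()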
Pass --jars to pyspark
Jars can be passed to PySpark using --jars; the classes they contain are then available to the Spark session at runtime (for example as data source formats or JDBC drivers), the same way as in Scala.
The --jars option is used to include external libraries (.jar files) that your Spark application needs at runtime. This is commonly used to include JDBC drivers, connectors (e.g., Snowflake, Kafka, etc.), or custom jars.
pyspark --jars /path/to/spark-snowflake_2.12-2.10.0.jar,/path/to/snowflake-jdbc-3.13.1.jar
If you're running a PySpark job using spark-submit, you can include the --jars option the same way:
spark-submit \
  --master <your-cluster-master-url> \
  --deploy-mode <deploy-mode> \
  --jars /path/to/spark-snowflake_2.12-2.10.0.jar,/path/to/snowflake-jdbc-3.13.1.jar \
  your_pyspark_script.py
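For example, once a JDBC driver jar is on the classpath via --jars, it can be used from the script through Spark's built-in JDBC source; a minimal sketch with placeholder connection details (the PostgreSQL driver here is just an illustrative assumption):
# Read a table through JDBC; the driver class comes from the jar passed with --jars
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:5432/<database>") \
    .option("dbtable", "<schema>.<table>") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .option("driver", "org.postgresql.Driver") \
    .load()
jdbc_df.show()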
Common Use Cases:
- Including JDBC Drivers: When you need to connect Spark to databases like Snowflake, MySQL, or Postgres.
- Custom UDF Libraries: For adding custom transformations or functions.
- Spark Connectors: Like Kafka, ElasticSearch, or Snowflake connectors.
Note: In PySpark, the --files option allows you to pass external files that are distributed across the cluster and made accessible to the tasks running on worker nodes. This can include configuration files, data files, or even compressed archives like .zip files.
spark-submit --files /path/to/myfiles.zip my_pyspark_script.py
import zipfile
import os
from pyspark import SparkFiles
# Get the path to the .zip file
zip_file_path = SparkFiles.get("myfiles.zip")
# Define the extraction directory (you can define any directory name)
extraction_dir = "/tmp/extracted_files"
# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extraction_dir)

# Now you can access files in the extraction_dir
for root, dirs, files in os.walk(extraction_dir):
    for file in files:
        print(f"File: {file}")

# Continue your PySpark operations using the extracted files
zipWithIndex
Zips this RDD with its element indices.
rdd1 = sc.parallelize(['A','B','C','D']).zipWithIndex()
rdd1.collect()
[('A', 0), ('B', 1), ('C', 2), ('D', 3)]
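A possible follow-up sketch (assuming an active SparkSession): turning the zipped RDD into a DataFrame with an index column.
# Swap each (value, index) pair and convert to a DataFrame
indexed_df = rdd1.map(lambda pair: (pair[1], pair[0])).toDF(["idx", "value"])
indexed_df.show()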