In this article, we convert common SQL queries to their PySpark equivalents.
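All of the PySpark snippets below assume a running SparkSession named `spark` and an existing DataFrame named `df` (or `df1`/`df2` for joins), plus the usual `functions` and `Window` imports. A minimal setup sketch, with a made-up sample DataFrame, might look like this:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# Create (or reuse) the SparkSession that the snippets below rely on.
spark = SparkSession.builder.appName("sql-to-pyspark").getOrCreate()

# A small, made-up DataFrame for trying the snippets out.
df = spark.createDataFrame(
    [(1, "alice", 3000), (2, "bob", 4000), (3, "carol", 4000)],
    ["id", "name", "salary"],
)
```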

| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Selecting data | `SELECT col1, col2 FROM table;` | `df.select("col1", "col2")` |
| Filtering data | `SELECT * FROM table WHERE condition;` | `df.filter(condition)` |
| Grouping and aggregating | `SELECT col1, COUNT(*) FROM table GROUP BY col1;` | `df.groupBy("col1").count()` |
| Joining data | `SELECT * FROM table1 JOIN table2 ON table1.col = table2.col;` | `df1.join(df2, df1.col == df2.col)` |
| Ordering data | `SELECT * FROM table ORDER BY column ASC;` | `df.orderBy("column", ascending=True)` |
| Creating temporary views | `CREATE OR REPLACE TEMP VIEW view_name AS SELECT * FROM table;` | `df.createOrReplaceTempView("view_name")` |
| Running SQL queries | `SELECT * FROM view_name WHERE condition;` | `spark.sql("SELECT * FROM view_name WHERE condition").show()` |
| Writing data to a table | `INSERT INTO table (col1, col2) VALUES (val1, val2);` | `df.write.mode("append").saveAsTable("table")` |
| Reading data from a table | `SELECT * FROM table;` | `df = spark.read.table("table")` |
| Counting rows | `SELECT COUNT(*) FROM table;` | `df.count()` |
| Dropping duplicates | `SELECT DISTINCT col1, col2 FROM table;` | `df.dropDuplicates(["col1", "col2"])` |
| Aggregating by group | `SELECT col1, AVG(col2) FROM table GROUP BY col1;` | `df.groupBy("col1").avg("col2")` |
| Renaming columns | `SELECT col1 AS new_col1 FROM table;` | `df.withColumnRenamed("col1", "new_col1")` |
| Limiting rows | `SELECT * FROM table LIMIT 10;` | `df.limit(10)` |
| Calculating a sum (likewise MAX, MIN, AVG) | `SELECT SUM(col1) FROM table;` | `df.selectExpr("SUM(col1)").show()` |
| Dropping columns | `ALTER TABLE table DROP COLUMN col1;` | `df.drop("col1")` |
| Handling NULL / NOT NULL values | `SELECT col1 FROM table WHERE col2 IS NULL;` (or `IS NOT NULL`) | `df.filter(df.col2.isNull())` / `df.filter(df.col2.isNotNull())` |
| Greater than or equal (likewise <, >, <=, ==, !=) | `SELECT * FROM table WHERE col >= value;` | `df.filter(df.col >= value).show()` |
| IN | `SELECT * FROM table WHERE col IN (val1, val2);` | `df.filter(df.col.isin(val1, val2)).show()` |
| BETWEEN | `SELECT * FROM table WHERE col BETWEEN val1 AND val2;` | `df.filter(df.col.between(val1, val2)).show()` |
| LIKE | `SELECT * FROM table WHERE col LIKE 'pattern';` | `df.filter(df.col.like('pattern')).show()` |
| CAST | `SELECT CAST(col AS data_type) FROM table;` | `df.selectExpr("CAST(col AS data_type)").show()` |
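To see how several of these rows compose into one statement, here is a sketch of a full query translated end to end; the `emp` DataFrame and its columns are made up for illustration:

```python
# SQL equivalent:
#   SELECT dept, COUNT(*) AS cnt, AVG(salary) AS avg_salary
#   FROM emp
#   WHERE salary >= 3000
#   GROUP BY dept
#   ORDER BY dept ASC
#   LIMIT 10;
emp = spark.createDataFrame(
    [("hr", 3000), ("hr", 5000), ("it", 2000), ("it", 4500)],
    ["dept", "salary"],
)

result = (
    emp.filter(emp.salary >= 3000)               # WHERE salary >= 3000
    .groupBy("dept")                             # GROUP BY dept
    .agg(
        F.count("*").alias("cnt"),               # COUNT(*)
        F.avg("salary").alias("avg_salary"),     # AVG(salary)
    )
    .orderBy("dept", ascending=True)             # ORDER BY dept ASC
    .limit(10)                                   # LIMIT 10
)
result.show()
```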

String Operations in PySpark

| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Upper case | `SELECT UPPER(col) FROM table;` | `df.selectExpr("UPPER(col)").show()` |
| Lower case | `SELECT LOWER(col) FROM table;` | `df.selectExpr("LOWER(col)").show()` |
| Length | `SELECT LENGTH(col) FROM table;` | `df.selectExpr("LENGTH(col)").show()` |
| Substring | `SELECT SUBSTRING(col, start, length) FROM table;` | `df.selectExpr("SUBSTRING(col, start, length)")` |
| Concatenation | `SELECT CONCAT(col1, col2) FROM table;` | `df.selectExpr("CONCAT(col1, col2)")` |
| Trim | `SELECT TRIM(col) FROM table;` | `df.selectExpr("TRIM(col)").show()` |
| Replace | `SELECT REPLACE(col, 'old_string', 'new_string') FROM table;` | `df.selectExpr("REPLACE(col, 'old_string', 'new_string')").show()` |
| Instr | `SELECT INSTR(col, 'search_string') FROM table;` | `df.selectExpr("INSTR(col, 'search_string')").show()` |
| Left padding | `SELECT LPAD(col, length, 'pad_string') FROM table;` | `df.selectExpr("LPAD(col, length, 'pad_string')").show()` |
| Right padding | `SELECT RPAD(col, length, 'pad_string') FROM table;` | `df.selectExpr("RPAD(col, length, 'pad_string')").show()` |
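The rows above use `selectExpr`, which evaluates Spark SQL expressions directly. The same operations are also available as column functions in `pyspark.sql.functions`; a short sketch over a made-up `names` DataFrame shows both forms side by side:

```python
names = spark.createDataFrame([("  Alice ",), ("bob",)], ["name"])

# SQL-expression form, as in the table above:
names.selectExpr(
    "UPPER(name)",
    "LENGTH(name)",
    "TRIM(name)",
    "SUBSTRING(name, 1, 3)",
).show()

# Equivalent column-function form:
names.select(
    F.upper("name"),
    F.length("name"),
    F.trim("name"),
    F.substring("name", 1, 3),
).show()
```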

Date/Time Operations in PySpark

| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Date difference | `SELECT DATEDIFF(end_date, start_date) FROM table;` | `df.selectExpr("DATEDIFF(end_date, start_date)").show()` |
| Current date | `SELECT CURRENT_DATE;` | `df.selectExpr("CURRENT_DATE").show()` |
| Current timestamp | `SELECT CURRENT_TIMESTAMP;` | `df.selectExpr("CURRENT_TIMESTAMP").show()` |
| Date add | `SELECT DATE_ADD(start_date, num_days) FROM table;` | `df.selectExpr("DATE_ADD(start_date, num_days)").show()` |
| Date subtract | `SELECT DATE_SUB(start_date, num_days) FROM table;` | `df.selectExpr("DATE_SUB(start_date, num_days)").show()` |
| Date format | `SELECT DATE_FORMAT(date, 'format') FROM table;` | `df.selectExpr("DATE_FORMAT(date, 'format')").show()` |
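A short sketch combining several of these date functions on a made-up DataFrame; note that Spark SQL's `DATE_ADD` and `DATE_SUB` take a plain number of days rather than MySQL's `INTERVAL` syntax:

```python
dates = spark.createDataFrame(
    [("2024-01-01", "2024-03-15")],
    ["start_date", "end_date"],
).selectExpr(
    "CAST(start_date AS DATE) AS start_date",
    "CAST(end_date AS DATE) AS end_date",
)

dates.selectExpr(
    "DATEDIFF(end_date, start_date) AS days_between",  # difference in days
    "DATE_ADD(start_date, 7) AS plus_week",            # add 7 days
    "DATE_SUB(end_date, 30) AS minus_30_days",         # subtract 30 days
    "DATE_FORMAT(end_date, 'yyyy-MM') AS year_month",  # reformat the date
    "CURRENT_DATE AS today",                           # today's date
).show()
```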

Window Functions in PySpark

| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Sum | `SELECT SUM(col) OVER (PARTITION BY partition_col) FROM table;` | `window = Window.partitionBy("partition_col"); df.withColumn("new_col", F.sum("col").over(window)).show()` |
| Average | `SELECT AVG(col) OVER (PARTITION BY partition_col ORDER BY order_col) FROM table;` | `window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_col", F.avg("col").over(window)).show()` |
| Row number | `SELECT ROW_NUMBER() OVER (PARTITION BY partition_col ORDER BY order_col) FROM table;` | `window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_row_num_col", F.row_number().over(window)).show()` |
| Rank | `SELECT RANK() OVER (PARTITION BY partition_col ORDER BY ord_col) FROM table;` | `window = Window.partitionBy("partition_col").orderBy("ord_col"); df.withColumn("new_rnk_col", F.rank().over(window)).show()` |
| Dense rank | `SELECT DENSE_RANK() OVER (PARTITION BY partition_col ORDER BY order_col) FROM table;` | `window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_dense_rnk", F.dense_rank().over(window)).show()` |
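Putting the window patterns together, here is a sketch over a made-up `sales` DataFrame; it assumes the `Window` and `functions as F` imports from the setup block at the top:

```python
sales = spark.createDataFrame(
    [("hr", "a", 100), ("hr", "b", 200), ("it", "c", 150), ("it", "d", 150)],
    ["dept", "emp", "amount"],
)

w_part = Window.partitionBy("dept")  # unordered partition
w_ordered = Window.partitionBy("dept").orderBy(F.desc("amount"))

(
    sales
    .withColumn("dept_total", F.sum("amount").over(w_part))   # SUM(...) OVER
    .withColumn("dept_avg", F.avg("amount").over(w_part))     # AVG(...) OVER
    .withColumn("row_num", F.row_number().over(w_ordered))    # ROW_NUMBER()
    .withColumn("rnk", F.rank().over(w_ordered))               # RANK()
    .withColumn("dense_rnk", F.dense_rank().over(w_ordered))   # DENSE_RANK()
    .show()
)
```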
