In this article, we convert common SQL queries to their PySpark equivalents; a short runnable example follows each table below.
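Before the tables, here is a minimal setup sketch. The SparkSession, the sample DataFrame, and the column names (id, name, dept, salary) are hypothetical and only serve the examples that follow each table.

```python
from pyspark.sql import SparkSession

# Local SparkSession for experimentation (assumes PySpark is installed).
spark = SparkSession.builder.appName("sql-to-pyspark").getOrCreate()

# A small sample DataFrame standing in for a SQL table.
df = spark.createDataFrame(
    [(1, "Alice", "HR", 5000), (2, "Bob", "IT", 6000), (3, "Cara", "IT", 7000)],
    ["id", "name", "dept", "salary"],
)

# Register it as a temp view so it can also be queried with spark.sql().
df.createOrReplaceTempView("employees")
```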
| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Selecting data | SELECT col1, col2 FROM table; | df.select("col1", "col2") |
| Filtering data | SELECT * FROM table WHERE condition; | df.filter(condition) |
| Grouping and aggregating | SELECT col1, COUNT(*) FROM table GROUP BY col1 | df.groupBy("col1").count() |
| Joining data | SELECT * FROM table1 JOIN table2 ON table1.col = table2.col | df1.join(df2, df1.col == df2.col) |
| Ordering data | SELECT * FROM table ORDER BY column ASC; | df.orderBy("column", ascending=True) |
| Creating temporary views | CREATE OR REPLACE TEMP VIEW view_name AS SELECT * FROM table | df.createOrReplaceTempView("view_name") |
| Running SQL queries | SELECT * FROM view_name WHERE condition | spark.sql("SELECT * FROM view_name WHERE condition").show() |
| Writing data to a table | INSERT INTO table (col1, col2) VALUES (val1, val2) | df.write.mode("append").saveAsTable("table") |
| Reading data from a table | SELECT * FROM table | df = spark.read.table("table") |
| Counting rows | SELECT COUNT(*) FROM table | df.count() |
| Dropping duplicates | SELECT DISTINCT col1, col2 FROM table | df.dropDuplicates(["col1", "col2"]) |
| Aggregating by group | SELECT col1, AVG(col2) FROM table GROUP BY col1 | df.groupBy("col1").avg("col2") |
| Renaming columns | SELECT col1 AS new_col1 FROM table | df.withColumnRenamed("col1", "new_col1") |
| Limiting rows | SELECT * FROM table LIMIT 10 | df.limit(10) |
| Calculating a sum (likewise MAX, MIN, AVG) | SELECT SUM(col1) FROM table; | df.selectExpr("SUM(col1)").show() |
| Dropping columns | ALTER TABLE table DROP COLUMN col1 | df.drop("col1") |
| Handling NULL / NOT NULL values | SELECT col1 FROM table WHERE col2 IS NULL / IS NOT NULL | df.filter(df.col2.isNull()) / df.filter(df.col2.isNotNull()) |
| Greater than or equal (likewise <, >, <=, =, <>) | SELECT * FROM table WHERE col >= value; | df.filter(df.col >= value).show() |
| IN | SELECT * FROM table WHERE col IN (val1, val2) | df.filter(df.column.isin(val1, val2)).show() |
| BETWEEN | SELECT * FROM table WHERE col BETWEEN val1 AND val2 | df.filter(df.column.between(val1, val2)).show() |
| LIKE | SELECT * FROM table WHERE col LIKE 'pattern' | df.filter(df.column.like('pattern')).show() |
| CAST | SELECT CAST(col AS data_type) FROM table | df.selectExpr("CAST(col AS data_type)").show() |
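To see how these pieces chain together, here is a minimal sketch that filters, groups, aggregates, and orders the hypothetical employees DataFrame from the setup, plus the same query run through the temp view:

```python
from pyspark.sql import functions as F

# Equivalent to:
# SELECT dept, COUNT(*) AS cnt, AVG(salary) AS avg_salary
# FROM employees WHERE salary >= 5500 GROUP BY dept ORDER BY cnt DESC
result = (
    df.filter(F.col("salary") >= 5500)
      .groupBy("dept")
      .agg(F.count("*").alias("cnt"), F.avg("salary").alias("avg_salary"))
      .orderBy(F.col("cnt").desc())
)
result.show()

# The same query expressed in SQL against the registered temp view.
spark.sql(
    "SELECT dept, COUNT(*) AS cnt, AVG(salary) AS avg_salary "
    "FROM employees WHERE salary >= 5500 GROUP BY dept ORDER BY cnt DESC"
).show()
```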
String operations in PySpark
| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Upper case | SELECT UPPER(col) FROM table | df.selectExpr("UPPER(col)").show() |
| Lower case | SELECT LOWER(col) FROM table | df.selectExpr("LOWER(col)").show() |
| Length | SELECT LENGTH(col) FROM table | df.selectExpr("LENGTH(col)").show() |
| Substring | SELECT SUBSTRING(col, start, length) FROM table | df.selectExpr("SUBSTRING(col, start, length)") |
| Concatenation | SELECT CONCAT(col1, col2) FROM table | df.selectExpr("CONCAT(col1, col2)") |
| Trim | SELECT TRIM(col) FROM table | df.selectExpr("TRIM(col)").show() |
| Replace | SELECT REPLACE(col, 'old_string', 'new_string') FROM table | df.selectExpr("REPLACE(col, 'old_string', 'new_string')").show() |
| Instr | SELECT INSTR(col, 'search_string') FROM table | df.selectExpr("INSTR(col, 'search_string')").show() |
| Left padding | SELECT LPAD(col, length, 'pad_string') FROM table | df.selectExpr("LPAD(col, length, 'pad_string')").show() |
| Right padding | SELECT RPAD(col, length, 'pad_string') FROM table | df.selectExpr("RPAD(col, length, 'pad_string')").show() |
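The same string functions are also available through the column API in pyspark.sql.functions. A minimal sketch, again using the hypothetical employees DataFrame from the setup:

```python
from pyspark.sql import functions as F

# Column-API equivalents of the selectExpr calls in the table above.
df.select(
    F.upper(F.col("name")).alias("name_upper"),       # UPPER(name)
    F.length(F.col("name")).alias("name_len"),        # LENGTH(name)
    F.substring("name", 1, 2).alias("name_prefix"),   # SUBSTRING(name, 1, 2)
    F.concat_ws("-", "dept", "name").alias("tag"),    # CONCAT with a separator
    F.lpad(F.col("dept"), 5, "_").alias("dept_pad"),  # LPAD(dept, 5, '_')
).show()
```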
Date/Time Operators:
| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Date difference | SELECT DATEDIFF(end_date, start_date) FROM table | df.selectExpr("DATEDIFF(end_date, start_date)").show() |
| Current date | SELECT CURRENT_DATE; | df.selectExpr("CURRENT_DATE").show() |
| Current timestamp | SELECT CURRENT_TIMESTAMP; | df.selectExpr("CURRENT_TIMESTAMP").show() |
| Date add | SELECT DATE_ADD(start_date, num_days) FROM table | df.selectExpr("DATE_ADD(start_date, num_days)").show() |
| Date subtract | SELECT DATE_SUB(start_date, num_days) FROM table | df.selectExpr("DATE_SUB(start_date, num_days)").show() |
| Date format | SELECT DATE_FORMAT(date, 'format') FROM table; | df.selectExpr("DATE_FORMAT(date, 'format')").show() |
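A minimal sketch of the same operations through the column API. The dates_df DataFrame and its hire_date column are hypothetical, created here only to have a date column to work with:

```python
from pyspark.sql import functions as F

# Hypothetical DataFrame with a single date column.
dates_df = (
    spark.createDataFrame([("2024-01-15",), ("2024-03-01",)], ["hire_date"])
    .withColumn("hire_date", F.to_date("hire_date"))
)

dates_df.select(
    F.current_date().alias("today"),                          # CURRENT_DATE
    F.datediff(F.current_date(), "hire_date").alias("days"),  # DATEDIFF
    F.date_add("hire_date", 30).alias("plus_30"),             # DATE_ADD
    F.date_format("hire_date", "yyyy-MM").alias("month"),     # DATE_FORMAT
).show()
```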
Window functions
| Task | SQL Command | PySpark Command |
| --- | --- | --- |
| Sum | SELECT SUM(col) OVER (PARTITION BY partition_col) FROM table | window = Window.partitionBy("partition_col"); df.withColumn("new_col", F.sum("col").over(window)).show() |
| Average | SELECT AVG(col) OVER (PARTITION BY partition_col ORDER BY order_col) FROM table | window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_col", F.avg("col").over(window)).show() |
| Row number | SELECT ROW_NUMBER() OVER (PARTITION BY partition_col ORDER BY order_col) FROM table | window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_row_num_col", F.row_number().over(window)).show() |
| Rank | SELECT RANK() OVER (PARTITION BY partition_col ORDER BY order_col) FROM table | window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_rnk_col", F.rank().over(window)).show() |
| Dense rank | SELECT DENSE_RANK() OVER (PARTITION BY partition_col ORDER BY order_col) FROM table | window = Window.partitionBy("partition_col").orderBy("order_col"); df.withColumn("new_dense_rnk", F.dense_rank().over(window)).show() |
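A runnable sketch of the window-function pattern, assuming the hypothetical employees DataFrame from the setup: it ranks employees by salary within each department and adds a per-department salary total.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Partition by department, order by salary descending.
window = Window.partitionBy("dept").orderBy(F.col("salary").desc())

(
    df.withColumn("dept_total", F.sum("salary").over(Window.partitionBy("dept")))
      .withColumn("salary_rank", F.rank().over(window))
      .withColumn("row_num", F.row_number().over(window))
      .show()
)
```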