PySpark complete tutorial from scratch

Shuffle: when data is written out and shared with other executors, i.e. redistributed across partitions.
Whenever a shuffle happens, the job gets divided into stages.

Driver

  1. Heart of the Spark Application
  2. Manages the information and state of executors
  3. Analyzes, distributes, and schedules the work across executors

Executor

  1. Executes the code assigned by the driver
  2. Reports the execution status back to the driver


In the following diagram, the user submits a job to the driver. The driver in turn analyzes the job, breaks it down, and distributes the work to the executors.
Executors are basically JVM processes that run on the cluster machines, and each executor consists of cores.

In the following example, we have six cores (chocolate color) and three executors (green color).
All six cores can run in parallel and perform six tasks at a time. Remember that one core can only do one task at a time, so six cores running in parallel do six tasks at a time. This parallel nature is what gives Spark its capability for parallel processing.

What is Partition in pyspark?

To allow every executor to work in parallel, Spark breaks the data down into chunks called partitions.

So, from our last example: the bag of marbles contained pouches of marbles, and each pouch can be considered a partition.
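As a quick illustration (a minimal sketch, not part of the original marble example, and assuming an active SparkSession named spark), you can inspect and change the number of partitions of a DataFrame like this:

df = spark.range(0, 100)                  # a simple DataFrame with a single "id" column
print(df.rdd.getNumPartitions())          # number of partitions Spark created by default

df_repart = df.repartition(6)             # redistribute the data into 6 partitions (this causes a shuffle)
print(df_repart.rdd.getNumPartitions())   # 6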

What is a transformation?

Transformations are the instructions or code that we supply in order to modify or transform our data.
e.g. select, where, group by
Transformations build up a logical plan.

There are basically two types of transformations: narrow and wide.

Narrow transformation (1-to-1) vs. wide transformation (1-to-N)


Narrow transformation: after applying the transformation, each input partition contributes to at most one output partition. In other words, data will not shuffle across the partitions.

Wide transformation: after applying the transformation, one input partition contributes to more than one output partition. This type of transformation leads to a data shuffle.
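As a small sketch (assuming an active SparkSession named spark; the tiny DataFrame here is only for illustration), a filter is a narrow transformation while a groupBy is a wide one:

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Eve", 1)], ["name", "dept"])

narrow_df = df.where("dept = 1")        # narrow: each partition is filtered on its own, no shuffle
wide_df = df.groupBy("dept").count()    # wide: rows with the same dept must come together, so data shuffles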

What are Actions?

To trigger the execution, we need to call an action. This executes the plan built up by the transformations (a small sketch follows the list below).

Actions are of three types:

  1. View data in the console
  2. Collect data to native language
  3. Write data to output data sources
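A minimal sketch of the three kinds of actions (assuming an active SparkSession named spark; the output path is only a hypothetical example):

df = spark.createDataFrame([("Alice", 29), ("Bob", 35)], ["name", "age"])

df.show()                                           # 1. view data in the console
rows = df.collect()                                 # 2. collect data to the driver as Python Row objects
df.write.mode("overwrite").csv("/tmp/people_csv")   # 3. write data to an output data source (hypothetical path)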

Spark prefers Lazy Evaluation

Spark will wait until the last moment to execute the graph of computation. Each transformation operation adds to this graph (the graph contains the transformation operations), and nothing runs until an action is called.

Lazy Evaluation allows Spark to optimize, plan, and use the resources properly for execution.
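For example (a minimal sketch assuming an active SparkSession named spark), the transformations below return immediately without touching the data; only the final action triggers the actual computation:

df = spark.range(0, 1000000)

filtered = df.where("id % 2 = 0")                     # transformation: only the plan is built, nothing runs yet
doubled = filtered.selectExpr("id * 2 as doubled")    # still nothing runs
doubled.show(5)                                       # action: now Spark optimizes and executes the whole graph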

Sandwich shop example

The sandwich preparation instructions were the TRANSFORMATIONS, and the payment was the ACTION that started the sandwich preparation.

What is Spark Session?

  1. The Driver Process is known as Spark Session
  2. It is the entry point for a Spark execution
  3. The Spark Session instance executes the code in the cluster.
  4. The relation is one-to-one, i.e. for 1 Spark application we have 1 SparkSession instance.

What is a DataFrame in PySpark?

A DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database.

Execution plan

  1. Logical execution plan
  2. Physical execution plan

1. Logical execution plan

The following diagram shows the logical planning workflow. Once the code is supplied by the user, Spark creates an unresolved logical plan. This unresolved logical plan is validated against the catalog in order to validate the column names and table names. Once this validation is done, Spark creates a resolved logical plan. The resolved logical plan is then taken into the Catalyst Optimizer, which does the optimization of the logical plan. Once the Catalyst Optimizer completes its optimization, it generates an optimized logical plan, which is the logical DAG. (A DAG, or directed acyclic graph, represents the logical execution plan of a Spark job.)

2. Physical execution plan

Next in planning is the physical planning. Once the optimized logical plan is ready, Spark generates multiple physical plans based on the cluster and the physical configuration. These physical plans are run against a cost model, which generates a cost for each physical plan. Once the costs are compared, Spark selects the best physical plan, and that physical plan is sent to the cluster for execution. Once the executors receive the physical plan, they run it against the data partitions, and that is how the whole execution in Spark works.
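You can inspect these plans yourself with explain(). The sketch below (assuming an active SparkSession named spark) prints the parsed, analyzed, and optimized logical plans along with the selected physical plan:

df = spark.createDataFrame([("Alice", 29), ("Bob", 35)], ["name", "age"])

df.where("age > 30").select("name").explain(extended=True)
# The output contains:
#   == Parsed Logical Plan ==      (the unresolved logical plan)
#   == Analyzed Logical Plan ==    (resolved against the catalog)
#   == Optimized Logical Plan ==   (after the Catalyst Optimizer)
#   == Physical Plan ==            (the plan selected for execution)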

Spark hands-on

local[*]: runs Spark locally with as many worker threads as there are logical cores on your machine.

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Spark Learning")
    .master("local[*]")
    .getOrCreate()
)

Interview question

Is there any way to use the Spark session with a different name?

Yes, we can get a reference to it using getActiveSession(). In the above code the session variable is spark. To use it under a different name, you need to use the following command:

vishal = spark.getActiveSession()  # here spark is the existing session variable; vishal now refers to the same session
>>> spark
<pyspark.sql.session.SparkSession object at 0x7f7947c17790>
>>> vishal = spark.getActiveSession()
>>> vishal
<pyspark.sql.session.SparkSession object at 0x7f7947c17790>

You can see from the memory address that, after the change in the code above, vishal points to the same Spark session.

  • Dataframe columns
  • Dataframe Row
  • DataFrame Schema
  • Structured transformations – select, expr, selectExpr, cast

How to create an empty DataFrame in Spark

Spark stores the schema in the form of StructType and StructField objects.

StructType([StructField("emp_name", StringType(), True)])
emp_name: this is the column name in the DataFrame
StringType(): the type of the emp_name column is string
True: the emp_name column is nullable.

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Spark Learning")
    .master("local[*]")
    .getOrCreate()
)
  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# Define the schema
schema = StructType([
    StructField("emp_name", StringType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("salary", FloatType(), True),
    StructField("age", IntegerType(), True)
])

# Create an empty DataFrame with the defined schema
empty_df = spark.createDataFrame([], schema)

# Show the empty DataFrame
empty_df.show()


output:

+--------+-------------+------+---+
|emp_name|department_id|salary|age|
+--------+-------------+------+---+
+--------+-------------+------+---+

Create a DataFrame with data

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define the schema
schema = StructType([
    StructField("emp_name", StringType(), True),
    StructField("dept", IntegerType(), True),
    StructField("salary", FloatType(), True),
    StructField("age", IntegerType(), True)
])

data = [
    ("Alice", 1, 55000.0, 29),
    ("Bob", 2, 60000.0, 35),
    ("Charlie", 3, 70000.0, 40),
    ("David", 1, 50000.0, 30),
    ("Eve", 2, 65000.0, 28),
    ("Frank", 3, 75000.0, 45),
    ("Grace", 1, 56000.0, 32),
    ("Hannah", 2, 62000.0, 38),
    ("Isaac", 3, 72000.0, 50),
    ("Jack", 1, 58000.0, 34)
]

# Create the DataFrame using the schema and data
emp = spark.createDataFrame(data, schema)


# Show the DataFrame
emp.show()

output: 
  
+--------+----+-------+---+
|emp_name|dept| salary|age|
+--------+----+-------+---+
|   Alice|   1|55000.0| 29|
|     Bob|   2|60000.0| 35|
| Charlie|   3|70000.0| 40|
|   David|   1|50000.0| 30|
|     Eve|   2|65000.0| 28|
|   Frank|   3|75000.0| 45|
|   Grace|   1|56000.0| 32|
|  Hannah|   2|62000.0| 38|
|   Isaac|   3|72000.0| 50|
|    Jack|   1|58000.0| 34|
+--------+----+-------+---+

How to select columns from a DataFrame in PySpark

The following are the different ways you can reference a column. Here we are assuming the DataFrame name is emp:

  • emp.col_name
  • col("col_name")
  • expr("col_name")
  • emp["col_name"]
# SQL: select emp_name, dept, age, salary from emp; you can write this statement as follows

from pyspark.sql.functions import col, expr
df = emp.select(emp.emp_name, col("dept"), expr("age"), emp["salary"])
df.show()


+--------+----+---+-------+
|emp_name|dept|age| salary|
+--------+----+---+-------+
|   Alice|   1| 29|55000.0|
|     Bob|   2| 35|60000.0|
| Charlie|   3| 40|70000.0|
|   David|   1| 30|50000.0|
|     Eve|   2| 28|65000.0|
|   Frank|   3| 45|75000.0|
|   Grace|   1| 32|56000.0|
|  Hannah|   2| 38|62000.0|
|   Isaac|   3| 50|72000.0|
|    Jack|   1| 34|58000.0|
+--------+----+---+-------+  

expr()

expr() evaluates a SQL expression string and returns the result as a Column object, allowing SQL-like syntax within DataFrame operations.

# using the expr() for select 
# select emp_name as employee_name, dept, cast(salary as int) as salary, age from emp;

emp_casted= emp.select(expr("emp_name as employee_name"), emp.dept, expr("cast(salary as int) as salary"), emp.age) 
emp_casted.show()
  
+-------------+----+------+---+
|employee_name|dept|salary|age|
+-------------+----+------+---+
|        Alice|   1| 55000| 29|
|          Bob|   2| 60000| 35|
|      Charlie|   3| 70000| 40|
|        David|   1| 50000| 30|
|          Eve|   2| 65000| 28|
|        Frank|   3| 75000| 45|
|        Grace|   1| 56000| 32|
|       Hannah|   2| 62000| 38|
|        Isaac|   3| 72000| 50|
|         Jack|   1| 58000| 34|
+-------------+----+------+---+
  

selectExpr() = select + expr()

emp_casted= emp.selectExpr("emp_name as employee_name", "dept", "cast(salary as int) as salary", "age") 
emp_casted.show()


+-------------+----+------+---+
|employee_name|dept|salary|age|
+-------------+----+------+---+
|        Alice|   1| 55000| 29|
|          Bob|   2| 60000| 35|
|      Charlie|   3| 70000| 40|
|        David|   1| 50000| 30|
|          Eve|   2| 65000| 28|
|        Frank|   3| 75000| 45|
|        Grace|   1| 56000| 32|
|       Hannah|   2| 62000| 38|
|        Isaac|   3| 72000| 50|
|         Jack|   1| 58000| 34|
+-------------+----+------+---+

Write the following query in PySpark:

# select emp_name, dept, age, salary from emp where age > 30;
emp_final = emp.select("emp_name", "dept", "age", "salary").where("age > 30")
emp_final.show()

+--------+----+---+-------+
|emp_name|dept|age| salary|
+--------+----+---+-------+
|     Bob|   2| 35|60000.0|
| Charlie|   3| 40|70000.0|
|   Frank|   3| 45|75000.0|
|   Grace|   1| 32|56000.0|
|  Hannah|   2| 38|62000.0|
|   Isaac|   3| 50|72000.0|
|    Jack|   1| 34|58000.0|
+--------+----+---+-------+
# casting columns
# select emp_name, dept, age, cast(salary as double) as salary from emp;

from pyspark.sql.functions import col
emp_casted = emp.select("emp_name", "dept", "age", col("salary").cast("double"))
emp_casted.show(2)

output: 
    +--------+----+---+-------+
    |emp_name|dept|age| salary|
    +--------+----+---+-------+
    |   Alice|   1| 29|55000.0|
    |     Bob|   2| 35|60000.0|
    +--------+----+---+-------+
  
  
emp_casted.printSchema()

output:
    root
     |-- emp_name: string (nullable = true)
     |-- dept: integer (nullable = true)
     |-- age: integer (nullable = true)
     |-- salary: double (nullable = true)

withColumn()

This method adds a new column or overwrites an existing one.
It returns a new DataFrame by adding a column or replacing the existing column that has the same name.

Most important note for withColumn():
This method introduces a projection internally. Therefore, calling it multiple times, for instance via loops in order to add multiple columns, can generate big plans, which can cause performance issues and even a StackOverflowException. To avoid this, use select() with multiple columns at once, as in the sketch below.
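For instance, here is a minimal sketch of that select()-based alternative, using the emp_casted DataFrame from the casting example above to add the tax column in a single projection:

from pyspark.sql.functions import col

emp_with_tax = emp_casted.select(
    "*",                                   # keep all existing columns
    (col("salary") * 0.02).alias("tax")    # add the new column in the same projection
)
emp_with_tax.show(1)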

# adding a column
# the following query calculates the tax on salary; here we assume it is 2 percent
# select emp_name, dept, age, salary, (salary * 0.02) as tax from emp_casted;


emp_taxed = emp_casted.withColumn("tax", col("salary")*0.02)
emp_taxed.show()

+--------+----+---+-------+------+
|emp_name|dept|age| salary|   tax|
+--------+----+---+-------+------+
|   Alice|   1| 29|55000.0|1100.0|
|     Bob|   2| 35|60000.0|1200.0|
| Charlie|   3| 40|70000.0|1400.0|
|   David|   1| 30|50000.0|1000.0|
|     Eve|   2| 28|65000.0|1300.0|
|   Frank|   3| 45|75000.0|1500.0|
|   Grace|   1| 32|56000.0|1120.0|
|  Hannah|   2| 38|62000.0|1240.0|
|   Isaac|   3| 50|72000.0|1440.0|
|    Jack|   1| 34|58000.0|1160.0|
+--------+----+---+-------+------+

PySpark interview question: How to add multiple columns to a DataFrame?

from pyspark.sql.functions import col, lit

columns = {
    "tax": col("salary") * 0.2,
    "oneNumber": lit(1),
    "columnTwo": lit("two")
}

emp_multiple_col_add = emp.withColumns(columns)
emp_multiple_col_add.show(1)
output:
    +--------+----+---+-------+-------+---------+---------+
    |emp_name|dept|age| salary|    tax|oneNumber|columnTwo|
    +--------+----+---+-------+-------+---------+---------+
    |   Alice|   1| 29|55000.0|11000.0|        1|      two|
    +--------+----+---+-------+-------+---------+---------+

How to add a static column in PySpark?

# adding static values in a column, i.e. a new column with literal values
# select emp_name, dept, age, salary, tax, 1 as columnOne, 'two' as columnTwo from emp_taxed;


from pyspark.sql.functions import lit
emp_new_col = emp_taxed.withColumn("columnOne",lit(1)).withColumn("columnTwo", lit("two"))
emp_new_col.show()

output:
+--------+----+---+-------+------+---------+---------+
|emp_name|dept|age| salary|   tax|columnOne|columnTwo|
+--------+----+---+-------+------+---------+---------+
|   Alice|   1| 29|55000.0|1100.0|        1|      two|
|     Bob|   2| 35|60000.0|1200.0|        1|      two|
| Charlie|   3| 40|70000.0|1400.0|        1|      two|
|   David|   1| 30|50000.0|1000.0|        1|      two|
|     Eve|   2| 28|65000.0|1300.0|        1|      two|
|   Frank|   3| 45|75000.0|1500.0|        1|      two|
|   Grace|   1| 32|56000.0|1120.0|        1|      two|
|  Hannah|   2| 38|62000.0|1240.0|        1|      two|
|   Isaac|   3| 50|72000.0|1440.0|        1|      two|
|    Jack|   1| 34|58000.0|1160.0|        1|      two|
+--------+----+---+-------+------+---------+---------+

withColumnRenamed() and withColumnsRenamed()

withColumnRenamed() – when you want to rename a single column
withColumnsRenamed() – when you want to rename multiple columns

emp_taxed.show(1)
output: 
      +--------+----+---+-------+------+
      |emp_name|dept|age| salary|   tax|
      +--------+----+---+-------+------+
      |   Alice|   1| 29|55000.0|1100.0|
      +--------+----+---+-------+------+
      only showing top 1 row
      

emp_rename = emp_taxed.withColumnRenamed("emp_name", "name")
emp_rename.show(1)
output: 
        +-----+----+---+-------+------+
        | name|dept|age| salary|   tax|
        +-----+----+---+-------+------+
        |Alice|   1| 29|55000.0|1100.0|
        +-----+----+---+-------+------+
        only showing top 1 row



emp_rename_multiple_cols = emp_taxed.withColumnsRenamed({"dept": "DEPT", "age": "AGE"})
emp_rename_multiple_cols.show(1)
output: 
    +--------+----+---+-------+------+
    |emp_name|DEPT|AGE| salary|   tax|
    +--------+----+---+-------+------+
    |   Alice|   1| 29|55000.0|1100.0|
    +--------+----+---+-------+------+
    only showing top 1 row

drop(), limit(), show()

drop(): drops columns from the DataFrame.
limit(): returns a new DataFrame with a limited number of rows.
show(): prints a limited number of rows to the console.

Difference between limit() and show()

show(2) will print two rows to the console, but show() is only for viewing the data; when you want to keep a limited number of rows and write them to a file, you need to use the limit() function (see the sketch after the examples below).

# remove columns 
emp_dropped = emp_new_col.drop('columnOne', 'age')
emp_dropped.show(1)
output:
    +--------+----+-------+------+---------+
    |emp_name|dept| salary|   tax|columnTwo|
    +--------+----+-------+------+---------+
    |   Alice|   1|55000.0|1100.0|      two|
    +--------+----+-------+------+---------+

emp_limit = emp.limit(2)
emp_limit.show()
output:
      +--------+----+---+-------+
      |emp_name|dept|age| salary|
      +--------+----+---+-------+
      |   Alice|   1| 29|55000.0|
      |     Bob|   2| 35|60000.0|
      +--------+----+---+-------+
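As a minimal sketch of the point above about writing (the output path here is only a hypothetical example), limit() returns a DataFrame that can be written out, while show() only prints to the console:

emp_two_rows = emp.limit(2)                                         # a DataFrame with only two rows
emp_two_rows.write.mode("overwrite").csv("/tmp/emp_two_rows_csv")   # write those rows to a file (hypothetical path)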
