Data Processing with PySpark

Window Functions in Spark SQL


Window functions compute a value for each row from a group of related rows, called its window, without collapsing those rows into a single result the way groupBy does. This makes them the standard tool for rankings, running totals, and comparing each row to an aggregate of its group.
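The contrast with groupBy can be sketched in plain Python. This is not Spark code, and the sample data and the helper names group_by_avg and window_avg are hypothetical. The groupBy-style result has one row per airline, while the window-style result keeps every flight and attaches its airline's average to it.

```python
from collections import defaultdict

# Hypothetical sample data: (airline, arrival_delay) pairs
flights = [("AA", 10), ("AA", 30), ("DL", 5), ("DL", 15), ("DL", 70)]


def group_by_avg(rows):
    """groupBy-style aggregation: one output entry per airline."""
    delays = defaultdict(list)
    for airline, delay in rows:
        delays[airline].append(delay)
    return {airline: sum(ds) / len(ds) for airline, ds in delays.items()}


def window_avg(rows):
    """Window-style aggregation: every input row kept, with its group's average attached."""
    averages = group_by_avg(rows)
    return [(airline, delay, averages[airline]) for airline, delay in rows]


grouped = group_by_avg(flights)   # 2 entries: one per airline
windowed = window_avg(flights)    # 5 rows: one per flight
print(grouped)
print(windowed)
```

In Spark, the second shape is what avg(...).over(Window.partitionBy("AIRLINE")) produces in the airline-average example later in this section.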

Setting Up a Window

import urllib.request

from pyspark.sql import SparkSession
# Note: this round() is pyspark.sql.functions.round and shadows Python's built-in round
from pyspark.sql.functions import col, avg, rank, dense_rank, row_number, round
from pyspark.sql.window import Window

# Downloading the sample flights dataset
urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

# Starting a local Spark session that uses all available cores
spark = SparkSession.builder \
    .appName("WindowFunctions") \
    .master("local[*]") \
    .getOrCreate()

# Reading the CSV; missing ARRIVAL_DELAY values are replaced with 0
flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \
    .fillna(0, subset=["ARRIVAL_DELAY"])

Ranking Flights within Each Airline

# Defining a window: partition by airline, order by arrival delay descending
window_spec = Window.partitionBy("AIRLINE").orderBy(col("ARRIVAL_DELAY").desc())

# Adding rank, dense rank, and row number columns computed over the same window
ranked_df = flights_df.withColumn("RANK", rank().over(window_spec)) \
    .withColumn("DENSE_RANK", dense_rank().over(window_spec)) \
    .withColumn("ROW_NUM", row_number().over(window_spec))

ranked_df.select("AIRLINE", "FLIGHT_NUMBER", "ARRIVAL_DELAY", "RANK", "DENSE_RANK", "ROW_NUM").show(10)

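rank() gives tied rows the same rank and then skips ahead, leaving a gap: if two flights tie for first, both get 1 and the next flight gets 3. row_number() always assigns a unique sequential number (1, 2, 3, ...), regardless of ties. dense_rank(), also imported above, gives ties the same rank without leaving a gap, so the next flight would get 2. Because Spark does not guarantee the order of tied rows, row_number() can number them differently between runs; add a tie-breaking column to orderBy if you need reproducible numbering.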
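The tie-handling rules can be checked on a small example. The sketch below is plain Python rather than Spark, and rank_variants is a hypothetical helper: it sorts the values of a single partition in descending order, as window_spec does, and computes all three rankings side by side.

```python
def rank_variants(values):
    """Return (value, rank, dense_rank, row_number) tuples for one partition,
    ordered by value descending. Plain-Python illustration of the semantics."""
    ordered = sorted(values, reverse=True)  # like orderBy(col(...).desc())
    result = []
    current_rank = 0
    current_dense = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(ordered, start=1):
        if value != previous:
            current_rank = row_number  # rank jumps to the row position, leaving gaps after ties
            current_dense += 1         # dense_rank advances by exactly one, no gaps
            previous = value
        result.append((value, current_rank, current_dense, row_number))
    return result


for row in rank_variants([50, 30, 50, 10]):
    print(row)
```

For the input above, the two tied 50s share rank 1 and dense rank 1; the 30 then gets rank 3 but dense rank 2, while row_number simply counts 1 to 4.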

Comparing Each Flight to the Airline Average

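# Computing average delay per airline without collapsing rows.
# No orderBy here, so each row's window is the whole AIRLINE partition.
# Adding orderBy would switch the default frame to "unbounded preceding to
# current row", turning this into a running average.
window_airline = Window.partitionBy("AIRLINE")

flights_df = flights_df.withColumn(
    "AVG_AIRLINE_DELAY", round(avg("ARRIVAL_DELAY").over(window_airline), 2)
).withColumn(
    # Difference between this flight's delay and its airline's average
    "DELAY_VS_AVG", round(col("ARRIVAL_DELAY") - col("AVG_AIRLINE_DELAY"), 2)
)

flights_df.select("AIRLINE", "FLIGHT_NUMBER", "ARRIVAL_DELAY", "AVG_AIRLINE_DELAY", "DELAY_VS_AVG").show(5)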

Run this, then try filtering for DELAY_VS_AVG > 60 to find flights that arrived much later than their airline's average (by more than 60 in the units of ARRIVAL_DELAY, typically minutes).
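In PySpark the exercise comes down to one filter on the new column, for example flights_df.filter(col("DELAY_VS_AVG") > 60). To check the arithmetic by hand, the plain-Python sketch below mirrors the whole pipeline (partition average, difference, filter) on a few hypothetical rows; it is an illustration of the logic, not Spark code.

```python
from collections import defaultdict

# Hypothetical (airline, flight_number, arrival_delay) rows
flights = [
    ("AA", 101, 0), ("AA", 102, 10), ("AA", 103, 110),
    ("DL", 201, 20), ("DL", 202, 40),
]

# Step 1: average delay per airline, like avg(...).over(Window.partitionBy("AIRLINE"))
by_airline = defaultdict(list)
for airline, _, delay in flights:
    by_airline[airline].append(delay)
avg_delay = {a: round(sum(d) / len(d), 2) for a, d in by_airline.items()}

# Step 2: attach the average and the difference to every row (no rows collapse)
enriched = [
    (airline, number, delay, avg_delay[airline], round(delay - avg_delay[airline], 2))
    for airline, number, delay in flights
]

# Step 3: keep only flights more than 60 above their airline's average
worse_than_avg = [row for row in enriched if row[4] > 60]
print(worse_than_avg)
```

Only flight 103 passes: AA's average is 40.0, so its delay of 110 is 70.0 above it, while every other flight sits at or below 60 over its own airline's average.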


What is the difference between rank() and row_number()?


Section 1. Chapter 8
