Data Processing with PySpark

Understanding Execution Plans


Before Spark runs a query, its Catalyst optimizer turns your DataFrame operations into an execution plan: a description of every step Spark will take to produce the result. Reading execution plans helps you spot inefficiencies such as unnecessary shuffles, repeated scans of the same data, or missing filters.

Reading the Plan with explain()

```python
import urllib.request
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

# Download the sample dataset
urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

# Start a local Spark session that uses all available cores
spark = SparkSession.builder \
    .appName("ExecutionPlans") \
    .master("local[*]") \
    .getOrCreate()

flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True)

# Average arrival delay per airline, counting only flights with ARRIVAL_DELAY above 60
query = flights_df \
    .filter(col("ARRIVAL_DELAY") > 60) \
    .groupBy("AIRLINE") \
    .agg(avg("ARRIVAL_DELAY").alias("AVG_DELAY"))

# Print the parsed, analyzed, and optimized logical plans, followed by the physical plan
query.explain(extended=True)
```

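explain(extended=True) prints four plans: the parsed logical plan, the analyzed logical plan, the optimized logical plan, and the physical plan. The physical plan, printed last, describes what Spark will actually execute, so in practice it is the one you mostly care about.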
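Since Spark 3.0, explain() also accepts a mode argument ("simple", "extended", "codegen", "cost", or "formatted"); "formatted" splits the physical plan into a compact outline plus numbered node details, which is often easier to read. The minimal sketch below captures the printed plan as a string so it can be searched or logged. It assumes, as in current PySpark releases, that explain() writes the plan with Python's print(); the helper names plan_text and physical_section are hypothetical, not part of PySpark.

```python
import contextlib
import io

# Plan formats accepted by DataFrame.explain(mode=...) in Spark 3.0+
EXPLAIN_MODES = ("simple", "extended", "codegen", "cost", "formatted")


def plan_text(df, mode="formatted"):
    """Return the text that df.explain(mode=mode) would print.

    Assumption: explain() prints via Python's print(), so redirecting
    stdout captures the whole plan.
    """
    if mode not in EXPLAIN_MODES:
        raise ValueError(f"unknown explain mode: {mode!r}")
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def physical_section(extended_text):
    """Keep only the '== Physical Plan ==' part of extended output."""
    marker = "== Physical Plan =="
    start = extended_text.find(marker)
    # If there is no marker (e.g. "simple" mode), return the text unchanged
    return extended_text[start:] if start != -1 else extended_text
```

For the query above, print(physical_section(plan_text(query, mode="extended"))) would print just the physical plan at the bottom of the extended output.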

What to Look For

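```python
# A join with no broadcast hint: Spark picks the join strategy itself. Because
# airlines_df is tiny, it may still be broadcast automatically (its estimated size
# is below spark.sql.autoBroadcastJoinThreshold, 10 MB by default); a larger table
# would be joined with a shuffle-based SortMergeJoin instead.
from pyspark.sql import Row

# A small reference table mapping IATA codes to airline names
airlines_df = spark.createDataFrame([
    Row(IATA="AA", NAME="American Airlines"),
    Row(IATA="DL", NAME="Delta Air Lines"),
])

joined = flights_df.join(airlines_df, flights_df["AIRLINE"] == airlines_df["IATA"])
joined.explain()
```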
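Spark broadcasts any join side whose estimated size is below spark.sql.autoBroadcastJoinThreshold on its own, so a small demo table can hide the shuffle-based strategy. One way to see what Spark would do for two large tables is to disable automatic broadcasting temporarily by setting that threshold to -1. The helper below is a hypothetical sketch (explain_without_auto_broadcast is not a PySpark function); it restores the previous setting afterwards so the rest of the session is unaffected.

```python
# Config key controlling automatic broadcast joins; "-1" disables them
THRESHOLD_KEY = "spark.sql.autoBroadcastJoinThreshold"


def explain_without_auto_broadcast(spark, left, right, condition):
    """Print the join plan Spark chooses when automatic broadcasting is off.

    The previous threshold is restored afterwards, even if explain() fails.
    """
    previous = spark.conf.get(THRESHOLD_KEY)
    spark.conf.set(THRESHOLD_KEY, "-1")
    try:
        left.join(right, condition).explain()
    finally:
        spark.conf.set(THRESHOLD_KEY, previous)
```

With the DataFrames above, explain_without_auto_broadcast(spark, flights_df, airlines_df, flights_df["AIRLINE"] == airlines_df["IATA"]) would typically show a SortMergeJoin with an Exchange (shuffle) under each side.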

Key terms in the physical plan:

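  • FileScan – reading from a file-based source. Multiple FileScan nodes over the same file mean it is read more than once, a sign that caching might help;
  • Exchange – a shuffle that redistributes rows across partitions (and, on a cluster, across the network). Expensive on large datasets. A BroadcastExchange node is different: it copies a small table to every executor instead of shuffling;
  • BroadcastHashJoin – Spark sends the smaller DataFrame to all executors, so the larger side does not need to be shuffled. Usually faster than a regular join when one side is a small reference table;
  • SortMergeJoin – used when neither side is small enough to broadcast. Both sides are shuffled by the join key and sorted before being merged.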
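To check a plan for the operators listed above without reading it line by line, you can scan its text. The sketch below uses hypothetical helpers (count_plan_nodes and plan_warnings are illustrative names) that take the plan as a string, for example copied from explain() output, and count whole-word occurrences of each operator name. It is deliberately simple: in mode="formatted" output, file scans appear as "Scan csv" rather than "FileScan", and operators not listed here are ignored.

```python
import re
from collections import Counter

# Operator names discussed above. Whole-word matching keeps "Exchange" from
# also counting "BroadcastExchange" or "ReusedExchange".
NODES_OF_INTEREST = ("FileScan", "Exchange", "BroadcastExchange",
                     "BroadcastHashJoin", "SortMergeJoin")


def count_plan_nodes(plan_text):
    """Count whole-word occurrences of each operator name in plan text."""
    counts = Counter()
    for name in NODES_OF_INTEREST:
        counts[name] = len(re.findall(rf"\b{name}\b", plan_text))
    return counts


def plan_warnings(plan_text):
    """Turn node counts into rough hints; these are heuristics, not rules."""
    counts = count_plan_nodes(plan_text)
    notes = []
    if counts["FileScan"] > 1:
        notes.append(f"{counts['FileScan']} FileScan nodes: if they read the same "
                     "file, caching the DataFrame might help")
    if counts["Exchange"] > 0:
        notes.append(f"{counts['Exchange']} Exchange node(s): data is shuffled")
    if counts["SortMergeJoin"] > 0:
        notes.append("SortMergeJoin: if one side is small, a broadcast join "
                     "could avoid shuffling")
    return notes
```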

Forcing a Broadcast Join

```python
from pyspark.sql.functions import broadcast

# Tell Spark to broadcast the small airlines DataFrame to every executor;
# the physical plan should show a BroadcastHashJoin fed by a BroadcastExchange
joined = flights_df.join(broadcast(airlines_df), flights_df["AIRLINE"] == airlines_df["IATA"])
joined.explain()
```
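The broadcast() function is one way to request a broadcast join; DataFrame.hint("broadcast") attaches the same hint by name. The minimal sketch below uses hint() and captures the resulting plan so you can check for BroadcastHashJoin programmatically. The function name join_with_broadcast_hint is hypothetical, and capturing the plan assumes explain() prints via Python's print().

```python
import contextlib
import io


def join_with_broadcast_hint(large_df, small_df, condition):
    """Join large_df with small_df, hinting that small_df should be broadcast.

    Returns the joined DataFrame and the plan text printed by explain().
    """
    # hint("broadcast") requests the same strategy as wrapping small_df in broadcast()
    joined = large_df.join(small_df.hint("broadcast"), condition)
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        joined.explain()  # explain() prints the plan; capture it instead
    return joined, buffer.getvalue()
```

With the DataFrames above, calling join_with_broadcast_hint(flights_df, airlines_df, flights_df["AIRLINE"] == airlines_df["IATA"]) and checking "BroadcastHashJoin" in the returned plan text is a quick way to confirm that the hint was honored.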

What does an Exchange node in the physical plan indicate?


Section 1. Chapter 11
