Feature Engineering with PySpark

Introduction to ML Pipelines in PySpark


In the previous chapters you applied transformers one by one – fit, transform, fit, transform. For production workflows this is error-prone and hard to maintain. ML Pipelines chain multiple transformers and estimators into a single object that can be fit and applied in one step.

Pipeline Concepts

A Pipeline is a sequence of stages. Each stage is either:

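  • a Transformer: has transform(), which takes a DataFrame and returns a new DataFrame, usually with one or more columns appended (e.g. VectorAssembler, SQLTransformer);
  • an Estimator: has fit(), which learns from a DataFrame and returns a Model that is itself a Transformer (e.g. StringIndexer, StandardScaler, LogisticRegression, and OneHotEncoder in Spark 3.0 and later).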

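When you call pipeline.fit(df), Spark runs the stages in order. Each estimator is fit on the data as transformed by the stages before it, and each transformer is applied as-is. The result is a PipelineModel, which is itself a Transformer in which every estimator has been replaced by its fitted model.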
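The fit-then-transform flow described above can be sketched in plain Python. This is a simplified model of the idea, not Spark's implementation: the classes `AddLength`, `Indexer`, `IndexerModel` and the helpers `fit_pipeline` and `transform_pipeline` are hypothetical stand-ins, and a list of dicts plays the role of a DataFrame.

```python
# Toy stand-ins for the Transformer / Estimator contract.
# These names are hypothetical and are NOT part of pyspark.ml.

class AddLength:
    """Transformer: only transform(), nothing to learn."""
    def transform(self, rows):
        return [{**r, "name_len": len(r["name"])} for r in rows]


class IndexerModel:
    """Fitted result of Indexer: a Transformer holding learned state."""
    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        return [{**r, "name_idx": self.mapping[r["name"]]} for r in rows]


class Indexer:
    """Estimator: fit() learns a mapping and returns a Transformer."""
    def fit(self, rows):
        counts = {}
        for r in rows:
            counts[r["name"]] = counts.get(r["name"], 0) + 1
        # In this toy, the most frequent value gets index 0,
        # with ties broken alphabetically.
        ordered = sorted(counts, key=lambda k: (-counts[k], k))
        return IndexerModel({name: i for i, name in enumerate(ordered)})


def fit_pipeline(stages, rows):
    """Fit stages in order and return the list of fitted Transformers."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            stage = stage.fit(rows)   # Estimator -> Transformer
        rows = stage.transform(rows)  # output feeds the next stage
        fitted.append(stage)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, as a fitted pipeline would."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows


train = [{"name": "AA"}, {"name": "DL"}, {"name": "AA"}]
fitted = fit_pipeline([Indexer(), AddLength()], train)
result = transform_pipeline(fitted, [{"name": "DL"}])
print(result)  # [{'name': 'DL', 'name_idx': 1, 'name_len': 2}]
```

The point to notice is that the fitted list contains no estimators: everything learned from the training rows is frozen inside `IndexerModel`, so new rows are encoded with the same mapping.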

Building a Pipeline

```python
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

spark = SparkSession.builder \
    .appName("MLPipelines") \
    .master("local[*]") \
    .getOrCreate()

flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \
    .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"])

flights_df = flights_df \
    .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("integer")) \
    .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer"))

# Defining pipeline stages
indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX")
encoder = OneHotEncoder(inputCol="AIRLINE_IDX", outputCol="AIRLINE_VEC")
assembler = VectorAssembler(
    inputCols=["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR", "AIRLINE_VEC"],
    outputCol="FEATURES_RAW"
)
scaler = StandardScaler(inputCol="FEATURES_RAW", outputCol="FEATURES", withMean=True, withStd=True)

# Assembling the pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])

# Fitting and transforming in one step
pipeline_model = pipeline.fit(flights_df)
result_df = pipeline_model.transform(flights_df)

result_df.select("LABEL", "FEATURES").show(5, truncate=False)
```
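The two derived columns, LABEL and DEPARTURE_HOUR, can be checked by hand. The sketch below mirrors their logic in plain Python. It assumes SCHEDULED_DEPARTURE stores the time as an HHMM integer (for example 1435 for 14:35), which the division by 100 suggests but the lesson does not state. The helper names `departure_hour` and `label` are hypothetical.

```python
import math


def departure_hour(scheduled_departure):
    # Mirrors floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer"),
    # assuming the value is an HHMM integer.
    return math.floor(scheduled_departure / 100)


def label(arrival_delay):
    # Mirrors (col("ARRIVAL_DELAY") > 15).cast("integer"):
    # 1 when the flight arrives more than 15 minutes late, else 0.
    return int(arrival_delay > 15)


# (SCHEDULED_DEPARTURE, ARRIVAL_DELAY) pairs invented for illustration
examples = [(5, 10), (1435, 16), (2359, 15)]
derived = [(departure_hour(t), label(d)) for t, d in examples]
print(derived)  # [(0, 0), (14, 1), (23, 0)]
```

A delay of exactly 15 minutes gets label 0 because the comparison is strict. Rows whose ARRIVAL_DELAY was missing also get label 0, since fillna(0) runs before the label is computed.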

Saving and Loading a Pipeline

```python
from pyspark.ml import PipelineModel

# Saving the fitted pipeline model to disk;
# overwrite() replaces an existing directory so the code can be re-run
pipeline_model.write().overwrite().save("flights_pipeline_model")

# Loading it back for inference
loaded_model = PipelineModel.load("flights_pipeline_model")
result_df = loaded_model.transform(flights_df)
```

What is the difference between a Transformer and an Estimator in a PySpark Pipeline?

Section 1. Chapter 10
