Notice: This page requires JavaScript to function properly.
Please enable JavaScript in your browser settings or update your browser.
Impara Introduction to ML Pipelines in PySpark | Section
Feature Engineering with PySpark

Introduction to ML Pipelines in PySpark

Scorri per mostrare il menu

In the previous chapters you applied transformers one by one – fit, transform, fit, transform. For production workflows this is error-prone and hard to maintain. ML Pipelines chain multiple transformers and estimators into a single object that can be fit and applied in one step.

Pipeline Concepts

A Pipeline is a sequence of stages. Each stage is either:

  • a Transformer – has only transform(), takes a DataFrame and returns a DataFrame (e.g. VectorAssembler, OneHotEncoder);
  • an Estimator – has fit() which produces a Transformer (e.g. StringIndexer, StandardScaler, LogisticRegression).

When you call pipeline.fit(df), Spark runs each stage in order – fitting estimators and applying transformers – and returns a PipelineModel.

Building a Pipeline

12345678910111213141516171819202122232425262728293031323334353637383940
import urllib.request from pyspark.sql import SparkSession from pyspark.sql.functions import col, floor, when from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler urllib.request.urlretrieve( "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv", "flights.csv" ) spark = SparkSession.builder \ .appName("MLPipelines") \ .master("local[*]") \ .getOrCreate() flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \ .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"]) flights_df = flights_df \ .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("integer")) \ .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer")) # Defining pipeline stages indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX") encoder = OneHotEncoder(inputCol="AIRLINE_IDX", outputCol="AIRLINE_VEC") assembler = VectorAssembler( inputCols=["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR", "AIRLINE_VEC"], outputCol="FEATURES_RAW" ) scaler = StandardScaler(inputCol="FEATURES_RAW", outputCol="FEATURES", withMean=True, withStd=True) # Assembling the pipeline pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler]) # Fitting and transforming in one step pipeline_model = pipeline.fit(flights_df) result_df = pipeline_model.transform(flights_df) result_df.select("LABEL", "FEATURES").show(5, truncate=False)

Saving and Loading a Pipeline

12345678
# Saving the fitted pipeline model to disk pipeline_model.save("flights_pipeline_model") # Loading it back for inference from pyspark.ml import PipelineModel loaded_model = PipelineModel.load("flights_pipeline_model") result_df = loaded_model.transform(flights_df)
question mark

What is the difference between a Transformer and an Estimator in a PySpark Pipeline?

Seleziona la risposta corretta

Tutto è chiaro?

Come possiamo migliorarlo?

Grazie per i tuoi commenti!

Sezione 1. Capitolo 10

Chieda ad AI

expand

Chieda ad AI

ChatGPT

Chieda pure quello che desidera o provi una delle domande suggerite per iniziare la nostra conversazione

Sezione 1. Capitolo 10
some-alt