Feature Engineering with PySpark

Introduction to ML Pipelines in PySpark


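In the previous chapters you applied feature transformations one by one: fit, transform, fit, transform. In production workflows this manual chaining is error-prone and hard to maintain, because every step must be repeated in the same order, with the same fitted parameters, on both training data and new data. An ML Pipeline chains multiple transformers and estimators into a single object that can be fit and applied in one step.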

Pipeline Concepts

A Pipeline is a sequence of stages. Each stage is either:

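  • a Transformer: has transform() but no fit(); it takes a DataFrame and returns a new DataFrame (e.g. VectorAssembler, SQLTransformer);
  • an Estimator: has fit(), which learns from a DataFrame and returns a Transformer, usually called a model (e.g. StringIndexer, OneHotEncoder, StandardScaler, LogisticRegression). Note that since Spark 3.0, OneHotEncoder is an Estimator: fitting it determines the number of categories.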
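To make the two roles concrete, here is a toy, pure-Python sketch of the contract. It is not Spark's implementation: a "DataFrame" is modeled as a list of dicts, and `ToyStringIndexer`, `ToyStringIndexerModel` and `ToyVectorAssembler` are hypothetical names. The indexer gives the most frequent label index 0, like StringIndexer's default ordering; breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


class ToyVectorAssembler:
    """Transformer: only has transform(); it learns nothing from the data."""

    def __init__(self, input_cols, output_col):
        self.input_cols = input_cols
        self.output_col = output_col

    def transform(self, rows):
        # Return new rows (inputs are not mutated) with the listed columns packed into a list.
        return [{**row, self.output_col: [float(row[c]) for c in self.input_cols]} for row in rows]


class ToyStringIndexerModel:
    """The Transformer produced by fitting ToyStringIndexer."""

    def __init__(self, mapping, input_col, output_col):
        self.mapping = mapping  # learned state: label -> index
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        return [{**row, self.output_col: float(self.mapping[row[self.input_col]])} for row in rows]


class ToyStringIndexer:
    """Estimator: fit() scans the data and returns a Transformer."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        counts = Counter(row[self.input_col] for row in rows)
        # Most frequent label first; ties broken alphabetically in this sketch.
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        mapping = {label: i for i, label in enumerate(ordered)}
        return ToyStringIndexerModel(mapping, self.input_col, self.output_col)


rows = [
    {"AIRLINE": "WN", "DISTANCE": 300},
    {"AIRLINE": "AA", "DISTANCE": 1200},
    {"AIRLINE": "WN", "DISTANCE": 500},
]
indexer_model = ToyStringIndexer("AIRLINE", "AIRLINE_IDX").fit(rows)  # estimator -> transformer
indexed = indexer_model.transform(rows)
features = ToyVectorAssembler(["DISTANCE", "AIRLINE_IDX"], "FEATURES").transform(indexed)
print(features[1]["FEATURES"])  # [1200.0, 1.0]
```

The asymmetry is the point: the assembler can be applied immediately, while the indexer must first see data to learn its mapping, and only the fitted model can transform.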

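When you call pipeline.fit(df), Spark runs the stages in order. A Transformer stage transforms the DataFrame; an Estimator stage is fit on the DataFrame produced by all preceding stages, and the resulting model then transforms the data that is passed on to the next stage. The result is a PipelineModel: a Transformer whose stages are all transformers (the original transformers plus the fitted models), so a single transform() call applies the whole chain.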
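The fitting loop can be sketched in plain Python. This is a simplified model of the behavior described above, loosely following the structure of PySpark's `Pipeline.fit`, not its actual source; `ToyPipeline`, `ToyPipelineModel`, `Center`, `CenterModel` and `AddConstant` are hypothetical names, and the "DataFrame" is just a list of numbers.

```python
class Transformer:
    def transform(self, data):
        raise NotImplementedError


class Estimator:
    def fit(self, data):
        raise NotImplementedError


class AddConstant(Transformer):
    """Stateless transformer: adds a fixed value to every element."""

    def __init__(self, c):
        self.c = c

    def transform(self, data):
        return [x + self.c for x in data]


class CenterModel(Transformer):
    def __init__(self, mean):
        self.mean = mean  # learned during fit()

    def transform(self, data):
        return [x - self.mean for x in data]


class Center(Estimator):
    """Estimator: learns the mean and returns a CenterModel."""

    def fit(self, data):
        return CenterModel(sum(data) / len(data))


class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages  # fitted pipeline: transformers only

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        last_est = max((i for i, s in enumerate(self.stages) if isinstance(s, Estimator)), default=-1)
        fitted = []
        for i, stage in enumerate(self.stages):
            if i > last_est:
                fitted.append(stage)  # after the last estimator there is nothing left to learn
                continue
            model = stage.fit(data) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < last_est:
                data = model.transform(data)  # later estimators see the transformed data
        return ToyPipelineModel(fitted)


pipeline = ToyPipeline([AddConstant(10), Center(), AddConstant(1)])
model = pipeline.fit([1.0, 2.0, 3.0])
print(model.stages[1].mean)  # 12.0
print(model.transform([1.0, 2.0, 3.0]))  # [0.0, 1.0, 2.0]
```

`Center` learns the mean of the already-shifted data (12.0), not of the raw input (2.0), which is exactly why stage order matters. Note also that the pipeline itself is an Estimator and the fitted model a Transformer.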

Building a Pipeline

```python
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

spark = SparkSession.builder \
    .appName("MLPipelines") \
    .master("local[*]") \
    .getOrCreate()

flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \
    .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"])

# Label: 1 if the flight arrived more than 15 minutes late; hour extracted from HHMM
flights_df = flights_df \
    .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("integer")) \
    .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer"))

# Defining pipeline stages
indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX")
encoder = OneHotEncoder(inputCol="AIRLINE_IDX", outputCol="AIRLINE_VEC")
assembler = VectorAssembler(
    inputCols=["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR", "AIRLINE_VEC"],
    outputCol="FEATURES_RAW"
)
scaler = StandardScaler(inputCol="FEATURES_RAW", outputCol="FEATURES", withMean=True, withStd=True)

# Assembling the pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])

# Fitting every stage with a single call, then applying the fitted chain
pipeline_model = pipeline.fit(flights_df)
result_df = pipeline_model.transform(flights_df)

result_df.select("LABEL", "FEATURES").show(5, truncate=False)
```
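In this pipeline, `AIRLINE_VEC` does not get one slot per airline. Spark's `OneHotEncoder` defaults to `dropLast=True`, so with k airlines the vector has k - 1 slots and the category with the highest index is encoded as all zeros. The pure-Python sketch below illustrates that layout and the resulting width of `FEATURES_RAW`; the helper `one_hot` and the count of 4 airlines are hypothetical, and it uses dense lists where Spark produces sparse vectors.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector for a category index, mimicking dropLast semantics."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:  # with drop_last, the last index has no slot and stays all-zero
        vec[int(index)] = 1.0
    return vec


num_airlines = 4  # hypothetical: pretend StringIndexer found 4 airlines
for idx in range(num_airlines):
    print(idx, one_hot(idx, num_airlines))

# Each numeric input contributes one slot; the one-hot block contributes k - 1.
numeric_inputs = ["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR"]
feature_width = len(numeric_inputs) + len(one_hot(0, num_airlines))
print(feature_width)  # 6
```

Dropping one category avoids a perfectly collinear set of indicator columns; `StandardScaler` then standardizes every slot, including the one-hot ones.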

Saving and Loading a Pipeline

```python
from pyspark.ml import PipelineModel

# Saving the fitted pipeline model to disk
# (write().overwrite() replaces an existing directory; plain save() fails if the path exists)
pipeline_model.write().overwrite().save("flights_pipeline_model")

# Loading it back for inference
loaded_model = PipelineModel.load("flights_pipeline_model")
result_df = loaded_model.transform(flights_df)
```
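Saving the fitted `PipelineModel`, rather than refitting the `Pipeline` on new data, is what keeps inference consistent: the learned state (string-to-index mappings, means, standard deviations) is frozen at training time. The pure-Python sketch below illustrates this with a hypothetical frequency-based label index persisted as JSON. It is only an analogy: Spark's `save()` writes its own directory-based format, not this file.

```python
import json
import os
import tempfile
from collections import Counter


def fit_label_index(labels):
    """Learn a label -> index mapping, most frequent label first (ties alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


train = ["WN", "WN", "AA", "DL"]
new_batch = ["DL", "DL", "AA", "WN"]

mapping = fit_label_index(train)

# Persist the fitted state once, at training time.
path = os.path.join(tempfile.mkdtemp(), "label_index.json")
with open(path, "w") as f:
    json.dump(mapping, f)

# Later, at inference time: load the frozen state instead of refitting.
with open(path) as f:
    loaded = json.load(f)

print([loaded[x] for x in new_batch])  # [2, 2, 1, 0] (same encoding as training)
print([fit_label_index(new_batch)[x] for x in new_batch])  # [0, 0, 1, 2] (refit: indices shift)
```

Refitting on the new batch silently reassigns indices, so a downstream classifier would receive features that mean something different from what it was trained on.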

What is the difference between a Transformer and an Estimator in a PySpark Pipeline?


Section 1. Chapter 10
