Feature Engineering with PySpark

Introduction to ML Pipelines in PySpark

In the previous chapters you applied transformers one by one – fit, transform, fit, transform. For production workflows this is error-prone and hard to maintain. ML Pipelines chain multiple transformers and estimators into a single object that can be fit and applied in one step.

Pipeline Concepts

A Pipeline is a sequence of stages. Each stage is either:

  • a Transformer – has only transform(), takes a DataFrame and returns a DataFrame (e.g. VectorAssembler, Binarizer);
  • an Estimator – has fit(), which learns from a DataFrame and produces a Transformer (e.g. StringIndexer, StandardScaler, LogisticRegression, and, in Spark 3.0 and later, OneHotEncoder).

When you call pipeline.fit(df), Spark runs each stage in order – fitting estimators and applying transformers – and returns a PipelineModel.
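The fitting order described above can be illustrated without Spark at all. The following is a deliberately simplified, pure-Python sketch: the classes AddConstant and MeanCenterer and the helpers fit_pipeline and apply_pipeline are hypothetical stand-ins invented for this illustration, and the "DataFrame" is just a list of dicts. It is not how Spark implements Pipeline internally; it only mirrors the contract that estimators are fitted on the data produced by the earlier stages.

```python
class AddConstant:
    """Toy Transformer: only has transform()."""

    def __init__(self, column, amount):
        self.column = column
        self.amount = amount

    def transform(self, rows):
        # Return new rows; the input rows are left untouched.
        return [{**row, self.column: row[self.column] + self.amount} for row in rows]


class MeanCenterer:
    """Toy Estimator: fit() learns the column mean and returns a Transformer."""

    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        mean = sum(row[self.column] for row in rows) / len(rows)
        # The fitted "model" is itself a Transformer that subtracts the mean.
        return AddConstant(self.column, -mean)


def fit_pipeline(stages, rows):
    """Fit estimators in order, feeding each stage the output of the previous ones."""
    fitted_stages = []
    current = rows
    for stage in stages:
        model = stage.fit(current) if hasattr(stage, "fit") else stage
        fitted_stages.append(model)
        current = model.transform(current)
    return fitted_stages


def apply_pipeline(fitted_stages, rows):
    """Apply already-fitted stages in order; nothing is re-learned here."""
    for model in fitted_stages:
        rows = model.transform(rows)
    return rows


rows = [{"x": 1.0}, {"x": 3.0}]
fitted = fit_pipeline([AddConstant("x", 1.0), MeanCenterer("x")], rows)

print(apply_pipeline(fitted, rows))           # training rows, centered around 0
print(apply_pipeline(fitted, [{"x": 10.0}]))  # new row, using the mean learned above
```

The second call is the important one: the mean is learned once during fitting and then reused on unseen rows, which is the same role the PipelineModel plays in Spark.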

Building a Pipeline

```python
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

spark = SparkSession.builder \
    .appName("MLPipelines") \
    .master("local[*]") \
    .getOrCreate()

flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \
    .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"])

flights_df = flights_df \
    .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("integer")) \
    .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer"))

# Defining pipeline stages
indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX")
encoder = OneHotEncoder(inputCol="AIRLINE_IDX", outputCol="AIRLINE_VEC")
assembler = VectorAssembler(
    inputCols=["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR", "AIRLINE_VEC"],
    outputCol="FEATURES_RAW"
)
scaler = StandardScaler(inputCol="FEATURES_RAW", outputCol="FEATURES", withMean=True, withStd=True)

# Assembling the pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])

# Fitting and transforming in one step
pipeline_model = pipeline.fit(flights_df)
result_df = pipeline_model.transform(flights_df)
result_df.select("LABEL", "FEATURES").show(5, truncate=False)
```

Saving and Loading a Pipeline

```python
# Saving the fitted pipeline model to disk
pipeline_model.save("flights_pipeline_model")

# Loading it back for inference
from pyspark.ml import PipelineModel

loaded_model = PipelineModel.load("flights_pipeline_model")
result_df = loaded_model.transform(flights_df)
```
What is the difference between a Transformer and an Estimator in a PySpark Pipeline?
