Introduction to ML Pipelines in PySpark
In the previous chapters you applied transformers one by one: fit, transform, fit, transform. In production workflows this is error-prone and hard to maintain, because the same steps must be repeated in exactly the same order on training data, test data, and new data at inference time. ML Pipelines chain multiple transformers and estimators into a single object that can be fit and applied in one step.
Pipeline Concepts
A Pipeline is a sequence of stages. Each stage is either:
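- a Transformer, which has a transform() method that takes a DataFrame and returns a new DataFrame (e.g. VectorAssembler);
- an Estimator, which has a fit() method that learns from a DataFrame and returns a Transformer, called a model (e.g. StringIndexer, OneHotEncoder, StandardScaler, LogisticRegression). Note that in Spark 3.x OneHotEncoder is an Estimator: fitting it produces a OneHotEncoderModel.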
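To make the Transformer/Estimator distinction concrete without a Spark cluster, here is a minimal pure-Python sketch. It is not PySpark's API: the classes HourExtractor, LabelIndexer and LabelIndexerModel are hypothetical, and lists of dicts stand in for DataFrames. The estimator learns state (the label order) in fit() and hands it to a separate model object, while the transformer needs nothing learned.

```python
# Toy model of the stage contract; all class names are hypothetical,
# and lists of dicts stand in for Spark DataFrames.

class Transformer:
    def transform(self, rows):
        raise NotImplementedError

class Estimator:
    def fit(self, rows):
        raise NotImplementedError  # must return a Transformer (a "model")

class HourExtractor(Transformer):
    """Stateless: derives an hour from an HHMM integer, like floor(x / 100)."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def transform(self, rows):
        # Build new dicts so the input rows are left unchanged
        return [{**r, self.output_col: r[self.input_col] // 100} for r in rows]

class LabelIndexer(Estimator):
    """Learns a label -> index mapping, most frequent label first."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        counts = {}
        for r in rows:
            counts[r[self.input_col]] = counts.get(r[self.input_col], 0) + 1
        # Sort by descending frequency, then alphabetically for ties
        labels = sorted(counts, key=lambda k: (-counts[k], k))
        return LabelIndexerModel(self.input_col, self.output_col, labels)

class LabelIndexerModel(Transformer):
    """The fitted result of LabelIndexer: holds the learned labels."""
    def __init__(self, input_col, output_col, labels):
        self.input_col, self.output_col = input_col, output_col
        self.labels = labels
        self._index = {lab: float(i) for i, lab in enumerate(labels)}

    def transform(self, rows):
        return [{**r, self.output_col: self._index[r[self.input_col]]} for r in rows]

rows = [{"AIRLINE": "WN", "SCHEDULED_DEPARTURE": 1430},
        {"AIRLINE": "AA", "SCHEDULED_DEPARTURE": 905},
        {"AIRLINE": "WN", "SCHEDULED_DEPARTURE": 2110}]

hours = HourExtractor("SCHEDULED_DEPARTURE", "DEPARTURE_HOUR").transform(rows)
model = LabelIndexer("AIRLINE", "AIRLINE_IDX").fit(rows)  # learns state
indexed = model.transform(rows)                           # applies it
print(model.labels)                          # ['WN', 'AA']
print([r["DEPARTURE_HOUR"] for r in hours])  # [14, 9, 21]
print([r["AIRLINE_IDX"] for r in indexed])   # [0.0, 1.0, 0.0]
```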
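When you call pipeline.fit(df), Spark runs the stages in order on the DataFrame. A Transformer stage calls transform(); an Estimator stage calls fit() and then the resulting model's transform(), so each stage receives the output of the previous one. The result is a PipelineModel whose stages are all Transformers; calling its transform() applies them in the same order to any DataFrame that has the required input columns.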
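The fit loop described above can be sketched in plain Python. ToyPipeline, ToyPipelineModel, Doubler and MeanCenterer are hypothetical stand-ins, not Spark classes, and lists of dicts replace DataFrames. One detail goes beyond the text: stages after the last estimator are collected but not run during fit(), since no later fit() needs their output. Spark's own implementation appears to do the same, but treat that as an internal detail rather than part of the public contract.

```python
# Toy re-implementation of the Pipeline.fit loop (hypothetical classes).

class Transformer:
    def transform(self, rows):
        raise NotImplementedError

class Estimator:
    def fit(self, rows):
        raise NotImplementedError

class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages  # every stage here is a (fitted) Transformer

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        # Index of the last Estimator, or -1 if there is none
        last_est = max((i for i, s in enumerate(self.stages)
                        if isinstance(s, Estimator)), default=-1)
        fitted = []
        for i, stage in enumerate(self.stages):
            if i <= last_est:
                model = stage.fit(rows) if isinstance(stage, Estimator) else stage
                fitted.append(model)
                # Only transform if a later estimator still needs this output
                if i < last_est:
                    rows = model.transform(rows)
            else:
                fitted.append(stage)  # trailing transformers are just collected
        return ToyPipelineModel(fitted)

class Doubler(Transformer):
    def transform(self, rows):
        return [{**r, "x2": r["x"] * 2} for r in rows]

class MeanCenterer(Estimator):
    def fit(self, rows):
        mean = sum(r["x2"] for r in rows) / len(rows)
        return MeanCentererModel(mean)

class MeanCentererModel(Transformer):
    def __init__(self, mean):
        self.mean = mean  # state learned during fit()

    def transform(self, rows):
        return [{**r, "x2_centered": r["x2"] - self.mean} for r in rows]

pipeline = ToyPipeline([Doubler(), MeanCenterer()])
model = pipeline.fit([{"x": 1}, {"x": 2}, {"x": 3}])
print([type(s).__name__ for s in model.stages])  # ['Doubler', 'MeanCentererModel']
print(model.stages[1].mean)                      # 4.0
out = model.transform([{"x": 5}])
print(out[0]["x2_centered"])                     # 6.0
```

The learned mean (4.0) comes from the training rows only; new data passed to the fitted model is centered with that stored value, which is exactly why the fitted PipelineModel, not the Pipeline, is what you apply at inference time.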
Building a Pipeline
```python
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Download the sample flights dataset
urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

spark = SparkSession.builder \
    .appName("MLPipelines") \
    .master("local[*]") \
    .getOrCreate()

# Load the data and replace nulls in the numeric columns with 0
flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \
    .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"])

# LABEL: 1 if the flight arrived more than 15 minutes late, else 0
# DEPARTURE_HOUR: hour part of the HHMM-encoded SCHEDULED_DEPARTURE
flights_df = flights_df \
    .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("integer")) \
    .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer"))

# Defining pipeline stages
indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX")      # Estimator
encoder = OneHotEncoder(inputCol="AIRLINE_IDX", outputCol="AIRLINE_VEC")  # Estimator (Spark 3.x)
assembler = VectorAssembler(                                              # Transformer
    inputCols=["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR", "AIRLINE_VEC"],
    outputCol="FEATURES_RAW"
)
scaler = StandardScaler(inputCol="FEATURES_RAW", outputCol="FEATURES",
                        withMean=True, withStd=True)                      # Estimator

# Assembling the pipeline
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])

# Fitting and transforming in one step
pipeline_model = pipeline.fit(flights_df)
result_df = pipeline_model.transform(flights_df)

result_df.select("LABEL", "FEATURES").show(5, truncate=False)
```
Saving and Loading a Pipeline
```python
from pyspark.ml import PipelineModel

# Saving the fitted pipeline model to disk
# (overwrite() avoids an error if the directory already exists)
pipeline_model.write().overwrite().save("flights_pipeline_model")

# Loading it back for inference
loaded_model = PipelineModel.load("flights_pipeline_model")
result_df = loaded_model.transform(flights_df)
```
Section 1. Chapter 10