Introduction to ML Pipelines in PySpark
Glissez pour afficher le menu
In the previous chapters you applied transformers one by one – fit, transform, fit, transform. For production workflows this is error-prone and hard to maintain. ML Pipelines chain multiple transformers and estimators into a single object that can be fit and applied in one step.
Pipeline Concepts
A Pipeline is a sequence of stages. Each stage is either:
- a Transformer – has only
transform(), takes a DataFrame and returns a DataFrame (e.g.VectorAssembler,OneHotEncoder); - an Estimator – has
fit()which produces a Transformer (e.g.StringIndexer,StandardScaler,LogisticRegression).
When you call pipeline.fit(df), Spark runs each stage in order – fitting estimators and applying transformers – and returns a PipelineModel.
Building a Pipeline
12345678910111213141516171819202122232425262728293031323334353637383940import urllib.request from pyspark.sql import SparkSession from pyspark.sql.functions import col, floor, when from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler urllib.request.urlretrieve( "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv", "flights.csv" ) spark = SparkSession.builder \ .appName("MLPipelines") \ .master("local[*]") \ .getOrCreate() flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \ .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"]) flights_df = flights_df \ .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("integer")) \ .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer")) # Defining pipeline stages indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX") encoder = OneHotEncoder(inputCol="AIRLINE_IDX", outputCol="AIRLINE_VEC") assembler = VectorAssembler( inputCols=["DEPARTURE_DELAY", "DISTANCE", "DEPARTURE_HOUR", "AIRLINE_VEC"], outputCol="FEATURES_RAW" ) scaler = StandardScaler(inputCol="FEATURES_RAW", outputCol="FEATURES", withMean=True, withStd=True) # Assembling the pipeline pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler]) # Fitting and transforming in one step pipeline_model = pipeline.fit(flights_df) result_df = pipeline_model.transform(flights_df) result_df.select("LABEL", "FEATURES").show(5, truncate=False)
Saving and Loading a Pipeline
12345678# Saving the fitted pipeline model to disk pipeline_model.save("flights_pipeline_model") # Loading it back for inference from pyspark.ml import PipelineModel loaded_model = PipelineModel.load("flights_pipeline_model") result_df = loaded_model.transform(flights_df)
Tout était clair ?
Merci pour vos commentaires !
Section 1. Chapitre 10
Demandez à l'IA
Demandez à l'IA
Posez n'importe quelle question ou essayez l'une des questions suggérées pour commencer notre discussion
Section 1. Chapitre 10