Supervised Learning in PySpark: Overview
Supervised learning trains a model on labeled examples – each row has inputs (the FEATURES) and a known output (the LABEL). The model learns a mapping from inputs to outputs, with the goal of generalizing to new, unseen data.
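PySpark MLlib provides a consistent API for all supervised algorithms. An algorithm such as LogisticRegression is an Estimator: calling its fit() method on a training DataFrame returns a trained Model. That Model is a Transformer: its transform() method appends prediction columns to a DataFrame.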
Two Types of Supervised Tasks
- Classification – the label is a category. Predicting whether a flight will be delayed (0 or 1) is a binary classification problem.
- Regression – the label is a continuous number. Predicting the exact number of minutes a flight will be delayed is a regression problem.
The flights dataset supports both: use ARRIVAL_DELAY > 15 as a binary label for classification, or use ARRIVAL_DELAY directly as a continuous target for regression.
Preparing the Data
```python
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor

# Download the flights dataset to the working directory
urllib.request.urlretrieve(
    "https://staging-content-media-cdn.codefinity.com/courses/aa80ac56-0d50-49e8-9231-2c2374cd3e9d/flights.csv",
    "flights.csv"
)

spark = SparkSession.builder \
    .appName("SupervisedOverview") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV and replace nulls in the numeric columns used below with 0
flights_df = spark.read.csv("flights.csv", header=True, inferSchema=True) \
    .fillna(0, subset=["DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "SCHEDULED_TIME"])

# LABEL: 1.0 if the flight arrived more than 15 minutes late, else 0.0
# DEPARTURE_HOUR: hour of day, assuming SCHEDULED_DEPARTURE is stored as HHMM (e.g. 1430 -> 14)
flights_df = flights_df \
    .withColumn("LABEL", (col("ARRIVAL_DELAY") > 15).cast("double")) \
    .withColumn("DEPARTURE_HOUR", floor(col("SCHEDULED_DEPARTURE") / 100).cast("integer"))

# Splitting into train and test sets
train_df, test_df = flights_df.randomSplit([0.8, 0.2], seed=42)

print(f"Train rows: {train_df.count()}, Test rows: {test_df.count()}")
```
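randomSplit([0.8, 0.2], seed=42) sends roughly 80% of rows to training and 20% to testing. Each row is assigned at random, so the proportions are approximate rather than exact, and the fixed seed reproduces the same split given the same input data and partitioning. Always split before fitting any transformer (such as StringIndexer) to avoid data leakage: anything learned during fit() must come from the training set only.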
The Standard MLlib Workflow
Every MLlib algorithm follows the same pattern – define, fit, transform, evaluate. Here is a minimal end-to-end example using logistic regression as a preview of the next chapter:
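```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Stage 1: encode AIRLINE strings as numeric indices.
# handleInvalid="keep" maps airlines unseen during fit (e.g. present only in test_df)
# to an extra index instead of raising an error.
indexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_IDX", handleInvalid="keep")

# Stage 2: combine the numeric columns into a single feature vector
assembler = VectorAssembler(
    inputCols=["DEPARTURE_DELAY", "DISTANCE", "SCHEDULED_TIME", "DEPARTURE_HOUR", "AIRLINE_IDX"],
    outputCol="FEATURES"
)

# Stage 3: the classifier (an Estimator)
lr = LogisticRegression(featuresCol="FEATURES", labelCol="LABEL", maxIter=10)

# fit() runs every stage on the training data and returns a PipelineModel;
# transform() appends rawPrediction, probability and prediction columns
pipeline = Pipeline(stages=[indexer, assembler, lr])
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

evaluator = BinaryClassificationEvaluator(labelCol="LABEL", metricName="areaUnderROC")
print(f"AUC-ROC: {evaluator.evaluate(predictions):.4f}")
```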