Notice: This page requires JavaScript to function properly.
Please enable JavaScript in your browser settings or update your browser.
Impara Automating Model Retraining | Orchestrating ML Pipelines
MLOps for Machine Learning Engineers

bookAutomating Model Retraining

As new data becomes available, retraining your ML models ensures predictions remain accurate and relevant — a key step in maintaining model performance over time. With Apache Airflow, you can schedule and orchestrate this retraining automatically, removing manual steps and ensuring full reproducibility.

In this setup, an Airflow DAG defines each retraining step: loading data, preprocessing, training, evaluation, and logging. Airflow handles the order and scheduling, so your model stays up to date with minimal effort.

Example: Automated Retraining DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import logging

def load_data():
    iris = load_iris(as_frame=True)
    df = iris.frame
    df.to_csv('/tmp/iris.csv', index=False)

def train_and_evaluate():
    df = pd.read_csv('/tmp/iris.csv')
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('clf', RandomForestClassifier(n_estimators=50, random_state=42))
    ])
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    logging.info(f"Retrained model accuracy: {acc:.4f}")

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'retrain_ml_model',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    tags=['ml', 'retraining'],
)

load_data_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

train_and_evaluate_task = PythonOperator(
    task_id='train_and_evaluate',
    python_callable=train_and_evaluate,
    dag=dag,
)

load_data_task >> train_and_evaluate_task
Note
Note

Automating retraining helps mitigate model drift — the gradual decline in performance as data patterns change. With Airflow, retraining pipelines can run daily, weekly, or after data updates, ensuring your deployed model stays accurate and reliable.

question mark

What is the main benefit of automating model retraining with Airflow?

Select the correct answer

Tutto è chiaro?

Come possiamo migliorarlo?

Grazie per i tuoi commenti!

Sezione 4. Capitolo 3

Chieda ad AI

expand

Chieda ad AI

ChatGPT

Chieda pure quello che desidera o provi una delle domande suggerite per iniziare la nostra conversazione

Suggested prompts:

Can you explain how to add more steps to the DAG, like model deployment?

How do I monitor the retraining process in Airflow?

What changes are needed to use my own dataset instead of Iris?

Awesome!

Completion rate improved to 6.25

bookAutomating Model Retraining

Scorri per mostrare il menu

As new data becomes available, retraining your ML models ensures predictions remain accurate and relevant — a key step in maintaining model performance over time. With Apache Airflow, you can schedule and orchestrate this retraining automatically, removing manual steps and ensuring full reproducibility.

In this setup, an Airflow DAG defines each retraining step: loading data, preprocessing, training, evaluation, and logging. Airflow handles the order and scheduling, so your model stays up to date with minimal effort.

Example: Automated Retraining DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import logging

def load_data():
    iris = load_iris(as_frame=True)
    df = iris.frame
    df.to_csv('/tmp/iris.csv', index=False)

def train_and_evaluate():
    df = pd.read_csv('/tmp/iris.csv')
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('clf', RandomForestClassifier(n_estimators=50, random_state=42))
    ])
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    logging.info(f"Retrained model accuracy: {acc:.4f}")

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'retrain_ml_model',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    tags=['ml', 'retraining'],
)

load_data_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

train_and_evaluate_task = PythonOperator(
    task_id='train_and_evaluate',
    python_callable=train_and_evaluate,
    dag=dag,
)

load_data_task >> train_and_evaluate_task
Note
Note

Automating retraining helps mitigate model drift — the gradual decline in performance as data patterns change. With Airflow, retraining pipelines can run daily, weekly, or after data updates, ensuring your deployed model stays accurate and reliable.

question mark

What is the main benefit of automating model retraining with Airflow?

Select the correct answer

Tutto è chiaro?

Come possiamo migliorarlo?

Grazie per i tuoi commenti!

Sezione 4. Capitolo 3
some-alt