Automating Model Retraining
As new data becomes available, retraining your ML models ensures predictions remain accurate and relevant — a key step in maintaining model performance over time. With Apache Airflow, you can schedule and orchestrate this retraining automatically, removing manual steps and ensuring full reproducibility.
In this setup, an Airflow DAG defines each retraining step: loading data, preprocessing, training, evaluation, and logging. Airflow handles the order and scheduling, so your model stays up to date with minimal effort.
Example: Automated Retraining DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import logging
def load_data():
iris = load_iris(as_frame=True)
df = iris.frame
df.to_csv('/tmp/iris.csv', index=False)
def train_and_evaluate():
df = pd.read_csv('/tmp/iris.csv')
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
pipeline = Pipeline([
('scaler', StandardScaler()),
('clf', RandomForestClassifier(n_estimators=50, random_state=42))
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
acc = accuracy_score(y_test, y_pred)
logging.info(f"Retrained model accuracy: {acc:.4f}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'retrain_ml_model',
default_args=default_args,
schedule_interval='@daily',
catchup=False,
tags=['ml', 'retraining'],
)
load_data_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
dag=dag,
)
train_and_evaluate_task = PythonOperator(
task_id='train_and_evaluate',
python_callable=train_and_evaluate,
dag=dag,
)
load_data_task >> train_and_evaluate_task
Automating retraining helps mitigate model drift — the gradual decline in performance as data patterns change. With Airflow, retraining pipelines can run daily, weekly, or after data updates, ensuring your deployed model stays accurate and reliable.
Tack för dina kommentarer!
Fråga AI
Fråga AI
Fråga vad du vill eller prova någon av de föreslagna frågorna för att starta vårt samtal
Can you explain how to add more steps to the DAG, like model deployment?
How do I monitor the retraining process in Airflow?
What changes are needed to use my own dataset instead of Iris?
Awesome!
Completion rate improved to 6.25
Automating Model Retraining
Svep för att visa menyn
As new data becomes available, retraining your ML models ensures predictions remain accurate and relevant — a key step in maintaining model performance over time. With Apache Airflow, you can schedule and orchestrate this retraining automatically, removing manual steps and ensuring full reproducibility.
In this setup, an Airflow DAG defines each retraining step: loading data, preprocessing, training, evaluation, and logging. Airflow handles the order and scheduling, so your model stays up to date with minimal effort.
Example: Automated Retraining DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import logging
def load_data():
iris = load_iris(as_frame=True)
df = iris.frame
df.to_csv('/tmp/iris.csv', index=False)
def train_and_evaluate():
df = pd.read_csv('/tmp/iris.csv')
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
pipeline = Pipeline([
('scaler', StandardScaler()),
('clf', RandomForestClassifier(n_estimators=50, random_state=42))
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
acc = accuracy_score(y_test, y_pred)
logging.info(f"Retrained model accuracy: {acc:.4f}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'retrain_ml_model',
default_args=default_args,
schedule_interval='@daily',
catchup=False,
tags=['ml', 'retraining'],
)
load_data_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
dag=dag,
)
train_and_evaluate_task = PythonOperator(
task_id='train_and_evaluate',
python_callable=train_and_evaluate,
dag=dag,
)
load_data_task >> train_and_evaluate_task
Automating retraining helps mitigate model drift — the gradual decline in performance as data patterns change. With Airflow, retraining pipelines can run daily, weekly, or after data updates, ensuring your deployed model stays accurate and reliable.
Tack för dina kommentarer!