I am developing a predictive maintenance project using machine learning. I have completed the initial code in a Jupyter notebook; later, I will convert it into a Flask app.
The code is working fine for now, but I am not completely sure whether it adheres to best practices. Could you kindly review it?
Here's my code from the Jupyter notebook:
# =============================================================================
# Predictive Maintenance of Industrial Machines
#
# This script performs an end-to-end machine learning project aimed at
# predicting potential failures in industrial machines based on sensor data.
# It covers data loading, cleaning, exploratory data analysis (EDA),
# feature engineering, preprocessing, model training (evaluating multiple
# classifiers), imbalance handling (SMOTE), model evaluation, and saving
# the best-performing model for deployment.
#
# The primary goal is to identify different types of machine failures
# (e.g., tool wear, heat dissipation) to enable proactive maintenance,
# thereby reducing downtime and operational costs.
# =============================================================================
# 1. Import Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import warnings
import configparser
import joblib
# Ignore warnings
warnings.filterwarnings("ignore")
# Scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
classification_report,
confusion_matrix,
ConfusionMatrixDisplay,
roc_auc_score
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
# Imbalanced-learn
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
print("Libraries imported successfully.")
# 2. Configuration Loading
print("\n--- Loading Configuration from config.ini ---")
config = configparser.ConfigParser()
config.read('config.ini')
# Paths
DATA_FILE_PATH = config.get('Paths', 'DATA_FILE_PATH')
MODEL_PATH = config.get('Paths', 'MODEL_PATH')
PREPROCESSOR_PATH = config.get('Paths', 'PREPROCESSOR_PATH')
# Column Names
OPERATIONAL_HOURS_COLUMN_NAME = config.get('ColumnNames', 'OPERATIONAL_HOURS_COLUMN_NAME')
FAILURE_TYPE_COLUMN_NAME = config.get('ColumnNames', 'FAILURE_TYPE_COLUMN_NAME')
# Helper function to parse comma-separated strings from config into lists
def parse_list_from_config(config_string):
return [item.strip() for item in config_string.split(',') if item.strip()]
# Raw Features
NUMERICAL_FEATURES_RAW = parse_list_from_config(config.get('Features', 'NUMERICAL_FEATURES_RAW_STR'))
CATEGORICAL_FEATURES_RAW = parse_list_from_config(config.get('Features', 'CATEGORICAL_FEATURES_RAW_STR'))
# Engineered Features
FEATURE_TEMP_DIFF = config.get('Features', 'FEATURE_TEMP_DIFF')
FEATURE_MECH_POWER = config.get('Features', 'FEATURE_MECH_POWER')
# Processed Target Column Name
TARGET_COLUMN_PROCESSED = config.get('Features', 'TARGET_COLUMN_PROCESSED')
print("Configuration loaded successfully.")
# 3. Load and Initial Clean
print(f"\n--- Loading data from {DATA_FILE_PATH} ---")
df = pd.read_csv(DATA_FILE_PATH)
print("Original DataFrame head:")
print(df.head())
print("\nDataFrame Info (before any processing):")
df.info()
print("\nMissing values (before any processing):")
print(df.isnull().sum())
print("\nDuplicate rows:", df.duplicated().sum())
# Basic data cleaning and type conversion
if FAILURE_TYPE_COLUMN_NAME not in df.columns:
raise KeyError(f"Target column '{FAILURE_TYPE_COLUMN_NAME}' not found in DataFrame.")
if OPERATIONAL_HOURS_COLUMN_NAME not in df.columns:
raise KeyError(f"Feature column '{OPERATIONAL_HOURS_COLUMN_NAME}' not found in DataFrame.")
df[TARGET_COLUMN_PROCESSED] = df[FAILURE_TYPE_COLUMN_NAME]
df[OPERATIONAL_HOURS_COLUMN_NAME] = pd.to_numeric(df[OPERATIONAL_HOURS_COLUMN_NAME], errors='coerce')
if df[OPERATIONAL_HOURS_COLUMN_NAME].isnull().any():
num_nans = df[OPERATIONAL_HOURS_COLUMN_NAME].isnull().sum()
print(f"Warning: {num_nans} values in '{OPERATIONAL_HOURS_COLUMN_NAME}' became NaN after conversion.")
columns_to_drop = ['UDI', 'Product ID']
if FAILURE_TYPE_COLUMN_NAME != TARGET_COLUMN_PROCESSED:
columns_to_drop.append(FAILURE_TYPE_COLUMN_NAME)
columns_to_drop_existing = [col for col in columns_to_drop if col in df.columns]
if columns_to_drop_existing:
df.drop(columns=columns_to_drop_existing, inplace=True)
print(f"\nDropped columns: {columns_to_drop_existing}")
print("\nDataFrame head after initial cleaning and drops:")
print(df.head())
# 4. Feature Engineering
print("\n--- Performing Feature Engineering ---")
temp_cols_exist = 'Process_temperature_K' in df.columns and 'Air_temperature_K' in df.columns
power_cols_exist = 'Torque_Nm' in df.columns and 'Rotational_speed_rpm' in df.columns
if temp_cols_exist:
df[FEATURE_TEMP_DIFF] = df['Process_temperature_K'] - df['Air_temperature_K']
print(f"Created feature: '{FEATURE_TEMP_DIFF}'")
else:
print(f"Warning: Columns for '{FEATURE_TEMP_DIFF}' calculation not found. Skipping feature.")
if power_cols_exist:
df[FEATURE_MECH_POWER] = np.round((df['Torque_Nm'] * df['Rotational_speed_rpm'] * 2 * np.pi) / 60, 4)
print(f"Created feature: '{FEATURE_MECH_POWER}'")
else:
print(f"Warning: Columns for '{FEATURE_MECH_POWER}' calculation not found. Skipping feature.")
print("\nDataFrame head after feature engineering:")
print(df.head())
# 5. Exploratory Data Analysis (EDA)
print("\n--- Starting Exploratory Data Analysis (EDA) ---")
if 'Type' in df.columns:
plt.figure(figsize=(6, 4))
sns.countplot(x='Type', data=df, palette='viridis')
plt.title('Distribution of Product Types')
plt.xlabel('Product Type')
plt.ylabel('Count')
plt.show()
if TARGET_COLUMN_PROCESSED in df.columns:
plt.figure(figsize=(10, 6))
sns.countplot(y=TARGET_COLUMN_PROCESSED, data=df, order=df[TARGET_COLUMN_PROCESSED].value_counts().index, palette='magma')
plt.title('Distribution of Failure Categories')
plt.xlabel('Count')
plt.ylabel('Failure Category')
plt.tight_layout()
plt.show()
print(f"\nTarget variable '{TARGET_COLUMN_PROCESSED}' distribution:\n", df[TARGET_COLUMN_PROCESSED].value_counts(normalize=True) * 100)
numerical_features_for_eda = NUMERICAL_FEATURES_RAW[:]
if FEATURE_TEMP_DIFF in df.columns:
numerical_features_for_eda.append(FEATURE_TEMP_DIFF)
if FEATURE_MECH_POWER in df.columns:
numerical_features_for_eda.append(FEATURE_MECH_POWER)
numerical_features_for_eda = [col for col in numerical_features_for_eda if col in df.columns and pd.api.types.is_numeric_dtype(df[col])]
print(f"\nPlotting distributions for numerical features: {numerical_features_for_eda}")
for col in numerical_features_for_eda:
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
sns.histplot(data=df, x=col, kde=True, ax=axes[0], color='skyblue', element="step")
axes[0].set_title(f"{col} Distribution")
sns.boxplot(data=df, x=col, ax=axes[1], color='lightcoral')
axes[1].set_title(f"{col} - Outlier Check")
plt.tight_layout()
plt.show()
# REFACTORED for better readability as per feedback
numeric_df_for_corr = df.select_dtypes(include=np.number)
if numeric_df_for_corr.empty:
print("No numerical features found for correlation heatmap.")
else:
plt.figure(figsize=(12, 8))
corr_matrix = numeric_df_for_corr.corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=0.5)
plt.title('Correlation Matrix of Numerical Features')
plt.show()
# 6. Data Preprocessing for Modeling
print("\n--- Preprocessing Data for Modeling ---")
ALL_NUMERICAL_FEATURES = NUMERICAL_FEATURES_RAW[:]
if FEATURE_TEMP_DIFF in df.columns: ALL_NUMERICAL_FEATURES.append(FEATURE_TEMP_DIFF)
if FEATURE_MECH_POWER in df.columns: ALL_NUMERICAL_FEATURES.append(FEATURE_MECH_POWER)
ALL_NUMERICAL_FEATURES = [col for col in ALL_NUMERICAL_FEATURES if col in df.columns]
ALL_CATEGORICAL_FEATURES = [col for col in CATEGORICAL_FEATURES_RAW if col in df.columns]
print(f"Selected Numerical Features for Model: {ALL_NUMERICAL_FEATURES}")
print(f"Selected Categorical Features for Model: {ALL_CATEGORICAL_FEATURES}")
X = df[ALL_NUMERICAL_FEATURES + ALL_CATEGORICAL_FEATURES]
y = df[TARGET_COLUMN_PROCESSED]
if y.isnull().any():
nan_target_indices = y[y.isnull()].index
X = X.drop(index=nan_target_indices).reset_index(drop=True)
y = y.drop(index=nan_target_indices).reset_index(drop=True)
print(f"Dropped {len(nan_target_indices)} rows due to NaN in target.")
print("\nShape of X:", X.shape)
print("Shape of y:", y.shape)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
# Define preprocessing pipelines
numerical_transformer = Pipeline(steps=[('scaler', StandardScaler())])
categorical_transformer = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore'))])
# Create the column transformer
preprocessor = ColumnTransformer(
transformers=[
('num', numerical_transformer, ALL_NUMERICAL_FEATURES),
('cat', categorical_transformer, ALL_CATEGORICAL_FEATURES)
],
remainder='drop'
)
print("\nPreprocessor created successfully.")
# 7. Model Training & Evaluation
print("\n--- Training and Evaluating Models ---")
models_to_evaluate = {
'Logistic Regression': LogisticRegression(solver='liblinear', multi_class='ovr', random_state=42),
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(random_state=42, class_weight='balanced'),
'Gradient Boosting': GradientBoostingClassifier(random_state=42),
}
results_summary = {}
best_model_name = None
best_model_pipeline = None
best_macro_f1 = 0.0
y_train_counts = Counter(y_train)
min_class_count = min(y_train_counts.values()) if y_train_counts else 0
smote_k_neighbors = max(1, min_class_count - 1) if min_class_count > 1 else 1
use_smote = min_class_count > smote_k_neighbors
if use_smote:
print(f"SMOTE will be used with k_neighbors={smote_k_neighbors}.")
else:
print("SMOTE will be skipped due to insufficient minority class samples.")
for name, model in models_to_evaluate.items():
print(f"\n--- Training {name} ---")
pipeline_steps = [('preprocessor', preprocessor)]
# Apply SMOTE only if needed and model doesn't handle imbalance internally
if use_smote and 'class_weight' not in model.get_params():
pipeline_steps.append(('smote', SMOTE(random_state=42, k_neighbors=smote_k_neighbors)))
print(f"Applying SMOTE for {name}.")
pipeline_steps.append(('classifier', model))
full_pipeline = ImbPipeline(steps=pipeline_steps)
full_pipeline.fit(X_train, y_train)
y_pred_test = full_pipeline.predict(X_test)
report = classification_report(y_test, y_pred_test, zero_division=0, output_dict=True)
macro_f1_score = report['macro avg']['f1-score']
print(f"{name} Test Macro F1-Score: {macro_f1_score:.4f}")
results_summary[name] = {'macro_f1': macro_f1_score, 'pipeline': full_pipeline}
if macro_f1_score > best_macro_f1:
best_macro_f1 = macro_f1_score
best_model_name = name
best_model_pipeline = full_pipeline
if best_model_name:
print(f"\nBest performing model (based on Macro F1): {best_model_name} with F1: {best_macro_f1:.4f}")
else:
print("\nNo model was successfully selected as best.")
# Fallback to a default if needed
best_model_name = "Random Forest (Fallback)"
best_model_pipeline = ImbPipeline(steps=[
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(random_state=42, class_weight='balanced'))
])
best_model_pipeline.fit(X_train, y_train)
print("A fallback Random Forest model has been trained.")
final_model_pipeline = best_model_pipeline
# 8. Detailed Evaluation of the Chosen Model
if final_model_pipeline:
print(f"\n--- Detailed Evaluation for: {best_model_name} ---")
y_pred_final = final_model_pipeline.predict(X_test)
model_classes = final_model_pipeline.classes_
print("\nClassification Report:")
print(classification_report(y_test, y_pred_final, labels=model_classes, zero_division=0))
cm = confusion_matrix(y_test, y_pred_final, labels=model_classes)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=model_classes)
fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(cmap='Blues', ax=ax, xticks_rotation='vertical')
plt.title(f"{best_model_name} - Confusion Matrix")
plt.tight_layout()
plt.show()
# Feature Importance for tree-based models
final_classifier = final_model_pipeline.named_steps['classifier']
if hasattr(final_classifier, 'feature_importances_'):
preprocessor_step = final_model_pipeline.named_steps['preprocessor']
transformed_feature_names = preprocessor_step.get_feature_names_out()
importances = final_classifier.feature_importances_
feature_imp_df = pd.DataFrame({
'Feature': transformed_feature_names,
'Importance': importances
}).sort_values(by='Importance', ascending=False)
plt.figure(figsize=(10, 8))
sns.barplot(x='Importance', y='Feature', data=feature_imp_df.head(20), palette='viridis')
plt.title(f'Top 20 Feature Importances - {best_model_name}')
plt.tight_layout()
plt.show()
# 9. Save Model
if final_model_pipeline:
joblib.dump(final_model_pipeline, MODEL_PATH)
print(f"\n--- Model Saved ---\nFull pipeline saved to: {MODEL_PATH}")
else:
print("\nNo model was available to save.")
# 10. REFACTORED: Example of Loading and Using the Saved Pipeline
print("\n--- Example: Load and Predict with Saved Full Pipeline ---")
if not final_model_pipeline:
print("Skipping loading example as no final model pipeline was available/trained.")
else:
# --- Step 1: Try to load the file ---
# This try block is now minimal, focusing only on the file loading operation.
try:
loaded_full_pipeline = joblib.load(MODEL_PATH)
except FileNotFoundError:
print(f"Error: Model file not found at {MODEL_PATH}. Cannot run prediction example.")
except Exception as e:
# Catch other potential loading errors (e.g., pickle error, permissions)
print(f"An unexpected error occurred while loading the model: {e}")
else:
# --- Step 2: Proceed with prediction ONLY if loading was successful ---
print(f"Full pipeline loaded successfully from {MODEL_PATH}")
if X_test.empty:
print("X_test is empty, cannot create sample data for prediction example.")
else:
sample_raw_data_df = X_test.head(3).copy()
print("\nSample raw data for prediction (first 3 from X_test, as DataFrame):")
print(sample_raw_data_df)
# --- Step 3: A separate try block for the prediction logic ---
# This isolates prediction errors (e.g., bad input data) from loading errors.
try:
predictions = loaded_full_pipeline.predict(sample_raw_data_df)
# Check for predict_proba availability safely
if hasattr(loaded_full_pipeline, "predict_proba"):
probabilities = loaded_full_pipeline.predict_proba(sample_raw_data_df)
else:
probabilities = None
print("predict_proba not available for this loaded pipeline/model.")
print("\nPredictions for sample data:")
for i, pred in enumerate(predictions):
print(f"Sample {i+1}: Predicted Failure Category = {pred}")
if probabilities is not None:
# Safely get the class index
class_list = list(loaded_full_pipeline.classes_)
if pred in class_list:
pred_class_idx = class_list.index(pred)
print(f" Confidence (Prob for {pred}): {probabilities[i, pred_class_idx]:.4f}")
else:
print(f" Could not find class '{pred}' in model's known classes.")
except Exception as e:
# Catches errors during .predict() or .predict_proba()
print(f"\nAn error occurred during the prediction step: {e}")
print("This might be due to a mismatch between the sample data and what the model expects.")
print("\n--- Notebook execution finished ---")
The code is based on feedback that I received from my previous question on the same project. Here is the link:
1 Answer
Documentation
The PEP 8 style guide recommends adding a docstring at the top of the code to summarize its purpose. You could convert the header comments to a docstring:
"""
=============================================================================
Predictive Maintenance of Industrial Machines
This script performs an end-to-end machine learning project aimed at
predicting potential failures in industrial machines based on sensor data.
It covers data loading, cleaning, exploratory data analysis (EDA),
feature engineering, preprocessing, model training (evaluating multiple
classifiers), imbalance handling (SMOTE), model evaluation, and saving
the best-performing model for deployment.
The primary goal is to identify different types of machine failures
(e.g., tool wear, heat dissipation) to enable proactive maintenance,
thereby reducing downtime and operational costs.
=============================================================================
"""
Logging
There are many print statements in the code (more than 60). These appear to be informational in nature, as opposed to being used to write to an output file. They look like they are intended to update the user on progress and to notify or warn the user of unexpected conditions. With so many potential messages, it is possible that some of the more important ones will be easily missed by the user.
Consider using the logging module to gain better control of these messages. It can categorize them as info, warnings, and errors.
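As a minimal sketch (the format string and level shown here are just one possible choice), the progress prints map naturally onto logger calls:
import logging

# Show INFO and above on the console, prefixed with the severity level.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

# Routine progress updates become info messages:
logger.info("Libraries imported successfully.")

# Unexpected conditions become warnings, so they stand out:
num_nans = 3  # illustrative value; in your code this comes from the NaN count
logger.warning("%d values became NaN after conversion.", num_nans)
Raising the threshold to logging.WARNING later would silence the routine progress messages without touching the code that emits them.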
Comments
It is great that you incorporated advice from previous reviews. However, there is no need to permanently add comments to the code which merely state that you did so. For example, this comment can be deleted:
# REFACTORED for better readability as per feedback
This comment can also be deleted since it just restates what the code below it does:
# 10. REFACTORED: Example of Loading and Using the Saved Pipeline
print("\n--- Example: Load and Predict with Saved Full Pipeline ---")
Here is another that can be deleted:
# This try block is now minimal, focusing only on the file loading operation.
It is perfectly understandable if you find these comments helpful while developing the code. You could add them to some versions of the file in your GitHub repo, but they don't belong in the final version.
Tools
You could run code development tools to automatically find some style issues with your code.
For example, ruff identifies unused imports like:
F401 [*] `sklearn.metrics.roc_auc_score` imported but unused
|
| confusion_matrix,
| ConfusionMatrixDisplay,
| roc_auc_score
| ^^^^^^^^^^^^^ F401
| )
| from sklearn.linear_model import LogisticRegression
|
= help: Remove unused import: `sklearn.metrics.roc_auc_score`
Some of them are classified as "fixable", and you can instruct ruff to remove them automatically using the --fix option.
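For example, assuming you first export the notebook to a script (the filename here is just a placeholder):
ruff check --fix predictive_maintenance.py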
ruff also identifies lines like this:
if FEATURE_TEMP_DIFF in df.columns: ALL_NUMERICAL_FEATURES.append(FEATURE_TEMP_DIFF)
which would be better split into 2 lines:
if FEATURE_TEMP_DIFF in df.columns:
ALL_NUMERICAL_FEATURES.append(FEATURE_TEMP_DIFF)
The same goes for:
if FEATURE_MECH_POWER in df.columns: ALL_NUMERICAL_FEATURES.append(FEATURE_MECH_POWER)
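which would similarly become:
if FEATURE_MECH_POWER in df.columns:
    ALL_NUMERICAL_FEATURES.append(FEATURE_MECH_POWER)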