I have built a predictive maintenance machine learning project in a Jupyter Notebook, with plans to convert it into a Flask application later on. The code is functional and passes Ruff checks, but I am not sure whether I have done everything the right way and whether my code follows best practices. Would you please review my code?
"""
=============================================================================
Predictive Maintenance of Industrial Machines
This script performs an end-to-end machine learning project aimed at
predicting potential failures in industrial machines based on sensor data.
It covers data loading, cleaning, exploratory data analysis (EDA),
feature engineering, preprocessing, model training (evaluating multiple
classifiers), imbalance handling (SMOTE), model evaluation, and saving
the best-performing model for deployment.
The primary goal is to identify different types of machine failures
(e.g., tool wear, heat dissipation) to enable proactive maintenance,
thereby reducing downtime and operational costs.
=============================================================================
"""
# 1. Import Libraries
import configparser
import io
import logging
import warnings
from collections import Counter

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

# Scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay,
    # roc_auc_score  # Removed per Ruff feedback (unused); add back if you implement ROC AUC.
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier

# Imbalanced-learn
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline

# Suppress library warnings only after all imports, so module-level imports stay at the
# top of the file (Ruff E402). A blanket ignore can hide real problems; consider
# narrowing it to specific warning categories.
warnings.filterwarnings("ignore")
# --- Configure Logging ---
# Basic configuration: logs to console, INFO level and above.
# You can customize this further (e.g., log to a file, different format).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.info("Libraries imported and logging configured successfully.")
# 2. Configuration Loading
logger.info("--- Loading Configuration from config.ini ---")
config = configparser.ConfigParser()
# Add error handling for file not found
try:
if not config.read('config.ini'):
logger.error("Configuration file 'config.ini' not found or empty.")
# Decide how to handle this: raise error, exit, or use defaults
raise FileNotFoundError("config.ini not found or is empty.")
except configparser.Error as e:
logger.error(f"Error parsing config.ini: {e}")
raise
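# For reference, this script expects a config.ini shaped like the following. The section
# and key names are taken from the config.get() calls below; the values are illustrative
# only and must be adjusted to your setup:
#
#   [Paths]
#   DATA_FILE_PATH = data/predictive_maintenance.csv
#   MODEL_PATH = models/best_model_pipeline.joblib
#   PREPROCESSOR_PATH = models/preprocessor.joblib
#
#   [ColumnNames]
#   OPERATIONAL_HOURS_COLUMN_NAME = Operational_Hours
#   FAILURE_TYPE_COLUMN_NAME = Failure_Type
#
#   [Features]
#   NUMERICAL_FEATURES_RAW_STR = Air_temperature_K, Process_temperature_K, Rotational_speed_rpm, Torque_Nm
#   CATEGORICAL_FEATURES_RAW_STR = Type
#   FEATURE_TEMP_DIFF = Temperature_difference_K
#   FEATURE_MECH_POWER = Mechanical_Power_W
#   TARGET_COLUMN_PROCESSED = Failure_Category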
# Paths
DATA_FILE_PATH = config.get('Paths', 'DATA_FILE_PATH')
MODEL_PATH = config.get('Paths', 'MODEL_PATH')
PREPROCESSOR_PATH = config.get('Paths', 'PREPROCESSOR_PATH') # Still here, though less critical
# Column Names
OPERATIONAL_HOURS_COLUMN_NAME = config.get('ColumnNames', 'OPERATIONAL_HOURS_COLUMN_NAME')
FAILURE_TYPE_COLUMN_NAME = config.get('ColumnNames', 'FAILURE_TYPE_COLUMN_NAME')
# Helper function to parse comma-separated strings from config into lists
def parse_list_from_config(config_string):
    """Split a comma-separated config value into stripped, non-empty items.

    Example: "Torque_Nm, Rotational_speed_rpm," -> ['Torque_Nm', 'Rotational_speed_rpm']
    """
    return [item.strip() for item in config_string.split(',') if item.strip()]
# Raw Features
NUMERICAL_FEATURES_RAW = parse_list_from_config(config.get('Features', 'NUMERICAL_FEATURES_RAW_STR'))
CATEGORICAL_FEATURES_RAW = parse_list_from_config(config.get('Features', 'CATEGORICAL_FEATURES_RAW_STR'))
# Engineered Features
FEATURE_TEMP_DIFF = config.get('Features', 'FEATURE_TEMP_DIFF')
FEATURE_MECH_POWER = config.get('Features', 'FEATURE_MECH_POWER')
# Processed Target Column Name
TARGET_COLUMN_PROCESSED = config.get('Features', 'TARGET_COLUMN_PROCESSED')
logger.info("Configuration loaded successfully.")
# 3. Load and Initial Clean
logger.info(f"--- Loading data from {DATA_FILE_PATH} ---")
try:
df = pd.read_csv(DATA_FILE_PATH)
except FileNotFoundError:
logger.error(f"Data file not found at {DATA_FILE_PATH}. Please check the path in config.ini.")
raise
except Exception as e:
logger.error(f"Error loading data from {DATA_FILE_PATH}: {e}")
raise
logger.info("Original DataFrame head:\n%s", df.head())
logger.info("\nDataFrame Info (before any processing):")
# To log df.info() output, capture it or log components
# For simplicity, we can just note that info was checked. User can uncomment print for detail.
# df.info() # Or use: logger.info(df.info(verbose=True, buf=io.StringIO()).getvalue())
logger.info("DataFrame info checked (see console output if un-commented).")
logger.info("\nMissing values (before any processing):\n%s", df.isnull().sum())
logger.info("\nDuplicate rows: %s", df.duplicated().sum())
# Basic data cleaning and type conversion
if FAILURE_TYPE_COLUMN_NAME not in df.columns:
logger.error(f"Target column '{FAILURE_TYPE_COLUMN_NAME}' not found in DataFrame.")
raise KeyError(f"Target column '{FAILURE_TYPE_COLUMN_NAME}' not found in DataFrame.")
if OPERATIONAL_HOURS_COLUMN_NAME not in df.columns:
logger.error(f"Feature column '{OPERATIONAL_HOURS_COLUMN_NAME}' not found in DataFrame.")
raise KeyError(f"Feature column '{OPERATIONAL_HOURS_COLUMN_NAME}' not found in DataFrame.")
df[TARGET_COLUMN_PROCESSED] = df[FAILURE_TYPE_COLUMN_NAME]
df[OPERATIONAL_HOURS_COLUMN_NAME] = pd.to_numeric(df[OPERATIONAL_HOURS_COLUMN_NAME], errors='coerce')
if df[OPERATIONAL_HOURS_COLUMN_NAME].isnull().any():
num_nans = df[OPERATIONAL_HOURS_COLUMN_NAME].isnull().sum()
logger.warning(f"{num_nans} values in '{OPERATIONAL_HOURS_COLUMN_NAME}' became NaN after conversion. Consider imputation if necessary.")
columns_to_drop = ['UDI', 'Product ID'] # Standard columns to drop
# If the original failure type column name is different from the processed one, add it for dropping
if FAILURE_TYPE_COLUMN_NAME != TARGET_COLUMN_PROCESSED and FAILURE_TYPE_COLUMN_NAME in df.columns:
columns_to_drop.append(FAILURE_TYPE_COLUMN_NAME)
columns_to_drop_existing = [col for col in columns_to_drop if col in df.columns]
if columns_to_drop_existing:
df.drop(columns=columns_to_drop_existing, inplace=True)
logger.info(f"Dropped columns: {columns_to_drop_existing}")
logger.info("\nDataFrame head after initial cleaning and drops:\n%s", df.head())
# 4. Feature Engineering
logger.info("--- Performing Feature Engineering ---")
temp_cols_exist = 'Process_temperature_K' in df.columns and 'Air_temperature_K' in df.columns
power_cols_exist = 'Torque_Nm' in df.columns and 'Rotational_speed_rpm' in df.columns
if temp_cols_exist:
df[FEATURE_TEMP_DIFF] = df['Process_temperature_K'] - df['Air_temperature_K']
logger.info(f"Created feature: '{FEATURE_TEMP_DIFF}'")
else:
logger.warning(f"Columns for '{FEATURE_TEMP_DIFF}' calculation not found. Skipping feature.")
if power_cols_exist:
df[FEATURE_MECH_POWER] = np.round((df['Torque_Nm'] * df['Rotational_speed_rpm'] * 2 * np.pi) / 60, 4)
logger.info(f"Created feature: '{FEATURE_MECH_POWER}'")
else:
logger.warning(f"Columns for '{FEATURE_MECH_POWER}' calculation not found. Skipping feature.")
logger.info("\nDataFrame head after feature engineering:\n%s", df.head())
# 5. Exploratory Data Analysis (EDA)
# EDA often involves visual output, so `print` or direct display is common here.
# Logging can be used for summaries or if issues occur.
logger.info("--- Starting Exploratory Data Analysis (EDA) ---")
if 'Type' in df.columns:
plt.figure(figsize=(6, 4))
    # hue= mirrors x= because seaborn >= 0.13 deprecates palette= without hue=.
    sns.countplot(x='Type', data=df, hue='Type', palette='viridis', legend=False)
plt.title('Distribution of Product Types')
plt.xlabel('Product Type')
plt.ylabel('Count')
plt.show() # In a script, this might save the figure instead: plt.savefig('type_distribution.png')
if TARGET_COLUMN_PROCESSED in df.columns:
plt.figure(figsize=(10, 6))
    # hue= mirrors y= because seaborn >= 0.13 deprecates palette= without hue=.
    sns.countplot(y=TARGET_COLUMN_PROCESSED, data=df, order=df[TARGET_COLUMN_PROCESSED].value_counts().index,
                  hue=TARGET_COLUMN_PROCESSED, palette='magma', legend=False)
plt.title('Distribution of Failure Categories')
plt.xlabel('Count')
plt.ylabel('Failure Category')
plt.tight_layout()
plt.show() # plt.savefig('failure_categories.png')
logger.info("\nTarget variable '%s' distribution:\n%s", TARGET_COLUMN_PROCESSED, df[TARGET_COLUMN_PROCESSED].value_counts(normalize=True) * 100)
numerical_features_for_eda = NUMERICAL_FEATURES_RAW[:] # Create a copy
if FEATURE_TEMP_DIFF in df.columns:
numerical_features_for_eda.append(FEATURE_TEMP_DIFF)
if FEATURE_MECH_POWER in df.columns:
numerical_features_for_eda.append(FEATURE_MECH_POWER)
numerical_features_for_eda = [col for col in numerical_features_for_eda if col in df.columns and pd.api.types.is_numeric_dtype(df[col])]
logger.info(f"Plotting distributions for numerical features: {numerical_features_for_eda}")
for col in numerical_features_for_eda:
if df[col].isnull().any(): # Check for NaNs before plotting
logger.warning(f"Feature '{col}' contains NaNs. Distribution plot might be affected or error out. Consider imputation.")
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
sns.histplot(data=df, x=col, kde=True, ax=axes[0], color='skyblue', element="step")
axes[0].set_title(f"{col} Distribution")
sns.boxplot(data=df, x=col, ax=axes[1], color='lightcoral')
axes[1].set_title(f"{col} - Outlier Check")
plt.tight_layout()
plt.show() # plt.savefig(f'{col}_distribution_boxplot.png')
numeric_df_for_corr = df.select_dtypes(include=np.number)
if numeric_df_for_corr.empty:
logger.info("No numerical features found for correlation heatmap.")
else:
plt.figure(figsize=(12, 8))
corr_matrix = numeric_df_for_corr.corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=0.5)
plt.title('Correlation Matrix of Numerical Features')
plt.show() # plt.savefig('correlation_matrix.png')
# 6. Data Preprocessing for Modeling
logger.info("--- Preprocessing Data for Modeling ---")
ALL_NUMERICAL_FEATURES = NUMERICAL_FEATURES_RAW[:] # Create a copy
if FEATURE_TEMP_DIFF in df.columns:
ALL_NUMERICAL_FEATURES.append(FEATURE_TEMP_DIFF)
if FEATURE_MECH_POWER in df.columns:
ALL_NUMERICAL_FEATURES.append(FEATURE_MECH_POWER)
ALL_NUMERICAL_FEATURES = [col for col in ALL_NUMERICAL_FEATURES if col in df.columns]
ALL_CATEGORICAL_FEATURES = [col for col in CATEGORICAL_FEATURES_RAW if col in df.columns]
logger.info(f"Selected Numerical Features for Model: {ALL_NUMERICAL_FEATURES}")
logger.info(f"Selected Categorical Features for Model: {ALL_CATEGORICAL_FEATURES}")
# Ensure features are present and handle NaNs if any before creating X
for col_list, list_name in [(ALL_NUMERICAL_FEATURES, "Numerical"), (ALL_CATEGORICAL_FEATURES, "Categorical")]:
for col in col_list:
if df[col].isnull().any():
logger.warning(f"{list_name} feature '{col}' contains NaNs. This might cause issues in a pipeline without imputation. Consider handling NaNs explicitly.")
X = df[ALL_NUMERICAL_FEATURES + ALL_CATEGORICAL_FEATURES]
y = df[TARGET_COLUMN_PROCESSED]
if y.isnull().any():
nan_target_indices = y[y.isnull()].index
X = X.drop(index=nan_target_indices).reset_index(drop=True)
y = y.drop(index=nan_target_indices).reset_index(drop=True)
logger.info(f"Dropped {len(nan_target_indices)} rows due to NaN in target.")
logger.info(f"Shape of X: {X.shape}, Shape of y: {y.shape}")
if X.empty or y.empty:
logger.error("Feature matrix X or target vector y is empty. Cannot proceed with training.")
raise ValueError("X or y is empty after preprocessing.")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
# Define preprocessing pipelines
# Consider adding SimpleImputer if NaNs are expected and not handled before this stage
numerical_transformer = Pipeline(steps=[('scaler', StandardScaler())])
categorical_transformer = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))]) # sparse_output=False for dense array
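# Optional hardening (a sketch): if NaNs can still reach this stage, prepend imputers so
# fitting does not fail. SimpleImputer is from sklearn.impute; the median and
# most_frequent strategies are my choices here, not requirements.
from sklearn.impute import SimpleImputer
if X_train.isnull().any().any():
    logger.info("NaNs detected in X_train; adding imputation steps to the transformers.")
    numerical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ])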
# Create the column transformer
# Ensure feature lists are not empty before creating ColumnTransformer
active_numerical_features = [col for col in ALL_NUMERICAL_FEATURES if col in X_train.columns]
active_categorical_features = [col for col in ALL_CATEGORICAL_FEATURES if col in X_train.columns]
transformers_list = []
if active_numerical_features:
transformers_list.append(('num', numerical_transformer, active_numerical_features))
else:
logger.warning("No active numerical features found for the preprocessor.")
if active_categorical_features:
transformers_list.append(('cat', categorical_transformer, active_categorical_features))
else:
logger.warning("No active categorical features found for the preprocessor.")
if not transformers_list:
logger.error("No transformers could be added to ColumnTransformer (no active numerical or categorical features).")
raise ValueError("Preprocessor cannot be created without active features.")
preprocessor = ColumnTransformer(
transformers=transformers_list,
remainder='drop' # or 'passthrough' if you have other columns you want to keep
)
logger.info("Preprocessor created successfully.")
# 7. Model Training & Evaluation
logger.info("--- Training and Evaluating Models ---")
models_to_evaluate = {
    # 'multi_class' is deprecated in recent scikit-learn; liblinear is one-vs-rest anyway.
    'Logistic Regression': LogisticRegression(solver='liblinear', random_state=42, max_iter=1000),
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(random_state=42, class_weight='balanced'),
'Gradient Boosting': GradientBoostingClassifier(random_state=42),
}
results_summary = {}
best_model_name = None
best_model_pipeline = None
best_macro_f1 = 0.0
y_train_counts = Counter(y_train)
if not y_train_counts: # Handle empty y_train case
logger.error("y_train is empty. Cannot proceed with model training.")
raise ValueError("y_train is empty.")
min_class_count = min(y_train_counts.values())
# SMOTE requires k_neighbors < (smallest class size), so cap k at min_class_count - 1,
# and also at SMOTE's default of 5 so a large minority class doesn't get an oversized
# neighbourhood.
smote_k_neighbors = min(5, max(1, min_class_count - 1))
# SMOTE is only applicable if the smallest class has at least 2 samples.
use_smote = min_class_count > 1
if use_smote:
logger.info(f"SMOTE will be used with k_neighbors={smote_k_neighbors}.")
else:
logger.info(f"SMOTE will be skipped (min_class_count={min_class_count}, k_neighbors={smote_k_neighbors}). Relying on class_weight or model's robustness to imbalance.")
for name, model in models_to_evaluate.items():
logger.info(f"--- Training {name} ---")
pipeline_steps = [('preprocessor', preprocessor)]
# Apply SMOTE only if needed and model doesn't handle imbalance internally
# (e.g. RF with class_weight='balanced' already attempts to handle imbalance)
if use_smote and not ('class_weight' in model.get_params() and model.get_params()['class_weight'] == 'balanced'):
pipeline_steps.append(('smote', SMOTE(random_state=42, k_neighbors=smote_k_neighbors)))
logger.info(f"Applying SMOTE for {name}.")
elif use_smote and ('class_weight' in model.get_params() and model.get_params()['class_weight'] == 'balanced'):
logger.info(f"Model {name} uses class_weight. SMOTE not applied by default in this configuration.")
pipeline_steps.append(('classifier', model))
full_pipeline = ImbPipeline(steps=pipeline_steps)
try:
full_pipeline.fit(X_train, y_train)
y_pred_test = full_pipeline.predict(X_test)
report = classification_report(y_test, y_pred_test, zero_division=0, output_dict=True)
macro_f1_score = report['macro avg']['f1-score']
logger.info(f"{name} Test Macro F1-Score: {macro_f1_score:.4f}")
results_summary[name] = {'macro_f1': macro_f1_score, 'pipeline': full_pipeline}
if macro_f1_score > best_macro_f1:
best_macro_f1 = macro_f1_score
best_model_name = name
best_model_pipeline = full_pipeline
except Exception as e:
logger.error(f"Error training or evaluating {name}: {e}", exc_info=True) # Log full traceback
results_summary[name] = {'macro_f1': 0, 'pipeline': None, 'error': str(e)}
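# Compact comparison of all evaluated models (a small sketch built from results_summary):
summary_df = pd.DataFrame(
    [(name, res.get('macro_f1', 0)) for name, res in results_summary.items()],
    columns=['Model', 'Macro F1']
).sort_values(by='Macro F1', ascending=False)
logger.info("\nModel comparison:\n%s", summary_df.to_string(index=False))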
if best_model_name:
logger.info(f"Best performing model (based on Macro F1): {best_model_name} with F1: {best_macro_f1:.4f}")
else:
logger.warning("No model was successfully selected as best. Training a fallback Random Forest model.")
# Fallback to a default if needed
best_model_name = "Random Forest (Fallback)"
try:
fallback_pipeline_steps = [
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(random_state=42, class_weight='balanced'))
]
# Conditionally add SMOTE to fallback if applicable
if use_smote:
fallback_pipeline_steps.insert(1, ('smote', SMOTE(random_state=42, k_neighbors=smote_k_neighbors)))
logger.info("Applying SMOTE to fallback Random Forest.")
best_model_pipeline = ImbPipeline(steps=fallback_pipeline_steps)
best_model_pipeline.fit(X_train, y_train)
logger.info("Fallback Random Forest model has been trained.")
except Exception as e:
logger.error(f"Error training fallback Random Forest model: {e}", exc_info=True)
best_model_pipeline = None # Ensure it's None if fallback fails
final_model_pipeline = best_model_pipeline
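# Optional robustness check (a sketch): a single hold-out split can be noisy, so also
# cross-validate the winning pipeline on the training data. cross_val_score clones the
# pipeline, so SMOTE (if present) is re-applied only within each training fold.
if final_model_pipeline is not None:
    from sklearn.model_selection import StratifiedKFold, cross_val_score
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(final_model_pipeline, X_train, y_train, cv=cv, scoring='f1_macro')
    logger.info(f"{best_model_name} 5-fold CV macro F1: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")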
# 8. Detailed Evaluation of the Chosen Model
if final_model_pipeline:
logger.info(f"--- Detailed Evaluation for: {best_model_name} ---")
y_pred_final = final_model_pipeline.predict(X_test)
# Ensure model_classes are derived correctly for the report
try:
model_classes = final_model_pipeline.named_steps['classifier'].classes_
except AttributeError:
logger.warning("Could not retrieve classes_ from the classifier. Using unique classes from y_test.")
model_classes = sorted(y_test.unique())
logger.info("\nClassification Report:\n%s", classification_report(y_test, y_pred_final, labels=model_classes, zero_division=0))
cm = confusion_matrix(y_test, y_pred_final, labels=model_classes)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=model_classes)
fig, ax = plt.subplots(figsize=(max(8, len(model_classes)*0.8), max(6, len(model_classes)*0.6))) # Dynamic figsize
disp.plot(cmap='Blues', ax=ax, xticks_rotation='vertical')
plt.title(f"{best_model_name} - Confusion Matrix")
plt.tight_layout()
plt.show() # plt.savefig(f'{best_model_name}_confusion_matrix.png')
# Feature Importance for tree-based models
final_classifier = final_model_pipeline.named_steps['classifier']
if hasattr(final_classifier, 'feature_importances_'):
try:
preprocessor_step = final_model_pipeline.named_steps['preprocessor']
transformed_feature_names = preprocessor_step.get_feature_names_out()
importances = final_classifier.feature_importances_
feature_imp_df = pd.DataFrame({
'Feature': transformed_feature_names,
'Importance': importances
}).sort_values(by='Importance', ascending=False)
plt.figure(figsize=(10, max(8, len(transformed_feature_names)*0.3))) # Dynamic figsize
            # hue= mirrors y= because seaborn >= 0.13 deprecates palette= without hue=.
            sns.barplot(x='Importance', y='Feature', data=feature_imp_df.head(20),
                        hue='Feature', palette='viridis', legend=False)
plt.title(f'Top 20 Feature Importances - {best_model_name}')
plt.tight_layout()
plt.show() # plt.savefig(f'{best_model_name}_feature_importance.png')
except Exception as e:
logger.error(f"Could not plot feature importances: {e}", exc_info=True)
# 9. Save Model
if final_model_pipeline:
try:
joblib.dump(final_model_pipeline, MODEL_PATH)
logger.info(f"--- Model Saved ---\nFull pipeline saved to: {MODEL_PATH}")
except Exception as e:
logger.error(f"Error saving model to {MODEL_PATH}: {e}", exc_info=True)
else:
logger.warning("No model was available to save.")
# 10. Example of Loading and Using the Saved Pipeline
logger.info("--- Example: Load and Predict with Saved Full Pipeline ---")
if not final_model_pipeline: # Check if a model was trained/assigned
logger.info("Skipping loading example as no final model pipeline was available/trained.")
else:
try:
loaded_full_pipeline = joblib.load(MODEL_PATH)
logger.info(f"Full pipeline loaded successfully from {MODEL_PATH}")
if X_test.empty:
logger.info("X_test is empty, cannot create sample data for prediction example.")
else:
sample_raw_data_df = X_test.head(3).copy()
logger.info("\nSample raw data for prediction (first 3 from X_test, as DataFrame):\n%s", sample_raw_data_df.to_string())
try:
predictions = loaded_full_pipeline.predict(sample_raw_data_df)
if hasattr(loaded_full_pipeline, "predict_proba"):
probabilities = loaded_full_pipeline.predict_proba(sample_raw_data_df)
else:
probabilities = None
logger.info("predict_proba not available for this loaded pipeline/model.")
logger.info("Predictions for sample data:")
for i, pred in enumerate(predictions):
logger.info(f"Sample {i+1}: Predicted Failure Category = {pred}")
if probabilities is not None:
class_list = list(loaded_full_pipeline.classes_)
if pred in class_list:
pred_class_idx = class_list.index(pred)
logger.info(f" Confidence (Prob for {pred}): {probabilities[i, pred_class_idx]:.4f}")
else:
logger.warning(f" Could not find class '{pred}' in model's known classes for probability display.")
except Exception as e:
logger.error(f"An error occurred during the prediction step with loaded model: {e}", exc_info=True)
except FileNotFoundError:
logger.error(f"Error: Model file {MODEL_PATH} not found. Cannot run prediction example.")
except Exception as e:
logger.error(f"An unexpected error occurred while loading the model or predicting: {e}", exc_info=True)
logger.info("--- Notebook execution finished ---")
Here is the link to my Jupyter Notebook:
The code is based on feedback that I received from my previous question on the same project. Here is the link: