
I have the following code. It loads a pre-existing file containing an ML model, and I am trying to run it on Databricks across multiple cases.

import numpy as np
import pandas as pd
import joblib

class WeightedEnsembleRegressor:
    """
    Holds models, their weights, scaler and feature order for prediction & persistence.
    """
    def __init__(self, trained_models, model_weights, scaler, feature_order):
        self.trained_models = trained_models
        self.model_weights = model_weights
        self.scaler = scaler
        self.feature_order = feature_order

    def save(self, path):
        joblib.dump(self, path)

    @staticmethod
    def load(path):
        return joblib.load(path)

def Process(df):
    import traceback
    import xgboost
    import catboost
    import pickle
    import joblib
    import sys
    # print(sys.version)
    data = {}
    data['msg'] = [""]
    try:
        ensemble = WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
        data['msg'] = [f"success{sys.version} {joblib.__version__} {pickle.compatible_formats} "]
    except Exception:
        data['msg'] = [f"fail{sys.version} {joblib.__version__} {pickle.compatible_formats} "
                       f"{traceback.format_exc()}{xgboost.__version__}{catboost.__version__}"]
    return pd.DataFrame.from_dict(data, orient='index').transpose()

When I run this directly, like below:

display(Process(df))

it runs fine with the following message:

success3.11.10 (main, Sep 7 2024, 18:35:41) [GCC 11.4.0] 1.2.0 ['1.0', '1.1', '1.2', '1.3', '2.0', '3.0', '4.0', '5.0'] 

However, when I run it like this:

display(pDf.groupBy('num').applyInPandas(Process,'msg string'))

it fails with this message:

fail3.11.10 (main, Sep 7 2024, 18:35:41) [GCC 11.4.0] 1.2.0 ['1.0', '1.1', '1.2', '1.3', '2.0', '3.0', '4.0', '5.0'] Traceback (most recent call last):
 File "/home/spark-bf403da4-5cb5-46b6-b647-9a/.ipykernel/41375/command-8846548913142126-766074700", line 13, in Process
 File "/home/spark-bf403da4-5cb5-46b6-b647-9a/.ipykernel/41375/command-8846548913142125-2478242552", line 31, in load
 File "/databricks/python/lib/python3.11/site-packages/joblib/numpy_pickle.py", line 658, in load
 obj = _unpickle(fobj, filename, mmap_mode)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/databricks/python/lib/python3.11/site-packages/joblib/numpy_pickle.py", line 577, in _unpickle
 obj = unpickler.load()
 ^^^^^^^^^^^^^^^^
 File "/usr/lib/python3.11/pickle.py", line 1213, in load
 dispatch[key[0]](self)
 File "/usr/lib/python3.11/pickle.py", line 1538, in load_stack_global
 self.append(self.find_class(module, name))
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/usr/lib/python3.11/pickle.py", line 1582, in find_class
 return _getattribute(sys.modules[module], name)[0]
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/usr/lib/python3.11/pickle.py", line 331, in _getattribute
 raise AttributeError("Can't get attribute {!r} on {!r}"
AttributeError: Can't get attribute 'WeightedEnsembleRegressor' on <module '_engine_pyspark.databricks.workerwrap' from '/databricks/spark/python/_engine_pyspark.zip/_engine_pyspark/databricks/workerwrap.py'>
3.0.21.2.8

Any insight into how to fix this would be appreciated.

asked Aug 12, 2025 at 12:40
  • No, it's hard to help because you're not showing the actual exception message at all. Commented Aug 12, 2025 at 12:54
  • @AKX, sorry, I've added the full exception message. Commented Aug 12, 2025 at 13:00
  • Looks like the class you define in the Databricks script gets declared in a Databricks-specific pseudo-module, which then doesn't exist in the Spark worker environment. If you just have a data class (no methods on it), it'd be easier to pickle a dict of those attributes (see the sketch after these comments). Otherwise, you'd need to make sure the class has a stable import path. Commented Aug 12, 2025 at 13:32
  • @AKX, sorry, I don't follow; the class is defined in the same script. Commented Aug 12, 2025 at 15:42
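
To illustrate the "pickle a dict of those attributes" suggestion above: a minimal sketch, assuming the attribute names from the question (the ensemble_state.joblib filename is made up here). The library objects inside the dict (xgboost models, the scaler) resolve on workers through their own stable import paths; only the notebook-defined class is the problem:

import joblib

# On the driver: re-save the model state as a dict of built-ins and
# library objects instead of the notebook-defined class.
ensemble = WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
state = {
    "trained_models": ensemble.trained_models,
    "model_weights": ensemble.model_weights,
    "scaler": ensemble.scaler,
    "feature_order": ensemble.feature_order,
}
joblib.dump(state, "ensemble_state.joblib")

# Inside the applyInPandas function, load the dict -- no custom class needed:
# state = joblib.load("ensemble_state.joblib")
# X = state["scaler"].transform(df[state["feature_order"]])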

1 Answer


The solution was to register the ensemble as an MLflow model:


import mlflow

class WeightedEnsembleRegressorPyfunc(mlflow.pyfunc.PythonModel):
    ...
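
The answer elides the wrapper body; here is a minimal sketch of what it could contain, assuming the ensemble's attributes from the question and a simple weighted-average prediction path (the predict/weighting logic below is an assumption inferred from the class name, not code from the answer):

import mlflow.pyfunc
import numpy as np

class WeightedEnsembleRegressorPyfunc(mlflow.pyfunc.PythonModel):
    """Pyfunc wrapper so MLflow handles serialization instead of raw joblib."""

    def __init__(self, ensemble):
        self.ensemble = ensemble  # the loaded WeightedEnsembleRegressor

    def predict(self, context, model_input):
        # Assumed flow: reorder features, scale, predict per model, weight-average.
        X = self.ensemble.scaler.transform(model_input[self.ensemble.feature_order])
        preds = np.column_stack([m.predict(X) for m in self.ensemble.trained_models])
        return preds @ np.asarray(self.ensemble.model_weights)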

Then

ensemble = WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
python_model = WeightedEnsembleRegressorPyfunc(ensemble)

mlflow.set_registry_uri("databricks")
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        artifact_path="ensemble_model",
        python_model=python_model,
    )
    run_id = run.info.run_id

Now you can load the logged model inside the function passed to applyInPandas.
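
For example, a sketch of the applyInPandas side, assuming the run_id captured above (mlflow.pyfunc.load_model and the runs:/ URI are standard MLflow; the prediction column name and schema string are illustrative):

import pandas as pd
import mlflow.pyfunc

model_uri = f"runs:/{run_id}/ensemble_model"

def Process(df: pd.DataFrame) -> pd.DataFrame:
    # MLflow restores the pyfunc wrapper from the artifact store, so the worker
    # never has to resolve a notebook-defined class through pickle itself.
    model = mlflow.pyfunc.load_model(model_uri)
    return pd.DataFrame({"prediction": model.predict(df)})

# display(pDf.groupBy('num').applyInPandas(Process, 'prediction double'))

Loading inside the function keeps the UDF self-contained; if there are many small groups, caching the loaded model per worker avoids repeated deserialization.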

answered Sep 3, 2025 at 12:54