New update with mlflow experiments causing _pickle.PicklingError #95

Open · hatMatch opened this issue Oct 28, 2022 · 0 comments

Since the update to the mlflow integration with hyperopt that automatically assigns names to experiments (such as smiling-worm-674), I have been consistently getting the following error when running a previously working mlflow experiment with SparkTrials().

ERROR:hyperopt-spark:trial task 0 failed, exception is
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 405.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 405.0 (TID 1472) (10.143.252.81 executor 0):
org.apache.spark.api.python.PythonException: '_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range'

However, my experiment does not do any explicit pickling and my own code is not referenced anywhere in the full traceback, so I am not exactly sure what the issue is. From the chained tracebacks it looks like patsy's Origin.__getstate__ deliberately raises NotImplementedError and cloudpickle's error handler then trips over the empty exception args, which surfaces as the IndexError inside the PicklingError (a quick standalone check is sketched after the traceback below). I can confirm that the experiment works when using hyperopt.Trials() rather than hyperopt.SparkTrials(). Apologies for such a lengthy issue, and sorry if it turns out to be a simple mistake on my end!

Here is the full traceback:

Full Traceback
Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
   return Pickler.dump(self, obj)
 File "/databricks/python/lib/python3.9/site-packages/patsy/origin.py", line 117, in __getstate__
   raise NotImplementedError
NotImplementedError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/serializers.py", line 527, in dumps
   return cloudpickle.dumps(obj, pickle_protocol)
 File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
   cp.dump(obj)
 File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 604, in dump
   if "recursion" in e.args[0]:
IndexError: tuple index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/worker.py", line 876, in main
   process()
 File "/databricks/spark/python/pyspark/worker.py", line 868, in process
   serializer.dump_stream(out_iter, outfile)
 File "/databricks/spark/python/pyspark/serializers.py", line 329, in dump_stream
   bytes = self.serializer.dumps(vs)
 File "/databricks/spark/python/pyspark/serializers.py", line 537, in dumps
   raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range

   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:692)
   at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:902)
   at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:884)
   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:645)
   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   at scala.collection.Iterator.foreach(Iterator.scala:943)
   at scala.collection.Iterator.foreach$(Iterator.scala:943)
   at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
   at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
   at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
   at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
   at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1029)
   at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
   at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
   at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
   at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
   at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
   at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   at org.apache.spark.scheduler.Task.run(Task.scala:96)
   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3257)
   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3189)
   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3180)
   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3180)
   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1414)
   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1414)
   at scala.Option.foreach(Option.scala:407)
   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1414)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3466)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3407)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3395)
   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1166)
   at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2702)
   at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1027)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:411)
   at org.apache.spark.rdd.RDD.collect(RDD.scala:1025)
   at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:282)
   at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
   at sun.reflect.GeneratedMethodAccessor282.invoke(Unknown Source)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
   at py4j.Gateway.invoke(Gateway.java:306)
   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at py4j.commands.CallCommand.execute(CallCommand.java:79)
   at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
   at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
   at java.lang.Thread.run(Thread.java:748)
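
A small standalone check along these lines (a sketch only, run outside Spark; it assumes the LMEBaseRegressor class and the X, y, dependent, media_cols, fixed_arrs, and random_arrs objects from the code further down) should exercise the same patsy __getstate__ path the traceback points at, since plain pickle goes through the same __getstate__ protocol as cloudpickle:

import pickle

# Fit one candidate configuration the same way a single trial would.
reg = LMEBaseRegressor(group=['panel'],
                       dependent=dependent,
                       media=media_cols)
reg.set_params(fixed=fixed_arrs[0], random=random_arrs[0])
reg.fit(X, y)

# If the fitted estimator (via the statsmodels MixedLMResults it keeps) is
# what ends up being serialized, this should fail the same way the Spark
# worker does, because patsy.origin.Origin.__getstate__ raises
# NotImplementedError.
pickle.dumps(reg)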

The following is the code that is being run in the experiments:

import mlflow
from hyperopt import fmin, tpe, SparkTrials

mlflow.start_run()
spark_trials = SparkTrials(parallelism=16)

with mlflow.start_run(run_name='test_experiment'):
    best_result = fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=1024,
        trials=spark_trials)
Hyperopt Optimization Function
import mlflow
import numpy as np
from hyperopt import STATUS_OK
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import Pipeline


def objective(args):

    # Initialize model pipeline; X and y are defined elsewhere in the notebook
    pipe = Pipeline(steps=[
        ('selection', args['selection'])
    ])

    pipe.set_params(**args['params'])  # Model parameters are set here
    pipe.fit(X, y)
    penalty = pipe['selection'].penalty_
    try:
        residual = np.sum(pipe['selection']._resid) / len(pipe['selection']._resid)
    except AttributeError:
        residual = -10000
    r2 = r2_score(y, pipe.predict(X))
    score = 1 - r2
    mean_square = mean_squared_error(y, pipe.predict(X))
    mlflow.log_metric('avg_residual', residual)
    mlflow.log_metric('mean_squared_error', mean_square)
    mlflow.log_metric('penalty', penalty)
    mlflow.log_metric('r2', r2)

    print(f"Model Name: {args['selection']}: ", score)

    # fmin minimizes the loss, so the score returned is 1 - r2.
    return {'loss': score, 'status': STATUS_OK}

Here are the parameters and parameter space:

Params and Parameter Space
from hyperopt import hp

# fixed_arrs, random_arrs, dependent, and media_cols are defined elsewhere
# in the notebook.
params = {
    'selection__fixed': hp.choice('selection.fixed', fixed_arrs),
    'selection__random': hp.choice('selection.random', random_arrs),
    'selection__intercept': hp.choice('selection.intercept', (0, 1)),
    'selection__cov': hp.choice('selection.cov', (0, 1))
}

space = hp.choice('regressors', [
    {
        'selection': LMEBaseRegressor(group=['panel'],
                                      dependent=dependent,
                                      media=media_cols),
        'params': params
    }
])

And finally, here is the regressor I am using (included because it's a custom class built on top of sklearn):

LMEBaseRegressor Class
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf
from sklearn.base import BaseEstimator, RegressorMixin


class LMEBaseRegressor(BaseEstimator, RegressorMixin):
    """Implementation of an LME Regression for scikit-learn."""

    def __init__(self, random=None, fixed=None,
                 group=['panel'], dependent=None,
                 intercept=0, cov=0, media=None):
        self.random = random
        self.fixed = fixed
        self.group = group
        self.dependent = dependent
        self.intercept = intercept
        self.cov = cov
        self.media = media

    def fit(self, X, y):
        """Fit the model with LME."""
        str_dep = self.dependent[0]
        str_fixed = ' + '.join(self.fixed)
        str_random = ' + '.join(self.random)
        data = pd.concat([X, y], axis=1)
        self.penalty_ = 0
        print(f"{str_dep} ~ {self.intercept} + {str_fixed}")
        print(f"{self.cov} + {str_random}")
        try:
            mixed = smf.mixedlm(f"{str_dep} ~ {self.intercept} + {str_fixed}",
                                data,
                                re_formula=f"~ {self.cov} + {str_random}",
                                groups=data['panel'],
                                use_sqrt=True)\
                .fit(method=['lbfgs'])
            self._model = mixed
            self._resid = mixed.resid
            self.coef_ = mixed.params[0:len(self.fixed)]

        except ValueError:
            print("Cannot predict random effects from singular covariance structure.")
            self.penalty_ = 100

        except np.linalg.LinAlgError:
            print("Linear Algebra Error: recheck base model fit or try using fewer variables.")
            self.penalty_ = 100
        return self

    def predict(self, X):
        """Take the coefficients provided from fit and multiply them by X."""
        if self.penalty_ != 0:
            return np.ones(len(X)) * -100 * self.penalty_
        return self._model.predict(X)
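
If the serialization failure really does come from the fitted estimator (via the statsmodels result it keeps) being shipped around by cloudpickle, a workaround along these lines might help; this is an untested sketch added to the class above, not a confirmed fix:

class LMEBaseRegressor(BaseEstimator, RegressorMixin):
    # ... everything above unchanged ...

    def __getstate__(self):
        # Drop the statsmodels MixedLMResults before pickling: it is what
        # references the patsy objects whose __getstate__ raises
        # NotImplementedError in the traceback. predict() would need a refit
        # after unpickling, but the hyperopt trial only returns metrics.
        state = self.__dict__.copy()
        state.pop('_model', None)
        return state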