Skip to content

Commit

Permalink
Removed hard dependency on Ray.
Browse files Browse the repository at this point in the history
  • Loading branch information
adivekar-utexas committed Jan 8, 2025
1 parent 120cfed commit 4454e0a
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 687 deletions.
32 changes: 20 additions & 12 deletions src/fmcore/data/sdf/DaskScalableDataFrame.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
from typing import *
import math, io, ray, numpy as np, pandas as pd
import io
import math
import numpy as np
from collections import deque
from concurrent.futures._base import Future
from pandas.core.frame import Series as PandasSeries, DataFrame as PandasDataFrame
from typing import *

import dask.dataframe as dd
from dask.dataframe.core import Scalar as DaskScalar, Series as DaskSeries, DataFrame as DaskDataFrame
from fmcore.util import multiple_are_not_none, all_are_none, is_function, wrap_fn_output, \
get_default, RayDaskPersistWaitCallback, set_param_from_alias, Alias, safe_validate_arguments, Log, Executor
from pandas.core.frame import Series as PandasSeries, DataFrame as PandasDataFrame
from pydantic import conint
from pydantic.typing import Literal

from fmcore.constants import DataLayout, Parallelize
from fmcore.data.sdf.ScalableSeries import ScalableSeries
from fmcore.data.sdf.DaskScalableSeries import DaskScalableSeries
from fmcore.data.sdf.ScalableDataFrame import ScalableDataFrame, ScalableDataFrameOrRaw, is_scalable, \
DataFrameShardingError
from fmcore.data.sdf.DaskScalableSeries import DaskScalableSeries
from pydantic import validate_arguments, conint, constr
from fmcore.data.sdf.ScalableSeries import ScalableSeries
from fmcore.util import accumulate, run_concurrent
from pydantic.typing import Literal
from collections import deque
from fmcore.util import multiple_are_not_none, all_are_none, is_function, wrap_fn_output, \
get_default, RayDaskPersistWaitCallback, Alias, safe_validate_arguments, Log, Executor
from fmcore.util.language._import import _IS_RAY_INSTALLED

if _IS_RAY_INSTALLED:
import ray

DaskScalableDataFrame = "DaskScalableDataFrame"

Expand Down Expand Up @@ -438,8 +446,8 @@ def ilen(sdf_list: List[ScalableDataFrame]) -> int:
def _fetch_partition(ddf: DaskDataFrame, ddf_i: int) -> PandasDataFrame:
df: DaskDataFrame = ddf.partitions[ddf_i]
df: PandasDataFrame = df.compute()
if isinstance(df, PandasSeries) and len(df) == 1 and isinstance(df.iloc[0], ray.ObjectRef):
## If you pass a Dask-on-Ray-DataFrame to a ray Task/Actor, for some reason it treats each partition
if _IS_RAY_INSTALLED and isinstance(df, PandasSeries) and len(df) == 1 and isinstance(df.iloc[0], ray.ObjectRef):
## If you pass a Dask-on-Ray-DataFrame to a Ray Task/Actor, for some reason it treats each partition
## like a Series object with one element. The one element is a ray.ObjectRef of the actual partition's
## PandasDataFrame. So we need to fetch the actual PandasDataFrame.
df: PandasDataFrame = accumulate(df.iloc[0])
Expand Down
59 changes: 31 additions & 28 deletions src/fmcore/framework/evaluator/RayEvaluator.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
from typing import *
import gc
import logging
import math
import random
import time
import warnings
import time, os, gc, math, random, numpy as np, pandas as pd, ray, logging
from contextlib import contextmanager, ExitStack
from functools import partial
from ray import tune, air
from ray.runtime_env import RuntimeEnv as RayRuntimeEnv
from fmcore.util import Parameters, set_param_from_alias, safe_validate_arguments, format_exception_msg, String, \
Timer, get_default, is_list_like, is_empty_list_like, run_concurrent, \
Timeout24Hr, Timeout1Hr, Timeout, as_list, accumulate, get_result, wait, AutoEnum, auto, ProgressBar, only_item, \
ignore_all_output, ignore_logging, ignore_warnings, ignore_stdout, LoadBalancingStrategy
from fmcore.util.aws import S3Util
from typing import *

import ray
from pydantic import Extra, conint, confloat, root_validator
from pydantic.typing import Literal

from fmcore.constants import Storage, REMOTE_STORAGES, DataLayout, FILE_FORMAT_TO_FILE_ENDING_MAP, \
FailureAction
from fmcore.data import FileMetadata, ScalableDataFrame
from fmcore.data.sdf import DaskScalableDataFrame
from fmcore.constants import Task, DataSplit, Storage, REMOTE_STORAGES, DataLayout, FILE_FORMAT_TO_FILE_ENDING_MAP, \
FailureAction
from fmcore.framework.evaluator.Evaluator import Evaluator, save_predictions, load_predictions
from fmcore.framework.task_data import Datasets, Dataset
from fmcore.framework.metric import Metric
from fmcore.framework.predictions import Predictions
from fmcore.framework.algorithm import Algorithm, TaskOrStr
from fmcore.framework.metric import Metric, Metrics
from fmcore.framework.task_data import Dataset
from fmcore.framework.tracker.Tracker import Tracker
from fmcore.util import RayInitConfig, max_num_resource_actors, ActorComposite, RequestCounter
from pydantic import Extra, conint, confloat, constr, root_validator, validator
from pydantic.typing import Literal
from fmcore.util import RayInitConfig, max_num_resource_actors, RayActorComposite, RequestCounter
from fmcore.util import set_param_from_alias, safe_validate_arguments, format_exception_msg, String, \
Timer, get_default, run_concurrent, \
Timeout, as_list, accumulate, get_result, wait, AutoEnum, auto, ProgressBar, only_item, \
ignore_all_output, ignore_logging, ignore_warnings, ignore_stdout, LoadBalancingStrategy
from fmcore.util.aws import S3Util


@ray.remote
Expand Down Expand Up @@ -81,8 +85,7 @@ def __init__(
request_counter: RequestCounter,
verbosity: int,
):
from fmcore.framework import Algorithm, Evaluator
import fmcore.algorithm ## Registers all algorithms
from fmcore.framework import Evaluator
self.verbosity = verbosity
self.evaluator: Optional[Evaluator] = None ## Set this temporarily while loading when calling Evaluator.of(...)
with algorithm_evaluator_verbosity(self.verbosity):
Expand Down Expand Up @@ -121,7 +124,7 @@ def evaluate_shard(
import pandas as pd
from fmcore.util import accumulate
from fmcore.data import FileMetadata
from fmcore.framework import Dataset, Datasets, Predictions, Metric, Metrics
from fmcore.framework import Dataset, Predictions
from concurrent.futures._base import Future

## Stops Pandas SettingWithCopyWarning in output. Ref: https://stackoverflow.com/a/20627316
Expand Down Expand Up @@ -264,7 +267,7 @@ class RunConfig(Evaluator.RunConfig):

nested_evaluator_name: Optional[str] = None
num_models: Optional[conint(ge=1)] = None
model: Optional[List[ActorComposite]] = None ## Stores the actors.
model: Optional[List[RayActorComposite]] = None ## Stores the actors.
resources_per_model: Dict[
Literal['cpu', 'gpu'],
Union[confloat(ge=0.0, lt=1.0), conint(ge=0)]
Expand Down Expand Up @@ -324,7 +327,7 @@ def _load_model(
num_actors: Optional[int] = None,
progress_bar: Optional[Union[Dict, bool]] = True,
**kwargs,
) -> List[ActorComposite]:
) -> List[RayActorComposite]:
num_actors: int = get_default(num_actors, self.num_actors)
progress_bar: Optional[Dict] = self._run_evaluation_progress_bar(progress_bar)
nested_evaluator_params: Dict = self._create_nested_evaluator_params(**kwargs)
Expand All @@ -340,7 +343,7 @@ def actor_factory(*, request_counter: Any, actor_i: int, actor_id: str, **kwargs
verbosity=self.verbosity,
)

return ActorComposite.create_actors(
return RayActorComposite.create_actors(
actor_factory,
num_actors=num_actors,
progress_bar=progress_bar,
Expand All @@ -352,8 +355,8 @@ def cleanup_model(self, ):
def _kill_actors(self):
try:
if self.model is not None:
actor_composites: List[ActorComposite] = self.model
self.model: Optional[List[ActorComposite]] = None
actor_composites: List[RayActorComposite] = self.model
self.model: Optional[List[RayActorComposite]] = None
for actor_comp in actor_composites:
actor_comp.kill()
del actor_comp
Expand Down Expand Up @@ -644,10 +647,10 @@ def _run_evaluation(
## When using DataLoadingStrategy.LOCAL, we can pick which actor to send the data to based on
## the LoadBalancingStrategy.
if load_balancing_strategy is LoadBalancingStrategy.ROUND_ROBIN:
actor_comp: ActorComposite = self.model[part_i % num_actors_created]
actor_comp: RayActorComposite = self.model[part_i % num_actors_created]
elif load_balancing_strategy is LoadBalancingStrategy.RANDOM:
rnd_actor_i: int = random.choice(list(range(0, num_actors_created)))
actor_comp: ActorComposite = self.model[rnd_actor_i]
actor_comp: RayActorComposite = self.model[rnd_actor_i]
elif load_balancing_strategy is LoadBalancingStrategy.LEAST_USED:
## When all actors are unused, latest_last_completed_timestamp is -1, so we will pick a random actor.
## After that, we will pick the actor with the least load which has most-recently completed
Expand Down Expand Up @@ -677,7 +680,7 @@ def _run_evaluation(
if actor_usage == min_actor_usage \
and last_completed_timestamp == latest_last_completed_timestamp
])
actor_comp: ActorComposite = only_item([
actor_comp: RayActorComposite = only_item([
actor_comp for actor_comp in self.model
if actor_comp.actor_id == actor_id
])
Expand Down
7 changes: 6 additions & 1 deletion src/fmcore/framework/evaluator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from fmcore.framework.evaluator.Evaluator import *
from fmcore.framework.evaluator.LocalEvaluator import *
from fmcore.framework.evaluator.AccelerateEvaluator import *
from fmcore.framework.evaluator.RayEvaluator import *
from fmcore.util.language._import import _IS_RAY_INSTALLED

if _IS_RAY_INSTALLED:
from fmcore.framework.evaluator.RayEvaluator import *
else:
RayEvaluator = 'RayEvaluator'
6 changes: 4 additions & 2 deletions src/fmcore/framework/metric.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import *
import math, copy, io, pickle
from abc import ABC, abstractmethod
import numpy as np, pandas as pd, ray
import numpy as np, pandas as pd
from scipy import stats as sps
from fmcore.constants import DataSplit, MLType, AggregationStrategy, Parallelize
from fmcore.util import Parameters, MutableParameters, Registry, String, classproperty, as_list, \
safe_validate_arguments, get_default, set_param_from_alias, str_normalize, dispatch, dispatch_executor, \
accumulate, accumulate_iter, partial_sort, is_function, fn_str, as_set, format_exception_msg, is_null, Log, \
ProgressBar, Alias
from fmcore.util.language._import import _IS_RAY_INSTALLED
from fmcore.data.sdf import ScalableDataFrame
from pydantic import constr, conint, confloat, root_validator
from functools import singledispatchmethod
Expand Down Expand Up @@ -516,7 +517,8 @@ def evaluate(
total=len(metrics_list),
prefer_kwargs=False,
)
if parallelize is Parallelize.ray:
if _IS_RAY_INSTALLED and parallelize is Parallelize.ray:
import ray
data_ref = ray.put(data)
data = data_ref
metric_futs: List[Tuple[Metric, Any]] = []
Expand Down
Loading

0 comments on commit 4454e0a

Please sign in to comment.