Enforce compute cluster version #1601

Closed · wants to merge 8 commits
26 changes: 25 additions & 1 deletion llmfoundry/command_utils/data_prep/convert_delta_to_json.py
@@ -8,6 +8,7 @@
import urllib.parse
from collections import namedtuple
from concurrent.futures import ProcessPoolExecutor
from importlib.metadata import version as importlib_version
from typing import TYPE_CHECKING, Iterable, Optional, Union
from uuid import uuid4

@@ -49,7 +50,7 @@
except ImportError:
    data_frame_installed = False

-MINIMUM_DB_CONNECT_DBR_VERSION = '14.1'
+MINIMUM_DB_CONNECT_DBR_VERSION = importlib_version('databricks-connect')
MINIMUM_SQ_CONNECT_DBR_VERSION = '12.2'

TABLENAME_PATTERN = re.compile(r'(\S+)\.(\S+)\.(\S+)')
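
Note: with this hunk, MINIMUM_DB_CONNECT_DBR_VERSION is no longer pinned to '14.1' but is read at import time from the installed databricks-connect package. A minimal sketch of that lookup (not part of the diff; assumes Python 3.8+ and, for the success path, that databricks-connect is installed):

from importlib.metadata import PackageNotFoundError, version

try:
    db_connect_version = version('databricks-connect')  # e.g. '14.3.2'
    expected_major = int(db_connect_version.split('.')[0])
    print(f'databricks-connect {db_connect_version}; expected cluster DBR major version: {expected_major}')
except PackageNotFoundError:
    print('databricks-connect is not installed in this environment')

Because the assignment runs at module level, importlib.metadata.version will raise PackageNotFoundError at import time if databricks-connect is not installed.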
@@ -676,6 +677,29 @@ def fetch_DT(
if e.code(
) == grpc.StatusCode.INTERNAL and 'Job aborted due to stage failure' in e.details(
):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    if cluster_id:
        res = w.clusters.get(cluster_id=cluster_id)
        cluster_spark_version_key = res.spark_version
        assert cluster_spark_version_key
        cluster_major_version_match = re.search(
            r'^(\d+)\.',
            cluster_spark_version_key,
        )

        assert cluster_major_version_match
        cluster_major_version = int(cluster_major_version_match.group(1))
        databricks_connect_major_version = int(
            MINIMUM_DB_CONNECT_DBR_VERSION.split('.')[0],
        )

        if cluster_major_version != databricks_connect_major_version:
            raise FaultyDataPrepCluster(
                message=
                f'Data prep cluster version is not compatible with the current databricks-connect version {MINIMUM_DB_CONNECT_DBR_VERSION}. If you keep encountering this issue, please consider changing the compute Databricks Runtime version to match databricks-connect {MINIMUM_DB_CONNECT_DBR_VERSION}.',
            ) from e

    raise FaultyDataPrepCluster(
        message=
        f'Faulty data prep cluster, please try swapping data prep cluster: {e.details()}',
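
For reference, a rough standalone sketch of the check this hunk adds, assuming databricks-sdk is installed and can authenticate from the environment; cluster_matches_db_connect and 'data_prep_cluster_id' are illustrative names, not part of the PR:

import re
from importlib.metadata import version as importlib_version

from databricks.sdk import WorkspaceClient


def cluster_matches_db_connect(cluster_id: str) -> bool:
    """Return True if the cluster's DBR major version matches the installed databricks-connect."""
    w = WorkspaceClient()  # picks up credentials from the environment or ~/.databrickscfg
    spark_version_key = w.clusters.get(cluster_id=cluster_id).spark_version  # e.g. '14.3.x-scala2.12'
    match = re.search(r'^(\d+)\.', spark_version_key or '')
    if match is None:
        return False
    cluster_major = int(match.group(1))
    client_major = int(importlib_version('databricks-connect').split('.')[0])
    return cluster_major == client_major


# Hypothetical usage; 'data_prep_cluster_id' is a placeholder:
# if not cluster_matches_db_connect('data_prep_cluster_id'):
#     raise RuntimeError('Data prep cluster DBR version does not match databricks-connect.')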