Skip to content

Commit

Permalink
additional checks
Browse files Browse the repository at this point in the history
  • Loading branch information
v-chen_data committed Oct 18, 2024
1 parent f394b7a commit 5406327
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions llmfoundry/command_utils/data_prep/convert_delta_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import urllib.parse
from collections import namedtuple
from concurrent.futures import ProcessPoolExecutor
from importlib.metadata import version as importlib_version
from typing import TYPE_CHECKING, Iterable, Optional, Union
from uuid import uuid4

Expand All @@ -17,7 +18,6 @@
import requests
from composer.utils import retry
from packaging import version
from importlib.metadata import version as importlib_version

from llmfoundry.utils.exceptions import (
ClusterDoesNotExistError,
Expand Down Expand Up @@ -50,7 +50,7 @@
except ImportError:
data_frame_installed = False

MINIMUM_DB_CONNECT_DBR_VERSION = importlib_version("databricks-connect")
MINIMUM_DB_CONNECT_DBR_VERSION = importlib_version('databricks-connect')
MINIMUM_SQ_CONNECT_DBR_VERSION = '12.2'

TABLENAME_PATTERN = re.compile(r'(\S+)\.(\S+)\.(\S+)')
Expand Down Expand Up @@ -545,22 +545,28 @@ def validate_and_get_cluster_info(
res = w.clusters.get(cluster_id=cluster_id)
if res is None:
raise ClusterDoesNotExistError(cluster_id)

# Validate compute version is expected
cluster_spark_version_key = res.spark_version
assert cluster_spark_version_key
cluster_major_version_match = re.search(r'^(\d+)\.', cluster_spark_version_key)
cluster_major_version_match = re.search(
r'^(\d+)\.',
cluster_spark_version_key,
)

assert cluster_major_version_match
cluster_major_version = int(cluster_major_version_match.group(1))
databricks_connect_major_version = int(MINIMUM_DB_CONNECT_DBR_VERSION.split('.')[0])
databricks_connect_major_version = int(
MINIMUM_DB_CONNECT_DBR_VERSION.split('.')[0],
)

if cluster_major_version != databricks_connect_major_version:
raise ValueError(
f'Cluster Spark version {cluster_spark_version_key} does not match Databricks Connect version {MINIMUM_DB_CONNECT_DBR_VERSION}',
log.warning(
f'Cluster Spark version {cluster_spark_version_key} does not match Databricks Connect version {MINIMUM_DB_CONNECT_DBR_VERSION}.',
f'If you encounter _MultiThreadedRendezvous issues, please consider downgrading the compute Databricks Runtime Version to',
f'{MINIMUM_DB_CONNECT_DBR_VERSION}.',
)


data_security_mode = str(
res.data_security_mode,
).upper()[len('DATASECURITYMODE.'):]
Expand Down Expand Up @@ -692,6 +698,29 @@ def fetch_DT(
if e.code(
) == grpc.StatusCode.INTERNAL and 'Job aborted due to stage failure' in e.details(
):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
if cluster_id:
res = w.clusters.get(cluster_id=cluster_id)
cluster_spark_version_key = res.spark_version
assert cluster_spark_version_key
cluster_major_version_match = re.search(
r'^(\d+)\.',
cluster_spark_version_key,
)

assert cluster_major_version_match
cluster_major_version = int(cluster_major_version_match.group(1))
databricks_connect_major_version = int(
MINIMUM_DB_CONNECT_DBR_VERSION.split('.')[0],
)

if cluster_major_version != databricks_connect_major_version:
raise FaultyDataPrepCluster(
message=
f'Data prep cluster version is compatible with current databricks-connect version {MINIMUM_DB_CONNECT_DBR_VERSION}. If you keep encountering this issue, please consider downgrading the compute Databricks Runtime Version to {MINIMUM_DB_CONNECT_DBR_VERSION}.',
) from e

raise FaultyDataPrepCluster(
message=
f'Faulty data prep cluster, please try swapping data prep cluster: {e.details()}',
Expand Down

0 comments on commit 5406327

Please sign in to comment.