Commit
Merge branch 'datahub-project:master' into master
treff7es authored Dec 3, 2024
2 parents 629db58 + 16a0241 commit 12c2409
Showing 16 changed files with 836 additions and 6 deletions.
@@ -1,3 +1,5 @@
import logging
import textwrap
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, List

@@ -28,6 +30,8 @@
FeatureGroupSummaryTypeDef,
)

logger = logging.getLogger(__name__)


@dataclass
class FeatureGroupProcessor:
@@ -197,11 +201,12 @@ def get_feature_wu(

full_table_name = f"{glue_database}.{glue_table}"

- self.report.report_warning(
- full_table_name,
- f"""Note: table {full_table_name} is an AWS Glue object.
+ logging.info(
+ textwrap.dedent(
+ f"""Note: table {full_table_name} is an AWS Glue object. This source does not ingest all metadata for Glue tables.
To view full table metadata, run Glue ingestion
- (see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""",
+ (see https://datahubproject.io/docs/generated/ingestion/sources/glue)"""
+ )
)

feature_sources.append(
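The hunk above, in the SageMaker feature-group processor, replaces the self.report.report_warning call with an informational log wrapped in textwrap.dedent, which removes whitespace that is common to the start of every line of a multi-line string. A minimal, standalone sketch of that helper, using a placeholder table name:

import textwrap

full_table_name = "my_database.my_table"  # placeholder table name for illustration
message = textwrap.dedent(
    f"""\
    Note: table {full_table_name} is an AWS Glue object.
    To view full table metadata, run Glue ingestion
    (see https://datahubproject.io/docs/generated/ingestion/sources/glue)"""
)
print(message)
# Note: table my_database.my_table is an AWS Glue object.
# To view full table metadata, run Glue ingestion
# (see https://datahubproject.io/docs/generated/ingestion/sources/glue)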
@@ -1,3 +1,4 @@
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
@@ -65,6 +66,8 @@
"Unknown": DeploymentStatusClass.UNKNOWN,
}

logger = logging.getLogger(__name__)


@dataclass
class ModelProcessor:
@@ -385,6 +388,26 @@ def strip_quotes(string: str) -> str:
model_metrics,
)

@staticmethod
def get_group_name_from_arn(arn: str) -> str:
"""
Extract model package group name from a SageMaker ARN.
Args:
arn (str): Full ARN of the model package group
Returns:
str: Name of the model package group
Example:
>>> ModelProcessor.get_group_name_from_arn("arn:aws:sagemaker:eu-west-1:123456789:model-package-group/my-model-group")
'my-model-group'
"""
logger.debug(
f"Extracting group name from ARN: {arn} because group was not seen before"
)
return arn.split("/")[-1]

def get_model_wu(
self,
model_details: "DescribeModelOutputTypeDef",
@@ -425,8 +448,14 @@ def get_model_wu(
model_group_arns = model_uri_groups | model_image_groups

model_group_names = sorted(
- [self.group_arn_to_name[x] for x in model_group_arns]
+ [
+ self.group_arn_to_name[x]
+ if x in self.group_arn_to_name
+ else self.get_group_name_from_arn(x)
+ for x in model_group_arns
+ ]
)

model_group_urns = [
builder.make_ml_model_group_urn("sagemaker", x, self.env)
for x in model_group_names
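The comprehension above now falls back to parsing the group name out of the ARN whenever a model references a package group that was not seen earlier in the run, instead of raising a KeyError. A simplified, self-contained sketch of that fallback with made-up ARNs:

group_arn_to_name = {
    "arn:aws:sagemaker:eu-west-1:123456789:model-package-group/seen-group": "seen-group",
}  # groups discovered earlier in the run (illustrative)
model_group_arns = {
    "arn:aws:sagemaker:eu-west-1:123456789:model-package-group/seen-group",
    "arn:aws:sagemaker:eu-west-1:123456789:model-package-group/unseen-group",
}

def get_group_name_from_arn(arn: str) -> str:
    # Same extraction as the new static method above: keep the last path segment.
    return arn.split("/")[-1]

model_group_names = sorted(
    group_arn_to_name[x] if x in group_arn_to_name else get_group_name_from_arn(x)
    for x in model_group_arns
)
print(model_group_names)  # ['seen-group', 'unseen-group']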
15 changes: 15 additions & 0 deletions metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py
@@ -241,3 +241,18 @@ def test_sagemaker_ingest(tmp_path, pytestconfig):
output_path=tmp_path / "sagemaker_mces.json",
golden_path=test_resources_dir / "sagemaker_mces_golden.json",
)


def test_doc_test_run():
import doctest

import datahub.ingestion.source.aws.sagemaker_processors.models

assert (
doctest.testmod(
datahub.ingestion.source.aws.sagemaker_processors.models,
raise_on_error=True,
verbose=True,
).attempted
== 1
)
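doctest.testmod returns a TestResults(failed, attempted) named tuple, so the assertion above checks that exactly one docstring example (the ARN example added to the model processor) was collected and executed; raise_on_error=True makes any failing example raise instead of just being reported. A small self-contained sketch of the same pattern with a toy function:

import doctest

def add_one(x: int) -> int:
    """
    >>> add_one(1)
    2
    """
    return x + 1

if __name__ == "__main__":
    results = doctest.testmod(verbose=True)
    print(results.attempted, results.failed)  # 1 0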
27 changes: 26 additions & 1 deletion metadata-integration/java/datahub-client/build.gradle
@@ -20,6 +20,7 @@ dependencies {
api project(':entity-registry')
api project(':metadata-integration:java:datahub-event')
implementation project(':metadata-integration:java:datahub-schematron:lib')

implementation(externalDependency.kafkaAvroSerializer) {
exclude group: "org.apache.avro"
}
@@ -60,10 +61,35 @@ task copyAvroSchemas {
compileJava.dependsOn copyAvroSchemas


// Add Python environment validation task
task validatePythonEnv {
doFirst {
def venvPath = System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
def isWindows = System.getProperty('os.name').toLowerCase().contains('windows')
def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python"

def result = exec {
commandLine pythonExe, "-c", "import sys; print(sys.executable)"
ignoreExitValue = true
standardOutput = new ByteArrayOutputStream()
errorOutput = new ByteArrayOutputStream()
}

if (result.exitValue != 0) {
throw new GradleException("Python virtual environment not properly set up at ${venvPath}")
}
}
}

test {
// to avoid simultaneous executions of tests when complete build is run
mustRunAfter(":metadata-io:test")
useJUnit()
// Add Python environment configuration
dependsOn validatePythonEnv
dependsOn tasks.getByPath(":metadata-ingestion:installDev")
systemProperty 'python.venv.path', System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
finalizedBy jacocoTestReport
}
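Because the build script reads python.venv.path through System.getProperty with a default of ../../../metadata-ingestion/venv, the virtual-environment location can presumably be overridden by passing -Dpython.venv.path=<path-to-venv> on the Gradle command line; the placeholder path here is only illustrative.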

task checkShadowJar(type: Exec) {
@@ -111,7 +137,6 @@ shadowJar {
relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone'
// Below jars added for kafka emitter only
- // relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro'
relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer'
relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy'
relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka'
@@ -0,0 +1,122 @@
import click
from typing import Dict, Any
import json
from dataclasses import dataclass
from abc import ABC, abstractmethod
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey


class URNGenerator(ABC):
@abstractmethod
def generate(self, args: Dict[str, Any]) -> str:
pass


class DatabaseURNGenerator(URNGenerator):
def generate(self, args: Dict[str, Any]) -> str:
required_fields = ["platform", "database"]
for field in required_fields:
if field not in args:
raise ValueError(f"Missing required field: {field}")

all_fields = required_fields + ["instance"]
for arg in args:
if arg not in all_fields:
raise ValueError(f"Invalid field: {arg}")

database_key = DatabaseKey(
platform=args["platform"],
instance=args.get("instance"),
database=args["database"],
)
return database_key.as_urn()


class SchemaURNGenerator(URNGenerator):
def generate(self, args: Dict[str, Any]) -> str:
required_fields = ["platform", "database", "schema"]
all_fields = required_fields + ["instance", "env"]
for field in required_fields:
if field not in args:
raise ValueError(f"Missing required field: {field}")

for arg in args:
if arg not in all_fields:
raise ValueError(f"Invalid field: {arg}")

schema_key = SchemaKey(
platform=args["platform"],
instance=args.get("instance"),
env=args.get("env"),
database=args["database"],
schema=args["schema"],
)
return schema_key.as_urn()


URN_GENERATORS = {
"database": DatabaseURNGenerator(),
"schema": SchemaURNGenerator(),
}


def validate_key_value(ctx, param, value):
if not value:
return {}

result = {}
for item in value:
try:
key, val = item.split("=", 1)
result[key.strip()] = val.strip()
except ValueError:
raise click.BadParameter(
f"Invalid key-value pair: {item}. Format should be key=value"
)
return result


@click.command()
@click.option(
"--container-type",
type=click.Choice(["database", "schema"]),
required=True,
help="The type of container to generate a URN for",
)
@click.option(
"--param",
"-p",
multiple=True,
callback=validate_key_value,
help="Parameters in key=value format. Can be used multiple times.",
)
@click.option(
"--output-format",
type=click.Choice(["text", "json"]),
default="text",
help="Output format for the URN",
)
def generate_urn(container_type: str, param: Dict[str, str], output_format: str):
"""Generate URNs for different types of containers.
Example usage:
./container_urn_generator.py --container-type database -p platform=test-platform -p instance=DEV -p database=test-database
"""
try:
generator = URN_GENERATORS[container_type]
urn = generator.generate(param)

if output_format == "json":
result = {"urn": urn, "container_type": container_type, "parameters": param}
click.echo(json.dumps(result, indent=2))
else:
click.echo(urn)

except KeyError as e:
raise click.UsageError(f"Unknown container type: {container_type}")
except ValueError as e:
raise click.UsageError(str(e))


if __name__ == "__main__":
generate_urn()
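The CLI above delegates URN construction to the container key helpers imported from datahub.emitter.mcp_builder. A hedged sketch of what the database branch boils down to when called programmatically, assuming the datahub package is installed; the guid in the output is a hash of the key fields and is shown only as a placeholder:

from datahub.emitter.mcp_builder import DatabaseKey

key = DatabaseKey(platform="test-platform", instance="DEV", database="test-database")
print(key.as_urn())  # urn:li:container:<32-character hex guid>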
@@ -0,0 +1,42 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.linkedin.common.urn.Urn;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class ContainerKey extends DataHubKey {
private String platform;
private String instance;

private static final String URN_PREFIX = "urn:li:container:";
private static final String URN_ENTITY = "container";
private static final String PLATFORM_MAP_FIELD = "platform";
private static final String INSTANCE_MAP_FIELD = "instance";

@Override
public Map<String, String> guidDict() {

Map<String, String> bag = new HashMap<>();
if (platform != null) bag.put(PLATFORM_MAP_FIELD, platform);
if (instance != null) bag.put(INSTANCE_MAP_FIELD, instance);

return bag;
}

public String asUrnString() {
String guid = guid();
return URN_PREFIX + guid;
}

public Urn asUrn() {
return Urn.createFromTuple(URN_ENTITY, guid());
}
}
@@ -0,0 +1,41 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.security.MessageDigest;
import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataHubGuidGenerator {
private static final ObjectMapper objectMapper = new ObjectMapper();

@SneakyThrows
public static String dataHubGuid(Map<String, String> obj) {
// Configure ObjectMapper for consistent serialization
objectMapper.configure(
com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

// Convert map to JSON string with sorted keys
String jsonKey = objectMapper.writeValueAsString(obj);

// Generate MD5 hash
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] hashBytes = md.digest(jsonKey.getBytes());

// Convert byte array to hexadecimal string
StringBuilder hexString = new StringBuilder();
for (byte hashByte : hashBytes) {
String hex = Integer.toHexString(0xff & hashByte);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}

if (log.isDebugEnabled()) {
log.debug("DataHub Guid for {} is : {}", jsonKey, hexString);
}
return hexString.toString();
}
}
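For comparison, a hedged Python mirror of the guid computation above (keys sorted, compact JSON, then an MD5 hex digest); the field values are illustrative and this sketch follows the algorithm shown here rather than claiming byte-for-byte parity with the canonical datahub implementation:

import hashlib
import json

def datahub_guid(obj: dict) -> str:
    # Sort keys and use compact separators, roughly matching Jackson's default map output.
    json_key = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(json_key.encode("utf-8")).hexdigest()

print(datahub_guid({"platform": "test-platform", "database": "test-database"}))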
