From 7429075ebd93c7138ca341f4c217343f90c03d19 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 2 Dec 2024 23:23:22 -0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(java-sdk):=20add=20utils=20classes=20t?= =?UTF-8?q?o=20give=20equivalence=20with=20python=20uti=E2=80=A6=20(#12002?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/datahub-client/build.gradle | 27 +++- .../scripts/container_key_guid_generator.py | 122 ++++++++++++++++++ .../models/util/ContainerKey.java | 42 ++++++ .../models/util/DataHubGuidGenerator.java | 41 ++++++ .../models/util/DataHubKey.java | 32 +++++ .../models/util/DatabaseKey.java | 30 +++++ .../datahubproject/models/util/FieldPath.java | 88 +++++++++++++ .../datahubproject/models/util/SchemaKey.java | 30 +++++ .../models/util/DataHubGuidGeneratorTest.java | 61 +++++++++ .../models/util/DatabaseKeyTest.java | 51 ++++++++ .../models/util/FieldPathTest.java | 83 ++++++++++++ .../models/util/SchemaKeyTest.java | 62 +++++++++ .../models/util/TestHelper.java | 114 ++++++++++++++++ 13 files changed, 782 insertions(+), 1 deletion(-) create mode 100644 metadata-integration/java/datahub-client/scripts/container_key_guid_generator.py create mode 100644 metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/ContainerKey.java create mode 100644 metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubGuidGenerator.java create mode 100644 metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubKey.java create mode 100644 metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DatabaseKey.java create mode 100644 metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/FieldPath.java create mode 100644 metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/SchemaKey.java create mode 100644 metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DataHubGuidGeneratorTest.java create mode 100644 metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DatabaseKeyTest.java create mode 100644 metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/FieldPathTest.java create mode 100644 metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/SchemaKeyTest.java create mode 100644 metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/TestHelper.java diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index 87a9623bbf062..af71227809d2a 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -20,6 +20,7 @@ dependencies { api project(':entity-registry') api project(':metadata-integration:java:datahub-event') implementation project(':metadata-integration:java:datahub-schematron:lib') + implementation(externalDependency.kafkaAvroSerializer) { exclude group: "org.apache.avro" } @@ -60,10 +61,35 @@ task copyAvroSchemas { compileJava.dependsOn copyAvroSchemas +// Add Python environment validation task +task validatePythonEnv { + doFirst { + def venvPath = System.getProperty('python.venv.path', '../../../metadata-ingestion/venv') + def isWindows = System.getProperty('os.name').toLowerCase().contains('windows') + def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python" + + def result = exec { + commandLine pythonExe, "-c", "import sys; print(sys.executable)" + ignoreExitValue = true + standardOutput = new ByteArrayOutputStream() + errorOutput = new ByteArrayOutputStream() + } + + if (result.exitValue != 0) { + throw new GradleException("Python virtual environment not properly set up at ${venvPath}") + } + } +} + test { // to avoid simultaneous executions of tests when complete build is run mustRunAfter(":metadata-io:test") useJUnit() + // Add Python environment configuration + dependsOn validatePythonEnv + dependsOn tasks.getByPath(":metadata-ingestion:installDev") + systemProperty 'python.venv.path', System.getProperty('python.venv.path', '../../../metadata-ingestion/venv') + finalizedBy jacocoTestReport } task checkShadowJar(type: Exec) { @@ -111,7 +137,6 @@ shadowJar { relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework' relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone' // Below jars added for kafka emitter only -// relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro' relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer' relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy' relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka' diff --git a/metadata-integration/java/datahub-client/scripts/container_key_guid_generator.py b/metadata-integration/java/datahub-client/scripts/container_key_guid_generator.py new file mode 100644 index 0000000000000..9fc18a85426bd --- /dev/null +++ b/metadata-integration/java/datahub-client/scripts/container_key_guid_generator.py @@ -0,0 +1,122 @@ +import click +from typing import Dict, Any +import json +from dataclasses import dataclass +from abc import ABC, abstractmethod +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey + + +class URNGenerator(ABC): + @abstractmethod + def generate(self, args: Dict[str, Any]) -> str: + pass + + +class DatabaseURNGenerator(URNGenerator): + def generate(self, args: Dict[str, Any]) -> str: + required_fields = ["platform", "database"] + for field in required_fields: + if field not in args: + raise ValueError(f"Missing required field: {field}") + + all_fields = required_fields + ["instance"] + for arg in args: + if arg not in all_fields: + raise ValueError(f"Invalid field: {arg}") + + database_key = DatabaseKey( + platform=args["platform"], + instance=args.get("instance"), + database=args["database"], + ) + return database_key.as_urn() + + +class SchemaURNGenerator(URNGenerator): + def generate(self, args: Dict[str, Any]) -> str: + required_fields = ["platform", "database", "schema"] + all_fields = required_fields + ["instance", "env"] + for field in required_fields: + if field not in args: + raise ValueError(f"Missing required field: {field}") + + for arg in args: + if arg not in all_fields: + raise ValueError(f"Invalid field: {arg}") + + schema_key = SchemaKey( + platform=args["platform"], + instance=args.get("instance"), + env=args.get("env"), + database=args["database"], + schema=args["schema"], + ) + return schema_key.as_urn() + + +URN_GENERATORS = { + "database": DatabaseURNGenerator(), + "schema": SchemaURNGenerator(), +} + + +def validate_key_value(ctx, param, value): + if not value: + return {} + + result = {} + for item in value: + try: + key, val = item.split("=", 1) + result[key.strip()] = val.strip() + except ValueError: + raise click.BadParameter( + f"Invalid key-value pair: {item}. Format should be key=value" + ) + return result + + +@click.command() +@click.option( + "--container-type", + type=click.Choice(["database", "schema"]), + required=True, + help="The type of container to generate a URN for", +) +@click.option( + "--param", + "-p", + multiple=True, + callback=validate_key_value, + help="Parameters in key=value format. Can be used multiple times.", +) +@click.option( + "--output-format", + type=click.Choice(["text", "json"]), + default="text", + help="Output format for the URN", +) +def generate_urn(container_type: str, param: Dict[str, str], output_format: str): + """Generate URNs for different types of containers. + + Example usage: + ./container_urn_generator.py --container-type database -p platform=test-platform -p instance=DEV -p database=test-database + """ + try: + generator = URN_GENERATORS[container_type] + urn = generator.generate(param) + + if output_format == "json": + result = {"urn": urn, "container_type": container_type, "parameters": param} + click.echo(json.dumps(result, indent=2)) + else: + click.echo(urn) + + except KeyError as e: + raise click.UsageError(f"Unknown container type: {container_type}") + except ValueError as e: + raise click.UsageError(str(e)) + + +if __name__ == "__main__": + generate_urn() diff --git a/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/ContainerKey.java b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/ContainerKey.java new file mode 100644 index 0000000000000..5bc6c829dcaa1 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/ContainerKey.java @@ -0,0 +1,42 @@ +package io.datahubproject.models.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.linkedin.common.urn.Urn; +import java.util.HashMap; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class ContainerKey extends DataHubKey { + private String platform; + private String instance; + + private static final String URN_PREFIX = "urn:li:container:"; + private static final String URN_ENTITY = "container"; + private static final String PLATFORM_MAP_FIELD = "platform"; + private static final String INSTANCE_MAP_FIELD = "instance"; + + @Override + public Map guidDict() { + + Map bag = new HashMap<>(); + if (platform != null) bag.put(PLATFORM_MAP_FIELD, platform); + if (instance != null) bag.put(INSTANCE_MAP_FIELD, instance); + + return bag; + } + + public String asUrnString() { + String guid = guid(); + return URN_PREFIX + guid; + } + + public Urn asUrn() { + return Urn.createFromTuple(URN_ENTITY, guid()); + } +} diff --git a/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubGuidGenerator.java b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubGuidGenerator.java new file mode 100644 index 0000000000000..ea67b7fab1781 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubGuidGenerator.java @@ -0,0 +1,41 @@ +package io.datahubproject.models.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.security.MessageDigest; +import java.util.Map; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DataHubGuidGenerator { + private static final ObjectMapper objectMapper = new ObjectMapper(); + + @SneakyThrows + public static String dataHubGuid(Map obj) { + // Configure ObjectMapper for consistent serialization + objectMapper.configure( + com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + + // Convert map to JSON string with sorted keys + String jsonKey = objectMapper.writeValueAsString(obj); + + // Generate MD5 hash + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] hashBytes = md.digest(jsonKey.getBytes()); + + // Convert byte array to hexadecimal string + StringBuilder hexString = new StringBuilder(); + for (byte hashByte : hashBytes) { + String hex = Integer.toHexString(0xff & hashByte); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + + if (log.isDebugEnabled()) { + log.debug("DataHub Guid for {} is : {}", jsonKey, hexString); + } + return hexString.toString(); + } +} diff --git a/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubKey.java b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubKey.java new file mode 100644 index 0000000000000..a047dd8840935 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DataHubKey.java @@ -0,0 +1,32 @@ +package io.datahubproject.models.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import lombok.Data; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class DataHubKey { + // Static ObjectMapper instance since it's thread-safe and expensive to create + protected static final ObjectMapper MAPPER = new ObjectMapper(); + // Static TypeReference instance since it doesn't change + private static final TypeReference> MAP_TYPE_REFERENCE = + new TypeReference>() {}; + + static { + MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); + } + + public Map guidDict() { + return MAPPER.convertValue(this, MAP_TYPE_REFERENCE); + } + + public String guid() { + Map bag = guidDict(); + return DataHubGuidGenerator.dataHubGuid(bag); + } +} diff --git a/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DatabaseKey.java b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DatabaseKey.java new file mode 100644 index 0000000000000..87a79ea2e7440 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/DatabaseKey.java @@ -0,0 +1,30 @@ +package io.datahubproject.models.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class DatabaseKey extends ContainerKey { + private String database; + + private static final String DATABASE_MAP_FIELD = "database"; + + @Override + public Map guidDict() { + // Get the parent's GUID dictionary first + Map bag = super.guidDict(); + + // Add the database field if it's not null + if (database != null) { + bag.put(DATABASE_MAP_FIELD, database); + } + + return bag; + } +} diff --git a/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/FieldPath.java b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/FieldPath.java new file mode 100644 index 0000000000000..ab8c11ae9b67d --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/FieldPath.java @@ -0,0 +1,88 @@ +package io.datahubproject.models.util; + +import lombok.NonNull; + +public class FieldPath { + private final String rawFieldPath; + private final String[] segments; + private final String version; + private final String simplePath; + + public FieldPath(@NonNull String rawFieldPath) { + this.rawFieldPath = rawFieldPath; + this.segments = rawFieldPath.split("\\."); + this.version = computeVersion(); + this.simplePath = computeSimplePath(); + } + + public boolean isTopLevel() { + return depth() == 1; + } + + /** + * Returns the logical depth of the field path, ignoring type and version metadata Example: + * "[version=2.0][type=Foo].address.[type=union].[type=string].street" -> depth = 2 + * + * @return The logical depth of the field path + */ + public int depth() { + String[] segments = simplePath().split("\\."); + return segments.length; + } + + public String leafFieldName() { + return segments[segments.length - 1]; + } + + /** + * Extracts the version number from the field path. Example: rawFieldPath = "a.b.c" -> version() = + * "1" rawFieldPath = "[version=2].a.b.c" -> version() = "2" rawFieldPath = "[version=20].a.b.c" + * -> version() = "20" + * + * @return String representing the version number + */ + private String computeVersion() { + if (rawFieldPath == null) { + return "1"; + } + + // Check for version pattern [version=X] where X can be any non-bracket characters + java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("\\[version=([^\\]]+)\\]"); + java.util.regex.Matcher matcher = pattern.matcher(rawFieldPath); + + if (matcher.find()) { + return matcher.group(1); + } + + return "1"; + } + + public String version() { + return version; + } + + /** + * Returns the simplified path without version, type, or other metadata Example: + * "[version=2.0][type=Foo].address.[type=union].[type=string].street" -> "address.street" + * + * @return The simplified field path + */ + private String computeSimplePath() { + if (rawFieldPath == null) { + return ""; + } + // Remove all metadata blocks [xxx=yyy] + String simplified = rawFieldPath.replaceAll("\\[.*?\\]", ""); + // Replace all "double dots" with a single dot + simplified = simplified.replaceAll("\\.+", "."); + // Replace all leading and trailing dots + simplified = simplified.replaceAll("^\\.|\\.$", ""); + // Remove any trailing metadata blocks without dots + simplified = simplified.replaceAll("\\[.*?\\]", ""); + return simplified; + } + + public String simplePath() { + return simplePath; + } +} diff --git a/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/SchemaKey.java b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/SchemaKey.java new file mode 100644 index 0000000000000..800ee43614749 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/java/io/datahubproject/models/util/SchemaKey.java @@ -0,0 +1,30 @@ +package io.datahubproject.models.util; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SchemaKey extends DatabaseKey { + private String schema; + + private static final String SCHEMA_MAP_FIELD = "schema"; + + @Override + public Map guidDict() { + // Get the parent's GUID dictionary first + Map bag = super.guidDict(); + + // Add the database field if it's not null + if (schema != null) { + bag.put(SCHEMA_MAP_FIELD, schema); + } + + return bag; + } +} diff --git a/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DataHubGuidGeneratorTest.java b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DataHubGuidGeneratorTest.java new file mode 100644 index 0000000000000..8ee65d51de92d --- /dev/null +++ b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DataHubGuidGeneratorTest.java @@ -0,0 +1,61 @@ +package io.datahubproject.models.util; + +import static org.junit.Assert.*; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +public class DataHubGuidGeneratorTest { + + @Test + public void testGuidGeneration() throws NoSuchAlgorithmException, JsonProcessingException { + // Test data + Map obj = new HashMap<>(); + obj.put("container", "test-container"); + + // Generate GUID + String guid = DataHubGuidGenerator.dataHubGuid(obj); + + // Assert + assertEquals("4d90f727b9d10ba7cea297dc8b427985", guid); + } + + @Test + public void testContainerUrnGeneration() { + // Test data + DatabaseKey databaseKey = + DatabaseKey.builder() + .platform("test-platform") + .instance("DEV") + .database("test-database") + .build(); + + System.out.println(databaseKey.guidDict()); + + // Generate URN + String urn = databaseKey.asUrnString(); + // With instance + String expectedUrn = "urn:li:container:e40f103ea7c6def4f4b24cd858d5e412"; + + // Assert + assertEquals(expectedUrn, urn); + } + + @Test + public void testContainerUrnGenerationNoInstance() { + // Test data + ContainerKey containerKey = + DatabaseKey.builder().platform("test-platform").database("test-database").build(); + + // Generate URN + String urn = containerKey.asUrnString(); + // Without instance + String expectedUrn = "urn:li:container:1929d86c0a92e2d3bb9ba193c8c2b66f"; + + // Assert + assertEquals(expectedUrn, urn); + } +} diff --git a/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DatabaseKeyTest.java b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DatabaseKeyTest.java new file mode 100644 index 0000000000000..08c6e9ce33a68 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/DatabaseKeyTest.java @@ -0,0 +1,51 @@ +package io.datahubproject.models.util; + +import static org.junit.Assert.*; + +import java.util.Map; +import org.junit.Test; + +public class DatabaseKeyTest { + @Test + public void testDatabaseUrnGeneration() { + // Test data + DatabaseKey databaseKey = + DatabaseKey.builder() + .platform("test-platform") + .instance("DEV") + .database("test-database") + .build(); + + System.out.println(databaseKey.guidDict()); + + // Generate URN + String urn = databaseKey.asUrnString(); + // With instance + // "urn:li:container:e40f103ea7c6def4f4b24cd858d5e412"; + String expectedUrn = + TestHelper.generateContainerKeyGuid( + "database", + Map.of("platform", "test-platform", "instance", "DEV", "database", "test-database")); + + // Assert + assertEquals(expectedUrn, urn); + } + + @Test + public void testDatabaseUrnGenerationNoInstance() { + // Test data + ContainerKey containerKey = + DatabaseKey.builder().platform("test-platform").database("test-database").build(); + + // Generate URN + String urn = containerKey.asUrnString(); + // Without instance + // "urn:li:container:1929d86c0a92e2d3bb9ba193c8c2b66f"; + String expectedUrn = + TestHelper.generateContainerKeyGuid( + "database", Map.of("platform", "test-platform", "database", "test-database")); + + // Assert + assertEquals(expectedUrn, urn); + } +} diff --git a/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/FieldPathTest.java b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/FieldPathTest.java new file mode 100644 index 0000000000000..55b97f735393c --- /dev/null +++ b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/FieldPathTest.java @@ -0,0 +1,83 @@ +package io.datahubproject.models.util; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class FieldPathTest { + + @Test + public void testSimplePathConstruction() { + FieldPath path = new FieldPath("field1.field2.field3"); + assertEquals("field1.field2.field3", path.simplePath()); + assertEquals("1", path.version()); + assertEquals("field3", path.leafFieldName()); + assertEquals(3, path.depth()); + assertFalse(path.isTopLevel()); + } + + @Test + public void testTopLevelPath() { + FieldPath path = new FieldPath("singleField"); + assertTrue(path.isTopLevel()); + assertEquals(1, path.depth()); + assertEquals("singleField", path.leafFieldName()); + assertEquals("singleField", path.simplePath()); + } + + @Test + public void testVersionExtraction() { + FieldPath path = new FieldPath("[version=2.0].field1.field2"); + assertEquals("2.0", path.version()); + assertEquals("field1.field2", path.simplePath()); + } + + @Test + public void testComplexPathWithMetadata() { + FieldPath path = + new FieldPath("[version=2.0][type=Foo].address.[type=union].[type=string].street"); + assertEquals("2.0", path.version()); + assertEquals("address.street", path.simplePath()); + assertEquals("street", path.leafFieldName()); + assertEquals(2, path.depth()); + } + + @Test + public void testPathWithMultipleMetadataBlocks() { + FieldPath path = + new FieldPath("[version=3][type=Record][nullable=true].user.details[type=union].name"); + System.out.println(path.simplePath()); + assertEquals("3", path.version()); + assertEquals("user.details.name", path.simplePath()); + assertEquals("name", path.leafFieldName()); + assertEquals(3, path.depth()); + } + + @Test + public void testPathWithTrailingMetadata() { + FieldPath path = new FieldPath("field1.field2[type=string]"); + assertEquals("field1.field2", path.simplePath()); + assertEquals("1", path.version()); + } + + @Test + public void testPathWithOnlyMetadata() { + FieldPath path = new FieldPath("[type=string][version=4]"); + assertEquals("", path.simplePath()); + assertEquals("4", path.version()); + } + + @Test + public void testDepthCalculationWithComplexPath() { + FieldPath path = + new FieldPath("[version=2].user.[type=record].address.[type=union].[type=string].street"); + assertEquals(3, path.depth()); + assertEquals("user.address.street", path.simplePath()); + } + + @Test + public void testLeafFieldNameWithMetadata() { + FieldPath path = new FieldPath("field1.field2[type=string]"); + assertEquals("field2[type=string]", path.leafFieldName()); + } +} diff --git a/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/SchemaKeyTest.java b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/SchemaKeyTest.java new file mode 100644 index 0000000000000..7d40491a9b1d8 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/SchemaKeyTest.java @@ -0,0 +1,62 @@ +package io.datahubproject.models.util; + +import static org.junit.Assert.*; + +import java.util.Map; +import org.junit.Test; + +public class SchemaKeyTest { + @Test + public void testSchemaUrnGeneration() { + // Test data + SchemaKey schemaKey = + SchemaKey.builder() + .platform("test-platform") + .instance("DEV") + .database("test-database") + .schema("test-schema") + .build(); + + System.out.println(schemaKey.guidDict()); + + // Generate URN + String urn = schemaKey.asUrnString(); + // With instance + String expectedUrn = + TestHelper.generateContainerKeyGuid( + "schema", + Map.of( + "platform", "test-platform", + "instance", "DEV", + "database", "test-database", + "schema", "test-schema")); + + // Assert + assertEquals(expectedUrn, urn); + } + + @Test + public void testSchemaUrnGenerationNoInstance() { + // Test data + ContainerKey containerKey = + SchemaKey.builder() + .platform("test-platform") + .database("test-database") + .schema("test-schema") + .build(); + + // Generate URN + String urn = containerKey.asUrnString(); + // Without instance + String expectedUrn = + TestHelper.generateContainerKeyGuid( + "schema", + Map.of( + "platform", "test-platform", + "database", "test-database", + "schema", "test-schema")); + + // Assert + assertEquals(expectedUrn, urn); + } +} diff --git a/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/TestHelper.java b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/TestHelper.java new file mode 100644 index 0000000000000..e150ceda16262 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/test/java/io/datahubproject/models/util/TestHelper.java @@ -0,0 +1,114 @@ +package io.datahubproject.models.util; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +public class TestHelper { + + private static final String TEST_RESOURCES_DIR = "src/test/resources"; + // private static final String TEMP_OUTPUT_DIR = "build/test-outputs"; + private static final String PYTHON_SCRIPT = "scripts/container_key_guid_generator.py"; + private static final String VENV_PATH = + "../../../metadata-ingestion/venv"; // Adjust this path to your venv location + + public static void setup() { + // Create output directory if it doesn't exist + // new File(TEMP_OUTPUT_DIR).mkdirs(); + + // Verify venv exists + if (!new File(VENV_PATH).exists()) { + throw new RuntimeException("Virtual environment not found at " + VENV_PATH); + } + } + + private static ProcessBuilder createPythonProcessBuilder(String... args) { + ProcessBuilder pb; + String os = System.getProperty("os.name").toLowerCase(); + + if (os.contains("windows")) { + // Windows paths + String pythonPath = Paths.get(VENV_PATH, "Scripts", "python").toString(); + pb = + new ProcessBuilder( + Stream.concat(Stream.of(pythonPath), Stream.of(args)).toArray(String[]::new)); + } else { + // Unix-like paths + String pythonPath = Paths.get(VENV_PATH, "bin", "python").toString(); + pb = + new ProcessBuilder( + Stream.concat(Stream.of(pythonPath), Stream.of(args)).toArray(String[]::new)); + } + + // Add virtual environment to PYTHONPATH + Map env = pb.environment(); + String sitePkgPath = + Paths.get( + VENV_PATH, + os.contains("windows") ? "Lib/site-packages" : "lib/python3.x/site-packages") + .toString(); + + String pythonPath = env.getOrDefault("PYTHONPATH", ""); + env.put("PYTHONPATH", pythonPath + File.pathSeparator + sitePkgPath); + + return pb; + } + + /** + * Executes the container key GUID generator Python script with the given parameters + * + * @param containerType The type of container (e.g., "database") + * @param parameters Map of parameter key-value pairs + * @return The generated URN as a string + * @throws RuntimeException if the Python script execution fails + */ + public static String generateContainerKeyGuid( + String containerType, Map parameters) { + List command = new ArrayList<>(); + command.add(PYTHON_SCRIPT); + command.add("--container-type"); + command.add(containerType); + + // Add each parameter as -p key=value + for (Map.Entry entry : parameters.entrySet()) { + command.add("-p"); + command.add(entry.getKey() + "=" + entry.getValue()); + } + + try { + System.out.println("Executing Python script: " + String.join(" ", command)); + String[] commandArray = command.toArray(new String[0]); + + Process process = createPythonProcessBuilder(commandArray).start(); + + // Read the output + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line = reader.readLine(); + + // Wait for the process to complete + int exitCode = process.waitFor(); + + if (exitCode != 0) { + // Read error stream if the script failed + BufferedReader errorReader = + new BufferedReader(new InputStreamReader(process.getErrorStream())); + StringBuilder errorMessage = new StringBuilder(); + String errorLine; + while ((errorLine = errorReader.readLine()) != null) { + errorMessage.append(errorLine).append("\n"); + } + throw new RuntimeException( + "Python script failed with exit code " + exitCode + ": " + errorMessage); + } + + return line != null ? line.trim() : ""; + } catch (Exception e) { + throw new RuntimeException("Failed to execute Python script", e); + } + } +} From 16a02411c343b2527fcac3044cf863a9ab9ae843 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 3 Dec 2024 10:48:04 +0100 Subject: [PATCH 2/2] fix(ingest/sagemaker): Gracefully handle missing model group (#12000) --- .../sagemaker_processors/feature_groups.py | 13 +++++--- .../source/aws/sagemaker_processors/models.py | 31 ++++++++++++++++++- .../unit/sagemaker/test_sagemaker_source.py | 15 +++++++++ 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py index b8b96c6306a3b..c4561b9d9e676 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/feature_groups.py @@ -1,3 +1,5 @@ +import logging +import textwrap from dataclasses import dataclass from typing import TYPE_CHECKING, Iterable, List @@ -28,6 +30,8 @@ FeatureGroupSummaryTypeDef, ) +logger = logging.getLogger(__name__) + @dataclass class FeatureGroupProcessor: @@ -197,11 +201,12 @@ def get_feature_wu( full_table_name = f"{glue_database}.{glue_table}" - self.report.report_warning( - full_table_name, - f"""Note: table {full_table_name} is an AWS Glue object. + logging.info( + textwrap.dedent( + f"""Note: table {full_table_name} is an AWS Glue object. This source does not ingest all metadata for Glue tables. To view full table metadata, run Glue ingestion - (see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""", + (see https://datahubproject.io/docs/generated/ingestion/sources/glue)""" + ) ) feature_sources.append( diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py index eef2b26ee08f2..0f433aaecf2d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py @@ -1,3 +1,4 @@ +import logging from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime @@ -65,6 +66,8 @@ "Unknown": DeploymentStatusClass.UNKNOWN, } +logger = logging.getLogger(__name__) + @dataclass class ModelProcessor: @@ -385,6 +388,26 @@ def strip_quotes(string: str) -> str: model_metrics, ) + @staticmethod + def get_group_name_from_arn(arn: str) -> str: + """ + Extract model package group name from a SageMaker ARN. + + Args: + arn (str): Full ARN of the model package group + + Returns: + str: Name of the model package group + + Example: + >>> ModelProcessor.get_group_name_from_arn("arn:aws:sagemaker:eu-west-1:123456789:model-package-group/my-model-group") + 'my-model-group' + """ + logger.debug( + f"Extracting group name from ARN: {arn} because group was not seen before" + ) + return arn.split("/")[-1] + def get_model_wu( self, model_details: "DescribeModelOutputTypeDef", @@ -425,8 +448,14 @@ def get_model_wu( model_group_arns = model_uri_groups | model_image_groups model_group_names = sorted( - [self.group_arn_to_name[x] for x in model_group_arns] + [ + self.group_arn_to_name[x] + if x in self.group_arn_to_name + else self.get_group_name_from_arn(x) + for x in model_group_arns + ] ) + model_group_urns = [ builder.make_ml_model_group_urn("sagemaker", x, self.env) for x in model_group_names diff --git a/metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py b/metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py index 2450e6fa8fe56..138319feb3db6 100644 --- a/metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py +++ b/metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py @@ -241,3 +241,18 @@ def test_sagemaker_ingest(tmp_path, pytestconfig): output_path=tmp_path / "sagemaker_mces.json", golden_path=test_resources_dir / "sagemaker_mces_golden.json", ) + + +def test_doc_test_run(): + import doctest + + import datahub.ingestion.source.aws.sagemaker_processors.models + + assert ( + doctest.testmod( + datahub.ingestion.source.aws.sagemaker_processors.models, + raise_on_error=True, + verbose=True, + ).attempted + == 1 + )