From 301e5cd18f50b7e4e91962e6be4957870809d3ac Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 26 Nov 2024 00:02:06 -0600 Subject: [PATCH 1/2] fix(urn-validator): update urn validation logic (#11952) --- .../entity/validation/ValidationApiUtils.java | 52 ++++++++++++++++++- .../validation/ValidationApiUtilsTest.java | 19 ++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java index adc539cd926d3..c2e1c47eca1fd 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java @@ -115,7 +115,8 @@ public static void validateUrn( /** Recursively process URN parts with URL decoding */ private static Stream processUrnPartRecursively(String urnPart) { - String decodedPart = URLDecoder.decode(urnPart, StandardCharsets.UTF_8); + String decodedPart = + URLDecoder.decode(URLEncodingFixer.fixURLEncoding(urnPart), StandardCharsets.UTF_8); if (decodedPart.startsWith("urn:li:")) { // Recursively process nested URN after decoding return UrnUtils.getUrn(decodedPart).getEntityKey().getParts().stream() @@ -177,4 +178,53 @@ public static void validateRecordTemplate( RecordTemplateValidator.validate(aspect, resultFunction, validator); } } + + /** + * Fixes malformed URL encoding by escaping unescaped % characters while preserving valid + * percent-encoded sequences. + */ + private static class URLEncodingFixer { + /** + * @param input The potentially malformed URL-encoded string + * @return A string with proper URL encoding that can be safely decoded + */ + public static String fixURLEncoding(String input) { + if (input == null) { + return null; + } + + StringBuilder result = new StringBuilder(input.length() * 2); + int i = 0; + + while (i < input.length()) { + char currentChar = input.charAt(i); + + if (currentChar == '%') { + if (i + 2 < input.length()) { + // Check if the next two characters form a valid hex pair + String hexPair = input.substring(i + 1, i + 3); + if (isValidHexPair(hexPair)) { + // This is a valid percent-encoded sequence, keep it as is + result.append(currentChar); + } else { + // Invalid sequence, escape the % character + result.append("%25"); + } + } else { + // % at the end of string, escape it + result.append("%25"); + } + } else { + result.append(currentChar); + } + i++; + } + + return result.toString(); + } + + private static boolean isValidHexPair(String pair) { + return pair.matches("[0-9A-Fa-f]{2}"); + } + } } diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java index 7a671a0a49630..e683e594d8766 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java @@ -89,7 +89,24 @@ public void testValidComplexUrn() { @Test(expectedExceptions = NullPointerException.class) public void testUrnNull() { - // Act ValidationApiUtils.validateUrn(entityRegistry, null); } + + @Test + public void testValidPartialUrlEncode() { + Urn validUrn = UrnUtils.getUrn("urn:li:assertion:123=-%28__% weekly__%29"); + + ValidationApiUtils.validateUrn(entityRegistry, validUrn); + // If no exception is thrown, test passes + } + + @Test + public void testValidPartialUrlEncode2() { + Urn validUrn = + UrnUtils.getUrn( + "urn:li:dataset:(urn:li:dataPlatform:s3,urn:li:dataset:%28urn:li:dataPlatform:s3%2Ctest-datalake-concepts%prog_maintenance%2CPROD%29,PROD)"); + + ValidationApiUtils.validateUrn(entityRegistry, validUrn); + // If no exception is thrown, test passes + } } From 3423e348992e614c48043f8656c8d066f0dae66e Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Tue, 26 Nov 2024 16:22:06 +0530 Subject: [PATCH 2/2] feat(ingest): add more logs for kafka polling (#11954) --- metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py index 06d929774240b..e57dc853a83c6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py @@ -157,7 +157,9 @@ def get_kafka_consumer( if CallableConsumerConfig.is_callable_config(connection.consumer_config): # As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration + logger.debug("Initiating polling for kafka consumer") consumer.poll(timeout=30) + logger.debug("Initiated polling for kafka consumer") return consumer