Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Nov 26, 2024
2 parents 1aa5ba4 + 3423e34 commit 1e0a512
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ def get_kafka_consumer(
if CallableConsumerConfig.is_callable_config(connection.consumer_config):
# As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration
logger.debug("Initiating polling for kafka consumer")
consumer.poll(timeout=30)
logger.debug("Initiated polling for kafka consumer")

return consumer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public static void validateUrn(

/** Recursively process URN parts with URL decoding */
private static Stream<String> processUrnPartRecursively(String urnPart) {
String decodedPart = URLDecoder.decode(urnPart, StandardCharsets.UTF_8);
String decodedPart =
URLDecoder.decode(URLEncodingFixer.fixURLEncoding(urnPart), StandardCharsets.UTF_8);
if (decodedPart.startsWith("urn:li:")) {
// Recursively process nested URN after decoding
return UrnUtils.getUrn(decodedPart).getEntityKey().getParts().stream()
Expand Down Expand Up @@ -177,4 +178,53 @@ public static void validateRecordTemplate(
RecordTemplateValidator.validate(aspect, resultFunction, validator);
}
}

/**
* Fixes malformed URL encoding by escaping unescaped % characters while preserving valid
* percent-encoded sequences.
*/
private static class URLEncodingFixer {
/**
* @param input The potentially malformed URL-encoded string
* @return A string with proper URL encoding that can be safely decoded
*/
public static String fixURLEncoding(String input) {
if (input == null) {
return null;
}

StringBuilder result = new StringBuilder(input.length() * 2);
int i = 0;

while (i < input.length()) {
char currentChar = input.charAt(i);

if (currentChar == '%') {
if (i + 2 < input.length()) {
// Check if the next two characters form a valid hex pair
String hexPair = input.substring(i + 1, i + 3);
if (isValidHexPair(hexPair)) {
// This is a valid percent-encoded sequence, keep it as is
result.append(currentChar);
} else {
// Invalid sequence, escape the % character
result.append("%25");
}
} else {
// % at the end of string, escape it
result.append("%25");
}
} else {
result.append(currentChar);
}
i++;
}

return result.toString();
}

private static boolean isValidHexPair(String pair) {
return pair.matches("[0-9A-Fa-f]{2}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,24 @@ public void testValidComplexUrn() {

@Test(expectedExceptions = NullPointerException.class)
public void testUrnNull() {
// Act
ValidationApiUtils.validateUrn(entityRegistry, null);
}

@Test
public void testValidPartialUrlEncode() {
Urn validUrn = UrnUtils.getUrn("urn:li:assertion:123=-%28__% weekly__%29");

ValidationApiUtils.validateUrn(entityRegistry, validUrn);
// If no exception is thrown, test passes
}

@Test
public void testValidPartialUrlEncode2() {
Urn validUrn =
UrnUtils.getUrn(
"urn:li:dataset:(urn:li:dataPlatform:s3,urn:li:dataset:%28urn:li:dataPlatform:s3%2Ctest-datalake-concepts%prog_maintenance%2CPROD%29,PROD)");

ValidationApiUtils.validateUrn(entityRegistry, validUrn);
// If no exception is thrown, test passes
}
}

0 comments on commit 1e0a512

Please sign in to comment.