Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type mappings transformation #1154

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
21fb34b
Initial checkin for type mapping removal transformer and experimentin…
gregschohn Nov 13, 2024
926e6e8
Further building out the jinjava and index type mappings transformers…
gregschohn Nov 18, 2024
9bd5c60
Checkpoint on jinjava and loading template transformations.
gregschohn Nov 19, 2024
f1e1d50
Checkpoint add support for macros and feature flags for different par…
gregschohn Nov 20, 2024
dd98377
Cleanup + added a "preserve" flag to let templates inline source node…
gregschohn Nov 21, 2024
357a6a6
Add jinja-style regex matching to jinjava transformations.
gregschohn Nov 22, 2024
12c0cfa
Checkpoint - I've added a new invoke_macro java function to lookup a …
gregschohn Nov 23, 2024
e8dfb3f
Get a test that changes some contents via the route macro to pass.
gregschohn Nov 24, 2024
93dc5c8
Assorted jinjava template changes for replayer...
gregschohn Nov 24, 2024
4b32d42
Setup regex type mappings where one can specify index and types as re…
gregschohn Nov 25, 2024
0712eae
Further work to support bulk delete, index create with type mapping m…
gregschohn Dec 2, 2024
e6c0229
Updated READMEs and finished implementing a basic version of bulk tra…
gregschohn Dec 3, 2024
071368a
Transformation cleanup in light of new type mappings transformation.
gregschohn Dec 3, 2024
5c885fe
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 3, 2024
59fd4d9
Simple bugfixes, especially around more carefully staying away from r…
gregschohn Dec 4, 2024
fb9c20c
Remove YAML support for feature flags - nobody is using it and no rea…
gregschohn Dec 4, 2024
f5c4fed
Change how the transformation netty pipeline discovers if the payload…
gregschohn Dec 4, 2024
5be3ddc
Minor cleanup + the addition of a resource cache for jinjava java res…
gregschohn Dec 4, 2024
73c8e13
Lots of improvements for jinjava and type mappings transformations an…
gregschohn Dec 7, 2024
739d5d7
Implement a simple take on translating RFS bulk requests to use index…
gregschohn Dec 8, 2024
95b71f4
Checkpoint with further refactoring improvements, mostly around tests.
gregschohn Dec 9, 2024
2b1f3e0
Add a new preserve tag (preserveWhenMissing) to copy only when the ke…
gregschohn Dec 9, 2024
4e59965
Enable type mappings transform for the replayer in the docker-compose…
gregschohn Dec 9, 2024
09377de
Flip the default should throw behavior for HttpJsonMessageWithFaultin…
gregschohn Dec 4, 2024
fcf6bbc
Update type mapping sanitization READMEs to use backslash backreferen…
gregschohn Dec 9, 2024
39ead16
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 9, 2024
1df1642
Merge branch 'SaferPayloadFaults' into TypeMappingsTransformation
gregschohn Dec 9, 2024
fbc0291
Update remaining preserve calls so that protocol will be carried forw…
gregschohn Dec 9, 2024
c13ef4a
Bugfixes for type mappings removal transformations, README + other si…
gregschohn Dec 10, 2024
00968cd
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies {
implementation project(":RFS")
implementation project(":transformation")
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
Expand Down
2 changes: 1 addition & 1 deletion RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'

testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" #--transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]'"
opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
environment:
Expand Down
4 changes: 2 additions & 2 deletions TrafficCapture/trafficReplayer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ transform to add GZIP encoding and another to apply a new header would be config
```

To run only one transformer without any configuration, the `--transformer-config` argument can simply
be set to the name of the transformer (e.g. 'JsonTransformerForOpenSearch23PlusTargetTransformerProvider',
be set to the name of the transformer (e.g. 'TypeMappingSanitizationTransformerProvider',
without quotes or any json surrounding it).

The user can also specify a file to read the transformations from using the `--transformer-config-file`. Users can
also pass the script as an argument via `--transformer-config-base64`. Each of the `transformer-config` options
is mutually exclusive.

Some simple transformations are included to change headers to add compression or to force an HTTP message payload to
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
be chunked. Another transformer, [TypeMappingSanitizationTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/jsonTypeMappingsSanitizationTransformer/src/main/java/org/opensearch/migrations/transform/TypeMappingsSanitizationTransformer.java),
is a work-in-progress to excise type mapping references from URIs and message payloads since versions of OpenSource
greater than 2.3 do not support them.

Expand Down
4 changes: 2 additions & 2 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -59,7 +59,7 @@ dependencies {
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
@Getter
@Setter
private boolean disableThrowingPayloadNotLoaded;
private boolean payloadWasAccessed;

public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
underlyingMap = new TreeMap<>();
Expand All @@ -51,19 +52,19 @@ public Iterator<Map.Entry<String, Object>> iterator() {
return new Iterator<>() {
@Override
public boolean hasNext() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}

@Override
public Map.Entry<String, Object> next() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
};
}

@Override
public int size() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
};
} else {
Expand All @@ -80,8 +81,21 @@ public Object put(String key, Object value) {
public Object get(Object key) {
var value = super.get(key);
if (value == null && !disableThrowingPayloadNotLoaded) {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
return value;
}

public boolean missingPaylaodWasAccessed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Paylaod

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return payloadWasAccessed;
}

public void resetMissingPaylaodWasAccessed() {
payloadWasAccessed = false;
}

private PayloadNotLoadedException makeFault() throws PayloadNotLoadedException {
payloadWasAccessed = true;
return PayloadNotLoadedException.getInstance();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Optional;

import org.opensearch.migrations.replay.datahandlers.PayloadAccessFaultingMap;
import org.opensearch.migrations.replay.datahandlers.PayloadNotLoadedException;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.transform.IAuthTransformer;
import org.opensearch.migrations.transform.IJsonTransformer;
Expand Down Expand Up @@ -67,25 +66,34 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
IAuthTransformer authTransformer = requestPipelineOrchestrator.authTransfomerFactory.getAuthTransformer(
httpJsonMessage
);
HttpJsonRequestWithFaultingPayload transformedMessage = null;
try {
transformedMessage = transform(transformer, httpJsonMessage);
} catch (Exception e) {
var payload = (PayloadAccessFaultingMap) httpJsonMessage.payload();
if (payload.missingPaylaodWasAccessed()) {
payload.resetMissingPaylaodWasAccessed();
log.atDebug().setMessage("The transforms for this message require payload manipulation, "
+ "all content handlers are being loaded.").log();
// make a fresh message and its headers
requestPipelineOrchestrator.addJsonParsingHandlers(
ctx,
transformer,
getAuthTransformerAsStreamingTransformer(authTransformer)
);
ctx.fireChannelRead(handleAuthHeaders(httpJsonMessage, authTransformer));
} else{
throw new TransformationException(e);
}
}

if (transformedMessage != null) {
handlePayloadNeutralTransformationOrThrow(
ctx,
originalHttpJsonMessage,
transform(transformer, httpJsonMessage),
transformedMessage,
authTransformer
);
} catch (PayloadNotLoadedException pnle) {
log.debug(
"The transforms for this message require payload manipulation, "
+ "all content handlers are being loaded."
);
// make a fresh message and its headers
requestPipelineOrchestrator.addJsonParsingHandlers(
ctx,
transformer,
getAuthTransformerAsStreamingTransformer(authTransformer)
);
ctx.fireChannelRead(handleAuthHeaders(httpJsonMessage, authTransformer));
}
} else if (msg instanceof HttpContent) {
ctx.fireChannelRead(msg);
Expand Down Expand Up @@ -131,43 +139,35 @@ private void handlePayloadNeutralTransformationOrThrow(

var pipeline = ctx.pipeline();
if (streamingAuthTransformer != null) {
log.info(
diagnosticLabel
+ "An Authorization Transformation is required for this message. "
+ "The headers and payload will be parsed and reformatted."
);
log.atInfo().setMessage("{} An Authorization Transformation is required for this message. "
+ "The headers and payload will be parsed and reformatted.")
.addArgument(diagnosticLabel).log();
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
} else if (headerFieldsAreIdentical(originalRequest, httpJsonMessage)) {
log.info(
diagnosticLabel
+ "Transformation isn't necessary. "
+ "Resetting the processing pipeline to let the caller send the original network bytes as-is."
);
log.atInfo().setMessage("{} Transformation isn't necessary. "
+ "Resetting the processing pipeline to let the caller send the original network bytes as-is.")
.addArgument(diagnosticLabel)
.log();
RequestPipelineOrchestrator.removeAllHandlers(pipeline);

} else if (headerFieldIsIdentical("content-encoding", originalRequest, httpJsonMessage)
&& headerFieldIsIdentical("transfer-encoding", originalRequest, httpJsonMessage)) {
log.info(
diagnosticLabel
+ "There were changes to the headers that require the message to be reformatted "
+ "but the payload doesn't need to be transformed."
);
// By adding the baseline handlers and removing this and previous handlers in reverse order,
// we will cause the upstream handlers to flush their in-progress accumulated ByteBufs downstream
// to be processed accordingly
requestPipelineOrchestrator.addBaselineHandlers(pipeline);
ctx.fireChannelRead(httpJsonMessage);
RequestPipelineOrchestrator.removeThisAndPreviousHandlers(pipeline, this);
} else {
log.info(
diagnosticLabel
+ "New headers have been specified that require the payload stream to be "
+ "reformatted. Setting up the processing pipeline to parse and reformat the request payload."
);
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
}
log.atInfo().setMessage("{} There were changes to the headers that require the message to be reformatted "
+ "but the payload doesn't need to be transformed.")
.addArgument(diagnosticLabel).log();
// By adding the baseline handlers and removing this and previous handlers in reverse order,
// we will cause the upstream handlers to flush their in-progress accumulated ByteBufs downstream
// to be processed accordingly
requestPipelineOrchestrator.addBaselineHandlers(pipeline);
ctx.fireChannelRead(httpJsonMessage);
RequestPipelineOrchestrator.removeThisAndPreviousHandlers(pipeline, this);
} else {
log.atInfo().setMessage("{} New headers have been specified that require the payload stream to be "
+ "reformatted. Setting up the processing pipeline to parse and reformat the request payload.")
.addArgument(diagnosticLabel).log();
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
}
}

private static HttpJsonRequestWithFaultingPayload handleAuthHeaders(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.opensearch.migrations.replay;


import org.opensearch.migrations.replay.datahandlers.PayloadAccessFaultingMap;
import org.opensearch.migrations.replay.datahandlers.http.HttpJsonRequestWithFaultingPayload;
import org.opensearch.migrations.replay.datahandlers.http.ListKeyAdaptingCaseInsensitiveHeadersMap;
import org.opensearch.migrations.replay.datahandlers.http.StrictCaseInsensitiveHttpHeadersMap;
import org.opensearch.migrations.transform.TransformationLoader;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PayloadNotFoundTest {
@Test
public void testTransformsPropagateExceptionProperly() throws JsonProcessingException {
final HttpJsonRequestWithFaultingPayload FAULTING_MAP = new HttpJsonRequestWithFaultingPayload();
FAULTING_MAP.setMethod("PUT");
FAULTING_MAP.setPath("/_bulk");
FAULTING_MAP.setHeaders(new ListKeyAdaptingCaseInsensitiveHeadersMap(new StrictCaseInsensitiveHttpHeadersMap()));
FAULTING_MAP.headers().put("Content-Type", "application/json");
FAULTING_MAP.setPayloadFaultMap(new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap()));
final String EXPECTED = "{\n"
+ " \"method\": \"PUT\",\n"
+ " \"URI\": \"/_bulk\",\n"
+ " \"headers\": {\n"
+ " \"Content-Type\": \"application/json\"\n"
+ " }\n"
+ "}\n";

var transformer = new TransformationLoader().getTransformerFactoryLoader("newhost", null,
"[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]");
var e = Assertions.assertThrows(Exception.class,
() -> transformer.transformJson(FAULTING_MAP));
Assertions.assertTrue(((PayloadAccessFaultingMap)FAULTING_MAP.payload()).missingPaylaodWasAccessed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public void testMalformedPayload_andThrowingTransformation_IsPassedThrough() thr
new TransformationLoader().getTransformerFactoryLoader(
HOST_NAME,
null,
"[{\"JsonTransformerForOpenSearch23PlusTargetTransformerProvider\":\"\"}]"
"[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]"
),
null,
testPacketCapture,
Expand Down
2 changes: 2 additions & 0 deletions commonDependencyVersionConstraints/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ dependencies {

def jackson = '2.16.2'
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: jackson
api group: 'com.fasterxml.jackson.core', name: 'jackson-dataformat-yaml', version: jackson
api group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jackson
api group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-smile', version: jackson

def jupiter = '5.10.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.migrations.testutils;

import java.util.SortedMap;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.SneakyThrows;

public class JsonNormalizer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
.configure(SerializationFeature.INDENT_OUTPUT, true);

@SneakyThrows
public static String fromString(String input) {
return OBJECT_MAPPER.writeValueAsString(OBJECT_MAPPER.readValue(input, SortedMap.class));
}

@SneakyThrows
public static String fromObject(Object obj) {
return fromString(OBJECT_MAPPER.writeValueAsString(obj));
}
}
Loading
Loading