Skip to content

Commit

Permalink
Bugfixes for type mappings removal transformations, README + other si…
Browse files Browse the repository at this point in the history
…mple improvements

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Dec 10, 2024
1 parent fbc0291 commit c13ef4a
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else {
content.release();
}
} else if (msg instanceof HttpMessage) {
}
if (msg instanceof HttpMessage) { // this & HttpContent are interfaces & 'Full' messages implement both
message = (HttpMessage) msg;
}
if (msg instanceof LastHttpContent) {
Expand All @@ -206,16 +207,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
var finalMsg = (message instanceof HttpRequest)
? new DefaultFullHttpRequest(message.protocolVersion(),
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
: new DefaultFullHttpResponse(message.protocolVersion(),
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
super.channelRead(ctx, finalMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public static String getTransactionSummaryStringPreamble() {
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE...")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES...")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS...")
.add("METHOD...")
.add("URI...")
.toString();
}
Expand Down Expand Up @@ -219,6 +220,11 @@ public static String toTransactionSummaryString(
transformStreamToString(parsed.targetResponseList.stream(),
r -> "" + r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY))
)
// method
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.METHOD_KEY))
.orElse(MISSING_STR))
// uri
.add(
parsed.sourceRequestOp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,71 @@ GET activity/post/_search

## Routing data to new indices

The structure of the documents and indices need to change. Some options are to use separate indices, drop some of
The structure of the documents and indices need to change. Options are to use separate indices, drop some of
the types to make an index single-purpose, or to create an index that's the union of all the types' fields.

With a simple mapping directive, we can define each of these three behaviors. The following yaml shows how to map
documents into two different indices named users and posts:
Specific instances of those behaviors can be expressed via a map (or dictionary) or indices to types to target indices.
The following sample json shows how to map documents from the 'activity' index into two different indices
('users' and 'posts'):
```
activity:
user: new_users
post: new_posts
{
"activity": {
"user: "new_users",
"post": "new_posts"
}
```

To drop one type, just leave it out:
To drop the 'post' type, just leave it out:
```
activity:
user: only_users
{
"activity": {
"user": "only_users"
}
}
```

To merge types together, use the same value:
To merge types into a single index, use the same value:
```
activity:
user: any_activity
post: any_activity
{
"activity": {
"user": "any_activity",
"post": "any_activity",
}
}
```

Any _indices_ that are NOT specified won't be modified - all additions, changes, and queries on those other indices not
specified at the root level will remain untouched by the static mapping rewriter. However, missing types from a
specified index will be removed. To remove ALL the activity for a given index, specify an index with no
children types.
To remove ALL the activity for a given index, specify an index with no children types.
```
activity: {}
{
"activity": {}
}
```

In addition to static source/target mappings, users can specify source and type pairs as a regex and use any captured
groups in the target index name. Regex rules take precedent _after_ the static rules and are only applied when there
was no index match in the static mappings.
Those regex rules take precedence **after** the static mappings specified above.

In addition to static source/target mappings, users can specify source and type pairs as regex patterns and
use captured groups in the target index name.
Any source _indices_ that are NOT specified in the maps will be processed through the regex route rules.
The regex rules are only applied if the source index doesn't match a key in the static route map

Regex replacement is controlled via an ordered list of `[indexNamePattern, typeNamePattern, replacementString]`.
The transformer will use the replacement for the first matched item found.
If none are found, unlike missing indices for static mappings, the system presumes that the index and type are
**NOT** to be propagated to the target - any reference to those types and their corresponding data will be suppressed.
To preserve all items, a default rule will need to be included.
Regex replace rules are evaluated by concatenating the source index and source types into a single string.
The pattern components are also concatenated into a corresponding match string.
The replacement value will replace the _matched_ part of the source index + typename and replace it with the
specified value.
If that specified value contains (numerical) backreferences, those will pull from the captured groups of the
concatenated pattern.
The concatenated pattern is the index pattern followed by the type pattern, meaning that the groups in the index are
numbered from 1 and the type pattern group numbers start after all the groups from the index.

Missing types from a specified index will be removed.
When the regex pattern isn't defined `["(.*)", "(.*)", "\\1_\\2"]` is used to map each type into its own isolated
index, preserving all data and its separation.

For more details about regexes, see the [Python](https://docs.python.org/3/library/re.html#re.sub) or
[Java](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html) documentation.
This transform uses python-style backreferences (`'`\1`) for replacement patterns.
Notice that regexes can NOT be specified in the index-type map.
They can _only_ be used via the list, which will be evaluated in the order of the list until a match is found.

The following sample shows how indices that start with 'time-' will be migrated and every other index and type not
already matched will be dropped.
Expand All @@ -114,41 +138,18 @@ merging all types into a single index with the same name as the source index.
]
```

## Final Results

```
PUT any_activity
{
"mappings": {
"properties": {
"type": {
"type": "keyword"
},
"name": {
"type": "text"
},
"user_name": {
"type": "keyword"
},
"email": {
"type": "keyword"
},
"content": {
"type": "text"
},
"tweeted_at": {
"type": "date"
}
}
}
}
PUT any_activity/_doc/someuser
{
"name": "Some User",
"user_name": "user",
"email": "[email protected]"
}
For more examples, compare the following cases.
Though note that anything matched by the static maps shown above will block any of these rules from being evaluated.

| Regex Entry | Source Index | Source Type | Target Index | PUT Doc URL | Bulk Index Command |
|--------------------------------------------------------------------------------|-------------|-------------|-------------------|--------------------------------|----------------------------------------------------------------|
| `[["time-(.*)", "(cpu)", "time-\\1-\\2"]]` | time-nov11 | cpu | time-nov11-cpu | /time-nov11-cpu/_doc/doc512 | `{"index": {"_index": "time-nov11-cpu", "_id": "doc512" }}` |
| `[["time-(.*)", "(cpu)", "time-\\1-\\2"]]` | logs | access | [DELETED] | [DELETED] | [DELETED] |
| `[["time-(.*)", "(cpu)", "time-\\1-\\2"],`<br/>` ["(.*)", "(.*)", "\\1-\\2"]]` | logs | access | logs_access | /logs_access/_doc/doc513 | `{"index": {"_index": "logs_access", "_id": "doc513" }}` |
| `[["time-(.*)", "(cpu)", "time-\\1-\\2"],`<br/>`[["", ".*", ""]]` | everything | widgets | everything | /everything/_doc/doc514 | `{"index": {"_index": "everything", "_id": "doc514" }}` |
| `[["time-(.*)", "(cpu)", "time-\\1-\\2"],`<br/>`[["", ".*", ""]]` | everything | sprockets | everything | /everything/_doc/doc515 | `{"index": {"_index": "everything", "_id": "doc515" }}` |
| `[["time-(.*)", "(.*)-(cpu)", "\\2-\\3-\\1"]]` | time-nov11 | host123-cpu | host123-cpu-nov11 | /host123-cpu-nov11/_doc/doc512 | `{"index": {"_index": "host123-cpu-nov11", "_id": "doc512" }}` |
| `[["", ".*", ""]]` | metadata | users | metadata | /metadata/_doc/doc516 | `{"index": {"_index": "metadata", "_id": "doc516" }}` |
| `[[".*", ".*", "leftovers"]]` | logs | access | leftovers | /leftovers/_doc/doc517 | `{"index": {"_index": "leftovers", "_id": "doc517" }}` |
| `[[".*", ".*", "leftovers"]]` | permissions | access | leftovers | /leftovers/_doc/doc517 | `{"index": {"_index": "leftovers", "_id": "doc517" }}` |

```
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public TypeMappingsSanitizationTransformer(
// types of patterns are being used.
// This regex says, match the type part and reduce it to nothing, leave the index part untouched.
var regexIndexMappings = Optional.ofNullable(regexIndexMappingsIncoming)
.orElseGet(() -> (indexMappingsIncoming == null ? List.of(List.of("", ".*", "")) : List.of()));
.orElseGet(() -> (indexMappingsIncoming == null ? List.of(List.of("(.*)", "(.*)", "\\1_\\2")) : List.of()));

return incomingJson -> Map.of("source_document", incomingJson,
"index_mappings", indexMappings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
{
"method": "{{ input_map.request.method }}",
"URI": "/{{ target_index }}/_doc/{{ match.group3 }}",
"preserveWhenMissing": ["headers","payload"]
"preserveWhenMissing": "*"
}
{%- endif -%}
{%- endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
{%- endmacro -%}

{%- macro convert_source_index_to_target(source_index, source_type, index_mappings, regex_index_mappings) -%}
{%- set ns = namespace(target_index=none) -%}
{%- set ns.target_index = (index_mappings[source_index] | default({}))[source_type] -%}
{%- if ns.target_index is none -%}
{%- set ns.target_index = convert_source_index_to_target_via_regex(source_index, source_type, regex_index_mappings) -%}
{%- if source_type == "_doc" -%}
{{- source_index -}}
{%- else -%}
{%- set ns = namespace(target_index=none) -%}
{%- set ns.target_index = (index_mappings[source_index] | default({}))[source_type] -%}
{%- if ns.target_index is none -%}
{%- set ns.target_index = convert_source_index_to_target_via_regex(source_index, source_type, regex_index_mappings) -%}
{%- endif -%}
{{- ns.target_index -}}
{%- endif -%}
{{- ns.target_index -}}
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.transform;

import java.util.LinkedHashMap;
import java.util.List;

import org.opensearch.migrations.testutils.JsonNormalizer;

Expand All @@ -23,13 +22,12 @@ public void test() throws Exception {
"}";

var expectedString = "{\n" +
" \"index\": { \"_index\": \"network\", \"_id\": \"1\" },\n" +
" \"index\": { \"_index\": \"performance_network\", \"_id\": \"1\" },\n" +
" \"source\": { \"field1\": \"value1\" }\n" +
"}";


var regexIndexMappings = List.of(List.of(".*", "", ""));
var indexTypeMappingRewriter = new TypeMappingsSanitizationTransformer(null, regexIndexMappings);
var indexTypeMappingRewriter = new TypeMappingsSanitizationTransformer(null, null);
var resultObj = indexTypeMappingRewriter.transformJson(OBJECT_MAPPER.readValue(testString, LinkedHashMap.class));
log.atInfo().setMessage("resultStr = {}").addArgument(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ public class TypeMappingsSanitizationProviderTest {
public void testSimpleTransform() throws JsonProcessingException {
var config = Map.of("staticMappings",
Map.of(
"indexA", Map.of(
"type1", "indexA_1",
"type2", "indexA_2"),
"indexB", Map.of(
"type1", "indexB",
"type2", "indexB"),
"indexC", Map.of(
"type2", "indexC")),
"indexa", Map.of(
"type1", "indexa_1",
"type2", "indexa_2"),
"indexb", Map.of(
"type1", "indexb",
"type2", "indexb"),
"indexc", Map.of(
"type2", "indexc")),
"regexMappings", List.of(List.of("(time.*)", "(type.*)", "\\1_And_\\2")));
final String TEST_INPUT_REQUEST = "{\n"
+ " \"method\": \"PUT\",\n"
+ " \"URI\": \"/indexA/type2/someuser\",\n"
+ " \"URI\": \"/indexa/type2/someuser\",\n"
+ " \"headers\": {\n"
+ " \"host\": \"127.0.0.1\"\n"
+ " },\n"
Expand All @@ -56,7 +56,7 @@ public void testSimpleTransform() throws JsonProcessingException {
+ "}\n";
final String EXPECTED = "{\n"
+ " \"method\": \"PUT\",\n"
+ " \"URI\": \"/indexA_2/_doc/someuser\",\n"
+ " \"URI\": \"/indexa_2/_doc/someuser\",\n"
+ " \"headers\": {\n"
+ " \"host\": \"127.0.0.1\"\n"
+ " },\n"
Expand All @@ -82,8 +82,8 @@ public void testSimpleTransform() throws JsonProcessingException {
Assertions.assertEquals(
JsonNormalizer.fromString(
EXPECTED.replace(
"/indexA_2/_doc/someuser",
"/indexA/_doc/someuser")),
"/indexa_2/_doc/someuser",
"/indexa_type2/_doc/someuser")),
JsonNormalizer.fromObject(resultFromNullConfig));
}
}
Expand Down

0 comments on commit c13ef4a

Please sign in to comment.