Skip to content

Commit

Permalink
fix(elasticsearch): refactor idHashAlgo setting (datahub-project#11193)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Aug 16, 2024
1 parent cb33c0f commit edb9a87
Show file tree
Hide file tree
Showing 40 changed files with 90 additions and 109 deletions.
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ PE_CONSUMER_ENABLED=true
UI_INGESTION_ENABLED=true
ENTITY_SERVICE_ENABLE_RETENTION=true

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ MCE_CONSUMER_ENABLED=true
PE_CONSUMER_ENABLED=true
UI_INGESTION_ENABLED=true

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to enable Metadata Service Authentication
METADATA_SERVICE_AUTH_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mae-consumer/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_IMPL=elasticsearch
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mae-consumer/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ NEO4J_PASSWORD=datahub
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mce-consumer/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ MAE_CONSUMER_ENABLED=false
PE_CONSUMER_ENABLED=false
UI_INGESTION_ENABLED=false

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to configure kafka topic names
# Make sure these names are consistent across the whole deployment
# METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1
Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mce-consumer/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ MAE_CONSUMER_ENABLED=false
PE_CONSUMER_ENABLED=false
UI_INGESTION_ENABLED=false

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to configure kafka topic names
# Make sure these names are consistent across the whole deployment
# METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1
Expand Down
1 change: 0 additions & 1 deletion docker/quickstart/docker-compose-m1.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ services:
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_IMPL=elasticsearch
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
- ELASTIC_ID_HASH_ALGO=MD5
hostname: datahub-mae-consumer
image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head}
ports:
Expand All @@ -38,7 +37,6 @@ services:
- EBEAN_DATASOURCE_USERNAME=datahub
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
2 changes: 0 additions & 2 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ services:
- NEO4J_PASSWORD=datahub
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
- ELASTIC_ID_HASH_ALGO=MD5
hostname: datahub-mae-consumer
image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head}
ports:
Expand All @@ -48,7 +47,6 @@ services:
- EBEAN_DATASOURCE_USERNAME=datahub
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
1 change: 0 additions & 1 deletion docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -59,7 +60,7 @@ public Edge(
null);
}

public String toDocId() {
public String toDocId(@Nonnull String idHashAlgo) {
StringBuilder rawDocId = new StringBuilder();
rawDocId
.append(getSource().toString())
Expand All @@ -72,9 +73,8 @@ public String toDocId() {
}

try {
String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
byte[] bytesOfRawDocID = rawDocId.toString().getBytes(StandardCharsets.UTF_8);
MessageDigest md = MessageDigest.getInstance(hashAlgo);
MessageDigest md = MessageDigest.getInstance(idHashAlgo);
byte[] thedigest = md.digest(bytesOfRawDocID);
return Base64.getEncoder().encodeToString(thedigest);
} catch (NoSuchAlgorithmException e) {
Expand Down
1 change: 0 additions & 1 deletion metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ test {
// override, testng controlling parallelization
// increasing >1 will merely run all tests extra times
maxParallelForks = 1
environment "ELASTIC_ID_HASH_ALGO", "MD5"
}
useTestNG() {
suites 'src/test/resources/testng.xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd
private final ESGraphWriteDAO _graphWriteDAO;
private final ESGraphQueryDAO _graphReadDAO;
private final ESIndexBuilder _indexBuilder;
private final String idHashAlgo;
public static final String INDEX_NAME = "graph_service_v1";
private static final Map<String, Object> EMPTY_HASH = new HashMap<>();

Expand Down Expand Up @@ -125,7 +126,7 @@ public LineageRegistry getLineageRegistry() {

@Override
public void addEdge(@Nonnull final Edge edge) {
String docId = edge.toDocId();
String docId = edge.toDocId(idHashAlgo);
String edgeDocument = toDocument(edge);
_graphWriteDAO.upsertDocument(docId, edgeDocument);
}
Expand All @@ -137,7 +138,7 @@ public void upsertEdge(@Nonnull final Edge edge) {

@Override
public void removeEdge(@Nonnull final Edge edge) {
String docId = edge.toDocId();
String docId = edge.toDocId(idHashAlgo);
_graphWriteDAO.deleteDocument(docId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class UpdateIndicesService implements SearchIndicesService {
private final SystemMetadataService _systemMetadataService;
private final SearchDocumentTransformer _searchDocumentTransformer;
private final EntityIndexBuilders _entityIndexBuilders;
@Nonnull private final String idHashAlgo;

@Value("${featureFlags.graphServiceDiffModeEnabled:true}")
private boolean _graphDiffMode;
Expand Down Expand Up @@ -117,13 +118,15 @@ public UpdateIndicesService(
TimeseriesAspectService timeseriesAspectService,
SystemMetadataService systemMetadataService,
SearchDocumentTransformer searchDocumentTransformer,
EntityIndexBuilders entityIndexBuilders) {
EntityIndexBuilders entityIndexBuilders,
@Nonnull String idHashAlgo) {
_graphService = graphService;
_entitySearchService = entitySearchService;
_timeseriesAspectService = timeseriesAspectService;
_systemMetadataService = systemMetadataService;
_searchDocumentTransformer = searchDocumentTransformer;
_entityIndexBuilders = entityIndexBuilders;
this.idHashAlgo = idHashAlgo;
}

@Override
Expand Down Expand Up @@ -601,7 +604,9 @@ private void updateTimeseriesFields(
SystemMetadata systemMetadata) {
Map<String, JsonNode> documents;
try {
documents = TimeseriesAspectTransformer.transform(urn, aspect, aspectSpec, systemMetadata);
documents =
TimeseriesAspectTransformer.transform(
urn, aspect, aspectSpec, systemMetadata, idHashAlgo);
} catch (JsonProcessingException e) {
log.error("Failed to generate timeseries document from aspect: {}", e.toString());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static Map<String, JsonNode> transform(
@Nonnull final Urn urn,
@Nonnull final RecordTemplate timeseriesAspect,
@Nonnull final AspectSpec aspectSpec,
@Nullable final SystemMetadata systemMetadata)
@Nullable final SystemMetadata systemMetadata,
@Nonnull final String idHashAlgo)
throws JsonProcessingException {
ObjectNode commonDocument = getCommonDocument(urn, timeseriesAspect, systemMetadata);
Map<String, JsonNode> finalDocuments = new HashMap<>();
Expand All @@ -74,7 +75,7 @@ public static Map<String, JsonNode> transform(
final Map<TimeseriesFieldSpec, List<Object>> timeseriesFieldValueMap =
FieldExtractor.extractFields(timeseriesAspect, aspectSpec.getTimeseriesFieldSpecs());
timeseriesFieldValueMap.forEach((k, v) -> setTimeseriesField(document, k, v));
finalDocuments.put(getDocId(document, null), document);
finalDocuments.put(getDocId(document, null, idHashAlgo), document);

// Create new rows for the member collection fields.
final Map<TimeseriesFieldCollectionSpec, List<Object>> timeseriesFieldCollectionValueMap =
Expand All @@ -83,7 +84,7 @@ public static Map<String, JsonNode> transform(
timeseriesFieldCollectionValueMap.forEach(
(key, values) ->
finalDocuments.putAll(
getTimeseriesFieldCollectionDocuments(key, values, commonDocument)));
getTimeseriesFieldCollectionDocuments(key, values, commonDocument, idHashAlgo)));
return finalDocuments;
}

Expand Down Expand Up @@ -216,12 +217,13 @@ private static void setTimeseriesField(
private static Map<String, JsonNode> getTimeseriesFieldCollectionDocuments(
final TimeseriesFieldCollectionSpec fieldSpec,
final List<Object> values,
final ObjectNode commonDocument) {
final ObjectNode commonDocument,
@Nonnull final String idHashAlgo) {
return values.stream()
.map(value -> getTimeseriesFieldCollectionDocument(fieldSpec, value, commonDocument))
.collect(
Collectors.toMap(
keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst()),
keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst(), idHashAlgo),
Pair::getSecond));
}

Expand Down Expand Up @@ -257,9 +259,9 @@ private static Pair<String, ObjectNode> getTimeseriesFieldCollectionDocument(
finalDocument);
}

private static String getDocId(@Nonnull JsonNode document, String collectionId)
private static String getDocId(
@Nonnull JsonNode document, String collectionId, @Nonnull String idHashAlgo)
throws IllegalArgumentException {
String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
String docId = document.get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD).toString();
JsonNode eventGranularity = document.get(MappingsBuilder.EVENT_GRANULARITY);
if (eventGranularity != null) {
Expand All @@ -278,9 +280,9 @@ private static String getDocId(@Nonnull JsonNode document, String collectionId)
docId += partitionSpec.toString();
}

if (hashAlgo.equalsIgnoreCase("SHA-256")) {
if (idHashAlgo.equalsIgnoreCase("SHA-256")) {
return DigestUtils.sha256Hex(docId);
} else if (hashAlgo.equalsIgnoreCase("MD5")) {
} else if (idHashAlgo.equalsIgnoreCase("MD5")) {
return DigestUtils.md5Hex(docId);
}
throw new IllegalArgumentException("Hash function not handled !");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public abstract class SearchGraphServiceTestBase extends GraphServiceTestBase {
@Nonnull
protected abstract ESIndexBuilder getIndexBuilder();

private final IndexConvention _indexConvention = IndexConventionImpl.NO_PREFIX;
private final IndexConvention _indexConvention = IndexConventionImpl.noPrefix("MD5");
private final String _indexName = _indexConvention.getIndexName(INDEX_NAME);
private ElasticSearchGraphService _client;

Expand Down Expand Up @@ -108,7 +108,8 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) {
_indexConvention,
writeDAO,
readDAO,
getIndexBuilder());
getIndexBuilder(),
"MD5");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
operationContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new SnapshotEntityRegistry(new Snapshot()),
new IndexConventionImpl("lineage_search_service_test"))
new IndexConventionImpl("lineage_search_service_test", "MD5"))
.asSession(RequestContext.TEST, Authorizer.EMPTY, TestOperationContexts.TEST_USER_AUTH);
settingsBuilder = new SettingsBuilder(null);
elasticSearchService = buildEntitySearchService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
operationContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new SnapshotEntityRegistry(new Snapshot()),
new IndexConventionImpl("search_service_test"))
new IndexConventionImpl("search_service_test", "MD5"))
.asSession(RequestContext.TEST, Authorizer.EMPTY, TestOperationContexts.TEST_USER_AUTH);

settingsBuilder = new SettingsBuilder(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest
public void setup() {
opContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new SnapshotEntityRegistry(new Snapshot()), new IndexConventionImpl("es_service_test"));
new SnapshotEntityRegistry(new Snapshot()),
new IndexConventionImpl("es_service_test", "MD5"));
settingsBuilder = new SettingsBuilder(null);
elasticSearchService = buildService();
elasticSearchService.reindexAll(Collections.emptySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
mockClient = mock(RestHighLevelClient.class);
opContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new IndexConventionImpl("es_browse_dao_test"));
new IndexConventionImpl("es_browse_dao_test", "MD5"));
browseDAO = new ESBrowseDAO(mockClient, searchConfiguration, customSearchConfiguration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public abstract class SystemMetadataServiceTestBase extends AbstractTestNGSpring
protected abstract ESIndexBuilder getIndexBuilder();

private final IndexConvention _indexConvention =
new IndexConventionImpl("es_system_metadata_service_test");
new IndexConventionImpl("es_system_metadata_service_test", "MD5");

private ElasticSearchSystemMetadataService _client;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {

opContext =
TestOperationContexts.systemContextNoSearchAuthorization(
entityRegistry, new IndexConventionImpl("es_timeseries_aspect_service_test"));
entityRegistry, new IndexConventionImpl("es_timeseries_aspect_service_test", "MD5"));

elasticSearchTimeseriesAspectService = buildService();
elasticSearchTimeseriesAspectService.reindexAll(Collections.emptySet());
Expand All @@ -152,7 +152,7 @@ private ElasticSearchTimeseriesAspectService buildService() {

private void upsertDocument(TestEntityProfile dp, Urn urn) throws JsonProcessingException {
Map<String, JsonNode> documents =
TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null);
TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null, "MD5");
assertEquals(documents.size(), 3);
documents.forEach(
(key, value) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ protected String longTailIndexPrefix() {

@Bean(name = "sampleDataIndexConvention")
protected IndexConvention indexConvention(@Qualifier("sampleDataPrefix") String prefix) {
return new IndexConventionImpl(prefix);
return new IndexConventionImpl(prefix, "MD5");
}

@Bean(name = "longTailIndexConvention")
protected IndexConvention longTailIndexConvention(@Qualifier("longTailPrefix") String prefix) {
return new IndexConventionImpl(prefix);
return new IndexConventionImpl(prefix, "MD5");
}

@Bean(name = "sampleDataFixtureName")
Expand Down
Loading

0 comments on commit edb9a87

Please sign in to comment.