Skip to content

Commit

Permalink
Search refactor (open-metadata#13397)
Browse files Browse the repository at this point in the history
* Refactor Search

* Refactor Search

* Fix propagation bugs

* Fix propagation bugs

* Fix glossary term search

* Fix glossary term search

* Only issue search requests if the client is configured properly

* Only issue search requests if the client is configured properly

* Fix glossary index

* add documentation for sharded tables (open-metadata#13361)

* Cost analysis agg (open-metadata#13408)

* feat: updated DI workflow to inherit from BaseWorkflow + split processor and producer classes

* feat: __init__.py files creation

* feat: updated workflow import classes in code and doc

* feat: moved kpi runner from runner to processor folder

* fix: skip failure on list entities

* feat: deleted unused files

* feat: updated status reporter

* feat: ran linting

* feat: fix test error with typing and fqn

* feat: updated test dependencies

* feat: ran linting

* feat: move execution order up

* feat: updated cost analysis report to align with new workflow

* feat: fix entity already exists for pipeline entity status

* feat: ran python linting

* feat: move skip_on_failure to method

* feat: added unusedReport to DI

* feat: added aggregated unused report

* feat: ran linting

* feat: reverted compose file changes

---------

Co-authored-by: Sriharsha Chintalapani <[email protected]>

* Add Java 17 support (open-metadata#12895)

* Add Java 17 support

* Change Test HTTP client provider

* Create Tests HTTP Client once

* Create Tests HTTP Client once

* fix(CI): Update CI to use jdk 17 and dockerfiles as well

---------

Co-authored-by: Akash-Jain <[email protected]>

* Refactor Search

* Refactor Search

* Fix propagation bugs

* Fix propagation bugs

* Fix glossary term search

* Fix glossary term search

* Only issue search requests if the client is configured properly

* Only issue search requests if the client is configured properly

* Fix glossary index

* Merge main

* fix style

* deleted field propagation

* Fix style

* close the test client

* Changing to jersey connector

* Fix Authentication Exception headers

---------

Co-authored-by: Pere Miquel Brull <[email protected]>
Co-authored-by: Ayush Shah <[email protected]>
Co-authored-by: Teddy <[email protected]>
Co-authored-by: Akash-Jain <[email protected]>
  • Loading branch information
5 people authored Oct 3, 2023
1 parent efa0802 commit 8578aee
Show file tree
Hide file tree
Showing 83 changed files with 1,733 additions and 2,308 deletions.
2 changes: 1 addition & 1 deletion openmetadata-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@
</dependency>
<dependency>
<groupId>org.glassfish.jersey.connectors</groupId>
<artifactId>jersey-grizzly-connector</artifactId>
<artifactId>jersey-jetty-connector</artifactId>
<version>2.40</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.openmetadata.service.jdbi3.TokenRepository;
import org.openmetadata.service.jdbi3.UsageRepository;
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.reflections.Reflections;

Expand All @@ -76,6 +77,7 @@ public final class Entity {
@Getter @Setter private static UsageRepository usageRepository;
@Getter @Setter private static SystemRepository systemRepository;
@Getter @Setter private static ChangeEventRepository changeEventRepository;
@Getter @Setter private static SearchRepository searchRepository;

// List of all the entities
private static final List<String> ENTITY_LIST = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
import org.openmetadata.service.resources.CollectionRegistry;
import org.openmetadata.service.resources.databases.DatasourceConfig;
import org.openmetadata.service.resources.settings.SettingsCache;
import org.openmetadata.service.search.SearchEventPublisher;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.secrets.SecretsManagerUpdateService;
Expand Down Expand Up @@ -132,6 +132,8 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
private AuthenticatorHandler authenticatorHandler;
private static CollectionDAO collectionDAO;

private static SearchRepository searchRepository;

@Override
public void run(OpenMetadataApplicationConfig catalogConfig, Environment environment)
throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
Expand All @@ -149,6 +151,10 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ
JdbiTransactionManager.initialize(jdbiUnitOfWorkProvider.getHandleManager());
environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(new HashSet<>()));

    // Initialize the SearchRepository first: all entity repositories depend on it, so this line must always
    // run before the repositories are initialized
searchRepository = new SearchRepository(catalogConfig.getElasticSearchConfiguration(), collectionDAO);

// as first step register all the repositories
Entity.initializeRepositories(collectionDAO);

Expand Down Expand Up @@ -435,12 +441,6 @@ private void registerEventFilter(
}

private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataApplicationConfig) {
// register ElasticSearch Event publisher
if (openMetadataApplicationConfig.getElasticSearchConfiguration() != null) {
SearchEventPublisher searchEventPublisher =
new SearchEventPublisher(openMetadataApplicationConfig.getElasticSearchConfiguration(), collectionDAO);
EventPubSub.addEventHandler(searchEventPublisher);
}

if (openMetadataApplicationConfig.getEventMonitorConfiguration() != null) {
final EventMonitor eventMonitor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import static org.openmetadata.schema.dataInsight.DataInsightChartResult.DataInsightChartType.PERCENTAGE_OF_ENTITIES_WITH_OWNER_BY_TYPE;
import static org.openmetadata.schema.dataInsight.DataInsightChartResult.DataInsightChartType.TOTAL_ENTITIES_BY_TIER;
import static org.openmetadata.schema.dataInsight.DataInsightChartResult.DataInsightChartType.TOTAL_ENTITIES_BY_TYPE;
import static org.openmetadata.schema.type.DataReportIndex.ENTITY_REPORT_DATA_INDEX;
import static org.openmetadata.service.Entity.EVENT_SUBSCRIPTION;
import static org.openmetadata.service.Entity.KPI;
import static org.openmetadata.service.Entity.TEAM;
import static org.openmetadata.service.events.scheduled.ReportsHandler.SEARCH_CLIENT;
import static org.openmetadata.service.search.SearchIndexDefinition.ElasticSearchIndexType.ENTITY_REPORT_DATA_INDEX;
import static org.openmetadata.service.util.SubscriptionUtil.getAdminsData;
import static org.openmetadata.service.util.SubscriptionUtil.getNumberOfDays;

Expand Down Expand Up @@ -185,7 +185,7 @@ private DataInsightTotalAssetTemplate createTotalAssetTemplate(
// Get total Assets Data
TreeMap<Long, List<Object>> dateWithDataMap =
searchRepository.getSortedDate(
team, scheduleTime, currentTime, TOTAL_ENTITIES_BY_TYPE, ENTITY_REPORT_DATA_INDEX.indexName);
team, scheduleTime, currentTime, TOTAL_ENTITIES_BY_TYPE, ENTITY_REPORT_DATA_INDEX.value());
if (dateWithDataMap.firstEntry() != null && dateWithDataMap.lastEntry() != null) {

List<TotalEntitiesByType> first =
Expand Down Expand Up @@ -218,7 +218,7 @@ private DataInsightDescriptionAndOwnerTemplate createDescriptionTemplate(
scheduleTime,
currentTime,
PERCENTAGE_OF_ENTITIES_WITH_DESCRIPTION_BY_TYPE,
ENTITY_REPORT_DATA_INDEX.indexName);
ENTITY_REPORT_DATA_INDEX.value());
if (dateWithDataMap.firstEntry() != null && dateWithDataMap.lastEntry() != null) {
List<PercentageOfEntitiesWithDescriptionByType> first =
JsonUtils.convertValue(dateWithDataMap.firstEntry().getValue(), new TypeReference<>() {});
Expand Down Expand Up @@ -267,7 +267,7 @@ private DataInsightDescriptionAndOwnerTemplate createOwnershipTemplate(
scheduleTime,
currentTime,
PERCENTAGE_OF_ENTITIES_WITH_OWNER_BY_TYPE,
ENTITY_REPORT_DATA_INDEX.indexName);
ENTITY_REPORT_DATA_INDEX.value());
if (dateWithDataMap.firstEntry() != null && dateWithDataMap.lastEntry() != null) {
List<PercentageOfEntitiesWithOwnerByType> first =
JsonUtils.convertValue(dateWithDataMap.firstEntry().getValue(), new TypeReference<>() {});
Expand Down Expand Up @@ -313,7 +313,7 @@ private DataInsightDescriptionAndOwnerTemplate createTierTemplate(
// This assumes that on a particular date the correct count per entities are given
TreeMap<Long, List<Object>> dateWithDataMap =
searchRepository.getSortedDate(
team, scheduleTime, currentTime, TOTAL_ENTITIES_BY_TIER, ENTITY_REPORT_DATA_INDEX.indexName);
team, scheduleTime, currentTime, TOTAL_ENTITIES_BY_TIER, ENTITY_REPORT_DATA_INDEX.value());
if (dateWithDataMap.lastEntry() != null) {
List<TotalEntitiesByTier> last =
JsonUtils.convertValue(dateWithDataMap.lastEntry().getValue(), new TypeReference<>() {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public class ReportsHandler {
private final Scheduler reportScheduler = new StdSchedulerFactory().getScheduler();
private static final ConcurrentHashMap<UUID, JobDetail> reportJobKeyMap = new ConcurrentHashMap<>();

private ReportsHandler(SearchRepository searchRepository) throws SchedulerException {
this.searchRepository = searchRepository;
private ReportsHandler() throws SchedulerException {
this.searchRepository = Entity.getSearchRepository();
this.chartRepository = (DataInsightChartRepository) Entity.getEntityRepository(Entity.DATA_INSIGHT_CHART);
this.reportScheduler.start();
}
Expand All @@ -69,9 +69,9 @@ public ConcurrentMap<UUID, JobDetail> getReportMap() {
return reportJobKeyMap;
}

public static void initialize(SearchRepository searchRepository) throws SchedulerException {
public static void initialize() throws SchedulerException {
if (!initialized) {
instance = new ReportsHandler(searchRepository);
instance = new ReportsHandler();
initialized = true;
} else {
LOG.info("Reindexing Handler is already initialized");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.resources.CollectionRegistry;
import org.openmetadata.service.search.IndexUtil;
import org.openmetadata.service.search.SearchIndexDefinition;
import org.openmetadata.service.search.models.IndexMapping;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;

Expand Down Expand Up @@ -158,8 +157,8 @@ public static Set<String> getEntitiesIndex(List<String> entities) {
Set<String> indexesToSearch = new HashSet<>();
for (String entityType : entities) {
try {
SearchIndexDefinition.ElasticSearchIndexType type = IndexUtil.getIndexMappingByEntityType(entityType);
indexesToSearch.add(type.indexName);
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
indexesToSearch.add(indexMapping.getIndexName());
} catch (RuntimeException ex) {
LOG.error("Failing to get Index for EntityType");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public Response getResponse(Response.Status status, String message) {
return Response.status(status)
.type(APPLICATION_JSON_TYPE)
.entity(new ErrorMessage(status.getStatusCode(), message))
.header("WWW-Authenticate", "om-auth")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@
import static org.openmetadata.service.Entity.getEntityFields;
import static org.openmetadata.service.exception.CatalogExceptionMessage.csvNotSupported;
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.service.resources.EntityResource.searchRepository;
import static org.openmetadata.service.search.SearchIndexDefinition.ENTITY_TO_CHILDREN_MAPPING;
import static org.openmetadata.service.search.SearchRepository.ADD;
import static org.openmetadata.service.search.SearchRepository.DEFAULT_UPDATE_SCRIPT;
import static org.openmetadata.service.search.SearchRepository.DELETE;
import static org.openmetadata.service.search.SearchRepository.UPDATE;
import static org.openmetadata.service.util.EntityUtil.compareTagLabel;
import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
Expand Down Expand Up @@ -101,7 +95,6 @@
import org.openmetadata.schema.api.VoteRequest;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.api.teams.CreateTeam;
import org.openmetadata.schema.entity.classification.Classification;
import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.data.Table;
Expand Down Expand Up @@ -137,6 +130,7 @@
import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
import org.openmetadata.service.resources.tags.TagLabelUtil;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
Expand Down Expand Up @@ -199,6 +193,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
@Getter protected final String entityType;
@Getter protected final EntityDAO<T> dao;
@Getter protected final CollectionDAO daoCollection;
@Getter protected final SearchRepository searchRepository;
@Getter protected final Set<String> allowedFields;
public final boolean supportsSoftDelete;
@Getter protected final boolean supportsTags;
Expand All @@ -212,7 +207,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
protected final boolean supportsDataProducts;
@Getter protected final boolean supportsReviewers;
@Getter protected final boolean supportsExperts;
protected boolean quoteFqn = false; // Entity fqns not hierarchical such user, teams, services need to be quoted
protected boolean quoteFqn = false; // Entity FQNS not hierarchical such user, teams, services need to be quoted

/** Fields that can be updated during PATCH operation */
@Getter private final Fields patchFields;
Expand All @@ -235,6 +230,7 @@ protected EntityRepository(
allowedFields = getEntityFields(entityClass);
this.dao = entityDAO;
this.daoCollection = collectionDAO;
this.searchRepository = Entity.getSearchRepository();
this.entityType = entityType;
this.patchFields = getFields(patchFields);
this.putFields = getFields(putFields);
Expand Down Expand Up @@ -736,38 +732,7 @@ protected void postCreate(T entity) {
@SuppressWarnings("unused")
protected void postUpdate(T original, T updated) {
if (supportsSearch) {
searchRepository.updateEntity(updated, DEFAULT_UPDATE_SCRIPT, "");
if (ENTITY_TO_CHILDREN_MAPPING.get(updated.getEntityReference().getType()) != null
&& (updated.getChangeDescription() != null)) {
for (FieldChange fieldChange : updated.getChangeDescription().getFieldsAdded()) {
if (fieldChange.getName().equalsIgnoreCase(FIELD_OWNER)) {
searchRepository.handleOwnerUpdates(original, updated, ADD);
}
if (fieldChange.getName().equalsIgnoreCase(FIELD_DOMAIN)) {
searchRepository.handleDomainUpdates(original, updated, ADD);
}
}
for (FieldChange fieldChange : updated.getChangeDescription().getFieldsUpdated()) {
if (fieldChange.getName().equalsIgnoreCase(FIELD_OWNER)) {
searchRepository.handleOwnerUpdates(original, updated, UPDATE);
}
if (fieldChange.getName().equalsIgnoreCase(FIELD_DOMAIN)) {
searchRepository.handleDomainUpdates(original, updated, UPDATE);
}
if (fieldChange.getName().equalsIgnoreCase("disabled")
&& updated.getEntityReference().getType().equals(Entity.CLASSIFICATION)) {
searchRepository.handleClassificationUpdate((Classification) updated);
}
}
for (FieldChange fieldChange : updated.getChangeDescription().getFieldsDeleted()) {
if (fieldChange.getName().equalsIgnoreCase(FIELD_OWNER)) {
searchRepository.handleOwnerUpdates(original, updated, DELETE);
}
if (fieldChange.getName().equalsIgnoreCase(FIELD_DOMAIN)) {
searchRepository.handleDomainUpdates(original, updated, DELETE);
}
}
}
searchRepository.updateEntity(updated);
}
}

Expand Down Expand Up @@ -912,24 +877,15 @@ public void deleteFromSearch(T entity, String changeType) {
if (supportsSearch) {
if (changeType.equals(RestUtil.ENTITY_SOFT_DELETED)) {
searchRepository.softDeleteOrRestoreEntity(entity, true);
if (ENTITY_TO_CHILDREN_MAPPING.get(entity.getEntityReference().getType()) != null) {
searchRepository.handleSoftDeletedAndRestoredEntity(entity, true);
}
} else {
searchRepository.deleteEntity(entity, "", "", "");
if (ENTITY_TO_CHILDREN_MAPPING.get(entity.getEntityReference().getType()) != null) {
searchRepository.handleEntityDeleted(entity);
}
searchRepository.deleteEntity(entity);
}
}
}

public void restoreFromSearch(T entity) {
if (supportsSearch) {
searchRepository.softDeleteOrRestoreEntity(entity, false);
if (ENTITY_TO_CHILDREN_MAPPING.get(entity.getEntityReference().getType()) != null) {
searchRepository.handleSoftDeletedAndRestoredEntity(entity, false);
}
}
}

Expand All @@ -952,7 +908,6 @@ private DeleteResponse<T> delete(String deletedBy, T original, boolean recursive
cleanup(updated);
changeType = RestUtil.ENTITY_DELETED;
}
if (supportsSearch) {}
LOG.info("{} deleted {}", hardDelete ? "Hard" : "Soft", updated.getFullyQualifiedName());
return new DeleteResponse<>(updated, changeType);
}
Expand Down Expand Up @@ -1012,7 +967,7 @@ protected void cleanup(T entityInterface) {
// Delete all the tag labels
daoCollection.tagUsageDAO().deleteTagLabelsByTargetPrefix(entityInterface.getFullyQualifiedName());

// when the glossary and tag is deleted .. delete its usage
// when the glossary and tag is deleted, delete its usage
daoCollection.tagUsageDAO().deleteTagLabelsByFqn(entityInterface.getFullyQualifiedName());
// Delete all the usage data
daoCollection.usageDAO().delete(id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package org.openmetadata.service.jdbi3;

import static org.openmetadata.service.resources.EntityResource.searchRepository;

import java.util.UUID;
import lombok.Getter;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.JsonUtils;

@Repository
public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInterface> {
@Getter protected final String collectionPath;
@Getter protected final EntityTimeSeriesDAO timeSeriesDao;
@Getter protected final CollectionDAO daoCollection;
@Getter protected final SearchRepository searchRepository;
@Getter protected final String entityType;
@Getter protected final Class<T> entityClass;

Expand All @@ -29,6 +29,7 @@ protected EntityTimeSeriesRepository(
this.daoCollection = daoCollection;
this.entityClass = entityClass;
this.entityType = entityType;
this.searchRepository = Entity.getSearchRepository();
Entity.registerEntity(entityClass, entityType, this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
package org.openmetadata.service.jdbi3;

import static org.openmetadata.service.resources.EntityResource.searchRepository;

import java.util.HashMap;
import java.util.List;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.analytics.ReportData.ReportDataType;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;

public class ReportDataRepository extends EntityTimeSeriesRepository<ReportData> {
public static final String COLLECTION_PATH = "/v1/analytics/report";
public static final String REPORT_DATA_EXTENSION = "reportData.reportDataResult";

private final SearchRepository searchRepository;

public ReportDataRepository(CollectionDAO daoCollection) {
super(
COLLECTION_PATH,
daoCollection,
daoCollection.reportDataTimeSeriesDao(),
ReportData.class,
Entity.ENTITY_REPORT_DATA);
searchRepository = Entity.getSearchRepository();
}

public ResultList<ReportData> getReportData(ReportDataType reportDataType, Long startTs, Long endTs) {
Expand Down
Loading

0 comments on commit 8578aee

Please sign in to comment.