diff --git a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql index c33578f546f7..126eb63bc335 100644 --- a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql @@ -79,4 +79,7 @@ CREATE TABLE IF NOT EXISTS table_entity_extension ( jsonSchema VARCHAR(256) NOT NULL, -- Schema used for generating JSON json JSON NOT NULL, PRIMARY KEY (id, extension) -); \ No newline at end of file +); + +ALTER TABLE entity_relationship ADD INDEX from_entity_type_index(fromId, fromEntity), ADD INDEX to_entity_type_index(toId, toEntity); +ALTER TABLE tag DROP CONSTRAINT fqnHash, ADD CONSTRAINT UNIQUE(fqnHash), ADD PRIMARY KEY(id); diff --git a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql index 081b2e61b5b6..ecd7d82d7202 100644 --- a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql @@ -77,4 +77,16 @@ CREATE TABLE IF NOT EXISTS table_entity_extension ( jsonSchema VARCHAR(256) NOT NULL, -- Schema used for generating JSON json JSONB NOT NULL, PRIMARY KEY (id, extension) -); \ No newline at end of file +); + +-- Add index on fromId and fromEntity columns +CREATE INDEX from_entity_type_index ON entity_relationship (fromId, fromEntity); + +-- Add index on toId and toEntity columns +CREATE INDEX to_entity_type_index ON entity_relationship (toId, toEntity); + +ALTER TABLE tag DROP CONSTRAINT IF EXISTS tag_fqnhash_key; + +ALTER TABLE tag ADD CONSTRAINT unique_fqnHash UNIQUE (fqnHash); + +ALTER TABLE tag ADD CONSTRAINT tag_pk PRIMARY KEY (id); diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml index c46571e1dd37..1b52383cf313 100644 --- a/openmetadata-service/pom.xml +++ b/openmetadata-service/pom.xml @@ -76,6 +76,11 @@ io.swagger.core.v3 swagger-annotations + + ru.vyarus.guicey + guicey-jdbi3 + 5.9.1 + com.smoketurner dropwizard-swagger @@ -231,8 +236,6 @@ io.github.resilience4j resilience4j-retry - - software.amazon.awssdk @@ -480,6 +483,12 @@ woodstox-core ${woodstox.version} + + com.google.guava + guava + 32.0.1-jre + compile + diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index 46c87036a614..f1dabfbdf1ad 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -13,6 +13,7 @@ package org.openmetadata.service; +import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass; import static org.openmetadata.service.util.MicrometerBundleSingleton.webAnalyticEvents; import io.dropwizard.Application; @@ -40,6 +41,7 @@ import java.security.cert.CertificateException; import java.time.temporal.ChronoUnit; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Optional; import javax.naming.ConfigurationException; @@ -85,6 +87,8 @@ import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator; import org.openmetadata.service.jdbi3.locator.ConnectionType; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkApplicationEventListener; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.migration.Migration; import org.openmetadata.service.migration.api.MigrationWorkflow; import org.openmetadata.service.monitoring.EventMonitor; @@ -137,12 +141,16 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ ChangeEventConfig.initialize(catalogConfig); final Jdbi jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory()); + JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider = JdbiUnitOfWorkProvider.withDefault(jdbi); + CollectionDAO daoObject = + (CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class); + environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(jdbiUnitOfWorkProvider, new HashSet<>())); // Configure the Fernet instance Fernet.getInstance().setFernetKey(catalogConfig); // Init Settings Cache - SettingsCache.initialize(jdbi.onDemand(CollectionDAO.class), catalogConfig); + SettingsCache.initialize(daoObject, catalogConfig); // init Secret Manager final SecretsManager secretsManager = @@ -186,23 +194,23 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ // start event hub before registering publishers EventPubSub.start(); - registerResources(catalogConfig, environment, jdbi); + registerResources(catalogConfig, environment, jdbi, jdbiUnitOfWorkProvider, daoObject); // Register Event Handler - registerEventFilter(catalogConfig, environment, jdbi); + registerEventFilter(catalogConfig, environment, jdbiUnitOfWorkProvider); environment.lifecycle().manage(new ManagedShutdown()); // Register Event publishers - registerEventPublisher(catalogConfig, jdbi); + registerEventPublisher(catalogConfig, daoObject); // update entities secrets if required new SecretsManagerUpdateService(secretsManager, catalogConfig.getClusterName()).updateEntities(); // start authorizer after event publishers // authorizer creates admin/bot users, ES publisher should start before to index users created by authorizer - authorizer.init(catalogConfig, jdbi); + authorizer.init(catalogConfig, daoObject); // authenticationHandler Handles auth related activities - authenticatorHandler.init(catalogConfig, jdbi); + authenticatorHandler.init(catalogConfig, daoObject); webAnalyticEvents = MicrometerBundleSingleton.latencyTimer(catalogConfig.getEventMonitorConfiguration()); FilterRegistration.Dynamic micrometerFilter = @@ -400,21 +408,22 @@ private void registerAuthenticator(OpenMetadataApplicationConfig catalogConfig) } } - private void registerEventFilter(OpenMetadataApplicationConfig catalogConfig, Environment environment, Jdbi jdbi) { + private void registerEventFilter( + OpenMetadataApplicationConfig catalogConfig, Environment environment, JdbiUnitOfWorkProvider provider) { if (catalogConfig.getEventHandlerConfiguration() != null) { - ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, jdbi); + ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, provider); environment.jersey().register(eventFilter); ContainerResponseFilter reindexingJobs = new SearchIndexEvent(); environment.jersey().register(reindexingJobs); } } - private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataApplicationConfig, Jdbi jdbi) { + private void registerEventPublisher( + OpenMetadataApplicationConfig openMetadataApplicationConfig, CollectionDAO daoObject) { // register ElasticSearch Event publisher if (openMetadataApplicationConfig.getElasticSearchConfiguration() != null) { SearchEventPublisher searchEventPublisher = - new SearchEventPublisher( - openMetadataApplicationConfig.getElasticSearchConfiguration(), jdbi.onDemand(CollectionDAO.class)); + new SearchEventPublisher(openMetadataApplicationConfig.getElasticSearchConfiguration(), daoObject); EventPubSub.addEventHandler(searchEventPublisher); } @@ -429,11 +438,18 @@ private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataAp } } - private void registerResources(OpenMetadataApplicationConfig config, Environment environment, Jdbi jdbi) { + private void registerResources( + OpenMetadataApplicationConfig config, + Environment environment, + Jdbi jdbi, + JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider, + CollectionDAO daoObject) { List extensionResources = config.getExtensionConfiguration() != null ? config.getExtensionConfiguration().getResourcePackage() : null; CollectionRegistry.initialize(extensionResources); - CollectionRegistry.getInstance().registerResources(jdbi, environment, config, authorizer, authenticatorHandler); + CollectionRegistry.getInstance() + .registerResources( + jdbi, jdbiUnitOfWorkProvider, environment, config, daoObject, authorizer, authenticatorHandler); environment.jersey().register(new JsonPatchProvider()); OMErrorPageHandler eph = new OMErrorPageHandler(config.getWebConfiguration()); eph.addErrorPage(Response.Status.NOT_FOUND.getStatusCode(), "/"); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java index fb394fdd4bb5..d25c81ac52eb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/AuditEventHandler.java @@ -16,11 +16,11 @@ import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.type.AuditLog; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.slf4j.Marker; import org.slf4j.MarkerFactory; @@ -28,7 +28,7 @@ public class AuditEventHandler implements EventHandler { private final Marker auditMarker = MarkerFactory.getMarker("AUDIT"); - public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) { + public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) { // Nothing to do } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java index 542c23e61c3d..0cf9be79e36e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/ChangeEventHandler.java @@ -16,14 +16,16 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity; import static org.openmetadata.service.formatter.util.FormatterUtil.getChangeEventFromResponseContext; +import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.SecurityContext; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.Handle; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.ChangeEvent; @@ -33,6 +35,7 @@ import org.openmetadata.service.events.subscription.AlertUtil; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.FeedRepository; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.socket.WebSocketManager; import org.openmetadata.service.util.FeedUtils; import org.openmetadata.service.util.JsonUtils; @@ -40,23 +43,29 @@ @Slf4j public class ChangeEventHandler implements EventHandler { - private CollectionDAO dao; - private FeedRepository feedDao; private ObjectMapper mapper; private NotificationHandler notificationHandler; - public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) { - this.dao = jdbi.onDemand(CollectionDAO.class); - this.feedDao = new FeedRepository(dao); + private JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider; + + public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) { this.mapper = new ObjectMapper(); - this.notificationHandler = new NotificationHandler(jdbi.onDemand(CollectionDAO.class)); + this.notificationHandler = new NotificationHandler(jdbiUnitOfWorkProvider); + this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider; } + @SneakyThrows public Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext) { String method = requestContext.getMethod(); SecurityContext securityContext = requestContext.getSecurityContext(); String loggedInUserName = securityContext.getUserPrincipal().getName(); try { + Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get(); + handle.getConnection().setAutoCommit(true); + CollectionDAO collectionDAO = + (CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class); + CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO(); + FeedRepository feedRepository = new FeedRepository(collectionDAO); if (responseContext.getEntity() != null && responseContext.getEntity().getClass().equals(Thread.class)) { // we should move this to Email Application notifications instead of processing it here. notificationHandler.processNotifications(responseContext); @@ -77,7 +86,8 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon changeEvent = copyChangeEvent(changeEvent); changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entity)); } - dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); + + changeEventDAO.insert(JsonUtils.pojoToJson(changeEvent)); // Add a new thread to the entity for every change event // for the event to appear in activity feeds @@ -86,12 +96,12 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon for (Thread thread : listOrEmpty(FeedUtils.getThreads(changeEvent, loggedInUserName))) { // Don't create a thread if there is no message if (thread.getMessage() != null && !thread.getMessage().isEmpty()) { - feedDao.create(thread); + feedRepository.create(thread, changeEvent); String jsonThread = mapper.writeValueAsString(thread); WebSocketManager.getInstance() .broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread); if (changeEvent.getEventType().equals(EventType.ENTITY_DELETED)) { - deleteAllConversationsRelatedToEntity(getEntity(changeEvent)); + deleteAllConversationsRelatedToEntity(getEntity(changeEvent), collectionDAO); } } } @@ -99,7 +109,9 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon } } } catch (Exception e) { - LOG.error("Failed to capture change event for method {} due to ", method, e); + LOG.error("Failed to capture the change event for method {} due to ", method, e); + } finally { + jdbiUnitOfWorkProvider.getHandleManager().clear(); } return null; } @@ -115,12 +127,12 @@ private static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) { .withCurrentVersion(changeEvent.getCurrentVersion()); } - private void deleteAllConversationsRelatedToEntity(EntityInterface entityInterface) { + private void deleteAllConversationsRelatedToEntity(EntityInterface entityInterface, CollectionDAO collectionDAO) { String entityId = entityInterface.getId().toString(); - List threadIds = dao.feedDAO().findByEntityId(entityId); + List threadIds = collectionDAO.feedDAO().findByEntityId(entityId); for (String threadId : threadIds) { - dao.relationshipDAO().deleteAll(threadId, Entity.THREAD); - dao.feedDAO().delete(threadId); + collectionDAO.relationshipDAO().deleteAll(threadId, Entity.THREAD); + collectionDAO.feedDAO().delete(threadId); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java index b881a40cb69f..6b4ed1f7a152 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventFilter.java @@ -13,7 +13,11 @@ package org.openmetadata.service.events; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.ForkJoinPool; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; @@ -21,8 +25,8 @@ import javax.ws.rs.core.UriInfo; import javax.ws.rs.ext.Provider; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.security.JwtFilter; import org.openmetadata.service.util.ParallelStreamUtil; @@ -34,13 +38,13 @@ public class EventFilter implements ContainerResponseFilter { private final ForkJoinPool forkJoinPool; private final List eventHandlers; - public EventFilter(OpenMetadataApplicationConfig config, Jdbi jdbi) { + public EventFilter(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) { this.forkJoinPool = new ForkJoinPool(FORK_JOIN_POOL_PARALLELISM); this.eventHandlers = new ArrayList<>(); - registerEventHandlers(config, jdbi); + registerEventHandlers(config, provider); } - private void registerEventHandlers(OpenMetadataApplicationConfig config, Jdbi jdbi) { + private void registerEventHandlers(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) { try { Set eventHandlerClassNames = new HashSet<>(config.getEventHandlerConfiguration().getEventHandlerClassNames()); @@ -48,7 +52,7 @@ private void registerEventHandlers(OpenMetadataApplicationConfig config, Jdbi jd @SuppressWarnings("unchecked") EventHandler eventHandler = ((Class) Class.forName(eventHandlerClassName)).getConstructor().newInstance(); - eventHandler.init(config, jdbi); + eventHandler.init(config, provider); eventHandlers.add(eventHandler); LOG.info("Added event handler {}", eventHandlerClassName); } @@ -64,6 +68,7 @@ public void filter(ContainerRequestContext requestContext, ContainerResponseCont if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) { return; } + eventHandlers .parallelStream() .forEach( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java index 89f4fcf5f722..5bafcaf21189 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/EventHandler.java @@ -15,11 +15,11 @@ import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerResponseContext; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; public interface EventHandler { - void init(OpenMetadataApplicationConfig config, Jdbi jdbi); + void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbi); Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java index 6e089785180e..856141df519b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/WebAnalyticEventHandler.java @@ -6,8 +6,8 @@ import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.UriInfo; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.util.MicrometerBundleSingleton; @Slf4j @@ -17,7 +17,7 @@ public class WebAnalyticEventHandler implements EventHandler { public static final String WEB_ANALYTIC_ENDPOINT = "v1/analytics/web/events/collect"; private static final String COUNTER_NAME = "web.analytics.events"; - public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) { + public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) { this.prometheusMeterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry; this.clusterName = config.getClusterName(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChangeEventRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChangeEventRepository.java index 554504ce9624..c71bfac813c9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChangeEventRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChangeEventRepository.java @@ -29,7 +29,6 @@ public ChangeEventRepository(CollectionDAO dao) { this.dao = dao.changeEventDAO(); } - @Transaction public List list( long timestamp, List entityCreatedList, @@ -51,6 +50,11 @@ public List list( return changeEvents; } + @Transaction + public void insert(ChangeEvent event) { + dao.insert(JsonUtils.pojoToJson(event)); + } + @Transaction public void deleteAll(String entityType) { dao.deleteAll(entityType); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index a9fcfa37a580..33c32afc0a92 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -88,7 +88,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.VoteRequest; @@ -376,7 +375,6 @@ public static List getEntitiesFromSeedData(String entityType, String path } /** Initialize a given entity if it does not exist. */ - @Transaction public void initializeEntity(T entity) { String existingJson = dao.findJsonByFqn(entity.getFullyQualifiedName(), ALL); if (existingJson != null) { @@ -396,13 +394,11 @@ public EntityUpdater getUpdater(T original, T updated, Operation operation) { return new EntityUpdater(original, updated, operation); } - @Transaction public final T get(UriInfo uriInfo, UUID id, Fields fields) { return get(uriInfo, id, fields, NON_DELETED, false); } /** Used for getting an entity with a set of requested fields */ - @Transaction public final T get(UriInfo uriInfo, UUID id, Fields fields, Include include, boolean fromCache) { if (!fromCache) { // Clear the cache and always get the entity from the database to ensure read-after-write consistency @@ -422,7 +418,6 @@ public final T get(UriInfo uriInfo, UUID id, Fields fields, Include include, boo } /** getReference is used for getting the entity references from the entity in the cache. */ - @Transaction public final EntityReference getReference(UUID id, Include include) throws EntityNotFoundException { return find(id, include).getEntityReference(); } @@ -430,7 +425,6 @@ public final EntityReference getReference(UUID id, Include include) throws Entit /** * Find method is used for getting an entity only with core fields stored as JSON without any relational fields set */ - @Transaction public T find(UUID id, Include include) throws EntityNotFoundException { try { @SuppressWarnings("unchecked") @@ -445,12 +439,10 @@ public T find(UUID id, Include include) throws EntityNotFoundException { } } - @Transaction public T getByName(UriInfo uriInfo, String fqn, Fields fields) { return getByName(uriInfo, fqn, fields, NON_DELETED, false); } - @Transaction public final T getByName(UriInfo uriInfo, String fqn, Fields fields, Include include, boolean fromCache) { fqn = quoteFqn ? EntityInterfaceUtil.quoteName(fqn) : fqn; if (!fromCache) { @@ -470,13 +462,11 @@ public final T getByName(UriInfo uriInfo, String fqn, Fields fields, Include inc return withHref(uriInfo, entityClone); } - @Transaction public final EntityReference getReferenceByName(String fqn, Include include) { fqn = quoteFqn ? EntityInterfaceUtil.quoteName(fqn) : fqn; return findByName(fqn, include).getEntityReference(); } - @Transaction public T findByNameOrNull(String fqn, Include include) { try { return findByName(fqn, include); @@ -488,7 +478,6 @@ public T findByNameOrNull(String fqn, Include include) { /** * Find method is used for getting an entity only with core fields stored as JSON without any relational fields set */ - @Transaction public T findByName(String fqn, Include include) { fqn = quoteFqn ? EntityInterfaceUtil.quoteName(fqn) : fqn; try { @@ -504,7 +493,6 @@ public T findByName(String fqn, Include include) { } } - @Transaction public final List listAll(Fields fields, ListFilter filter) { // forward scrolling, if after == null then first page is being asked List jsons = dao.listAfter(filter, Integer.MAX_VALUE, ""); @@ -517,7 +505,6 @@ public final List listAll(Fields fields, ListFilter filter) { return entities; } - @Transaction public ResultList listAfter(UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) { int total = dao.listCount(filter); List entities = new ArrayList<>(); @@ -545,7 +532,6 @@ public ResultList listAfter(UriInfo uriInfo, Fields fields, ListFilter filter } } - @Transaction public ResultList listAfterWithSkipFailure( UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String after) throws IOException { List errors = new ArrayList<>(); @@ -576,7 +562,6 @@ public ResultList listAfterWithSkipFailure( } } - @Transaction public ResultList listBefore(UriInfo uriInfo, Fields fields, ListFilter filter, int limitParam, String before) { // Reverse scrolling - Get one extra result used for computing before cursor List jsons = dao.listBefore(filter, limitParam + 1, RestUtil.decodeCursor(before)); @@ -599,7 +584,6 @@ public ResultList listBefore(UriInfo uriInfo, Fields fields, ListFilter filte return getResultList(entities, beforeCursor, afterCursor, total); } - @Transaction public T getVersion(UUID id, String version) { Double requestedVersion = Double.parseDouble(version); String extension = EntityUtil.getVersionExtension(entityType, requestedVersion); @@ -618,7 +602,6 @@ public T getVersion(UUID id, String version) { CatalogExceptionMessage.entityVersionNotFound(entityType, id, requestedVersion)); } - @Transaction public EntityHistory listVersions(UUID id) { T latest = setFieldsInternal(dao.findEntityById(id, ALL), putFields); String extensionPrefix = EntityUtil.getVersionExtensionPrefix(entityType); @@ -639,7 +622,6 @@ public final T create(UriInfo uriInfo, T entity) { return entity; } - @Transaction public final T createInternal(T entity) { prepareInternal(entity, false); return createNewEntity(entity); @@ -704,7 +686,6 @@ public final PutResponse createOrUpdate(UriInfo uriInfo, T updated) { return response; } - @Transaction public final PutResponse createOrUpdateInternal(UriInfo uriInfo, T updated) { T original = JsonUtils.readValue(dao.findJsonByFqn(updated.getFullyQualifiedName(), ALL), entityClass); if (original == null) { // If an original entity does not exist then create it, else update @@ -725,7 +706,6 @@ protected void postUpdate(T entity) { // For example ingestion pipeline creates a pipeline in AirFlow. } - @Transaction public PutResponse update(UriInfo uriInfo, T original, T updated) { // Get all the fields in the original entity that can be updated during PUT operation setFieldsInternal(original, putFields); @@ -743,7 +723,6 @@ public PutResponse update(UriInfo uriInfo, T original, T updated) { return new PutResponse<>(Status.OK, withHref(uriInfo, updated), change); } - @Transaction public final PatchResponse patch(UriInfo uriInfo, UUID id, String user, JsonPatch patch) { // Get all the fields in the original entity that can be updated during PATCH operation T original = setFieldsInternal(dao.findEntityById(id), patchFields); @@ -765,7 +744,6 @@ public final PatchResponse patch(UriInfo uriInfo, UUID id, String user, JsonP return new PatchResponse<>(Status.OK, withHref(uriInfo, updated), change); } - @Transaction public PutResponse addFollower(String updatedBy, UUID entityId, UUID userId) { // Get entity T entity = dao.findEntityById(entityId); @@ -798,7 +776,6 @@ public PutResponse addFollower(String updatedBy, UUID entityId, UUID userId) return new PutResponse<>(Status.OK, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); } - @Transaction public PutResponse updateVote(String updatedBy, UUID entityId, VoteRequest request) { T originalEntity = dao.findEntityById(entityId); @@ -892,7 +869,6 @@ private DeleteResponse delete(String deletedBy, T original, boolean recursive return new DeleteResponse<>(updated, changeType); } - @Transaction public final DeleteResponse deleteInternalByName( String updatedBy, String name, boolean recursive, boolean hardDelete) { // Validate entity @@ -900,9 +876,9 @@ public final DeleteResponse deleteInternalByName( return delete(updatedBy, entity, recursive, hardDelete); } - @Transaction public final DeleteResponse deleteInternal(String updatedBy, UUID id, boolean recursive, boolean hardDelete) { // Validate entity + T entity = dao.findEntityById(id, ALL); return delete(updatedBy, entity, recursive, hardDelete); } @@ -973,7 +949,6 @@ private void invalidate(T entity) { CACHE_WITH_NAME.invalidate(new ImmutablePair<>(entityType, entity.getFullyQualifiedName())); } - @Transaction public PutResponse deleteFollower(String updatedBy, UUID entityId, UUID userId) { T entity = find(entityId, NON_DELETED); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java index 20addc390302..d1eaf23a49d2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/FeedRepository.java @@ -48,7 +48,6 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.json.JSONObject; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.feed.CloseTask; @@ -57,7 +56,9 @@ import org.openmetadata.schema.api.feed.ThreadCount; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.entity.teams.User; +import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.EventType; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Post; import org.openmetadata.schema.type.Reaction; @@ -120,7 +121,6 @@ public enum PaginationType { AFTER } - @Transaction public int getNextTaskId() { dao.feedDAO().updateTaskId(); return dao.feedDAO().getTaskId(); @@ -130,7 +130,7 @@ public class ThreadContext { @Getter protected final Thread thread; @Getter @Setter protected final EntityLink about; @Getter @Setter protected EntityInterface aboutEntity; - @Getter private final EntityReference createdBy; + @Getter private EntityReference createdBy; ThreadContext(Thread thread) { this.thread = thread; @@ -140,6 +140,19 @@ public class ThreadContext { thread.withEntityId(aboutEntity.getId()); // Add entity id to thread } + ThreadContext(Thread thread, ChangeEvent event) { + this.thread = thread; + this.about = EntityLink.parse(thread.getAbout()); + if (event.getEventType().equals(EventType.ENTITY_DELETED)) { + String json = (String) event.getEntity(); + this.aboutEntity = JsonUtils.readValue(json, Entity.getEntityClassFromType(event.getEntityType())); + } else { + this.aboutEntity = Entity.getEntity(about, getFields(), ALL); + } + this.createdBy = Entity.getEntityReferenceByName(Entity.USER, thread.getCreatedBy(), NON_DELETED); + thread.withEntityId(aboutEntity.getId()); // Add entity id to thread + } + public TaskWorkflow getTaskWorkflow() { EntityRepository repository = Entity.getEntityRepository(about.getEntityType()); return repository.getTaskWorkflow(this); @@ -187,12 +200,20 @@ private ThreadContext getThreadContext(Thread thread) { return new ThreadContext(thread); } - @Transaction + private ThreadContext getThreadContext(Thread thread, ChangeEvent event) { + return new ThreadContext(thread, event); + } + public Thread create(Thread thread) { ThreadContext threadContext = getThreadContext(thread); return createThread(threadContext); } + public Thread create(Thread thread, ChangeEvent event) { + ThreadContext threadContext = getThreadContext(thread, event); + return createThread(threadContext); + } + public void store(ThreadContext threadContext) { // Insert a new thread dao.feedDAO().insert(JsonUtils.pojoToJson(threadContext.getThread())); @@ -229,7 +250,6 @@ public void storeRelationships(ThreadContext threadContext) { storeMentions(thread, thread.getMessage()); } - @Transaction private Thread createThread(ThreadContext threadContext) { Thread thread = threadContext.getThread(); if (thread.getType() == ThreadType.Task) { @@ -363,7 +383,6 @@ private void storeMentions(Thread thread, String message) { null)); } - @Transaction public Thread addPostToThread(String id, Post post, String userName) { // Validate the user posting the message UUID fromUserId = Entity.getEntityReferenceByName(USER, post.getFrom(), NON_DELETED).getId(); @@ -395,7 +414,6 @@ public Post getPostById(Thread thread, String postId) { return post.get(); } - @Transaction public DeleteResponse deletePost(Thread thread, Post post, String userName) { List posts = thread.getPosts(); // Remove the post to be deleted from the posts list @@ -410,7 +428,6 @@ public DeleteResponse deletePost(Thread thread, Post post, String userName return new DeleteResponse<>(post, RestUtil.ENTITY_DELETED); } - @Transaction public DeleteResponse deleteThread(Thread thread, String deletedByUser) { deleteThreadInternal(thread.getId().toString()); LOG.info("{} deleted thread with id {}", deletedByUser, thread.getId()); @@ -428,7 +445,6 @@ public void deleteThreadInternal(String id) { dao.feedDAO().delete(id); } - @Transaction public void deleteByAbout(UUID entityId) { List threadIds = listOrEmpty(dao.feedDAO().findByEntityId(entityId.toString())); for (String threadId : threadIds) { @@ -440,7 +456,6 @@ public void deleteByAbout(UUID entityId) { } } - @Transaction public ThreadCount getThreadsCount(FeedFilter filter, String link) { List> result; if (link == null) { @@ -498,7 +513,6 @@ public List listPosts(String threadId) { } /** List threads based on the filters and limits in the order of the updated timestamp. */ - @Transaction public ResultList list(FeedFilter filter, String link, int limitPosts, String userId, int limit) { int total; List threads; @@ -599,7 +613,6 @@ private void storeReactions(Thread thread, String user) { null); } - @Transaction public final PatchResponse patchPost(Thread thread, Post post, String user, JsonPatch patch) { // Apply JSON patch to the original post to get the updated post Post updated = JsonUtils.applyPatch(post, patch, Post.class); @@ -624,7 +637,6 @@ public final PatchResponse patchPost(Thread thread, Post post, String user return new PatchResponse<>(Status.OK, updated, change); } - @Transaction public final PatchResponse patchThread(UriInfo uriInfo, UUID id, String user, JsonPatch patch) { // Get all the fields in the original thread that can be updated during PATCH operation Thread original = get(id.toString()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index 8ae0df328932..c035f8f35314 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -17,7 +17,6 @@ import java.util.UUID; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.json.JSONObject; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig; @@ -89,7 +88,6 @@ public void prepare(IngestionPipeline ingestionPipeline, boolean update) { ingestionPipeline.setService(entityReference); } - @Transaction public IngestionPipeline deletePipelineStatus(UUID ingestionPipelineId) { // Validate the request content IngestionPipeline ingestionPipeline = dao.findEntityById(ingestionPipelineId); @@ -168,7 +166,6 @@ private ChangeDescription addPipelineStatusChangeDescription(Double version, Obj return change; } - @Transaction public RestUtil.PutResponse addPipelineStatus(UriInfo uriInfo, String fqn, PipelineStatus pipelineStatus) { // Validate the request content IngestionPipeline ingestionPipeline = dao.findEntityByName(fqn); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/KpiRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/KpiRepository.java index 36607c25f56b..538f1dfa905b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/KpiRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/KpiRepository.java @@ -8,7 +8,6 @@ import java.util.Map; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.dataInsight.ChartParameterValues; import org.openmetadata.schema.dataInsight.DataInsightChart; @@ -99,7 +98,6 @@ public void storeRelationships(Kpi kpi) { addRelationship(kpi.getId(), kpi.getDataInsightChart().getId(), KPI, DATA_INSIGHT_CHART, Relationship.USES); } - @Transaction public RestUtil.PutResponse addKpiResult(UriInfo uriInfo, String fqn, KpiResult kpiResult) { // Validate the request content Kpi kpi = dao.findEntityByName(fqn); @@ -115,7 +113,6 @@ public RestUtil.PutResponse addKpiResult(UriInfo uriInfo, String fqn, KpiResu return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); } - @Transaction public RestUtil.PutResponse deleteKpiResult(String fqn, Long timestamp) { // Validate the request content Kpi kpi = dao.findEntityByName(fqn); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index e0ffd725bce7..ad72c7d49a09 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.ColumnsEntityInterface; import org.openmetadata.schema.api.lineage.AddLineage; import org.openmetadata.schema.entity.data.Table; @@ -40,19 +39,16 @@ public LineageRepository(CollectionDAO dao) { this.dao = dao; } - @Transaction public EntityLineage get(String entityType, String id, int upstreamDepth, int downstreamDepth) { EntityReference ref = Entity.getEntityReferenceById(entityType, UUID.fromString(id), Include.NON_DELETED); return getLineage(ref, upstreamDepth, downstreamDepth); } - @Transaction public EntityLineage getByName(String entityType, String fqn, int upstreamDepth, int downstreamDepth) { EntityReference ref = Entity.getEntityReferenceByName(entityType, fqn, Include.NON_DELETED); return getLineage(ref, upstreamDepth, downstreamDepth); } - @Transaction public void addLineage(AddLineage addLineage) { // Validate from entity EntityReference from = addLineage.getEdge().getFromEntity(); @@ -121,7 +117,6 @@ private boolean areValidEntities(EntityReference from, EntityReference to) { || !(to.getType().equals(Entity.TABLE) || to.getType().equals(Entity.DASHBOARD_DATA_MODEL)); } - @Transaction public boolean deleteLineage(String fromEntity, String fromId, String toEntity, String toId) { // Validate from entity EntityReference from = Entity.getEntityReferenceById(fromEntity, UUID.fromString(fromId), Include.NON_DELETED); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index 3c651206fafc..e038b4fd8e1a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.Pipeline; @@ -145,7 +144,6 @@ private PipelineStatus getPipelineStatus(Pipeline pipeline) { PipelineStatus.class); } - @Transaction public Pipeline addPipelineStatus(String fqn, PipelineStatus pipelineStatus) { // Validate the request content Pipeline pipeline = daoCollection.pipelineDAO().findEntityByName(fqn); @@ -166,7 +164,6 @@ public Pipeline addPipelineStatus(String fqn, PipelineStatus pipelineStatus) { return pipeline.withPipelineStatus(pipelineStatus); } - @Transaction public Pipeline deletePipelineStatus(String fqn, Long timestamp) { // Validate the request content Pipeline pipeline = dao.findEntityByName(fqn); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java index 9d2e1bce23f5..b1e3c3f9cf77 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java @@ -4,7 +4,6 @@ import java.util.List; import java.util.UUID; import javax.ws.rs.core.Response; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.analytics.ReportData.ReportDataType; import org.openmetadata.service.util.JsonUtils; @@ -19,7 +18,6 @@ public ReportDataRepository(CollectionDAO dao) { this.daoCollection = dao; } - @Transaction public Response addReportData(ReportData reportData) { reportData.setId(UUID.randomUUID()); daoCollection diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/RoleRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/RoleRepository.java index b68f4ff7929d..3added2782c8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/RoleRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/RoleRepository.java @@ -22,7 +22,6 @@ import java.util.List; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.entity.teams.Role; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Relationship; @@ -89,7 +88,6 @@ public void prepare(Role role, boolean update) { *

This method ensures that the role and its policy are stored correctly. */ @Override - @Transaction public void storeEntity(Role role, boolean update) { // Don't store policy. Build it on the fly based on relationships List policies = role.getPolicies(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java index d4d304ba0733..446bc1b58c66 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java @@ -32,7 +32,6 @@ import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.SearchIndex; @@ -167,7 +166,6 @@ public SearchIndex getSampleData(UUID searchIndexId, boolean authorizePII) { return searchIndex; } - @Transaction public SearchIndex addSampleData(UUID searchIndexId, SearchIndexSampleData sampleData) { // Validate the request content SearchIndex searchIndex = daoCollection.searchIndexDAO().findEntityById(searchIndexId); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index ee615093d4c1..bc86040f7da5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -42,7 +42,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.data.CreateTableProfile; @@ -181,7 +180,6 @@ public void setFullyQualifiedName(Table table) { ColumnUtil.setColumnFQN(table.getFullyQualifiedName(), table.getColumns()); } - @Transaction public Table addJoins(UUID tableId, TableJoins joins) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -213,7 +211,6 @@ public Table addJoins(UUID tableId, TableJoins joins) { return table.withJoins(getJoins(table)); } - @Transaction public Table addLifeCycle(String fqn, LifeCycle lifeCycle) { // Validate the request content Table table = daoCollection.tableDAO().findEntityByName(fqn); @@ -261,7 +258,6 @@ public Table addLifeCycle(String fqn, LifeCycle lifeCycle) { return table.withLifeCycle(currentLifeCycle); } - @Transaction public Table addSampleData(UUID tableId, TableData tableData) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -286,7 +282,6 @@ public Table addSampleData(UUID tableId, TableData tableData) { return table.withSampleData(tableData); } - @Transaction public Table getSampleData(UUID tableId, boolean authorizePII) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -308,7 +303,6 @@ public Table getSampleData(UUID tableId, boolean authorizePII) { return table; } - @Transaction public Table deleteSampleData(UUID tableId) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -318,14 +312,12 @@ public Table deleteSampleData(UUID tableId) { return table; } - @Transaction public TableProfilerConfig getTableProfilerConfig(Table table) { return JsonUtils.readValue( daoCollection.entityExtensionDAO().getExtension(table.getId().toString(), TABLE_PROFILER_CONFIG_EXTENSION), TableProfilerConfig.class); } - @Transaction public TestSuite getTestSuite(Table table) { List entityRelationshipRecords = daoCollection.relationshipDAO().findTo(table.getId().toString(), TABLE, Relationship.CONTAINS.ordinal()); @@ -338,7 +330,6 @@ public TestSuite getTestSuite(Table table) { : null; } - @Transaction public Table addTableProfilerConfig(UUID tableId, TableProfilerConfig tableProfilerConfig) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -371,7 +362,6 @@ public Table addTableProfilerConfig(UUID tableId, TableProfilerConfig tableProfi return table.withTableProfilerConfig(tableProfilerConfig); } - @Transaction public Table deleteTableProfilerConfig(UUID tableId) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -401,7 +391,6 @@ private Column getColumnNameForProfiler(List columnList, ColumnProfile c return null; } - @Transaction public Table addTableProfileData(UUID tableId, CreateTableProfile createTableProfile) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -458,7 +447,6 @@ public Table addTableProfileData(UUID tableId, CreateTableProfile createTablePro return table.withProfile(createTableProfile.getTableProfile()); } - @Transaction public void deleteTableProfile(String fqn, String entityType, Long timestamp) { // Validate the request content String extension; @@ -485,7 +473,6 @@ public void deleteTableProfile(String fqn, String entityType, Long timestamp) { daoCollection.profilerDataTimeSeriesDao().deleteAtTimestamp(fqn, extension, timestamp); } - @Transaction public ResultList getTableProfiles(String fqn, Long startTs, Long endTs) { List tableProfiles; tableProfiles = @@ -498,7 +485,6 @@ public ResultList getTableProfiles(String fqn, Long startTs, Long return new ResultList<>(tableProfiles, startTs.toString(), endTs.toString(), tableProfiles.size()); } - @Transaction public ResultList getColumnProfiles(String fqn, Long startTs, Long endTs) { List columnProfiles; columnProfiles = @@ -511,7 +497,6 @@ public ResultList getColumnProfiles(String fqn, Long startTs, Lon return new ResultList<>(columnProfiles, startTs.toString(), endTs.toString(), columnProfiles.size()); } - @Transaction public ResultList getSystemProfiles(String fqn, Long startTs, Long endTs) { List systemProfiles; systemProfiles = @@ -539,7 +524,6 @@ private void setColumnProfile(List columnList) { } } - @Transaction public Table getLatestTableProfile(String fqn, boolean authorizePII) { Table table = dao.findEntityByName(fqn, ALL); TableProfile tableProfile = @@ -560,7 +544,6 @@ public Table getLatestTableProfile(String fqn, boolean authorizePII) { return table; } - @Transaction public Table addCustomMetric(UUID tableId, CustomMetric customMetric) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -598,7 +581,6 @@ public Table addCustomMetric(UUID tableId, CustomMetric customMetric) { return table; } - @Transaction public Table deleteCustomMetric(UUID tableId, String columnName, String metricName) { // Validate the request content Table table = dao.findEntityById(tableId); @@ -633,7 +615,6 @@ public Table deleteCustomMetric(UUID tableId, String columnName, String metricNa return table; } - @Transaction public Table addDataModel(UUID tableId, DataModel dataModel) { Table table = dao.findEntityById(tableId); table.withDataModel(dataModel); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java index ab034fa64ab5..70345c56bb1c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseRepository.java @@ -18,7 +18,6 @@ import javax.json.JsonPatch; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.tests.ResultSummary; import org.openmetadata.schema.tests.TestCase; @@ -50,8 +49,8 @@ public class TestCaseRepository extends EntityRepository { private static final String TEST_SUITE_FIELD = "testSuite"; private static final String TEST_CASE_RESULT_FIELD = "testCaseResult"; public static final String COLLECTION_PATH = "/v1/dataQuality/testCases"; - private static final String UPDATE_FIELDS = "entityLink,testSuite,testDefinition"; - private static final String PATCH_FIELDS = "entityLink,testSuite,testDefinition"; + private static final String UPDATE_FIELDS = "owner,entityLink,testSuite,testDefinition"; + private static final String PATCH_FIELDS = "owner,entityLink,testSuite,testDefinition"; public static final String TESTCASE_RESULT_EXTENSION = "testCase.testCaseResult"; public TestCaseRepository(CollectionDAO dao) { @@ -196,7 +195,6 @@ public void storeRelationships(TestCase test) { test.getTestDefinition().getId(), test.getId(), TEST_DEFINITION, TEST_CASE, Relationship.APPLIED_TO); } - @Transaction public RestUtil.PutResponse addTestCaseResult( String updatedBy, UriInfo uriInfo, String fqn, TestCaseResult testCaseResult) { // Validate the request content @@ -219,7 +217,6 @@ public RestUtil.PutResponse addTestCaseResult( return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); } - @Transaction public RestUtil.PutResponse deleteTestCaseResult(String updatedBy, String fqn, Long timestamp) { // Validate the request content TestCase testCase = dao.findEntityByName(fqn); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestTransactionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestTransactionRepository.java new file mode 100644 index 000000000000..9b56f727459d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestTransactionRepository.java @@ -0,0 +1,61 @@ +package org.openmetadata.service.jdbi3; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.Date; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.type.DailyCount; +import org.openmetadata.schema.type.UsageDetails; +import org.openmetadata.service.util.FullyQualifiedName; +import org.openmetadata.service.util.RestUtil; + +@Slf4j +public class TestTransactionRepository { + private static final String FAILED_TO_UPDATE_SETTINGS = "Failed to Update Settings"; + public static final String INTERNAL_SERVER_ERROR_WITH_REASON = "Internal Server Error. Reason :"; + private final CollectionDAO dao; + + public TestTransactionRepository(CollectionDAO dao) { + this.dao = dao; + } + + public Table createOrUpdateTableWithTransaction(Table table) throws IOException { + dao.tableDAO().insert(table, FullyQualifiedName.buildHash(table.getFullyQualifiedName())); + return getTable(table.getId()); + } + + public Table updateTableWithTransaction(Table table) throws JsonProcessingException { + dao.tableDAO().update(table); + return table; + } + + public Table updateTableWithTransactionWithError(Table table, int count) throws JsonProcessingException { + dao.tableDAO().update(table); + return table; + } + + public void updateUsageStatsWithTransaction(Table table, int count) { + String today = RestUtil.DATE_FORMAT.format(new Date()); + DailyCount dailyCount = new DailyCount().withCount(count).withDate(today); + dao.usageDAO().insertOrReplaceCount(dailyCount.getDate(), table.getId().toString(), "Table", dailyCount.getCount()); + } + + public void updateUsageStatsWithTransactionWithError(Table table, int count) { + throw new IllegalArgumentException("Rollback Transaction"); + } + + public Table createOrUpdateTableWithJdbiUnitOfWork(Table table) throws JsonProcessingException { + dao.tableDAO().insert(table, FullyQualifiedName.buildHash(table.getFullyQualifiedName())); + return table; + } + + public Table getTable(UUID id) throws IOException { + return dao.tableDAO().findEntityById(id); + } + + public UsageDetails getUsage(UUID id) throws IOException { + return dao.usageDAO().getLatestUsage(id.toString()); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java index d901e11e7644..256c91bffde6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java @@ -30,7 +30,6 @@ import java.util.function.BiPredicate; import java.util.function.Function; import java.util.stream.Collectors; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.api.feed.ResolveTask; import org.openmetadata.schema.entity.data.Topic; @@ -161,7 +160,6 @@ public Topic getSampleData(UUID topicId, boolean authorizePII) { return topic; } - @Transaction public Topic addSampleData(UUID topicId, TopicSampleData sampleData) { // Validate the request content Topic topic = daoCollection.topicDAO().findEntityById(topicId); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java index 337908ce23bd..77baaf334da0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java @@ -28,7 +28,6 @@ import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.statement.StatementContext; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.Chart; import org.openmetadata.schema.entity.data.Dashboard; @@ -58,47 +57,40 @@ public UsageRepository(CollectionDAO dao) { this.dao = dao; } - @Transaction public EntityUsage get(String entityType, String id, String date, int days) { EntityReference ref = Entity.getEntityReferenceById(entityType, UUID.fromString(id), Include.NON_DELETED); List usageDetails = dao.usageDAO().getUsageById(id, date, days - 1); return new EntityUsage().withUsage(usageDetails).withEntity(ref); } - @Transaction public EntityUsage getByName(String entityType, String fqn, String date, int days) { EntityReference ref = Entity.getEntityReferenceByName(entityType, fqn, Include.NON_DELETED); List usageDetails = dao.usageDAO().getUsageById(ref.getId().toString(), date, days - 1); return new EntityUsage().withUsage(usageDetails).withEntity(ref); } - @Transaction public RestUtil.PutResponse create(String entityType, String id, DailyCount usage) { // Validate data entity for which usage is being collected Entity.getEntityReferenceById(entityType, UUID.fromString(id), Include.NON_DELETED); return addUsage(POST, entityType, id, usage); } - @Transaction public RestUtil.PutResponse createByName(String entityType, String fullyQualifiedName, DailyCount usage) { EntityReference ref = Entity.getEntityReferenceByName(entityType, fullyQualifiedName, Include.NON_DELETED); return addUsage(POST, entityType, ref.getId().toString(), usage); } - @Transaction public RestUtil.PutResponse createOrUpdate(String entityType, UUID id, DailyCount usage) { // Validate data entity for which usage is being collected Entity.getEntityReferenceById(entityType, id, Include.NON_DELETED); return addUsage(PUT, entityType, id.toString(), usage); } - @Transaction public RestUtil.PutResponse createOrUpdateByName(String entityType, String fullyQualifiedName, DailyCount usage) { EntityReference ref = Entity.getEntityReferenceByName(entityType, fullyQualifiedName, Include.NON_DELETED); return addUsage(PUT, entityType, ref.getId().toString(), usage); } - @Transaction public void computePercentile(String entityType, String date) { dao.usageDAO().computePercentile(entityType, date); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WebAnalyticEventRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WebAnalyticEventRepository.java index f02580275848..7b35ababb663 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WebAnalyticEventRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WebAnalyticEventRepository.java @@ -5,7 +5,6 @@ import java.util.List; import java.util.UUID; import javax.ws.rs.core.Response; -import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.analytics.WebAnalyticEvent; import org.openmetadata.schema.analytics.WebAnalyticEventData; import org.openmetadata.schema.analytics.type.WebAnalyticEventType; @@ -46,7 +45,6 @@ public void storeRelationships(WebAnalyticEvent entity) { // No relationships to store beyond what is stored in the super class } - @Transaction public Response addWebAnalyticEventData(WebAnalyticEventData webAnalyticEventData) { webAnalyticEventData.setEventId(UUID.randomUUID()); storeTimeSeries( @@ -58,7 +56,6 @@ public Response addWebAnalyticEventData(WebAnalyticEventData webAnalyticEventDat return Response.ok(webAnalyticEventData).build(); } - @Transaction public void deleteWebAnalyticEventData(WebAnalyticEventType name, Long timestamp) { deleteExtensionBeforeTimestamp(name.value(), WEB_ANALYTICS_EVENT_DATA_EXTENSION, timestamp); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java new file mode 100644 index 000000000000..afd185683c6c --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/HttpGetRequestJdbiUnitOfWorkEventListener.java @@ -0,0 +1,24 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import lombok.extern.slf4j.Slf4j; +import org.glassfish.jersey.server.monitoring.RequestEvent; +import org.glassfish.jersey.server.monitoring.RequestEventListener; + +@Slf4j +class HttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener { + + private final JdbiTransactionAspect transactionAspect; + + HttpGetRequestJdbiUnitOfWorkEventListener(JdbiHandleManager handleManager) { + this.transactionAspect = new JdbiTransactionAspect(handleManager); + } + + @Override + public void onEvent(RequestEvent event) { + RequestEvent.Type type = event.getType(); + LOG.debug("Handling GET Request Event {} {}", type, Thread.currentThread().getId()); + if (type == RequestEvent.Type.FINISHED) { + transactionAspect.terminateHandle(); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java new file mode 100644 index 000000000000..46ac9f5aee93 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiHandleManager.java @@ -0,0 +1,18 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import java.util.concurrent.ThreadFactory; +import org.jdbi.v3.core.Handle; + +public interface JdbiHandleManager { + Handle get(); + + void clear(); + + default ThreadFactory createThreadFactory() { + throw new UnsupportedOperationException("Thread factory creation is not supported"); + } + + default String getConversationId() { + return String.valueOf(Thread.currentThread().getId()); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionAspect.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionAspect.java new file mode 100644 index 000000000000..6cbcccc648c3 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiTransactionAspect.java @@ -0,0 +1,85 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Handles; + +@Slf4j +public class JdbiTransactionAspect { + private final JdbiHandleManager handleManager; + private final Set IN_TRANSACTION_HANDLES = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public JdbiTransactionAspect(JdbiHandleManager handleManager) { + this.handleManager = handleManager; + } + + public void begin(boolean autoCommit) { + try { + Handle handle = handleManager.get(); + handle.getConnection().setAutoCommit(autoCommit); + handle.getConfig(Handles.class).setForceEndTransactions(false); + handle.begin(); + IN_TRANSACTION_HANDLES.add(handle.hashCode()); + LOG.debug( + "Begin Transaction Thread Id [{}] has handle id [{}] Transaction {} Level {}", + Thread.currentThread().getId(), + handle.hashCode(), + handle.isInTransaction(), + handle.getTransactionIsolationLevel()); + } catch (Exception ex) { + terminateHandle(); + } + } + + public void commit() { + Handle handle = handleManager.get(); + if (handle == null) { + LOG.debug( + "Handle was found to be null during commit for Thread Id [{}]. It might have already been closed", + Thread.currentThread().getId()); + return; + } + try { + handle.getConnection().commit(); + LOG.debug( + "Performing commit Thread Id [{}] has handle id [{}] Transaction {} Level {}", + Thread.currentThread().getId(), + handle.hashCode(), + handle.isInTransaction(), + handle.getTransactionIsolationLevel()); + } catch (Exception ex) { + rollback(); + } + } + + public void rollback() { + if (IN_TRANSACTION_HANDLES.contains(handleManager.get().hashCode())) { + Handle handle = handleManager.get(); + if (handle == null) { + LOG.debug("Handle was found to be null during rollback for [{}]", Thread.currentThread().getId()); + return; + } + try { + handle.getConnection().rollback(); + LOG.debug( + "Performed rollback on Thread Id [{}] has handle id [{}] Transaction {} Level {}", + Thread.currentThread().getId(), + handle.hashCode(), + handle.isInTransaction(), + handle.getTransactionIsolationLevel()); + } catch (Exception e) { + LOG.debug("Failed to rollback transaction due to", e); + } finally { + terminateHandle(); + } + } + } + + public void terminateHandle() { + IN_TRANSACTION_HANDLES.remove(handleManager.get().hashCode()); + handleManager.clear(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java new file mode 100644 index 000000000000..b6066bf831d0 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkApplicationEventListener.java @@ -0,0 +1,40 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import java.util.Set; +import javax.annotation.Nullable; +import javax.ws.rs.HttpMethod; +import lombok.extern.slf4j.Slf4j; +import org.glassfish.jersey.server.monitoring.ApplicationEvent; +import org.glassfish.jersey.server.monitoring.ApplicationEventListener; +import org.glassfish.jersey.server.monitoring.RequestEvent; +import org.glassfish.jersey.server.monitoring.RequestEventListener; + +@Slf4j +public class JdbiUnitOfWorkApplicationEventListener implements ApplicationEventListener { + + private final JdbiUnitOfWorkProvider unitOfWorkProvider; + private final Set excludedPaths; + + public JdbiUnitOfWorkApplicationEventListener(JdbiUnitOfWorkProvider unitOfWorkProvider, Set excludedPaths) { + this.unitOfWorkProvider = unitOfWorkProvider; + this.excludedPaths = excludedPaths; + } + + @Override + public void onEvent(ApplicationEvent event) { + LOG.debug("Received Application event {}", event.getType()); + } + + @Override + @Nullable + public RequestEventListener onRequest(RequestEvent event) { + String path = event.getUriInfo().getPath(); + if (excludedPaths.stream().anyMatch(path::contains)) { + return null; + } + if (event.getContainerRequest().getMethod().equals(HttpMethod.GET)) { + return new HttpGetRequestJdbiUnitOfWorkEventListener(unitOfWorkProvider.getHandleManager()); + } + return new NonHttpGetRequestJdbiUnitOfWorkEventListener(unitOfWorkProvider.getHandleManager()); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java new file mode 100644 index 000000000000..5268422c175f --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/JdbiUnitOfWorkProvider.java @@ -0,0 +1,52 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import com.google.common.reflect.Reflection; +import java.util.*; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Jdbi; + +@Slf4j +@SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"}) +public class JdbiUnitOfWorkProvider { + + private final JdbiHandleManager handleManager; + + private JdbiUnitOfWorkProvider(JdbiHandleManager handleManager) { + this.handleManager = handleManager; + } + + public static JdbiUnitOfWorkProvider withDefault(Jdbi dbi) { + JdbiHandleManager handleManager = new RequestScopedJdbiHandleManager(dbi); + return new JdbiUnitOfWorkProvider(handleManager); + } + + public static JdbiUnitOfWorkProvider withLinked(Jdbi dbi) { + JdbiHandleManager handleManager = new LinkedRequestScopedJdbiHandleManager(dbi); + return new JdbiUnitOfWorkProvider(handleManager); + } + + public JdbiHandleManager getHandleManager() { + return handleManager; + } + + /** + * getWrappedInstanceForDaoClass generates a proxy instance of the dao class for which the jdbi unit of work aspect + * would be wrapped around with. This method however may be used in case the classpath scanning is disabled. If the + * original class is null or contains no relevant JDBI annotations, this method throws an exception + * + * @param daoClass the DAO class for which a proxy needs to be created fo + * @return the wrapped instance ready to be passed around + */ + public static Object getWrappedInstanceForDaoClass(JdbiUnitOfWorkProvider provider, Class daoClass) { + if (daoClass == null) { + throw new IllegalArgumentException("DAO Class cannot be null"); + } + LOG.debug( + "Binding class [{}] with proxy handler [{}] ", + daoClass.getSimpleName(), + provider.getHandleManager().getClass().getSimpleName()); + ManagedHandleInvocationHandler handler = new ManagedHandleInvocationHandler<>(provider, daoClass); + Object proxiedInstance = Reflection.newProxy(daoClass, handler); + return daoClass.cast(proxiedInstance); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java new file mode 100644 index 000000000000..7a86198ea122 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/LinkedRequestScopedJdbiHandleManager.java @@ -0,0 +1,95 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; + +/** + * This implementation provides a handle scoped to a thread and all other threads Y spawned from X All Y threads must + * follow a particular name format extracted from the conversation id This is one of the ways the manager can know of + * the grouping and establish re-usability of handles across such grouped threads.
+ *
+ * It can be used to service requests where only a single handle instance has to be used by multiple threads that are + * spawned with the specified name format from an initial thread. Use this only when you have complete control over the + * threads you create. The threads must not run once the parent thread is returned to the pool or else the handles will + * be invalid or in other words parent thread must block on the results of children.
+ * It relies on the fact that the {@code Jdbi.Handle} is inherently thread safe and can be used to service dao requests + * between multiple threads. Note: Not suitable when you can not set the name format for the newly spawned threads. + */ +@Slf4j +class LinkedRequestScopedJdbiHandleManager implements JdbiHandleManager { + + private final Map parentThreadHandleMap = new ConcurrentHashMap<>(); + private final Jdbi dbi; + + public LinkedRequestScopedJdbiHandleManager(Jdbi dbi) { + this.dbi = dbi; + } + + @Override + public Handle get() { + String parent = substringBetween(Thread.currentThread().getName()); + Handle handle; + if (parent == null) { + handle = getHandle(); + LOG.debug("Owner of handle [{}] : Parent Thread Id [{}]", handle.hashCode(), Thread.currentThread().getId()); + + } else { + handle = parentThreadHandleMap.get(parent); + if (handle == null) { + throw new IllegalStateException( + String.format( + "Handle to be reused in child thread [%s] is null for parent thread [%s]", + Thread.currentThread().getId(), parent)); + } + LOG.debug("Reusing parent thread handle [{}] for [{}]", handle.hashCode(), Thread.currentThread().getId()); + } + return handle; + } + + @Override + public void clear() { + String parent = getConversationId(); + Handle handle = parentThreadHandleMap.get(parent); + if (handle != null) { + handle.close(); + LOG.debug("Closed handle Thread Id [{}] has handle id [{}]", Thread.currentThread().getId(), handle.hashCode()); + + parentThreadHandleMap.remove(parent); + LOG.debug("Clearing handle member for parent thread [{}] ", Thread.currentThread().getId()); + } + } + + @Override + public ThreadFactory createThreadFactory() { + String threadName = String.format("[%s]-%%d", getConversationId()); + return new ThreadFactoryBuilder().setNameFormat(threadName).build(); + } + + private Handle getHandle() { + String threadIdentity = getConversationId(); + if (parentThreadHandleMap.containsKey(threadIdentity)) { + return parentThreadHandleMap.get(threadIdentity); + } + Handle handle = dbi.open(); + parentThreadHandleMap.putIfAbsent(threadIdentity, handle); + return handle; + } + + @Nullable + private String substringBetween(String threadName) { + final int start = threadName.indexOf("["); + if (start != -1) { + final int end = threadName.indexOf("]", start + "[".length()); + if (end != -1) { + return threadName.substring(start + "[".length(), end); + } + } + return null; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java new file mode 100644 index 000000000000..dc24310e1a54 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/ManagedHandleInvocationHandler.java @@ -0,0 +1,68 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.sqlobject.CreateSqlObject; +import org.openmetadata.service.jdbi3.CollectionDAO; + +@Slf4j +public class ManagedHandleInvocationHandler implements InvocationHandler { + private static final Object[] NO_ARGS = {}; + private final Class underlying; + private final JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider; + + public ManagedHandleInvocationHandler(JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider, Class underlying) { + this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider; + this.underlying = underlying; + } + + /** + * {@inheritDoc} + * + *

    + *
  • {@code proxy.toString()} delegates to {@link ManagedHandleInvocationHandler#toString} + *
  • other method calls are dispatched to {@link ManagedHandleInvocationHandler#handleInvocation}. + *
+ */ + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (args == null) { + args = NO_ARGS; + } + if (args.length == 0 && method.getName().equals("toString")) { + return toString(); + } + return handleInvocation(method, args); + } + + private Object handleInvocation(Method method, Object[] args) throws Throwable { + Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get(); + LOG.debug( + "{}.{} [{}] Thread Id [{}] with handle id [{}]", + method.getDeclaringClass().getSimpleName(), + method.getName(), + underlying.getSimpleName(), + Thread.currentThread().getId(), + handle.hashCode()); + + if (CollectionDAO.class.isAssignableFrom(underlying) && method.isAnnotationPresent(CreateSqlObject.class)) { + return getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, method.getReturnType()); + } else { + Object dao = handle.attach(underlying); + try { + return method.invoke(dao, args); + } catch (Exception ex) { + throw ex.getCause(); + } + } + } + + @Override + public String toString() { + return "Proxy[" + underlying.getSimpleName() + "]"; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java new file mode 100644 index 000000000000..6ac779c9cfef --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/NonHttpGetRequestJdbiUnitOfWorkEventListener.java @@ -0,0 +1,44 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import javax.ws.rs.HttpMethod; +import lombok.extern.slf4j.Slf4j; +import org.glassfish.jersey.server.monitoring.RequestEvent; +import org.glassfish.jersey.server.monitoring.RequestEventListener; + +@Slf4j +class NonHttpGetRequestJdbiUnitOfWorkEventListener implements RequestEventListener { + + private final JdbiTransactionAspect transactionAspect; + + NonHttpGetRequestJdbiUnitOfWorkEventListener(JdbiHandleManager handleManager) { + this.transactionAspect = new JdbiTransactionAspect(handleManager); + } + + @Override + public void onEvent(RequestEvent event) { + RequestEvent.Type type = event.getType(); + String httpMethod = event.getContainerRequest().getMethod(); + + LOG.debug("Handling {} Request Event {} {}", httpMethod, type, Thread.currentThread().getId()); + boolean isTransactional = isTransactional(event); + if (isTransactional) { + if (type == RequestEvent.Type.RESOURCE_METHOD_START) { + transactionAspect.begin(false); + } else if (type == RequestEvent.Type.RESP_FILTERS_START) { + transactionAspect.commit(); + } else if (type == RequestEvent.Type.ON_EXCEPTION) { + transactionAspect.rollback(); + } else if (type == RequestEvent.Type.FINISHED) { + transactionAspect.terminateHandle(); + } + } + } + + private boolean isTransactional(RequestEvent event) { + String httpMethod = event.getContainerRequest().getMethod(); + return httpMethod.equals(HttpMethod.POST) + || httpMethod.equals(HttpMethod.PUT) + || httpMethod.equals(HttpMethod.PATCH) + || httpMethod.equals(HttpMethod.DELETE); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java new file mode 100644 index 000000000000..27ffb1d907bd --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/unitofwork/RequestScopedJdbiHandleManager.java @@ -0,0 +1,40 @@ +package org.openmetadata.service.jdbi3.unitofwork; + +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; + +@Slf4j +class RequestScopedJdbiHandleManager implements JdbiHandleManager { + + private final Jdbi dbi; + + @SuppressWarnings("ThreadLocalUsage") + private final ThreadLocal threadLocal = new ThreadLocal<>(); + + public RequestScopedJdbiHandleManager(Jdbi dbi) { + this.dbi = dbi; + } + + @Override + public Handle get() { + if (threadLocal.get() == null) { + threadLocal.set(dbi.open()); + } + Handle handle = threadLocal.get(); + LOG.debug("handle [{}] : Thread Id [{}]", handle.hashCode(), Thread.currentThread().getId()); + return handle; + } + + @Override + public void clear() { + Handle handle = threadLocal.get(); + if (handle != null) { + handle.close(); + LOG.debug("Closed handle Thread Id [{}] has handle id [{}]", Thread.currentThread().getId(), handle.hashCode()); + + threadLocal.remove(); + LOG.debug("Clearing handle member for thread [{}] ", Thread.currentThread().getId()); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java index 26d5984d29c6..8f91be4729fe 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/CollectionRegistry.java @@ -40,6 +40,7 @@ import org.openmetadata.schema.type.CollectionInfo; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.security.auth.AuthenticatorHandler; import org.reflections.Reflections; @@ -158,8 +159,10 @@ public static void addTestResource(Object testResource) { /** Register resources from CollectionRegistry */ public void registerResources( Jdbi jdbi, + JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider, Environment environment, OpenMetadataApplicationConfig config, + CollectionDAO daoObject, Authorizer authorizer, AuthenticatorHandler authenticatorHandler) { // Build list of ResourceDescriptors @@ -167,7 +170,7 @@ public void registerResources( CollectionDetails details = e.getValue(); String resourceClass = details.resourceClass; try { - Object resource = createResource(jdbi, resourceClass, config, authorizer, authenticatorHandler); + Object resource = createResource(jdbi, resourceClass, daoObject, config, authorizer, authenticatorHandler); details.setResource(resource); environment.jersey().register(resource); LOG.info("Registering {} with order {}", resourceClass, details.order); @@ -231,6 +234,7 @@ private static List getCollections() { private static Object createResource( Jdbi jdbi, String resourceClass, + CollectionDAO daoObject, OpenMetadataApplicationConfig config, Authorizer authorizer, AuthenticatorHandler authHandler) @@ -238,7 +242,6 @@ private static Object createResource( InstantiationException { // Decorate Collection DAO - CollectionDAO daoObject = jdbi.onDemand(CollectionDAO.class); Objects.requireNonNull(daoObject, "CollectionDAO must not be null"); Object resource = null; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/EventResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/EventResource.java index b9c6772710a4..35447fa49961 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/EventResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/events/EventResource.java @@ -50,7 +50,7 @@ @Consumes(MediaType.APPLICATION_JSON) @Collection(name = "events") public class EventResource { - @Getter private final ChangeEventRepository dao; + @Getter private final ChangeEventRepository repository; public static class EventList extends ResultList { @@ -64,7 +64,7 @@ public EventList(List data, String beforeCursor, String afterCursor public EventResource(CollectionDAO dao, Authorizer authorizer) { Objects.requireNonNull(dao, "ChangeEventRepository must not be null"); - this.dao = new ChangeEventRepository(dao); + this.repository = new ChangeEventRepository(dao); } @GET @@ -126,7 +126,7 @@ public ResultList get( List entityRestoredList = EntityList.getEntityList("entityRestored", entityRestored); List entityDeletedList = EntityList.getEntityList("entityDeleted", entityDeleted); List events = - dao.list(timestamp, entityCreatedList, entityUpdatedList, entityRestoredList, entityDeletedList); + repository.list(timestamp, entityCreatedList, entityUpdatedList, entityRestoredList, entityDeletedList); events.sort(EntityUtil.compareChangeEvent); // Sort change events based on time return new EventList(events, null, null, events.size()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/TestTransactionResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/TestTransactionResource.java new file mode 100644 index 000000000000..0a45865e5e6d --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/TestTransactionResource.java @@ -0,0 +1,237 @@ +package org.openmetadata.service.resources.system; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.swagger.v3.oas.annotations.Hidden; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.io.IOException; +import java.util.Objects; +import java.util.UUID; +import javax.validation.Valid; +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.sqlobject.transaction.Transaction; +import org.openmetadata.schema.api.data.CreateTable; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.type.UsageDetails; +import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.TestTransactionRepository; +import org.openmetadata.service.resources.Collection; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.util.RestUtil; + +@Path("/v1/system/testtransactions") +@Tag(name = "System", description = "APIs related to System configuration and settings.") +@Hidden +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +@Collection(name = "system") +@Slf4j +public class TestTransactionResource { + public static final String COLLECTION_PATH = "/v1/util"; + private final TestTransactionRepository testTransactionRepository; + private OpenMetadataApplicationConfig applicationConfig; + + public TestTransactionResource(CollectionDAO dao, Authorizer authorizer) { + Objects.requireNonNull(dao, "SystemRepository must not be null"); + this.testTransactionRepository = new TestTransactionRepository(dao); + } + + @SuppressWarnings("unused") // Method used for reflection + public void initialize(OpenMetadataApplicationConfig config) { + this.applicationConfig = config; + } + + @PUT + @Path("/createwithtransactions") + @Operation( + operationId = "createOrUpdate", + summary = "Update setting", + description = "Update existing settings", + responses = { + @ApiResponse( + responseCode = "200", + description = "Settings", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Response.class))) + }) + public Response createOrUpdateTableWithTransaction( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateTable create) + throws IOException { + Table table = getTable(create); + table = testTransactionRepository.createOrUpdateTableWithTransaction(table); + return new RestUtil.PutResponse<>(Response.Status.CREATED, table, RestUtil.ENTITY_CREATED).toResponse(); + } + + @PUT + @Path("/createwithunitofwork") + @Operation( + operationId = "createOrUpdate", + summary = "Update setting", + description = "Update existing settings", + responses = { + @ApiResponse( + responseCode = "200", + description = "Settings", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Response.class))) + }) + public Response createOrUpdateTableWithJdbi( + @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateTable create) + throws IOException { + Table table = getTable(create); + table = testTransactionRepository.createOrUpdateTableWithTransaction(table); + return new RestUtil.PutResponse<>(Response.Status.CREATED, table, RestUtil.ENTITY_CREATED).toResponse(); + } + + @PUT + @Transaction + @Path("/updatewithtransaction") + @Operation( + operationId = "createOrUpdate", + summary = "Update setting", + description = "Update existing settings", + responses = { + @ApiResponse( + responseCode = "200", + description = "Settings", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = CreateTable.class))) + }) + public Table updatewithtransaction( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid CreateTable create, + @QueryParam("dailyCount") Integer dailyCount) + throws JsonProcessingException { + Table table = getTable(create); + table = testTransactionRepository.updateTableWithTransaction(table); + testTransactionRepository.updateUsageStatsWithTransaction(table, dailyCount); + return table; + } + + @PUT + @Path("/updatewithjdbi") + @Operation( + operationId = "createOrUpdate", + summary = "Update setting", + responses = { + @ApiResponse( + responseCode = "200", + description = "Settings", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Table.class))) + }) + public Table updatewithjdbi( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid Table create, + @QueryParam("dailyCount") Integer dailyCount) + throws JsonProcessingException { + Table table = testTransactionRepository.updateTableWithTransaction(create); + testTransactionRepository.updateUsageStatsWithTransaction(table, dailyCount); + return table; + } + + @PUT + @Transaction + @Path("/updatewithtransactionwitherror") + @Operation( + operationId = "createOrUpdate", + summary = "Update setting", + description = "Update existing settings", + responses = { + @ApiResponse( + responseCode = "200", + description = "Settings", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = CreateTable.class))) + }) + public Table updatewithtransactionwitherror( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid Table create, + @QueryParam("dailyCount") Integer dailyCount) + throws JsonProcessingException { + Table table = testTransactionRepository.updateTableWithTransaction(create); + testTransactionRepository.updateUsageStatsWithTransactionWithError(table, dailyCount); + return table; + } + + @PUT + @Path("/updatewithjdbiwitherror") + @Operation( + operationId = "createOrUpdate", + summary = "Update setting", + responses = { + @ApiResponse( + responseCode = "200", + description = "Settings", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Table.class))) + }) + public Table updatewithjdbiwitherror( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Valid Table create, + @QueryParam("dailyCount") Integer dailyCount) + throws JsonProcessingException { + Table table = testTransactionRepository.updateTableWithTransaction(create); + testTransactionRepository.updateUsageStatsWithTransactionWithError(table, dailyCount); + return table; + } + + @GET + @Path("/{id}") + @Operation( + operationId = "getTableByID", + summary = "Get a table by Id", + description = "Get a table by `Id`", + responses = { + @ApiResponse( + responseCode = "200", + description = "table", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Table.class))), + @ApiResponse(responseCode = "404", description = "Table for instance {id} is not found") + }) + public Table get( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "table Id", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) + throws IOException { + return testTransactionRepository.getTable(id); + } + + @GET + @Path("/{id}/usage") + @Operation( + operationId = "getTableByID", + summary = "Get a table by Id", + description = "Get a table by `Id`", + responses = { + @ApiResponse( + responseCode = "200", + description = "table", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Table.class))), + @ApiResponse(responseCode = "404", description = "Table for instance {id} is not found") + }) + public UsageDetails getUsage( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "table Id", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) + throws IOException { + return testTransactionRepository.getUsage(id); + } + + public Table getTable(CreateTable create) { + String fullyQualifiedName = String.format("test_service.db_name.schema_name.%s", create.getName()); + return new Table() + .withId(UUID.randomUUID()) + .withName(create.getName()) + .withDisplayName(create.getDisplayName()) + .withFullyQualifiedName(fullyQualifiedName) + .withColumns(create.getColumns()) + .withUpdatedAt(System.currentTimeMillis()) + .withUpdatedBy("test"); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java index e07f08029642..3eee11739721 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/Authorizer.java @@ -15,17 +15,17 @@ import java.util.List; import javax.ws.rs.core.SecurityContext; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.ResourcePermission; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.security.policyevaluator.ResourceContextInterface; public interface Authorizer { /** Initialize the authorizer */ - void init(OpenMetadataApplicationConfig openMetadataApplicationConfig, Jdbi jdbi); + void init(OpenMetadataApplicationConfig openMetadataApplicationConfig, CollectionDAO daoObject); /** Returns a list of operations that the authenticated user (subject) can perform */ List listPermissions(SecurityContext securityContext, String user); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java index f2bb949f734d..16cc1067d10d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/DefaultAuthorizer.java @@ -19,10 +19,10 @@ import java.util.List; import javax.ws.rs.core.SecurityContext; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.ResourcePermission; import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.security.policyevaluator.PolicyEvaluator; import org.openmetadata.service.security.policyevaluator.ResourceContextInterface; @@ -32,7 +32,7 @@ public class DefaultAuthorizer implements Authorizer { @Override - public void init(OpenMetadataApplicationConfig config, Jdbi dbi) { + public void init(OpenMetadataApplicationConfig config, CollectionDAO dao) { LOG.info("Initializing DefaultAuthorizer with config {}", config.getAuthorizerConfiguration()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java index ca121d768343..ebc1374e9e1d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/NoopAuthorizer.java @@ -17,7 +17,6 @@ import java.util.UUID; import javax.ws.rs.core.SecurityContext; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; @@ -26,6 +25,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.exception.EntityNotFoundException; +import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.UserRepository; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.security.policyevaluator.PolicyEvaluator; @@ -35,7 +35,7 @@ @Slf4j public class NoopAuthorizer implements Authorizer { @Override - public void init(OpenMetadataApplicationConfig openMetadataApplicationConfig, Jdbi jdbi) { + public void init(OpenMetadataApplicationConfig openMetadataApplicationConfig, CollectionDAO collectionDAO) { addAnonymousUser(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/AuthenticatorHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/AuthenticatorHandler.java index 92895cf02f28..e712e6ed3d3f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/AuthenticatorHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/AuthenticatorHandler.java @@ -7,7 +7,6 @@ import java.util.UUID; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.api.teams.CreateUser; import org.openmetadata.schema.auth.ChangePasswordRequest; import org.openmetadata.schema.auth.JWTAuthMechanism; @@ -21,10 +20,11 @@ import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.auth.JwtResponse; import org.openmetadata.service.exception.CustomExceptionMessage; +import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.security.jwt.JWTTokenGenerator; public interface AuthenticatorHandler { - void init(OpenMetadataApplicationConfig config, Jdbi jdbi); + void init(OpenMetadataApplicationConfig config, CollectionDAO daoObject); JwtResponse loginUser(LoginRequest loginRequest) throws IOException, TemplateException; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/BasicAuthenticator.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/BasicAuthenticator.java index 46f4e0840497..2419a185e595 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/BasicAuthenticator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/BasicAuthenticator.java @@ -42,7 +42,6 @@ import javax.ws.rs.BadRequestException; import javax.ws.rs.core.UriInfo; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.TokenInterface; import org.openmetadata.schema.api.configuration.LoginConfiguration; @@ -90,9 +89,9 @@ public class BasicAuthenticator implements AuthenticatorHandler { private boolean isSelfSignUpAvailable; @Override - public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) { - this.userRepository = new UserRepository(jdbi.onDemand(CollectionDAO.class)); - this.tokenRepository = new TokenRepository(jdbi.onDemand(CollectionDAO.class)); + public void init(OpenMetadataApplicationConfig config, CollectionDAO collectionDAO) { + this.userRepository = new UserRepository(collectionDAO); + this.tokenRepository = new TokenRepository(collectionDAO); this.authorizerConfiguration = config.getAuthorizerConfiguration(); this.loginAttemptCache = new LoginAttemptCache(config); SmtpSettings smtpSettings = config.getSmtpSettings(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java index d12998874a65..1ff204a3e91d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/LdapAuthenticator.java @@ -28,7 +28,6 @@ import java.util.Objects; import java.util.UUID; import lombok.extern.slf4j.Slf4j; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.api.configuration.LoginConfiguration; import org.openmetadata.schema.auth.LdapConfiguration; @@ -59,15 +58,15 @@ public class LdapAuthenticator implements AuthenticatorHandler { private LoginConfiguration loginConfiguration; @Override - public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) { + public void init(OpenMetadataApplicationConfig config, CollectionDAO collectionDAO) { if (config.getAuthenticationConfiguration().getProvider().equals(AuthProvider.LDAP) && config.getAuthenticationConfiguration().getLdapConfiguration() != null) { ldapLookupConnectionPool = getLdapConnectionPool(config.getAuthenticationConfiguration().getLdapConfiguration()); } else { throw new IllegalStateException("Invalid or Missing Ldap Configuration."); } - this.userRepository = new UserRepository(jdbi.onDemand(CollectionDAO.class)); - this.tokenRepository = new TokenRepository(jdbi.onDemand(CollectionDAO.class)); + this.userRepository = new UserRepository(collectionDAO); + this.tokenRepository = new TokenRepository(collectionDAO); this.ldapConfiguration = config.getAuthenticationConfiguration().getLdapConfiguration(); this.loginAttemptCache = new LoginAttemptCache(config); this.loginConfiguration = config.getApplicationConfiguration().getLoginConfig(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/NoopAuthenticator.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/NoopAuthenticator.java index 7f26560e2cc3..30fe9524dc09 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/NoopAuthenticator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/auth/NoopAuthenticator.java @@ -3,16 +3,16 @@ import static org.openmetadata.service.exception.CatalogExceptionMessage.FORBIDDEN_AUTHENTICATOR_OP; import javax.ws.rs.core.Response; -import org.jdbi.v3.core.Jdbi; import org.openmetadata.schema.auth.LoginRequest; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.auth.JwtResponse; import org.openmetadata.service.exception.CustomExceptionMessage; +import org.openmetadata.service.jdbi3.CollectionDAO; public class NoopAuthenticator implements AuthenticatorHandler { @Override - public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) { + public void init(OpenMetadataApplicationConfig config, CollectionDAO collectionDAO) { /* deprecated unused */ } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java index a43283fad316..b5a979e49768 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/NotificationHandler.java @@ -15,6 +15,7 @@ import static org.openmetadata.service.Entity.TEAM; import static org.openmetadata.service.Entity.USER; +import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -30,6 +31,7 @@ import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.Handle; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.entity.teams.Team; import org.openmetadata.schema.entity.teams.User; @@ -40,34 +42,42 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.UserRepository; +import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.socket.WebSocketManager; @Slf4j public class NotificationHandler { - private final CollectionDAO dao; private final ObjectMapper mapper; + private final JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider; private final ExecutorService threadScheduler; - public NotificationHandler(CollectionDAO dao) { - this.dao = dao; + public NotificationHandler(JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) { this.mapper = new ObjectMapper(); this.threadScheduler = Executors.newFixedThreadPool(1); + this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider; } public void processNotifications(ContainerResponseContext responseContext) { threadScheduler.submit( () -> { try { - handleNotifications(responseContext); - } catch (JsonProcessingException e) { - LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", e); + Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get(); + handle.getConnection().setAutoCommit(true); + CollectionDAO collectionDAO = + (CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class); + handleNotifications(responseContext, collectionDAO); + } catch (Exception ex) { + LOG.error("[NotificationHandler] Failed to use mapper in converting to Json", ex); + } finally { + jdbiUnitOfWorkProvider.getHandleManager().clear(); } }); } - private void handleNotifications(ContainerResponseContext responseContext) throws JsonProcessingException { + private void handleNotifications(ContainerResponseContext responseContext, CollectionDAO collectionDAO) + throws JsonProcessingException { int responseCode = responseContext.getStatus(); if (responseCode == Response.Status.CREATED.getStatusCode() && responseContext.getEntity() != null @@ -75,19 +85,19 @@ private void handleNotifications(ContainerResponseContext responseContext) throw Thread thread = (Thread) responseContext.getEntity(); switch (thread.getType()) { case Task: - handleTaskNotification(thread); + handleTaskNotification(thread, collectionDAO); break; case Conversation: - handleConversationNotification(thread); + handleConversationNotification(thread, collectionDAO); break; case Announcement: - handleAnnouncementNotification(thread); + handleAnnouncementNotification(thread, collectionDAO); break; } } } - private void handleTaskNotification(Thread thread) throws JsonProcessingException { + private void handleTaskNotification(Thread thread, CollectionDAO collectionDAO) throws JsonProcessingException { String jsonThread = mapper.writeValueAsString(thread); if (thread.getPostsCount() == 0) { List assignees = thread.getTask().getAssignees(); @@ -99,7 +109,9 @@ private void handleTaskNotification(Thread thread) throws JsonProcessingExceptio } else if (Entity.TEAM.equals(e.getType())) { // fetch all that are there in the team List records = - dao.relationshipDAO().findTo(e.getId().toString(), TEAM, Relationship.HAS.ordinal(), Entity.USER); + collectionDAO + .relationshipDAO() + .findTo(e.getId().toString(), TEAM, Relationship.HAS.ordinal(), Entity.USER); records.forEach(eRecord -> receiversList.add(eRecord.getId())); } }); @@ -114,7 +126,8 @@ private void handleTaskNotification(Thread thread) throws JsonProcessingExceptio } } - private void handleAnnouncementNotification(Thread thread) throws JsonProcessingException { + private void handleAnnouncementNotification(Thread thread, CollectionDAO collectionDAO) + throws JsonProcessingException { String jsonThread = mapper.writeValueAsString(thread); AnnouncementDetails announcementDetails = thread.getAnnouncement(); Long currentTimestamp = Instant.now().getEpochSecond(); @@ -124,7 +137,8 @@ private void handleAnnouncementNotification(Thread thread) throws JsonProcessing } } - private void handleConversationNotification(Thread thread) throws JsonProcessingException { + private void handleConversationNotification(Thread thread, CollectionDAO collectionDAO) + throws JsonProcessingException { String jsonThread = mapper.writeValueAsString(thread); WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread); List mentions; @@ -138,13 +152,13 @@ private void handleConversationNotification(Thread thread) throws JsonProcessing entityLink -> { String fqn = entityLink.getEntityFQN(); if (USER.equals(entityLink.getEntityType())) { - User user = dao.userDAO().findEntityByName(fqn); + User user = collectionDAO.userDAO().findEntityByName(fqn); WebSocketManager.getInstance().sendToOne(user.getId(), WebSocketManager.MENTION_CHANNEL, jsonThread); } else if (TEAM.equals(entityLink.getEntityType())) { - Team team = dao.teamDAO().findEntityByName(fqn); + Team team = collectionDAO.teamDAO().findEntityByName(fqn); // fetch all that are there in the team List records = - dao.relationshipDAO().findTo(team.getId().toString(), TEAM, Relationship.HAS.ordinal(), USER); + collectionDAO.relationshipDAO().findTo(team.getId().toString(), TEAM, Relationship.HAS.ordinal(), USER); // Notify on WebSocket for Realtime WebSocketManager.getInstance().sendToManyWithString(records, WebSocketManager.MENTION_CHANNEL, jsonThread); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/Jdbi3TransactionTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/Jdbi3TransactionTest.java new file mode 100644 index 000000000000..25776bd03c2e --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/Jdbi3TransactionTest.java @@ -0,0 +1,102 @@ +package org.openmetadata.service.jdbi3; + +import static java.lang.String.format; +import static org.openmetadata.service.resources.EntityResourceTest.C1; +import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS; + +import io.dropwizard.testing.ResourceHelpers; +import java.util.List; +import java.util.Map; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import org.apache.http.client.HttpResponseException; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.openmetadata.schema.api.data.CreateTable; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.type.*; +import org.openmetadata.service.OpenMetadataApplicationTest; +import org.openmetadata.service.util.TestUtils; + +public class Jdbi3TransactionTest extends OpenMetadataApplicationTest { + + private static Jdbi jdbi; + protected static final String CONFIG_PATH = ResourceHelpers.resourceFilePath("openmetadata-secure-test.yaml"); + + @Test + void runTransactions(TestInfo testInfo) throws Exception { + CreateTable create = createTable(testInfo, "testTransactions"); + Table table = createTable(create, "createwithunitofwork", ADMIN_AUTH_HEADERS); + table = getTable(table.getId().toString(), ADMIN_AUTH_HEADERS); + Assertions.assertEquals(create.getName(), table.getName()); + table.setDisplayName("testUpdate1"); + table = updateTable(table, "updatewithjdbi", 10, ADMIN_AUTH_HEADERS); + Assertions.assertEquals("testUpdate1", table.getDisplayName()); + UsageDetails usageDetails = getUsageDetails(table.getId().toString(), ADMIN_AUTH_HEADERS); + Assertions.assertEquals(usageDetails.getDailyStats().getCount(), 10); + String oldDisplayName = table.getDisplayName(); + table.setDisplayName("testUpdate1WithFailure"); + try { + updateTable(table, "updatewithjdbiwitherror", 100, ADMIN_AUTH_HEADERS); + } catch (Exception e) { + // ignore exception + } + Table newTable = getTable(table.getId().toString(), ADMIN_AUTH_HEADERS); + // old values should be there + Assertions.assertEquals(oldDisplayName, newTable.getDisplayName()); + usageDetails = getUsageDetails(table.getId().toString(), ADMIN_AUTH_HEADERS); + Assertions.assertEquals(usageDetails.getDailyStats().getCount(), 10); + create.setDisplayName("testUpdate1WithFailure"); + } + + private CreateTable createTable(TestInfo test, String displayName) { + String name = test.getDisplayName().replaceAll("\\(.*\\)", ""); + return new CreateTable() + .withName(name) + .withDisplayName(displayName) + .withDatabaseSchema("sample_schema") + .withColumns( + List.of( + new Column() + .withName(C1) + .withDisplayName("c1") + .withDataType(ColumnDataType.VARCHAR) + .withDataLength(10))); + } + + public final Table createTable(CreateTable createRequest, String endPoint, Map authHeaders) + throws HttpResponseException { + WebTarget target = + getClient() + .target(format("http://localhost:%s/api/v1/system/testtransactions/%s", APP.getLocalPort(), endPoint)); + return TestUtils.put(target, createRequest, Table.class, Response.Status.CREATED, authHeaders); + } + + public final Table updateTable(Table createRequest, String endPoint, int dailyCount, Map authHeaders) + throws HttpResponseException { + WebTarget target = + getClient() + .target( + format( + "http://localhost:%s/api/v1/system/testtransactions/%s?dailyCount=%d", + APP.getLocalPort(), endPoint, dailyCount)); + return TestUtils.put(target, createRequest, Table.class, Response.Status.OK, authHeaders); + } + + public final Table getTable(String id, Map authHeaders) throws HttpResponseException { + return TestUtils.get( + getClient().target(format("http://localhost:%s/api/v1/system/testtransactions/%s", APP.getLocalPort(), id)), + Table.class, + authHeaders); + } + + public final UsageDetails getUsageDetails(String id, Map authHeaders) throws HttpResponseException { + return TestUtils.get( + getClient() + .target(format("http://localhost:%s/api/v1/system/testtransactions/%s/usage", APP.getLocalPort(), id)), + UsageDetails.class, + authHeaders); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java index 6e7ec26d684f..278768c10e0f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/EntityResourceTest.java @@ -2308,7 +2308,7 @@ private void validateChangeEvents( Awaitility.await("Wait for expected change event at timestamp " + timestamp) .pollInterval(Duration.ofMillis(100L)) - .atMost(Duration.ofMillis(300 * 100L)) // 300 iterations for 30 seconds + .atMost(Duration.ofMillis(600 * 100L)) // 300 iterations for 30 seconds .until( () -> eventHolder.hasExpectedEvent( @@ -2353,7 +2353,7 @@ private void validateDeletedEvent( Awaitility.await("Wait for expected deleted event at timestamp " + timestamp) .pollInterval(Duration.ofMillis(100L)) - .atMost(Duration.ofMillis(100 * 100L)) // 100 iterations + .atMost(Duration.ofMillis(600 * 100L)) // 100 iterations .until( () -> eventHolder.hasDeletedEvent(getChangeEvents(null, null, null, entityType, timestamp, authHeaders), id)); diff --git a/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml b/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml index 13fa2bd456d6..69ee0e526609 100644 --- a/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml +++ b/openmetadata-service/src/test/resources/openmetadata-secure-test.yaml @@ -96,12 +96,12 @@ logging: database: # the name of the JDBC driver, h2 is used for testing - driverClass: com.postgresql.Driver + driverClass: org.postgresql.Driver # the username and password user: test password: # the JDBC URL; the database is called openmetadata_test_db - url: jdbc:postgres://localhost:3307/openmetadata_test_db?useSSL=false&serverTimezone=UTC + url: jdbc:postgresql://localhost:3307/openmetadata_test_db?useSSL=false&serverTimezone=UTC migrationConfiguration: flywayPath: "../bootstrap/sql/migrations/flyway" diff --git a/pom.xml b/pom.xml index adcda9896d5d..c31358fd7eab 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,7 @@ 1.7.36 2.15.2 2.1.6 + 2.1.7 1.0 2.40 2.1.1 @@ -210,7 +211,7 @@ io.dropwizard dropwizard-testing - ${dropwizard.version} + ${dropwizard.testing.version} junit