Skip to content

Commit

Permalink
Transaction for Delete @UnitOfWork (open-metadata#13115)
Browse files Browse the repository at this point in the history
* Fix open-metadata#12494: Jdbi Transactions opens a new connection per DAO, causing recursive delete not to roll back

* Fix open-metadata#12494: Jdbi Transactions opens a new connection per DAO, causing recursive delete not to roll back

* rebase with main

* Fix styling

* Add jdbiUnitOfWork

* Improvements for Change Event Repository and missing Annotations

* Add connection commit/rollback

* push schemaChange fix

* Improvements for Change Event Repository and missing Annotations

* Pass wrapped collectionDAO everywhere

* Pass wrapped collectionDAO everywhere

* Separate event handlers transactions

* Fix checkstyle

* Wrap PUT, POST, and DELETE in transactions

* Add Patch for transactions

* Add primary key to tag.id

* Proxy internal @CreateSQLObjects

* Fix exception handling in ManagedHandleInvocationHandler

* Java typo

* Update schema files

* Checkstyle fix and conflicts resolve issue

* Remove @JdbiUnitOfWork from Feed Repository

* remove unnecessary @JdbiUnitOfWork annotation

* Test Failures fix

* Test Failures fix

* Increase wait time; changeEventDAO takes time due to handle.commit()

* commit change

* No need to use commit in ChangeEventHandler

* Add a lookup for avoiding transactions that are not started and should not be rolled back

* remove JdbiUnitOfWork.java

---------

Co-authored-by: Sriharsha Chintalapani <[email protected]>
Co-authored-by: Sriharsha Chintalapani <[email protected]>
  • Loading branch information
3 people authored Sep 8, 2023
1 parent 2b2ec04 commit 5f5aeef
Show file tree
Hide file tree
Showing 49 changed files with 1,061 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ CREATE TABLE IF NOT EXISTS table_entity_extension (
jsonSchema VARCHAR(256) NOT NULL, -- Schema used for generating JSON
json JSON NOT NULL,
PRIMARY KEY (id, extension)
);
);

ALTER TABLE entity_relationship ADD INDEX from_entity_type_index(fromId, fromEntity), ADD INDEX to_entity_type_index(toId, toEntity);
ALTER TABLE tag DROP CONSTRAINT fqnHash, ADD CONSTRAINT UNIQUE(fqnHash), ADD PRIMARY KEY(id);
14 changes: 13 additions & 1 deletion bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,16 @@ CREATE TABLE IF NOT EXISTS table_entity_extension (
jsonSchema VARCHAR(256) NOT NULL, -- Schema used for generating JSON
json JSONB NOT NULL,
PRIMARY KEY (id, extension)
);
);

-- Add index on fromId and fromEntity columns
CREATE INDEX from_entity_type_index ON entity_relationship (fromId, fromEntity);

-- Add index on toId and toEntity columns
CREATE INDEX to_entity_type_index ON entity_relationship (toId, toEntity);

ALTER TABLE tag DROP CONSTRAINT IF EXISTS tag_fqnhash_key;

ALTER TABLE tag ADD CONSTRAINT unique_fqnHash UNIQUE (fqnHash);

ALTER TABLE tag ADD CONSTRAINT tag_pk PRIMARY KEY (id);
13 changes: 11 additions & 2 deletions openmetadata-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
</dependency>
<dependency>
<groupId>ru.vyarus.guicey</groupId>
<artifactId>guicey-jdbi3</artifactId>
<version>5.9.1</version>
</dependency>
<dependency>
<groupId>com.smoketurner</groupId>
<artifactId>dropwizard-swagger</artifactId>
Expand Down Expand Up @@ -231,8 +236,6 @@
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
</dependency>


<!-- Dependencies for secret store manager providers -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down Expand Up @@ -480,6 +483,12 @@
<artifactId>woodstox-core</artifactId>
<version>${woodstox.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.openmetadata.service;

import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;
import static org.openmetadata.service.util.MicrometerBundleSingleton.webAnalyticEvents;

import io.dropwizard.Application;
Expand Down Expand Up @@ -40,6 +41,7 @@
import java.security.cert.CertificateException;
import java.time.temporal.ChronoUnit;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import javax.naming.ConfigurationException;
Expand Down Expand Up @@ -85,6 +87,8 @@
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkApplicationEventListener;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.migration.Migration;
import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.monitoring.EventMonitor;
Expand Down Expand Up @@ -137,12 +141,16 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ

ChangeEventConfig.initialize(catalogConfig);
final Jdbi jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory());
JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider = JdbiUnitOfWorkProvider.withDefault(jdbi);
CollectionDAO daoObject =
(CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class);
environment.jersey().register(new JdbiUnitOfWorkApplicationEventListener(jdbiUnitOfWorkProvider, new HashSet<>()));

// Configure the Fernet instance
Fernet.getInstance().setFernetKey(catalogConfig);

// Init Settings Cache
SettingsCache.initialize(jdbi.onDemand(CollectionDAO.class), catalogConfig);
SettingsCache.initialize(daoObject, catalogConfig);

// init Secret Manager
final SecretsManager secretsManager =
Expand Down Expand Up @@ -186,23 +194,23 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ
// start event hub before registering publishers
EventPubSub.start();

registerResources(catalogConfig, environment, jdbi);
registerResources(catalogConfig, environment, jdbi, jdbiUnitOfWorkProvider, daoObject);

// Register Event Handler
registerEventFilter(catalogConfig, environment, jdbi);
registerEventFilter(catalogConfig, environment, jdbiUnitOfWorkProvider);
environment.lifecycle().manage(new ManagedShutdown());
// Register Event publishers
registerEventPublisher(catalogConfig, jdbi);
registerEventPublisher(catalogConfig, daoObject);

// update entities secrets if required
new SecretsManagerUpdateService(secretsManager, catalogConfig.getClusterName()).updateEntities();

// start authorizer after event publishers
// authorizer creates admin/bot users, ES publisher should start before to index users created by authorizer
authorizer.init(catalogConfig, jdbi);
authorizer.init(catalogConfig, daoObject);

// authenticationHandler Handles auth related activities
authenticatorHandler.init(catalogConfig, jdbi);
authenticatorHandler.init(catalogConfig, daoObject);

webAnalyticEvents = MicrometerBundleSingleton.latencyTimer(catalogConfig.getEventMonitorConfiguration());
FilterRegistration.Dynamic micrometerFilter =
Expand Down Expand Up @@ -400,21 +408,22 @@ private void registerAuthenticator(OpenMetadataApplicationConfig catalogConfig)
}
}

private void registerEventFilter(OpenMetadataApplicationConfig catalogConfig, Environment environment, Jdbi jdbi) {
private void registerEventFilter(
OpenMetadataApplicationConfig catalogConfig, Environment environment, JdbiUnitOfWorkProvider provider) {
if (catalogConfig.getEventHandlerConfiguration() != null) {
ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, jdbi);
ContainerResponseFilter eventFilter = new EventFilter(catalogConfig, provider);
environment.jersey().register(eventFilter);
ContainerResponseFilter reindexingJobs = new SearchIndexEvent();
environment.jersey().register(reindexingJobs);
}
}

private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataApplicationConfig, Jdbi jdbi) {
private void registerEventPublisher(
OpenMetadataApplicationConfig openMetadataApplicationConfig, CollectionDAO daoObject) {
// register ElasticSearch Event publisher
if (openMetadataApplicationConfig.getElasticSearchConfiguration() != null) {
SearchEventPublisher searchEventPublisher =
new SearchEventPublisher(
openMetadataApplicationConfig.getElasticSearchConfiguration(), jdbi.onDemand(CollectionDAO.class));
new SearchEventPublisher(openMetadataApplicationConfig.getElasticSearchConfiguration(), daoObject);
EventPubSub.addEventHandler(searchEventPublisher);
}

Expand All @@ -429,11 +438,18 @@ private void registerEventPublisher(OpenMetadataApplicationConfig openMetadataAp
}
}

private void registerResources(OpenMetadataApplicationConfig config, Environment environment, Jdbi jdbi) {
private void registerResources(
OpenMetadataApplicationConfig config,
Environment environment,
Jdbi jdbi,
JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider,
CollectionDAO daoObject) {
List<String> extensionResources =
config.getExtensionConfiguration() != null ? config.getExtensionConfiguration().getResourcePackage() : null;
CollectionRegistry.initialize(extensionResources);
CollectionRegistry.getInstance().registerResources(jdbi, environment, config, authorizer, authenticatorHandler);
CollectionRegistry.getInstance()
.registerResources(
jdbi, jdbiUnitOfWorkProvider, environment, config, daoObject, authorizer, authenticatorHandler);
environment.jersey().register(new JsonPatchProvider());
OMErrorPageHandler eph = new OMErrorPageHandler(config.getWebConfiguration());
eph.addErrorPage(Response.Status.NOT_FOUND.getStatusCode(), "/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.type.AuditLog;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

@Slf4j
public class AuditEventHandler implements EventHandler {
private final Marker auditMarker = MarkerFactory.getMarker("AUDIT");

public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) {
public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) {
// Nothing to do
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.events.subscription.AlertsRuleEvaluator.getEntity;
import static org.openmetadata.service.formatter.util.FormatterUtil.getChangeEventFromResponseContext;
import static org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider.getWrappedInstanceForDaoClass;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.SecurityContext;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.Handle;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeEvent;
Expand All @@ -33,30 +35,37 @@
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.FeedUtils;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.NotificationHandler;

@Slf4j
public class ChangeEventHandler implements EventHandler {
private CollectionDAO dao;
private FeedRepository feedDao;
private ObjectMapper mapper;
private NotificationHandler notificationHandler;

public void init(OpenMetadataApplicationConfig config, Jdbi jdbi) {
this.dao = jdbi.onDemand(CollectionDAO.class);
this.feedDao = new FeedRepository(dao);
private JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider;

public void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbiUnitOfWorkProvider) {
this.mapper = new ObjectMapper();
this.notificationHandler = new NotificationHandler(jdbi.onDemand(CollectionDAO.class));
this.notificationHandler = new NotificationHandler(jdbiUnitOfWorkProvider);
this.jdbiUnitOfWorkProvider = jdbiUnitOfWorkProvider;
}

@SneakyThrows
public Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
String method = requestContext.getMethod();
SecurityContext securityContext = requestContext.getSecurityContext();
String loggedInUserName = securityContext.getUserPrincipal().getName();
try {
Handle handle = jdbiUnitOfWorkProvider.getHandleManager().get();
handle.getConnection().setAutoCommit(true);
CollectionDAO collectionDAO =
(CollectionDAO) getWrappedInstanceForDaoClass(jdbiUnitOfWorkProvider, CollectionDAO.class);
CollectionDAO.ChangeEventDAO changeEventDAO = collectionDAO.changeEventDAO();
FeedRepository feedRepository = new FeedRepository(collectionDAO);
if (responseContext.getEntity() != null && responseContext.getEntity().getClass().equals(Thread.class)) {
// we should move this to Email Application notifications instead of processing it here.
notificationHandler.processNotifications(responseContext);
Expand All @@ -77,7 +86,8 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon
changeEvent = copyChangeEvent(changeEvent);
changeEvent.setEntity(JsonUtils.pojoToMaskedJson(entity));
}
dao.changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));

changeEventDAO.insert(JsonUtils.pojoToJson(changeEvent));

// Add a new thread to the entity for every change event
// for the event to appear in activity feeds
Expand All @@ -86,20 +96,22 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon
for (Thread thread : listOrEmpty(FeedUtils.getThreads(changeEvent, loggedInUserName))) {
// Don't create a thread if there is no message
if (thread.getMessage() != null && !thread.getMessage().isEmpty()) {
feedDao.create(thread);
feedRepository.create(thread, changeEvent);
String jsonThread = mapper.writeValueAsString(thread);
WebSocketManager.getInstance()
.broadCastMessageToAll(WebSocketManager.FEED_BROADCAST_CHANNEL, jsonThread);
if (changeEvent.getEventType().equals(EventType.ENTITY_DELETED)) {
deleteAllConversationsRelatedToEntity(getEntity(changeEvent));
deleteAllConversationsRelatedToEntity(getEntity(changeEvent), collectionDAO);
}
}
}
}
}
}
} catch (Exception e) {
LOG.error("Failed to capture change event for method {} due to ", method, e);
LOG.error("Failed to capture the change event for method {} due to ", method, e);
} finally {
jdbiUnitOfWorkProvider.getHandleManager().clear();
}
return null;
}
Expand All @@ -115,12 +127,12 @@ private static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) {
.withCurrentVersion(changeEvent.getCurrentVersion());
}

private void deleteAllConversationsRelatedToEntity(EntityInterface entityInterface) {
private void deleteAllConversationsRelatedToEntity(EntityInterface entityInterface, CollectionDAO collectionDAO) {
String entityId = entityInterface.getId().toString();
List<String> threadIds = dao.feedDAO().findByEntityId(entityId);
List<String> threadIds = collectionDAO.feedDAO().findByEntityId(entityId);
for (String threadId : threadIds) {
dao.relationshipDAO().deleteAll(threadId, Entity.THREAD);
dao.feedDAO().delete(threadId);
collectionDAO.relationshipDAO().deleteAll(threadId, Entity.THREAD);
collectionDAO.feedDAO().delete(threadId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@

package org.openmetadata.service.events;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.ext.Provider;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;
import org.openmetadata.service.security.JwtFilter;
import org.openmetadata.service.util.ParallelStreamUtil;

Expand All @@ -34,21 +38,21 @@ public class EventFilter implements ContainerResponseFilter {
private final ForkJoinPool forkJoinPool;
private final List<EventHandler> eventHandlers;

public EventFilter(OpenMetadataApplicationConfig config, Jdbi jdbi) {
public EventFilter(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) {
this.forkJoinPool = new ForkJoinPool(FORK_JOIN_POOL_PARALLELISM);
this.eventHandlers = new ArrayList<>();
registerEventHandlers(config, jdbi);
registerEventHandlers(config, provider);
}

private void registerEventHandlers(OpenMetadataApplicationConfig config, Jdbi jdbi) {
private void registerEventHandlers(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider provider) {
try {
Set<String> eventHandlerClassNames =
new HashSet<>(config.getEventHandlerConfiguration().getEventHandlerClassNames());
for (String eventHandlerClassName : eventHandlerClassNames) {
@SuppressWarnings("unchecked")
EventHandler eventHandler =
((Class<EventHandler>) Class.forName(eventHandlerClassName)).getConstructor().newInstance();
eventHandler.init(config, jdbi);
eventHandler.init(config, provider);
eventHandlers.add(eventHandler);
LOG.info("Added event handler {}", eventHandlerClassName);
}
Expand All @@ -64,6 +68,7 @@ public void filter(ContainerRequestContext requestContext, ContainerResponseCont
if ((responseCode < 200 || responseCode > 299) || (!AUDITABLE_METHODS.contains(method))) {
return;
}

eventHandlers
.parallelStream()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.unitofwork.JdbiUnitOfWorkProvider;

public interface EventHandler {
void init(OpenMetadataApplicationConfig config, Jdbi jdbi);
void init(OpenMetadataApplicationConfig config, JdbiUnitOfWorkProvider jdbi);

Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext);

Expand Down
Loading

0 comments on commit 5f5aeef

Please sign in to comment.