Skip to content

Commit

Permalink
Make LoadingCache members final variables (open-metadata#12648)
Browse files Browse the repository at this point in the history
* Make LoadingCache members final variables

* Simplify singleton classes

* Reuse Entity fields instead of declaring them again
  • Loading branch information
sureshms authored Jul 29, 2023
1 parent 98b38e4 commit b7e3242
Show file tree
Hide file tree
Showing 56 changed files with 391 additions and 649 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,6 @@ public static boolean shouldDisplayEntityChangeOnFeed(@NonNull String entityType
return !ACTIVITY_FEED_EXCLUDED_ENTITIES.contains(entityType);
}

public static Fields getFields(String entityType, String fields) {
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
return entityRepository.getFields(fields);
}

public static Fields getFields(String entityType, List<String> fields) {
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
return entityRepository.getFields(String.join(",", fields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void sendReportsToTeams(
if (!CommonUtil.nullOrEmpty(email)) {
emails.add(email);
} else {
team.getUsers().forEach(user -> emails.add(SubjectCache.getInstance().getUserById(user.getId()).getEmail()));
team.getUsers().forEach(user -> emails.add(SubjectCache.getUserById(user.getId()).getEmail()));
}
try {
DataInsightTotalAssetTemplate totalAssetTemplate =
Expand All @@ -119,15 +119,14 @@ private void sendReportsToTeams(
createOwnershipTemplate(searchClient, team.getName(), scheduleTime, currentTime, numberOfDaysChange);
DataInsightDescriptionAndOwnerTemplate tierTemplate =
createTierTemplate(searchClient, team.getName(), scheduleTime, currentTime, numberOfDaysChange);
EmailUtil.getInstance()
.sendDataInsightEmailNotificationToUser(
emails,
totalAssetTemplate,
descriptionTemplate,
ownershipTemplate,
tierTemplate,
EmailUtil.getInstance().getDataInsightReportSubject(),
EmailUtil.DATA_INSIGHT_REPORT_TEMPLATE);
EmailUtil.sendDataInsightEmailNotificationToUser(
emails,
totalAssetTemplate,
descriptionTemplate,
ownershipTemplate,
tierTemplate,
EmailUtil.getDataInsightReportSubject(),
EmailUtil.DATA_INSIGHT_REPORT_TEMPLATE);
} catch (Exception ex) {
LOG.error("[DataInsightReport] Failed for Team: {}, Reason : {}", team.getName(), ex.getMessage());
}
Expand All @@ -149,15 +148,14 @@ private void sendToAdmins(SearchClient searchClient, Long scheduleTime, Long cur
createOwnershipTemplate(searchClient, null, scheduleTime, currentTime, numberOfDaysChange);
DataInsightDescriptionAndOwnerTemplate tierTemplate =
createTierTemplate(searchClient, null, scheduleTime, currentTime, numberOfDaysChange);
EmailUtil.getInstance()
.sendDataInsightEmailNotificationToUser(
emailList,
totalAssetTemplate,
descriptionTemplate,
ownershipTemplate,
tierTemplate,
EmailUtil.getInstance().getDataInsightReportSubject(),
EmailUtil.DATA_INSIGHT_REPORT_TEMPLATE);
EmailUtil.sendDataInsightEmailNotificationToUser(
emailList,
totalAssetTemplate,
descriptionTemplate,
ownershipTemplate,
tierTemplate,
EmailUtil.getDataInsightReportSubject(),
EmailUtil.DATA_INSIGHT_REPORT_TEMPLATE);
} catch (Exception ex) {
LOG.error("[DataInsightReport] Failed for Admin, Reason : {}", ex.getMessage(), ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,26 @@
import javax.annotation.CheckForNull;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;

@Slf4j
public class ActivityFeedAlertCache {
private static final ActivityFeedAlertCache INSTANCE = new ActivityFeedAlertCache();
private static volatile boolean initialized = false;
protected static final LoadingCache<String, EventSubscription> eventSubCache =
protected static final LoadingCache<String, EventSubscription> EVENT_SUB_CACHE =
CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(3, TimeUnit.MINUTES)
.build(new ActivityFeedAlertLoader());
protected static EventSubscriptionRepository eventSubscriptionRepository;
private static String activityFeedAlertName;
private static final String ACTIVITY_FEED_ALERT = "ActivityFeedAlert";

public static void initialize(String alertName, EventSubscriptionRepository repo) {
if (!initialized) {
eventSubscriptionRepository = repo;
initialized = true;
activityFeedAlertName = alertName;
}
}

public static ActivityFeedAlertCache getInstance() {
return INSTANCE;
private ActivityFeedAlertCache() {
// Private constructor for static class
}

public EventSubscription getActivityFeedAlert() throws EntityNotFoundException {
public static EventSubscription getActivityFeedAlert() throws EntityNotFoundException {
try {
return eventSubCache.get(activityFeedAlertName);
return EVENT_SUB_CACHE.get(ACTIVITY_FEED_ALERT);
} catch (ExecutionException | UncheckedExecutionException ex) {
throw new EntityNotFoundException(ex.getMessage());
}
Expand All @@ -48,8 +38,7 @@ public EventSubscription getActivityFeedAlert() throws EntityNotFoundException {
static class ActivityFeedAlertLoader extends CacheLoader<String, EventSubscription> {
@Override
public EventSubscription load(@CheckForNull String alertName) throws IOException {
EventSubscription alert =
eventSubscriptionRepository.getByName(null, alertName, eventSubscriptionRepository.getFields("*"));
EventSubscription alert = Entity.getEntityByName(Entity.EVENT_SUBSCRIPTION, alertName, "*", Include.NON_DELETED);
LOG.debug("Loaded Alert {}", alert);
return alert;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public static boolean shouldTriggerAlert(String entityType, FilteringRules confi

public static boolean shouldProcessActivityFeedRequest(ChangeEvent event) {
// Check Trigger Conditions
FilteringRules filteringRules = ActivityFeedAlertCache.getInstance().getActivityFeedAlert().getFilteringRules();
FilteringRules filteringRules = ActivityFeedAlertCache.getActivityFeedAlert().getFilteringRules();
return AlertUtil.shouldTriggerAlert(event.getEntityType(), filteringRules)
&& AlertUtil.evaluateAlertConditions(event, filteringRules.getRules());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public boolean matchAnyOwnerName(String... ownerNameList) throws IOException {
EntityReference ownerReference = entity.getOwner();
if (ownerReference != null) {
if (USER.equals(ownerReference.getType())) {
User user = SubjectCache.getInstance().getSubjectContext(ownerReference.getId()).getUser();
User user = SubjectCache.getSubjectContext(ownerReference.getId()).getUser();
for (String name : ownerNameList) {
if (user.getName().equals(name)) {
return true;
}
}
} else if (TEAM.equals(ownerReference.getType())) {
Team team = SubjectCache.getInstance().getTeam(ownerReference.getId());
Team team = SubjectCache.getTeam(ownerReference.getId());
for (String name : ownerNameList) {
if (team.getName().equals(name)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
import org.openmetadata.service.resources.events.EventResource;

/**
Expand All @@ -52,11 +50,9 @@
public class SubscriptionPublisher extends AbstractAlertPublisher {
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
@Getter private BatchEventProcessor<EventPubSub.ChangeEventHolder> processor;
private final EventSubscriptionRepository eventSubscriptionRepository;

public SubscriptionPublisher(EventSubscription eventSub, CollectionDAO dao) {
public SubscriptionPublisher(EventSubscription eventSub) {
super(eventSub);
this.eventSubscriptionRepository = new EventSubscriptionRepository(dao);
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class EmailPublisher extends SubscriptionPublisher {
private final CollectionDAO daoCollection;

public EmailPublisher(EventSubscription eventSub, CollectionDAO dao) {
super(eventSub, dao);
super(eventSub);
if (eventSub.getSubscriptionType() == EMAIL) {
this.emailAlertConfig = JsonUtils.convertValue(eventSub.getSubscriptionConfig(), EmailAlertConfig.class);
this.daoCollection = dao;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class GChatPublisher extends SubscriptionPublisher {
private final CollectionDAO daoCollection;

public GChatPublisher(EventSubscription eventSub, CollectionDAO dao) {
super(eventSub, dao);
super(eventSub);
if (eventSub.getSubscriptionType() == G_CHAT_WEBHOOK) {
this.daoCollection = dao;
this.webhook = JsonUtils.convertValue(eventSub.getSubscriptionConfig(), Webhook.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class GenericPublisher extends SubscriptionPublisher {
private final CollectionDAO daoCollection;

public GenericPublisher(EventSubscription eventSub, CollectionDAO dao) {
super(eventSub, dao);
super(eventSub);
if (eventSub.getSubscriptionType() == GENERIC_WEBHOOK) {
this.daoCollection = dao;
this.webhook = JsonUtils.convertValue(eventSub.getSubscriptionConfig(), Webhook.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class MSTeamsPublisher extends SubscriptionPublisher {
private final CollectionDAO daoCollection;

public MSTeamsPublisher(EventSubscription eventSub, CollectionDAO dao) {
super(eventSub, dao);
super(eventSub);
if (eventSub.getSubscriptionType() == MS_TEAMS_WEBHOOK) {
this.daoCollection = dao;
this.webhook = JsonUtils.convertValue(eventSub.getSubscriptionConfig(), Webhook.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class SlackEventPublisher extends SubscriptionPublisher {
private final CollectionDAO daoCollection;

public SlackEventPublisher(EventSubscription eventSub, CollectionDAO dao) {
super(eventSub, dao);
super(eventSub);
if (eventSub.getSubscriptionType() == SLACK_WEBHOOK) {
this.daoCollection = dao;
this.webhook = JsonUtils.convertValue(eventSub.getSubscriptionConfig(), Webhook.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ void applyTag(

default List<TagLabel> getTags(String targetFQN) {
List<TagLabel> tags = getTagsInternal(FullyQualifiedName.buildHash(targetFQN));
tags.forEach(tagLabel -> tagLabel.setDescription(TagLabelCache.getInstance().getDescription(tagLabel)));
tags.forEach(tagLabel -> tagLabel.setDescription(TagLabelCache.getDescription(tagLabel)));
return tags;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ void checkMutuallyExclusive(List<TagLabel> tagLabels) {
// When two tags have the same parent that is mutuallyExclusive, then throw an error
String parentFqn = FullyQualifiedName.getParentFQN(tagLabel.getTagFQN());
TagLabel stored = map.put(parentFqn, tagLabel);
if (stored != null && TagLabelCache.getInstance().mutuallyExclusive(tagLabel)) {
if (stored != null && TagLabelCache.mutuallyExclusive(tagLabel)) {
throw new IllegalArgumentException(CatalogExceptionMessage.mutuallyExclusiveLabels(tagLabel, stored));
}
}
Expand Down Expand Up @@ -1598,7 +1598,7 @@ public EntityUpdater(T original, T updated, Operation operation) {
this.updatingUser =
updated.getUpdatedBy().equalsIgnoreCase(ADMIN_USER_NAME)
? new User().withName(ADMIN_USER_NAME).withIsAdmin(true)
: SubjectCache.getInstance().getSubjectContext(updated.getUpdatedBy()).getUser();
: SubjectCache.getSubjectContext(updated.getUpdatedBy()).getUser();
}

/** Compare original and updated entities and perform updates. Update the entity version and track changes. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ public Thread create(Thread thread) throws IOException {
// Validate about data entity is valid and get the owner for that entity
EntityLink about = EntityLink.parse(thread.getAbout());
EntityRepository<?> repository = Entity.getEntityRepository(about.getEntityType());
String field = "owner";
if (!repository.supportsOwner) {
field = "id";
}
String field = repository.supportsOwner ? "owner" : "";
EntityInterface aboutEntity = Entity.getEntity(about, field, ALL);
thread.withEntityId(aboutEntity.getId()); // Add entity id to thread
return createThread(thread, about, aboutEntity.getOwner());
Expand All @@ -139,7 +136,7 @@ public Thread create(Thread thread) throws IOException {
private Thread createThread(Thread thread, EntityLink about, EntityReference entityOwner)
throws JsonProcessingException {
// Validate user creating thread
User createdByUser = SubjectCache.getInstance().getUser(thread.getCreatedBy());
User createdByUser = SubjectCache.getUser(thread.getCreatedBy());

if (thread.getType() == ThreadType.Task) {
thread.withTask(thread.getTask().withId(getNextTaskId())); // Assign taskId for a task
Expand Down Expand Up @@ -326,7 +323,7 @@ private void storeMentions(Thread thread, String message) {
@Transaction
public Thread addPostToThread(String id, Post post, String userName) throws IOException {
// Validate the user posting the message
User fromUser = SubjectCache.getInstance().getUser(post.getFrom());
User fromUser = SubjectCache.getUser(post.getFrom());

// Update the thread with the new post
Thread thread = EntityUtil.validate(id, dao.feedDAO().findById(id), Thread.class);
Expand Down Expand Up @@ -387,10 +384,6 @@ public DeleteResponse<Thread> deleteThread(Thread thread, String deletedByUser)
return new DeleteResponse<>(thread, RestUtil.ENTITY_DELETED);
}

public EntityReference getOwnerReference(String username) {
return dao.userDAO().findEntityByName(EntityInterfaceUtil.quoteName(username)).getEntityReference();
}

@Transaction
public ThreadCount getThreadsCount(FeedFilter filter, String link) {
List<List<String>> result;
Expand Down Expand Up @@ -475,7 +468,7 @@ public ResultList<Thread> list(FeedFilter filter, String link, int limitPosts, S
total = filteredThreads.getTotalCount();
} else {
// Only data assets are added as about
User user = userId != null ? SubjectCache.getInstance().getUserById(userId) : null;
User user = userId != null ? SubjectCache.getUserById(userId) : null;
List<String> teamNameHash = getTeamNames(user);
String userNameHash = getUserNameHash(user);
List<String> jsons =
Expand Down Expand Up @@ -631,7 +624,7 @@ public final PatchResponse<Thread> patchThread(UriInfo uriInfo, UUID id, String
public void checkPermissionsForResolveTask(Thread thread, boolean closeTask, SecurityContext securityContext)
throws IOException {
String userName = securityContext.getUserPrincipal().getName();
User user = SubjectCache.getInstance().getUser(userName);
User user = SubjectCache.getUser(userName);
EntityLink about = EntityLink.parse(thread.getAbout());
EntityReference aboutRef = EntityUtil.validateEntityLink(about);
if (Boolean.TRUE.equals(user.getIsAdmin())) {
Expand Down Expand Up @@ -830,7 +823,7 @@ private Thread populateAssignees(Thread thread) {

/** Return the tasks created by or assigned to the user. */
private FilteredThreads getTasksOfUser(FeedFilter filter, String userId, int limit) throws IOException {
String username = SubjectCache.getInstance().getUserById(userId).getName();
String username = SubjectCache.getUserById(userId).getName();
List<String> teamIds = getTeamIds(userId);
List<String> userTeamJsonPostgres = getUserTeamJsonPostgres(userId, teamIds);
String userTeamJsonMysql = getUserTeamJsonMysql(userId, teamIds);
Expand All @@ -845,7 +838,7 @@ private FilteredThreads getTasksOfUser(FeedFilter filter, String userId, int lim

/** Return the tasks created by the user. */
private FilteredThreads getTasksAssignedBy(FeedFilter filter, String userId, int limit) throws IOException {
String username = SubjectCache.getInstance().getUserById(userId).getName();
String username = SubjectCache.getUserById(userId).getName();
List<String> jsons = dao.feedDAO().listTasksAssigned(username, limit, filter.getCondition());
List<Thread> threads = JsonUtils.readObjects(jsons, Thread.class);
int totalCount = dao.feedDAO().listCountTasksAssignedBy(username, filter.getCondition(false));
Expand All @@ -869,7 +862,7 @@ private FilteredThreads getThreadsByOwner(FeedFilter filter, String userId, int
/** Returns the threads where the user or the team they belong to were mentioned by other users with @mention. */
private FilteredThreads getThreadsByMentions(FeedFilter filter, String userId, int limit) throws IOException {

User user = SubjectCache.getInstance().getUserById(userId);
User user = SubjectCache.getUserById(userId);
String userNameHash = getUserNameHash(user);
// Return the threads where the user or team was mentioned
List<String> teamNamesHash = getTeamNames(user);
Expand All @@ -891,7 +884,7 @@ private FilteredThreads getThreadsByMentions(FeedFilter filter, String userId, i
private List<String> getTeamIds(String userId) {
List<String> teamIds = null;
if (userId != null) {
User user = SubjectCache.getInstance().getUserById(userId);
User user = SubjectCache.getUserById(userId);
teamIds = listOrEmpty(user.getTeams()).stream().map(ref -> ref.getId().toString()).collect(Collectors.toList());
}
return nullOrEmpty(teamIds) ? List.of(StringUtils.EMPTY) : teamIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void prepare(Policy policy) throws IOException {
public void storeEntity(Policy policy, boolean update) throws IOException {
store(policy, update);
if (update) {
PolicyCache.getInstance().invalidatePolicy(policy.getId());
PolicyCache.invalidatePolicy(policy.getId());
}
}

Expand All @@ -102,7 +102,7 @@ protected void preDelete(Policy entity) {
@Override
protected void cleanup(Policy policy) throws IOException {
super.cleanup(policy);
PolicyCache.getInstance().invalidatePolicy(policy.getId());
PolicyCache.invalidatePolicy(policy.getId());
}

public void validateRules(Policy policy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void storeEntity(Role role, boolean update) throws IOException {
role.withPolicies(null);
store(role, update);
if (update) {
RoleCache.getInstance().invalidateRole(role.getId());
RoleCache.invalidateRole(role.getId());
}
role.withPolicies(policies);
}
Expand Down Expand Up @@ -120,7 +120,7 @@ protected void preDelete(Role entity) {
@Override
protected void cleanup(Role role) throws IOException {
super.cleanup(role);
RoleCache.getInstance().invalidateRole(role.getId());
RoleCache.invalidateRole(role.getId());
}

/** Handles entity updated from PUT and POST operation. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void updateSetting(Settings setting) {
}
dao.insertSettings(setting.getConfigType().toString(), JsonUtils.pojoToJson(setting.getConfigValue()));
// Invalidate Cache
SettingsCache.getInstance().invalidateSettings(setting.getConfigType().value());
SettingsCache.invalidateSettings(setting.getConfigType().value());
} catch (Exception ex) {
LOG.error("Failing in Updating Setting.", ex);
throw new CustomExceptionMessage(Response.Status.INTERNAL_SERVER_ERROR, ex.getMessage());
Expand Down
Loading

0 comments on commit b7e3242

Please sign in to comment.