Skip to content

Commit

Permalink
close apache#2617
Browse files Browse the repository at this point in the history
  • Loading branch information
haohao0103 committed Aug 6, 2024
1 parent 86e1003 commit f23f1ca
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public CachedSchemaTransactionV2(MetaDriver metaDriver,
attachment = this.idCache.attachment(new SchemaCaches<>(acSize));
}
this.arrayCaches = attachment;
this.listenChanges();
// this.listenChanges();
}

private static Id generateId(HugeType type, Id id) {
Expand Down Expand Up @@ -107,44 +107,11 @@ private void listenChanges() {
};
this.graphParams().loadGraphStore().provider().listen(this.storeEventListener);

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
LOG.debug("Graph {} received schema cache event: {}",
this.graph(), event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALID.equals(args[0])) {
event.checkArgs(String.class, HugeType.class, Id.class);
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);

id = generateId(type, id);
Object value = this.idCache.get(id);
if (value != null) {
// Invalidate id cache
this.idCache.invalidate(id);

// Invalidate name cache
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(),
schema.name());
this.nameCache.invalidate(prefixedName);
}
this.resetCachedAll(type);
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
event.checkArgs(String.class, HugeType.class);
this.clearCache(false);
return true;
}
return false;
};
EventHub schemaEventHub = this.graphParams().schemaEventHub();
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}
// Listen cache event: "cache.clear", ...
MetaManager.instance().listenSchemaCacheClear((e) -> {
this.clearCache(true);
LOG.debug("Graph {} Schema cache cleared", this.graph());
});
}

public void clearCache(boolean notify) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -59,8 +61,13 @@

import com.google.common.collect.ImmutableMap;

import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class MetaManager {

private static final Logger LOG = Log.logger(MetaManager.class);
private static final String SCHEMA_CLEAR_KEY = "SCHEMA_CLEAR_KEY";
public static final String META_PATH_DELIMITER = "/";
public static final String META_PATH_JOIN = "-";

Expand Down Expand Up @@ -130,6 +137,7 @@ public class MetaManager {
private KafkaMetaManager kafkaMetaManager;
private SchemaTemplateMetaManager schemaTemplateManager;
private LockMetaManager lockMetaManager;
private ConcurrentHashMap<String, AtomicBoolean> listenerInitialized;

private MetaManager() {
}
Expand Down Expand Up @@ -187,6 +195,7 @@ private void initManagers(String cluster) {
this.kafkaMetaManager = new KafkaMetaManager(this.metaDriver, cluster);
this.schemaTemplateManager = new SchemaTemplateMetaManager(this.metaDriver, cluster);
this.lockMetaManager = new LockMetaManager(this.metaDriver, cluster);
this.listenerInitialized = new ConcurrentHashMap<>();
}

public <T> void listenGraphSpaceAdd(Consumer<T> consumer) {
Expand Down Expand Up @@ -242,7 +251,19 @@ public <T> void listenGraphClear(Consumer<T> consumer) {
}

public <T> void listenSchemaCacheClear(Consumer<T> consumer) {
this.graphMetaManager.listenSchemaCacheClear(consumer);
if (isListenerInitialized(SCHEMA_CLEAR_KEY)) {
this.graphMetaManager.listenSchemaCacheClear(consumer);
LOG.debug("Schema cache clear listener registered");
}
}

public Boolean isListenerInitialized(String listenerName) {
listenerInitialized.putIfAbsent(listenerName, new AtomicBoolean(false));
AtomicBoolean flag = listenerInitialized.get(listenerName);
if (!flag.get()) {
return flag.compareAndSet(false, true);
}
return false;
}

public <T> void listenGraphCacheClear(Consumer<T> consumer) {
Expand Down

0 comments on commit f23f1ca

Please sign in to comment.