Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring Index Creation for Improved Code Reuse and Consistency #932

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.8.0"
bwcVersionShort = "2.9.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -117,7 +117,7 @@ dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
compileOnly group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
compileOnly group: 'com.google.guava', name: 'guava', version:'32.0.1-jre'
compileOnly group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
implementation group: 'org.javassist', name: 'javassist', version:'3.28.0-GA'
implementation group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
Expand Down Expand Up @@ -63,6 +62,7 @@
import org.opensearch.timeseries.common.exception.InternalFailure;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;

import com.google.common.base.Throwables;

Expand All @@ -77,7 +77,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Client client;
private ThreadPool threadPool;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private AnomalyDetectionIndices anomalyDetectionIndices;
private ADIndexManagement anomalyDetectionIndices;
private ADTaskManager adTaskManager;
private NodeStateManager nodeStateManager;
private ExecuteADResultResponseRecorder recorder;
Expand Down Expand Up @@ -117,7 +117,7 @@ public void setAdTaskManager(ADTaskManager adTaskManager) {
this.adTaskManager = adTaskManager;
}

public void setAnomalyDetectionIndices(AnomalyDetectionIndices anomalyDetectionIndices) {
public void setAnomalyDetectionIndices(ADIndexManagement anomalyDetectionIndices) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
}

Expand Down Expand Up @@ -514,7 +514,7 @@ private void stopAdJobForEndRunException(
);
}

private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
private void stopAdJob(String detectorId, ExecutorFunction function) {
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
ActionListener<GetResponse> listener = ActionListener.wrap(response -> {
if (response.isExists()) {
Expand Down
52 changes: 39 additions & 13 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityColdStarter;
import org.opensearch.ad.ml.HybridThresholdingModel;
Expand Down Expand Up @@ -154,7 +154,6 @@
import org.opensearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.ad.util.Throttler;
Expand All @@ -176,6 +175,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.forecast.model.Forecaster;
import org.opensearch.forecast.settings.ForecastSettings;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
Expand All @@ -194,7 +194,10 @@
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.dataprocessor.LinearUniformImputer;
import org.opensearch.timeseries.function.ThrowingSupplierWrapper;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.watcher.ResourceWatcherService;

import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
Expand Down Expand Up @@ -226,7 +229,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
public static final String AD_BATCH_TASK_THREAD_POOL_NAME = "ad-batch-task-threadpool";
public static final String AD_JOB_TYPE = "opendistro_anomaly_detector";
private static Gson gson;
private AnomalyDetectionIndices anomalyDetectionIndices;
private ADIndexManagement anomalyDetectionIndices;
private AnomalyDetectorRunner anomalyDetectorRunner;
private Client client;
private ClusterService clusterService;
Expand Down Expand Up @@ -333,14 +336,19 @@ public Collection<Object> createComponents(
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
// convert from checked IOException to unchecked RuntimeException
this.anomalyDetectionIndices = ThrowingSupplierWrapper

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing that the supplier would be reused for forecast. Did you want to rename "anomalyDetectionIndices"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have a different one for forecast. Their functionalities are different.

.throwingSupplierWrapper(
() -> new ADIndexManagement(
client,
clusterService,
threadPool,
settings,
nodeFilter,
TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
)
)
.get();
this.clusterService = clusterService;

Imputer imputer = new LinearUniformImputer(true);
Expand Down Expand Up @@ -853,6 +861,9 @@ public List<Setting<?>> getSettings() {

List<Setting<?>> systemSetting = ImmutableList
.of(
// ======================================
// AD settings
// ======================================
// HCAD cache
LegacyOpenDistroAnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.DEDICATED_CACHE_SIZE,
Expand Down Expand Up @@ -894,7 +905,7 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_MULTI_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.AD_INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.AD_INDEX_PRESSURE_HARD_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.AD_MAX_PRIMARY_SHARDS,
// Security
LegacyOpenDistroAnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
Expand Down Expand Up @@ -938,7 +949,22 @@ public List<Setting<?>> getSettings() {
// clean resource
AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR,
// stats/profile API
AnomalyDetectorSettings.MAX_MODEL_SIZE_PER_NODE
AnomalyDetectorSettings.MAX_MODEL_SIZE_PER_NODE,
// ======================================
// Forecast settings
// ======================================
// result index rollover
ForecastSettings.FORECAST_RESULT_HISTORY_MAX_DOCS_PER_SHARD,
ForecastSettings.FORECAST_RESULT_HISTORY_RETENTION_PERIOD,
ForecastSettings.FORECAST_RESULT_HISTORY_ROLLOVER_PERIOD,
// resource usage control
ForecastSettings.FORECAST_MODEL_MAX_SIZE_PERCENTAGE,
// TODO: add validation code
// ForecastSettings.FORECAST_MAX_SINGLE_STREAM_FORECASTERS,
// ForecastSettings.FORECAST_MAX_HC_FORECASTERS,
ForecastSettings.FORECAST_INDEX_PRESSURE_SOFT_LIMIT,
ForecastSettings.FORECAST_INDEX_PRESSURE_HARD_LIMIT,
ForecastSettings.FORECAST_MAX_PRIMARY_SHARDS
);
return unmodifiableList(
Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
Expand All @@ -73,6 +72,7 @@
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfileName;
Expand All @@ -37,7 +37,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -50,11 +49,12 @@
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class ExecuteADResultResponseRecorder {
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);

private AnomalyDetectionIndices anomalyDetectionIndices;
private ADIndexManagement anomalyDetectionIndices;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename anomalyDetectionIndices , to be clear.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have a different one for forecast. Their functionalities are different.

private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ADTaskManager adTaskManager;
private DiscoveryNodeFilterer nodeFilter;
Expand All @@ -65,7 +65,7 @@ public class ExecuteADResultResponseRecorder {
private int rcfMinSamples;

public ExecuteADResultResponseRecorder(
AnomalyDetectionIndices anomalyDetectionIndices,
ADIndexManagement anomalyDetectionIndices,
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
ADTaskManager adTaskManager,
DiscoveryNodeFilterer nodeFilter,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/NodeStateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.common.exception.EndRunException;
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -60,6 +59,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;

/**
* Migrate AD data to support backward compatibility.
Expand All @@ -71,14 +71,14 @@ public class ADDataMigrator {
private final Client client;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;
private final AnomalyDetectionIndices detectionIndices;
private final ADIndexManagement detectionIndices;
private final AtomicBoolean dataMigrated;

public ADDataMigrator(
Client client,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
AnomalyDetectionIndices detectionIndices
ADIndexManagement detectionIndices

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

detectionIndices is a good name which is agnostic to "what".

) {
this.client = client;
this.clusterService = clusterService;
Expand All @@ -94,12 +94,12 @@ public void migrateData() {
if (!dataMigrated.getAndSet(true)) {
logger.info("Start migrating AD data");

if (!detectionIndices.doesAnomalyDetectorJobIndexExist()) {
if (!detectionIndices.doesJobIndexExist()) {
logger.info("AD job index doesn't exist, no need to migrate");
return;
}

if (detectionIndices.doesDetectorStateIndexExist()) {
if (detectionIndices.doesStateIndexExist()) {
migrateDetectorInternalStateToRealtimeTask();
} else {
// If detection index doesn't exist, create index and backfill realtime task.
Expand Down Expand Up @@ -179,7 +179,7 @@ public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detec
}
String jobId = job.getName();

AnomalyDetectorFunction createRealtimeTaskFunction = () -> {
ExecutorFunction createRealtimeTaskFunction = () -> {
GetRequest getRequest = new GetRequest(DETECTION_STATE_INDEX, jobId);
client.get(getRequest, ActionListener.wrap(r -> {
if (r != null && r.isExists()) {
Expand All @@ -204,7 +204,7 @@ public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detec

private void checkIfRealtimeTaskExistsAndBackfill(
AnomalyDetectorJob job,
AnomalyDetectorFunction createRealtimeTaskFunction,
ExecutorFunction createRealtimeTaskFunction,
ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs,
boolean migrateAll
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -28,6 +27,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.google.common.annotations.VisibleForTesting;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HashRing.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ml.SingleStreamModelIdMapper;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
Expand All @@ -54,6 +53,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.google.common.collect.Sets;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HourlyCron.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.ad.transport.CronAction;
import org.opensearch.ad.transport.CronRequest;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class HourlyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(HourlyCron.class);
Expand Down
Loading