-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactoring Index Creation for Improved Code Reuse and Consistency #932
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ | |
import org.opensearch.action.update.UpdateResponse; | ||
import org.opensearch.ad.constant.ADCommonMessages; | ||
import org.opensearch.ad.indices.ADIndex; | ||
import org.opensearch.ad.indices.AnomalyDetectionIndices; | ||
import org.opensearch.ad.indices.ADIndexManagement; | ||
import org.opensearch.ad.model.AnomalyDetector; | ||
import org.opensearch.ad.model.AnomalyResult; | ||
import org.opensearch.ad.model.DetectorProfileName; | ||
|
@@ -37,7 +37,6 @@ | |
import org.opensearch.ad.transport.RCFPollingAction; | ||
import org.opensearch.ad.transport.RCFPollingRequest; | ||
import org.opensearch.ad.transport.handler.AnomalyIndexHandler; | ||
import org.opensearch.ad.util.DiscoveryNodeFilterer; | ||
import org.opensearch.ad.util.ExceptionUtil; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
|
@@ -50,11 +49,12 @@ | |
import org.opensearch.timeseries.common.exception.TimeSeriesException; | ||
import org.opensearch.timeseries.model.FeatureData; | ||
import org.opensearch.timeseries.model.IntervalTimeConfiguration; | ||
import org.opensearch.timeseries.util.DiscoveryNodeFilterer; | ||
|
||
public class ExecuteADResultResponseRecorder { | ||
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class); | ||
|
||
private AnomalyDetectionIndices anomalyDetectionIndices; | ||
private ADIndexManagement anomalyDetectionIndices; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename anomalyDetectionIndices , to be clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will have a different one for forecast. Their functionalities are different. |
||
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler; | ||
private ADTaskManager adTaskManager; | ||
private DiscoveryNodeFilterer nodeFilter; | ||
|
@@ -65,7 +65,7 @@ public class ExecuteADResultResponseRecorder { | |
private int rcfMinSamples; | ||
|
||
public ExecuteADResultResponseRecorder( | ||
AnomalyDetectionIndices anomalyDetectionIndices, | ||
ADIndexManagement anomalyDetectionIndices, | ||
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler, | ||
ADTaskManager adTaskManager, | ||
DiscoveryNodeFilterer nodeFilter, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,14 +37,13 @@ | |
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.ad.constant.ADCommonName; | ||
import org.opensearch.ad.indices.AnomalyDetectionIndices; | ||
import org.opensearch.ad.indices.ADIndexManagement; | ||
import org.opensearch.ad.model.ADTask; | ||
import org.opensearch.ad.model.ADTaskState; | ||
import org.opensearch.ad.model.ADTaskType; | ||
import org.opensearch.ad.model.AnomalyDetector; | ||
import org.opensearch.ad.model.AnomalyDetectorJob; | ||
import org.opensearch.ad.model.DetectorInternalState; | ||
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction; | ||
import org.opensearch.ad.util.ExceptionUtil; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.service.ClusterService; | ||
|
@@ -60,6 +59,7 @@ | |
import org.opensearch.search.builder.SearchSourceBuilder; | ||
import org.opensearch.timeseries.common.exception.ResourceNotFoundException; | ||
import org.opensearch.timeseries.constant.CommonName; | ||
import org.opensearch.timeseries.function.ExecutorFunction; | ||
|
||
/** | ||
* Migrate AD data to support backward compatibility. | ||
|
@@ -71,14 +71,14 @@ public class ADDataMigrator { | |
private final Client client; | ||
private final ClusterService clusterService; | ||
private final NamedXContentRegistry xContentRegistry; | ||
private final AnomalyDetectionIndices detectionIndices; | ||
private final ADIndexManagement detectionIndices; | ||
private final AtomicBoolean dataMigrated; | ||
|
||
public ADDataMigrator( | ||
Client client, | ||
ClusterService clusterService, | ||
NamedXContentRegistry xContentRegistry, | ||
AnomalyDetectionIndices detectionIndices | ||
ADIndexManagement detectionIndices | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. detectionIndices is a good name which is agnostic to "what". |
||
) { | ||
this.client = client; | ||
this.clusterService = clusterService; | ||
|
@@ -94,12 +94,12 @@ public void migrateData() { | |
if (!dataMigrated.getAndSet(true)) { | ||
logger.info("Start migrating AD data"); | ||
|
||
if (!detectionIndices.doesAnomalyDetectorJobIndexExist()) { | ||
if (!detectionIndices.doesJobIndexExist()) { | ||
logger.info("AD job index doesn't exist, no need to migrate"); | ||
return; | ||
} | ||
|
||
if (detectionIndices.doesDetectorStateIndexExist()) { | ||
if (detectionIndices.doesStateIndexExist()) { | ||
migrateDetectorInternalStateToRealtimeTask(); | ||
} else { | ||
// If detection index doesn't exist, create index and backfill realtime task. | ||
|
@@ -179,7 +179,7 @@ public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detec | |
} | ||
String jobId = job.getName(); | ||
|
||
AnomalyDetectorFunction createRealtimeTaskFunction = () -> { | ||
ExecutorFunction createRealtimeTaskFunction = () -> { | ||
GetRequest getRequest = new GetRequest(DETECTION_STATE_INDEX, jobId); | ||
client.get(getRequest, ActionListener.wrap(r -> { | ||
if (r != null && r.isExists()) { | ||
|
@@ -204,7 +204,7 @@ public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detec | |
|
||
private void checkIfRealtimeTaskExistsAndBackfill( | ||
AnomalyDetectorJob job, | ||
AnomalyDetectorFunction createRealtimeTaskFunction, | ||
ExecutorFunction createRealtimeTaskFunction, | ||
ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs, | ||
boolean migrateAll | ||
) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am guessing that the supplier would be reused for forecast. Did you want to rename "anomalyDetectionIndices"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will have a different one for forecast. Their functionalities are different.