Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] deprecated job-metadata-index #2340

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ configurations.all {
resolutionStrategy.force "org.apache.httpcomponents:httpcore:4.4.13"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,9 @@ private DataSourceServiceImpl createDataSourceService() {
private AsyncQueryExecutorService createAsyncQueryExecutorService(
SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier,
SparkExecutionEngineConfig sparkExecutionEngineConfig) {
StateStore stateStore = new StateStore(client, clusterService);
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
new OpensearchAsyncQueryJobMetadataStorageService(stateStore);
EMRServerlessClient emrServerlessClient =
createEMRServerlessClient(sparkExecutionEngineConfig.getRegion());
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
Expand All @@ -319,8 +320,7 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(
new StateStore(client, clusterService), emrServerlessClient, pluginSettings));
new SessionManager(stateStore, emrServerlessClient, pluginSettings));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
dispatchQueryResponse.getQueryId(),
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getJobId(), dispatchQueryResponse.getSessionId());
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,166 +7,31 @@

package org.opensearch.sql.spark.asyncquery;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createJobMetaData;

import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.StateStore;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

public static final String JOB_METADATA_INDEX = ".ql-job-metadata";
private static final String JOB_METADATA_INDEX_MAPPING_FILE_NAME =
"job-metadata-index-mapping.yml";
private static final String JOB_METADATA_INDEX_SETTINGS_FILE_NAME =
"job-metadata-index-settings.yml";
private static final Logger LOG = LogManager.getLogger();
private final Client client;
private final ClusterService clusterService;

/**
* This class implements JobMetadataStorageService interface using OpenSearch as underlying
* storage.
*
* @param client opensearch NodeClient.
* @param clusterService ClusterService.
*/
public OpensearchAsyncQueryJobMetadataStorageService(
Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}
private final StateStore stateStore;

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
if (!this.clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX)) {
createJobMetadataIndex();
}
IndexRequest indexRequest = new IndexRequest(JOB_METADATA_INDEX);
indexRequest.id(asyncQueryJobMetadata.getJobId());
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<IndexResponse> indexResponseActionFuture;
IndexResponse indexResponse;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
indexRequest.source(AsyncQueryJobMetadata.convertToXContent(asyncQueryJobMetadata));
indexResponseActionFuture = client.index(indexRequest);
indexResponse = indexResponseActionFuture.actionGet();
} catch (Exception e) {
throw new RuntimeException(e);
}

if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
LOG.debug("JobMetadata : {} successfully created", asyncQueryJobMetadata.getJobId());
} else {
throw new RuntimeException(
"Saving job metadata information failed with result : "
+ indexResponse.getResult().getLowercase());
}
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
createJobMetaData(stateStore, queryId.getDataSourceName()).apply(asyncQueryJobMetadata);
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
if (!this.clusterService.state().routingTable().hasIndex(JOB_METADATA_INDEX)) {
createJobMetadataIndex();
return Optional.empty();
}
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream()
.findFirst();
}

private void createJobMetadataIndex() {
try {
InputStream mappingFileStream =
OpensearchAsyncQueryJobMetadataStorageService.class
.getClassLoader()
.getResourceAsStream(JOB_METADATA_INDEX_MAPPING_FILE_NAME);
InputStream settingsFileStream =
OpensearchAsyncQueryJobMetadataStorageService.class
.getClassLoader()
.getResourceAsStream(JOB_METADATA_INDEX_SETTINGS_FILE_NAME);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(JOB_METADATA_INDEX);
createIndexRequest
.mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML)
.settings(
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;
try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest);
}
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
if (createIndexResponse.isAcknowledged()) {
LOG.info("Index: {} creation Acknowledged", JOB_METADATA_INDEX);
} else {
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
throw new RuntimeException(
"Internal server error while creating"
+ JOB_METADATA_INDEX
+ " index:: "
+ e.getMessage());
}
}

private List<AsyncQueryJobMetadata> searchInJobMetadataIndex(QueryBuilder query) {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(JOB_METADATA_INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(query);
searchSourceBuilder.size(1);
searchRequest.source(searchSourceBuilder);
// https://github.com/opensearch-project/sql/issues/1801.
searchRequest.preference("_primary_first");
ActionFuture<SearchResponse> searchResponseActionFuture;
try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
searchResponseActionFuture = client.search(searchRequest);
}
SearchResponse searchResponse = searchResponseActionFuture.actionGet();
if (searchResponse.status().getStatus() != 200) {
throw new RuntimeException(
"Fetching job metadata information failed with status : " + searchResponse.status());
} else {
List<AsyncQueryJobMetadata> list = new ArrayList<>();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
String sourceAsString = searchHit.getSourceAsString();
AsyncQueryJobMetadata asyncQueryJobMetadata;
try {
asyncQueryJobMetadata = AsyncQueryJobMetadata.toJobMetadata(sourceAsString);
} catch (IOException e) {
throw new RuntimeException(e);
}
list.add(asyncQueryJobMetadata);
}
return list;
}
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
AsyncQueryId queryId = new AsyncQueryId(qid);
return StateStore.getJobMetaData(stateStore, queryId.getDataSourceName())
.apply(queryId.docId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.sql.spark.utils.IDUtils.decode;
import static org.opensearch.sql.spark.utils.IDUtils.encode;

import lombok.Data;

/** Async query id. */
@Data
public class AsyncQueryId {
private final String id;

public static AsyncQueryId newAsyncQueryId(String datasourceName) {
return new AsyncQueryId(encode(datasourceName));
}

public String getDataSourceName() {
return decode(id);
}

/** OpenSearch DocId. */
public String docId() {
return "qid" + id;
}

@Override
public String toString() {
return "asyncQueryId=" + id;

Check warning on line 33 in spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryId.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryId.java#L33

Added line #L33 was not covered by tests
}
}
Loading
Loading