From 9b3be74ae139b9cefe746f18351d5bdf2080e520 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 24 Aug 2023 17:13:03 +0000 Subject: [PATCH] Converts .opendistro-job-scheduler-lock into a system index Signed-off-by: Joshua Palis --- .../jobscheduler/spi/utils/LockService.java | 116 +++++++++++------- .../jobscheduler/JobSchedulerPlugin.java | 11 +- 2 files changed, 79 insertions(+), 48 deletions(-) diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 0ddd4a79..4770a07f 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -8,6 +8,7 @@ */ package org.opensearch.jobscheduler.spi.utils; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -44,7 +45,7 @@ public final class LockService { private static final Logger logger = LogManager.getLogger(LockService.class); - private static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock"; + public static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock"; private final Client client; private final ClusterService clusterService; @@ -77,20 +78,25 @@ public boolean lockIndexExist() { @VisibleForTesting void createLockIndex(ActionListener listener) { - if (lockIndexExist()) { - listener.onResponse(true); - } else { - final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()); - client.admin() - .indices() - .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { - if (exception instanceof ResourceAlreadyExistsException - || exception.getCause() instanceof ResourceAlreadyExistsException) { - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + if (lockIndexExist()) { + listener.onResponse(true); + } else { + final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } @@ -180,7 +186,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) { } private void updateLock(final LockModel updateLock, ActionListener listener) { - try { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME) .id(updateLock.getLockId()) .setIfSeqNo(updateLock.getSeqNo()) @@ -212,11 +218,14 @@ private void updateLock(final LockModel updateLock, ActionListener li } catch (IOException e) { logger.error("IOException occurred updating lock.", e); listener.onResponse(null); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } private void createLock(final LockModel tempLock, ActionListener listener) { - try { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId()) .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) @@ -239,29 +248,37 @@ private void createLock(final LockModel tempLock, ActionListener list } catch (IOException e) { logger.error("IOException occurred creating lock", e); listener.onFailure(e); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); } } public void findLock(final String lockId, ActionListener listener) { - GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); - client.get(getRequest, ActionListener.wrap(response -> { - if (!response.isExists()) { - listener.onResponse(null); - } else { - try { - XContentParser parser = XContentType.JSON.xContent() - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); - parser.nextToken(); - listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); - } catch (IOException e) { - logger.error("IOException occurred finding lock", e); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + parser.nextToken(); + listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); + } catch (IOException e) { + logger.error("IOException occurred finding lock", e); + listener.onResponse(null); + } } - } - }, exception -> { - logger.error("Exception occurred finding lock", exception); - listener.onFailure(exception); - })); + }, exception -> { + logger.error("Exception occurred finding lock", exception); + listener.onFailure(exception); + })); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } } /** @@ -293,19 +310,24 @@ public void release(final LockModel lock, ActionListener listener) { * or not the delete was successful */ public void deleteLock(final String lockId, ActionListener listener) { - DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); - client.delete(deleteRequest, ActionListener.wrap(response -> { - listener.onResponse( - response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND - ); - }, exception -> { - if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { - logger.debug("Index is not found to delete lock. {}", exception.getMessage()); - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); + client.delete(deleteRequest, ActionListener.wrap(response -> { + listener.onResponse( + response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND + ); + }, exception -> { + if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { + logger.debug("Index is not found to delete lock. {}", exception.getMessage()); + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } } /** diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java index f99b8897..96d7eddc 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java @@ -37,10 +37,12 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.jobscheduler.utils.JobDetailsService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.script.ScriptService; @@ -61,7 +63,7 @@ import com.google.common.collect.ImmutableList; -public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin { +public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin { public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler"; public static final String JS_BASE_URI = "/_plugins/_job_scheduler"; @@ -81,6 +83,13 @@ public JobSchedulerPlugin() { this.indexToJobProviders = new HashMap<>(); } + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return Collections.singletonList( + new SystemIndexDescriptor(LockService.LOCK_INDEX_NAME, "Stores lock documents used for plugin job execution") + ); + } + @Override public Collection createComponents( Client client,