Skip to content

Commit

Permalink
Converts .opendistro-job-scheduler-lock into a system index
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Aug 25, 2023
1 parent 7021e39 commit 9b3be74
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.jobscheduler.spi.utils;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand Down Expand Up @@ -44,7 +45,7 @@

public final class LockService {
private static final Logger logger = LogManager.getLogger(LockService.class);
private static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock";
public static final String LOCK_INDEX_NAME = ".opendistro-job-scheduler-lock";

private final Client client;
private final ClusterService clusterService;
Expand Down Expand Up @@ -77,20 +78,25 @@ public boolean lockIndexExist() {

@VisibleForTesting
void createLockIndex(ActionListener<Boolean> listener) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping());
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping());
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

Expand Down Expand Up @@ -180,7 +186,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) {
}

private void updateLock(final LockModel updateLock, ActionListener<LockModel> listener) {
try {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME)
.id(updateLock.getLockId())
.setIfSeqNo(updateLock.getSeqNo())
Expand Down Expand Up @@ -212,11 +218,14 @@ private void updateLock(final LockModel updateLock, ActionListener<LockModel> li
} catch (IOException e) {
logger.error("IOException occurred updating lock.", e);
listener.onResponse(null);
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

private void createLock(final LockModel tempLock, ActionListener<LockModel> listener) {
try {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId())
.source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
Expand All @@ -239,29 +248,37 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
} catch (IOException e) {
logger.error("IOException occurred creating lock", e);
listener.onFailure(e);
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

public void findLock(final String lockId, ActionListener<LockModel> listener) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onResponse(null);
} else {
try {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onResponse(null);
} else {
try {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString());
parser.nextToken();
listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm()));
} catch (IOException e) {
logger.error("IOException occurred finding lock", e);
listener.onResponse(null);
}
}
}
}, exception -> {
logger.error("Exception occurred finding lock", exception);
listener.onFailure(exception);
}));
}, exception -> {
logger.error("Exception occurred finding lock", exception);
listener.onFailure(exception);
}));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

/**
Expand Down Expand Up @@ -293,19 +310,24 @@ public void release(final LockModel lock, ActionListener<Boolean> listener) {
* or not the delete was successful
*/
public void deleteLock(final String lockId, ActionListener<Boolean> listener) {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND
);
}, exception -> {
if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) {
logger.debug("Index is not found to delete lock. {}", exception.getMessage());
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND
);
}, exception -> {
if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) {
logger.debug("Index is not found to delete lock. {}", exception.getMessage());
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
Expand All @@ -61,7 +63,7 @@

import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {
public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
Expand All @@ -81,6 +83,13 @@ public JobSchedulerPlugin() {
this.indexToJobProviders = new HashMap<>();
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(

Check warning on line 88 in src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java#L88

Added line #L88 was not covered by tests
new SystemIndexDescriptor(LockService.LOCK_INDEX_NAME, "Stores lock documents used for plugin job execution")
);
}

@Override
public Collection<Object> createComponents(
Client client,
Expand Down

0 comments on commit 9b3be74

Please sign in to comment.