diff --git a/CHANGELOG.md b/CHANGELOG.md index f1866ea07a352..cea3d9406f75f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909)) - Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881)) - Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760) +- Add runAs(Subject subject) to Client interface ([#16976](https://github.com/opensearch-project/OpenSearch/pull/16976)) ### Deprecated - Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712)) diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index 322b435bdf35c..5ac3e54580037 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -91,6 +91,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.identity.Subject; import java.util.Map; @@ -125,6 +126,8 @@ public interface Client extends OpenSearchClient, Releasable { */ AdminClient admin(); + Client runAs(Subject subject); + /** * Index a JSON source associated with a given index. *

diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index f4683ab516cef..b7ac801aabcfa 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -425,6 +425,8 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.tasks.TaskId; import org.opensearch.core.xcontent.MediaType; +import org.opensearch.identity.RunAsSubjectClient; +import org.opensearch.identity.Subject; import org.opensearch.threadpool.ThreadPool; import java.util.Map; @@ -464,6 +466,11 @@ public final AdminClient admin() { return admin; } + @Override + public final Client runAs(Subject subject) { + return new RunAsSubjectClient(this, subject); + } + @Override public final ActionFuture execute( ActionType action, diff --git a/server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java b/server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java new file mode 100644 index 0000000000000..8cef60a718604 --- /dev/null +++ b/server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionType; +import org.opensearch.client.Client; +import org.opensearch.client.FilterClient; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; + +/** + * Implementation of client that will run transport actions in a stashed context and inject the name of the provided + * subject into the context. + * + * @opensearch.internal + */ +@InternalApi +public class RunAsSubjectClient extends FilterClient { + + private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class); + + private final Subject subject; + + public RunAsSubjectClient(Client delegate, Subject subject) { + super(delegate); + this.subject = subject; + } + + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) { + subject.runAs(() -> { + logger.info("Running transport action with subject: {}", subject.getPrincipal().getName()); + super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore)); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/server/src/test/java/org/opensearch/identity/RunAsSubjectClientTests.java b/server/src/test/java/org/opensearch/identity/RunAsSubjectClientTests.java new file mode 100644 index 0000000000000..e85baa6488396 --- /dev/null +++ b/server/src/test/java/org/opensearch/identity/RunAsSubjectClientTests.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.test.MockLogAppender; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.Before; + +import java.security.Principal; + +import static org.hamcrest.Matchers.equalTo; + +public class RunAsSubjectClientTests extends OpenSearchSingleNodeTestCase { + + private final Subject TEST_SUBJECT = new Subject() { + @Override + public Principal getPrincipal() { + return new NamedPrincipal("testSubject"); + } + }; + + @Before + public void setup() { + client().threadPool().getThreadContext().stashContext(); // start with fresh context + } + + public void testThatContextIsRestoredOnActionListenerResponse() throws Exception { + try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(RunAsSubjectClient.class))) { + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "testSubject", + "org.opensearch.identity.RunAsSubjectClient", + Level.INFO, + "Running transport action with subject: testSubject" + ) + ); + + client().threadPool().getThreadContext().putHeader("test_header", "foo"); + + client().runAs(TEST_SUBJECT).admin().cluster().health(new ClusterHealthRequest(), new ActionListener<>() { + @Override + public void onResponse(ClusterHealthResponse clusterHealthResponse) { + String testHeader = client().threadPool().getThreadContext().getHeader("test_header"); + assertThat(testHeader, equalTo("foo")); + + mockLogAppender.assertAllExpectationsMatched(); + } + + @Override + public void onFailure(Exception e) { + fail("Expected cluster health action to succeed"); + } + }); + } + } + + public void testThatContextIsRestoredOnActionListenerFailure() throws Exception { + try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(RunAsSubjectClient.class))) { + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "testSubject", + "org.opensearch.identity.RunAsSubjectClient", + Level.INFO, + "Running transport action with subject: testSubject" + ) + ); + client().threadPool().getThreadContext().putHeader("test_header", "bar"); + + client().runAs(TEST_SUBJECT).admin().cluster().health(new ClusterHealthRequest("dne"), new ActionListener<>() { + @Override + public void onResponse(ClusterHealthResponse clusterHealthResponse) { + fail("Expected cluster health action to fail"); + } + + @Override + public void onFailure(Exception e) { + String testHeader = client().threadPool().getThreadContext().getHeader("test_header"); + assertThat(testHeader, equalTo("bar")); + + mockLogAppender.assertAllExpectationsMatched(); + } + }); + } + } +}