diff --git a/CHANGELOG.md b/CHANGELOG.md
index f1866ea07a352..cea3d9406f75f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)
+- Add runAs(Subject subject) to Client interface ([#16976](https://github.com/opensearch-project/OpenSearch/pull/16976))
### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java
index 322b435bdf35c..5ac3e54580037 100644
--- a/server/src/main/java/org/opensearch/client/Client.java
+++ b/server/src/main/java/org/opensearch/client/Client.java
@@ -91,6 +91,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
+import org.opensearch.identity.Subject;
import java.util.Map;
@@ -125,6 +126,8 @@ public interface Client extends OpenSearchClient, Releasable {
*/
AdminClient admin();
+ Client runAs(Subject subject);
+
/**
* Index a JSON source associated with a given index.
*
diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java
index f4683ab516cef..b7ac801aabcfa 100644
--- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java
+++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java
@@ -425,6 +425,8 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.xcontent.MediaType;
+import org.opensearch.identity.RunAsSubjectClient;
+import org.opensearch.identity.Subject;
import org.opensearch.threadpool.ThreadPool;
import java.util.Map;
@@ -464,6 +466,11 @@ public final AdminClient admin() {
return admin;
}
+ @Override
+ public final Client runAs(Subject subject) {
+ return new RunAsSubjectClient(this, subject);
+ }
+
@Override
public final ActionFuture execute(
ActionType action,
diff --git a/server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java b/server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java
new file mode 100644
index 0000000000000..8cef60a718604
--- /dev/null
+++ b/server/src/main/java/org/opensearch/identity/RunAsSubjectClient.java
@@ -0,0 +1,56 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.identity;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.ActionType;
+import org.opensearch.client.Client;
+import org.opensearch.client.FilterClient;
+import org.opensearch.common.annotation.InternalApi;
+import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.action.ActionResponse;
+
+/**
+ * Implementation of client that will run transport actions in a stashed context and inject the name of the provided
+ * subject into the context.
+ *
+ * @opensearch.internal
+ */
+@InternalApi
+public class RunAsSubjectClient extends FilterClient {
+
+ private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class);
+
+ private final Subject subject;
+
+ public RunAsSubjectClient(Client delegate, Subject subject) {
+ super(delegate);
+ this.subject = subject;
+ }
+
+ @Override
+ protected void doExecute(
+ ActionType action,
+ Request request,
+ ActionListener listener
+ ) {
+ try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) {
+ subject.runAs(() -> {
+ logger.info("Running transport action with subject: {}", subject.getPrincipal().getName());
+ super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore));
+ return null;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/server/src/test/java/org/opensearch/identity/RunAsSubjectClientTests.java b/server/src/test/java/org/opensearch/identity/RunAsSubjectClientTests.java
new file mode 100644
index 0000000000000..e85baa6488396
--- /dev/null
+++ b/server/src/test/java/org/opensearch/identity/RunAsSubjectClientTests.java
@@ -0,0 +1,96 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.identity;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.test.MockLogAppender;
+import org.opensearch.test.OpenSearchSingleNodeTestCase;
+import org.junit.Before;
+
+import java.security.Principal;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RunAsSubjectClientTests extends OpenSearchSingleNodeTestCase {
+
+ private final Subject TEST_SUBJECT = new Subject() {
+ @Override
+ public Principal getPrincipal() {
+ return new NamedPrincipal("testSubject");
+ }
+ };
+
+ @Before
+ public void setup() {
+ client().threadPool().getThreadContext().stashContext(); // start with fresh context
+ }
+
+ public void testThatContextIsRestoredOnActionListenerResponse() throws Exception {
+ try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(RunAsSubjectClient.class))) {
+ mockLogAppender.addExpectation(
+ new MockLogAppender.SeenEventExpectation(
+ "testSubject",
+ "org.opensearch.identity.RunAsSubjectClient",
+ Level.INFO,
+ "Running transport action with subject: testSubject"
+ )
+ );
+
+ client().threadPool().getThreadContext().putHeader("test_header", "foo");
+
+ client().runAs(TEST_SUBJECT).admin().cluster().health(new ClusterHealthRequest(), new ActionListener<>() {
+ @Override
+ public void onResponse(ClusterHealthResponse clusterHealthResponse) {
+ String testHeader = client().threadPool().getThreadContext().getHeader("test_header");
+ assertThat(testHeader, equalTo("foo"));
+
+ mockLogAppender.assertAllExpectationsMatched();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ fail("Expected cluster health action to succeed");
+ }
+ });
+ }
+ }
+
+ public void testThatContextIsRestoredOnActionListenerFailure() throws Exception {
+ try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(RunAsSubjectClient.class))) {
+ mockLogAppender.addExpectation(
+ new MockLogAppender.SeenEventExpectation(
+ "testSubject",
+ "org.opensearch.identity.RunAsSubjectClient",
+ Level.INFO,
+ "Running transport action with subject: testSubject"
+ )
+ );
+ client().threadPool().getThreadContext().putHeader("test_header", "bar");
+
+ client().runAs(TEST_SUBJECT).admin().cluster().health(new ClusterHealthRequest("dne"), new ActionListener<>() {
+ @Override
+ public void onResponse(ClusterHealthResponse clusterHealthResponse) {
+ fail("Expected cluster health action to fail");
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ String testHeader = client().threadPool().getThreadContext().getHeader("test_header");
+ assertThat(testHeader, equalTo("bar"));
+
+ mockLogAppender.assertAllExpectationsMatched();
+ }
+ });
+ }
+ }
+}