Skip to content

Commit

Permalink
Use SQL thread pool
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Sep 4, 2024
1 parent 1e489b5 commit f9fe064
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
Expand All @@ -34,6 +35,9 @@
* plugin.
*/
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
// Share SQL plugin thread pool
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);

private static ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();
Expand Down Expand Up @@ -96,7 +100,7 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
LOGGER.error(throwable);
}
};
threadPool.generic().submit(runnable);
threadPool.executor(ASYNC_QUERY_THREAD_POOL_NAME).submit(runnable);
}

void doRefresh(ScheduledAsyncQueryJobRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.generic()).submit(captor.capture());
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
runnable.run();
Expand Down Expand Up @@ -143,7 +145,8 @@ public void testDoRefreshThrowsException() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.generic()).submit(captor.capture());
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
runnable.run();
Expand Down

0 comments on commit f9fe064

Please sign in to comment.