Skip to content

Commit

Permalink
Merge pull request #15390 from cdapio/bugfix/CDAP-20871-system-worker…
Browse files Browse the repository at this point in the history
…-capacity-leak

[CDAP-20871] Reset concurrent request count in system worker when returning 429 response
  • Loading branch information
arjan-bal authored Oct 31, 2023
2 parents 6bb2566 + a82d4a6 commit 0203ec5
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void run(FullHttpRequest request, HttpResponder responder) {

if (requestProcessedCount.incrementAndGet() > requestLimit) {
responder.sendStatus(HttpResponseStatus.TOO_MANY_REQUESTS);
requestProcessedCount.decrementAndGet();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,58 @@ public void testValidConcurrentRequests() throws Exception {
== HttpResponseStatus.OK.code()) {
okResponse++;
} else if (responses.get(i).get().getResponseCode()
== HttpResponseStatus.TOO_MANY_REQUESTS.code()) {
== HttpResponseStatus.TOO_MANY_REQUESTS.code()) {
conflictResponse++;
}
}
Assert.assertEquals(2, okResponse);
Assert.assertEquals(concurrentRequests, okResponse + conflictResponse);
}

@Test
public void testRepeatedConcurrentRequests() throws Exception {
InetSocketAddress addr = systemWorkerService.getBindAddress();
URI uri = URI.create(
String.format("http://%s:%s", addr.getHostName(), addr.getPort()));

RunnableTaskRequest request = RunnableTaskRequest.getBuilder(
SystemWorkerServiceTest.TestRunnableClass.class.getName())
.withParam("500").build();

String reqBody = GSON.toJson(request);
List<Callable<HttpResponse>> calls = new ArrayList<>();
int concurrentRequests = 6;

for (int i = 0; i < concurrentRequests; i++) {
calls.add(() -> HttpRequests.execute(
HttpRequest.post(uri.resolve("/v3Internal/system/run").toURL())
.withBody(reqBody).build(), new DefaultHttpRequestConfig(false)));
}

Executors.newFixedThreadPool(concurrentRequests).invokeAll(calls);

// Wait for requests to complete and retry them.
Thread.sleep(1000);

int okResponse = 0;
int conflictResponse = 0;
List<Future<HttpResponse>> responses = Executors.newFixedThreadPool(
concurrentRequests).invokeAll(calls);
for (int i = 0; i < concurrentRequests; i++) {
if (responses.get(i).get().getResponseCode()
== HttpResponseStatus.OK.code()) {
okResponse++;
} else if (responses.get(i).get().getResponseCode()
== HttpResponseStatus.TOO_MANY_REQUESTS.code()) {
conflictResponse++;
}
}

Assert.assertEquals(5, okResponse);
Assert.assertEquals(concurrentRequests, okResponse + conflictResponse);
}


@Test
public void testInvalidConcurrentRequests() throws Exception {
InetSocketAddress addr = systemWorkerService.getBindAddress();
Expand Down

0 comments on commit 0203ec5

Please sign in to comment.