Skip to content

Commit

Permalink
limit pool for task
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuangcp committed May 15, 2024
1 parent 0ba7275 commit 42f704d
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class ExecutorsThreadPool {
public static ScheduledExecutorService customScheduler = new CusSchedulePool(1, "schedule-pool-");

private void baseType() {
// 创建有缓存功能的线程池
// 创建有缓存功能的线程池 无队列无最大限制 任务会立马创建线程执行,空闲线程1min后回收
ExecutorService a = Executors.newCachedThreadPool();

// 创建具有固定大小的线程池
Expand Down
12 changes: 10 additions & 2 deletions concurrency/src/main/java/thread/pool/RecommendUsePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -22,6 +23,10 @@ public class RecommendUsePool {
* 测试自定义runnable
*/
public static ThreadPoolExecutor taskPool;
/**
* 限制最高并发 批量处理任务
*/
public static ThreadPoolExecutor limitPool;

public static class TrackDiscardPolicy extends ThreadPoolExecutor.DiscardPolicy {
private final AtomicInteger counter = new AtomicInteger();
Expand All @@ -41,8 +46,8 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

public static class Task implements Runnable {

private String id;
private Runnable task;
private final String id;
private final Runnable task;

public Task(String id, Runnable task) {
this.id = id;
Expand All @@ -60,6 +65,9 @@ public void run() {
new LinkedBlockingQueue<>(5), new TrackDiscardPolicy());
taskPool = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(5), new TrackDiscardPolicy());

limitPool = new ThreadPoolExecutor(0, 20,
60L, TimeUnit.SECONDS, new SynchronousQueue<>());
}


Expand Down
104 changes: 102 additions & 2 deletions concurrency/src/test/java/thread/pool/RecommendUsePoolTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package thread.pool;


import com.hellokaton.blade.Blade;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import web.Application;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author Kuangcp
Expand Down Expand Up @@ -44,7 +51,7 @@ public void testTaskPool() throws Exception {
RecommendUsePool.taskPool.execute(new RecommendUsePool.Task("task-" + finalI, () -> {
try {
Thread.sleep(1000);
log.info("run {}", finalI);
log.info("run task-{}", finalI);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -53,4 +60,97 @@ public void testTaskPool() throws Exception {

Thread.currentThread().join(4000);
}

/**
* 测试消费批量任务
* 任务特点:高内存高CPU占用,特殊时段批量创建其他时间较空闲
* 目标:固定并发数情况下平缓消费
* <p>
* 限制内存 -Xmx500m
*/
@Test
public void testBatchTaskPool() throws Exception {
LinkedBlockingQueue<String> shardQueue = new LinkedBlockingQueue<>(100);

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
AtomicInteger counter = new AtomicInteger();
AtomicInteger consumerCnt = new AtomicInteger();

AtomicReference<Semaphore> semaphore = new AtomicReference<>(new Semaphore(2, true));

scheduler.scheduleAtFixedRate(() -> {
try {
log.info("check");
int batch = consumerCnt.incrementAndGet();
for (int i = 0; i < semaphore.get().availablePermits(); i++) {
String task = shardQueue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.nonNull(task)) {
// 并发不安全,也就是判断时条件满足,提交任务时条件不满足,会出现任务被丢弃的情况
// int activeCount = RecommendUsePool.limitPool.getActiveCount();
// int max = RecommendUsePool.limitPool.getMaximumPoolSize();
// if (activeCount == max) {
// log.warn("state {} {}", activeCount, max);
// break;
// }
// log.info("state {} {}", activeCount, max);

semaphore.get().acquire();

// TODO 此处换成MySQL或Redis 实现集群资源跑任务

RecommendUsePool.limitPool.execute(() -> {
try {
byte[] cache = new byte[100 * 1024 * 1024];
TimeUnit.SECONDS.sleep(2 + ThreadLocalRandom.current().nextInt(7));
cache[0] = 2;
cache[45] = 5;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.get().release();
}
log.info("batch={} task={}", batch, task);
});
}
}
} catch (Exception e) {
log.error("", e);
}
}, 3, 3, TimeUnit.SECONDS);

Blade.create()
.listen(33388)
// 创建任务
.get("/create", ctx -> {
Optional<String> c = ctx.request().query("c");
int count = c.map(Integer::parseInt).orElse(10);
int batch = counter.incrementAndGet();
for (int i = 0; i < count; i++) {
try {
shardQueue.put(batch + "-" + i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

ctx.text("add " + count + " queue:" + shardQueue.size());
})
// 调整并发值
.get("/con", ctx -> {
Semaphore cache = semaphore.get();

if (cache.getQueueLength() > 0 || RecommendUsePool.limitPool.getActiveCount() > 0) {
ctx.text("Still Run");
return;
}
Optional<String> c = ctx.request().query("c");
c.map(Integer::parseInt).ifPresent(v -> {
semaphore.set(new Semaphore(v, true));
ctx.text("Set " + v);
});
})
.start(Application.class);

Thread.currentThread().join();
}
}

0 comments on commit 42f704d

Please sign in to comment.