Skip to content

Commit

Permalink
rebuild and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
TheFatRatre committed Nov 21, 2024
1 parent 8457c52 commit 69ec089
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 34 deletions.
10 changes: 10 additions & 0 deletions common/src/main/java/org/dromara/dynamictp/common/em/JreEnum.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public enum JreEnum {

public static final String DEFAULT_JAVA_VERSION = "1.8";

private static final int JRE_VERSION_OFFSET = 8;

/**
* get current JRE version
*
Expand All @@ -72,6 +74,14 @@ public static JreEnum currentVersion() {
return VERSION;
}

/**
* get current JRE integer version
* @return JRE integer version
*/
public static int currentIntVersion() {
return JreEnum.currentVersion().ordinal() + JRE_VERSION_OFFSET;
}

/**
* is current version
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ public class ExecutorStats extends Metrics {
*/
private String executorAliasName;

/**
* 线程池名字
*/
@Deprecated
private String poolName;

/**
* 线程池别名
*/
@Deprecated
private String poolAliasName;

/**
* 核心线程数
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps pr
TpMainFields oldFields = ExecutorConverter.toMainFields(executorWrapper);
doRefresh(executorWrapper, props);
TpMainFields newFields = ExecutorConverter.toMainFields(executorWrapper);
if (oldFields.equals(newFields) && !executorWrapper.isVirtualThreadExecutor()) {
if (oldFields.equals(newFields)) {
log.debug("DynamicTp refresh, main properties of [{}] have not changed.",
executorWrapper.getThreadPoolName());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
ExecutorStats executorStats = convertCommon(executor);
executorStats.setExecutorName(wrapper.getThreadPoolName());
executorStats.setExecutorAliasName(wrapper.getThreadPoolAliasName());
executorStats.setPoolName(wrapper.getThreadPoolName());
executorStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
executorStats.setRunTimeoutCount(provider.getRunTimeoutCount());
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ public class JMXCollector extends AbstractCollector {

@Override
public void collect(ExecutorStats threadPoolStats) {
if (GAUGE_CACHE.containsKey(threadPoolStats.getPoolName())) {
ExecutorStats poolStats = (ExecutorStats) GAUGE_CACHE.get(threadPoolStats.getPoolName());
if (GAUGE_CACHE.containsKey(threadPoolStats.getExecutorName())) {
ExecutorStats poolStats = (ExecutorStats) GAUGE_CACHE.get(threadPoolStats.getExecutorName());
BeanCopierUtil.copyProperties(threadPoolStats, poolStats);
} else {
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getPoolName());
ObjectName name = new ObjectName(DTP_METRIC_NAME_PREFIX + ":name=" + threadPoolStats.getExecutorName());
ThreadPoolStatsJMX stats = new ThreadPoolStatsJMX(threadPoolStats);
server.registerMBean(stats, name);
} catch (JMException e) {
log.error("collect thread pool stats error", e);
}
GAUGE_CACHE.put(threadPoolStats.getPoolName(), threadPoolStats);
GAUGE_CACHE.put(threadPoolStats.getExecutorName(), threadPoolStats);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private Object registerAndReturnCommon(Object bean, String beanName) {
} else {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {
if (beanDefinition.getBeanClassName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
if (VirtualThreadExecutorProxy.class.getName().equals("org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy")) {
return doRegisterAndReturnCommon(bean, beanName);
}
return bean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.ALLOW_CORE_THREAD_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWAIT_TERMINATION_SECONDS;
Expand Down Expand Up @@ -74,7 +75,9 @@
@Slf4j
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

private static final Integer JDK_VERSION_21_OFFSET = 21 - 8;
private static final Integer JRE_VERSION_21 = 21;

private static final String VIRTUAL_THREAD_EXECUTOR_TYPE = "virtual";

private Environment environment;

Expand Down Expand Up @@ -104,6 +107,7 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
try {
args = buildConstructorArgs(executorTypeClass, e);
} catch (UnsupportedOperationException exception) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", e.getThreadPoolName());
return;
}
BeanRegistrationUtil.register(registry, e.getThreadPoolName(), executorTypeClass, propertyValues, args);
Expand All @@ -115,15 +119,17 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
propertyValues.put(THREAD_POOL_NAME, props.getThreadPoolName());
propertyValues.put(THREAD_POOL_ALIAS_NAME, props.getThreadPoolAliasName());

propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
if (!props.getExecutorType().equals(VIRTUAL_THREAD_EXECUTOR_TYPE)) {
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
}

val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
propertyValues.put(NOTIFY_ITEMS, notifyItems);
Expand All @@ -144,13 +150,13 @@ private Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) th
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
int jdkVersion = JreEnum.currentVersion().ordinal();
if (jdkVersion < JDK_VERSION_21_OFFSET) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", props.getThreadPoolName());
int jreVersion = JreEnum.currentIntVersion();
if (jreVersion < JRE_VERSION_21) {
throw new UnsupportedOperationException();
}
ThreadFactory factory = Thread.ofVirtual().name(props.getThreadPoolName()).factory();
return new Object[]{
Executors.newVirtualThreadPerTaskExecutor()
Executors.newThreadPerTaskExecutor(factory)
};
} else {
taskQueue = buildLbq(props.getQueueType(),
Expand Down

0 comments on commit 69ec089

Please sign in to comment.