Sfoglia il codice sorgente

perf(消费服务): 记录有效性验证优化

1.ThreadPoolConfig 主动初始化线程
2.使用 submit(Callable) + Future 替代原始的 execute 和 CountDownLatch 进行异步结果检查
autumnal_wind 11 mesi fa
parent
commit
f37d0b5a57

+ 19 - 0
ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/config/ThreadPoolConfig.java

@@ -5,12 +5,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.dromara.common.core.config.properties.ThreadPoolProperties;
 import org.dromara.common.core.utils.Threads;
+import org.slf4j.MDC;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -40,7 +42,24 @@ public class ThreadPoolConfig {
         executor.setMaxPoolSize(core * 2);
         executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
         executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
+        executor.setThreadNamePrefix("threadPoolTaskExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        // 设置 TaskDecorator,用于传递 MDC 上下文
+        executor.setTaskDecorator(task -> {
+            Map<String, String> contextMap = MDC.getCopyOfContextMap();
+            return () -> {
+                if (contextMap != null) {
+                    MDC.setContextMap(contextMap); // 恢复上下文
+                }
+                try {
+                    task.run();
+                } finally {
+                    MDC.clear(); // 清理,防止内存泄漏
+                }
+            };
+        });
+
+        executor.initialize(); // 主动初始化线程池
         return executor;
     }
 

+ 79 - 35
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/check/CommonCheck.java

@@ -32,8 +32,7 @@ import java.math.BigDecimal;
 import java.text.MessageFormat;
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -90,7 +89,8 @@ public class CommonCheck {
      */
     private R<ErrorInfo> executeTermValidationChain(AllowConsumeValidationContext ctx) {
         // 创建验证任务列表
-        List<Supplier<R<ErrorInfo>>> validationTasks = new ArrayList<>();
+        // List<Supplier<R<ErrorInfo>>> validationTasks = new ArrayList<>();
+        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
 
         validationTasks.add(() -> checkParam(ctx));
         validationTasks.add(() -> checkTerm(ctx));
@@ -100,53 +100,97 @@ public class CommonCheck {
         AtomicReference<R<ErrorInfo>> firstError = new AtomicReference<>(null);
 
         // 使用CountDownLatch跟踪任务完成
-        CountDownLatch latch = new CountDownLatch(validationTasks.size());
-        AtomicBoolean cancelled = new AtomicBoolean(false);
-
-        // 提交所有验证任务
-        for (int i = 0, j = validationTasks.size(); i < j; i++) {
-            long starTime = System.currentTimeMillis();
-            final int taskIndex = i;
-            final Supplier<R<ErrorInfo>> task = validationTasks.get(taskIndex);
-            taskExecutor.execute(() -> {
-                if (cancelled.get()) {
-                    latch.countDown();
-                    return;
+        // CountDownLatch latch = new CountDownLatch(validationTasks.size());
+        // AtomicBoolean cancelled = new AtomicBoolean(false);
+        List<Future<R<ErrorInfo>>> futures = new ArrayList<>();
+        // 提交所有任务
+        for (Callable<R<ErrorInfo>> task : validationTasks) {
+            futures.add(taskExecutor.submit(task));
+        }
+        int taskIndex = 0;
+        try {
+            for (Future<R<ErrorInfo>> future : futures) {
+                long starTime = System.currentTimeMillis();
+                if (firstError.get() != null) {
+                    future.cancel(true); // 取消剩余任务
+                    continue;
                 }
 
                 try {
-                    R<ErrorInfo> result = task.get();
-                    // 如果发现错误且尚未设置错误结果
-                    if (result != null && R.isError(result) &&
-                        firstError.compareAndSet(null, result)) {
-                        // 取消其他任务(通过中断)
-                        taskExecutor.getThreadPoolExecutor().getQueue().clear();
+                    R<ErrorInfo> result = future.get(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS);
+                    // 发现错误,立即取消其他任务
+                    if (result != null && R.isError(result)) {
+                        if (firstError.compareAndSet(null, result)) {
+                            futures.forEach(f -> f.cancel(true));
+                        }
                     }
-                } catch (Exception e) {
+                } catch (ExecutionException e) {
+                    log.error("{}验证执行异常",getTaskName(taskIndex), e);
                     if (firstError.compareAndSet(null, createError(TradeStatusEnum.SysError))) {
-                        taskExecutor.getThreadPoolExecutor().getQueue().clear();
+                        futures.forEach(f -> f.cancel(true));
+                    }
+                } catch (TimeoutException e) {
+                    log.warn("{}超时",getTaskName(taskIndex));
+                    if (firstError.compareAndSet(null, createError(TradeStatusEnum.VALIDATION_TIMEOUT))) {
+                        futures.forEach(f -> f.cancel(true));
                     }
                 } finally {
                     log.info("{}完成,耗时:{} ms",getTaskName(taskIndex),System.currentTimeMillis()-starTime);
-                    latch.countDown();
+                    taskIndex++;
                 }
-            });
-        }
-
-        try {
-            // 等待所有任务完成或超时
-            if (!latch.await(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
-                return createError(TradeStatusEnum.VALIDATION_TIMEOUT);
             }
-
-            // 返回第一个发现的错误,如果没有错误则返回成功
             return firstError.get() != null ? firstError.get() : R.ok();
+
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            log.error("error:{}", e.getMessage(), e);
+            Thread.currentThread().interrupt(); // 保留中断状态
+            log.error("验证过程被中断", e);
             return createError(TradeStatusEnum.SysError);
         }
+        // // 提交所有验证任务
+        // for (int i = 0, j = validationTasks.size(); i < j; i++) {
+        //     long starTime = System.currentTimeMillis();
+        //     final int taskIndex = i;
+        //     final Supplier<R<ErrorInfo>> task = validationTasks.get(taskIndex);
+        //     taskExecutor.execute(() -> {
+        //         if (cancelled.get()) {
+        //             latch.countDown();
+        //             return;
+        //         }
+        //
+        //         try {
+        //             R<ErrorInfo> result = task.get();
+        //             // 如果发现错误且尚未设置错误结果
+        //             if (result != null && R.isError(result) &&
+        //                 firstError.compareAndSet(null, result)) {
+        //                 // 取消其他任务(通过中断)
+        //                 taskExecutor.getThreadPoolExecutor().getQueue().clear();
+        //             }
+        //         } catch (Exception e) {
+        //             if (firstError.compareAndSet(null, createError(TradeStatusEnum.SysError))) {
+        //                 taskExecutor.getThreadPoolExecutor().getQueue().clear();
+        //             }
+        //         } finally {
+        //             log.info("{}完成,耗时:{} ms",getTaskName(taskIndex),System.currentTimeMillis()-starTime);
+        //             latch.countDown();
+        //         }
+        //     });
+        // }
+        //
+        // try {
+        //     // 等待所有任务完成或超时
+        //     if (!latch.await(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
+        //         return createError(TradeStatusEnum.VALIDATION_TIMEOUT);
+        //     }
+        //
+        //     // 返回第一个发现的错误,如果没有错误则返回成功
+        //     return firstError.get() != null ? firstError.get() : R.ok();
+        // } catch (InterruptedException e) {
+        //     Thread.currentThread().interrupt();
+        //     log.error("error:{}", e.getMessage(), e);
+        //     return createError(TradeStatusEnum.SysError);
+        // }
     }
+
     private String getTaskName(int taskIndex) {
         return switch (taskIndex) {
             case 0 -> "参数验证";