xiari 10 сар өмнө
parent
commit
8903ce59db

+ 3 - 3
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -79,7 +79,7 @@ public class BaseBusiness {
     private final DefaultConfig defaultConfig;
     private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
     private final IXfCardLimitedService cardLimitedService;
-    private final ThreadPoolTaskExecutor taskExecutor;
+    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
     private final CardCacheManager cardCacheManager;
 
     @DubboReference
@@ -431,7 +431,7 @@ public class BaseBusiness {
      */
     public void completeUploadRecord(ConsumptionBo bo, RemoteUserAccountVo accountVo) {
         // 消费记录上传完成,还有一些后续工作,不需要知道处理结果,采用异步任务提交
-        taskExecutor.submit(() -> sendConsumeToKafka(bo, accountVo));
+        threadPoolTaskExecutor.execute(() -> sendConsumeToKafka(bo, accountVo));
         //taskExecutor.submit(() -> sendCloudConsume(bo));
     }
 
@@ -554,7 +554,7 @@ public class BaseBusiness {
                 RemoteUserAccountVo accountVo = accountVoList.parallelStream()
                     .filter(k -> k.getUserId().equals(p.getUserId())).findFirst().orElse(null);
                 //向教务推送刷卡数据
-                taskExecutor.submit(() -> sendConsumeToKafka(p, accountVo));
+                threadPoolTaskExecutor.execute(() -> sendConsumeToKafka(p, accountVo));
             });
         }
         return R.ok();

+ 135 - 51
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/check/CardConsumeValidation.java

@@ -69,70 +69,110 @@ public class CardConsumeValidation {
     }
 
     private R<ErrorInfo> executeCardValidationChain(CardConsumeValidationContext ctx) {
-        // 创建验证任务列表
-        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
-        validationTasks.add(() -> dealCardDiscount(ctx));
-        validationTasks.add(() -> dealCardLimited(ctx));
-
+        // 创建CountDownLatch 有1个任务返回错误时就唤醒主线程
+        CountDownLatch latch = new CountDownLatch(1);
         // 存储第一个错误结果
         AtomicReference<R<ErrorInfo>> firstError = new AtomicReference<>(null);
+        // 创建验证任务列表
+        List<Callable<R<ErrorInfo>>> validationTasks = getCardValidCallables(ctx, firstError, latch);
+        // 所有任务的结果集合
         List<Future<R<ErrorInfo>>> futures = new ArrayList<>();
-
         // 提交所有验证任务
         for (Callable<R<ErrorInfo>> task : validationTasks) {
             futures.add(taskExecutor.submit(task));
         }
-        int taskIndex = 0;
+
+        long starTime = System.currentTimeMillis();
+        // 最终执行的结果,true 为正常执行结束及latch减至了0;false 为超时
+        boolean finallyResult;
         try {
-            // 等待所有任务完成或出现错误
-            for (Future<R<ErrorInfo>> future : futures) {
-                long starTime = System.currentTimeMillis();
-                if (firstError.get() != null) {
-                    future.cancel(true); // 取消剩余任务
-                    continue;
-                }
-
-                try {
-                    R<ErrorInfo> result = future.get();
-                    if (result != null && R.isError(result)) {
-                        if (firstError.compareAndSet(null, result)) {
-                            // 发现错误,立即取消其他任务
-                            futures.forEach(f -> f.cancel(true)); // 发现错误立即取消其他任务
-                        }
-                    }
-                } catch (ExecutionException e) {
-                    log.error("{}执行异常", getTaskName(taskIndex), e);
-                    if (firstError.compareAndSet(null, commonCheck.createError(TradeStatusEnum.SysError))) {
-                        futures.forEach(f -> f.cancel(true));
-                    }
-                } finally {
-                    log.info("{}结束,耗时:{} ms", getTaskName(taskIndex), System.currentTimeMillis() - starTime);
-                    taskIndex++;
-                }
-            }
+            // 阻塞主线程,等待执行结果; latch 为0时,会唤醒主线程,最多阻塞10s
+            finallyResult = latch.await(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("executeCardValidationChain- main 被中断 :{}", e.getMessage());
+            Thread.currentThread().interrupt();
+            // 取消所有任务
+            futures.forEach(f -> f.cancel(true));
+            return commonCheck.createError(TradeStatusEnum.SysError);
+        }
 
-            // 如果已有错误,直接返回
-            if (firstError.get() != null) {
-                return firstError.get();
-            }
+        // 循环取消所有任务
+        futures.forEach(f -> f.cancel(true));
+        if (!finallyResult) {
+            log.error("executeCardValidationChain- 折扣和限次校验10s超时");
+            return commonCheck.createError(TradeStatusEnum.SysError);
+        }
 
-            // 所有前置任务通过,执行最后一个任务
-            long starTime = System.currentTimeMillis();
-            try {
-                return dealCardQuota(ctx);
-            } catch (Exception e) {
-                log.error("{}执行异常", getTaskName(2), e);
-                return commonCheck.createError(TradeStatusEnum.DayLimitMoney, "日限额验证失败");
-            } finally {
-                log.info("{}结束,耗时:{} ms", getTaskName(2), System.currentTimeMillis() - starTime);
-            }
+        R<ErrorInfo> errorInfoR = firstError.get();
+        log.info("executeCardValidationChain- 折扣和限次校验耗时:{}ms", System.currentTimeMillis() - starTime);
+        if (errorInfoR != null && R.isError(errorInfoR)) {
+            return errorInfoR;
+        }
 
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt(); // 保留中断状态
-            log.error("验证过程被中断", e);
-            return commonCheck.createError(TradeStatusEnum.SysError);
+        // 所有前置任务通过,执行最后一个任务
+        starTime = System.currentTimeMillis();
+        try {
+            // 限额逻辑
+            return dealCardQuota(ctx);
+        } catch (Exception e) {
+            log.error("{}执行异常", getTaskName(2), e);
+            return commonCheck.createError(TradeStatusEnum.DayLimitMoney, "日限额验证失败");
+        }finally {
+            log.info("{}结束,耗时:{} ms", getTaskName(2), System.currentTimeMillis() - starTime);
         }
 
+
+//        int taskIndex = 0;
+//        try {
+//            // 等待所有任务完成或出现错误
+//            for (Future<R<ErrorInfo>> future : futures) {
+//                long starTime = System.currentTimeMillis();
+//                if (firstError.get() != null) {
+//                    future.cancel(true); // 取消剩余任务
+//                    continue;
+//                }
+//
+//                try {
+//                    R<ErrorInfo> result = future.get();
+//                    if (result != null && R.isError(result)) {
+//                        if (firstError.compareAndSet(null, result)) {
+//                            // 发现错误,立即取消其他任务
+//                            futures.forEach(f -> f.cancel(true)); // 发现错误立即取消其他任务
+//                        }
+//                    }
+//                } catch (ExecutionException e) {
+//                    log.error("{}执行异常", getTaskName(taskIndex), e);
+//                    if (firstError.compareAndSet(null, commonCheck.createError(TradeStatusEnum.SysError))) {
+//                        futures.forEach(f -> f.cancel(true));
+//                    }
+//                } finally {
+//                    log.info("{}结束,耗时:{} ms", getTaskName(taskIndex), System.currentTimeMillis() - starTime);
+//                    taskIndex++;
+//                }
+//            }
+//
+//            // 如果已有错误,直接返回
+//            if (firstError.get() != null) {
+//                return firstError.get();
+//            }
+//
+//            // 所有前置任务通过,执行最后一个任务
+//            long starTime = System.currentTimeMillis();
+//            try {
+//                return dealCardQuota(ctx);
+//            } catch (Exception e) {
+//                log.error("{}执行异常", getTaskName(2), e);
+//                return commonCheck.createError(TradeStatusEnum.DayLimitMoney, "日限额验证失败");
+//            } finally {
+//                log.info("{}结束,耗时:{} ms", getTaskName(2), System.currentTimeMillis() - starTime);
+//            }
+//
+//        } catch (InterruptedException e) {
+//            Thread.currentThread().interrupt(); // 保留中断状态
+//            log.error("验证过程被中断", e);
+//            return commonCheck.createError(TradeStatusEnum.SysError);
+//        }
+
         // 使用CountDownLatch跟踪任务完成
         // CountDownLatch latch = new CountDownLatch(validationTasks.size());
         //
@@ -178,6 +218,50 @@ public class CardConsumeValidation {
         // }
     }
 
+    /**
+     * 获取校验的任务列表
+     * @param ctx
+     * @param firstError
+     * @param latch
+     * @return
+     */
+    private List<Callable<R<ErrorInfo>>> getCardValidCallables(CardConsumeValidationContext ctx,
+                                                               AtomicReference<R<ErrorInfo>> firstError, CountDownLatch latch) {
+        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
+        // 添加折扣验证任务
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                // 已经有错误了,直接返回
+                return null;
+            }
+            //执行具体的校验逻辑
+            R<ErrorInfo> infoR = dealCardDiscount(ctx);
+            if (infoR != null && R.isError(infoR)) {
+                // 设置错误结果
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                // 已经有错误了,直接返回
+                return null;
+            }
+            //执行具体的校验逻辑 卡片限次验证
+            R<ErrorInfo> infoR = dealCardLimited(ctx);
+            if (infoR != null && R.isError(infoR)) {
+                // 存储错误结果
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        return validationTasks;
+    }
+
     private String getTaskName(int taskIndex) {
         return switch (taskIndex) {
             case 0 -> "卡片折扣计算";

+ 124 - 44
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/check/CommonCheck.java

@@ -87,60 +87,85 @@ public class CommonCheck {
      * @return 校验结果,包含错误信息或成功标识
      */
     private R<ErrorInfo> executeTermValidationChain(AllowConsumeValidationContext ctx) {
-        // 创建验证任务列表
-        // List<Supplier<R<ErrorInfo>>> validationTasks = new ArrayList<>();
-        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
-
-        validationTasks.add(() -> checkParam(ctx));
-        validationTasks.add(() -> checkTerm(ctx));
-        validationTasks.add(() -> checkUserAccount(ctx));
-
+        // 创建CountDownLatch 有1个任务返回错误时就唤醒主线程 // 使用CountDownLatch跟踪任务完成
+        CountDownLatch latch = new CountDownLatch(1);
         // 用于存储第一个错误结果
         AtomicReference<R<ErrorInfo>> firstError = new AtomicReference<>(null);
+        // 创建验证任务列表
+        List<Callable<R<ErrorInfo>>> validationTasks = getTermValidCallables(ctx, firstError, latch);
 
-        // 使用CountDownLatch跟踪任务完成
-        // CountDownLatch latch = new CountDownLatch(validationTasks.size());
-        // AtomicBoolean cancelled = new AtomicBoolean(false);
+        // 所有任务的结果集合
         List<Future<R<ErrorInfo>>> futures = new ArrayList<>();
         // 提交所有任务
         for (Callable<R<ErrorInfo>> task : validationTasks) {
             futures.add(taskExecutor.submit(task));
         }
-        int taskIndex = 0;
-        try {
-            for (Future<R<ErrorInfo>> future : futures) {
-                long starTime = System.currentTimeMillis();
-                if (firstError.get() != null) {
-                    future.cancel(true); // 取消剩余任务
-                    continue;
-                }
-
-                try {
-                    // R<ErrorInfo> result = future.get(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS);
-                    R<ErrorInfo> result = future.get();
-                    // 发现错误,立即取消其他任务
-                    if (result != null && R.isError(result)) {
-                        if (firstError.compareAndSet(null, result)) {
-                            futures.forEach(f -> f.cancel(true));
-                        }
-                    }
-                } catch (ExecutionException e) {
-                    log.error("{}验证执行异常", getTaskName(taskIndex), e);
-                    if (firstError.compareAndSet(null, createError(TradeStatusEnum.SysError))) {
-                        futures.forEach(f -> f.cancel(true));
-                    }
-                } finally {
-                    log.info("{}结束,耗时:{} ms", getTaskName(taskIndex), System.currentTimeMillis() - starTime);
-                    taskIndex++;
-                }
-            }
-            return firstError.get() != null ? firstError.get() : R.ok();
 
+        long starTime = System.currentTimeMillis();
+        // 最终执行的结果,true 为正常执行结束及latch减至了0;false 为超时
+        boolean finallyResult;
+        try {
+            // 阻塞主线程,等待执行结果; latch 为0时,会唤醒主线程,最多阻塞10s
+            finallyResult = latch.await(10, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt(); // 保留中断状态
-            log.error("验证过程被中断", e);
+            log.error("executeTermValidationChain- main 被中断 :{}", e.getMessage());
+            Thread.currentThread().interrupt();
+            // 取消所有任务
+            futures.forEach(f -> f.cancel(true));
+            return createError(TradeStatusEnum.SysError);
+        }
+
+        // 循环取消所有任务
+        futures.forEach(f -> f.cancel(true));
+        if (!finallyResult) {
+            log.error("executeTermValidationChain- 设备、账户、参数校验10s超时");
             return createError(TradeStatusEnum.SysError);
         }
+
+        R<ErrorInfo> errorInfoR = firstError.get();
+        log.info("executeTermValidationChain-  设备、账户、参数校验耗时:{}ms", System.currentTimeMillis() - starTime);
+        if (errorInfoR != null && R.isError(errorInfoR)) {
+            return errorInfoR;
+        }else {
+            return R.ok();
+        }
+
+//        int taskIndex = 0;
+//        try {
+//            for (Future<R<ErrorInfo>> future : futures) {
+//                long starTime = System.currentTimeMillis();
+//                if (firstError.get() != null) {
+//                    future.cancel(true); // 取消剩余任务
+//                    continue;
+//                }
+//
+//                try {
+//                    // R<ErrorInfo> result = future.get(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS);
+//                    R<ErrorInfo> result = future.get();
+//                    // 发现错误,立即取消其他任务
+//                    if (result != null && R.isError(result)) {
+//                        if (firstError.compareAndSet(null, result)) {
+//                            futures.forEach(f -> f.cancel(true));
+//                        }
+//                    }
+//                } catch (ExecutionException e) {
+//                    log.error("{}验证执行异常", getTaskName(taskIndex), e);
+//                    if (firstError.compareAndSet(null, createError(TradeStatusEnum.SysError))) {
+//                        futures.forEach(f -> f.cancel(true));
+//                    }
+//                } finally {
+//                    log.info("{}结束,耗时:{} ms", getTaskName(taskIndex), System.currentTimeMillis() - starTime);
+//                    taskIndex++;
+//                }
+//            }
+//            return firstError.get() != null ? firstError.get() : R.ok();
+//
+//        } catch (InterruptedException e) {
+//            Thread.currentThread().interrupt(); // 保留中断状态
+//            log.error("验证过程被中断", e);
+//            return createError(TradeStatusEnum.SysError);
+//        }
+
         // // 提交所有验证任务
         // for (int i = 0, j = validationTasks.size(); i < j; i++) {
         //     long starTime = System.currentTimeMillis();
@@ -186,6 +211,61 @@ public class CommonCheck {
         // }
     }
 
+    /**
+     * 获取所有终端的验证任务
+     *
+     * @param ctx
+     * @return
+     */
+    private List<Callable<R<ErrorInfo>>> getTermValidCallables(AllowConsumeValidationContext ctx,
+                                                               AtomicReference<R<ErrorInfo>> firstError, CountDownLatch latch) {
+        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
+
+        // 校验参数任务
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = checkParam(ctx);
+            if (infoR != null && R.isError(infoR)) {
+                // 设置错误结果
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        //消费设备校验
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = checkTerm(ctx);
+            if (infoR != null && R.isError(infoR)) {
+                // 存储错误结果
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        //用户账户校验
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = checkUserAccount(ctx);
+            if (infoR != null && R.isError(infoR)) {
+                // 存储错误结果
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        return validationTasks;
+    }
+
     private String getTaskName(int taskIndex) {
         return switch (taskIndex) {
             case 0 -> "参数验证";
@@ -599,9 +679,9 @@ public class CommonCheck {
 
         // if (ObjectUtil.equals(currentDateStr, consumeDateStr)) {
         // 重置卡天当日消费数据
-        taskExecutor.submit(() -> baseBusiness.resetCardConsumeInfo(userCardVo, mealType, consumeMoney, consumeDate));
+        taskExecutor.execute(() -> baseBusiness.resetCardConsumeInfo(userCardVo, mealType, consumeMoney, consumeDate));
         // 重置卡片当日限制数据
-        taskExecutor.submit(() -> baseBusiness.restCardLimitedInfo(mapCardLimited, cardLimitedVo, consumeMoney));
+        taskExecutor.execute(() -> baseBusiness.restCardLimitedInfo(mapCardLimited, cardLimitedVo, consumeMoney));
         // 重置人员当日总卡余
         // taskExecutor.submit(() -> baseBusiness.resetUserBalance(userId, balance));
         // }

+ 165 - 49
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/check/ConsumeRequestCheck.java

@@ -34,10 +34,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -284,63 +281,88 @@ public class ConsumeRequestCheck {
      */
     private R<ErrorInfo> executeTermValidationChain(TermConsumeValidationContext context) {
         long startTime = System.currentTimeMillis();
-        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
-
-        validationTasks.add(() -> validateSwipeInterval(context));
-        validationTasks.add(() -> validateSingleLimit(context));
-        validationTasks.add(() -> validateCardValidity(context));
-        validationTasks.add(() -> validateCardType(context));
-        validationTasks.add(() -> validateMealLimit(context));
-        validationTasks.add(() -> validateDailyLimit(context));
-
+        // 创建CountDownLatch 有1个任务返回错误时就唤醒主线程 // 使用CountDownLatch跟踪任务完成
+        CountDownLatch latch = new CountDownLatch(1);
         // 用于存储第一个错误结果
         AtomicReference<R<ErrorInfo>> firstError = new AtomicReference<>(null);
+        // 创建验证任务列表
+        List<Callable<R<ErrorInfo>>> validationTasks = getTermValidationCallables(context, firstError, latch);
 
-        // 使用CountDownLatch跟踪任务完成
-        // CountDownLatch latch = new CountDownLatch(validationTasks.size());
-        // AtomicBoolean cancelled = new AtomicBoolean(false);
+        // 所有任务的结果
         List<Future<R<ErrorInfo>>> futures = new ArrayList<>();
         // 提交所有任务
         for (Callable<R<ErrorInfo>> task : validationTasks) {
             futures.add(threadPoolTaskExecutor.submit(task));
         }
-        int taskIndex = 0;
-        try {
-            for (Future<R<ErrorInfo>> future : futures) {
-                long starTime = System.currentTimeMillis();
-                if (firstError.get() != null) {
-
-                    future.cancel(true); // 取消剩余任务
-                    continue;
-                }
-
-                try {
-                    // R<ErrorInfo> result = future.get(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS);
-                    R<ErrorInfo> result = future.get();
-                    // 发现错误,立即取消其他任务
-                    if (result != null && R.isError(result)) {
-
-                        if (firstError.compareAndSet(null, result)) {
-                            futures.forEach(f -> f.cancel(true));
-                        }
-                    }
-                } catch (ExecutionException e) {
-                    log.error("{}验证执行异常", getTaskName(taskIndex), e);
-                    if (firstError.compareAndSet(null, createError(TradeStatusEnum.SysError))) {
-                        futures.forEach(f -> f.cancel(true));
-                    }
-                } finally {
-                    log.info("{}结束,耗时:{} ms", getTaskName(taskIndex), System.currentTimeMillis() - starTime);
-                    taskIndex++;
-                }
-            }
-            return firstError.get() != null ? firstError.get() : R.ok();
 
+        // 最终执行的结果,true 为正常执行结束及latch减至了0;false 为超时
+        boolean finallyResult;
+        try {
+            // 阻塞主线程,等待执行结果; latch 为0时,会唤醒主线程,最多阻塞10s
+            finallyResult = latch.await(10, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt(); // 保留中断状态
-            log.error("验证过程被中断", e);
+            log.error("executeTermValidationChain- main 被中断 :{}", e.getMessage());
+            Thread.currentThread().interrupt();
+            // 取消所有任务
+            futures.forEach(f -> f.cancel(true));
+            return createError(TradeStatusEnum.SysError);
+        }
+
+        // 循环取消所有任务
+        futures.forEach(f -> f.cancel(true));
+        if (!finallyResult) {
+            log.error("executeTermValidationChain- 执行消费终端校验链10s超时");
             return createError(TradeStatusEnum.SysError);
         }
+
+        R<ErrorInfo> errorInfoR = firstError.get();
+        log.info("executeTermValidationChain-  执行消费终端校验链耗时:{}ms", System.currentTimeMillis() - startTime);
+        if (errorInfoR != null && R.isError(errorInfoR)) {
+            return errorInfoR;
+        }else {
+            return R.ok();
+        }
+
+
+//        int taskIndex = 0;
+//        try {
+//            for (Future<R<ErrorInfo>> future : futures) {
+//                long starTime = System.currentTimeMillis();
+//                if (firstError.get() != null) {
+//
+//                    future.cancel(true); // 取消剩余任务
+//                    continue;
+//                }
+//
+//                try {
+//                    // R<ErrorInfo> result = future.get(VALIDATION_TIMEOUT, TimeUnit.MILLISECONDS);
+//                    R<ErrorInfo> result = future.get();
+//                    // 发现错误,立即取消其他任务
+//                    if (result != null && R.isError(result)) {
+//
+//                        if (firstError.compareAndSet(null, result)) {
+//                            futures.forEach(f -> f.cancel(true));
+//                        }
+//                    }
+//                } catch (ExecutionException e) {
+//                    log.error("{}验证执行异常", getTaskName(taskIndex), e);
+//                    if (firstError.compareAndSet(null, createError(TradeStatusEnum.SysError))) {
+//                        futures.forEach(f -> f.cancel(true));
+//                    }
+//                } finally {
+//                    log.info("{}结束,耗时:{} ms", getTaskName(taskIndex), System.currentTimeMillis() - starTime);
+//                    taskIndex++;
+//                }
+//            }
+//            return firstError.get() != null ? firstError.get() : R.ok();
+//
+//        } catch (InterruptedException e) {
+//            Thread.currentThread().interrupt(); // 保留中断状态
+//            log.error("验证过程被中断", e);
+//            return createError(TradeStatusEnum.SysError);
+//        }
+
+
         //// 创建验证任务列表
         //List<Supplier<R<ErrorInfo>>> validationTasks = new ArrayList<>();
         //
@@ -393,6 +415,100 @@ public class ConsumeRequestCheck {
         //    return commonCheck.createError(TradeStatusEnum.SysError);
         //}
     }
+
+    /**
+     *  获取任务列表
+     * @param context
+     * @param firstError
+     * @param latch
+     * @return
+     */
+    private List<Callable<R<ErrorInfo>>> getTermValidationCallables(TermConsumeValidationContext context,
+                                                                    AtomicReference<R<ErrorInfo>> firstError, CountDownLatch latch) {
+        List<Callable<R<ErrorInfo>>> validationTasks = new ArrayList<>();
+
+        // 消费时间间隔验证
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = validateSwipeInterval(context);
+            if (infoR != null && R.isError(infoR)) {
+                // 设置错误结果
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        // 单次消费限额判断
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = validateSingleLimit(context);
+            if (infoR != null && R.isError(infoR)) {
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        // 卡有效期验证
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = validateCardValidity(context);
+            if (infoR != null && R.isError(infoR)) {
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        // 卡类型验证
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = validateCardType(context);
+            if (infoR != null && R.isError(infoR)) {
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        // 餐次验证
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = validateMealLimit(context);
+            if (infoR != null && R.isError(infoR)) {
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        // 日限额验证
+        validationTasks.add(() -> {
+            R<ErrorInfo> errorInfoR = firstError.get();
+            if (errorInfoR != null && R.isError(errorInfoR)) {
+                return null;
+            }
+            R<ErrorInfo> infoR = validateDailyLimit(context);
+            if (infoR != null && R.isError(infoR)) {
+                firstError.set(infoR);
+                latch.countDown();
+            }
+            return infoR;
+        });
+        return validationTasks;
+    }
+
     //validationTasks.add(() -> validateSwipeInterval(context));
     //validationTasks.add(() -> validateSingleLimit(context));
     //validationTasks.add(() -> validateCardValidity(context));

+ 1 - 1
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/controller/v1/ConsumeController.java

@@ -176,7 +176,7 @@ public class ConsumeController {
             if (Objects.equals(type, "requestConsume")) {
                 errorInfo = consumeBusiness.createOrder(bo, mac, xfPwd);
             } else if (Objects.equals(type, "uploadRecord")) {
-                threadPoolTaskExecutor.submit(() -> consumeBusiness.postOrderAsync(bo, mac, xfPwd));
+                threadPoolTaskExecutor.execute(() -> consumeBusiness.postOrderAsync(bo, mac, xfPwd));
                 errorInfo = R.ok();
             } else {
                 errorInfo = consumeBusiness.fullOrder(bo, mac, xfPwd);

+ 8 - 8
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/service/impl/SendDeviceServiceImpl.java

@@ -416,7 +416,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
             throw new ServiceException("没有要处理的设备");
         }
         termList.forEach(p -> {
-            threadPoolTaskExecutor.submit(() -> {
+            threadPoolTaskExecutor.execute(() -> {
                 DeviceDto deviceDto = getDeviceDto(p);
                 R<Void> result = this.createOperatorEmpInfo(deviceDto, empDto);
                 log.info(result.getMsg());
@@ -462,7 +462,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         }
 
         termList.forEach(p -> {
-            threadPoolTaskExecutor.submit(() -> {
+            threadPoolTaskExecutor.execute(() -> {
                 DeviceDto dto = getDeviceDto(p);
                 R<Void> result = setHttpHostByDto(dto);
                 log.info(result.getMsg());
@@ -508,7 +508,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         DeviceDto deviceDto = getDeviceDto(termNo);
         List<RemoteUserAccountVo> accountVoList = remoteUserAccountService.getUserAccountVoList();
         accountVoList.forEach(p -> {
-            threadPoolTaskExecutor.submit(() -> {
+            threadPoolTaskExecutor.execute(() -> {
                 EmpInfoDto empDto = getEmpInfoDto(p, true, false, false, false, false, true);
                 R<Void> result = createOperatorEmpInfo(deviceDto, empDto);
                 log.info(result.getMsg());
@@ -532,7 +532,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         }
 
         termList.forEach(p -> {
-            threadPoolTaskExecutor.submit(() -> {
+            threadPoolTaskExecutor.execute(() -> {
                 DeviceDto device = getDeviceDto(p);
                 R<Void> result = createOperatorEmpInfo(device, empInfo);
                 log.info(result.getMsg());
@@ -556,7 +556,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
 
         // 并行处理
         termList.forEach(p -> {
-            threadPoolTaskExecutor.submit(() -> {
+            threadPoolTaskExecutor.execute(() -> {
                 DeviceDto device = getDeviceDto(p);
                 accountVoList.parallelStream().forEach(t -> {
                     EmpInfoDto empInfo = getEmpInfoDto(t, true, false, false, false, false, true);
@@ -649,7 +649,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
 
         // 处理人员列表
         accountVoList.forEach(p -> {
-            threadPoolTaskExecutor.submit(() -> {
+            threadPoolTaskExecutor.execute(() -> {
                 try {
                     EmpInfoDto empDto = getEmpInfoDto(p, false, false, false, false, false,true);
                     R<Void> result = createOperatorEmpInfo(deviceDto, empDto);
@@ -690,7 +690,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         termList.forEach(p -> {
             DeviceDto device = getDeviceDto(p);
             accountVoList.forEach(t -> {
-                threadPoolTaskExecutor.submit(()->{
+                threadPoolTaskExecutor.execute(()->{
                     EmpInfoDto empInfo = getEmpInfoDto(t, false, false, false, false, false,uploadPhoto);
                     R<Void> result = createOperatorEmpInfo(device, empInfo);
                     log.info(result.getMsg());
@@ -720,7 +720,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         }
         DeviceDto device = getDeviceDto(termVo);
         vos.forEach(t -> {
-            threadPoolTaskExecutor.submit(()->{
+            threadPoolTaskExecutor.execute(()->{
                 try{
                     EmpInfoDto empInfo = getEmpInfoDto(t, false, false, false, false, false,uploadPhoto);
                     R<Void> result = createOperatorEmpInfo(device, empInfo);

+ 2 - 2
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/task/ScheduledTasks.java

@@ -36,7 +36,7 @@ public class ScheduledTasks {
     // 每天11:10和17:10、7:00执行
 //    @Scheduled(cron = "0 20 11,17 * * *")
     public void upLoadEmpToDevice() {
-        scheduledExecutorService.submit(() -> {
+        scheduledExecutorService.execute(() -> {
             log.info("向所有海康消费机下发人员定时任务开始执行");
             // 下发人脸会是设备卡机,所以这里设置为false
             sendDeviceService.upLoadEmpToDevice(false);
@@ -52,7 +52,7 @@ public class ScheduledTasks {
      */
 //    @Scheduled(cron = "0 0 7 * * *")
     public void upLoadEmpToDevice2() {
-        scheduledExecutorService.submit(() -> {
+        scheduledExecutorService.execute(() -> {
             log.info("向所有海康消费机下发人员定时任务开始执行");
             // 下发人脸会是设备卡机,所以这里设置为false
             sendDeviceService.upLoadEmpToDevice(false);