Jelajahi Sumber

feat(海康消费机对接): 线程程优化处理

luo.yibo@datuai.com 1 tahun lalu
induk
melakukan
cbf7d36517

+ 17 - 0
ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/config/ThreadPoolConfig.java

@@ -3,9 +3,13 @@ package org.dromara.common.core.config;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.dromara.common.core.config.properties.ThreadPoolProperties;
 import org.dromara.common.core.utils.Threads;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -18,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor;
  **/
 @Slf4j
 @AutoConfiguration
+@EnableConfigurationProperties(ThreadPoolProperties.class)
 public class ThreadPoolConfig {
 
     /**
@@ -27,6 +32,18 @@ public class ThreadPoolConfig {
 
     private ScheduledExecutorService scheduledExecutorService;
 
+    @Bean(name = "threadPoolTaskExecutor")
+    @ConditionalOnProperty(prefix = "thread-pool", name = "enabled", havingValue = "true")
+    public ThreadPoolTaskExecutor threadPoolTaskExecutor(ThreadPoolProperties threadPoolProperties) {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(core);
+        executor.setMaxPoolSize(core * 2);
+        executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
+        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        return executor;
+    }
+
     /**
      * 执行周期性或定时任务
      */

+ 30 - 0
ruoyi-common/ruoyi-common-core/src/main/java/org/dromara/common/core/config/properties/ThreadPoolProperties.java

@@ -0,0 +1,30 @@
+package org.dromara.common.core.config.properties;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * 线程池 配置属性
+ *
+ * @author Lion Li
+ */
+@Data
+@ConfigurationProperties(prefix = "thread-pool")
+public class ThreadPoolProperties {
+
+    /**
+     * 是否开启线程池
+     */
+    private boolean enabled;
+
+    /**
+     * 队列最大长度
+     */
+    private int queueCapacity;
+
+    /**
+     * 线程池维护线程所允许的空闲时间
+     */
+    private int keepAliveSeconds;
+
+}

+ 29 - 24
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/service/impl/SendDeviceServiceImpl.java

@@ -34,12 +34,12 @@ import org.dromara.server.hik.service.ISendDeviceService;
 import org.dromara.server.hik.utils.DigestHttpUtil;
 import org.dromara.server.hik.utils.JsonConfig;
 import org.jetbrains.annotations.NotNull;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 import org.springframework.validation.annotation.Validated;
 
 import java.text.MessageFormat;
 import java.util.*;
-import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * 向消费机发送信息服务接口实现
@@ -56,8 +56,8 @@ import java.util.concurrent.ScheduledExecutorService;
 @RequiredArgsConstructor
 public class SendDeviceServiceImpl implements ISendDeviceService {
     private final DigestHttpUtil digestHttpUtil;
-    private final ScheduledExecutorService scheduledExecutorService;
     private final DefaultConfig defaultConfig;
+    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
     @DubboReference
     private final RemotePtXfTermService remotePtXfTermService;
@@ -380,8 +380,8 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         if (CollectionUtil.isEmpty(termList)) {
             throw new ServiceException("没有要处理的设备");
         }
-        termList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
+        termList.forEach(p -> {
+            threadPoolTaskExecutor.submit(() -> {
                 DeviceDto deviceDto = getDeviceDto(p);
                 R<Void> result = this.createOperatorEmpInfo(deviceDto, empDto);
                 log.info(result.getMsg());
@@ -426,8 +426,8 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
             return R.warn("没有要配置的设备");
         }
 
-        termList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
+        termList.forEach(p -> {
+            threadPoolTaskExecutor.submit(() -> {
                 DeviceDto dto = getDeviceDto(p);
                 R<Void> result = setHttpHostByDto(dto);
                 log.info(result.getMsg());
@@ -472,8 +472,8 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
     public R<Void> deleteEmpFromDevice(Long termNo) {
         DeviceDto deviceDto = getDeviceDto(termNo);
         List<RemoteUserAccountVo> accountVoList = remoteUserAccountService.getUserAccountVoList();
-        accountVoList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
+        accountVoList.forEach(p -> {
+            threadPoolTaskExecutor.submit(() -> {
                 EmpInfoDto empDto = getEmpInfoDto(p, true, false, false, false, false);
                 R<Void> result = createOperatorEmpInfo(deviceDto, empDto);
                 log.info(result.getMsg());
@@ -496,8 +496,8 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
             return R.warn("没有要处理人员的设备");
         }
 
-        termList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
+        termList.forEach(p -> {
+            threadPoolTaskExecutor.submit(() -> {
                 DeviceDto device = getDeviceDto(p);
                 R<Void> result = createOperatorEmpInfo(device, empInfo);
                 log.info(result.getMsg());
@@ -520,8 +520,8 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         }
 
         // 并行处理
-        termList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
+        termList.forEach(p -> {
+            threadPoolTaskExecutor.submit(() -> {
                 DeviceDto device = getDeviceDto(p);
                 accountVoList.parallelStream().forEach(t -> {
                     EmpInfoDto empInfo = getEmpInfoDto(t, true, false, false, false, false);
@@ -534,7 +534,6 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
     }
     // endregion
 
-
     @Override
     public R<Void> deleteAllCardByUserNo(DeviceDto device, String userNo) {
         CardListDto cardList = new CardListDto();
@@ -570,6 +569,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
 
 
     // region 向消费机上传人员相关
+
     @Override
     public R<Void> upLoadEmpToDevice(UploadEmpDto uploadEmpDto) {
         DeviceDto device = uploadEmpDto.getDevice();
@@ -611,12 +611,17 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         if (CollectionUtil.isEmpty(accountVoList)) {
             return R.warn("没有要处理的人员");
         }
-        // 并行处理人员列表
-        accountVoList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
-                EmpInfoDto empDto = getEmpInfoDto(p, false, false, false, false, false);
-                R<Void> result = createOperatorEmpInfo(deviceDto, empDto);
-                log.info(result.getMsg());
+
+        // 处理人员列表
+        accountVoList.forEach(p -> {
+            threadPoolTaskExecutor.submit(() -> {
+                try {
+                    EmpInfoDto empDto = getEmpInfoDto(p, false, false, false, false, false);
+                    R<Void> result = createOperatorEmpInfo(deviceDto, empDto);
+                    log.info(result.getMsg());
+                } catch (Exception e) {
+                    log.error("处理人员{}异常: {}", p.getUserId(), e.getMessage(), e);
+                }
             });
         });
         return R.ok();
@@ -646,11 +651,11 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
             return R.warn("没有要处理的人员");
         }
 
-        // 并行处理
-        termList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
-                DeviceDto device = getDeviceDto(p);
-                accountVoList.parallelStream().forEach(t -> {
+        // 循环处理
+        termList.forEach(p -> {
+            DeviceDto device = getDeviceDto(p);
+            accountVoList.forEach(t -> {
+                threadPoolTaskExecutor.submit(()->{
                     EmpInfoDto empInfo = getEmpInfoDto(t, false, false, false, false, false);
                     R<Void> result = createOperatorEmpInfo(device, empInfo);
                     log.info(result.getMsg());

+ 8 - 2
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/task/ScheduledTasks.java

@@ -6,6 +6,8 @@ import org.dromara.server.hik.service.ISendDeviceService;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * 海康设备定时任务
  * <p>
@@ -22,6 +24,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ScheduledTasks {
     private final ISendDeviceService sendDeviceService;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     /**
      * 定时任务方法,用于每天凌晨1点30分将员工信息上传至设备。
@@ -31,7 +34,10 @@ public class ScheduledTasks {
      * 注意:此方法依赖于外部服务接口的具体实现,确保目标设备和员工信息管理后台接口可用。
      */
     @Scheduled(cron = "0 30 1 * * *")
-    public void upLoadEmpToDevice(){
-        sendDeviceService.upLoadEmpToDevice();
+    public void upLoadEmpToDevice() {
+        scheduledExecutorService.submit(() -> {
+            log.info("向所有海康消费机下发人员定时任务开始执行");
+            sendDeviceService.upLoadEmpToDevice();
+        });
     }
 }