Explorar el Código

fix(消费服务): 增加手工补发kafka消息功能

autumnal_wind hace 11 meses
padre
commit
27833a5876

+ 40 - 2
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -1,6 +1,7 @@
 package org.dromara.server.consume.business;
 
 import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.ObjUtil;
 import cn.hutool.core.util.ObjectUtil;
@@ -436,8 +437,8 @@ public class BaseBusiness {
      */
     public void completeUploadRecord(ConsumptionBo bo, RemoteUserAccountVo accountVo) {
         // 消费记录上传完成,还有一些后续工作,不需要知道处理结果,采用异步任务提交
-        // taskExecutor.submit(() -> sendConsumeToKafka(bo, accountVo));
-        // taskExecutor.submit(() -> sendCloudConsume(bo));
+        taskExecutor.submit(() -> sendConsumeToKafka(bo, accountVo));
+        taskExecutor.submit(() -> sendCloudConsume(bo));
     }
 
 
@@ -513,6 +514,43 @@ public class BaseBusiness {
     public List<ConsumptionBo> selectOriginalReconciliation(Date consumeDate) {
         return TenantHelper.ignore(() -> originalService.selectReconciliationData(consumeDate));
     }
+    public List<ConsumptionBo> selectOriginalNoSend(Date consumeDate) {
+        List<XfConsumeDetailOriginalVo> list = originalService.queryListByConsumeDate(consumeDate);
+        List<ConsumptionBo>  consumptionBos = new ArrayList<>();
+        list.parallelStream().forEach(p->{
+            ConsumptionBo bo = new ConsumptionBo();
+            bo.setRecordId(p.getRecordId());
+            bo.setUserId(p.getUserId());
+            bo.setUserNumb(p.getUserNumb());
+            bo.setRealName(p.getRealName());
+            bo.setCardNo(p.getCardNo());
+            bo.setFactoryId(p.getFactoryId());
+            bo.setConsumeMoney(p.getConsumeMoney());
+            bo.setBalance(p.getConsumeBalance());
+            bo.setConsumeDate(p.getConsumeDate());
+            bo.setMealType(1L);
+            bo.setTermNo(p.getTermNo());
+            bo.setTermRecordId(p.getTermRecordId());
+            bo.setRecordStatus(364L);
+
+            consumptionBos.add(bo);
+        });
+        return consumptionBos;
+    }
+
+    public R<Void> sendToJwKafkaTest(Date consumeDate){
+        List<ConsumptionBo> list = selectOriginalNoSend(consumeDate);
+        if (CollUtil.isNotEmpty(list)) {
+            list.parallelStream().forEach(p->{
+                List<RemoteUserAccountVo> accountVoList = RedisUtils.getCacheList(CacheNames.PT_USER_ACCOUNT_LIST);
+                RemoteUserAccountVo accountVo = accountVoList.parallelStream()
+                    .filter(k -> k.getUserId().equals(p.getUserId())).findFirst().orElse(null);
+                taskExecutor.submit(() -> sendConsumeToKafka(p, accountVo));
+                taskExecutor.submit(() -> sendCloudConsume(p));
+            });
+        }
+        return R.ok();
+    }
     //endregion
 
     /**

+ 9 - 5
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/controller/v1/ConsumeController.java

@@ -10,8 +10,10 @@ import org.dromara.common.core.domain.R;
 import org.dromara.common.core.domain.model.ErrorInfo;
 import org.dromara.common.core.domain.model.ErrorResult;
 import org.dromara.common.core.enums.SystemUseTypeEnum;
+import org.dromara.common.core.utils.DateUtils;
 import org.dromara.common.redis.utils.RedisUtils;
 import org.dromara.server.common.domain.consume.bo.ConsumptionBo;
+import org.dromara.server.consume.business.BaseBusiness;
 import org.dromara.server.consume.business.ConsumeBusiness;
 import org.dromara.server.consume.convert.strategy.RecordConvertStrategyContent;
 import org.springframework.http.HttpStatus;
@@ -20,6 +22,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.Date;
 import java.util.Objects;
 
 /**
@@ -42,6 +45,7 @@ public class ConsumeController {
     private final RecordConvertStrategyContent recordConvertStrategy;
     private final ConsumeBusiness consumeBusiness;
     private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
+    private final BaseBusiness baseBusiness;
 
     /**
      * 请求消费(校园码)
@@ -115,14 +119,14 @@ public class ConsumeController {
     }
 
     /**
-     * kafka 推送消费信息处理
+     * kafka 推送消费信息处理,向教务系统
      *
-     * @param bo 消费业务对象
+     * @param date 推送日期
      * @return 处理结果
      */
-    @PostMapping("/Consume/kafka")
-    public R<ErrorInfo> consumeKafka(@RequestBody ConsumptionBo bo) {
-        return consumeBusiness.fullOrder(bo, "", "");
+    @PostMapping("/Consume/kafka/{date}")
+    public R<Void> consumeKafka(@PathVariable("date") String date) {
+        return baseBusiness.sendToJwKafkaTest(DateUtils.parseDate(date));
     }
 
     /**

+ 4 - 3
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/domain/vo/XfConsumeDetailOriginalVo.java

@@ -7,6 +7,7 @@ import org.dromara.server.consume.domain.XfConsumeDetailOriginal;
 
 import java.io.Serial;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Date;
 
 
@@ -67,7 +68,7 @@ public class XfConsumeDetailOriginalVo implements Serializable {
     /**
      * 消费金额
      */
-    private Long consumeMoney;
+    private BigDecimal consumeMoney;
 
     /**
      * 卡流水号
@@ -82,7 +83,7 @@ public class XfConsumeDetailOriginalVo implements Serializable {
     /**
      * 卡上余额
      */
-    private Long cardValue;
+    private BigDecimal cardValue;
 
     /**
      * 卡使用次数
@@ -92,7 +93,7 @@ public class XfConsumeDetailOriginalVo implements Serializable {
     /**
      * 消费账户金额
      */
-    private Long consumeBalance;
+    private BigDecimal consumeBalance;
 
     /**
      * 设备机号

+ 2 - 0
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/service/IConsumeDetailOriginalService.java

@@ -95,4 +95,6 @@ public interface IConsumeDetailOriginalService {
      * @return 原始消费记录视图对象,如果未找到匹配的记录则返回null
      */
     XfConsumeDetailOriginalVo queryByConsumeDate(Long cardNo, Long termNo, Long termRecordId, Date consumeDate);
+
+    List<XfConsumeDetailOriginalVo> queryListByConsumeDate(Date date);
 }

+ 6 - 0
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/service/impl/ConsumeDetailOriginalServiceImpl.java

@@ -201,4 +201,10 @@ public class ConsumeDetailOriginalServiceImpl implements IConsumeDetailOriginalS
         }
         return flag;
     }
+
+    @Override
+    public List<XfConsumeDetailOriginalVo> queryListByConsumeDate(Date date) {
+        return baseMapper.selectVoList(new LambdaQueryWrapper<XfConsumeDetailOriginal>()
+            .gt(XfConsumeDetailOriginal::getConsumeDate, date));
+    }
 }