소스 검색

fix: 消费服务
1.云端定时处理接收kafka消息后没有处理成功的消费记录

luo.yibo@datuai.com 1 년 전
부모
커밋
7d40fd6bf0

+ 1 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/SendMessageRecord.java

@@ -70,6 +70,7 @@ public class SendMessageRecord implements Serializable {
     private Date createTime;
 
     private String consumeStatus;
+
     private Date updateTime;
 
 }

+ 8 - 2
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/dubbo/RemoteSendMessageRecordServiceImpl.java

@@ -11,7 +11,7 @@ import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
 import org.dromara.backstage.basics.service.ISendMessageRecordService;
 import org.springframework.stereotype.Service;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 
 @RequiredArgsConstructor
@@ -34,7 +34,13 @@ public class RemoteSendMessageRecordServiceImpl implements RemoteSendMessageReco
     @Override
     public List<RemoteSendMessageRecordBo> queryConsumeErrorList(RemoteSendMessageRecordBo remoteBo) {
         List<SendMessageRecordVo> list = sendMessageRecordService.queryConsumeErrorList(BeanUtil.copyProperties(remoteBo, SendMessageRecordBo.class));
-        return Collections.singletonList(BeanUtil.copyProperties(list, RemoteSendMessageRecordBo.class));
+
+        List<RemoteSendMessageRecordBo> remoteList = new ArrayList<>();
+        list.forEach(p -> {
+            RemoteSendMessageRecordBo bo = BeanUtil.copyProperties(p, RemoteSendMessageRecordBo.class);
+            remoteList.add(bo);
+        });
+        return remoteList;
     }
 
     @Override

+ 9 - 3
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/impl/SendMessageRecordServiceImpl.java

@@ -113,11 +113,15 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
     @Override
     public Boolean insertByBo(SendMessageRecordBo bo) {
         SendMessageRecord add = MapstructUtils.convert(bo, SendMessageRecord.class);
-        add.setUpdateTime(new Date());
+        if (add != null) {
+            add.setUpdateTime(new Date());
+        }
         validEntityBeforeSave(add);
         boolean flag = baseMapper.insert(add) > 0;
         if (flag) {
-            bo.setRecordId(add.getRecordId());
+            if (add != null) {
+                bo.setRecordId(add.getRecordId());
+            }
         }
         return flag;
     }
@@ -162,7 +166,9 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
         UpdateWrapper<SendMessageRecord> updateWrapper = new UpdateWrapper<>();
         updateWrapper.eq("event_id", bo.getEventId());
         SendMessageRecord update = MapstructUtils.convert(bo, SendMessageRecord.class);
-        update.setUpdateTime(new Date());
+        if (update != null) {
+            update.setUpdateTime(new Date());
+        }
         return baseMapper.update(update, updateWrapper) > 0;
     }
 

+ 2 - 0
ruoyi-server/ruoyi-server-base/src/main/java/org/dromara/server/base/service/yktOperation/SyncRemoteSendMessageRecordService.java

@@ -1,5 +1,6 @@
 package org.dromara.server.base.service.yktOperation;
 
+import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
@@ -25,6 +26,7 @@ public class SyncRemoteSendMessageRecordService {
      * 远程调用消息发送新增
      */
     public void insertSendMessageRecord(RemoteSendMessageRecordBo bo) throws Exception{
+        log.info("[云端kafka消息处表情况表]-[{}]", JSONUtil.toJsonStr(bo));
         boolean flag = sendMessageRecordService.insertByBo(bo);
         if (!flag){
             throw new ServiceException("消息发送记录修改失败");

+ 6 - 0
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/ConsumeBusiness.java

@@ -232,6 +232,12 @@ public class ConsumeBusiness {
         return R.ok(MessageFormat.format("[对账处理完成]-[待处理:{0}条,成功:{1}条,失败:{2}条]", total, success.get(), fail.get()));
     }
 
+    /**
+     * 云端同步对账,将接收到kafka消息但没有处理成功的消费记录重新处理.处理成功后将记录的处理状态设置成已处理
+     *
+     * @param consumeDate 待处理日期
+     * @return 处理结果.
+     */
     public R<ErrorInfo> syncReconciliation(String consumeDate) {
         RemoteSendMessageRecordBo remoteBo = new RemoteSendMessageRecordBo();
         remoteBo.setEventType("12000001");

+ 22 - 1
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/task/ScheduledTasks.java

@@ -4,6 +4,8 @@ import cn.hutool.core.date.DateUtil;
 import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.config.DefaultConfig;
+import org.dromara.common.core.constant.DefaultConstants;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.domain.model.ErrorInfo;
 import org.dromara.server.consume.business.ConsumeBusiness;
@@ -25,13 +27,14 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ScheduledTasks {
     private final ConsumeBusiness consumeBusiness;
+    private final DefaultConfig defaultConfig;
 
     /**
      * 执行原始消费对账任务。
      * 该方法在每天的9:45, 14:45, 20:45和23:45自动调用,用于处理有原始消费记录但没有消费明细的消费记录,并将这些记录写入消费明细。
      * 如果对账过程中出现错误,会记录错误信息;如果成功,则记录警告消息。
      */
-    @Scheduled(cron = "0 55 9,14,20,23 * * *")
+    @Scheduled(cron = "0 55 9,10,14,16,20,23 * * *")
     public void originalReconciliation() {
         String consumeDate = DateUtil.format(DateUtil.date(), "yyyy-MM-dd 00:00:00");
         R<ErrorInfo> result = consumeBusiness.originalReconciliation(consumeDate);
@@ -41,4 +44,22 @@ public class ScheduledTasks {
             log.warn(result.getMsg());
         }
     }
+
+    /**
+     * 执行云同步对账任务。
+     * 该方法在每天的9:45, 14:45, 20:45和23:45自动调用,用于处理云端部署环境下的消费记录对账。
+     * 如果对账过程中出现错误,会记录错误信息;如果成功,则记录警告消息。
+     */
+    @Scheduled(cron = "0 45 9,10,14,16,20,23 * * *")
+    public void CloudSyncReconciliation() {
+        if (defaultConfig.getLocationFlag().equals(DefaultConstants.CLOUD_FLAG)) {
+            String consumeDate = DateUtil.format(DateUtil.date(), "yyyy-MM-dd 00:00:00");
+            R<ErrorInfo> result = consumeBusiness.syncReconciliation(consumeDate);
+            if (R.isError(result)) {
+                log.error(JSONUtil.toJsonStr(result.getData()));
+            } else {
+                log.warn(result.getMsg());
+            }
+        }
+    }
 }