Bläddra i källkod

fix: 消费服务
1.本地消费定时对账成功后发送云端同步处理

luo.yibo@datuai.com 1 år sedan
förälder
incheckning
e97b864843
12 ändrade filer med 245 tillägg och 58 borttagningar
  1. 4 0
      ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/RemoteSendMessageRecordService.java
  2. 5 0
      ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/domain/bo/RemoteSendMessageRecordBo.java
  3. 59 19
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/producer/KafkaCommonProducer.java
  4. 15 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/dubbo/RemoteSendMessageRecordServiceImpl.java
  5. 11 3
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/ISendMessageRecordService.java
  6. 54 27
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/impl/SendMessageRecordServiceImpl.java
  7. 16 0
      ruoyi-server/ruoyi-server-base/src/main/java/org/dromara/server/base/service/yktOperation/SyncRemoteSendMessageRecordService.java
  8. 10 0
      ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java
  9. 49 2
      ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/ConsumeBusiness.java
  10. 11 3
      ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/controller/v1/ConsumeController.java
  11. 7 2
      ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/task/ScheduledTasks.java
  12. 4 2
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

+ 4 - 0
ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/RemoteSendMessageRecordService.java

@@ -2,6 +2,8 @@ package org.dromara.backstage.api;
 
 import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 
+import java.util.List;
+
 /**
  * 消息发送记录接口
  *
@@ -12,5 +14,7 @@ public interface RemoteSendMessageRecordService {
     Boolean updateByBo(RemoteSendMessageRecordBo bo) throws Exception;
     Boolean insertByBo(RemoteSendMessageRecordBo bo) throws Exception;
 
+    List<RemoteSendMessageRecordBo> queryConsumeErrorList(RemoteSendMessageRecordBo remoteBo);
 
+    Boolean updateConsumeStatusById(Long messageId);
 }

+ 5 - 0
ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/domain/bo/RemoteSendMessageRecordBo.java

@@ -67,4 +67,9 @@ public class RemoteSendMessageRecordBo implements Serializable {
      * 创建时间
      */
     private Date createTime;
+
+    /**
+     * 更新时间
+     */
+    private Date updateTime;
 }

+ 59 - 19
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/producer/KafkaCommonProducer.java

@@ -33,6 +33,29 @@ public class KafkaCommonProducer {
     @DubboReference
     private final RemoteSendMessageRecordService sendMessageRecordService;
 
+    /**
+     * Initializes a RemoteSendMessageRecordBo object with the provided Kafka message, result, and topic.
+     *
+     * @param msg    The KafkaMessage object containing the message and header information.
+     * @param result The result of the message processing.
+     * @param topic  The Kafka topic to which the message was sent.
+     * @return A fully initialized RemoteSendMessageRecordBo object.
+     */
+    private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String result, String topic) {
+        RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
+        KafkaHeader header = msg.getHeader();
+        bo.setResult(result);
+        bo.setMqType("kafka");
+        bo.setTopic(topic);
+        bo.setEventId(header.getEventId());
+        bo.setSender(header.getSender());
+        bo.setEventType(header.getEventType());
+        bo.setMessage(JSONUtil.toJsonStr(msg));
+        bo.setTenantId(header.getTenantId());
+        bo.setCreateTime(DateUtil.date());
+        return bo;
+    }
+
     public Boolean sendKafkaMessage(String topic, KafkaMessage<?> data) {
         String jsonMessage = JSON.toJSONString(data);
         String eventType = data.getHeader().getEventType();
@@ -46,16 +69,34 @@ public class KafkaCommonProducer {
             send.whenComplete((result, ex) -> {
                 if (ex != null) {
                     log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender),
-                        EventTypeEnum.getMessage(eventType), data, ex);
-                    isSender.set(true);
+                              EventTypeEnum.getMessage(eventType), data, ex);
+                    isSender.set(false);
+                    try {
+                        sendMessageRecordService.insertByBo(initBo(data, "F", topic));
+                    } catch (Exception e) {
+                        log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType),
+                                  data, e);
+                    }
                 } else {
                     log.info("[{}]-[{}]-[发送到kafka消息成功]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data);
                     isSender.set(true);
+                    try {
+                        sendMessageRecordService.insertByBo(initBo(data, "S", topic));
+                    } catch (Exception e) {
+                        log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType),
+                                  data, e);
+                    }
                 }
             });
             return isSender.get();
         } catch (Exception e) {
             log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+            try {
+                sendMessageRecordService.insertByBo(initBo(data, "F", topic));
+            } catch (Exception ex) {
+                log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data,
+                          ex);
+            }
             return false;
         }
     }
@@ -66,7 +107,7 @@ public class KafkaCommonProducer {
         String eventId = IdUtil.simpleUUID();
         header.setTimestamp(System.currentTimeMillis());
         header.setEventId(eventId);
-        if(ObjectUtil.isNotEmpty(JSONUtil.parseObj(data).get("tenantId"))) {
+        if (ObjectUtil.isNotEmpty(JSONUtil.parseObj(data).get("tenantId"))) {
             header.setTenantId(JSONUtil.parseObj(data).get("tenantId").toString());
         } else {
             header.setTenantId(defaultConfig.getTenantId());
@@ -79,21 +120,20 @@ public class KafkaCommonProducer {
 
         Boolean sendResult = SpringUtils.getAopProxy(this).sendKafkaMessage(topic, message);
 
-        RemoteSendMessageRecordBo remoteSendMessageRecordBo = new RemoteSendMessageRecordBo();
-        remoteSendMessageRecordBo.setMqType("kafka");
-        remoteSendMessageRecordBo.setTopic(topic);
-        remoteSendMessageRecordBo.setEventType(eventType);
-        remoteSendMessageRecordBo.setResult(sendResult ? "S" : "F");
-        remoteSendMessageRecordBo.setMessage(JSONUtil.toJsonStr(message));
-        remoteSendMessageRecordBo.setEventId(eventId);
-        remoteSendMessageRecordBo.setSender(sender);
-        remoteSendMessageRecordBo.setTenantId(header.getTenantId());
-        remoteSendMessageRecordBo.setCreateTime(DateUtil.date());
-        try {
-            sendMessageRecordService.insertByBo(remoteSendMessageRecordBo);
-        } catch (Exception e) {
-            log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
-        }
-
+        // RemoteSendMessageRecordBo remoteSendMessageRecordBo = new RemoteSendMessageRecordBo();
+        // remoteSendMessageRecordBo.setMqType("kafka");
+        // remoteSendMessageRecordBo.setTopic(topic);
+        // remoteSendMessageRecordBo.setEventType(eventType);
+        // remoteSendMessageRecordBo.setResult(sendResult ? "S" : "F");
+        // remoteSendMessageRecordBo.setMessage(JSONUtil.toJsonStr(message));
+        // remoteSendMessageRecordBo.setEventId(eventId);
+        // remoteSendMessageRecordBo.setSender(sender);
+        // remoteSendMessageRecordBo.setTenantId(header.getTenantId());
+        // remoteSendMessageRecordBo.setCreateTime(DateUtil.date());
+        // try {
+        //     sendMessageRecordService.insertByBo(remoteSendMessageRecordBo);
+        // } catch (Exception e) {
+        //     log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+        // }
     }
 }

+ 15 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/dubbo/RemoteSendMessageRecordServiceImpl.java

@@ -7,9 +7,13 @@ import org.apache.dubbo.config.annotation.DubboService;
 import org.dromara.backstage.api.RemoteSendMessageRecordService;
 import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
+import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
 import org.dromara.backstage.basics.service.ISendMessageRecordService;
 import org.springframework.stereotype.Service;
 
+import java.util.Collections;
+import java.util.List;
+
 @RequiredArgsConstructor
 @Service
 @DubboService
@@ -26,4 +30,15 @@ public class RemoteSendMessageRecordServiceImpl implements RemoteSendMessageReco
     public Boolean insertByBo(RemoteSendMessageRecordBo bo) throws Exception {
         return sendMessageRecordService.insertByBo(BeanUtil.copyProperties(bo, SendMessageRecordBo.class));
     }
+
+    @Override
+    public List<RemoteSendMessageRecordBo> queryConsumeErrorList(RemoteSendMessageRecordBo remoteBo) {
+        List<SendMessageRecordVo> list = sendMessageRecordService.queryConsumeErrorList(BeanUtil.copyProperties(remoteBo, SendMessageRecordBo.class));
+        return Collections.singletonList(BeanUtil.copyProperties(list, RemoteSendMessageRecordBo.class));
+    }
+
+    @Override
+    public Boolean updateConsumeStatusById(Long messageId) {
+        return sendMessageRecordService.updateConsumeStatusById(messageId);
+    }
 }

+ 11 - 3
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/ISendMessageRecordService.java

@@ -1,10 +1,9 @@
 package org.dromara.backstage.basics.service;
 
-import org.dromara.backstage.basics.domain.SendMessageRecord;
-import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
 import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
-import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
 import org.dromara.common.mybatis.core.page.PageQuery;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
 
 import java.util.Collection;
 import java.util.List;
@@ -72,4 +71,13 @@ public interface ISendMessageRecordService {
      */
     Boolean updateBoByEventId(SendMessageRecordBo bo);
 
+    /**
+     * 查询消费失败的消息发送记录列表。
+     *
+     * @param bo 查询条件
+     * @return 消费失败的消息发送记录列表
+     */
+    List<SendMessageRecordVo> queryConsumeErrorList(SendMessageRecordBo bo);
+
+    Boolean updateConsumeStatusById(Long messageId);
 }

+ 54 - 27
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/impl/SendMessageRecordServiceImpl.java

@@ -1,26 +1,27 @@
 package org.dromara.backstage.basics.service.impl;
 
-import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
-import org.dromara.common.core.utils.MapstructUtils;
-import org.dromara.common.core.utils.StringUtils;
-import org.dromara.common.mybatis.core.page.TableDataInfo;
-import org.dromara.common.mybatis.core.page.PageQuery;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.RequiredArgsConstructor;
-import org.springframework.stereotype.Service;
+import org.dromara.backstage.basics.domain.SendMessageRecord;
 import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
 import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
-import org.dromara.backstage.basics.domain.SendMessageRecord;
 import org.dromara.backstage.basics.mapper.SendMessageRecordMapper;
 import org.dromara.backstage.basics.service.ISendMessageRecordService;
+import org.dromara.common.core.utils.MapstructUtils;
+import org.dromara.common.core.utils.StringUtils;
+import org.dromara.common.mybatis.core.page.PageQuery;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.springframework.stereotype.Service;
 
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Collection;
 
 /**
  * 消息发送记录Service业务层处理
@@ -41,7 +42,7 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
      * @return 消息发送记录
      */
     @Override
-    public SendMessageRecordVo queryById(Long recordId){
+    public SendMessageRecordVo queryById(Long recordId) {
         return baseMapper.selectVoById(recordId);
     }
 
@@ -82,23 +83,24 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
         lqw.eq(StringUtils.isNotBlank(bo.getMessage()), SendMessageRecord::getMessage, bo.getMessage());
         lqw.eq(StringUtils.isNotBlank(bo.getEventId()), SendMessageRecord::getEventId, bo.getEventId());
         lqw.eq(StringUtils.isNotBlank(bo.getSender()), SendMessageRecord::getSender, bo.getSender());
+        lqw.eq(StringUtils.isNotBlank(bo.getConsumeStatus()), SendMessageRecord::getConsumeStatus, bo.getConsumeStatus());
         return lqw;
     }
 
-    private QueryWrapper<SendMessageRecord> buildQueryWrapper(SendMessageRecordBo bo,String tableAlias) {
+    private QueryWrapper<SendMessageRecord> buildQueryWrapper(SendMessageRecordBo bo, String tableAlias) {
         QueryWrapper<SendMessageRecord> lqw = new QueryWrapper<>();
         String columnPrefix = "";
-        if(StringUtils.isNotBlank(tableAlias)){
+        if (StringUtils.isNotBlank(tableAlias)) {
             columnPrefix = tableAlias + ".";
         }
-        lqw.eq(bo.getRecordId() != null, columnPrefix+"record_id", bo.getRecordId());
-        lqw.eq(StringUtils.isNotBlank(bo.getMqType()), columnPrefix+"mq_type", bo.getMqType());
-        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), columnPrefix+"topic", bo.getTopic());
-        lqw.eq(StringUtils.isNotBlank(bo.getEventType()), columnPrefix+"event_type", bo.getEventType());
-        lqw.eq(StringUtils.isNotBlank(bo.getResult()), columnPrefix+"result", bo.getResult());
-        lqw.eq(StringUtils.isNotBlank(bo.getMessage()), columnPrefix+"message", bo.getMessage());
-        lqw.eq(StringUtils.isNotBlank(bo.getEventId()), columnPrefix+"event_id", bo.getEventId());
-        lqw.eq(StringUtils.isNotBlank(bo.getSender()), columnPrefix+"sender", bo.getSender());
+        lqw.eq(bo.getRecordId() != null, columnPrefix + "record_id", bo.getRecordId());
+        lqw.eq(StringUtils.isNotBlank(bo.getMqType()), columnPrefix + "mq_type", bo.getMqType());
+        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), columnPrefix + "topic", bo.getTopic());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventType()), columnPrefix + "event_type", bo.getEventType());
+        lqw.eq(StringUtils.isNotBlank(bo.getResult()), columnPrefix + "result", bo.getResult());
+        lqw.eq(StringUtils.isNotBlank(bo.getMessage()), columnPrefix + "message", bo.getMessage());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventId()), columnPrefix + "event_id", bo.getEventId());
+        lqw.eq(StringUtils.isNotBlank(bo.getSender()), columnPrefix + "sender", bo.getSender());
         return lqw;
     }
 
@@ -136,8 +138,8 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
     /**
      * 保存前的数据校验
      */
-    private void validEntityBeforeSave(SendMessageRecord entity){
-        //做一些数据校验,如唯一约束
+    private void validEntityBeforeSave(SendMessageRecord entity) {
+        // 做一些数据校验,如唯一约束
     }
 
     /**
@@ -149,8 +151,8 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
      */
     @Override
     public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
-        if(isValid){
-            //做一些业务上的校验,判断是否需要校验
+        if (isValid) {
+            // 做一些业务上的校验,判断是否需要校验
         }
         return baseMapper.deleteByIds(ids) > 0;
     }
@@ -158,10 +160,35 @@ public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
     @Override
     public Boolean updateBoByEventId(SendMessageRecordBo bo) {
         UpdateWrapper<SendMessageRecord> updateWrapper = new UpdateWrapper<>();
-        updateWrapper.eq("event_id",bo.getEventId());
+        updateWrapper.eq("event_id", bo.getEventId());
         SendMessageRecord update = MapstructUtils.convert(bo, SendMessageRecord.class);
         update.setUpdateTime(new Date());
-        return baseMapper.update(update,updateWrapper) > 0;
+        return baseMapper.update(update, updateWrapper) > 0;
     }
 
+    /**
+     * 查询消费失败的消息发送记录列表。
+     *
+     * @param bo 查询条件
+     * @return 消费失败的消息发送记录列表
+     */
+    @Override
+    public List<SendMessageRecordVo> queryConsumeErrorList(SendMessageRecordBo bo) {
+        LambdaQueryWrapper<SendMessageRecord> lqw = Wrappers.lambdaQuery();
+        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), SendMessageRecord::getTopic, bo.getTopic());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventType()), SendMessageRecord::getEventType, bo.getEventType());
+        lqw.eq(StringUtils.isNotBlank(bo.getConsumeStatus()), SendMessageRecord::getConsumeStatus, bo.getConsumeStatus());
+        lqw.gt(bo.getUpdateTime() != null, SendMessageRecord::getUpdateTime, bo.getUpdateTime());
+
+        return baseMapper.selectVoList(lqw);
+    }
+
+    @Override
+    public Boolean updateConsumeStatusById(Long messageId) {
+        LambdaUpdateWrapper<SendMessageRecord> updateWrapper = new LambdaUpdateWrapper<SendMessageRecord>()
+                                                                   .set(SendMessageRecord::getConsumeStatus, "Y")
+                                                                   .set(SendMessageRecord::getUpdateTime, new Date())
+                                                                   .eq(SendMessageRecord::getRecordId, messageId);
+        return baseMapper.update(null, updateWrapper) > 0;
+    }
 }

+ 16 - 0
ruoyi-server/ruoyi-server-base/src/main/java/org/dromara/server/base/service/yktOperation/SyncRemoteSendMessageRecordService.java

@@ -8,6 +8,8 @@ import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.common.core.exception.ServiceException;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+
 /**
  * 更新消息记录
  */
@@ -28,4 +30,18 @@ public class SyncRemoteSendMessageRecordService {
             throw new ServiceException("消息发送记录修改失败");
         }
     }
+
+    /**
+     * 查询消费失败的消息记录列表。
+     *
+     * @param remoteBo 消息发送记录业务对象,用于过滤查询条件
+     * @return 消费失败的消息记录列表
+     */
+    public List<RemoteSendMessageRecordBo> queryConsumeErrorList(RemoteSendMessageRecordBo remoteBo) {
+        return sendMessageRecordService.queryConsumeErrorList(remoteBo);
+    }
+
+    public Boolean updateConsumeStatusById(Long messageId) {
+        return sendMessageRecordService.updateConsumeStatusById(messageId);
+    }
 }

+ 10 - 0
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -8,6 +8,7 @@ import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.backstage.api.domain.vo.RemoteCardVo;
 import org.dromara.backstage.api.domain.vo.RemoteMealTypeVo;
 import org.dromara.backstage.api.domain.vo.RemoteUserAccountVo;
@@ -28,6 +29,7 @@ import org.dromara.common.message.kafka.enums.EventSenderEnum;
 import org.dromara.common.message.kafka.producer.KafkaCommonProducer;
 import org.dromara.common.redis.utils.RedisUtils;
 import org.dromara.common.tenant.helper.TenantHelper;
+import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
 import org.dromara.server.common.domain.consume.bo.ConsumptionBo;
 import org.dromara.server.common.domain.vo.yc.YcPushConsumeInfoVo;
 import org.dromara.server.consume.domain.bo.*;
@@ -66,6 +68,7 @@ public class BaseBusiness {
     private final IPtBagService bagService;
     private final KafkaCommonProducer kafkaNormalProducer;
     private final DefaultConfig defaultConfig;
+    private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
 
     @DubboReference
     private final RemoteRegisterInfoService remoteRegisterInfoService;
@@ -277,6 +280,13 @@ public class BaseBusiness {
         kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.TO_CLOUD_TOPIC, EventTypeConstants.CONSUME, EventSenderEnum.CONSUME.code(), bo);
     }
 
+    public List<RemoteSendMessageRecordBo> queryConsumeErrorList(RemoteSendMessageRecordBo remoteBo) {
+        return syncRemoteSendMessageRecordService.queryConsumeErrorList(remoteBo);
+    }
+
+    public Boolean updateConsumeStatusById(Long messageId) {
+        return syncRemoteSendMessageRecordService.updateConsumeStatusById(messageId);
+    }
     /**
      * 创建一条消费明细记录
      *

+ 49 - 2
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/ConsumeBusiness.java

@@ -1,17 +1,21 @@
 package org.dromara.server.consume.business;
 
+import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.thread.ThreadUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.backstage.api.domain.vo.RemoteCardVo;
 import org.dromara.backstage.api.domain.vo.RemoteMealTypeVo;
 import org.dromara.backstage.api.domain.vo.RemoteOperatorVo;
 import org.dromara.backstage.api.domain.vo.RemoteUserAccountVo;
-import org.dromara.common.core.config.AsyncConfig;
+import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.constant.ApiErrorTypeConstants;
+import org.dromara.common.core.constant.DefaultConstants;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.domain.model.ErrorInfo;
 import org.dromara.common.core.enums.ConsumeRecordTypeEnum;
@@ -46,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ConsumeBusiness {
     private final CheckBusiness checkBusiness;
     private final BaseBusiness baseBusiness;
-    private final AsyncConfig asyncConfig;
+    private final DefaultConfig defaultConfig;
     // private final KafkaNormalProducer kafkaProducer;
 
     /**
@@ -209,6 +213,11 @@ public class ConsumeBusiness {
                 if (R.isSuccess(result)) {
                     doMessage.add(MessageFormat.format("[入账成功]-[{0}]", JsonUtils.toJsonString(p)));
                     success.getAndIncrement();
+                    // 如果是本地消费服务成功后需要发消息到云端同步处理
+                    if (ObjectUtil.equals(defaultConfig.getLocationFlag(), DefaultConstants.LOCAL_FLAG)) {
+                        ConsumptionBo bo = BeanUtil.copyProperties(p, ConsumptionBo.class);
+                        ThreadUtil.execAsync(() -> baseBusiness.sendCloudConsume(bo));
+                    }
                 } else {
                     doMessage.add(MessageFormat.format("[入账失败]-[{0}]-[{1}]", JsonUtils.toJsonString(p),JSONUtil.toJsonStr(result.getData())));
                     fail.getAndIncrement();
@@ -222,4 +231,42 @@ public class ConsumeBusiness {
         doMessage.forEach(System.out::println);
         return R.ok(MessageFormat.format("[对账处理完成]-[待处理:{0}条,成功:{1}条,失败:{2}条]", total, success.get(), fail.get()));
     }
+
+    public R<ErrorInfo> syncReconciliation(String consumeDate) {
+        RemoteSendMessageRecordBo remoteBo = new RemoteSendMessageRecordBo();
+        remoteBo.setEventType("12000001");
+        remoteBo.setUpdateTime(DateUtil.parseDateTime(consumeDate));
+        remoteBo.setConsumeStatus("N");
+        List<RemoteSendMessageRecordBo> messageRecordBoList = baseBusiness.queryConsumeErrorList(remoteBo);
+        if (CollectionUtil.isEmpty(messageRecordBoList)) {
+            return R.ok();
+        }
+        int total = messageRecordBoList.size();
+        AtomicInteger success = new AtomicInteger();
+        AtomicInteger fail = new AtomicInteger();
+        List<String> doMessage = new ArrayList<>();
+        messageRecordBoList.forEach(p -> {
+            ConsumptionBo bo = JSONUtil.toBean(p.getMessage(), ConsumptionBo.class);
+            bo.setUseType(SystemUseTypeEnum.CONSUME.code());
+            bo.setRecordId(0L);
+            try {
+                R<ErrorInfo> result = fullOrder(bo, "", "");
+                if (R.isSuccess(result)) {
+                    doMessage.add(MessageFormat.format("[同步消费成功]-[{0}]", JsonUtils.toJsonString(p)));
+                    success.getAndIncrement();
+                    baseBusiness.updateConsumeStatusById(p.getRecordId());
+                } else {
+                    doMessage.add(
+                        MessageFormat.format("[同步消费失败]-[{0}]-[{1}]", JsonUtils.toJsonString(p), JSONUtil.toJsonStr(result.getData())));
+                    fail.getAndIncrement();
+                }
+            } catch (Exception e) {
+                doMessage.add(MessageFormat.format("[同步消费失败]-[{0}]-[{1}]", JsonUtils.toJsonString(p), e.getStackTrace()));
+                fail.getAndIncrement();
+                log.error("[对账失败]-[{}]", JSONUtil.toJsonStr(p), e);
+            }
+        });
+        doMessage.forEach(System.out::println);
+        return R.ok(MessageFormat.format("[同步消费完成]-[待处理:{0}条,成功:{1}条,失败:{2}条]", total, success.get(), fail.get()));
+    }
 }

+ 11 - 3
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/controller/v1/ConsumeController.java

@@ -1,6 +1,5 @@
 package org.dromara.server.consume.controller.v1;
 
-import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.thread.ThreadUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONObject;
@@ -13,7 +12,6 @@ import org.dromara.common.core.constant.DefaultConstants;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.domain.model.ErrorInfo;
 import org.dromara.common.core.domain.model.ErrorResult;
-import org.dromara.common.core.enums.CreditTypeEnum;
 import org.dromara.common.core.enums.SystemUseTypeEnum;
 import org.dromara.common.redis.utils.RedisUtils;
 import org.dromara.server.common.domain.consume.bo.ConsumptionBo;
@@ -145,6 +143,16 @@ public class ConsumeController {
         return consumeBusiness.originalReconciliation(consumeDate);
     }
 
+    /**
+     * 消费同步对账,将云平台消息消费未成功的消费记录重新生成
+     *
+     * @return 生成结果
+     */
+    @PostMapping("/consume/sync/reconciliation/{consumeDate}")
+    public R<ErrorInfo> syncReconciliation(@PathVariable("consumeDate") java.lang.String consumeDate) {
+        return consumeBusiness.syncReconciliation(consumeDate);
+    }
+
     /**
      * 消费业务初步处理
      *
@@ -165,7 +173,7 @@ public class ConsumeController {
 
         R<ErrorInfo> errorInfo;
         if (Objects.equals(type, "requestConsume")) {
-            int recordType = bo.getRecordStatus().intValue();
+            // int recordType = bo.getRecordStatus().intValue();
             //if (ObjectUtil.equals(bo.getCreditType(), CreditTypeEnum.TERM_CONSUME.code()) && recordType == 364) {
             //    // 如果是消费机请求消费,而且是正常消费记录将消费时间设置为当前时间,以防消费时时钟不对造成实际消费时间不正确
             //    bo.setConsumeDate(DateUtil.date());

+ 7 - 2
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/task/ScheduledTasks.java

@@ -26,8 +26,13 @@ import org.springframework.stereotype.Component;
 public class ScheduledTasks {
     private final ConsumeBusiness consumeBusiness;
 
-    @Scheduled(cron = "0 45 9,14,20,23 * * *")
-    public void syncDoorOpenRecord() {
+    /**
+     * 执行原始消费对账任务。
+     * 该方法在每天的9:45, 14:45, 20:45和23:45自动调用,用于处理有原始消费记录但没有消费明细的消费记录,并将这些记录写入消费明细。
+     * 如果对账过程中出现错误,会记录错误信息;如果成功,则记录警告消息。
+     */
+    @Scheduled(cron = "0 55 9,14,20,23 * * *")
+    public void originalReconciliation() {
         String consumeDate = DateUtil.format(DateUtil.date(), "yyyy-MM-dd 00:00:00");
         R<ErrorInfo> result = consumeBusiness.originalReconciliation(consumeDate);
         if (R.isError(result)) {

+ 4 - 2
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

@@ -1,5 +1,6 @@
 package org.dromara.server.mq.consumer;
 
+import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.IdUtil;
 import cn.hutool.core.util.ObjUtil;
 import cn.hutool.core.util.ObjectUtil;
@@ -52,7 +53,7 @@ public class KafkaCloudConsumer {
     private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String consumeStatus, String topic) {
         RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
         KafkaHeader header = msg.getHeader();
-        bo.setMessage("kafka");
+        bo.setMqType("kafka");
         bo.setTopic(topic);
         bo.setEventId(header.getEventId());
         bo.setSender(header.getSender());
@@ -60,6 +61,7 @@ public class KafkaCloudConsumer {
         bo.setMessage(msg.getBody().toString());
         bo.setTenantId(header.getTenantId());
         bo.setConsumeStatus(consumeStatus);
+        bo.setCreateTime(DateUtil.date());
         return bo;
     }
 
@@ -90,7 +92,7 @@ public class KafkaCloudConsumer {
         String sender = receiveMsg.getHeader().getSender();
         //在eventBus主题中,sender=005是由本系统发出,无需业务处理
         if (ObjUtil.notEqual(sender, "005") && ObjUtil.notEqual(sender, "006")) {
-            doMessageHandle(receiveMsg, "");
+            doMessageHandle(receiveMsg, topic);
         }
 
         // 记录