Эх сурвалжийг харах

Merge remote-tracking branch 'origin/master'

xiari 1 жил өмнө
parent
commit
505fcfdb6b

+ 5 - 0
ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/domain/bo/RemoteSendMessageRecordBo.java

@@ -62,4 +62,9 @@ public class RemoteSendMessageRecordBo implements Serializable {
     private String consumeStatus;
 
     private String tenantId;
+
+    /**
+     * 创建时间
+     */
+    private Date createTime;
 }

+ 33 - 3
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/producer/KafkaCommonProducer.java

@@ -1,12 +1,16 @@
 package org.dromara.common.message.kafka.producer;
 
+import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.IdUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson2.JSON;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.dubbo.config.annotation.DubboReference;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.dromara.backstage.api.RemoteSendMessageRecordService;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.utils.SpringUtils;
 import org.dromara.common.message.kafka.domain.KafkaHeader;
@@ -18,6 +22,7 @@ import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @RequiredArgsConstructor
 @Slf4j
@@ -25,11 +30,14 @@ import java.util.concurrent.CompletableFuture;
 public class KafkaCommonProducer {
     private final KafkaTemplate<String, String> kafkaTemplate;
     private final DefaultConfig defaultConfig;
+    @DubboReference
+    private final RemoteSendMessageRecordService sendMessageRecordService;
 
-    public void sendKafkaMessage(String topic, KafkaMessage<?> data) {
+    public Boolean sendKafkaMessage(String topic, KafkaMessage<?> data) {
         String jsonMessage = JSON.toJSONString(data);
         String eventType = data.getHeader().getEventType();
         String sender = data.getHeader().getSender();
+        AtomicBoolean isSender = new AtomicBoolean(false);
         try {
             ProducerRecord<String, String> record;
             record = new ProducerRecord<>(topic, jsonMessage);
@@ -39,20 +47,25 @@ public class KafkaCommonProducer {
                 if (ex != null) {
                     log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender),
                         EventTypeEnum.getMessage(eventType), data, ex);
+                    isSender.set(true);
                 } else {
                     log.info("[{}]-[{}]-[发送到kafka消息成功]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data);
+                    isSender.set(true);
                 }
             });
+            return isSender.get();
         } catch (Exception e) {
             log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+            return false;
         }
     }
 
     public void sendKafkaMessage(String topic, String eventType, String sender, Object data) {
         KafkaMessage<Object> message = new KafkaMessage<>();
         KafkaHeader header = message.getHeader();
+        String eventId = IdUtil.simpleUUID();
         header.setTimestamp(System.currentTimeMillis());
-        header.setEventId(IdUtil.simpleUUID());
+        header.setEventId(eventId);
         if(ObjectUtil.isNotEmpty(JSONUtil.parseObj(data).get("tenantId"))) {
             header.setTenantId(JSONUtil.parseObj(data).get("tenantId").toString());
         } else {
@@ -64,6 +77,23 @@ public class KafkaCommonProducer {
         message.setHeader(header);
         message.setBody(data);
 
-        SpringUtils.getAopProxy(this).sendKafkaMessage(topic, message);
+        Boolean sendResult = SpringUtils.getAopProxy(this).sendKafkaMessage(topic, message);
+
+        RemoteSendMessageRecordBo remoteSendMessageRecordBo = new RemoteSendMessageRecordBo();
+        remoteSendMessageRecordBo.setMqType("kafka");
+        remoteSendMessageRecordBo.setTopic(topic);
+        remoteSendMessageRecordBo.setEventType(eventType);
+        remoteSendMessageRecordBo.setResult(sendResult ? "S" : "F");
+        remoteSendMessageRecordBo.setMessage(JSONUtil.toJsonStr(message));
+        remoteSendMessageRecordBo.setEventId(eventId);
+        remoteSendMessageRecordBo.setSender(sender);
+        remoteSendMessageRecordBo.setTenantId(header.getTenantId());
+        remoteSendMessageRecordBo.setCreateTime(DateUtil.date());
+        try {
+            sendMessageRecordService.insertByBo(remoteSendMessageRecordBo);
+        } catch (Exception e) {
+            log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+        }
+
     }
 }

+ 1 - 1
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/impl/PtRoomServiceImpl.java

@@ -102,7 +102,7 @@ public class PtRoomServiceImpl implements IPtRoomService {
         lqw.in(CollectionUtil.isNotEmpty(bo.getFloorIds()), PtRoom::getAreaId, bo.getFloorIds())
             .and(StringUtils.isNotBlank(bo.getRoomCode()),
                 e -> e.eq(PtRoom::getRoomCode, bo.getRoomCode()).or().eq(PtRoom::getRoomName, bo.getRoomCode()));
-        lqw.orderByDesc(PtRoom::getCreateTime);
+        lqw.orderByAsc(PtRoom::getRoomCode);
         Page<PtRoomVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
         List<PtRoomVo> records = result.getRecords();
         if (CollectionUtil.isNotEmpty(records)) {

+ 1 - 1
ruoyi-modules/ruoyi-hotel/src/main/java/org/dromara/hotel/controller/KfOrderController.java

@@ -191,7 +191,7 @@ public class KfOrderController extends BaseController {
     }
 
     /**
-     * 删除散客入住
+     * 解锁房间
      *
      * @param ids 主键串
      */

+ 50 - 9
ruoyi-modules/ruoyi-hotel/src/main/java/org/dromara/hotel/service/impl/KfOrderServiceImpl.java

@@ -9,6 +9,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.dromara.backstage.api.RemotePtRoomService;
 import org.dromara.common.core.config.DefaultConfig;
@@ -43,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 /**
  * 散客入住Service业务层处理
@@ -52,6 +54,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
 @RequiredArgsConstructor
 @Service
+@Slf4j
 public class KfOrderServiceImpl implements IKfOrderService {
     @DubboReference
     private final RemotePtRoomService roomService;
@@ -237,7 +240,7 @@ public class KfOrderServiceImpl implements IKfOrderService {
             for (int i = 0; i < orders.size(); i++) {
                 orders.get(i).setOrderStatus(HotelBusinessConstants.ORDER_STATUS_SF);
                 orders.get(i).setOrderType(HotelBusinessConstants.ORDER_TYPE_TK);
-                orders.get(i).setOrderIndex(i);
+                orders.get(i).setOrderIndex(i+1);
             }
             // 根据teamId 删除历史锁房
             baseMapper.delete(new LambdaQueryWrapper<KfOrder>()
@@ -455,15 +458,53 @@ public class KfOrderServiceImpl implements IKfOrderService {
      */
     @Override
     public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
-        if (isValid) {
-            // TODO 做一些业务上的校验,判断是否需要校验
+        if (ids == null || ids.isEmpty()) {
+            return false;
+        }
+
+        try {
+            // 1. 根据id查询要解锁的订单列表
+            List<KfOrderVo> delOrders = baseMapper.selectVoList(
+                Wrappers.lambdaQuery(KfOrder.class)
+                    .select(KfOrder::getRoomCode, KfOrder::getTeamId)
+                    .in(KfOrder::getId, ids));
+
+            if (delOrders.isEmpty()) {
+                return false;
+            }
+
+            // 2. 批量删除要解锁的订单
+            int count = baseMapper.deleteByIds(ids);
+
+            // 3. 批量修改房间状态
+            List<String> roomCodes = delOrders.stream().map(KfOrderVo::getRoomCode).toList();
+            baseMapper.updateRoomStatusBatch(roomCodes, HotelBusinessConstants.ROOM_STATUS_OK);
+            // 4. 重设锁房缓存
+            if (count > 0) {
+                Long teamId = delOrders.get(0).getTeamId();
+                RedisUtils.deleteObject(CacheNames.CLASS_ROOM + "_" + teamId);
+                // 获取当前团队的所有房间代码
+                Set<String> allRoomCodes = new HashSet<>();
+                allRoomCodes.addAll(
+                    baseMapper.selectVoList(
+                            Wrappers.lambdaQuery(KfOrder.class)
+                                .select(KfOrder::getRoomCode)
+                                .eq(KfOrder::getTeamId, teamId)
+                                .eq(KfOrder::getOrderStatus, HotelBusinessConstants.ORDER_STATUS_SF))
+                        .stream()
+                        .map(KfOrderVo::getRoomCode)
+                        .collect(Collectors.toSet())
+                );
+
+                RedisUtils.setCacheObject(CacheNames.CLASS_ROOM + "_" + teamId, allRoomCodes);
+            }
+
+            return count > 0;
+        } catch (Exception e) {
+            // 记录异常日志
+            log.error("解锁房间失败", e);
+            return false;
         }
-        int count = 0;
-        List<KfOrderVo> vo =
-            baseMapper.selectVoList(Wrappers.lambdaQuery(KfOrder.class).select(KfOrder::getRoomCode).in(KfOrder::getId, ids));
-        count = baseMapper.deleteByIds(ids);
-        baseMapper.updateRoomStatusBatch(vo.stream().map(KfOrderVo::getRoomCode).toList() , HotelBusinessConstants.ROOM_STATUS_OK);
-        return count > 0;
     }
 
     /**

+ 1 - 1
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -70,6 +70,7 @@ public class BaseBusiness {
     @DubboReference
     private final RemoteRegisterInfoService remoteRegisterInfoService;
 
+
     /**
      * 生成原始消费记录
      *
@@ -275,7 +276,6 @@ public class BaseBusiness {
     @Async
     public void sendCloudConsume(ConsumptionBo bo) {
         kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.TO_CLOUD_TOPIC, EventTypeConstants.CONSUME, EventSenderEnum.CONSUME.code(), bo);
-        // kafkaNormalProducer.sendKafkaMessage("test", EventTypeConstants.CONSUME, EventSenderEnum.CONSUME.code(), bo);
     }
 
     /**

+ 30 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

@@ -7,10 +7,13 @@ import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.utils.SpringUtils;
 import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
 import org.dromara.server.mq.event.kafka.IYktEventStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -32,6 +35,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnExpression("'cloud'.equals('${locationFlag}')")
 public class KafkaCloudConsumer {
     private final DefaultConfig defaultConfig;
+    private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
 
     /**
      * eventBus主题监听 第三方对接相关
@@ -59,6 +63,22 @@ public class KafkaCloudConsumer {
         doMessageHandle(receiveMsg);
     }
 
+    /**
+     * 初始化消息记录Bo
+     *
+     * @param consumeStatus 消费状态
+     */
+    private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String consumeStatus) {
+        RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
+        KafkaHeader header = msg.getHeader();
+        bo.setEventId(header.getEventId());
+        bo.setSender(header.getSender());
+        bo.setEventType(header.getEventType());
+        bo.setMessage(msg.getBody().toString());
+        bo.setConsumeStatus(consumeStatus);
+        return bo;
+    }
+
     /**
      * 公用消息消费处理事件
      *
@@ -85,8 +105,18 @@ public class KafkaCloudConsumer {
             }
             IYktEventStrategy eventStrategy = SpringUtils.getBean(sender, IYktEventStrategy.class);
             eventStrategy.doMsgHandle(eventType, eventMsg);
+            try {
+                syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "Y"));
+            } catch (Exception ex) {
+                log.error("消息发送记录更新失败", ex);
+            }
 
         } catch (Exception e) {
+            try {
+                syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "N"));
+            } catch (Exception ex) {
+                log.error("消息发送记录更新失败", ex);
+            }
             log.error("[kafka消息处理失败]-[消息:{}-[错误:]", receiveMsg, e);
         }
     }

+ 0 - 27
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/YktEventStrategyContext.java

@@ -1,27 +0,0 @@
-package org.dromara.server.mq.event.kafka;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 一卡通内部操作事件处理上下文
- */
-@Service
-public class YktEventStrategyContext {
-    private final Map<String, IYktEventStrategy> strategyMap = new ConcurrentHashMap<>();
-
-    @Autowired
-    public YktEventStrategyContext(Map<String, IYktEventStrategy> strategyMap) {
-        this.strategyMap.putAll(strategyMap);
-    }
-
-    /**
-     * 处理一卡通内部操作事件
-     */
-    public void doMsgHandle(String sender, String eventType,Object msg) throws Exception{
-        strategyMap.get(sender).doMsgHandle(eventType, msg);
-    }
-}

+ 22 - 22
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/impl/cloud/ConsumeEventStrategyImpl.java

@@ -39,26 +39,26 @@ public class ConsumeEventStrategyImpl implements IYktEventStrategy {
         }
     }
 
-    public static void main(java.lang.String[] args) {
-        java.lang.String msg = "{\n" +
-            "    \"cardNo\": 35193,\n" +
-            "    \"consumeDate\": \"2025-02-19 19:32:30\",\n" +
-            "    \"consumeMoney\": 0.00,\n" +
-            "    \"creditType\": \"TERM_CONSUME\",\n" +
-            "    \"factoryId\": 3656457030,\n" +
-            "    \"operatorId\": 0,\n" +
-            "    \"realName\": \"苏兵\",\n" +
-            "    \"recordStatus\": 364,\n" +
-            "    \"statusFlag\": 4,\n" +
-            "    \"termNo\": 100,\n" +
-            "    \"termRecordId\": 47309,\n" +
-            "    \"userNo\": 0,\n" +
-            "    \"userNumb\": \"15674973790\"\n" +
-            "  }";
-        JSONObject entries = JSONUtil.parseObj(msg);
-        entries.set("creditType", null);
-        entries.set("useType", null);
-        RemoteConsumeBo recordBo = JSONUtil.toBean(entries, RemoteConsumeBo.class);
-        System.err.println(recordBo);
-    }
+    // public static void main(String[] args) {
+    //     java.lang.String msg = "{\n" +
+    //         "    \"cardNo\": 35193,\n" +
+    //         "    \"consumeDate\": \"2025-02-19 19:32:30\",\n" +
+    //         "    \"consumeMoney\": 0.00,\n" +
+    //         "    \"creditType\": \"TERM_CONSUME\",\n" +
+    //         "    \"factoryId\": 3656457030,\n" +
+    //         "    \"operatorId\": 0,\n" +
+    //         "    \"realName\": \"苏兵\",\n" +
+    //         "    \"recordStatus\": 364,\n" +
+    //         "    \"statusFlag\": 4,\n" +
+    //         "    \"termNo\": 100,\n" +
+    //         "    \"termRecordId\": 47309,\n" +
+    //         "    \"userNo\": 0,\n" +
+    //         "    \"userNumb\": \"15674973790\"\n" +
+    //         "  }";
+    //     JSONObject entries = JSONUtil.parseObj(msg);
+    //     entries.set("creditType", null);
+    //     entries.set("useType", null);
+    //     RemoteConsumeBo recordBo = JSONUtil.toBean(entries, RemoteConsumeBo.class);
+    //     System.err.println(recordBo);
+    // }
 }