Ver código fonte

fix: mq服务
1.云端消费kafka消息时将消费结果写入消息表中

luo.yibo@datuai.com 1 ano atrás
pai
commit
2f68971996

+ 3 - 2
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/controller/v1/ConsumeController.java

@@ -9,6 +9,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.constant.CacheNames;
+import org.dromara.common.core.constant.DefaultConstants;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.domain.model.ErrorInfo;
 import org.dromara.common.core.domain.model.ErrorResult;
@@ -188,11 +189,11 @@ public class ConsumeController {
     private void sendConsumeToCloud(JSONObject jsonObject, Object record) {
         if (ObjectUtil.isEmpty(jsonObject.get("body"))) {
             // 发送消息
-            // if (ObjectUtil.equals(defaultConfig.getLocationFlag(), DefaultConstants.LOCAL_FLAG)) {
+            if (ObjectUtil.equals(defaultConfig.getLocationFlag(), DefaultConstants.LOCAL_FLAG)) {
                 ConsumptionBo bo = recordConvertStrategy.convert(record, "YC");
                 bo.setConsumeDate(jsonObject.getDate("consumeDate"));
                 ThreadUtil.execAsync(() -> baseBusiness.sendCloudConsume(bo));
-            // }
+            }
         }
     }
 }

+ 30 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

@@ -7,10 +7,13 @@ import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.utils.SpringUtils;
 import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
 import org.dromara.server.mq.event.kafka.IYktEventStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -32,6 +35,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnExpression("'cloud'.equals('${locationFlag}')")
 public class KafkaCloudConsumer {
     private final DefaultConfig defaultConfig;
+    private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
 
     /**
      * eventBus主题监听 第三方对接相关
@@ -59,6 +63,22 @@ public class KafkaCloudConsumer {
         doMessageHandle(receiveMsg);
     }
 
+    /**
+     * 初始化消息记录Bo
+     *
+     * @param consumeStatus 消费状态
+     */
+    private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String consumeStatus) {
+        RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
+        KafkaHeader header = msg.getHeader();
+        bo.setEventId(header.getEventId());
+        bo.setSender(header.getSender());
+        bo.setEventType(header.getEventType());
+        bo.setMessage(msg.getBody().toString());
+        bo.setConsumeStatus(consumeStatus);
+        return bo;
+    }
+
     /**
      * 公用消息消费处理事件
      *
@@ -85,8 +105,18 @@ public class KafkaCloudConsumer {
             }
             IYktEventStrategy eventStrategy = SpringUtils.getBean(sender, IYktEventStrategy.class);
             eventStrategy.doMsgHandle(eventType, eventMsg);
+            try {
+                syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "Y"));
+            } catch (Exception ex) {
+                log.error("消息发送记录更新失败", ex);
+            }
 
         } catch (Exception e) {
+            try {
+                syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "N"));
+            } catch (Exception ex) {
+                log.error("消息发送记录更新失败", ex);
+            }
             log.error("[kafka消息处理失败]-[消息:{}-[错误:]", receiveMsg, e);
         }
     }

+ 0 - 27
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/YktEventStrategyContext.java

@@ -1,27 +0,0 @@
-package org.dromara.server.mq.event.kafka;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 一卡通内部操作事件处理上下文
- */
-@Service
-public class YktEventStrategyContext {
-    private final Map<String, IYktEventStrategy> strategyMap = new ConcurrentHashMap<>();
-
-    @Autowired
-    public YktEventStrategyContext(Map<String, IYktEventStrategy> strategyMap) {
-        this.strategyMap.putAll(strategyMap);
-    }
-
-    /**
-     * 处理一卡通内部操作事件
-     */
-    public void doMsgHandle(String sender, String eventType,Object msg) throws Exception{
-        strategyMap.get(sender).doMsgHandle(eventType, msg);
-    }
-}

+ 22 - 22
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/impl/cloud/ConsumeEventStrategyImpl.java

@@ -39,26 +39,26 @@ public class ConsumeEventStrategyImpl implements IYktEventStrategy {
         }
     }
 
-    public static void main(java.lang.String[] args) {
-        java.lang.String msg = "{\n" +
-            "    \"cardNo\": 35193,\n" +
-            "    \"consumeDate\": \"2025-02-19 19:32:30\",\n" +
-            "    \"consumeMoney\": 0.00,\n" +
-            "    \"creditType\": \"TERM_CONSUME\",\n" +
-            "    \"factoryId\": 3656457030,\n" +
-            "    \"operatorId\": 0,\n" +
-            "    \"realName\": \"苏兵\",\n" +
-            "    \"recordStatus\": 364,\n" +
-            "    \"statusFlag\": 4,\n" +
-            "    \"termNo\": 100,\n" +
-            "    \"termRecordId\": 47309,\n" +
-            "    \"userNo\": 0,\n" +
-            "    \"userNumb\": \"15674973790\"\n" +
-            "  }";
-        JSONObject entries = JSONUtil.parseObj(msg);
-        entries.set("creditType", null);
-        entries.set("useType", null);
-        RemoteConsumeBo recordBo = JSONUtil.toBean(entries, RemoteConsumeBo.class);
-        System.err.println(recordBo);
-    }
+    // public static void main(String[] args) {
+    //     java.lang.String msg = "{\n" +
+    //         "    \"cardNo\": 35193,\n" +
+    //         "    \"consumeDate\": \"2025-02-19 19:32:30\",\n" +
+    //         "    \"consumeMoney\": 0.00,\n" +
+    //         "    \"creditType\": \"TERM_CONSUME\",\n" +
+    //         "    \"factoryId\": 3656457030,\n" +
+    //         "    \"operatorId\": 0,\n" +
+    //         "    \"realName\": \"苏兵\",\n" +
+    //         "    \"recordStatus\": 364,\n" +
+    //         "    \"statusFlag\": 4,\n" +
+    //         "    \"termNo\": 100,\n" +
+    //         "    \"termRecordId\": 47309,\n" +
+    //         "    \"userNo\": 0,\n" +
+    //         "    \"userNumb\": \"15674973790\"\n" +
+    //         "  }";
+    //     JSONObject entries = JSONUtil.parseObj(msg);
+    //     entries.set("creditType", null);
+    //     entries.set("useType", null);
+    //     RemoteConsumeBo recordBo = JSONUtil.toBean(entries, RemoteConsumeBo.class);
+    //     System.err.println(recordBo);
+    // }
 }