Преглед на файлове

fix: 消费服务
1.发送的kafka消息写入到消息发送表

luo.yibo@datuai.com преди 1 година
родител
ревизия
5d1676ed06

+ 5 - 0
ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/domain/bo/RemoteSendMessageRecordBo.java

@@ -62,4 +62,9 @@ public class RemoteSendMessageRecordBo implements Serializable {
     private String consumeStatus;
 
     private String tenantId;
+
+    /**
+     * 创建时间
+     */
+    private Date createTime;
 }

+ 33 - 3
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/producer/KafkaCommonProducer.java

@@ -1,12 +1,16 @@
 package org.dromara.common.message.kafka.producer;
 
+import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.util.IdUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson2.JSON;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.dubbo.config.annotation.DubboReference;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.dromara.backstage.api.RemoteSendMessageRecordService;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
 import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.utils.SpringUtils;
 import org.dromara.common.message.kafka.domain.KafkaHeader;
@@ -18,6 +22,7 @@ import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @RequiredArgsConstructor
 @Slf4j
@@ -25,11 +30,14 @@ import java.util.concurrent.CompletableFuture;
 public class KafkaCommonProducer {
     private final KafkaTemplate<String, String> kafkaTemplate;
     private final DefaultConfig defaultConfig;
+    @DubboReference
+    private final RemoteSendMessageRecordService sendMessageRecordService;
 
-    public void sendKafkaMessage(String topic, KafkaMessage<?> data) {
+    public Boolean sendKafkaMessage(String topic, KafkaMessage<?> data) {
         String jsonMessage = JSON.toJSONString(data);
         String eventType = data.getHeader().getEventType();
         String sender = data.getHeader().getSender();
+        AtomicBoolean isSender = new AtomicBoolean(false);
         try {
             ProducerRecord<String, String> record;
             record = new ProducerRecord<>(topic, jsonMessage);
@@ -39,20 +47,25 @@ public class KafkaCommonProducer {
                 if (ex != null) {
                     log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender),
                         EventTypeEnum.getMessage(eventType), data, ex);
+                    isSender.set(true);
                 } else {
                     log.info("[{}]-[{}]-[发送到kafka消息成功]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data);
+                    isSender.set(true);
                 }
             });
+            return isSender.get();
         } catch (Exception e) {
             log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+            return false;
         }
     }
 
     public void sendKafkaMessage(String topic, String eventType, String sender, Object data) {
         KafkaMessage<Object> message = new KafkaMessage<>();
         KafkaHeader header = message.getHeader();
+        String eventId = IdUtil.simpleUUID();
         header.setTimestamp(System.currentTimeMillis());
-        header.setEventId(IdUtil.simpleUUID());
+        header.setEventId(eventId);
         if(ObjectUtil.isNotEmpty(JSONUtil.parseObj(data).get("tenantId"))) {
             header.setTenantId(JSONUtil.parseObj(data).get("tenantId").toString());
         } else {
@@ -64,6 +77,23 @@ public class KafkaCommonProducer {
         message.setHeader(header);
         message.setBody(data);
 
-        SpringUtils.getAopProxy(this).sendKafkaMessage(topic, message);
+        Boolean sendResult = SpringUtils.getAopProxy(this).sendKafkaMessage(topic, message);
+
+        RemoteSendMessageRecordBo remoteSendMessageRecordBo = new RemoteSendMessageRecordBo();
+        remoteSendMessageRecordBo.setMqType("kafka");
+        remoteSendMessageRecordBo.setTopic(topic);
+        remoteSendMessageRecordBo.setEventType(eventType);
+        remoteSendMessageRecordBo.setResult(sendResult ? "S" : "F");
+        remoteSendMessageRecordBo.setMessage(JSONUtil.toJsonStr(message));
+        remoteSendMessageRecordBo.setEventId(eventId);
+        remoteSendMessageRecordBo.setSender(sender);
+        remoteSendMessageRecordBo.setTenantId(header.getTenantId());
+        remoteSendMessageRecordBo.setCreateTime(DateUtil.date());
+        try {
+            sendMessageRecordService.insertByBo(remoteSendMessageRecordBo);
+        } catch (Exception e) {
+            log.error("[{}]-[{}]-[kafka消息记录入库异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+        }
+
     }
 }

+ 1 - 1
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -70,6 +70,7 @@ public class BaseBusiness {
     @DubboReference
     private final RemoteRegisterInfoService remoteRegisterInfoService;
 
+
     /**
      * 生成原始消费记录
      *
@@ -275,7 +276,6 @@ public class BaseBusiness {
     @Async
     public void sendCloudConsume(ConsumptionBo bo) {
         kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.TO_CLOUD_TOPIC, EventTypeConstants.CONSUME, EventSenderEnum.CONSUME.code(), bo);
-        // kafkaNormalProducer.sendKafkaMessage("test", EventTypeConstants.CONSUME, EventSenderEnum.CONSUME.code(), bo);
     }
 
     /**

+ 2 - 3
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/controller/v1/ConsumeController.java

@@ -9,7 +9,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.dromara.common.core.config.DefaultConfig;
 import org.dromara.common.core.constant.CacheNames;
-import org.dromara.common.core.constant.DefaultConstants;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.domain.model.ErrorInfo;
 import org.dromara.common.core.domain.model.ErrorResult;
@@ -189,11 +188,11 @@ public class ConsumeController {
     private void sendConsumeToCloud(JSONObject jsonObject, Object record) {
         if (ObjectUtil.isEmpty(jsonObject.get("body"))) {
             // 发送消息
-            if (ObjectUtil.equals(defaultConfig.getLocationFlag(), DefaultConstants.LOCAL_FLAG)) {
+            // if (ObjectUtil.equals(defaultConfig.getLocationFlag(), DefaultConstants.LOCAL_FLAG)) {
                 ConsumptionBo bo = recordConvertStrategy.convert(record, "YC");
                 bo.setConsumeDate(jsonObject.getDate("consumeDate"));
                 ThreadUtil.execAsync(() -> baseBusiness.sendCloudConsume(bo));
-            }
+            // }
         }
     }
 }