瀏覽代碼

kafka消息发送

bing 1 年之前
父節點
當前提交
5b5cac29bb

+ 7 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/KafkaTopicConstants.java

@@ -0,0 +1,7 @@
+package org.dromara.common.message.kafka.constant;
+
+public class KafkaTopicConstants {
+
+    // 本地库与信创云 同步 主题
+    public static final String SYNC_DATA_TOPIC = "ykt_operation";
+}

+ 2 - 1
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/bo/PtParameterBo.java

@@ -8,6 +8,7 @@ import io.github.linpeilie.annotations.AutoMapper;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import jakarta.validation.constraints.*;
+import org.dromara.common.tenant.core.TenantEntity;
 
 /**
  * 系统参数业务对象 t_pt_parameter
@@ -18,7 +19,7 @@ import jakarta.validation.constraints.*;
 @Data
 @EqualsAndHashCode(callSuper = true)
 @AutoMapper(target = PtParameter.class, reverseConvertGenerate = false)
-public class PtParameterBo extends BaseEntity {
+public class PtParameterBo extends TenantEntity {
 
     /**
      * 参数id

+ 35 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaProducer.java

@@ -1,10 +1,17 @@
 package org.dromara.backstage.mq;
 
+import com.alibaba.fastjson2.JSON;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
+import java.util.concurrent.CompletableFuture;
+
 @RequiredArgsConstructor
 @Slf4j
 @Component
@@ -24,4 +31,32 @@ public class KafkaProducer {
         log.debug("发送消息到kafka消息系统结束");
     }
 
+    public void sendSyncData(String topic, KafkaMessage<?> data){
+        try{
+            KafkaHeader header = data.getHeader();
+            String eventId = header.getEventId();
+            String sender = header.getSender();
+            String eventType = header.getEventType();
+            String tenantId = header.getTenantId();
+            String jsonMessage = JSON.toJSONString(data);
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
+            log.info("发送同步数据到kafka消息系统, data: " + jsonMessage);
+            CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
+            send.whenComplete((result, ex) -> {
+                if (ex != null) {
+                    log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
+
+
+                    // todo 异常信息入库
+                } else {
+                    log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
+                }
+            });
+        }catch (Exception e){
+            log.error("同步数据发送到kafka消息系统异常,data: " + data, e);
+            // todo 异常信息入库
+        }
+
+    }
+
 }

+ 1 - 1
ruoyi-modules/ruoyi-backstage/src/test/java/org/dromara/backstage/mq/KafkaProducerTest.java

@@ -13,6 +13,6 @@ public class KafkaProducerTest {
     @Test
     public void send()
     {
-        kafkaProcedurer.send("ykt_local_listener", "test message");
+        kafkaProcedurer.send("ykt_local_listener", "test message2");
     }
 }

+ 3 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumer.java

@@ -11,6 +11,8 @@ import org.dromara.server.mq.event.kafka.EventStrategyContext;
 import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
 import org.springframework.kafka.annotation.KafkaListener;
 
+import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
+
 /**
  * name: KafkaNormalConsumer
  * package: org.dromara.server.mq.consumer
@@ -45,7 +47,7 @@ public class KafkaConsumer {
      * 一卡通云端业务操作本地同步处理
      * @param record kafka消息
      */
-    @KafkaListener(topics = "ykt_operation", groupId = "ykt_local_listener")
+    @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "ykt_local_listener")
     public void cloudOperationSync(ConsumerRecord<String, String> record){
         KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
         log.info("[接收到Kafka消息]-[{}]", receiveMsg);