Преглед на файлове

feature: 同步服务完善
1.全量人员双向同步

luoyb преди 1 година
родител
ревизия
184b5bc82b

+ 2 - 2
pom.xml

@@ -85,7 +85,7 @@
             </properties>
             <activation>
                 <!-- 默认环境 -->
-                <activeByDefault>true</activeByDefault>
+                <activeByDefault>false</activeByDefault>
             </activation>
         </profile>
         <profile>
@@ -109,7 +109,7 @@
             </properties>
             <activation>
                 <!-- 默认环境 -->
-                <activeByDefault>false</activeByDefault>
+                <activeByDefault>true</activeByDefault>
             </activation>
         </profile>
     </profiles>

+ 1 - 1
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -255,7 +255,7 @@ public class BaseBusiness {
         message.setHeader(header);
         message.setBody(bo);
 
-        kafkaProducer.sendKafkaMessage(CloudMqEventConstants.TOPIC, message);
+        kafkaProducer.sendKafkaMessage("TO_CLOUD_EVENT", message);
         log.info("请求云端消费:{}", JSONUtil.toJsonStr(bo));
     }
 

+ 3 - 12
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumer.java

@@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.dromara.backstage.api.RemotePtParameterService;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.mq.event.kafka.EventStrategyContext;
+import org.slf4j.Logger;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -31,18 +32,8 @@ import org.springframework.stereotype.Component;
 public class KafkaConsumer {
     private final EventStrategyContext eventStrategyContext;
 
-    @KafkaListener(topics = "TO_CLOUD_EVENT", groupId = "test2-group-id")
+    @KafkaListener(topics = "TO_CLOUD_EVENT", groupId = "cloud-group-id")
     public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
-        KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
-        String eventType = receiveMsg.getHeader().getEventType();
-        String sender = receiveMsg.getHeader().getSender();
-        JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
-        if (ObjUtil.equals(sender, "000")) {
-            try {
-                eventStrategyContext.doMsgHandle(eventType, eventMsg);
-            } catch (Exception e) {
-                log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
-            }
-        }
+        KafkaUtils.doMessageHandle(record, eventStrategyContext, log);
     }
 }

+ 15 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumerYkt.java

@@ -1,13 +1,17 @@
 package org.dromara.server.mq.consumer;
 
+import cn.hutool.core.util.ObjUtil;
+import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
+import org.dromara.common.core.constant.CloudMqEventConstants;
 import org.dromara.common.message.kafka.domain.KafkaHeader;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
+import org.dromara.server.mq.event.kafka.EventStrategyContext;
 import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@@ -28,6 +32,8 @@ public class KafkaConsumerYkt {
     private final YktEventStrategyContext yktEventStrategyContext;
     private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
 
+    private final EventStrategyContext eventStrategyContext;
+
     @Value("${spring.system.tenantId}")
     private String tenantId;
 
@@ -59,6 +65,15 @@ public class KafkaConsumerYkt {
         }
     }
 
+    /**
+     * 云端推送的消息本地处理
+     * 1.部门信息、人员信息、一卡通账户信息、卡片信息
+     * @param record 消息内容
+     */
+    @KafkaListener(topics = CloudMqEventConstants.TOPIC, groupId = "local-group-id")
+    public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
+        KafkaUtils.doMessageHandle(record, eventStrategyContext, log);
+    }
     /**
      * 初始化消息记录Bo
      * @param consumeStatus 消费状态

+ 35 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaUtils.java

@@ -0,0 +1,35 @@
+package org.dromara.server.mq.consumer;
+
+import cn.hutool.core.util.ObjUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.server.mq.event.kafka.EventStrategyContext;
+import org.slf4j.Logger;
+
+/**
+ * name: KafkaUtils
+ * package: org.dromara.server.mq.consumer
+ * description:
+ * date: 2025-01-07 15:00:15 15:00
+ *
+ * @author luoyibo
+ * @version 0.1
+ * @since JDK 1.8
+ */
+public class KafkaUtils {
+    public static void doMessageHandle(ConsumerRecord<String, String> record, EventStrategyContext eventStrategyContext, Logger log) {
+        KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
+        String eventType = receiveMsg.getHeader().getEventType();
+        String sender = receiveMsg.getHeader().getSender();
+        JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
+        if (ObjUtil.equals(sender, "000")) {
+            try {
+                eventStrategyContext.doMsgHandle(eventType, eventMsg);
+            } catch (Exception e) {
+                log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
+            }
+        }
+    }
+}

+ 2 - 2
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/SyncKafkaConsumer.java

@@ -24,12 +24,12 @@ import org.springframework.stereotype.Component;
  */
 @RequiredArgsConstructor
 @Slf4j
-@Component
+//@Component
 @ConditionalOnExpression("'local'.equals('${locationFlag}')")
 public class SyncKafkaConsumer {
     private final EventStrategyContext eventStrategyContext;
 
-    @KafkaListener(topics = "old-kafka-jw", groupId = "test-ykt2")
+    //@KafkaListener(topics = "old-kafka-jw", groupId = "test-ykt2")
     public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
         KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
         String eventType = receiveMsg.getHeader().getEventType();