Przeglądaj źródła

feature: 同步服务完善
1.重新梳理了除界面操作外的其它kafka消息推送与消费

luoyb 1 rok temu
rodzic
commit
141090b801

+ 2 - 19
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/cardCenter/service/impl/PtCardServiceImpl.java

@@ -149,7 +149,7 @@ public class PtCardServiceImpl implements IPtCardService {
             }
         }
         PtCardVo vo = baseMapper.selectVoById(bo.getCardId());
-        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.CARD, EventSenderEnum.SYSTEM.code(), vo);
+        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.CARD, EventSenderEnum.BACKSTAGE.code(), vo);
         return flag;
     }
 
@@ -165,7 +165,7 @@ public class PtCardServiceImpl implements IPtCardService {
         validEntityBeforeSave(update);
         int count =  baseMapper.updateById(update) ;
         PtCardVo vo = baseMapper.selectVoById(bo.getCardId());
-        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.CARD, EventSenderEnum.SYSTEM.code(), vo);
+        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.CARD, EventSenderEnum.BACKSTAGE.code(), vo);
         return count>0;
     }
 
@@ -536,21 +536,4 @@ public class PtCardServiceImpl implements IPtCardService {
 
         return true;
     }
-
-    private void sendCloudConsume(PtCardVo vo) {
-        String pushData = remotePtParameterService.getPtParameterByKey("PUSH_MQ_DATA");
-        if (ObjectUtil.isNotEmpty(pushData) && ObjectUtil.equals(pushData, "1")) {
-            KafkaMessage<PtCardVo> message = new KafkaMessage<>();
-            KafkaHeader header = message.getHeader();
-            header.setTimestamp(System.currentTimeMillis());
-            header.setEventId(CloudMqEventConstants.CARD);
-            header.setEventType(CloudMqEventConstants.CARD);
-            header.setSender(CloudMqEventConstants.SENDER);
-
-            message.setHeader(header);
-            message.setBody(vo);
-
-            kafkaNormalProducer.sendKafkaMessage(CloudMqEventConstants.TOPIC, message);
-        }
-    }
 }

+ 5 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/payment/domain/vo/PtUserAccountVo.java

@@ -261,4 +261,9 @@ public class PtUserAccountVo implements Serializable {
      * 更新时间
      */
     private Date updateTime;
+
+    /**
+     * 租户编号
+     */
+    private String tenantId;
 }

+ 2 - 2
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/payment/service/impl/PtUserAccountServiceImpl.java

@@ -232,7 +232,7 @@ public class PtUserAccountServiceImpl implements IPtUserAccountService {
             bagService.initAccountBag(bo.getUserId());
         }
         PtUserAccountVo vo = baseMapper.selectVoById(bo.getUserId());
-        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.ACCOUNT, EventSenderEnum.SYSTEM.code(), vo);
+        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.ACCOUNT, EventSenderEnum.BACKSTAGE.code(), vo);
         return flag;
     }
 
@@ -248,7 +248,7 @@ public class PtUserAccountServiceImpl implements IPtUserAccountService {
         validEntityBeforeSave(update);
         int count = baseMapper.updateById(update);
         PtUserAccountVo vo = baseMapper.selectVoById(bo.getUserId());
-        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.ACCOUNT, EventSenderEnum.SYSTEM.code(), vo);
+        kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.ACCOUNT, EventSenderEnum.BACKSTAGE.code(), vo);
         return count > 0;
     }
 

+ 2 - 0
ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/dubbo/RemoteUserServiceImpl.java

@@ -348,6 +348,8 @@ public class RemoteUserServiceImpl implements RemoteUserService {
             bo.setUserDeptBoList(userDeptBoList);
             return userService.insertUser(bo) > 0 ? R.ok() : R.fail(message);
         } catch (Exception e) {
+            log.error("message:{}", String.valueOf(e));
+
             return R.fail(message+','+ e.getMessage());
         }
     }

+ 0 - 13
ruoyi-server/ruoyi-server-consume/src/main/java/org/dromara/server/consume/business/BaseBusiness.java

@@ -248,19 +248,6 @@ public class BaseBusiness {
 
     @Async
     public void sendCloudConsume(ConsumptionBo bo){
-        //KafkaMessage<ConsumptionBo> message = new KafkaMessage<>();
-        //KafkaHeader header = message.getHeader();
-        //header.setTimestamp(System.currentTimeMillis());
-        //header.setEventId(CloudMqEventConstants.CONSUME);
-        //header.setEventType(CloudMqEventConstants.CONSUME);
-        //header.setSender(CloudMqEventConstants.SENDER);
-        //
-        //message.setHeader(header);
-        //message.setBody(bo);
-        //
-        //kafkaProducer.sendKafkaMessage("TO_CLOUD_EVENT", message);
-        //log.info("请求云端消费:{}", JSONUtil.toJsonStr(bo));
-
         kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, EventTypeConstants.CONSUME, EventSenderEnum.CONSUME.code(), bo);
     }
 

+ 13 - 12
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/impl/cloud/PushUserEventStrategyImpl.java

@@ -7,8 +7,10 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.dromara.common.core.constant.CloudMqEventConstants;
+import org.dromara.common.message.kafka.constant.EventSenderConstants;
 import org.dromara.common.message.kafka.constant.EventTypeConstants;
 import org.dromara.server.mq.event.kafka.IEventStrategy;
+import org.dromara.server.mq.event.kafka.IYktEventStrategy;
 import org.dromara.system.api.RemoteDeptService;
 import org.dromara.system.api.RemoteUserService;
 import org.dromara.system.api.domain.bo.RemoteDeptBo;
@@ -27,21 +29,20 @@ import org.springframework.stereotype.Service;
  */
 @Slf4j
 @RequiredArgsConstructor
-@Service(EventTypeConstants.USER)
-public class PushUserEventStrategyImpl implements IEventStrategy {
+//@Service(EventSenderConstants.SYSTEM)
+public class PushUserEventStrategyImpl implements IYktEventStrategy {
     @DubboReference
     private final RemoteUserService remoteUserService;
 
     @Override
-    public void doMsgHandle(JSONObject data) {
-        RemoteUserBo remoteBo = JSONUtil.toBean(data, RemoteUserBo.class);
-        log.info("[处理云端->本地用户同步请求]-[用户信息:{}]", JSONUtil.toJsonStr(remoteBo));
-        RemoteUserVo remoteVo = remoteUserService.selectUserById(remoteBo.getUserId());
-        if(ObjectUtil.isEmpty(remoteVo)){
-            remoteUserService.insertUser(remoteBo);
-        } else {
-            remoteUserService.updateUser(remoteBo);
-        }
-
+    public void doMsgHandle(String eventType, Object msg) throws Exception {
+    //    RemoteUserBo remoteBo = JSONUtil.toBean(JSONUtil.parseObj(msg), RemoteUserBo.class);
+    //    log.info("[处理云端->本地用户同步请求]-[用户信息:{}]", JSONUtil.toJsonStr(remoteBo));
+    //    RemoteUserVo remoteVo = remoteUserService.selectUserById(remoteBo.getUserId());
+    //    if(ObjectUtil.isEmpty(remoteVo)){
+    //        remoteUserService.insertUser(remoteBo);
+    //    } else {
+    //        remoteUserService.updateUser(remoteBo);
+    //    }
     }
 }