فهرست منبع

feature: 同步业务完善
1.重新梳理无法通过界面操作的双向同步数据

luo.yibo@datuai.com 1 سال پیش
والد
کامیت
33c6f14249
13فایلهای تغییر یافته به همراه411 افزوده شده و 97 حذف شده
  1. 4 0
      ruoyi-common/ruoyi-common-message/pom.xml
  2. 22 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/EventSenderConstants.java
  3. 111 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/EventTypeConstants.java
  4. 6 2
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/KafkaTopicConstants.java
  5. 50 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/enums/EventSenderEnum.java
  6. 50 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/enums/EventTypeEnum.java
  7. 64 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/producer/KafkaNormalProducer.java
  8. 1 0
      ruoyi-common/ruoyi-common-message/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  9. 5 0
      ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/domain/vo/SysDeptVo.java
  10. 16 16
      ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java
  11. 71 67
      ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysUserServiceImpl.java
  12. 2 4
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumerYkt.java
  13. 9 8
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/SyncKafkaConsumer.java

+ 4 - 0
ruoyi-common/ruoyi-common-message/pom.xml

@@ -38,6 +38,10 @@
             <artifactId>ruoyi-api-backstage</artifactId>
             <version>${revision}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
 
     </dependencies>
 

+ 22 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/EventSenderConstants.java

@@ -0,0 +1,22 @@
+package org.dromara.common.message.kafka.constant;
+
+/**
+ * @ClassName EventSenderConstants
+ * @Description 消费推送事件发送者常量,和EventSenderEnum对应
+ * @Author luoyibo
+ * @Date 2025-01-04 22:53
+ * @Version 1.0
+ * @since jdk17
+ */
+public interface EventSenderConstants {
+    /**
+     * 教务系统
+     */
+    String TRAIN = "002";
+
+    /**
+     * 人事系统
+     */
+    String HR = "003";
+
+}

+ 111 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/EventTypeConstants.java

@@ -0,0 +1,111 @@
+package org.dromara.common.message.kafka.constant;
+
+import org.dromara.common.message.kafka.enums.EventSenderEnum;
+
+/**
+ * @ClassName EventTypeConstants
+ * @Description 消费推送事件类型常量,EventTypeEnum对应
+ * @Author luoyibo
+ * @Date 2025-01-04 22:53
+ * @Version 1.0
+ * @since jdk17
+ */
+public interface EventTypeConstants {
+
+    //region 系统管理模块
+    /**
+     * 部门数据
+     */
+    String DEPT = EventSenderEnum.SYSTEM.code() + "00001";
+    /**
+     * 用户数据
+     */
+    String USER = EventSenderEnum.SYSTEM.code() + "00002";
+    //endregion
+
+    //region 管理平台
+    /**
+     * 一卡通账户数据
+     */
+    String ACCOUNT = EventSenderEnum.BACKSTAGE.code() + "00001";
+
+    /**
+     * 卡片数据
+     */
+    String CARD = EventSenderEnum.BACKSTAGE + "00002";
+    //endregion
+
+    //region 一卡通
+    /**
+     * 消费记录
+     */
+    String CONSUME_RECORD = EventSenderEnum.OLD.code() + "00001";
+
+    /**
+     * 报到状态
+     */
+    String REGISTER_STATUS = EventSenderEnum.OLD + "00002";
+    //endregion
+
+    //region 教务系统
+    /**
+     * 班级增加
+     */
+    String CLASS_ADD = EventSenderEnum.TRAIN.code() + "00001";
+
+    /**
+     * 班级修改
+     */
+    String CLASS_EDIT = EventSenderEnum.TRAIN + "00002";
+
+    /**
+     * 班级删除
+     */
+    String CLASS_DEL = EventSenderEnum.TRAIN + "00003";
+    /**
+     * 学员增加
+     */
+    String TRAINEE_ADD = EventSenderEnum.TRAIN.code() + "00004";
+
+    /**
+     * 学员修改
+     */
+    String TRAINEE_EDIT = EventSenderEnum.TRAIN + "00005";
+
+    /**
+     * 学员删除
+     */
+    String TRAINEE_DEL = EventSenderEnum.TRAIN + "00006";
+    //endregion
+
+    //region 业中(人事)
+    /**
+     * 班级增加
+     */
+    String DEPT_ADD = EventSenderEnum.HR.code() + "00001";
+
+    /**
+     * 班级修改
+     */
+    String DEPT_EDIT = EventSenderEnum.HR + "00002";
+
+    /**
+     * 班级删除
+     */
+    String DEPT_DEL = EventSenderEnum.HR + "00003";
+    /**
+     * 学员增加
+     */
+    String TEACHER_ADD = EventSenderEnum.HR.code() + "00004";
+
+    /**
+     * 学员修改
+     */
+    String TEACHER_EDIT = EventSenderEnum.HR + "00005";
+
+    /**
+     * 学员删除
+     */
+    String TEACHER_DEL = EventSenderEnum.HR + "00006";
+    //endregion
+}

+ 6 - 2
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/KafkaTopicConstants.java

@@ -1,7 +1,11 @@
 package org.dromara.common.message.kafka.constant;
 
-public class KafkaTopicConstants {
+public interface KafkaTopicConstants {
 
     // 本地库与信创云 同步 主题
-    public static final String SYNC_DATA_TOPIC = "ykt_operation";
+    String SYNC_DATA_TOPIC = "ykt_operation";
+    /**
+     * 根据是否推送消息配置的推送主题
+     */
+    String NORMAL_TOPIC = "normal_data_push";
 }

+ 50 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/enums/EventSenderEnum.java

@@ -0,0 +1,50 @@
+package org.dromara.common.message.kafka.enums;
+
+/**
+ * @ClassName MessageSenderEnum
+ * @Description 消费推送发送者枚举,和MessageSenderConstants常量对应
+ * @Author luoyibo
+ * @Date 2025-01-04 22:53
+ * @Version 1.0
+ * @since jdk17
+ */
+public enum EventSenderEnum {
+    SYSTEM("100","系统管理"),
+    BACKSTAGE("110","管理平台"),
+    CONSUME("120","消费系统"),
+    HOTEL("130","酒店系统"),
+    SELF("140","自助服务"),
+    TRAIN("002","教务系统"),
+    HR("003","人事系统"),
+    OLD("005","一卡通系统");
+
+    private final String code;
+    private final String name;
+
+    EventSenderEnum(String code, String name) {
+        this.code = code;
+        this.name = name;
+    }
+
+    public String code() {
+        return this.code;
+    }
+
+    public String message() {
+        return this.name;
+    }
+
+    public static String getMessage(String code) {
+        for (EventSenderEnum item : EventSenderEnum.values()) {
+            if (item.code().equals(code)) {
+                return item.name;
+            }
+        }
+        return "未知";
+    }
+
+    @Override
+    public String toString() {
+        return this.name();
+    }
+}

+ 50 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/enums/EventTypeEnum.java

@@ -0,0 +1,50 @@
+package org.dromara.common.message.kafka.enums;
+
+/**
+ * @ClassName MessageSenderEnum
+ * @Description 消费推送发送者枚举,和MessageSenderConstants常量对应
+ * @Author luoyibo
+ * @Date 2025-01-04 22:53
+ * @Version 1.0
+ * @since jdk17
+ */
+public enum EventTypeEnum {
+    DEPT(EventSenderEnum.SYSTEM.code() + "00001","部门数据");
+    // USER("110","用户数据"),
+    // ACCOUNT("120","账户数据"),
+    // CARD("130","卡片数据"),
+    // CONSUME_RECORD("140","消费记录"),
+    // REGISTER_STATUS("002","报到状态"),
+    // HR("003","人事系统"),
+    // OLD("005","一卡通系统");
+
+    private final String code;
+    private final String name;
+
+    EventTypeEnum(String code, String name) {
+        this.code = code;
+        this.name = name;
+    }
+
+    public String code() {
+        return this.code;
+    }
+
+    public String message() {
+        return this.name;
+    }
+
+    public static String getMessage(String code) {
+        for (EventTypeEnum item : EventTypeEnum.values()) {
+            if (item.code().equals(code)) {
+                return item.name;
+            }
+        }
+        return "未知";
+    }
+
+    @Override
+    public String toString() {
+        return this.name();
+    }
+}

+ 64 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/producer/KafkaNormalProducer.java

@@ -0,0 +1,64 @@
+package org.dromara.common.message.kafka.producer;
+
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson2.JSON;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.dromara.common.core.enums.CloudMqEventEnum;
+import org.dromara.common.core.utils.SpringUtils;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.common.message.kafka.enums.EventSenderEnum;
+import org.dromara.common.message.kafka.enums.EventTypeEnum;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.CompletableFuture;
+
+@RequiredArgsConstructor
+@Slf4j
+@Component
+public class KafkaNormalProducer {
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    public void sendKafkaMessage(String topic, KafkaMessage<?> data) {
+        String jsonMessage = JSON.toJSONString(data);
+        String eventType = data.getHeader().getEventType();
+        String sender = data.getHeader().getSender();
+        try {
+            ProducerRecord<String, String> record;
+            record = new ProducerRecord<>(topic, jsonMessage);
+
+            CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
+            send.whenComplete((result, ex) -> {
+                if (ex != null) {
+                    log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender),
+                              CloudMqEventEnum.getMessage(eventType), data, ex);
+                } else {
+                    log.info("[{}]-[{}]-[发送到kafka消息成功]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data);
+                }
+            });
+        } catch (Exception e) {
+            log.error("[{}]-[{}]-[发送到kafka消息异常]-[{}]", EventSenderEnum.getMessage(sender), EventTypeEnum.getMessage(eventType), data, e);
+        }
+    }
+
+    public void sendKafkaMessage(String topic, String eventType, String sender, Object data) {
+        KafkaMessage<Object> message = new KafkaMessage<>();
+        KafkaHeader header = message.getHeader();
+        String tenantId = JSONUtil.parseObj(data).get("tenantId").toString();
+        header.setTimestamp(System.currentTimeMillis());
+        header.setEventId(IdUtil.simpleUUID());
+        header.setTenantId(tenantId);
+        header.setEventType(eventType);
+        header.setSender(sender);
+
+        message.setHeader(header);
+        message.setBody(data);
+
+        SpringUtils.getAopProxy(this).sendKafkaMessage(topic, message);
+    }
+}

+ 1 - 0
ruoyi-common/ruoyi-common-message/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@@ -1 +1,2 @@
 org.dromara.common.message.kafka.aop.aspect.SyncDataToLocalAspect
+org.dromara.common.message.kafka.producer.KafkaNormalProducer

+ 5 - 0
ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/domain/vo/SysDeptVo.java

@@ -103,6 +103,11 @@ public class SysDeptVo implements Serializable {
      */
     private Date updateTime;
 
+    /**
+     * 租户编号
+     */
+    private String tenantId;
+
     /**
      * 唯一标识
      */

+ 16 - 16
ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java

@@ -12,15 +12,16 @@ import lombok.RequiredArgsConstructor;
 import org.apache.dubbo.config.annotation.DubboReference;
 import org.dromara.backstage.api.RemotePtParameterService;
 import org.dromara.common.core.constant.CacheNames;
-import org.dromara.common.core.constant.CloudMqEventConstants;
 import org.dromara.common.core.constant.UserConstants;
 import org.dromara.common.core.exception.ServiceException;
 import org.dromara.common.core.utils.MapstructUtils;
 import org.dromara.common.core.utils.SpringUtils;
 import org.dromara.common.core.utils.StringUtils;
 import org.dromara.common.core.utils.TreeBuildUtils;
-import org.dromara.common.message.kafka.domain.KafkaHeader;
-import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.common.message.kafka.constant.EventTypeConstants;
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
+import org.dromara.common.message.kafka.enums.EventSenderEnum;
+import org.dromara.common.message.kafka.producer.KafkaNormalProducer;
 import org.dromara.common.mybatis.helper.DataBaseHelper;
 import org.dromara.common.redis.utils.CacheUtils;
 import org.dromara.common.satoken.utils.LoginHelper;
@@ -32,11 +33,9 @@ import org.dromara.system.domain.vo.SysDeptVo;
 import org.dromara.system.mapper.SysDeptMapper;
 import org.dromara.system.mapper.SysRoleMapper;
 import org.dromara.system.mapper.SysUserMapper;
-import org.dromara.system.mq.KafkaNormalProducer;
 import org.dromara.system.service.ISysDeptService;
 import org.dromara.system.service.IUserDeptService;
 import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Cacheable;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
@@ -448,17 +447,18 @@ public class SysDeptServiceImpl implements ISysDeptService {
     private void sendCloudConsume(SysDeptVo vo) {
         String pushData = remotePtParameterService.getPtParameterByKey("PUSH_MQ_DATA");
         if (ObjectUtil.isNotEmpty(pushData) && ObjectUtil.equals(pushData, "1")) {
-            KafkaMessage<SysDeptVo> message = new KafkaMessage<>();
-            KafkaHeader header = message.getHeader();
-            header.setTimestamp(System.currentTimeMillis());
-            header.setEventId(CloudMqEventConstants.DEPT);
-            header.setEventType(CloudMqEventConstants.DEPT);
-            header.setSender(CloudMqEventConstants.SENDER);
-
-            message.setHeader(header);
-            message.setBody(vo);
-
-            kafkaNormalProducer.sendKafkaMessage(CloudMqEventConstants.TOPIC, message);
+            // KafkaMessage<SysDeptVo> message = new KafkaMessage<>();
+            // KafkaHeader header = message.getHeader();
+            // header.setTimestamp(System.currentTimeMillis());
+            // header.setEventId(CloudMqEventConstants.DEPT);
+            // header.setEventType(CloudMqEventConstants.DEPT);
+            // header.setSender(CloudMqEventConstants.SENDER);
+            //
+            // message.setHeader(header);
+            // message.setBody(vo);
+            //
+            // kafkaNormalProducer.sendKafkaMessage(CloudMqEventConstants.TOPIC, message);
+            kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.NORMAL_TOPIC, EventTypeConstants.DEPT, EventSenderEnum.SYSTEM.code(), vo);
         }
     }
 }

+ 71 - 67
ruoyi-modules/ruoyi-system/src/main/java/org/dromara/system/service/impl/SysUserServiceImpl.java

@@ -19,7 +19,10 @@ import org.dromara.backstage.api.RemotePtParameterService;
 import org.dromara.backstage.api.RemoteUserAccountService;
 import org.dromara.backstage.api.domain.bo.RemoteUserAccountBo;
 import org.dromara.backstage.api.domain.vo.RemoteUserAccountVo;
-import org.dromara.common.core.constant.*;
+import org.dromara.common.core.constant.CacheNames;
+import org.dromara.common.core.constant.Constants;
+import org.dromara.common.core.constant.HttpStatus;
+import org.dromara.common.core.constant.UserConstants;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.exception.ServiceException;
 import org.dromara.common.core.exception.user.UserException;
@@ -29,8 +32,10 @@ import org.dromara.common.core.utils.StreamUtils;
 import org.dromara.common.core.utils.StringUtils;
 import org.dromara.common.encrypt.interceptor.MybatisDecryptInterceptor;
 import org.dromara.common.encrypt.interceptor.MybatisEncryptInterceptor;
-import org.dromara.common.message.kafka.domain.KafkaHeader;
-import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.common.message.kafka.constant.EventTypeConstants;
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
+import org.dromara.common.message.kafka.enums.EventSenderEnum;
+import org.dromara.common.message.kafka.producer.KafkaNormalProducer;
 import org.dromara.common.mybatis.core.page.PageQuery;
 import org.dromara.common.mybatis.core.page.TableDataInfo;
 import org.dromara.common.mybatis.helper.DataBaseHelper;
@@ -45,7 +50,6 @@ import org.dromara.system.domain.vo.SysRoleVo;
 import org.dromara.system.domain.vo.SysUserExportVo;
 import org.dromara.system.domain.vo.SysUserVo;
 import org.dromara.system.mapper.*;
-import org.dromara.system.mq.KafkaNormalProducer;
 import org.dromara.system.service.ISysUserService;
 import org.dromara.system.service.IUserDeptService;
 import org.springframework.cache.annotation.CacheEvict;
@@ -98,7 +102,7 @@ public class SysUserServiceImpl implements ISysUserService {
     }
 
     private Wrapper<SysUser> buildQueryWrapper(SysUserBo user) {
-        //手机号加密处理
+        // 手机号加密处理
         MybatisEncryptInterceptor encryptInterceptor = SpringUtils.getBean(MybatisEncryptInterceptor.class);
         user.setPhone(encryptInterceptor.encrypt(user.getPhone()));
 
@@ -112,11 +116,11 @@ public class SysUserServiceImpl implements ISysUserService {
             .eq(StringUtils.isNotBlank(user.getStatus()), "status", user.getStatus())
             .like(StringUtils.isNotBlank(user.getPhone()), "phone", user.getPhone())
             .between(params.get("beginTime") != null && params.get("endTime") != null,
-                "create_time", params.get("beginTime"), params.get("endTime"))
+                     "create_time", params.get("beginTime"), params.get("endTime"))
             .and(ObjectUtil.isNotNull(user.getDeptId()), w -> {
                 List<SysDept> deptList = deptMapper.selectList(new LambdaQueryWrapper<SysDept>()
-                    .select(SysDept::getDeptId)
-                    .apply(DataBaseHelper.findInSet(user.getDeptId(), "ancestors")));
+                                                                   .select(SysDept::getDeptId)
+                                                                   .apply(DataBaseHelper.findInSet(user.getDeptId(), "ancestors")));
                 List<Long> ids = StreamUtils.toList(deptList, SysDept::getDeptId);
                 ids.add(user.getDeptId());
                 w.in("dept_id", ids);
@@ -135,7 +139,7 @@ public class SysUserServiceImpl implements ISysUserService {
      */
     @Override
     public TableDataInfo<SysUserVo> selectAllocatedList(SysUserBo user, PageQuery pageQuery) {
-        //查询条件加密
+        // 查询条件加密
         MybatisEncryptInterceptor encrypt = SpringUtils.getBean(MybatisEncryptInterceptor.class);
         user.setPhone(encrypt.encrypt(user.getPhone()));
         QueryWrapper<SysUser> wrapper = Wrappers.query();
@@ -146,7 +150,7 @@ public class SysUserServiceImpl implements ISysUserService {
             .like(StringUtils.isNotBlank(user.getPhone()), "u.phone", user.getPhone())
             .orderByAsc("u.user_id");
         Page<SysUserVo> page = baseMapper.selectAllocatedList(pageQuery.build(), wrapper);
-        //返回数据解密
+        // 返回数据解密
         MybatisDecryptInterceptor dencrypt = SpringUtils.getBean(MybatisDecryptInterceptor.class);
         page.getRecords().stream().forEach(item -> {
             item.setPhone(dencrypt.decrypt(item.getPhone()));
@@ -163,7 +167,7 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public TableDataInfo<SysUserVo> selectUnallocatedList(SysUserBo user, PageQuery pageQuery) {
         List<Long> userIds = userRoleMapper.selectUserIdsByRoleId(user.getRoleId());
-        //查询条件加密
+        // 查询条件加密
         MybatisEncryptInterceptor encrypt = SpringUtils.getBean(MybatisEncryptInterceptor.class);
         user.setPhone(encrypt.encrypt(user.getPhone()));
         QueryWrapper<SysUser> wrapper = Wrappers.query();
@@ -174,7 +178,7 @@ public class SysUserServiceImpl implements ISysUserService {
             .like(StringUtils.isNotBlank(user.getPhone()), "u.phone", user.getPhone())
             .orderByAsc("u.user_id");
         Page<SysUserVo> page = baseMapper.selectUnallocatedList(pageQuery.build(), wrapper);
-        //返回数据解密
+        // 返回数据解密
         MybatisDecryptInterceptor dencrypt = SpringUtils.getBean(MybatisDecryptInterceptor.class);
         page.getRecords().stream().forEach(item -> {
             item.setPhone(dencrypt.decrypt(item.getPhone()));
@@ -230,10 +234,10 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public List<SysUserVo> selectUserByIds(List<Long> userIds, Long deptId) {
         return baseMapper.selectUserList(new LambdaQueryWrapper<SysUser>()
-            .select(SysUser::getUserId, SysUser::getUserName, SysUser::getNickName)
-            .eq(SysUser::getStatus, UserConstants.USER_NORMAL)
-            .eq(ObjectUtil.isNotNull(deptId), SysUser::getDeptId, deptId)
-            .in(CollUtil.isNotEmpty(userIds), SysUser::getUserId, userIds));
+                                             .select(SysUser::getUserId, SysUser::getUserName, SysUser::getNickName)
+                                             .eq(SysUser::getStatus, UserConstants.USER_NORMAL)
+                                             .eq(ObjectUtil.isNotNull(deptId), SysUser::getDeptId, deptId)
+                                             .in(CollUtil.isNotEmpty(userIds), SysUser::getUserId, userIds));
     }
 
     /**
@@ -275,8 +279,8 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public boolean checkUserNameUnique(SysUserBo user) {
         boolean exist = baseMapper.exists(new LambdaQueryWrapper<SysUser>()
-            .eq(SysUser::getUserName, user.getUserName())
-            .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
+                                              .eq(SysUser::getUserName, user.getUserName())
+                                              .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
         return exist;
     }
 
@@ -288,8 +292,8 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public boolean checkPhoneUnique(SysUserBo user) {
         return baseMapper.exists(new LambdaQueryWrapper<SysUser>()
-            .eq(SysUser::getPhone, user.getPhone())
-            .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
+                                     .eq(SysUser::getPhone, user.getPhone())
+                                     .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
     }
 
     /**
@@ -300,8 +304,8 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public boolean checkEmailUnique(SysUserBo user) {
         boolean exist = baseMapper.exists(new LambdaQueryWrapper<SysUser>()
-            .eq(SysUser::getEmail, user.getEmail())
-            .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
+                                              .eq(SysUser::getEmail, user.getEmail())
+                                              .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
         return exist;
     }
 
@@ -313,8 +317,8 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public boolean checkUserNumbUnique(SysUserBo user) {
         return baseMapper.exists(new LambdaQueryWrapper<SysUser>()
-            .eq(SysUser::getUserNumb, user.getUserNumb())
-            .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
+                                     .eq(SysUser::getUserNumb, user.getUserNumb())
+                                     .ne(ObjectUtil.isNotNull(user.getUserId()), SysUser::getUserId, user.getUserId()));
     }
 
     /**
@@ -367,11 +371,11 @@ public class SysUserServiceImpl implements ISysUserService {
             insertUserDept(user, user.getUserDeptBoList());
             // 新增用户与角色管理
             insertUserRole(user, false);
-            //一卡通账户处理
+            // 一卡通账户处理
             String autoUserAccount = remotePtParameterService.getPtParameterByKey("AUTO_CREATE_BAG");
             RemoteUserAccountBo remoteUserAccountBo = BeanUtil.copyProperties(user, RemoteUserAccountBo.class);
             if (Constants.AUTO_USER_ACCOUNT.equals(autoUserAccount)) {
-                //自动开通一卡通账户
+                // 自动开通一卡通账户
                 remoteUserAccountBo.setAccountStatus("1");
                 R<RemoteUserAccountBo> result = remoteUserAccountService.openAccount(remoteUserAccountBo);
                 if (result.getCode() == HttpStatus.SUCCESS) {
@@ -379,13 +383,13 @@ public class SysUserServiceImpl implements ISysUserService {
                     user.setCardId(data.getCardId());
                     user.setCardNo(data.getCardNo());
                     user.setUserNo(data.getUserNo());
-                    //return rows;
+                    // return rows;
                 } else {
                     throw new UserException(result.getMsg());
                 }
             } else {
-                //不自动开通
-                RemoteUserAccountVo accountVo= remoteUserAccountService.getUserAccountVoById(user.getUserId());
+                // 不自动开通
+                RemoteUserAccountVo accountVo = remoteUserAccountService.getUserAccountVoById(user.getUserId());
                 if (ObjectUtil.isEmpty(accountVo)) {
                     remoteUserAccountBo.setAccountStatus("0");
                     remoteUserAccountService.insertByBo(remoteUserAccountBo);
@@ -436,7 +440,7 @@ public class SysUserServiceImpl implements ISysUserService {
             SysUser sysUser = MapstructUtils.convert(user, SysUser.class);
             // 防止错误更新后导致的数据误删除
             int flag = baseMapper.updateById(sysUser);
-            //更新对应的一卡通账户信息
+            // 更新对应的一卡通账户信息
             remoteUserAccountService.updateByBo(BeanUtil.copyProperties(user, RemoteUserAccountBo.class));
             sendCloudConsume(baseMapper.selectVoById(user.getUserId()));
             return flag;
@@ -468,9 +472,9 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public int updateUserStatus(Long userId, String status) {
         return baseMapper.update(null,
-            new LambdaUpdateWrapper<SysUser>()
-                .set(SysUser::getStatus, status)
-                .eq(SysUser::getUserId, userId));
+                                 new LambdaUpdateWrapper<SysUser>()
+                                     .set(SysUser::getStatus, status)
+                                     .eq(SysUser::getUserId, userId));
     }
 
     /**
@@ -483,12 +487,12 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public int updateUserProfile(SysUserBo user) {
         return baseMapper.update(null,
-            new LambdaUpdateWrapper<SysUser>()
-                .set(ObjectUtil.isNotNull(user.getNickName()), SysUser::getNickName, user.getNickName())
-                .set(SysUser::getPhone, user.getPhone())
-                .set(SysUser::getEmail, user.getEmail())
-                .set(SysUser::getSex, user.getSex())
-                .eq(SysUser::getUserId, user.getUserId()));
+                                 new LambdaUpdateWrapper<SysUser>()
+                                     .set(ObjectUtil.isNotNull(user.getNickName()), SysUser::getNickName, user.getNickName())
+                                     .set(SysUser::getPhone, user.getPhone())
+                                     .set(SysUser::getEmail, user.getEmail())
+                                     .set(SysUser::getSex, user.getSex())
+                                     .eq(SysUser::getUserId, user.getUserId()));
     }
 
     /**
@@ -501,9 +505,9 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public boolean updateUserAvatar(Long userId, Long avatar) {
         return baseMapper.update(null,
-            new LambdaUpdateWrapper<SysUser>()
-                .set(SysUser::getAvatar, avatar)
-                .eq(SysUser::getUserId, userId)) > 0;
+                                 new LambdaUpdateWrapper<SysUser>()
+                                     .set(SysUser::getAvatar, avatar)
+                                     .eq(SysUser::getUserId, userId)) > 0;
     }
 
     /**
@@ -516,9 +520,9 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public int resetUserPwd(Long userId, String password) {
         return baseMapper.update(null,
-            new LambdaUpdateWrapper<SysUser>()
-                .set(SysUser::getPassword, password)
-                .eq(SysUser::getUserId, userId));
+                                 new LambdaUpdateWrapper<SysUser>()
+                                     .set(SysUser::getPassword, password)
+                                     .eq(SysUser::getUserId, userId));
     }
 
     /**
@@ -650,7 +654,7 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public String selectUserNameById(Long userId) {
         SysUser sysUser = baseMapper.selectOne(new LambdaQueryWrapper<SysUser>()
-            .select(SysUser::getUserName).eq(SysUser::getUserId, userId));
+                                                   .select(SysUser::getUserName).eq(SysUser::getUserId, userId));
         return ObjectUtil.isNull(sysUser) ? null : sysUser.getUserName();
     }
 
@@ -664,7 +668,7 @@ public class SysUserServiceImpl implements ISysUserService {
     @Cacheable(cacheNames = CacheNames.SYS_NICKNAME, key = "#userId")
     public String selectNicknameById(Long userId) {
         SysUser sysUser = baseMapper.selectOne(new LambdaQueryWrapper<SysUser>()
-            .select(SysUser::getNickName).eq(SysUser::getUserId, userId));
+                                                   .select(SysUser::getNickName).eq(SysUser::getUserId, userId));
         return ObjectUtil.isNull(sysUser) ? null : sysUser.getNickName();
     }
 
@@ -689,7 +693,7 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public String selectPhonenumberById(Long userId) {
         SysUser sysUser = baseMapper.selectOne(new LambdaQueryWrapper<SysUser>()
-            .select(SysUser::getPhone).eq(SysUser::getUserId, userId));
+                                                   .select(SysUser::getPhone).eq(SysUser::getUserId, userId));
         return ObjectUtil.isNull(sysUser) ? null : sysUser.getPhone();
     }
 
@@ -702,7 +706,7 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public String selectEmailById(Long userId) {
         SysUser sysUser = baseMapper.selectOne(new LambdaQueryWrapper<SysUser>()
-            .select(SysUser::getEmail).eq(SysUser::getUserId, userId));
+                                                   .select(SysUser::getEmail).eq(SysUser::getUserId, userId));
         return ObjectUtil.isNull(sysUser) ? null : sysUser.getEmail();
     }
 
@@ -715,7 +719,7 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public SysUserVo selectUserVoByOtherId(String otherId) {
         return baseMapper.selectVoOne(new LambdaQueryWrapper<SysUser>()
-            .eq(SysUser::getOtherId, otherId));
+                                          .eq(SysUser::getOtherId, otherId));
     }
 
     /**
@@ -728,13 +732,13 @@ public class SysUserServiceImpl implements ISysUserService {
     @Override
     public SysUserVo selectUserVoByUserName(String userName) {
         return baseMapper.selectVoOne(new LambdaQueryWrapper<SysUser>()
-            .eq(SysUser::getUserName, userName));
+                                          .eq(SysUser::getUserName, userName));
     }
 
     private void insertUserDept(SysUserBo bo, List<UserDeptBo> userDeptBoList) {
-        //将已有的人员部门关系设置成非主部门
+        // 将已有的人员部门关系设置成非主部门
         userDeptService.updateMainDeptByUserId(bo.getUserId());
-        //设置当前部门为主部门
+        // 设置当前部门为主部门
         UserDeptBo justUserDeptBo = new UserDeptBo();
         justUserDeptBo.setUserId(bo.getUserId());
         justUserDeptBo.setDeptId(bo.getDeptId());
@@ -742,11 +746,11 @@ public class SysUserServiceImpl implements ISysUserService {
         justUserDeptBo.setMainDept("Y");
 
         userDeptService.setUserDeptPost(justUserDeptBo);
-        //如果有多部门,设置多部门
+        // 如果有多部门,设置多部门
         if (CollectionUtil.isNotEmpty(userDeptBoList)) {
-            //排除主部门
+            // 排除主部门
             List<UserDeptBo> userDeptList = userDeptBoList.stream()
-                .filter(p -> ObjectUtil.notEqual(bo.getDeptId(), p.getDeptId())).toList();
+                                                .filter(p -> ObjectUtil.notEqual(bo.getDeptId(), p.getDeptId())).toList();
             if (CollectionUtil.isNotEmpty(userDeptList)) {
                 userDeptList.forEach(userDeptBo -> {
                     userDeptBo.setUserId(bo.getUserId());
@@ -761,17 +765,17 @@ public class SysUserServiceImpl implements ISysUserService {
     private void sendCloudConsume(SysUserVo vo) {
         String pushData = remotePtParameterService.getPtParameterByKey("PUSH_MQ_DATA");
         if (ObjectUtil.isNotEmpty(pushData) && ObjectUtil.equals(pushData, "1")) {
-            KafkaMessage<SysUserVo> message = new KafkaMessage<>();
-            KafkaHeader header = message.getHeader();
-            header.setTimestamp(System.currentTimeMillis());
-            header.setEventId(CloudMqEventConstants.USER);
-            header.setEventType(CloudMqEventConstants.USER);
-            header.setSender(CloudMqEventConstants.SENDER);
-
-            message.setHeader(header);
-            message.setBody(vo);
-
-            kafkaNormalProducer.sendKafkaMessage(CloudMqEventConstants.TOPIC, message);
+            // KafkaMessage<Object> message = new KafkaMessage<>();
+            // KafkaHeader header = message.getHeader();
+            // header.setTimestamp(System.currentTimeMillis());
+            // header.setEventId(CloudMqEventConstants.USER);
+            // header.setEventType(CloudMqEventConstants.USER);
+            // header.setSender(CloudMqEventConstants.SENDER);
+            //
+            // message.setHeader(header);
+            // message.setBody(vo);
+            // String topic, String eventId, String eventType, String sender, Object data
+            kafkaNormalProducer.sendKafkaMessage(KafkaTopicConstants.NORMAL_TOPIC, EventTypeConstants.USER, EventSenderEnum.SYSTEM.code(), vo);
         }
     }
 }

+ 2 - 4
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumerYkt.java

@@ -1,13 +1,11 @@
 package org.dromara.server.mq.consumer;
 
-import cn.hutool.core.util.ObjUtil;
-import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
-import org.dromara.common.core.constant.CloudMqEventConstants;
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
 import org.dromara.common.message.kafka.domain.KafkaHeader;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
@@ -70,7 +68,7 @@ public class KafkaConsumerYkt {
      * 1.部门信息、人员信息、一卡通账户信息、卡片信息
      * @param record 消息内容
      */
-    @KafkaListener(topics = CloudMqEventConstants.TOPIC, groupId = "local-group-id")
+    @KafkaListener(topics = KafkaTopicConstants.NORMAL_TOPIC, groupId = "local-group-id")
     public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
         KafkaUtils.doMessageHandle(record, eventStrategyContext, log);
     }

+ 9 - 8
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/SyncKafkaConsumer.java

@@ -1,11 +1,11 @@
 package org.dromara.server.mq.consumer;
 
-import cn.hutool.core.util.ObjUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dromara.common.message.kafka.constant.EventSenderConstants;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.mq.event.kafka.EventStrategyContext;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
 /**
  * name: SyncKafkaConsumer
  * package: org.dromara.server.mq.consumer
- * description: 本地kafka消息消费
+ * description: 云平台kafka消费消费
  * date: 2024-10-26 11:12:56 11:12
  *
  * @author luoyibo
@@ -37,12 +37,13 @@ public class SyncKafkaConsumer {
         String tenantId = receiveMsg.getHeader().getTenantId();
         JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
         eventMsg.set("tenantId", tenantId);
-        if (ObjUtil.notEqual(sender, "005")) {
-            try {
-                eventStrategyContext.doMsgHandle(eventType, eventMsg);
-            } catch (Exception e) {
-                log.error("[业中kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e);
-            }
+        switch (sender) {
+            case EventSenderConstants.TRAIN, EventSenderConstants.HR:
+                try {
+                    eventStrategyContext.doMsgHandle(eventType, eventMsg);
+                } catch (Exception e) {
+                    log.error("[业中kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e);
+                }
         }
     }
 }