Browse Source

Merge remote-tracking branch 'origin/master'

luoyb 1 năm trước cách đây
mục cha
commit
7a7dc2f5fe
32 tập tin đã thay đổi với 1163 bổ sung72 xóa
  1. 1 0
      ruoyi-common/pom.xml
  2. 22 0
      ruoyi-common/ruoyi-common-message/pom.xml
  3. 7 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/KafkaTopicConstants.java
  4. 104 0
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/MessageEventTypeConstants.java
  5. 5 1
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/domain/KafkaHeader.java
  6. 1 1
      ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/domain/KafkaMessage.java
  7. 15 0
      ruoyi-modules/ruoyi-backstage/pom.xml
  8. 20 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/aop/annotation/SyncDataToLocal.java
  9. 100 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/aop/aspect/SyncDataToLocalAspect.java
  10. 16 11
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/controller/PtParameterController.java
  11. 106 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/controller/SendMessageRecordController.java
  12. 75 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/SendMessageRecord.java
  13. 2 1
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/bo/PtParameterBo.java
  14. 73 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/bo/SendMessageRecordBo.java
  15. 81 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/vo/SendMessageRecordVo.java
  16. 15 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/mapper/SendMessageRecordMapper.java
  17. 69 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/ISendMessageRecordService.java
  18. 154 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/impl/SendMessageRecordServiceImpl.java
  19. 1 2
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaNormalConsumer.java
  20. 98 0
      ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaProducer.java
  21. 20 0
      ruoyi-modules/ruoyi-backstage/src/main/resources/mapper/basics/SendMessageRecordMapper.xml
  22. 18 0
      ruoyi-modules/ruoyi-backstage/src/test/java/org/dromara/backstage/mq/KafkaProducerTest.java
  23. 18 0
      ruoyi-server/ruoyi-server-base/src/main/java/org/dromara/server/base/service/yktOperation/SyncRemotePtParameterService.java
  24. 1 0
      ruoyi-server/ruoyi-server-base/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  25. 4 3
      ruoyi-server/ruoyi-server-common/pom.xml
  26. 0 33
      ruoyi-server/ruoyi-server-common/src/main/java/org/dromara/server/common/domain/bo/KafkaHeader.java
  27. 0 19
      ruoyi-server/ruoyi-server-common/src/main/java/org/dromara/server/common/domain/bo/KafkaMessage.java
  28. 26 0
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/constant/kafka/YktOperationEventConstraints.java
  29. 27 1
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumer.java
  30. 15 0
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/IYktEventStrategy.java
  31. 25 0
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/YktEventStrategyContext.java
  32. 44 0
      ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/impl/yktOperation/PtParameterEventStrategyImpl.java

+ 1 - 0
ruoyi-common/pom.xml

@@ -44,6 +44,7 @@
         <module>ruoyi-common-social</module>
         <module>ruoyi-common-nacos</module>
         <module>ruoyi-common-bus</module>
+        <module>ruoyi-common-message</module>
     </modules>
 
     <artifactId>ruoyi-common</artifactId>

+ 22 - 0
ruoyi-common/ruoyi-common-message/pom.xml

@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.dromara</groupId>
+        <artifactId>ruoyi-common</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ruoyi-common-message</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

+ 7 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/KafkaTopicConstants.java

@@ -0,0 +1,7 @@
+package org.dromara.common.message.kafka.constant;
+
+public class KafkaTopicConstants {
+
+    // 本地库与信创云 同步 主题
+    public static final String SYNC_DATA_TOPIC = "ykt_operation";
+}

+ 104 - 0
ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/constant/MessageEventTypeConstants.java

@@ -0,0 +1,104 @@
+package org.dromara.common.message.kafka.constant;
+
+/**
+ * 消息 的event type
+ */
+public class MessageEventTypeConstants {
+
+    /**
+     * ptParameter_ADD("YKT_101_ADD","系统参数新增","业务参数-系统参数"),
+     *     ptParameter_EDIT("YKT_101_EDIT","系统参数修改","业务参数-系统参数"),
+     *     ptParameter_REMOVE("YKT_101_REMOVE","系统参数删除","业务参数-系统参数"),
+     *     ptMealtype_ADD("YKT_103_ADD","营业时段新增","业务参数-营业时段"),
+     *     ptMealtype_EDIT("YKT_103_EDIT","营业时段修改","业务参数-营业时段"),
+     *     ptMealtype_REMOVE("YKT_103_REMOVE","营业时段删除","业务参数-营业时段"),
+     *     ptRoom_ADD("YKT_105_ADD","房间新增","房间信息"),
+     *     ptRoom_EDIT("YKT_105_EDIT","房间修改","房间信息"),
+     *     ptRoom_REMOVE("YKT_105_REMOVE","房间删除","房间信息"),
+     *     ptArea_ADD("YKT_106_ADD","区域新增","房间信息"),
+     *     ptArea_EDIT("YKT_106_EDIT","区域修改","房间信息"),
+     *     ptArea_REMOVE("YKT_106_REMOVE","区域删除","房间信息"),
+     *     ptAccount_ADD("YKT_120_ADD","结算账户新增","商户信息-结算账户"),
+     *     ptAccount_EDIT("YKT_120_EDIT","结算账户修改","商户信息-结算账户"),
+     *     ptAccount_REMOVE("YKT_120_REMOVE","结算账户删除","商户信息-结算账户"),
+     *     ptUserAccount_OPEN_EDIT("YKT_107_EDIT","开户","支付管理-账户管理"),
+     *     ptUserAccount_CLOSE_EDIT("YKT_107_EDIT","销户","支付管理-账户管理"),
+     *     ptUserAccount_FREEZE_EDIT("YKT_107_EDIT","冻结","支付管理-账户管理"),
+     *     ptUserAccount_UNFREEZE_EDIT("YKT_107_EDIT","解冻","支付管理-账户管理"),
+     *     ptUserAccount_RESETTIME_EDIT("YKT_108_EDIT","重置有效期","支付管理-账户管理"),
+     *     ptUserAccount_RESETTYPE_EDIT("YKT_107_EDIT","重置卡类别","支付管理-账户管理"),
+     *     user_ADD("YKT_107_ADD","用户新增","系统权限-用户管理"),
+     *     ptCard_CHARGE_EDIT("YKT_109_EDIT","充值","卡务中心-卡务操作"),
+     *     ptCard_refund_EDIT("YKT_110_EDIT","退款","卡务中心-卡务操作"),
+     *     ptCard_supply_EDIT("YKT_111_EDIT","错扣补款","卡务中心-卡务操作"),
+     *     ptCard_multicharge_EDIT("YKT_109_EDIT","批量充值","卡务中心-卡务操作"),
+     *     ptCard_multirefund_EDIT("YKT_110_EDIT","批量退款","卡务中心-卡务操作"),
+     *     ptCard_multiSet_EDIT("YKT_111_EDIT","批量设置余额","卡务中心-卡务操作"),
+     *     ptCard_card_EDIT("YKT_108_EDIT","卡片挂失/解挂","卡务中心-卡务操作"),
+     *     subsidy_EDIT("YKT_109_EDIT","补助定时任务","补助管理"),
+     *     xfTerm_ADD("YKT_112_ADD","设备新增","消费系统-消费设备"),
+     *     xfTerm_EDIT("YKT_112_EDIT","设备修改","消费系统-消费设备"),
+     *     xfTerm_REMOVE("YKT_112_REMOVE","设备删除","消费系统-消费设备"),
+     *     xfTerm_param_EDIT("YKT_113_EDIT","设备参数设置","消费系统-消费设备"),
+     *     xfDiscount_ADD("YKT_114_ADD","折扣新增","消费设备-折扣管理"),
+     *     xfDiscount_EDIT("YKT_114_EDIT","折扣修改","消费设备-折扣管理"),
+     *     xfDiscount_REMOVE("YKT_114_REMOVE","折扣删除","消费设备-折扣管理"),
+     *     xfDiscountterm_EDIT("YKT_115_EDIT","设备绑定","消费设备-折扣管理"),
+     *     xfQuota_ADD("YKT_116_ADD","限额新增","消费设备-限额管理"),
+     *     xfQuota_EDIT("YKT_116_EDIT","限额修改","消费设备-限额管理"),
+     *     xfQuota_REMOVE("YKT_116_REMOVE","限额删除","消费设备-限额管理"),
+     *     xfQuotaterm_EDIT("YKT_117_EDIT","设备绑定","消费设备-限额管理"),
+     *     xfLimited_ADD("YKT_118_ADD","限次新增","消费设备-限次管理"),
+     *     xfLimited_EDIT("YKT_118_EDIT","限次修改","消费设备-限次管理"),
+     *     xfLimited_REMOVE("YKT_118_REMOVE","限次删除","消费设备-限次管理"),
+     *     xfLimitedterm_EDIT("YKT_119_EDIT","设备绑定","消费设备-限次管理"),
+     *     CLIENT_consumer_ADD("CLIENT_101_ADD","消费扣费","刷卡消费");
+     */
+    public static final String	ptParameter_ADD = "YKT_101_ADD";
+    public static final String	ptParameter_EDIT = "YKT_101_EDIT";
+    public static final String	ptParameter_REMOVE = "YKT_101_REMOVE";
+    public static final String	ptMealtype_ADD = "YKT_103_ADD";
+    public static final String	ptMealtype_EDIT = "YKT_103_EDIT";
+    public static final String	ptMealtype_REMOVE = "YKT_103_REMOVE";
+    public static final String	ptRoom_ADD = "YKT_105_ADD";
+    public static final String	ptRoom_EDIT = "YKT_105_EDIT";
+    public static final String	ptRoom_REMOVE = "YKT_105_REMOVE";
+    public static final String	ptArea_ADD = "YKT_106_ADD";
+    public static final String	ptArea_EDIT = "YKT_106_EDIT";
+    public static final String	ptArea_REMOVE = "YKT_106_REMOVE";
+    public static final String	ptAccount_ADD = "YKT_120_ADD";
+    public static final String	ptAccount_EDIT = "YKT_120_EDIT";
+    public static final String	ptAccount_REMOVE = "YKT_120_REMOVE";
+    public static final String	ptUserAccount_OPEN_EDIT = "YKT_107_EDIT";
+    public static final String	ptUserAccount_CLOSE_EDIT = "YKT_107_EDIT";
+    public static final String	ptUserAccount_FREEZE_EDIT = "YKT_107_EDIT";
+    public static final String	ptUserAccount_UNFREEZE_EDIT = "YKT_107_EDIT";
+    public static final String	ptUserAccount_RESETTIME_EDIT = "YKT_108_EDIT";
+    public static final String	ptUserAccount_RESETTYPE_EDIT = "YKT_107_EDIT";
+    public static final String	user_ADD = "YKT_107_ADD";
+    public static final String	ptCard_CHARGE_EDIT = "YKT_109_EDIT";
+    public static final String	ptCard_refund_EDIT = "YKT_110_EDIT";
+    public static final String	ptCard_supply_EDIT = "YKT_111_EDIT";
+    public static final String	ptCard_multicharge_EDIT = "YKT_109_EDIT";
+    public static final String	ptCard_multirefund_EDIT = "YKT_110_EDIT";
+    public static final String	ptCard_multiSet_EDIT = "YKT_111_EDIT";
+    public static final String	ptCard_card_EDIT = "YKT_108_EDIT";
+    public static final String	subsidy_EDIT = "YKT_109_EDIT";
+    public static final String	xfTerm_ADD = "YKT_112_ADD";
+    public static final String	xfTerm_EDIT = "YKT_112_EDIT";
+    public static final String	xfTerm_REMOVE = "YKT_112_REMOVE";
+    public static final String	xfTerm_param_EDIT = "YKT_113_EDIT";
+    public static final String	xfDiscount_ADD = "YKT_114_ADD";
+    public static final String	xfDiscount_EDIT = "YKT_114_EDIT";
+    public static final String	xfDiscount_REMOVE = "YKT_114_REMOVE";
+    public static final String	xfDiscountterm_EDIT = "YKT_115_EDIT";
+    public static final String	xfQuota_ADD = "YKT_116_ADD";
+    public static final String	xfQuota_EDIT = "YKT_116_EDIT";
+    public static final String	xfQuota_REMOVE = "YKT_116_REMOVE";
+    public static final String	xfQuotaterm_EDIT = "YKT_117_EDIT";
+    public static final String	xfLimited_ADD = "YKT_118_ADD";
+    public static final String	xfLimited_EDIT = "YKT_118_EDIT";
+    public static final String	xfLimited_REMOVE = "YKT_118_REMOVE";
+    public static final String	xfLimitedterm_EDIT = "YKT_119_EDIT";
+
+}

+ 5 - 1
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/domain/bo/KafkaHeader.java → ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/domain/KafkaHeader.java

@@ -1,4 +1,4 @@
-package org.dromara.backstage.mq.domain.bo;
+package org.dromara.common.message.kafka.domain;
 
 import lombok.Data;
 
@@ -30,4 +30,8 @@ public class KafkaHeader {
      * 事件Id
      */
     private String eventId;
+    /**
+     * 租户编号
+     */
+    private String tenantId;
 }

+ 1 - 1
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/domain/bo/KafkaMessage.java → ruoyi-common/ruoyi-common-message/src/main/java/org/dromara/common/message/kafka/domain/KafkaMessage.java

@@ -1,4 +1,4 @@
-package org.dromara.backstage.mq.domain.bo;
+package org.dromara.common.message.kafka.domain;
 
 import lombok.Data;
 /**

+ 15 - 0
ruoyi-modules/ruoyi-backstage/pom.xml

@@ -117,6 +117,21 @@
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-message</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-test</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 20 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/aop/annotation/SyncDataToLocal.java

@@ -0,0 +1,20 @@
+package org.dromara.backstage.aop.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * 数据同步至本地服务数据库注解
+ *
+ * @author bing
+ */
+@Inherited
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface SyncDataToLocal {
+    /**
+     * 消息的event_type
+     */
+    String eventType() default "";
+
+}

+ 100 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/aop/aspect/SyncDataToLocalAspect.java

@@ -0,0 +1,100 @@
+package org.dromara.backstage.aop.aspect;
+
+import cn.hutool.core.lang.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.CodeSignature;
+import org.dromara.backstage.aop.annotation.SyncDataToLocal;
+import org.dromara.backstage.mq.KafkaProducer;
+import org.dromara.common.core.domain.R;
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.common.satoken.utils.LoginHelper;
+import org.dromara.system.api.model.LoginUser;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 同步数据至本地 的 切面
+ *
+ * @author bing
+ */
+@RequiredArgsConstructor
+@Slf4j
+@Aspect
+@Component
+public class SyncDataToLocalAspect {
+
+    private final KafkaProducer kafkaProducer;
+
+
+    /**
+     * 处理完请求后执行
+     *
+     * @param joinPoint 切点
+     */
+    @AfterReturning(pointcut = "@annotation(syncDataToLocal)", returning = "jsonResult")
+    public void doAfterReturning(JoinPoint joinPoint, SyncDataToLocal syncDataToLocal, Object jsonResult) {
+//        System.err.println("进入呢么?");
+        if (jsonResult instanceof R<?> r) {
+            if (r.getCode() == R.SUCCESS) {
+                sendSyncMessage(joinPoint, syncDataToLocal);
+            }else{
+                log.error("同步数据消息未发送:controller 方法返回结果未失败!");
+            }
+        }else{
+            log.error("同步数据消息未发送:controller 方法返回不是R类型!");
+        }
+
+    }
+
+    private void sendSyncMessage(JoinPoint joinPoint, SyncDataToLocal controllerSyncData2Local) {
+        try {
+            KafkaMessage<Object> data = new KafkaMessage<>();
+            KafkaHeader header = data.getHeader();
+            header.setTimestamp(System.currentTimeMillis());
+            header.setEventId(UUID.randomUUID().toString());
+            header.setEventType(controllerSyncData2Local.eventType());
+            String sender = header.getSender();
+            String eventType = header.getEventType();
+            if(StringUtils.isBlank(sender) && StringUtils.isNotBlank(eventType)){
+                header.setSender(eventType.substring(0, eventType.lastIndexOf("_")));
+            }
+            String tenantId = header.getTenantId();
+            if(StringUtils.isBlank(tenantId)){
+                header.setTenantId(LoginHelper.getTenantId());
+            }
+
+            Object[] args = joinPoint.getArgs();
+            int length = args.length;
+            if(length == 1){
+                data.setBody(args[0]);
+            }else if(length >1){
+                CodeSignature signature = (CodeSignature) joinPoint.getSignature();
+                String[] paramNames = signature.getParameterNames();
+                Map<String, Object> params = new HashMap<>();
+                for (int i = 0; i < length; i++) {
+//                System.out.println("参数名: " + paramNames[i] + ", 参数值: " + args[i]);
+                    params.put(paramNames[i], args[i]);
+                }
+                data.setBody(params);
+            }else{
+                data.setBody(null);
+            }
+            LoginUser loginUser = LoginHelper.getLoginUser();
+            kafkaProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, loginUser.getUserId(),data);
+        }catch (Exception e){
+            log.error("同步数据消息未发送:发送消息异常!", e);
+        }
+    }
+
+
+}

+ 16 - 11
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/controller/PtParameterController.java

@@ -6,6 +6,8 @@ import lombok.RequiredArgsConstructor;
 import jakarta.servlet.http.HttpServletResponse;
 import jakarta.validation.constraints.*;
 import cn.dev33.satoken.annotation.SaCheckPermission;
+import org.dromara.backstage.aop.annotation.SyncDataToLocal;
+import org.dromara.common.message.kafka.constant.MessageEventTypeConstants;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.validation.annotation.Validated;
 import org.dromara.common.idempotent.annotation.RepeatSubmit;
@@ -75,22 +77,12 @@ public class PtParameterController extends BaseController {
     @SaCheckPermission("basicParameter:ptParameter:add")
     @Log(title = "系统参数", businessType = BusinessType.INSERT)
     @RepeatSubmit()
+    @SyncDataToLocal(eventType = MessageEventTypeConstants.ptParameter_ADD)
     @PostMapping()
     public R<Void> add(@Validated(AddGroup.class) @RequestBody PtParameterBo bo) {
         return toAjax(ptParameterService.insertByBo(bo));
     }
 
-    /**
-     * 修改系统参数
-     */
-    @SaCheckPermission("basicParameter:ptParameter:edit")
-    @Log(title = "系统参数", businessType = BusinessType.UPDATE)
-    @RepeatSubmit()
-    @PutMapping()
-    public R<Void> edit(@Validated(EditGroup.class) @RequestBody PtParameterBo bo) {
-        return toAjax(ptParameterService.updateByBo(bo));
-    }
-
     /**
      * 删除系统参数
      *
@@ -99,10 +91,23 @@ public class PtParameterController extends BaseController {
     @SaCheckPermission("basicParameter:ptParameter:remove")
     @Log(title = "系统参数", businessType = BusinessType.DELETE)
     @DeleteMapping("/{paramIds}")
+    @SyncDataToLocal(eventType = MessageEventTypeConstants.ptParameter_EDIT)
     public R<Void> remove(@NotEmpty(message = "主键不能为空")
                           @PathVariable Long[] paramIds) {
         return toAjax(ptParameterService.deleteWithValidByIds(List.of(paramIds), true));
     }
+
+    /**
+     * 修改系统参数
+     */
+    @SaCheckPermission("basicParameter:ptParameter:edit")
+    @Log(title = "系统参数", businessType = BusinessType.UPDATE)
+    @RepeatSubmit()
+    @SyncDataToLocal(eventType = MessageEventTypeConstants.ptParameter_REMOVE)
+    @PutMapping()
+    public R<Void> edit(@Validated(EditGroup.class) @RequestBody PtParameterBo bo) {
+        return toAjax(ptParameterService.updateByBo(bo));
+    }
     /**
      * 根据参数键名查询参数值
      *

+ 106 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/controller/SendMessageRecordController.java

@@ -0,0 +1,106 @@
+package org.dromara.backstage.basics.controller;
+
+import java.util.List;
+
+import lombok.RequiredArgsConstructor;
+import jakarta.servlet.http.HttpServletResponse;
+import jakarta.validation.constraints.*;
+import cn.dev33.satoken.annotation.SaCheckPermission;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.validation.annotation.Validated;
+import org.dromara.common.idempotent.annotation.RepeatSubmit;
+import org.dromara.common.log.annotation.Log;
+import org.dromara.common.web.core.BaseController;
+import org.dromara.common.mybatis.core.page.PageQuery;
+import org.dromara.common.core.domain.R;
+import org.dromara.common.core.validate.AddGroup;
+import org.dromara.common.core.validate.EditGroup;
+import org.dromara.common.log.enums.BusinessType;
+import org.dromara.common.excel.utils.ExcelUtil;
+import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
+import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
+import org.dromara.backstage.basics.service.ISendMessageRecordService;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+
+/**
+ * 消息发送记录
+ * 前端访问路由地址为:/basics/sendMessageRecord
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+@Validated
+@RequiredArgsConstructor
+@RestController
+@RequestMapping("/basics/sendMessageRecord")
+public class SendMessageRecordController extends BaseController {
+
+    private final ISendMessageRecordService sendMessageRecordService;
+
+    /**
+     * 查询消息发送记录列表
+     */
+    @SaCheckPermission("basics:sendMessageRecord:list")
+    @GetMapping("/list")
+    public TableDataInfo<SendMessageRecordVo> list(SendMessageRecordBo bo, PageQuery pageQuery) {
+        return sendMessageRecordService.queryPageList(bo, pageQuery);
+    }
+
+    /**
+     * 导出消息发送记录列表
+     */
+    @SaCheckPermission("basics:sendMessageRecord:export")
+    @Log(title = "消息发送记录", businessType = BusinessType.EXPORT)
+    @PostMapping("/export")
+    public void export(SendMessageRecordBo bo, HttpServletResponse response) {
+        List<SendMessageRecordVo> list = sendMessageRecordService.queryList(bo);
+        ExcelUtil.exportExcel(list, "消息发送记录", SendMessageRecordVo.class, response);
+    }
+
+    /**
+     * 获取消息发送记录详细信息
+     *
+     * @param recordId 主键
+     */
+    @SaCheckPermission("basics:sendMessageRecord:query")
+    @GetMapping("/{recordId}")
+    public R<SendMessageRecordVo> getInfo(@NotNull(message = "主键不能为空")
+                                     @PathVariable Long recordId) {
+        return R.ok(sendMessageRecordService.queryById(recordId));
+    }
+
+    /**
+     * 新增消息发送记录
+     */
+    @SaCheckPermission("basics:sendMessageRecord:add")
+    @Log(title = "消息发送记录", businessType = BusinessType.INSERT)
+    @RepeatSubmit()
+    @PostMapping()
+    public R<Void> add(@Validated(AddGroup.class) @RequestBody SendMessageRecordBo bo) {
+        return toAjax(sendMessageRecordService.insertByBo(bo));
+    }
+
+    /**
+     * 修改消息发送记录
+     */
+    @SaCheckPermission("basics:sendMessageRecord:edit")
+    @Log(title = "消息发送记录", businessType = BusinessType.UPDATE)
+    @RepeatSubmit()
+    @PutMapping()
+    public R<Void> edit(@Validated(EditGroup.class) @RequestBody SendMessageRecordBo bo) {
+        return toAjax(sendMessageRecordService.updateByBo(bo));
+    }
+
+    /**
+     * 删除消息发送记录
+     *
+     * @param recordIds 主键串
+     */
+    @SaCheckPermission("basics:sendMessageRecord:remove")
+    @Log(title = "消息发送记录", businessType = BusinessType.DELETE)
+    @DeleteMapping("/{recordIds}")
+    public R<Void> remove(@NotEmpty(message = "主键不能为空")
+                          @PathVariable Long[] recordIds) {
+        return toAjax(sendMessageRecordService.deleteWithValidByIds(List.of(recordIds), true));
+    }
+}

+ 75 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/SendMessageRecord.java

@@ -0,0 +1,75 @@
+package org.dromara.backstage.basics.domain;
+
+import org.dromara.common.tenant.core.TenantEntity;
+import com.baomidou.mybatisplus.annotation.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * 消息发送记录对象 t_send_message_record
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+@Data
+@TableName("t_send_message_record")
+public class SendMessageRecord implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键id
+     */
+    private Long recordId;
+
+    /**
+     * 消息类型:kafka、rabbitmq、rocketmq
+     */
+    private String mqType;
+
+    /**
+     * 消息主题
+     */
+    private String topic;
+
+    /**
+     * 消息事件类型
+     */
+    private String eventType;
+
+    /**
+     * 发送结果:S 成功,F 失败
+     */
+    private String result;
+
+    /**
+     * 消息
+     */
+    private String message;
+
+    /**
+     * $column.columnComment
+     */
+    private String eventId;
+
+    /**
+     * 发送方
+     */
+    private String sender;
+
+    /**
+     * 租户编号
+     */
+    private String tenantId;
+
+    private Long createBy;
+
+    private Date createTime;
+
+
+}

+ 2 - 1
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/bo/PtParameterBo.java

@@ -8,6 +8,7 @@ import io.github.linpeilie.annotations.AutoMapper;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import jakarta.validation.constraints.*;
+import org.dromara.common.tenant.core.TenantEntity;
 
 /**
  * 系统参数业务对象 t_pt_parameter
@@ -18,7 +19,7 @@ import jakarta.validation.constraints.*;
 @Data
 @EqualsAndHashCode(callSuper = true)
 @AutoMapper(target = PtParameter.class, reverseConvertGenerate = false)
-public class PtParameterBo extends BaseEntity {
+public class PtParameterBo extends TenantEntity {
 
     /**
      * 参数id

+ 73 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/bo/SendMessageRecordBo.java

@@ -0,0 +1,73 @@
+package org.dromara.backstage.basics.domain.bo;
+
+import org.dromara.backstage.basics.domain.SendMessageRecord;
+import org.dromara.common.mybatis.core.domain.BaseEntity;
+import org.dromara.common.core.validate.AddGroup;
+import org.dromara.common.core.validate.EditGroup;
+import io.github.linpeilie.annotations.AutoMapper;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import jakarta.validation.constraints.*;
+import org.dromara.common.tenant.core.TenantEntity;
+
+/**
+ * 消息发送记录业务对象 t_send_message_record
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AutoMapper(target = SendMessageRecord.class, reverseConvertGenerate = false)
+public class SendMessageRecordBo extends TenantEntity {
+
+    /**
+     * 主键id
+     */
+    @NotNull(message = "主键id不能为空", groups = { AddGroup.class, EditGroup.class })
+    private Long recordId;
+
+    /**
+     * 消息类型:kafka、rabbitmq、rocketmq
+     */
+    @NotBlank(message = "消息类型:kafka、rabbitmq、rocketmq不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String mqType;
+
+    /**
+     * 消息主题
+     */
+    @NotBlank(message = "消息主题不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String topic;
+
+    /**
+     * 消息事件类型
+     */
+    @NotBlank(message = "消息事件类型不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String eventType;
+
+    /**
+     * 发送结果:S 成功,F 失败
+     */
+    @NotBlank(message = "发送结果:S 成功,F 失败不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String result;
+
+    /**
+     * 消息
+     */
+    @NotBlank(message = "消息不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String message;
+
+    /**
+     * $column.columnComment
+     */
+    @NotBlank(message = "$column.columnComment不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String eventId;
+
+    /**
+     * 发送方
+     */
+    @NotBlank(message = "发送方不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String sender;
+
+
+}

+ 81 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/domain/vo/SendMessageRecordVo.java

@@ -0,0 +1,81 @@
+package org.dromara.backstage.basics.domain.vo;
+
+import org.dromara.backstage.basics.domain.SendMessageRecord;
+import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
+import com.alibaba.excel.annotation.ExcelProperty;
+import org.dromara.common.excel.annotation.ExcelDictFormat;
+import org.dromara.common.excel.convert.ExcelDictConvert;
+import io.github.linpeilie.annotations.AutoMapper;
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.Date;
+
+
+
+/**
+ * 消息发送记录视图对象 t_send_message_record
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+@Data
+@ExcelIgnoreUnannotated
+@AutoMapper(target = SendMessageRecord.class)
+public class SendMessageRecordVo implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键id
+     */
+    @ExcelProperty(value = "主键id")
+    private Long recordId;
+
+    /**
+     * 消息类型:kafka、rabbitmq、rocketmq
+     */
+    @ExcelProperty(value = "消息类型:kafka、rabbitmq、rocketmq")
+    private String mqType;
+
+    /**
+     * 消息主题
+     */
+    @ExcelProperty(value = "消息主题")
+    private String topic;
+
+    /**
+     * 消息事件类型
+     */
+    @ExcelProperty(value = "消息事件类型")
+    private String eventType;
+
+    /**
+     * 发送结果:S 成功,F 失败
+     */
+    @ExcelProperty(value = "发送结果:S 成功,F 失败")
+    private String result;
+
+    /**
+     * 消息
+     */
+    @ExcelProperty(value = "消息")
+    private String message;
+
+    /**
+     * $column.columnComment
+     */
+    @ExcelProperty(value = "${comment}", converter = ExcelDictConvert.class)
+    @ExcelDictFormat(readConverterExp = "$column.readConverterExp()")
+    private String eventId;
+
+    /**
+     * 发送方
+     */
+    @ExcelProperty(value = "发送方")
+    private String sender;
+
+
+}

+ 15 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/mapper/SendMessageRecordMapper.java

@@ -0,0 +1,15 @@
+package org.dromara.backstage.basics.mapper;
+
+import org.dromara.backstage.basics.domain.SendMessageRecord;
+import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
+import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
+
+/**
+ * 消息发送记录Mapper接口
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+public interface SendMessageRecordMapper extends BaseMapperPlus<SendMessageRecord, SendMessageRecordVo> {
+
+}

+ 69 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/ISendMessageRecordService.java

@@ -0,0 +1,69 @@
+package org.dromara.backstage.basics.service;
+
+import org.dromara.backstage.basics.domain.SendMessageRecord;
+import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
+import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.dromara.common.mybatis.core.page.PageQuery;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * 消息发送记录Service接口
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+public interface ISendMessageRecordService {
+
+    /**
+     * 查询消息发送记录
+     *
+     * @param recordId 主键
+     * @return 消息发送记录
+     */
+    SendMessageRecordVo queryById(Long recordId);
+
+    /**
+     * 分页查询消息发送记录列表
+     *
+     * @param bo        查询条件
+     * @param pageQuery 分页参数
+     * @return 消息发送记录分页列表
+     */
+    TableDataInfo<SendMessageRecordVo> queryPageList(SendMessageRecordBo bo, PageQuery pageQuery);
+
+    /**
+     * 查询符合条件的消息发送记录列表
+     *
+     * @param bo 查询条件
+     * @return 消息发送记录列表
+     */
+    List<SendMessageRecordVo> queryList(SendMessageRecordBo bo);
+
+    /**
+     * 新增消息发送记录
+     *
+     * @param bo 消息发送记录
+     * @return 是否新增成功
+     */
+    Boolean insertByBo(SendMessageRecordBo bo);
+
+    /**
+     * 修改消息发送记录
+     *
+     * @param bo 消息发送记录
+     * @return 是否修改成功
+     */
+    Boolean updateByBo(SendMessageRecordBo bo);
+
+    /**
+     * 校验并批量删除消息发送记录信息
+     *
+     * @param ids     待删除的主键集合
+     * @param isValid 是否进行有效性校验
+     * @return 是否删除成功
+     */
+    Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid);
+}

+ 154 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/basics/service/impl/SendMessageRecordServiceImpl.java

@@ -0,0 +1,154 @@
+package org.dromara.backstage.basics.service.impl;
+
+import org.dromara.common.core.utils.MapstructUtils;
+import org.dromara.common.core.utils.StringUtils;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.dromara.common.mybatis.core.page.PageQuery;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
+import org.dromara.backstage.basics.domain.vo.SendMessageRecordVo;
+import org.dromara.backstage.basics.domain.SendMessageRecord;
+import org.dromara.backstage.basics.mapper.SendMessageRecordMapper;
+import org.dromara.backstage.basics.service.ISendMessageRecordService;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+/**
+ * 消息发送记录Service业务层处理
+ *
+ * @author bing
+ * @date 2024-10-30
+ */
+@RequiredArgsConstructor
+@Service
+public class SendMessageRecordServiceImpl implements ISendMessageRecordService {
+
+    private final SendMessageRecordMapper baseMapper;
+
+    /**
+     * 查询消息发送记录
+     *
+     * @param recordId 主键
+     * @return 消息发送记录
+     */
+    @Override
+    public SendMessageRecordVo queryById(Long recordId){
+        return baseMapper.selectVoById(recordId);
+    }
+
+    /**
+     * 分页查询消息发送记录列表
+     *
+     * @param bo        查询条件
+     * @param pageQuery 分页参数
+     * @return 消息发送记录分页列表
+     */
+    @Override
+    public TableDataInfo<SendMessageRecordVo> queryPageList(SendMessageRecordBo bo, PageQuery pageQuery) {
+        LambdaQueryWrapper<SendMessageRecord> lqw = buildQueryWrapper(bo);
+        Page<SendMessageRecordVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
+        return TableDataInfo.build(result);
+    }
+
+    /**
+     * 查询符合条件的消息发送记录列表
+     *
+     * @param bo 查询条件
+     * @return 消息发送记录列表
+     */
+    @Override
+    public List<SendMessageRecordVo> queryList(SendMessageRecordBo bo) {
+        LambdaQueryWrapper<SendMessageRecord> lqw = buildQueryWrapper(bo);
+        return baseMapper.selectVoList(lqw);
+    }
+
+    private LambdaQueryWrapper<SendMessageRecord> buildQueryWrapper(SendMessageRecordBo bo) {
+        Map<String, Object> params = bo.getParams();
+        LambdaQueryWrapper<SendMessageRecord> lqw = Wrappers.lambdaQuery();
+        lqw.eq(bo.getRecordId() != null, SendMessageRecord::getRecordId, bo.getRecordId());
+        lqw.eq(StringUtils.isNotBlank(bo.getMqType()), SendMessageRecord::getMqType, bo.getMqType());
+        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), SendMessageRecord::getTopic, bo.getTopic());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventType()), SendMessageRecord::getEventType, bo.getEventType());
+        lqw.eq(StringUtils.isNotBlank(bo.getResult()), SendMessageRecord::getResult, bo.getResult());
+        lqw.eq(StringUtils.isNotBlank(bo.getMessage()), SendMessageRecord::getMessage, bo.getMessage());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventId()), SendMessageRecord::getEventId, bo.getEventId());
+        lqw.eq(StringUtils.isNotBlank(bo.getSender()), SendMessageRecord::getSender, bo.getSender());
+        return lqw;
+    }
+
+    private QueryWrapper<SendMessageRecord> buildQueryWrapper(SendMessageRecordBo bo,String tableAlias) {
+        QueryWrapper<SendMessageRecord> lqw = new QueryWrapper<>();
+        String columnPrefix = "";
+        if(StringUtils.isNotBlank(tableAlias)){
+            columnPrefix = tableAlias + ".";
+        }
+        lqw.eq(bo.getRecordId() != null, columnPrefix+"record_id", bo.getRecordId());
+        lqw.eq(StringUtils.isNotBlank(bo.getMqType()), columnPrefix+"mq_type", bo.getMqType());
+        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), columnPrefix+"topic", bo.getTopic());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventType()), columnPrefix+"event_type", bo.getEventType());
+        lqw.eq(StringUtils.isNotBlank(bo.getResult()), columnPrefix+"result", bo.getResult());
+        lqw.eq(StringUtils.isNotBlank(bo.getMessage()), columnPrefix+"message", bo.getMessage());
+        lqw.eq(StringUtils.isNotBlank(bo.getEventId()), columnPrefix+"event_id", bo.getEventId());
+        lqw.eq(StringUtils.isNotBlank(bo.getSender()), columnPrefix+"sender", bo.getSender());
+        return lqw;
+    }
+
+    /**
+     * 新增消息发送记录
+     *
+     * @param bo 消息发送记录
+     * @return 是否新增成功
+     */
+    @Override
+    public Boolean insertByBo(SendMessageRecordBo bo) {
+        SendMessageRecord add = MapstructUtils.convert(bo, SendMessageRecord.class);
+        validEntityBeforeSave(add);
+        boolean flag = baseMapper.insert(add) > 0;
+        if (flag) {
+            bo.setRecordId(add.getRecordId());
+        }
+        return flag;
+    }
+
+    /**
+     * 修改消息发送记录
+     *
+     * @param bo 消息发送记录
+     * @return 是否修改成功
+     */
+    @Override
+    public Boolean updateByBo(SendMessageRecordBo bo) {
+        SendMessageRecord update = MapstructUtils.convert(bo, SendMessageRecord.class);
+        validEntityBeforeSave(update);
+        return baseMapper.updateById(update) > 0;
+    }
+
+    /**
+     * 保存前的数据校验
+     */
+    private void validEntityBeforeSave(SendMessageRecord entity){
+        //做一些数据校验,如唯一约束
+    }
+
+    /**
+     * 校验并批量删除消息发送记录信息
+     *
+     * @param ids     待删除的主键集合
+     * @param isValid 是否进行有效性校验
+     * @return 是否删除成功
+     */
+    @Override
+    public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
+        if(isValid){
+            //做一些业务上的校验,判断是否需要校验
+        }
+        return baseMapper.deleteByIds(ids) > 0;
+    }
+}

+ 1 - 2
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaNormalConsumer.java

@@ -12,11 +12,10 @@ import org.dromara.backstage.business.consume.strategy.Impl.ConsumeStrategyConte
 import org.dromara.backstage.consumption.domain.bo.ConsumeRecordBo;
 import org.dromara.backstage.consumption.domain.bo.ConsumptionBo;
 import org.dromara.backstage.consumption.domain.vo.XfConsumeDetailOriginalVo;
-import org.dromara.backstage.mq.domain.bo.KafkaMessage;
 import org.dromara.common.core.domain.R;
 import org.dromara.common.core.enums.CreditTypeEnum;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Component;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
 
 import java.math.BigDecimal;
 import java.util.HashMap;

+ 98 - 0
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaProducer.java

@@ -0,0 +1,98 @@
+package org.dromara.backstage.mq;
+
+import cn.hutool.core.lang.UUID;
+import com.alibaba.excel.util.StringUtils;
+import com.alibaba.fastjson2.JSON;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
+import org.dromara.backstage.basics.service.ISendMessageRecordService;
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.common.satoken.utils.LoginHelper;
+import org.dromara.system.api.model.LoginUser;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.concurrent.CompletableFuture;
+
+@RequiredArgsConstructor
+@Slf4j
+@Component
+public class KafkaProducer {
+
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    private final ISendMessageRecordService sendMessageRecordService;
+
+    /**
+     * Send.
+     *
+     * @param topic   the topic
+     * @param message the message
+     */
+    public void send(String topic , String message) {
+        log.debug("发送消息到kafka消息系统, message:" + message);
+        kafkaTemplate.send(topic, message);
+        log.debug("发送消息到kafka消息系统结束");
+    }
+
+
+
+    public void sendKafkaMessage(String topic,Long createBy, KafkaMessage<?> data){
+        try{
+            String jsonMessage = JSON.toJSONString(data);
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
+            log.info("发送同步数据到kafka消息系统, data: " + jsonMessage);
+            CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
+            send.whenComplete((result, ex) -> {
+                if (ex != null) {
+                    log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
+
+                    // 异常信息入库
+                    insertRecord("F",createBy, data);
+                } else {
+                    log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
+                    insertRecord("S",createBy, data);
+                }
+            });
+        }catch (Exception e){
+            log.error("同步数据发送到kafka消息系统异常,data: " + data, e);
+            insertRecord("F",createBy, data);
+        }
+
+    }
+
+    //记录入库
+    public void insertRecord(String result,Long createBy, KafkaMessage<?> data){
+        try{
+            KafkaHeader header = data.getHeader();
+            String eventId = header.getEventId();
+            String sender = header.getSender();
+            String eventType = header.getEventType();
+            String tenantId = header.getTenantId();
+            // 信息入库
+            SendMessageRecordBo bo = new SendMessageRecordBo();
+            bo.setEventId(eventId);
+            bo.setSender(sender);
+            bo.setEventType(eventType);
+            bo.setTenantId(tenantId);
+            bo.setResult(result);
+            bo.setMessage(JSON.toJSONString(data));
+//            LoginUser loginUser = LoginHelper.getLoginUser();
+            bo.setCreateBy(createBy);
+            bo.setMqType("kafka");
+            bo.setTopic(KafkaTopicConstants.SYNC_DATA_TOPIC);
+            bo.setTenantId(tenantId);
+            bo.setCreateTime(new Date());
+            sendMessageRecordService.insertByBo(bo);
+        }catch (Exception e){
+            log.error("kafka消息记录入库异常,data: " + data, e);
+        }
+    }
+
+}

+ 20 - 0
ruoyi-modules/ruoyi-backstage/src/main/resources/mapper/basics/SendMessageRecordMapper.xml

@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper
+PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.dromara.backstage.basics.mapper.SendMessageRecordMapper">
+
+    <resultMap type="org.dromara.backstage.basics.domain.SendMessageRecord" id="SendMessageRecordResult">
+            <result property="recordId"    column="record_id"    />
+            <result property="mqType"    column="mq_type"    />
+            <result property="tenantId"    column="tenant_id"    />
+            <result property="topic"    column="topic"    />
+            <result property="eventType"    column="event_type"    />
+            <result property="result"    column="result"    />
+            <result property="createBy"    column="create_by"    />
+            <result property="createTime"    column="create_time"    />
+            <result property="message"    column="message"    />
+            <result property="eventId"    column="event_id"    />
+            <result property="sender"    column="sender"    />
+    </resultMap>
+</mapper>

+ 18 - 0
ruoyi-modules/ruoyi-backstage/src/test/java/org/dromara/backstage/mq/KafkaProducerTest.java

@@ -0,0 +1,18 @@
+package org.dromara.backstage.mq;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class KafkaProducerTest {
+
+    @Autowired
+    private KafkaProducer kafkaProcedurer;
+
+    @Test
+    public void send()
+    {
+        kafkaProcedurer.send("ykt_local_listener", "test message2");
+    }
+}

+ 18 - 0
ruoyi-server/ruoyi-server-base/src/main/java/org/dromara/server/base/service/yktOperation/SyncRemotePtParameterService.java

@@ -0,0 +1,18 @@
+package org.dromara.server.base.service.yktOperation;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * 系统参数同步
+ */
+@Service
+@Slf4j
+@RequiredArgsConstructor
+public class SyncRemotePtParameterService {
+
+    public void addPtParameter(){
+
+    }
+}

+ 1 - 0
ruoyi-server/ruoyi-server-base/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@@ -8,3 +8,4 @@ org.dromara.server.base.service.user.strategy.SyncUserStrategyContent
 org.dromara.server.base.service.user.strategy.impl.SyncTeacherStrategyImpl
 org.dromara.server.base.service.user.strategy.impl.SyncGraduateStrategyImpl
 org.dromara.server.base.service.user.strategy.impl.SyncTraineeStrategyImpl
+org.dromara.server.base.service.yktOperation.SyncRemotePtParameterService

+ 4 - 3
ruoyi-server/ruoyi-server-common/pom.xml

@@ -10,7 +10,7 @@
     <artifactId>ruoyi-server-common</artifactId>
     <packaging>jar</packaging>
 
-    <name>ruoyi-server-base</name>
+    <name>ruoyi-server-common</name>
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -19,8 +19,9 @@
     <dependencies>
 
         <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-message</artifactId>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
 </project>

+ 0 - 33
ruoyi-server/ruoyi-server-common/src/main/java/org/dromara/server/common/domain/bo/KafkaHeader.java

@@ -1,33 +0,0 @@
-package org.dromara.server.common.domain.bo;
-
-import lombok.Data;
-
-/**
- * name: KafkaHeader
- * package: org.dromara.stream.domain.bo
- * description: kafka消息头
- * date: 2024-10-15 11:22:26 11:22
- *
- * @author luoyibo
- * @version 0.1
- * @since JDK 1.8
- */
-@Data
-public class KafkaHeader {
-    /**
-     * 发送方
-     */
-    private String sender;
-    /**
-     * 时间戳
-     */
-    private Long timestamp;
-    /**
-     * 事件类型
-     */
-    private String eventType;
-    /**
-     * 事件Id
-     */
-    private String eventId;
-}

+ 0 - 19
ruoyi-server/ruoyi-server-common/src/main/java/org/dromara/server/common/domain/bo/KafkaMessage.java

@@ -1,19 +0,0 @@
-package org.dromara.server.common.domain.bo;
-
-import lombok.Data;
-/**
- * name: KafkaMessage
- * package: org.dromara.stream.domain.bo
- * description: Kafka消息内容
- * date: 2024-10-15 11:28:02 11:28
- *
- * @author luoyibo
- * @version 0.1
- * @since JDK 1.8
- */
-@Data
-public class KafkaMessage<T> {
-    private KafkaHeader header=new KafkaHeader();
-
-    private T body;
-}

+ 26 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/constant/kafka/YktOperationEventConstraints.java

@@ -0,0 +1,26 @@
+package org.dromara.server.mq.constant.kafka;
+
+
+public class YktOperationEventConstraints {
+
+    /**
+     * 系统参数功能模块标识
+     */
+    public static final String PARAMETER_SENDER = "YKT_101";
+
+    /**
+     * 系统参数新增
+     */
+    public static final String PARAMETER_ADD = PARAMETER_SENDER + "_ADD";
+
+    /**
+     * 系统参数修改
+     */
+    public static final String PARAMETER_EDIT = PARAMETER_SENDER + "_EDIT";
+
+    /**
+     * 系统参数删除
+     */
+    public static final String PARAMETER_DEL = PARAMETER_SENDER + "_DEL";
+
+}

+ 27 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaConsumer.java

@@ -2,13 +2,17 @@ package org.dromara.server.mq.consumer;
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
+import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.dromara.server.common.domain.bo.KafkaMessage;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.mq.event.kafka.EventStrategyContext;
+import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
 import org.springframework.kafka.annotation.KafkaListener;
 
+import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
+
 /**
  * name: KafkaNormalConsumer
  * package: org.dromara.server.mq.consumer
@@ -24,6 +28,7 @@ import org.springframework.kafka.annotation.KafkaListener;
 //@Component
 public class KafkaConsumer {
     private final EventStrategyContext eventStrategyContext;
+    private final YktEventStrategyContext yktEventStrategyContext;
     @KafkaListener(topics = "eventBus", groupId = "test-group-id")
     public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
         KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
@@ -37,4 +42,25 @@ public class KafkaConsumer {
             log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
         }
     }
+
+    /**
+     * 一卡通云端业务操作本地同步处理
+     * @param record kafka消息
+     */
+    @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "ykt_local_listener")
+    public void cloudOperationSync(ConsumerRecord<String, String> record){
+        KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
+        log.info("[接收到Kafka消息]-[{}]", receiveMsg);
+        try{
+            String eventType = receiveMsg.getHeader().getEventType();
+    //          String tenantId = receiveMsg.getHeader().getTenantId();
+            String tenantId = "";
+            JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
+            yktEventStrategyContext.doMsgHandle(eventType, eventMsg);
+
+        } catch (Exception e){
+            log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
+
+        }
+    }
 }

+ 15 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/IYktEventStrategy.java

@@ -0,0 +1,15 @@
+package org.dromara.server.mq.event.kafka;
+
+import cn.hutool.json.JSONObject;
+
+/**
+ * 一卡通统一事件处理接口
+ */
+public interface IYktEventStrategy {
+    /**
+     *
+     * @param eventType 事件类型
+     * @param msg 消息体
+     */
+    void doMsgHandle(String eventType,JSONObject msg);
+}

+ 25 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/YktEventStrategyContext.java

@@ -0,0 +1,25 @@
+package org.dromara.server.mq.event.kafka;
+
+import cn.hutool.json.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 一卡通内部操作事件处理上下文
+ */
+@Service
+public class YktEventStrategyContext {
+    private final Map<String, IYktEventStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    public YktEventStrategyContext(Map<String, IYktEventStrategy> strategyMap) {
+        this.strategyMap.putAll(strategyMap);
+    }
+
+    public void doMsgHandle(String eventType, JSONObject msg) {
+        strategyMap.get(eventType).doMsgHandle(eventType, msg);
+    }
+}

+ 44 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/event/kafka/impl/yktOperation/PtParameterEventStrategyImpl.java

@@ -0,0 +1,44 @@
+package org.dromara.server.mq.event.kafka.impl.yktOperation;
+
+import cn.hutool.json.JSONObject;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.server.base.service.dept.SyncRemoteDeptService;
+import org.dromara.server.base.service.yktOperation.SyncRemotePtParameterService;
+import org.dromara.server.mq.constant.kafka.YktOperationEventConstraints;
+import org.dromara.server.mq.event.kafka.IYktEventStrategy;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author Hz
+ * @date 2024/10/29
+ * @description 系统参数功能同步策略
+ */
+@Slf4j
+@RequiredArgsConstructor
+@Service(YktOperationEventConstraints.PARAMETER_SENDER)
+public class PtParameterEventStrategyImpl implements IYktEventStrategy {
+
+    private final SyncRemotePtParameterService parameterService;
+    @Override
+    public void doMsgHandle(String eventType, JSONObject msg) {
+        switch (eventType) {
+            case YktOperationEventConstraints.PARAMETER_ADD:{
+                log.info("新增系统参数");
+            }
+            break;
+            case YktOperationEventConstraints.PARAMETER_EDIT:{
+                log.info("修改系统参数");
+            }
+            break;
+            case YktOperationEventConstraints.PARAMETER_DEL:{
+                log.info("删除系统参数");
+            }
+            break;
+            default:
+                log.info("未知事件");
+
+        }
+    }
+
+}