Forráskód Böngészése

feature:记录消费的offset

xiari 1 éve
szülő
commit
3609b29bf6

+ 45 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

@@ -1,5 +1,6 @@
 package org.dromara.server.mq.consumer;
 
+import cn.hutool.core.util.IdUtil;
 import cn.hutool.core.util.ObjUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONObject;
@@ -14,11 +15,15 @@ import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
 import org.dromara.common.message.kafka.domain.KafkaHeader;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
+import org.dromara.server.mq.domain.bo.XfOffsetBo;
 import org.dromara.server.mq.event.kafka.IYktEventStrategy;
+import org.dromara.server.mq.service.IXfOffsetService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
+import java.util.Date;
+
 /**
  * name: KafkaCloudConsumer
  * package: org.dromara.server.mq.consumer
@@ -37,6 +42,8 @@ public class KafkaCloudConsumer {
     private final DefaultConfig defaultConfig;
     private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
 
+    private final IXfOffsetService xfOffsetService;
+
     /**
      * eventBus主题监听 第三方对接相关
      *
@@ -44,12 +51,31 @@ public class KafkaCloudConsumer {
      */
     @KafkaListener(topics = KafkaTopicConstants.OLD_SYNC_TOPIC, groupId = "old-to-cloud-group")
     public void kafkaEventBusHandler(ConsumerRecord<String, String> record) {
-        KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
+        // offset
+        long offset = record.offset();
+        // topic
+        String topic = record.topic();
+        //groupId
+        String groupId = "old-to-cloud-group";
+        String value = record.value();
+
+        //判断 offset
+        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset);
+        if(!canConsume){
+            log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
+            return;
+        }
+
+        KafkaMessage<?> receiveMsg = JSONUtil.toBean(value, KafkaMessage.class);
         String sender = receiveMsg.getHeader().getSender();
         //在eventBus主题中,sender=005是由本系统发出,无需业务处理
         if (ObjUtil.notEqual(sender, "005") && ObjUtil.notEqual(sender, "006")) {
             doMessageHandle(receiveMsg);
         }
+
+        // 记录
+        // 记录offset
+        xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date());
     }
 
     /**
@@ -59,8 +85,26 @@ public class KafkaCloudConsumer {
      */
     @KafkaListener(topics = KafkaTopicConstants.TO_CLOUD_TOPIC, groupId = "local-to-cloud-group")
     public void kafkaToCloudHandler(ConsumerRecord<String, String> record) {
+        // offset
+        long offset = record.offset();
+        // topic
+        String topic = record.topic();
+        //groupId
+        String groupId = "local-to-cloud-group";
+        String value = record.value();
+
+        //判断 offset
+        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset);
+        if(!canConsume){
+            log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
+            return;
+        }
+
         KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
         doMessageHandle(receiveMsg);
+
+        // 记录offset
+        xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date());
     }
 
     /**

+ 24 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaLocalConsumer.java

@@ -1,5 +1,6 @@
 package org.dromara.server.mq.consumer;
 
+import cn.hutool.core.util.IdUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
@@ -12,11 +13,14 @@ import org.dromara.common.message.kafka.domain.KafkaHeader;
 import org.dromara.common.message.kafka.domain.KafkaMessage;
 import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
 import org.dromara.server.mq.event.kafka.IYktEventStrategy;
+import org.dromara.server.mq.service.IXfOffsetService;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
+import java.util.Date;
+
 import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
 
 /**
@@ -32,12 +36,29 @@ public class KafkaLocalConsumer {
     @Value("${spring.system.tenantId}")
     private String tenantId;
 
+    private final IXfOffsetService xfOffsetService;
+
     /**
      * 一卡通云端业务操作本地同步处理
      * @param record kafka消息
      */
     @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "YTK_${spring.system.tenantId}")
     public void cloudOperationSync(ConsumerRecord<String, String> record){
+        // offset
+        long offset = record.offset();
+        // topic
+        String topic = record.topic();
+        //groupId
+        String groupId = "YTK_" + tenantId;
+        String value = record.value();
+
+        //判断 offset
+        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset);
+        if(!canConsume){
+            log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
+            return;
+        }
+
         KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
         try{
             KafkaHeader header = receiveMsg.getHeader();
@@ -63,6 +84,9 @@ public class KafkaLocalConsumer {
                 log.error("消息发送记录更新失败", e);
             }
             log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", record.value(), e.getMessage(), e);
+        }finally {
+            // 记录offset
+            xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date());
         }
     }
 

+ 106 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/controller/XfOffsetController.java

@@ -0,0 +1,106 @@
+package org.dromara.server.mq.controller;
+
+import java.util.List;
+
+import lombok.RequiredArgsConstructor;
+import jakarta.servlet.http.HttpServletResponse;
+import jakarta.validation.constraints.*;
+import cn.dev33.satoken.annotation.SaCheckPermission;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.validation.annotation.Validated;
+import org.dromara.common.idempotent.annotation.RepeatSubmit;
+import org.dromara.common.log.annotation.Log;
+import org.dromara.common.web.core.BaseController;
+import org.dromara.common.mybatis.core.page.PageQuery;
+import org.dromara.common.core.domain.R;
+import org.dromara.common.core.validate.AddGroup;
+import org.dromara.common.core.validate.EditGroup;
+import org.dromara.common.log.enums.BusinessType;
+import org.dromara.common.excel.utils.ExcelUtil;
+import org.dromara.server.mq.domain.vo.XfOffsetVo;
+import org.dromara.server.mq.domain.bo.XfOffsetBo;
+import org.dromara.server.mq.service.IXfOffsetService;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+
+/**
+ * 记录kafka消费的offset
+ * 前端访问路由地址为:/mq/xfOffset
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+//@Validated
+@RequiredArgsConstructor
+//@RestController
+//@RequestMapping("/mq/xfOffset")
+public class XfOffsetController extends BaseController {
+
+    private final IXfOffsetService xfOffsetService;
+
+    /**
+     * 查询记录kafka消费的offset列表
+     */
+    @SaCheckPermission("mq:xfOffset:list")
+    @GetMapping("/list")
+    public TableDataInfo<XfOffsetVo> list(XfOffsetBo bo, PageQuery pageQuery) {
+        return xfOffsetService.queryPageList(bo, pageQuery);
+    }
+
+    /**
+     * 导出记录kafka消费的offset列表
+     */
+    @SaCheckPermission("mq:xfOffset:export")
+    @Log(title = "记录kafka消费的offset", businessType = BusinessType.EXPORT)
+    @PostMapping("/export")
+    public void export(XfOffsetBo bo, HttpServletResponse response) {
+        List<XfOffsetVo> list = xfOffsetService.queryList(bo);
+        ExcelUtil.exportExcel(list, "记录kafka消费的offset", XfOffsetVo.class, response);
+    }
+
+    /**
+     * 获取记录kafka消费的offset详细信息
+     *
+     * @param offsetId 主键
+     */
+    @SaCheckPermission("mq:xfOffset:query")
+    @GetMapping("/{offsetId}")
+    public R<XfOffsetVo> getInfo(@NotNull(message = "主键不能为空")
+                                     @PathVariable String offsetId) {
+        return R.ok(xfOffsetService.queryById(offsetId));
+    }
+
+    /**
+     * 新增记录kafka消费的offset
+     */
+    @SaCheckPermission("mq:xfOffset:add")
+    @Log(title = "记录kafka消费的offset", businessType = BusinessType.INSERT)
+    @RepeatSubmit()
+    @PostMapping()
+    public R<Void> add(@Validated(AddGroup.class) @RequestBody XfOffsetBo bo) {
+        return toAjax(xfOffsetService.insertByBo(bo));
+    }
+
+    /**
+     * 修改记录kafka消费的offset
+     */
+    @SaCheckPermission("mq:xfOffset:edit")
+    @Log(title = "记录kafka消费的offset", businessType = BusinessType.UPDATE)
+    @RepeatSubmit()
+    @PutMapping()
+    public R<Void> edit(@Validated(EditGroup.class) @RequestBody XfOffsetBo bo) {
+        return toAjax(xfOffsetService.updateByBo(bo));
+    }
+
+    /**
+     * 删除记录kafka消费的offset
+     *
+     * @param offsetIds 主键串
+     */
+    @SaCheckPermission("mq:xfOffset:remove")
+    @Log(title = "记录kafka消费的offset", businessType = BusinessType.DELETE)
+    @DeleteMapping("/{offsetIds}")
+    public R<Void> remove(@NotEmpty(message = "主键不能为空")
+                          @PathVariable String[] offsetIds) {
+        return toAjax(xfOffsetService.deleteWithValidByIds(List.of(offsetIds), true));
+    }
+}

+ 55 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/domain/XfOffset.java

@@ -0,0 +1,55 @@
+package org.dromara.server.mq.domain;
+
+import org.dromara.common.mybatis.core.domain.BaseEntity;
+import com.baomidou.mybatisplus.annotation.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * 记录kafka消费的offset对象 t_xf_offset
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@TableName("t_xf_offset")
+public class XfOffset implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键
+     */
+    @TableId(value = "offset_id", type = IdType.ASSIGN_ID)
+    private String offsetId;
+
+    /**
+     * 消费的offset
+     */
+    private Long offSet;
+
+    /**
+     * 消费主题
+     */
+    private String topic;
+
+    /**
+     * 消费者组
+     */
+    private String groupId;
+
+    /**
+     * 消息内容
+     */
+    private String record;
+
+    private Date createTime;
+
+
+}

+ 65 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/domain/bo/XfOffsetBo.java

@@ -0,0 +1,65 @@
+package org.dromara.server.mq.domain.bo;
+
+import com.alibaba.excel.annotation.ExcelProperty;
+import org.dromara.server.mq.domain.XfOffset;
+import org.dromara.common.mybatis.core.domain.BaseEntity;
+import org.dromara.common.core.validate.AddGroup;
+import org.dromara.common.core.validate.EditGroup;
+import io.github.linpeilie.annotations.AutoMapper;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import jakarta.validation.constraints.*;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * 记录kafka消费的offset业务对象 t_xf_offset
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@AutoMapper(target = XfOffset.class, reverseConvertGenerate = false)
+public class XfOffsetBo implements Serializable {
+
+
+    @Serial
+    private static final long serialVersionUID = 5104875116250871680L;
+    /**
+     * 主键
+     */
+    @NotBlank(message = "主键不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String offsetId;
+
+    /**
+     * 消费的offset
+     */
+    @NotNull(message = "消费的offset不能为空", groups = { AddGroup.class, EditGroup.class })
+    private Long offSet;
+
+    /**
+     * 消费主题
+     */
+    @NotBlank(message = "消费主题不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String topic;
+
+    /**
+     * 消费者组
+     */
+    @NotBlank(message = "消费者组不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String groupId;
+
+    /**
+     * 消息内容
+     */
+    @NotBlank(message = "消息内容不能为空", groups = { AddGroup.class, EditGroup.class })
+    private String record;
+
+    @NotNull(message = "创建时间不能为空", groups = { AddGroup.class, EditGroup.class })
+    private Date createTime;
+
+
+}

+ 65 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/domain/vo/XfOffsetVo.java

@@ -0,0 +1,65 @@
+package org.dromara.server.mq.domain.vo;
+
+import org.dromara.server.mq.domain.XfOffset;
+import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
+import com.alibaba.excel.annotation.ExcelProperty;
+import org.dromara.common.excel.annotation.ExcelDictFormat;
+import org.dromara.common.excel.convert.ExcelDictConvert;
+import io.github.linpeilie.annotations.AutoMapper;
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.Date;
+
+
+
+/**
+ * 记录kafka消费的offset视图对象 t_xf_offset
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+@Data
+@ExcelIgnoreUnannotated
+@AutoMapper(target = XfOffset.class)
+public class XfOffsetVo implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键
+     */
+    @ExcelProperty(value = "主键")
+    private String offsetId;
+
+    /**
+     * 消费的offset
+     */
+    @ExcelProperty(value = "消费的offset")
+    private Long offSet;
+
+    /**
+     * 消费主题
+     */
+    @ExcelProperty(value = "消费主题")
+    private String topic;
+
+    /**
+     * 消费者组
+     */
+    @ExcelProperty(value = "消费者组")
+    private String groupId;
+
+    /**
+     * 消息内容
+     */
+    @ExcelProperty(value = "消息内容")
+    private String record;
+
+    @ExcelProperty(value = "创建时间")
+    private Date createTime;
+
+
+}

+ 20 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/mapper/XfOffsetMapper.java

@@ -0,0 +1,20 @@
+package org.dromara.server.mq.mapper;
+
+import org.apache.ibatis.annotations.Select;
+import org.dromara.server.mq.domain.XfOffset;
+import org.dromara.server.mq.domain.vo.XfOffsetVo;
+import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
+
+/**
+ * 记录kafka消费的offsetMapper接口
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+public interface XfOffsetMapper extends BaseMapperPlus<XfOffset, XfOffsetVo> {
+
+
+    @Select("select off_set from t_xf_offset where topic = #{topic} and group_id = #{groupId} order by off_set desc limit 1")
+    Long selectOffsetByTopicAndGroupId(String topic, String groupId);
+
+}

+ 92 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/service/IXfOffsetService.java

@@ -0,0 +1,92 @@
+package org.dromara.server.mq.service;
+
+import org.dromara.server.mq.domain.XfOffset;
+import org.dromara.server.mq.domain.vo.XfOffsetVo;
+import org.dromara.server.mq.domain.bo.XfOffsetBo;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.dromara.common.mybatis.core.page.PageQuery;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 记录kafka消费的offsetService接口
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+public interface IXfOffsetService {
+
+    /**
+     * 查询记录kafka消费的offset
+     *
+     * @param offsetId 主键
+     * @return 记录kafka消费的offset
+     */
+    XfOffsetVo queryById(String offsetId);
+
+    /**
+     * 分页查询记录kafka消费的offset列表
+     *
+     * @param bo        查询条件
+     * @param pageQuery 分页参数
+     * @return 记录kafka消费的offset分页列表
+     */
+    TableDataInfo<XfOffsetVo> queryPageList(XfOffsetBo bo, PageQuery pageQuery);
+
+    /**
+     * 查询符合条件的记录kafka消费的offset列表
+     *
+     * @param bo 查询条件
+     * @return 记录kafka消费的offset列表
+     */
+    List<XfOffsetVo> queryList(XfOffsetBo bo);
+
+    /**
+     * 新增记录kafka消费的offset
+     *
+     * @param bo 记录kafka消费的offset
+     * @return 是否新增成功
+     */
+    Boolean insertByBo(XfOffsetBo bo);
+
+    Boolean insert(String offsetId, Long offSet, String topic, String groupId, String record, Date createTime);
+
+    /**
+     * 修改记录kafka消费的offset
+     *
+     * @param bo 记录kafka消费的offset
+     * @return 是否修改成功
+     */
+    Boolean updateByBo(XfOffsetBo bo);
+
+    /**
+     * 校验并批量删除记录kafka消费的offset信息
+     *
+     * @param ids     待删除的主键集合
+     * @param isValid 是否进行有效性校验
+     * @return 是否删除成功
+     */
+    Boolean deleteWithValidByIds(Collection<String> ids, Boolean isValid);
+
+    /**
+     * 根据topic和groupId查询offset
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    Long selectOffsetByTopicAndGroupId(String topic, String groupId);
+
+    /**
+     * 判断是否可以消费
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    Boolean judgeCanConsume(String topic, String groupId, Long offset);
+
+
+
+
+}

+ 201 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/service/impl/XfOffsetServiceImpl.java

@@ -0,0 +1,201 @@
+package org.dromara.server.mq.service.impl;
+
+import org.dromara.common.core.utils.MapstructUtils;
+import org.dromara.common.core.utils.StringUtils;
+import org.dromara.common.mybatis.core.page.TableDataInfo;
+import org.dromara.common.mybatis.core.page.PageQuery;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+import org.dromara.server.mq.domain.bo.XfOffsetBo;
+import org.dromara.server.mq.domain.vo.XfOffsetVo;
+import org.dromara.server.mq.domain.XfOffset;
+import org.dromara.server.mq.mapper.XfOffsetMapper;
+import org.dromara.server.mq.service.IXfOffsetService;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+/**
+ * 记录kafka消费的offsetService业务层处理
+ *
+ * @author LionLi
+ * @date 2025-03-12
+ */
+@RequiredArgsConstructor
+@Service
+public class XfOffsetServiceImpl implements IXfOffsetService {
+
+    private final XfOffsetMapper baseMapper;
+
+    /**
+     * 查询记录kafka消费的offset
+     *
+     * @param offsetId 主键
+     * @return 记录kafka消费的offset
+     */
+    @Override
+    public XfOffsetVo queryById(String offsetId){
+        return baseMapper.selectVoById(offsetId);
+    }
+
+    /**
+     * 分页查询记录kafka消费的offset列表
+     *
+     * @param bo        查询条件
+     * @param pageQuery 分页参数
+     * @return 记录kafka消费的offset分页列表
+     */
+    @Override
+    public TableDataInfo<XfOffsetVo> queryPageList(XfOffsetBo bo, PageQuery pageQuery) {
+        LambdaQueryWrapper<XfOffset> lqw = buildQueryWrapper(bo);
+        Page<XfOffsetVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
+        return TableDataInfo.build(result);
+    }
+
+    /**
+     * 查询符合条件的记录kafka消费的offset列表
+     *
+     * @param bo 查询条件
+     * @return 记录kafka消费的offset列表
+     */
+    @Override
+    public List<XfOffsetVo> queryList(XfOffsetBo bo) {
+        LambdaQueryWrapper<XfOffset> lqw = buildQueryWrapper(bo);
+        return baseMapper.selectVoList(lqw);
+    }
+
+    private LambdaQueryWrapper<XfOffset> buildQueryWrapper(XfOffsetBo bo) {
+        LambdaQueryWrapper<XfOffset> lqw = Wrappers.lambdaQuery();
+        lqw.eq(StringUtils.isNotBlank(bo.getOffsetId()), XfOffset::getOffsetId, bo.getOffsetId());
+        lqw.eq(bo.getOffSet() != null, XfOffset::getOffSet, bo.getOffSet());
+        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), XfOffset::getTopic, bo.getTopic());
+        lqw.eq(StringUtils.isNotBlank(bo.getGroupId()), XfOffset::getGroupId, bo.getGroupId());
+        lqw.eq(StringUtils.isNotBlank(bo.getRecord()), XfOffset::getRecord, bo.getRecord());
+        return lqw;
+    }
+
+    private QueryWrapper<XfOffset> buildQueryWrapper(XfOffsetBo bo,String tableAlias) {
+        QueryWrapper<XfOffset> lqw = new QueryWrapper<>();
+        String columnPrefix = "";
+        if(StringUtils.isNotBlank(tableAlias)){
+            columnPrefix = tableAlias + ".";
+        }
+        lqw.eq(StringUtils.isNotBlank(bo.getOffsetId()), columnPrefix+"offset_id", bo.getOffsetId());
+        lqw.eq(bo.getOffSet() != null, columnPrefix+"offset", bo.getOffSet());
+        lqw.eq(StringUtils.isNotBlank(bo.getTopic()), columnPrefix+"topic", bo.getTopic());
+        lqw.eq(StringUtils.isNotBlank(bo.getGroupId()), columnPrefix+"group_id", bo.getGroupId());
+        lqw.eq(StringUtils.isNotBlank(bo.getRecord()), columnPrefix+"record", bo.getRecord());
+        return lqw;
+    }
+
+    /**
+     * 新增记录kafka消费的offset
+     *
+     * @param bo 记录kafka消费的offset
+     * @return 是否新增成功
+     */
+    @Override
+    public Boolean insertByBo(XfOffsetBo bo) {
+        XfOffset add = MapstructUtils.convert(bo, XfOffset.class);
+        validEntityBeforeSave(add);
+        boolean flag = baseMapper.insert(add) > 0;
+        if (flag) {
+            bo.setOffsetId(add.getOffsetId());
+        }
+        return flag;
+    }
+
+    /**
+     * 新增记录kafka消费的offset
+     * @param offsetId
+     * @param offSet
+     * @param topic
+     * @param groupId
+     * @param record
+     * @param createTime
+     * @return
+     */
+    @Override
+    public Boolean insert(String offsetId, Long offSet, String topic, String groupId, String record, Date createTime) {
+        XfOffset add = new XfOffset();
+        add.setOffsetId(offsetId);
+        add.setOffSet(offSet);
+        add.setTopic(topic);
+        add.setGroupId(groupId);
+        add.setRecord(record);
+        add.setCreateTime(createTime);
+        return baseMapper.insert(add) > 0;
+    }
+
+
+    /**
+     * 修改记录kafka消费的offset
+     *
+     * @param bo 记录kafka消费的offset
+     * @return 是否修改成功
+     */
+    @Override
+    public Boolean updateByBo(XfOffsetBo bo) {
+        XfOffset update = MapstructUtils.convert(bo, XfOffset.class);
+        validEntityBeforeSave(update);
+        return baseMapper.updateById(update) > 0;
+    }
+
+    /**
+     * 保存前的数据校验
+     */
+    private void validEntityBeforeSave(XfOffset entity){
+        // 做一些数据校验,如唯一约束
+    }
+
+    /**
+     * 校验并批量删除记录kafka消费的offset信息
+     *
+     * @param ids     待删除的主键集合
+     * @param isValid 是否进行有效性校验
+     * @return 是否删除成功
+     */
+    @Override
+    public Boolean deleteWithValidByIds(Collection<String> ids, Boolean isValid) {
+        if(isValid){
+            // 做一些业务上的校验,判断是否需要校验
+        }
+        return baseMapper.deleteByIds(ids) > 0;
+    }
+
+    /**
+     * 根据topic和groupId查询offset
+     *
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    @Override
+    public Long selectOffsetByTopicAndGroupId(String topic, String groupId) {
+        return baseMapper.selectOffsetByTopicAndGroupId(topic, groupId);
+    }
+
+    /**
+     * 判断是否可以消费
+     *
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    @Override
+    public Boolean judgeCanConsume(String topic, String groupId, Long offset) {
+        Long consumeOffset = baseMapper.selectOffsetByTopicAndGroupId(topic, groupId);
+        if (consumeOffset != null) {
+            return offset > consumeOffset;
+        }
+        return true;
+    }
+
+
+}

+ 15 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/resources/mapper/mq/XfOffsetMapper.xml

@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper
+PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.dromara.server.mq.mapper.XfOffsetMapper">
+
+    <resultMap type="org.dromara.server.mq.domain.XfOffset" id="XfOffsetResult">
+            <result property="offsetId"    column="offset_id"    />
+            <result property="offset"    column="offset"    />
+            <result property="topic"    column="topic"    />
+            <result property="groupId"    column="group_id"    />
+            <result property="record"    column="record"    />
+            <result property="createTime"    column="create_time"    />
+    </resultMap>
+</mapper>