Prechádzať zdrojové kódy

feature:记录消费的offset

xiari 1 rok pred
rodič
commit
b487a7714f

+ 4 - 2
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

@@ -58,9 +58,10 @@ public class KafkaCloudConsumer {
         //groupId
         String groupId = "old-to-cloud-group";
         String value = record.value();
+        int partition = record.partition();
 
         //判断 offset
-        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset);
+        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset, partition);
         if(!canConsume){
             log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
             return;
@@ -92,9 +93,10 @@ public class KafkaCloudConsumer {
         //groupId
         String groupId = "local-to-cloud-group";
         String value = record.value();
+        int partition = record.partition();
 
         //判断 offset
-        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset);
+        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset,partition);
         if(!canConsume){
             log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
             return;

+ 2 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaLocalConsumer.java

@@ -51,9 +51,10 @@ public class KafkaLocalConsumer {
         //groupId
         String groupId = "YTK_" + tenantId;
         String value = record.value();
+        int partition = record.partition();
 
         //判断 offset
-        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset);
+        Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset, partition);
         if(!canConsume){
             log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
             return;

+ 3 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/domain/XfOffset.java

@@ -17,7 +17,7 @@ import java.util.Date;
  */
 @Data
 @EqualsAndHashCode(callSuper = false)
-@TableName("t_xf_offset")
+@TableName("t_xf_offset_his")
 public class XfOffset implements Serializable {
 
     @Serial
@@ -51,5 +51,7 @@ public class XfOffset implements Serializable {
 
     private Date createTime;
 
+    private Integer partitions;
+
 
 }

+ 2 - 0
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/domain/vo/XfOffsetVo.java

@@ -62,4 +62,6 @@ public class XfOffsetVo implements Serializable {
     private Date createTime;
 
 
+    private Integer partitions;
+
 }

+ 2 - 2
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/mapper/XfOffsetMapper.java

@@ -14,7 +14,7 @@ import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
 public interface XfOffsetMapper extends BaseMapperPlus<XfOffset, XfOffsetVo> {
 
 
-    @Select("select off_set from t_xf_offset where topic = #{topic} and group_id = #{groupId} order by off_set desc limit 1")
-    Long selectOffsetByTopicAndGroupId(String topic, String groupId);
+    @Select("select off_set from t_xf_offset_his where topic = #{topic} and group_id = #{groupId} and partitions = #{partition} order by off_set desc limit 1")
+    Long selectOffsetByTopicAndGroupId(String topic, String groupId, Integer partition);
 
 }

+ 3 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/service/IXfOffsetService.java

@@ -82,9 +82,11 @@ public interface IXfOffsetService {
      * 判断是否可以消费
      * @param topic
      * @param groupId
+     * @param offset
+     * @param partition 分区号
      * @return
      */
-    Boolean judgeCanConsume(String topic, String groupId, Long offset);
+    Boolean judgeCanConsume(String topic, String groupId, Long offset, Integer partition);
 
 
 

+ 2 - 2
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/service/impl/XfOffsetServiceImpl.java

@@ -189,8 +189,8 @@ public class XfOffsetServiceImpl implements IXfOffsetService {
      * @return
      */
     @Override
-    public Boolean judgeCanConsume(String topic, String groupId, Long offset) {
-        Long consumeOffset = baseMapper.selectOffsetByTopicAndGroupId(topic, groupId);
+    public Boolean judgeCanConsume(String topic, String groupId, Long offset, Integer partition) {
+        Long consumeOffset = baseMapper.selectOffsetByTopicAndGroupId(topic, groupId, partition);
         if (consumeOffset != null) {
             return offset > consumeOffset;
         }