Sfoglia il codice sorgente

feature:记录消费的offset

xiari 1 anno fa
parent
commit
0fbb846c7a

+ 2 - 2
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaCloudConsumer.java

@@ -76,7 +76,7 @@ public class KafkaCloudConsumer {
 
         // 记录
         // 记录offset
-        xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date());
+        xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date(), partition);
     }
 
     /**
@@ -106,7 +106,7 @@ public class KafkaCloudConsumer {
         doMessageHandle(receiveMsg);
 
         // 记录offset
-        xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date());
+        xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date(),partition);
     }
 
     /**

+ 1 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/consumer/KafkaLocalConsumer.java

@@ -87,7 +87,7 @@ public class KafkaLocalConsumer {
             log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", record.value(), e.getMessage(), e);
         }finally {
             // 记录offset
-            xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date());
+            xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date(),partition);
         }
     }
 

+ 1 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/service/IXfOffsetService.java

@@ -51,7 +51,7 @@ public interface IXfOffsetService {
      */
     Boolean insertByBo(XfOffsetBo bo);
 
-    Boolean insert(String offsetId, Long offSet, String topic, String groupId, String record, Date createTime);
+    Boolean insert(String offsetId, Long offSet, String topic, String groupId, String record, Date createTime, Integer partition);
 
     /**
      * 修改记录kafka消费的offset

+ 2 - 1
ruoyi-server/ruoyi-server-mqdata/src/main/java/org/dromara/server/mq/service/impl/XfOffsetServiceImpl.java

@@ -122,7 +122,7 @@ public class XfOffsetServiceImpl implements IXfOffsetService {
      * @return
      */
     @Override
-    public Boolean insert(String offsetId, Long offSet, String topic, String groupId, String record, Date createTime) {
+    public Boolean insert(String offsetId, Long offSet, String topic, String groupId, String record, Date createTime, Integer partitions) {
         XfOffset add = new XfOffset();
         add.setOffsetId(offsetId);
         add.setOffSet(offSet);
@@ -130,6 +130,7 @@ public class XfOffsetServiceImpl implements IXfOffsetService {
         add.setGroupId(groupId);
         add.setRecord(record);
         add.setCreateTime(createTime);
+        add.setPartitions(partitions);
         return baseMapper.insert(add) > 0;
     }