|
@@ -63,7 +63,7 @@ public class KafkaCloudConsumer {
|
|
|
//判断 offset
|
|
//判断 offset
|
|
|
Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset, partition);
|
|
Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset, partition);
|
|
|
if(!canConsume){
|
|
if(!canConsume){
|
|
|
- log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
|
|
|
|
|
|
|
+ log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},partition:{}]", record.value(), offset,topic,groupId,partition);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -98,7 +98,7 @@ public class KafkaCloudConsumer {
|
|
|
//判断 offset
|
|
//判断 offset
|
|
|
Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset,partition);
|
|
Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset,partition);
|
|
|
if(!canConsume){
|
|
if(!canConsume){
|
|
|
- log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},]", record.value(), offset,topic,groupId);
|
|
|
|
|
|
|
+ log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},partition:{}]", record.value(), offset,topic,groupId,partition);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|