|
@@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
|
|
import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
|
|
|
import org.dromara.common.core.utils.SpringUtils;
|
|
import org.dromara.common.core.utils.SpringUtils;
|
|
|
|
|
+import org.dromara.common.core.utils.StringUtils;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
|
|
import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
|
|
@@ -56,16 +57,20 @@ public class KafkaLocalConsumer {
|
|
|
//判断 offset
|
|
//判断 offset
|
|
|
Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset, partition);
|
|
Boolean canConsume = xfOffsetService.judgeCanConsume(topic, groupId, offset, partition);
|
|
|
if(!canConsume){
|
|
if(!canConsume){
|
|
|
- log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},partition:{}]", record.value(), offset,topic,groupId,partition);
|
|
|
|
|
|
|
+ log.info("[kafka消息处理]-[消息:{}-[已消费,不能重复消费,offset: {},topic:{},groupId:{},partition:{}]", value, offset,topic,groupId,partition);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
|
|
|
|
+ KafkaMessage<?> receiveMsg = JSONUtil.toBean(value, KafkaMessage.class);
|
|
|
try{
|
|
try{
|
|
|
KafkaHeader header = receiveMsg.getHeader();
|
|
KafkaHeader header = receiveMsg.getHeader();
|
|
|
- JSONObject bodyObj = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
|
|
- String tenantId = ObjectUtil.isNotEmpty(header.getTenantId()) ? header.getTenantId():
|
|
|
|
|
- bodyObj.getOrDefault("tenantId", "0").toString();
|
|
|
|
|
|
|
+ String tenantId;
|
|
|
|
|
+ if (StringUtils.isNotEmpty(header.getTenantId())) {
|
|
|
|
|
+ tenantId = header.getTenantId();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ JSONObject bodyObj = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
|
|
+ tenantId = bodyObj.getStr("tenantId");
|
|
|
|
|
+ }
|
|
|
if(tenantId.equals(this.tenantId)){
|
|
if(tenantId.equals(this.tenantId)){
|
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
|
String sender = receiveMsg.getHeader().getSender();
|
|
String sender = receiveMsg.getHeader().getSender();
|