|
@@ -7,6 +7,7 @@ import cn.hutool.json.JSONUtil;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
|
+import org.dromara.common.core.config.DefaultConfig;
|
|
|
import org.dromara.common.core.utils.SpringUtils;
|
|
import org.dromara.common.core.utils.SpringUtils;
|
|
|
import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
@@ -30,6 +31,8 @@ import org.springframework.stereotype.Component;
|
|
|
@Component
|
|
@Component
|
|
|
@ConditionalOnExpression("'cloud'.equals('${locationFlag}')")
|
|
@ConditionalOnExpression("'cloud'.equals('${locationFlag}')")
|
|
|
public class KafkaCloudConsumer {
|
|
public class KafkaCloudConsumer {
|
|
|
|
|
+ private final DefaultConfig defaultConfig;
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* eventBus主题监听 第三方对接相关
|
|
* eventBus主题监听 第三方对接相关
|
|
|
*
|
|
*
|
|
@@ -71,6 +74,9 @@ public class KafkaCloudConsumer {
|
|
|
if (ObjectUtil.isNotEmpty(eventMsg.get("tenantId"))) {
|
|
if (ObjectUtil.isNotEmpty(eventMsg.get("tenantId"))) {
|
|
|
tenantId = eventMsg.get("tenantId").toString();
|
|
tenantId = eventMsg.get("tenantId").toString();
|
|
|
receiveMsg.getHeader().setTenantId(tenantId);
|
|
receiveMsg.getHeader().setTenantId(tenantId);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ receiveMsg.getHeader().setTenantId(defaultConfig.getTenantId());
|
|
|
|
|
+ eventMsg.set("tenantId", defaultConfig.getTenantId());
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
if (ObjectUtil.isEmpty(eventMsg.get("tenantId"))) {
|
|
if (ObjectUtil.isEmpty(eventMsg.get("tenantId"))) {
|
|
@@ -80,7 +86,7 @@ public class KafkaCloudConsumer {
|
|
|
IYktEventStrategy eventStrategy = SpringUtils.getBean(sender, IYktEventStrategy.class);
|
|
IYktEventStrategy eventStrategy = SpringUtils.getBean(sender, IYktEventStrategy.class);
|
|
|
eventStrategy.doMsgHandle(eventType, eventMsg);
|
|
eventStrategy.doMsgHandle(eventType, eventMsg);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e);
|
|
|
|
|
|
|
+ log.error("[kafka消息处理失败]-[消息:{}-[错误:]", receiveMsg, e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|