|
|
@@ -1,13 +1,17 @@
|
|
|
package org.dromara.server.mq.consumer;
|
|
|
|
|
|
import cn.hutool.core.util.ObjUtil;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.dubbo.config.annotation.DubboReference;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.dromara.backstage.api.RemotePtParameterService;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
import org.dromara.server.mq.event.kafka.EventStrategyContext;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
@@ -24,19 +28,26 @@ import org.springframework.stereotype.Component;
|
|
|
@RequiredArgsConstructor
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
+@ConditionalOnExpression("'cloud'.equals('${locationFlag}')")
|
|
|
public class KafkaConsumer {
|
|
|
private final EventStrategyContext eventStrategyContext;
|
|
|
+ @DubboReference
|
|
|
+ private final RemotePtParameterService remotePtParameterService;
|
|
|
|
|
|
- @KafkaListener(topics = "TO_CLOUD_EVENT", groupId = "test-group-id")
|
|
|
+ @KafkaListener(topics = "TO_CLOUD_EVENT", groupId = "test1-group-id")
|
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
- //log.info("[接收到Kafka消息]-[{}]", receiveMsg);
|
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
|
String sender = receiveMsg.getHeader().getSender();
|
|
|
JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
if(ObjUtil.equals(sender,"000")) {
|
|
|
try {
|
|
|
- eventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
+ //是接收云端消费
|
|
|
+ String cloudConsume = remotePtParameterService.getPtParameterByKey("GET_CLOUD_CONSUME");
|
|
|
+ if(ObjectUtil.equals(cloudConsume,"1")){
|
|
|
+ log.info("[处理云端消费请求]-[消信息:{}]", receiveMsg);
|
|
|
+ eventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
|
}
|