|
@@ -2,14 +2,16 @@ package org.dromara.server.mq.consumer;
|
|
|
|
|
|
|
|
import cn.hutool.json.JSONObject;
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
import cn.hutool.json.JSONUtil;
|
|
|
-import lombok.Data;
|
|
|
|
|
|
|
+import com.alibaba.nacos.api.config.annotation.NacosValue;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
import org.dromara.server.mq.event.kafka.EventStrategyContext;
|
|
import org.dromara.server.mq.event.kafka.EventStrategyContext;
|
|
|
import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
|
|
import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
|
|
import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
|
|
|
|
|
|
|
@@ -25,11 +27,14 @@ import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC
|
|
|
*/
|
|
*/
|
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
|
@Slf4j
|
|
@Slf4j
|
|
|
-//@Component
|
|
|
|
|
|
|
+@Component
|
|
|
public class KafkaConsumer {
|
|
public class KafkaConsumer {
|
|
|
private final EventStrategyContext eventStrategyContext;
|
|
private final EventStrategyContext eventStrategyContext;
|
|
|
private final YktEventStrategyContext yktEventStrategyContext;
|
|
private final YktEventStrategyContext yktEventStrategyContext;
|
|
|
- @KafkaListener(topics = "eventBus", groupId = "test-group-id")
|
|
|
|
|
|
|
+ @Value("${spring.system.tenantId}")
|
|
|
|
|
+ private String tenantId;
|
|
|
|
|
+
|
|
|
|
|
+// @KafkaListener(topics = "eventBus", groupId = "test-group-id")
|
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
log.info("[接收到Kafka消息]-[{}]", receiveMsg);
|
|
log.info("[接收到Kafka消息]-[{}]", receiveMsg);
|
|
@@ -47,19 +52,22 @@ public class KafkaConsumer {
|
|
|
* 一卡通云端业务操作本地同步处理
|
|
* 一卡通云端业务操作本地同步处理
|
|
|
* @param record kafka消息
|
|
* @param record kafka消息
|
|
|
*/
|
|
*/
|
|
|
- @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "ykt_local_listener")
|
|
|
|
|
|
|
+ @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "YTK_${spring.system.tenantId}")
|
|
|
public void cloudOperationSync(ConsumerRecord<String, String> record){
|
|
public void cloudOperationSync(ConsumerRecord<String, String> record){
|
|
|
- KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
|
|
- log.info("[接收到Kafka消息]-[{}]", receiveMsg);
|
|
|
|
|
try{
|
|
try{
|
|
|
|
|
+ KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
|
|
+ log.info("[接收到Kafka消息]-[{}]", receiveMsg);
|
|
|
|
|
+ String tenantId = receiveMsg.getHeader().getTenantId();
|
|
|
|
|
+ if(!tenantId.equals(this.tenantId)){
|
|
|
|
|
+ log.info("消息所属租户不匹配");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
|
- // String tenantId = receiveMsg.getHeader().getTenantId();
|
|
|
|
|
- String tenantId = "";
|
|
|
|
|
- JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
|
|
- yktEventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
|
|
|
|
+ String sender = receiveMsg.getHeader().getSender();
|
|
|
|
|
|
|
|
|
|
+ yktEventStrategyContext.doMsgHandle(sender, eventType, receiveMsg.getBody());
|
|
|
} catch (Exception e){
|
|
} catch (Exception e){
|
|
|
- log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
|
|
|
|
|
+ log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", record.value(), e.getMessage(), e);
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|