|
@@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
import org.dromara.server.mq.event.kafka.EventStrategyContext;
|
|
import org.dromara.server.mq.event.kafka.EventStrategyContext;
|
|
|
|
|
+import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -25,6 +26,7 @@ import org.springframework.kafka.annotation.KafkaListener;
|
|
|
//@Component
|
|
//@Component
|
|
|
public class KafkaConsumer {
|
|
public class KafkaConsumer {
|
|
|
private final EventStrategyContext eventStrategyContext;
|
|
private final EventStrategyContext eventStrategyContext;
|
|
|
|
|
+ private final YktEventStrategyContext yktEventStrategyContext;
|
|
|
@KafkaListener(topics = "eventBus", groupId = "test-group-id")
|
|
@KafkaListener(topics = "eventBus", groupId = "test-group-id")
|
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
@@ -36,6 +38,26 @@ public class KafkaConsumer {
|
|
|
eventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
eventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 一卡通云端业务操作本地同步处理
|
|
|
|
|
+ * @param record kafka消息
|
|
|
|
|
+ */
|
|
|
|
|
+ @KafkaListener(topics = "ykt_operation", groupId = "ykt_local_listener")
|
|
|
|
|
+ public void cloudOperationSync(ConsumerRecord<String, String> record){
|
|
|
|
|
+ KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
|
|
+ log.info("[接收到Kafka消息]-[{}]", receiveMsg);
|
|
|
|
|
+ try{
|
|
|
|
|
+ String eventType = receiveMsg.getHeader().getEventType();
|
|
|
|
|
+ // String tenantId = receiveMsg.getHeader().getTenantId();
|
|
|
|
|
+ String tenantId = "";
|
|
|
|
|
+ JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
|
|
+ yktEventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e){
|
|
|
|
|
+ log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|