|
@@ -0,0 +1,76 @@
|
|
|
|
|
+package org.dromara.server.mq.consumer;
|
|
|
|
|
+
|
|
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
|
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
|
|
|
|
|
+import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
|
|
|
+import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
|
|
+import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
|
|
|
|
|
+import org.dromara.server.mq.event.kafka.YktEventStrategyContext;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|
|
|
|
+import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * 一卡通内部操作双向同步kafka消息接收处理
|
|
|
|
|
+ */
|
|
|
|
|
+@RequiredArgsConstructor
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+@ConditionalOnExpression("'local'.equals('${locationFlag}')")
|
|
|
|
|
+public class KafkaConsumerYkt {
|
|
|
|
|
+
|
|
|
|
|
+ private final YktEventStrategyContext yktEventStrategyContext;
|
|
|
|
|
+ private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
|
|
|
|
|
+
|
|
|
|
|
+ @Value("${spring.system.tenantId}")
|
|
|
|
|
+ private String tenantId;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 一卡通云端业务操作本地同步处理
|
|
|
|
|
+ * @param record kafka消息
|
|
|
|
|
+ */
|
|
|
|
|
+ @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "YTK_${spring.system.tenantId}")
|
|
|
|
|
+ public void cloudOperationSync(ConsumerRecord<String, String> record){
|
|
|
|
|
+ KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
|
|
+ try{
|
|
|
|
|
+ if(receiveMsg.getHeader().getTenantId().equals(this.tenantId)){
|
|
|
|
|
+ String eventType = receiveMsg.getHeader().getEventType();
|
|
|
|
|
+ String sender = receiveMsg.getHeader().getSender();
|
|
|
|
|
+ yktEventStrategyContext.doMsgHandle(sender, eventType, receiveMsg.getBody());
|
|
|
|
|
+ try {
|
|
|
|
|
+ syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "Y"));
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ log.error("消息发送记录更新失败", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e){
|
|
|
|
|
+ try {
|
|
|
|
|
+ syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "N"));
|
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
|
+ log.error("消息发送记录更新失败", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", record.value(), e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 初始化消息记录Bo
|
|
|
|
|
+ * @param consumeStatus 消费状态
|
|
|
|
|
+ */
|
|
|
|
|
+ private static RemoteSendMessageRecordBo initBo(KafkaMessage msg, String consumeStatus){
|
|
|
|
|
+ RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
|
|
|
|
|
+ KafkaHeader header = msg.getHeader();
|
|
|
|
|
+ bo.setEventId(header.getEventId());
|
|
|
|
|
+ bo.setSender(header.getSender());
|
|
|
|
|
+ bo.setEventType(header.getEventType());
|
|
|
|
|
+ bo.setMessage(msg.getBody().toString());
|
|
|
|
|
+ bo.setConsumeStatus(consumeStatus);
|
|
|
|
|
+ return bo;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|