|
@@ -1,15 +1,23 @@
|
|
|
package org.dromara.backstage.mq;
|
|
package org.dromara.backstage.mq;
|
|
|
|
|
|
|
|
|
|
+import cn.hutool.core.lang.UUID;
|
|
|
|
|
+import com.alibaba.excel.util.StringUtils;
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
|
|
+import org.dromara.backstage.basics.domain.bo.SendMessageRecordBo;
|
|
|
|
|
+import org.dromara.backstage.basics.service.ISendMessageRecordService;
|
|
|
|
|
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
|
|
+import org.dromara.common.satoken.utils.LoginHelper;
|
|
|
|
|
+import org.dromara.system.api.model.LoginUser;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.kafka.support.SendResult;
|
|
import org.springframework.kafka.support.SendResult;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
+import java.util.Date;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
|
|
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
@@ -19,6 +27,8 @@ public class KafkaProducer {
|
|
|
|
|
|
|
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
|
|
|
|
|
+ private final ISendMessageRecordService sendMessageRecordService;
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* Send.
|
|
* Send.
|
|
|
*
|
|
*
|
|
@@ -31,13 +41,10 @@ public class KafkaProducer {
|
|
|
log.debug("发送消息到kafka消息系统结束");
|
|
log.debug("发送消息到kafka消息系统结束");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- public void sendSyncData(String topic, KafkaMessage<?> data){
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ public void sendKafkaMessage(String topic, KafkaMessage<?> data){
|
|
|
try{
|
|
try{
|
|
|
- KafkaHeader header = data.getHeader();
|
|
|
|
|
- String eventId = header.getEventId();
|
|
|
|
|
- String sender = header.getSender();
|
|
|
|
|
- String eventType = header.getEventType();
|
|
|
|
|
- String tenantId = header.getTenantId();
|
|
|
|
|
String jsonMessage = JSON.toJSONString(data);
|
|
String jsonMessage = JSON.toJSONString(data);
|
|
|
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
|
|
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
|
|
|
log.info("发送同步数据到kafka消息系统, data: " + jsonMessage);
|
|
log.info("发送同步数据到kafka消息系统, data: " + jsonMessage);
|
|
@@ -46,17 +53,46 @@ public class KafkaProducer {
|
|
|
if (ex != null) {
|
|
if (ex != null) {
|
|
|
log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
|
|
log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
|
|
|
|
|
|
|
|
-
|
|
|
|
|
- // todo 异常信息入库
|
|
|
|
|
|
|
+ // 异常信息入库
|
|
|
|
|
+ insertRecord("F", data);
|
|
|
} else {
|
|
} else {
|
|
|
log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
|
|
log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
|
|
|
|
|
+ insertRecord("S", data);
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
}catch (Exception e){
|
|
}catch (Exception e){
|
|
|
log.error("同步数据发送到kafka消息系统异常,data: " + data, e);
|
|
log.error("同步数据发送到kafka消息系统异常,data: " + data, e);
|
|
|
- // todo 异常信息入库
|
|
|
|
|
|
|
+ insertRecord("F", data);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ //记录入库
|
|
|
|
|
+ public void insertRecord(String result, KafkaMessage<?> data){
|
|
|
|
|
+ try{
|
|
|
|
|
+ KafkaHeader header = data.getHeader();
|
|
|
|
|
+ String eventId = header.getEventId();
|
|
|
|
|
+ String sender = header.getSender();
|
|
|
|
|
+ String eventType = header.getEventType();
|
|
|
|
|
+ String tenantId = header.getTenantId();
|
|
|
|
|
+ // 信息入库
|
|
|
|
|
+ SendMessageRecordBo bo = new SendMessageRecordBo();
|
|
|
|
|
+ bo.setEventId(eventId);
|
|
|
|
|
+ bo.setSender(sender);
|
|
|
|
|
+ bo.setEventType(eventType);
|
|
|
|
|
+ bo.setTenantId(tenantId);
|
|
|
|
|
+ bo.setResult(result);
|
|
|
|
|
+ bo.setMessage(JSON.toJSONString(data));
|
|
|
|
|
+ LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
|
|
+ bo.setCreateBy(loginUser.getUserId());
|
|
|
|
|
+ bo.setMqType("kafka");
|
|
|
|
|
+ bo.setTopic(KafkaTopicConstants.SYNC_DATA_TOPIC);
|
|
|
|
|
+ bo.setTenantId(tenantId);
|
|
|
|
|
+ bo.setCreateTime(new Date());
|
|
|
|
|
+ sendMessageRecordService.insertByBo(bo);
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ log.error("kafka消息记录入库异常,data: " + data, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
}
|
|
}
|