|
|
@@ -43,7 +43,7 @@ public class KafkaProducer {
|
|
|
|
|
|
|
|
|
|
|
|
- public void sendKafkaMessage(String topic, KafkaMessage<?> data){
|
|
|
+ public void sendKafkaMessage(String topic,Long createBy, KafkaMessage<?> data){
|
|
|
try{
|
|
|
String jsonMessage = JSON.toJSONString(data);
|
|
|
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
|
|
|
@@ -54,21 +54,21 @@ public class KafkaProducer {
|
|
|
log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
|
|
|
|
|
|
// 异常信息入库
|
|
|
- insertRecord("F", data);
|
|
|
+ insertRecord("F",createBy, data);
|
|
|
} else {
|
|
|
log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
|
|
|
- insertRecord("S", data);
|
|
|
+ insertRecord("S",createBy, data);
|
|
|
}
|
|
|
});
|
|
|
}catch (Exception e){
|
|
|
log.error("同步数据发送到kafka消息系统异常,data: " + data, e);
|
|
|
- insertRecord("F", data);
|
|
|
+ insertRecord("F",createBy, data);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
//记录入库
|
|
|
- public void insertRecord(String result, KafkaMessage<?> data){
|
|
|
+ public void insertRecord(String result,Long createBy, KafkaMessage<?> data){
|
|
|
try{
|
|
|
KafkaHeader header = data.getHeader();
|
|
|
String eventId = header.getEventId();
|
|
|
@@ -83,8 +83,8 @@ public class KafkaProducer {
|
|
|
bo.setTenantId(tenantId);
|
|
|
bo.setResult(result);
|
|
|
bo.setMessage(JSON.toJSONString(data));
|
|
|
- LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
- bo.setCreateBy(loginUser.getUserId());
|
|
|
+// LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
+ bo.setCreateBy(createBy);
|
|
|
bo.setMqType("kafka");
|
|
|
bo.setTopic(KafkaTopicConstants.SYNC_DATA_TOPIC);
|
|
|
bo.setTenantId(tenantId);
|