|
|
@@ -29,20 +29,49 @@ public class KafkaNormalProducer {
|
|
|
/**
|
|
|
* Send.
|
|
|
*
|
|
|
- * @param topic the topic
|
|
|
- * @param message the message
|
|
|
+ * @param topic the topic
|
|
|
+ * @param data the message
|
|
|
*/
|
|
|
- public void send(String topic , String message) {
|
|
|
- log.debug("发送消息到kafka消息系统, message:" + message);
|
|
|
- kafkaTemplate.send(topic, message);
|
|
|
- log.debug("发送消息到kafka消息系统结束");
|
|
|
+ public void sendKafkaMessage(String topic, Long createBy, KafkaMessage<?> data, Boolean save) {
|
|
|
+ String jsonMessage = JSON.toJSONString(data);
|
|
|
+ try {
|
|
|
+ ProducerRecord<String, String> record;
|
|
|
+ if (save) {
|
|
|
+ record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
|
|
|
+ } else {
|
|
|
+ record = new ProducerRecord<>(topic, jsonMessage);
|
|
|
+ }
|
|
|
+ log.info("发送同步数据到kafka消息系统, data: {}", jsonMessage);
|
|
|
+ CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
|
|
|
+ send.whenComplete((result, ex) -> {
|
|
|
+ if (ex != null) {
|
|
|
+ log.error("同步数据发送到kafka消息系统异常,data: {}", jsonMessage, ex);
|
|
|
+ if (save) {
|
|
|
+ // 异常信息入库
|
|
|
+ insertRecord("F", createBy, data);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.info("同步数据发送到kafka消息系统成功,data: {}", jsonMessage);
|
|
|
+ if (save) {
|
|
|
+ // 异常信息入库
|
|
|
+ insertRecord("S", createBy, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步数据发送到kafka消息系统异常,data: {}", data, e);
|
|
|
+ if (save) {
|
|
|
+ // 异常信息入库
|
|
|
+ insertRecord("S", createBy, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
- public void sendKafkaMessage(String topic,Long createBy, KafkaMessage<?> data){
|
|
|
- try{
|
|
|
- String jsonMessage = JSON.toJSONString(data);
|
|
|
+ public void sendKafkaMessage(String topic, Long createBy, KafkaMessage<?> data) {
|
|
|
+ this.sendKafkaMessage(topic, createBy, data, true);
|
|
|
+ //try {
|
|
|
+ //String jsonMessage = JSON.toJSONString(data);
|
|
|
/*KafkaMessage kafkaMessage = JsonUtils.parseObject(jsonMessage, KafkaMessage.class);
|
|
|
Object body = kafkaMessage.getBody();
|
|
|
if(body instanceof String){
|
|
|
@@ -58,30 +87,30 @@ public class KafkaNormalProducer {
|
|
|
System.err.println("body is other");
|
|
|
|
|
|
}*/
|
|
|
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
|
|
|
- log.info("发送同步数据到kafka消息系统, data: " + jsonMessage);
|
|
|
- CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
|
|
|
- send.whenComplete((result, ex) -> {
|
|
|
- if (ex != null) {
|
|
|
- log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
|
|
|
-
|
|
|
- // 异常信息入库
|
|
|
- insertRecord("F",createBy, data);
|
|
|
- } else {
|
|
|
- log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
|
|
|
- insertRecord("S",createBy, data);
|
|
|
- }
|
|
|
- });
|
|
|
- }catch (Exception e){
|
|
|
- log.error("同步数据发送到kafka消息系统异常,data: " + data, e);
|
|
|
- insertRecord("F",createBy, data);
|
|
|
- }
|
|
|
+ // ProducerRecord<String, String> record = new ProducerRecord<>(topic, "YKT-SYNC-Message", jsonMessage);
|
|
|
+ // log.info("发送同步数据到kafka消息系统, data: " + jsonMessage);
|
|
|
+ // CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
|
|
|
+ // send.whenComplete((result, ex) -> {
|
|
|
+ // if (ex != null) {
|
|
|
+ // log.error("同步数据发送到kafka消息系统异常,data: " + jsonMessage, ex);
|
|
|
+ //
|
|
|
+ // // 异常信息入库
|
|
|
+ // insertRecord("F", createBy, data);
|
|
|
+ // } else {
|
|
|
+ // log.info("同步数据发送到kafka消息系统成功,data: " + jsonMessage);
|
|
|
+ // insertRecord("S", createBy, data);
|
|
|
+ // }
|
|
|
+ // });
|
|
|
+ //} catch (Exception e) {
|
|
|
+ // log.error("同步数据发送到kafka消息系统异常,data: {}", data, e);
|
|
|
+ // insertRecord("F", createBy, data);
|
|
|
+ //}
|
|
|
|
|
|
}
|
|
|
|
|
|
//记录入库
|
|
|
- public void insertRecord(String result,Long createBy, KafkaMessage<?> data){
|
|
|
- try{
|
|
|
+ public void insertRecord(String result, Long createBy, KafkaMessage<?> data) {
|
|
|
+ try {
|
|
|
KafkaHeader header = data.getHeader();
|
|
|
String eventId = header.getEventId();
|
|
|
String sender = header.getSender();
|
|
|
@@ -95,8 +124,8 @@ public class KafkaNormalProducer {
|
|
|
bo.setTenantId(tenantId);
|
|
|
bo.setResult(result);
|
|
|
String s = JSON.toJSONString(data);
|
|
|
- if(StringUtils.isNotBlank(s) && s.length()> 5000){
|
|
|
- s = s.substring(0,5000);
|
|
|
+ if (StringUtils.isNotBlank(s) && s.length() > 5000) {
|
|
|
+ s = s.substring(0, 5000);
|
|
|
}
|
|
|
bo.setMessage(s);
|
|
|
// LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
@@ -106,7 +135,7 @@ public class KafkaNormalProducer {
|
|
|
bo.setTenantId(tenantId);
|
|
|
bo.setCreateTime(new Date());
|
|
|
sendMessageRecordService.insertByBo(bo);
|
|
|
- }catch (Exception e){
|
|
|
+ } catch (Exception e) {
|
|
|
log.error("kafka消息记录入库异常,data: " + data, e);
|
|
|
}
|
|
|
}
|