|
@@ -15,7 +15,6 @@ import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
|
|
import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
|
|
|
-import org.dromara.server.mq.domain.bo.XfOffsetBo;
|
|
|
|
|
import org.dromara.server.mq.event.kafka.IYktEventStrategy;
|
|
import org.dromara.server.mq.event.kafka.IYktEventStrategy;
|
|
|
import org.dromara.server.mq.service.IXfOffsetService;
|
|
import org.dromara.server.mq.service.IXfOffsetService;
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|
@@ -44,6 +43,26 @@ public class KafkaCloudConsumer {
|
|
|
|
|
|
|
|
private final IXfOffsetService xfOffsetService;
|
|
private final IXfOffsetService xfOffsetService;
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 初始化消息记录Bo
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param consumeStatus 消费状态
|
|
|
|
|
+ * @param topic
|
|
|
|
|
+ */
|
|
|
|
|
+ private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String consumeStatus, String topic) {
|
|
|
|
|
+ RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
|
|
|
|
|
+ KafkaHeader header = msg.getHeader();
|
|
|
|
|
+ bo.setMessage("kafka");
|
|
|
|
|
+ bo.setTopic(topic);
|
|
|
|
|
+ bo.setEventId(header.getEventId());
|
|
|
|
|
+ bo.setSender(header.getSender());
|
|
|
|
|
+ bo.setEventType(header.getEventType());
|
|
|
|
|
+ bo.setMessage(msg.getBody().toString());
|
|
|
|
|
+ bo.setTenantId(header.getTenantId());
|
|
|
|
|
+ bo.setConsumeStatus(consumeStatus);
|
|
|
|
|
+ return bo;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* eventBus主题监听 第三方对接相关
|
|
* eventBus主题监听 第三方对接相关
|
|
|
*
|
|
*
|
|
@@ -71,7 +90,7 @@ public class KafkaCloudConsumer {
|
|
|
String sender = receiveMsg.getHeader().getSender();
|
|
String sender = receiveMsg.getHeader().getSender();
|
|
|
//在eventBus主题中,sender=005是由本系统发出,无需业务处理
|
|
//在eventBus主题中,sender=005是由本系统发出,无需业务处理
|
|
|
if (ObjUtil.notEqual(sender, "005") && ObjUtil.notEqual(sender, "006")) {
|
|
if (ObjUtil.notEqual(sender, "005") && ObjUtil.notEqual(sender, "006")) {
|
|
|
- doMessageHandle(receiveMsg);
|
|
|
|
|
|
|
+ doMessageHandle(receiveMsg, "");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 记录
|
|
// 记录
|
|
@@ -103,34 +122,19 @@ public class KafkaCloudConsumer {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
- doMessageHandle(receiveMsg);
|
|
|
|
|
|
|
+ doMessageHandle(receiveMsg, topic);
|
|
|
|
|
|
|
|
// 记录offset
|
|
// 记录offset
|
|
|
xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date(),partition);
|
|
xfOffsetService.insert(IdUtil.simpleUUID(), offset, topic, groupId, value, new Date(),partition);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * 初始化消息记录Bo
|
|
|
|
|
- *
|
|
|
|
|
- * @param consumeStatus 消费状态
|
|
|
|
|
- */
|
|
|
|
|
- private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String consumeStatus) {
|
|
|
|
|
- RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
|
|
|
|
|
- KafkaHeader header = msg.getHeader();
|
|
|
|
|
- bo.setEventId(header.getEventId());
|
|
|
|
|
- bo.setSender(header.getSender());
|
|
|
|
|
- bo.setEventType(header.getEventType());
|
|
|
|
|
- bo.setMessage(msg.getBody().toString());
|
|
|
|
|
- bo.setConsumeStatus(consumeStatus);
|
|
|
|
|
- return bo;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
/**
|
|
/**
|
|
|
* 公用消息消费处理事件
|
|
* 公用消息消费处理事件
|
|
|
*
|
|
*
|
|
|
* @param receiveMsg 接收到的消息信息
|
|
* @param receiveMsg 接收到的消息信息
|
|
|
|
|
+ * @param topic 主题
|
|
|
*/
|
|
*/
|
|
|
- private void doMessageHandle(KafkaMessage<?> receiveMsg) {
|
|
|
|
|
|
|
+ private void doMessageHandle(KafkaMessage<?> receiveMsg, String topic) {
|
|
|
try {
|
|
try {
|
|
|
JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
@@ -152,14 +156,14 @@ public class KafkaCloudConsumer {
|
|
|
IYktEventStrategy eventStrategy = SpringUtils.getBean(sender, IYktEventStrategy.class);
|
|
IYktEventStrategy eventStrategy = SpringUtils.getBean(sender, IYktEventStrategy.class);
|
|
|
eventStrategy.doMsgHandle(eventType, eventMsg);
|
|
eventStrategy.doMsgHandle(eventType, eventMsg);
|
|
|
try {
|
|
try {
|
|
|
- syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "Y"));
|
|
|
|
|
|
|
+ syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "Y", topic));
|
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
|
log.error("消息发送记录更新失败", ex);
|
|
log.error("消息发送记录更新失败", ex);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
try {
|
|
try {
|
|
|
- syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "N"));
|
|
|
|
|
|
|
+ syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "N", topic));
|
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
|
log.error("消息发送记录更新失败", ex);
|
|
log.error("消息发送记录更新失败", ex);
|
|
|
}
|
|
}
|