|
|
@@ -29,17 +29,19 @@ import org.springframework.stereotype.Component;
|
|
|
public class SyncKafkaConsumer {
|
|
|
private final EventStrategyContext eventStrategyContext;
|
|
|
|
|
|
- @KafkaListener(topics = "eventBus", groupId = "test-group-id")
|
|
|
+ @KafkaListener(topics = "old-kafka-jw", groupId = "test-ykt2")
|
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
|
String sender = receiveMsg.getHeader().getSender();
|
|
|
+ String tenantId = receiveMsg.getHeader().getTenantId();
|
|
|
JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
+ eventMsg.set("tenantId", tenantId);
|
|
|
if (ObjUtil.notEqual(sender, "005")) {
|
|
|
try {
|
|
|
eventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("[业中kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
|
+ log.error("[业中kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e);
|
|
|
}
|
|
|
}
|
|
|
}
|