|
|
@@ -24,22 +24,24 @@ import org.springframework.stereotype.Component;
|
|
|
*/
|
|
|
@RequiredArgsConstructor
|
|
|
@Slf4j
|
|
|
-//@Component
|
|
|
+@Component
|
|
|
@ConditionalOnExpression("'local'.equals('${locationFlag}')")
|
|
|
public class SyncKafkaConsumer {
|
|
|
private final EventStrategyContext eventStrategyContext;
|
|
|
|
|
|
- //@KafkaListener(topics = "eventBus", groupId = "test-group-id")
|
|
|
+ @KafkaListener(topics = "old-kafka-jw", groupId = "test-ykt2")
|
|
|
public void kafkaReceiveHandler(ConsumerRecord<String, String> record) {
|
|
|
KafkaMessage<?> receiveMsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
|
|
|
String eventType = receiveMsg.getHeader().getEventType();
|
|
|
String sender = receiveMsg.getHeader().getSender();
|
|
|
+ String tenantId = receiveMsg.getHeader().getTenantId();
|
|
|
JSONObject eventMsg = JSONUtil.parseObj(receiveMsg.getBody());
|
|
|
+ eventMsg.set("tenantId", tenantId);
|
|
|
if (ObjUtil.notEqual(sender, "005")) {
|
|
|
try {
|
|
|
eventStrategyContext.doMsgHandle(eventType, eventMsg);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("[业中kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e.getMessage());
|
|
|
+ log.error("[业中kafka消息处理失败]-[消息:{}-[错误:{}]", receiveMsg, e);
|
|
|
}
|
|
|
}
|
|
|
}
|