|
|
@@ -1,38 +0,0 @@
|
|
|
-package org.dromara.server.base.mq;
|
|
|
-
|
|
|
-import com.alibaba.fastjson2.JSON;
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
-import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
-import org.springframework.kafka.core.KafkaTemplate;
|
|
|
-import org.springframework.kafka.support.SendResult;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
-
|
|
|
-@RequiredArgsConstructor
|
|
|
-@Slf4j
|
|
|
-@Component
|
|
|
-public class KafkaNormalProducer {
|
|
|
- private final KafkaTemplate<String, String> kafkaTemplate;
|
|
|
-
|
|
|
- public void sendKafkaMessage(String topic, KafkaMessage<?> data) {
|
|
|
- String jsonMessage = JSON.toJSONString(data);
|
|
|
- try {
|
|
|
- ProducerRecord<String, String> record;
|
|
|
- record = new ProducerRecord<>(topic, jsonMessage);
|
|
|
-
|
|
|
- CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
|
|
|
- send.whenComplete((result, ex) -> {
|
|
|
- if (ex != null) {
|
|
|
- log.error("消费系统发送到kafka消息系统异常,data: {}", jsonMessage, ex);
|
|
|
- } else {
|
|
|
- log.info("消费系统发送到kafka消息系统成功,data: {}", jsonMessage);
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("消费系统发送到kafka消息系统异常,data: {}", data, e);
|
|
|
- }
|
|
|
- }
|
|
|
-}
|