|
|
@@ -0,0 +1,38 @@
|
|
|
+package org.dromara.system.mq;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
+import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
+import org.springframework.kafka.support.SendResult;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class KafkaNormalProducer {
|
|
|
+ private final KafkaTemplate<String, String> kafkaTemplate;
|
|
|
+
|
|
|
+ public void sendKafkaMessage(String topic, KafkaMessage<?> data) {
|
|
|
+ String jsonMessage = JSON.toJSONString(data);
|
|
|
+ try {
|
|
|
+ ProducerRecord<String, String> record;
|
|
|
+ record = new ProducerRecord<>(topic, jsonMessage);
|
|
|
+
|
|
|
+ CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(record);
|
|
|
+ send.whenComplete((result, ex) -> {
|
|
|
+ if (ex != null) {
|
|
|
+ log.error("消费系统发送到kafka消息系统异常,data: {}", jsonMessage, ex);
|
|
|
+ } else {
|
|
|
+ log.info("消费系统发送到kafka消息系统成功,data: {}", jsonMessage);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("消费系统发送到kafka消息系统异常,data: {}", data, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|