Browse Source

feature: kafka消费

luoyb 5 days ago
parent
commit
8c827ce561

+ 10 - 1
ruoyi-example/ruoyi-test-mq/pom.xml

@@ -68,7 +68,16 @@
68
                 </exclusion>
68
                 </exclusion>
69
             </exclusions>
69
             </exclusions>
70
         </dependency>
70
         </dependency>
71
-
71
+        <dependency>
72
+            <groupId>org.apache.dubbo</groupId>
73
+            <artifactId>dubbo</artifactId>
74
+        </dependency>
75
+        <dependency>
76
+            <groupId>org.dromara</groupId>
77
+            <artifactId>ruoyi-api-backstage</artifactId>
78
+            <version>2.2.0</version>
79
+            <scope>compile</scope>
80
+        </dependency>
72
     </dependencies>
81
     </dependencies>
73
 
82
 
74
     <build>
83
     <build>

+ 3 - 3
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java

@@ -11,11 +11,11 @@ import org.springframework.kafka.annotation.EnableKafkaStreams;
11
  *
11
  *
12
  * @author LionLi
12
  * @author LionLi
13
  */
13
  */
14
-@Configuration
15
-@EnableKafkaStreams
14
+//@Configuration
15
+//@EnableKafkaStreams
16
 public class KafkaStreamsConfig {
16
 public class KafkaStreamsConfig {
17
 
17
 
18
-    @Bean
18
+    //@Bean
19
     public KStream<String, String> demoStream(StreamsBuilder builder) {
19
     public KStream<String, String> demoStream(StreamsBuilder builder) {
20
         // 输入主题
20
         // 输入主题
21
         KStream<String, String> source = builder.stream("input-topic");
21
         KStream<String, String> source = builder.stream("input-topic");

+ 34 - 2
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java

@@ -1,23 +1,55 @@
1
 package org.dromara.stream.consumer;
1
 package org.dromara.stream.consumer;
2
 
2
 
3
+import cn.hutool.json.JSONObject;
4
+import cn.hutool.json.JSONUtil;
5
+import lombok.AllArgsConstructor;
6
+import lombok.RequiredArgsConstructor;
3
 import lombok.extern.slf4j.Slf4j;
7
 import lombok.extern.slf4j.Slf4j;
8
+import org.apache.dubbo.config.annotation.DubboReference;
4
 import org.apache.kafka.clients.consumer.ConsumerRecord;
9
 import org.apache.kafka.clients.consumer.ConsumerRecord;
10
+import org.dromara.backstage.api.RemoteConsumeService;
11
+import org.dromara.backstage.api.domain.bo.RemoteConsumptionBo;
12
+import org.dromara.common.core.domain.R;
13
+import org.dromara.common.core.enums.CreditTypeEnum;
14
+import org.dromara.stream.domain.bo.ConsumeRecordBo;
15
+import org.dromara.stream.domain.bo.ConsumptionBo;
16
+import org.dromara.stream.domain.bo.KafkaMessage;
5
 import org.springframework.kafka.annotation.KafkaListener;
17
 import org.springframework.kafka.annotation.KafkaListener;
6
 import org.springframework.stereotype.Component;
18
 import org.springframework.stereotype.Component;
7
 
19
 
20
+import java.math.BigDecimal;
21
+import java.util.Date;
22
+
8
 /**
23
 /**
9
  * @author xbhog
24
  * @author xbhog
10
  * @date 2024/05/19 18:04
25
  * @date 2024/05/19 18:04
11
  **/
26
  **/
12
 @Slf4j
27
 @Slf4j
13
 @Component
28
 @Component
29
+@RequiredArgsConstructor
14
 public class KafkaNormalConsumer {
30
 public class KafkaNormalConsumer {
15
-
31
+    @DubboReference
32
+    private final RemoteConsumeService remoteConsumeService;
16
     //默认获取最后一条消息
33
     //默认获取最后一条消息
17
-    @KafkaListener(topics = "test-topic", groupId = "test-group-id")
34
+    @KafkaListener(topics = "eventBus", groupId = "test-group-id")
18
     public void timiKafka(ConsumerRecord<String, String> record) {
35
     public void timiKafka(ConsumerRecord<String, String> record) {
19
         Object key = record.key();
36
         Object key = record.key();
20
         Object value = record.value();
37
         Object value = record.value();
38
+        KafkaMessage<?> kmsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
39
+        JSONObject data = JSONUtil.parseObj(kmsg.getBody());
40
+        RemoteConsumptionBo recordBo = new RemoteConsumptionBo();
41
+        recordBo.setConsumeMoney((BigDecimal) data.get("consumeValue"));
42
+        recordBo.setConsumeDate((Date) data.get("consumeDate"));
43
+        recordBo.setCardNo((Long) data.get("cardNo"));
44
+        recordBo.setFactoryId((Long) data.get("factoryFixId"));
45
+        recordBo.setTermNo((Long) data.get("termNo"));
46
+        recordBo.setTermRecordId((Long) data.get("termRecordID"));
47
+        recordBo.setRealName((String) data.get("xm"));
48
+        recordBo.setUserNumb((String) data.get("userNumb"));
49
+        recordBo.setRecordStatus((Long) data.get("posRecordState"));
50
+        recordBo.setCreditType(CreditTypeEnum.TERM_CONSUME);
51
+        R<Object> result = remoteConsumeService.receiveConsumeOriginalRecord(recordBo);
52
+
21
         log.info("【消费者】received the message key {},value:{}", key, value);
53
         log.info("【消费者】received the message key {},value:{}", key, value);
22
     }
54
     }
23
 
55
 

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java

@@ -11,7 +11,7 @@ import org.springframework.stereotype.Component;
11
  * @date 2024/06/01 16:53
11
  * @date 2024/06/01 16:53
12
  **/
12
  **/
13
 @Slf4j
13
 @Slf4j
14
-@Component
14
+//@Component
15
 @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
15
 @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
16
 public class NormalRocketConsumer implements RocketMQListener<MessageExt> {
16
 public class NormalRocketConsumer implements RocketMQListener<MessageExt> {
17
 
17
 

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/RabbitConsumer.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
12
  * @date 2024年5月18日
12
  * @date 2024年5月18日
13
  */
13
  */
14
 @Slf4j
14
 @Slf4j
15
-@Component
15
+//@Component
16
 public class RabbitConsumer {
16
 public class RabbitConsumer {
17
 
17
 
18
     /**
18
     /**

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java

@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
10
  * @date 2024/06/01 16:54
10
  * @date 2024/06/01 16:54
11
  **/
11
  **/
12
 @Slf4j
12
 @Slf4j
13
-@Component
13
+//@Component
14
 @RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-group")
14
 @RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-group")
15
 public class TransactionRocketConsumer  implements RocketMQListener<String> {
15
 public class TransactionRocketConsumer  implements RocketMQListener<String> {
16
 
16
 

+ 34 - 0
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/domain/bo/KafkaHeader.java

@@ -0,0 +1,34 @@
1
+package org.dromara.stream.domain.bo;
2
+
3
+import cn.hutool.core.util.IdUtil;
4
+import lombok.Data;
5
+
6
+/**
7
+ * name: KafkaHeader
8
+ * package: org.dromara.stream.domain.bo
9
+ * description: kafka消息头
10
+ * date: 2024-10-15 11:22:26 11:22
11
+ *
12
+ * @author luoyibo
13
+ * @version 0.1
14
+ * @since JDK 1.8
15
+ */
16
+@Data
17
+public class KafkaHeader {
18
+    /**
19
+     * 发送方
20
+     */
21
+    private String sender;
22
+    /**
23
+     * 时间戳
24
+     */
25
+    private Long timestamp;
26
+    /**
27
+     * 事件类型
28
+     */
29
+    private String eventType;
30
+    /**
31
+     * 事件Id
32
+     */
33
+    private String eventId;
34
+}

+ 20 - 0
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/domain/bo/KafkaMessage.java

@@ -0,0 +1,20 @@
1
+package org.dromara.stream.domain.bo;
2
+
3
+import lombok.Data;
4
+
5
+/**
6
+ * name: KafkaMessage
7
+ * package: org.dromara.stream.domain.bo
8
+ * description: Kafka消息内容
9
+ * date: 2024-10-15 11:28:02 11:28
10
+ *
11
+ * @author luoyibo
12
+ * @version 0.1
13
+ * @since JDK 1.8
14
+ */
15
+@Data
16
+public class KafkaMessage<T> {
17
+    private KafkaHeader header=new KafkaHeader();
18
+
19
+    private T body;
20
+}

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml

@@ -23,7 +23,7 @@ spring:
23
 --- # kafka 配置
23
 --- # kafka 配置
24
 spring:
24
 spring:
25
   kafka:
25
   kafka:
26
-    bootstrap-servers: localhost:9092
26
+    bootstrap-servers: kafka04:18182,kafka05:18283,kafka06:18384
27
     consumer:
27
     consumer:
28
       group-id: test-group-id # 消费者组ID
28
       group-id: test-group-id # 消费者组ID
29
       auto-offset-reset: earliest # 当没有偏移量或偏移量无效时,从何处开始消费
29
       auto-offset-reset: earliest # 当没有偏移量或偏移量无效时,从何处开始消费

+ 9 - 3
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaNormalConsumer.java

@@ -44,13 +44,18 @@ public class KafkaNormalConsumer {
44
         KafkaMessage<?> kmsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
44
         KafkaMessage<?> kmsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
45
         if(kmsg.getHeader().getEventType().equals("00500001")){
45
         if(kmsg.getHeader().getEventType().equals("00500001")){
46
             JSONObject data = JSONUtil.parseObj(kmsg.getBody());
46
             JSONObject data = JSONUtil.parseObj(kmsg.getBody());
47
-            //uploadByHttp(data);
48
-            uploadByService(data);
47
+            try {
48
+                //uploadByHttp(data);
49
+
50
+                uploadByService(data);
51
+            } catch (Exception e) {
52
+                log.info("消费失败");
53
+            }
49
         }
54
         }
50
         log.info("【消费者】received the message key {},value:{}", key, value);
55
         log.info("【消费者】received the message key {},value:{}", key, value);
51
     }
56
     }
52
     private void uploadByService(JSONObject data) {
57
     private void uploadByService(JSONObject data) {
53
-        ConsumptionBo recordBo = new ConsumptionBo();
58
+                    ConsumptionBo recordBo = new ConsumptionBo();
54
         String time = data.get("consumeDate").toString();
59
         String time = data.get("consumeDate").toString();
55
         recordBo.setConsumeMoney(new BigDecimal(data.get("consumeValue").toString()));
60
         recordBo.setConsumeMoney(new BigDecimal(data.get("consumeValue").toString()));
56
         recordBo.setConsumeDate(DateUtil.parse(time));
61
         recordBo.setConsumeDate(DateUtil.parse(time));
@@ -70,6 +75,7 @@ public class KafkaNormalConsumer {
70
         recordBo.setConsumeId(vo.getOriginalId());
75
         recordBo.setConsumeId(vo.getOriginalId());
71
         consumeStrategyContent.postOrder(recordBo);
76
         consumeStrategyContent.postOrder(recordBo);
72
     }
77
     }
78
+
73
     private void uploadByHttp(JSONObject data) {
79
     private void uploadByHttp(JSONObject data) {
74
         ConsumeRecordBo recordBo = new ConsumeRecordBo();
80
         ConsumeRecordBo recordBo = new ConsumeRecordBo();
75
         String time = data.get("consumeDate").toString();
81
         String time = data.get("consumeDate").toString();