Browse Source

Merge remote-tracking branch 'origin/master'

bing 5 days ago
parent
commit
9f11ec428c

+ 10 - 1
ruoyi-example/ruoyi-test-mq/pom.xml

@@ -68,7 +68,16 @@
68 68
                 </exclusion>
69 69
             </exclusions>
70 70
         </dependency>
71
-
71
+        <dependency>
72
+            <groupId>org.apache.dubbo</groupId>
73
+            <artifactId>dubbo</artifactId>
74
+        </dependency>
75
+        <dependency>
76
+            <groupId>org.dromara</groupId>
77
+            <artifactId>ruoyi-api-backstage</artifactId>
78
+            <version>2.2.0</version>
79
+            <scope>compile</scope>
80
+        </dependency>
72 81
     </dependencies>
73 82
 
74 83
     <build>

+ 3 - 3
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java

@@ -11,11 +11,11 @@ import org.springframework.kafka.annotation.EnableKafkaStreams;
11 11
  *
12 12
  * @author LionLi
13 13
  */
14
-@Configuration
15
-@EnableKafkaStreams
14
+//@Configuration
15
+//@EnableKafkaStreams
16 16
 public class KafkaStreamsConfig {
17 17
 
18
-    @Bean
18
+    //@Bean
19 19
     public KStream<String, String> demoStream(StreamsBuilder builder) {
20 20
         // 输入主题
21 21
         KStream<String, String> source = builder.stream("input-topic");

+ 34 - 2
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java

@@ -1,23 +1,55 @@
1 1
 package org.dromara.stream.consumer;
2 2
 
3
+import cn.hutool.json.JSONObject;
4
+import cn.hutool.json.JSONUtil;
5
+import lombok.AllArgsConstructor;
6
+import lombok.RequiredArgsConstructor;
3 7
 import lombok.extern.slf4j.Slf4j;
8
+import org.apache.dubbo.config.annotation.DubboReference;
4 9
 import org.apache.kafka.clients.consumer.ConsumerRecord;
10
+import org.dromara.backstage.api.RemoteConsumeService;
11
+import org.dromara.backstage.api.domain.bo.RemoteConsumptionBo;
12
+import org.dromara.common.core.domain.R;
13
+import org.dromara.common.core.enums.CreditTypeEnum;
14
+import org.dromara.stream.domain.bo.ConsumeRecordBo;
15
+import org.dromara.stream.domain.bo.ConsumptionBo;
16
+import org.dromara.stream.domain.bo.KafkaMessage;
5 17
 import org.springframework.kafka.annotation.KafkaListener;
6 18
 import org.springframework.stereotype.Component;
7 19
 
20
+import java.math.BigDecimal;
21
+import java.util.Date;
22
+
8 23
 /**
9 24
  * @author xbhog
10 25
  * @date 2024/05/19 18:04
11 26
  **/
12 27
 @Slf4j
13 28
 @Component
29
+@RequiredArgsConstructor
14 30
 public class KafkaNormalConsumer {
15
-
31
+    @DubboReference
32
+    private final RemoteConsumeService remoteConsumeService;
16 33
     //默认获取最后一条消息
17
-    @KafkaListener(topics = "test-topic", groupId = "test-group-id")
34
+    @KafkaListener(topics = "eventBus", groupId = "test-group-id")
18 35
     public void timiKafka(ConsumerRecord<String, String> record) {
19 36
         Object key = record.key();
20 37
         Object value = record.value();
38
+        KafkaMessage<?> kmsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
39
+        JSONObject data = JSONUtil.parseObj(kmsg.getBody());
40
+        RemoteConsumptionBo recordBo = new RemoteConsumptionBo();
41
+        recordBo.setConsumeMoney((BigDecimal) data.get("consumeValue"));
42
+        recordBo.setConsumeDate((Date) data.get("consumeDate"));
43
+        recordBo.setCardNo((Long) data.get("cardNo"));
44
+        recordBo.setFactoryId((Long) data.get("factoryFixId"));
45
+        recordBo.setTermNo((Long) data.get("termNo"));
46
+        recordBo.setTermRecordId((Long) data.get("termRecordID"));
47
+        recordBo.setRealName((String) data.get("xm"));
48
+        recordBo.setUserNumb((String) data.get("userNumb"));
49
+        recordBo.setRecordStatus((Long) data.get("posRecordState"));
50
+        recordBo.setCreditType(CreditTypeEnum.TERM_CONSUME);
51
+        R<Object> result = remoteConsumeService.receiveConsumeOriginalRecord(recordBo);
52
+
21 53
         log.info("【消费者】received the message key {},value:{}", key, value);
22 54
     }
23 55
 

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java

@@ -11,7 +11,7 @@ import org.springframework.stereotype.Component;
11 11
  * @date 2024/06/01 16:53
12 12
  **/
13 13
 @Slf4j
14
-@Component
14
+//@Component
15 15
 @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
16 16
 public class NormalRocketConsumer implements RocketMQListener<MessageExt> {
17 17
 

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/RabbitConsumer.java

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
12 12
  * @date 2024年5月18日
13 13
  */
14 14
 @Slf4j
15
-@Component
15
+//@Component
16 16
 public class RabbitConsumer {
17 17
 
18 18
     /**

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java

@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
10 10
  * @date 2024/06/01 16:54
11 11
  **/
12 12
 @Slf4j
13
-@Component
13
+//@Component
14 14
 @RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-group")
15 15
 public class TransactionRocketConsumer  implements RocketMQListener<String> {
16 16
 

+ 34 - 0
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/domain/bo/KafkaHeader.java

@@ -0,0 +1,34 @@
1
+package org.dromara.stream.domain.bo;
2
+
3
+import cn.hutool.core.util.IdUtil;
4
+import lombok.Data;
5
+
6
+/**
7
+ * name: KafkaHeader
8
+ * package: org.dromara.stream.domain.bo
9
+ * description: kafka消息头
10
+ * date: 2024-10-15 11:22:26 11:22
11
+ *
12
+ * @author luoyibo
13
+ * @version 0.1
14
+ * @since JDK 1.8
15
+ */
16
+@Data
17
+public class KafkaHeader {
18
+    /**
19
+     * 发送方
20
+     */
21
+    private String sender;
22
+    /**
23
+     * 时间戳
24
+     */
25
+    private Long timestamp;
26
+    /**
27
+     * 事件类型
28
+     */
29
+    private String eventType;
30
+    /**
31
+     * 事件Id
32
+     */
33
+    private String eventId;
34
+}

+ 20 - 0
ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/domain/bo/KafkaMessage.java

@@ -0,0 +1,20 @@
1
+package org.dromara.stream.domain.bo;
2
+
3
+import lombok.Data;
4
+
5
+/**
6
+ * name: KafkaMessage
7
+ * package: org.dromara.stream.domain.bo
8
+ * description: Kafka消息内容
9
+ * date: 2024-10-15 11:28:02 11:28
10
+ *
11
+ * @author luoyibo
12
+ * @version 0.1
13
+ * @since JDK 1.8
14
+ */
15
+@Data
16
+public class KafkaMessage<T> {
17
+    private KafkaHeader header=new KafkaHeader();
18
+
19
+    private T body;
20
+}

+ 1 - 1
ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml

@@ -23,7 +23,7 @@ spring:
23 23
 --- # kafka 配置
24 24
 spring:
25 25
   kafka:
26
-    bootstrap-servers: localhost:9092
26
+    bootstrap-servers: kafka04:18182,kafka05:18283,kafka06:18384
27 27
     consumer:
28 28
       group-id: test-group-id # 消费者组ID
29 29
       auto-offset-reset: earliest # 当没有偏移量或偏移量无效时,从何处开始消费

+ 9 - 3
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/mq/KafkaNormalConsumer.java

@@ -44,13 +44,18 @@ public class KafkaNormalConsumer {
44 44
         KafkaMessage<?> kmsg = JSONUtil.toBean(record.value(), KafkaMessage.class);
45 45
         if(kmsg.getHeader().getEventType().equals("00500001")){
46 46
             JSONObject data = JSONUtil.parseObj(kmsg.getBody());
47
-            //uploadByHttp(data);
48
-            uploadByService(data);
47
+            try {
48
+                //uploadByHttp(data);
49
+
50
+                uploadByService(data);
51
+            } catch (Exception e) {
52
+                log.info("消费失败");
53
+            }
49 54
         }
50 55
         log.info("【消费者】received the message key {},value:{}", key, value);
51 56
     }
52 57
     private void uploadByService(JSONObject data) {
53
-        ConsumptionBo recordBo = new ConsumptionBo();
58
+                    ConsumptionBo recordBo = new ConsumptionBo();
54 59
         String time = data.get("consumeDate").toString();
55 60
         recordBo.setConsumeMoney(new BigDecimal(data.get("consumeValue").toString()));
56 61
         recordBo.setConsumeDate(DateUtil.parse(time));
@@ -70,6 +75,7 @@ public class KafkaNormalConsumer {
70 75
         recordBo.setConsumeId(vo.getOriginalId());
71 76
         consumeStrategyContent.postOrder(recordBo);
72 77
     }
78
+
73 79
     private void uploadByHttp(JSONObject data) {
74 80
         ConsumeRecordBo recordBo = new ConsumeRecordBo();
75 81
         String time = data.get("consumeDate").toString();