|
@@ -1,25 +1,24 @@
|
|
|
-package org.dromara.backstage.aop.aspect;
|
|
|
|
|
|
|
+package org.dromara.common.message.kafka.aop.aspect;
|
|
|
|
|
|
|
|
import cn.hutool.core.lang.UUID;
|
|
import cn.hutool.core.lang.UUID;
|
|
|
|
|
+import cn.hutool.core.util.ReflectUtil;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
+import org.apache.dubbo.config.annotation.DubboReference;
|
|
|
import org.aspectj.lang.JoinPoint;
|
|
import org.aspectj.lang.JoinPoint;
|
|
|
import org.aspectj.lang.annotation.AfterReturning;
|
|
import org.aspectj.lang.annotation.AfterReturning;
|
|
|
import org.aspectj.lang.annotation.Aspect;
|
|
import org.aspectj.lang.annotation.Aspect;
|
|
|
import org.aspectj.lang.reflect.CodeSignature;
|
|
import org.aspectj.lang.reflect.CodeSignature;
|
|
|
-import org.dromara.backstage.aop.annotation.SyncDataToLocal;
|
|
|
|
|
-import org.dromara.backstage.mq.KafkaProducer;
|
|
|
|
|
|
|
+import org.dromara.common.message.kafka.aop.annotation.SyncDataToLocal;
|
|
|
|
|
+import org.dromara.backstage.api.RemoteSyncToLocalByKafkaService;
|
|
|
import org.dromara.common.core.domain.R;
|
|
import org.dromara.common.core.domain.R;
|
|
|
import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
|
-import org.dromara.common.message.kafka.domain.KafkaHeader;
|
|
|
|
|
-import org.dromara.common.message.kafka.domain.KafkaMessage;
|
|
|
|
|
import org.dromara.common.satoken.utils.LoginHelper;
|
|
import org.dromara.common.satoken.utils.LoginHelper;
|
|
|
-import org.dromara.common.tenant.core.TenantEntity;
|
|
|
|
|
import org.dromara.system.api.model.LoginUser;
|
|
import org.dromara.system.api.model.LoginUser;
|
|
|
-import org.springframework.boot.autoconfigure.AutoConfiguration;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
+import java.lang.reflect.Field;
|
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
|
@@ -34,7 +33,9 @@ import java.util.Map;
|
|
|
@Component
|
|
@Component
|
|
|
public class SyncDataToLocalAspect {
|
|
public class SyncDataToLocalAspect {
|
|
|
|
|
|
|
|
- private final KafkaProducer kafkaProducer;
|
|
|
|
|
|
|
+// private final KafkaNormalProducer kafkaProducer;
|
|
|
|
|
+ @DubboReference
|
|
|
|
|
+ private final RemoteSyncToLocalByKafkaService kafkaProducer;
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -56,7 +57,7 @@ public class SyncDataToLocalAspect {
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private void sendSyncMessage(JoinPoint joinPoint, SyncDataToLocal controllerSyncData2Local) {
|
|
|
|
|
|
|
+ /*private void sendSyncMessage(JoinPoint joinPoint, SyncDataToLocal controllerSyncData2Local) {
|
|
|
try {
|
|
try {
|
|
|
KafkaMessage<Object> data = new KafkaMessage<>();
|
|
KafkaMessage<Object> data = new KafkaMessage<>();
|
|
|
KafkaHeader header = data.getHeader();
|
|
KafkaHeader header = data.getHeader();
|
|
@@ -96,10 +97,49 @@ public class SyncDataToLocalAspect {
|
|
|
data.setBody(null);
|
|
data.setBody(null);
|
|
|
}
|
|
}
|
|
|
LoginUser loginUser = LoginHelper.getLoginUser();
|
|
LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
|
|
+
|
|
|
kafkaProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, loginUser.getUserId(),data);
|
|
kafkaProducer.sendKafkaMessage(KafkaTopicConstants.SYNC_DATA_TOPIC, loginUser.getUserId(),data);
|
|
|
}catch (Exception e){
|
|
}catch (Exception e){
|
|
|
log.error("同步数据消息未发送:发送消息异常!", e);
|
|
log.error("同步数据消息未发送:发送消息异常!", e);
|
|
|
}
|
|
}
|
|
|
|
|
+ }*/
|
|
|
|
|
+
|
|
|
|
|
+ private void sendSyncMessage(JoinPoint joinPoint, SyncDataToLocal dataToLocal) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String tenantId = LoginHelper.getTenantId();
|
|
|
|
|
+ Object data = null;
|
|
|
|
|
+ Object[] args = joinPoint.getArgs();
|
|
|
|
|
+ int length = args.length;
|
|
|
|
|
+ if(length == 1){
|
|
|
|
|
+ Field tenantIdFiled = null;
|
|
|
|
|
+ //利用反射 判断是否有 tenantId属性,如果有则设置
|
|
|
|
|
+ try {
|
|
|
|
|
+ tenantIdFiled = ReflectUtil.getField(args[0].getClass(), "tenantId");
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ log.error("参数对象没有tenantId属性!");
|
|
|
|
|
+ }
|
|
|
|
|
+// Field tenantIdFiled = args[0].getClass().getDeclaredField("tenantId");
|
|
|
|
|
+ if(tenantIdFiled != null){
|
|
|
|
|
+ tenantIdFiled.setAccessible(true);
|
|
|
|
|
+ tenantIdFiled.set(args[0],tenantId);
|
|
|
|
|
+ }
|
|
|
|
|
+ data = args[0];
|
|
|
|
|
+ }else if(length >1){
|
|
|
|
|
+ CodeSignature signature = (CodeSignature) joinPoint.getSignature();
|
|
|
|
|
+ String[] paramNames = signature.getParameterNames();
|
|
|
|
|
+ Map<String, Object> params = new HashMap<>();
|
|
|
|
|
+ for (int i = 0; i < length; i++) {
|
|
|
|
|
+ params.put(paramNames[i], args[i]);
|
|
|
|
|
+ }
|
|
|
|
|
+ data = params;
|
|
|
|
|
+ }
|
|
|
|
|
+ LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
|
|
+
|
|
|
|
|
+ kafkaProducer.sendAndInsert(KafkaTopicConstants.SYNC_DATA_TOPIC, loginUser.getUserId(),dataToLocal.sender(),
|
|
|
|
|
+ dataToLocal.eventType(), tenantId, data);
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ log.error("同步数据消息未发送:发送消息异常!", e);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|