|
|
@@ -0,0 +1,155 @@
|
|
|
+package org.dromara.common.message.kafka.aop.aspect;
|
|
|
+
|
|
|
+import cn.hutool.core.lang.UUID;
|
|
|
+import cn.hutool.core.util.ReflectUtil;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.dubbo.config.annotation.DubboReference;
|
|
|
+import org.aspectj.lang.JoinPoint;
|
|
|
+import org.aspectj.lang.annotation.AfterReturning;
|
|
|
+import org.aspectj.lang.annotation.Aspect;
|
|
|
+import org.aspectj.lang.reflect.CodeSignature;
|
|
|
+import org.dromara.common.message.kafka.aop.annotation.SyncDataToLocal;
|
|
|
+import org.dromara.backstage.api.RemoteSyncToLocalByKafkaService;
|
|
|
+import org.dromara.common.core.domain.R;
|
|
|
+import org.dromara.common.message.kafka.constant.KafkaTopicConstants;
|
|
|
+import org.dromara.common.satoken.utils.LoginHelper;
|
|
|
+import org.dromara.system.api.model.LoginUser;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.dromara.backstage.api.domain.bo.MessageBo;
|
|
|
+
|
|
|
+import java.lang.reflect.Field;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 同步数据至本地 的 切面
|
|
|
+ *
|
|
|
+ * @author bing
|
|
|
+ */
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Slf4j
|
|
|
+@Aspect
|
|
|
+@Component
|
|
|
+public class SyncDataToLocalAspect {
|
|
|
+
|
|
|
+// private final KafkaNormalProducer kafkaProducer;
|
|
|
+ @DubboReference
|
|
|
+ private final RemoteSyncToLocalByKafkaService kafkaProducer;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理完请求后执行
|
|
|
+ *
|
|
|
+ * @param joinPoint 切点
|
|
|
+ */
|
|
|
+ @AfterReturning(pointcut = "@annotation(syncDataToLocal)", returning = "jsonResult")
|
|
|
+ public void doAfterReturning(JoinPoint joinPoint, SyncDataToLocal syncDataToLocal, Object jsonResult) {
|
|
|
+ if (jsonResult instanceof R<?> r) {
|
|
|
+ if (r.getCode() == R.SUCCESS) {
|
|
|
+ sendSyncMessage(joinPoint, syncDataToLocal);
|
|
|
+ }else{
|
|
|
+ log.error("同步数据消息未发送:controller 方法返回结果未失败!");
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ log.error("同步数据消息未发送:controller 方法返回不是R类型!");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendSyncMessage(JoinPoint joinPoint, SyncDataToLocal controllerSyncData2Local) {
|
|
|
+ try {
|
|
|
+ MessageBo<Object> data = new MessageBo<>();
|
|
|
+ MessageBo.KafkaHeader header = data.getHeader();
|
|
|
+// header.setTimestamp(System.currentTimeMillis());
|
|
|
+// header.setEventId(UUID.randomUUID().toString());
|
|
|
+ header.setEventType(controllerSyncData2Local.eventType());
|
|
|
+ header.setSender(controllerSyncData2Local.sender());
|
|
|
+ String sender = header.getSender();
|
|
|
+ String eventType = header.getEventType();
|
|
|
+ if(StringUtils.isBlank(sender) && StringUtils.isNotBlank(eventType)){
|
|
|
+ header.setSender(eventType.substring(0, eventType.lastIndexOf("_")));
|
|
|
+ }
|
|
|
+ String tenantId = header.getTenantId();
|
|
|
+ if(StringUtils.isBlank(tenantId)){
|
|
|
+ tenantId = LoginHelper.getTenantId();
|
|
|
+ header.setTenantId(tenantId);
|
|
|
+ }
|
|
|
+
|
|
|
+ Object[] args = joinPoint.getArgs();
|
|
|
+ int length = args.length;
|
|
|
+ if(length == 1){
|
|
|
+ Field tenantIdFiled = null;
|
|
|
+ //利用反射 判断是否有 tenantId属性,如果有则设置
|
|
|
+ try {
|
|
|
+ tenantIdFiled = ReflectUtil.getField(args[0].getClass(), "tenantId");
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("参数对象没有tenantId属性!");
|
|
|
+ }
|
|
|
+// Field tenantIdFiled = args[0].getClass().getDeclaredField("tenantId");
|
|
|
+ if(tenantIdFiled != null){
|
|
|
+ tenantIdFiled.setAccessible(true);
|
|
|
+ tenantIdFiled.set(args[0],tenantId);
|
|
|
+ }
|
|
|
+ data.setBody(args[0]);
|
|
|
+ }else if(length >1){
|
|
|
+ CodeSignature signature = (CodeSignature) joinPoint.getSignature();
|
|
|
+ String[] paramNames = signature.getParameterNames();
|
|
|
+ Map<String, Object> params = new HashMap<>();
|
|
|
+ for (int i = 0; i < length; i++) {
|
|
|
+// System.out.println("参数名: " + paramNames[i] + ", 参数值: " + args[i]);
|
|
|
+ params.put(paramNames[i], args[i]);
|
|
|
+ }
|
|
|
+ data.setBody(params);
|
|
|
+ }else{
|
|
|
+ data.setBody(null);
|
|
|
+ }
|
|
|
+ LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
+
|
|
|
+ kafkaProducer.sendAndInsert(KafkaTopicConstants.SYNC_DATA_TOPIC, loginUser.getUserId(),data);
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("同步数据消息未发送:发送消息异常!", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* private void sendSyncMessage(JoinPoint joinPoint, SyncDataToLocal dataToLocal) {
|
|
|
+ try {
|
|
|
+ String tenantId = LoginHelper.getTenantId();
|
|
|
+ Object data = null;
|
|
|
+ Object[] args = joinPoint.getArgs();
|
|
|
+ int length = args.length;
|
|
|
+ if(length == 1){
|
|
|
+ Field tenantIdFiled = null;
|
|
|
+ //利用反射 判断是否有 tenantId属性,如果有则设置
|
|
|
+ try {
|
|
|
+ tenantIdFiled = ReflectUtil.getField(args[0].getClass(), "tenantId");
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("参数对象没有tenantId属性!");
|
|
|
+ }
|
|
|
+// Field tenantIdFiled = args[0].getClass().getDeclaredField("tenantId");
|
|
|
+ if(tenantIdFiled != null){
|
|
|
+ tenantIdFiled.setAccessible(true);
|
|
|
+ tenantIdFiled.set(args[0],tenantId);
|
|
|
+ }
|
|
|
+ data = args[0];
|
|
|
+ }else if(length >1){
|
|
|
+ CodeSignature signature = (CodeSignature) joinPoint.getSignature();
|
|
|
+ String[] paramNames = signature.getParameterNames();
|
|
|
+ Map<String, Object> params = new HashMap<>();
|
|
|
+ for (int i = 0; i < length; i++) {
|
|
|
+ params.put(paramNames[i], args[i]);
|
|
|
+ }
|
|
|
+ data = params;
|
|
|
+ }
|
|
|
+ LoginUser loginUser = LoginHelper.getLoginUser();
|
|
|
+
|
|
|
+ kafkaProducer.sendAndInsert(KafkaTopicConstants.SYNC_DATA_TOPIC, loginUser.getUserId(),dataToLocal.sender(),
|
|
|
+ dataToLocal.eventType(), tenantId, data);
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("同步数据消息未发送:发送消息异常!", e);
|
|
|
+ }
|
|
|
+ }*/
|
|
|
+
|
|
|
+
|
|
|
+}
|