Преглед изворни кода

feat(海康消费机对接): 增加了对人员账户和卡片的kafka消息处理

luo.yibo@datuai.com пре 1 година
родитељ
комит
594dd2591f

+ 6 - 0
ruoyi-api/ruoyi-api-backstage/src/main/java/org/dromara/backstage/api/domain/vo/RemoteUserAccountVo.java

@@ -183,4 +183,10 @@ public class RemoteUserAccountVo implements Serializable {
      * 完整的人脸照片地址
      */
     private String facePicUrl;
+
+    /**
+     * 删除标志
+     */
+    private String delFlag;
+
 }

+ 5 - 1
ruoyi-modules/ruoyi-backstage/src/main/java/org/dromara/backstage/payment/domain/vo/PtUserAccountVo.java

@@ -14,7 +14,6 @@ import org.dromara.common.translation.constant.TransConstant;
 import java.io.Serial;
 import java.io.Serializable;
 import java.util.Date;
-import java.util.List;
 
 
 /**
@@ -285,4 +284,9 @@ public class PtUserAccountVo implements Serializable {
      * 完整的人脸照片地址
      */
     private String facePicUrl;
+
+    /**
+     * 删除标志
+     */
+    private String delFlag;
 }

+ 1 - 1
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/controller/TestController.java

@@ -183,7 +183,7 @@ public class TestController {
      */
     @GetMapping("/emp/upload/{userId}")
     public R<Void> upLoadEmpToDevice(@PathVariable("userId") Long userId){
-        return sendDeviceService.upLoadEmpToDevice(userId);
+        return sendDeviceService.upLoadEmpToAllDevice(userId);
     }
 
     /**

+ 38 - 0
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/domain/dto/remote/RemoteCardDto.java

@@ -0,0 +1,38 @@
+package org.dromara.server.hik.domain.dto.remote;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+/**
+ * 远程调用卡片数据传输对象
+ * <p>
+ * 远程调用卡片数据传输对象
+ *
+ * @author luoyibo
+ * @version 2.2.0
+ * @date 2025-05-25
+ * @since JDK17
+ */
+@Data
+public class RemoteCardDto implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 776456160182525438L;
+
+    /**
+     * 所属账户Id
+     */
+    private Long userId;
+
+    /**
+     * 物理卡号
+     */
+    private Long factoryId;
+
+    /**
+     * 卡片状态,见KZT字典类别
+     */
+    private String status;
+}

+ 95 - 0
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/mq/consumer/HikKafkaConsumer.java

@@ -0,0 +1,95 @@
+package org.dromara.server.hik.mq.consumer;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.dromara.backstage.api.domain.bo.RemoteSendMessageRecordBo;
+import org.dromara.common.core.utils.SpringUtils;
+import org.dromara.common.core.utils.StringUtils;
+import org.dromara.common.message.kafka.domain.KafkaHeader;
+import org.dromara.common.message.kafka.domain.KafkaMessage;
+import org.dromara.server.base.service.yktOperation.SyncRemoteSendMessageRecordService;
+import org.dromara.server.hik.mq.event.IHIKEventStrategy;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import static org.dromara.common.message.kafka.constant.KafkaTopicConstants.SYNC_DATA_TOPIC;
+
+/**
+ * 海康服务消费kafka
+ * <p>
+ *  海康消费服务作为消费者消费人员与卡片的kafka消息
+ *
+ * @author luoyibo
+ * @version 2.2.0
+ * @date 2025-05-25
+ * @since JDK17
+ */
+@RequiredArgsConstructor
+@Slf4j
+@Component
+@ConditionalOnExpression("'local'.equals('${locationFlag}')")
+public class HikKafkaConsumer {
+    private final SyncRemoteSendMessageRecordService syncRemoteSendMessageRecordService;
+
+    @Value("${system.default-config.tenantId}")
+    private String tenantId;
+
+    /**
+     * 一卡通云端业务操作本地同步处理
+     * @param record kafka消息
+     */
+    @KafkaListener(topics = SYNC_DATA_TOPIC, groupId = "HIK_${system.default-config.tenantId}")
+    public void cloudOperationSync(ConsumerRecord<String, String> record){
+        String value = record.value();
+
+        KafkaMessage<?> receiveMsg = JSONUtil.toBean(value, KafkaMessage.class);
+        try{
+            KafkaHeader header = receiveMsg.getHeader();
+            String tenantId;
+            if (StringUtils.isNotEmpty(header.getTenantId())) {
+                tenantId = header.getTenantId();
+            } else {
+                JSONObject bodyObj = JSONUtil.parseObj(receiveMsg.getBody());
+                tenantId = bodyObj.getStr("tenantId");
+            }
+            if(tenantId.equals(this.tenantId)){
+                String eventType = receiveMsg.getHeader().getEventType();
+                String sender = receiveMsg.getHeader().getSender();
+                IHIKEventStrategy eventStrategy = SpringUtils.getBean(sender, IHIKEventStrategy.class);
+                eventStrategy.doMsgHandle(eventType, receiveMsg.getBody());
+                try {
+                    syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "Y"));
+                }catch (Exception e){
+                    log.error("消息发送记录更新失败", e);
+                }
+            }
+        } catch (Exception e){
+            try {
+                syncRemoteSendMessageRecordService.insertSendMessageRecord(initBo(receiveMsg, "N"));
+            } catch (Exception ex) {
+                log.error("消息发送记录更新失败", e);
+            }
+            log.error("[kafka消息处理失败]-[消息:{}-[错误:{}]", record.value(), e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 初始化消息记录Bo
+     * @param consumeStatus 消费状态
+     */
+    private static RemoteSendMessageRecordBo initBo(KafkaMessage<?> msg, String consumeStatus){
+        RemoteSendMessageRecordBo bo = new RemoteSendMessageRecordBo();
+        KafkaHeader header = msg.getHeader();
+        bo.setEventId(header.getEventId());
+        bo.setSender(header.getSender());
+        bo.setEventType(header.getEventType());
+        bo.setMessage(msg.getBody().toString());
+        bo.setConsumeStatus(consumeStatus);
+        return bo;
+    }
+}

+ 20 - 0
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/mq/event/IHIKEventStrategy.java

@@ -0,0 +1,20 @@
+package org.dromara.server.hik.mq.event;
+
+/**
+ * 海康服务消费kafka策略
+ * <p>
+ *  海康消费服务作为消费者消费人员与卡片的kafka消息时的消息处理策略
+ *
+ * @author luoyibo
+ * @version 2.2.0
+ * @date 2025-05-25
+ * @since JDK17
+ */
+public interface IHIKEventStrategy {
+    /**
+     *
+     * @param eventType 事件类型
+     * @param msg 消息体
+     */
+    void doMsgHandle(String eventType,Object msg) throws Exception;
+}

+ 62 - 0
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/mq/event/impl/HikBackStageEventImpl.java

@@ -0,0 +1,62 @@
+package org.dromara.server.hik.mq.event.impl;
+
+import cn.hutool.json.JSONUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.backstage.api.domain.vo.RemoteUserAccountVo;
+import org.dromara.common.core.domain.R;
+import org.dromara.common.core.enums.CardStatusEnum;
+import org.dromara.common.message.kafka.constant.EventSenderConstants;
+import org.dromara.common.message.kafka.constant.EventTypeConstants;
+import org.dromara.server.hik.domain.dto.remote.RemoteCardDto;
+import org.dromara.server.hik.mq.event.IHIKEventStrategy;
+import org.dromara.server.hik.service.ISendDeviceService;
+import org.springframework.stereotype.Service;
+
+import java.util.Date;
+
+/**
+ * 海康服务消费管理平台kafka消息策略实现
+ * <p>
+ *  海康消费服务作为消费者消费人员与卡片的kafka消息时的消息处理策略实现
+ *
+ * @author luoyibo
+ * @version 2.2.0
+ * @date 2025-05-25
+ * @since JDK17
+ */
+@Slf4j
+@RequiredArgsConstructor
+@Service(EventSenderConstants.BACKSTAGE)
+public class HikBackStageEventImpl implements IHIKEventStrategy {
+    private final ISendDeviceService sendDeviceService;
+
+    @Override
+    public void doMsgHandle(String eventType, Object msg) {
+        switch (eventType) {
+            case EventTypeConstants.ACCOUNT -> {
+                RemoteUserAccountVo remoteVo = JSONUtil.toBean(JSONUtil.parseObj(msg), RemoteUserAccountVo.class);
+                log.info("[海康服务处理云端->本地一卡通账户同步请求]-[账户信息:{}]", JSONUtil.toJsonStr(remoteVo));
+
+                Long userNo = remoteVo.getUserNo();
+                Boolean deleteUser = !remoteVo.getDelFlag().equals("0");
+                Date lifespan = remoteVo.getLifespan();
+                R<Void> result = sendDeviceService.upLoadEmpToAllDeviceByUserNo(userNo, lifespan, deleteUser);
+                log.info(result.getMsg());
+            }
+            case EventTypeConstants.CARD -> {
+                RemoteCardDto remoteCard = JSONUtil.toBean(JSONUtil.parseObj(msg), RemoteCardDto.class);
+                Long userId = remoteCard.getUserId();
+                Long factorId = remoteCard.getFactoryId();
+                String status = remoteCard.getStatus();
+                Boolean deleteCard = !status.equals(CardStatusEnum.NORMAL.code().toString());
+
+                log.info("[海康服务处理云端->本地卡片同步请求]-[卡片信息:{}]", JSONUtil.toJsonStr(remoteCard));
+
+                R<Void> result =  sendDeviceService.upLoadEmpCardToAllDevice(userId, factorId, false, deleteCard);
+
+                log.info(result.getMsg());
+            }
+        }
+    }
+}

+ 23 - 0
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/service/ISendDeviceService.java

@@ -5,6 +5,8 @@ import org.dromara.server.hik.domain.dto.DeviceDto;
 import org.dromara.server.hik.domain.dto.QueryDto;
 import org.dromara.server.hik.domain.dto.UploadEmpDto;
 
+import java.util.Date;
+
 
 /**
  * 向消费机发送信息服务接口
@@ -186,4 +188,25 @@ public interface ISendDeviceService {
      * @return 响应信息主体,表示上传操作的结果状态及可能的附加信息
      */
     R<Void> upLoadEmpToAllDevice(Long userId);
+
+    /**
+     * 上传指定员工信息到所有设备,并设置相关信息。
+     *
+     * @param userNo 用户编号,用于标识需要上传的员工信息
+     * @param lifeSpan 生效期限,表示员工信息在设备上的有效时间范围
+     * @param deleteUser 是否删除用户标志,用于指示是否在设备上删除该用户的相关信息
+     * @return 响应信息主体,表示上传操作的结果状态及可能的附加信息
+     */
+    R<Void> upLoadEmpToAllDeviceByUserNo(Long userNo, Date lifeSpan, Boolean deleteUser);
+
+    /**
+     * 上传指定员工的卡片到所有设备,并设置相关信息。
+     *
+     * @param userId 用户Id,用于标识需要上传的员工信息
+     * @param factorId 物理卡号
+     * @param deleteAllCard 是否删除所有卡片 false-不删除
+     * @param deleteCard 是否删除物理卡号对应的卡片, false-不删除
+     * @return n 响应信息主体,表示上传操作的结果状态及可能的附加信息
+     */
+    R<Void> upLoadEmpCardToAllDevice(Long userId, Long factorId,Boolean deleteAllCard,Boolean deleteCard);
 }

+ 70 - 37
ruoyi-server/ruoyi-server-hik/src/main/java/org/dromara/server/hik/service/impl/SendDeviceServiceImpl.java

@@ -83,32 +83,30 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         EmpInfoDto empDto = new EmpInfoDto();
         // 设置用户基本信息
         empDto.setEmployeeNo(accountVo.getUserNo().toString()).setName(accountVo.getRealName());
-        if (deleteUser) {
-            empDto.setDeleteUser(Boolean.TRUE);
-        }
+        empDto.setDeleteUser(deleteUser);
         // 设置有效期,海康设备支持有效期最大2037-12-31 23:59:59,所以要和系统的有效期比较取较小值
         Date endTime = accountVo.getLifespan();
-        Date hikEndTime = DateUtil.parse(HikDefaultConstants.EMP_END_TIME);
-        if (endTime.compareTo(hikEndTime) > 0) {
-            endTime = hikEndTime;
-        }
-        ValidDto validDto = new ValidDto().setBeginTime(getBeginTime()).setEndTime(DateUtil.date(endTime));
-        empDto.setValid(validDto);
-
-        // 设置用户卡片信息
-        if (ObjectUtil.isNotEmpty(accountVo.getFactoryId())) {
-            CardDto cardDto = new CardDto().setCardNo(accountVo.getFactoryId().toString());
-            if (deleteCard) {
-                cardDto.setDeleteCard(Boolean.TRUE);
+        if (ObjectUtil.isNotEmpty(endTime)) {
+            Date hikEndTime = DateUtil.parse(HikDefaultConstants.EMP_END_TIME);
+            if (endTime.compareTo(hikEndTime) > 0) {
+                endTime = hikEndTime;
             }
+            ValidDto validDto = new ValidDto().setBeginTime(getBeginTime()).setEndTime(DateUtil.date(endTime));
+            empDto.setValid(validDto);
+        }
+        // 设置用户卡片信息,有物理卡号并且物理卡号>0时设置
+        Long factoryId = accountVo.getFactoryId();
+        if (ObjectUtil.isNotEmpty(factoryId) && factoryId > 0L) {
+            CardDto cardDto = new CardDto().setCardNo(factoryId.toString());
+            cardDto.setDeleteCard(deleteCard);
+
             List<CardDto> cardList = new ArrayList<>();
             cardList.add(cardDto);
 
             CardListDto cardListDto = new CardListDto();
             cardListDto.setList(cardList);
-            if (deleteAllCard) {
-                cardListDto.setDeleteAllCard(Boolean.TRUE);
-            }
+            cardListDto.setDeleteAllCard(deleteAllCard);
+
             empDto.setCardInfo(cardListDto);
         }
 
@@ -117,16 +115,13 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         // String photo = accountVo.getFacePicUrl();
         // if (ObjectUtil.isNotEmpty(photo)) {
         //     FaceDto faceDto = new FaceDto().setFDID("1").setFaceID(1L).setFacePicURL(photo);
-        //     if (deleteFace) {
-        //         faceDto.setDeleteFace(Boolean.TRUE);
-        //     }
+        //     faceDto.setDeleteFace(deleteFace);
         //
         //     List<FaceDto> faceList = new ArrayList<>();
         //     faceList.add(faceDto);
         //     FaceListDto faceListDto = new FaceListDto().setList(faceList);
-        //     if (deleteAllFace) {
-        //         faceListDto.setDeleteAllFace(Boolean.TRUE);
-        //     }
+        //     faceListDto.setDeleteAllFace(deleteAllFace);
+        //
         //     empDto.setFaceInfo(faceListDto);
         // }
 
@@ -374,6 +369,26 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         }
         return getDeviceDto(termVo);
     }
+
+    /**
+     * 发送指定员工信息到所有设备
+     *
+     * @param empDto 待发送员工信息
+     */
+    private void sendEmpToAllDevice(EmpInfoDto empDto) {
+        List<RemoteXfTermVo> termList = remotePtXfTermService.queryListByBrand("hk");
+        if (CollectionUtil.isEmpty(termList)) {
+            throw new ServiceException("没有要处理的设备");
+        }
+        termList.parallelStream().forEach(p -> {
+            scheduledExecutorService.execute(() -> {
+                DeviceDto deviceDto = getDeviceDto(p);
+                R<Void> result = this.createOperatorEmpInfo(deviceDto, empDto);
+                log.info(result.getMsg());
+            });
+        });
+    }
+
     // endregion
 
     // region 设置监听相关
@@ -596,7 +611,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         if (CollectionUtil.isEmpty(accountVoList)) {
             return R.warn("没有要处理的人员");
         }
-         // 并行处理人员列表
+        // 并行处理人员列表
         accountVoList.parallelStream().forEach(p -> {
             scheduledExecutorService.execute(() -> {
                 EmpInfoDto empDto = getEmpInfoDto(p, false, false, false, false, false);
@@ -634,7 +649,7 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         // 并行处理
         termList.parallelStream().forEach(p -> {
             scheduledExecutorService.execute(() -> {
-                DeviceDto device  = getDeviceDto(p);
+                DeviceDto device = getDeviceDto(p);
                 accountVoList.parallelStream().forEach(t -> {
                     EmpInfoDto empInfo = getEmpInfoDto(t, false, false, false, false, false);
                     R<Void> result = createOperatorEmpInfo(device, empInfo);
@@ -653,22 +668,40 @@ public class SendDeviceServiceImpl implements ISendDeviceService {
         }
         EmpInfoDto empDto = getEmpInfoDto(accountVo, false, false, false, false, false);
 
-        List<RemoteXfTermVo> termList = remotePtXfTermService.queryListByBrand("hk");
-        if (CollectionUtil.isEmpty(termList)) {
-            return R.warn("没有要处理的设备");
-        }
-        termList.parallelStream().forEach(p -> {
-            scheduledExecutorService.execute(() -> {
-                DeviceDto deviceDto = getDeviceDto(p);
-                R<Void> result = this.createOperatorEmpInfo(deviceDto, empDto);
-                log.info(result.getMsg());
-            });
-        });
+        sendEmpToAllDevice(empDto);
+
+        return R.ok();
+    }
+
+    @Override
+    public R<Void> upLoadEmpToAllDeviceByUserNo(Long userNo, Date lifeSpan, Boolean deleteUser) {
+        RemoteUserAccountVo accountVo = new RemoteUserAccountVo();
+        accountVo.setUserNo(userNo);
+        accountVo.setLifespan(lifeSpan);
+
+        EmpInfoDto empDto = getEmpInfoDto(accountVo, deleteUser, false, false, false, false);
+
+        sendEmpToAllDevice(empDto);
 
         return R.ok();
     }
+
     // endregion
 
+    @Override
+    public R<Void> upLoadEmpCardToAllDevice(Long userId, Long factorId, Boolean deleteAllCard, Boolean deleteCard) {
+        RemoteUserAccountVo accountVo = remoteUserAccountService.getUserAccountVoBy(userId);
+        if (ObjectUtil.isEmpty(accountVo)) {
+            return R.warn(MessageFormat.format("没有要处理的人员信息,userId:{0}", userId));
+        }
+        accountVo.setFactoryId(factorId);
+        EmpInfoDto empDto = getEmpInfoDto(accountVo, false, false, false, deleteAllCard, deleteCard);
+
+        sendEmpToAllDevice(empDto);
+
+        return R.ok();
+    }
+
     @Override
     public R<Void> queryBatchEmpFormDevice(@NotNull QueryDto dto) {
         DeviceDto device = dto.getDevice();