|
|
@@ -12,9 +12,11 @@ import org.dromara.ecs.api.domain.dto.RemoteBatchSyncErrorDto;
|
|
|
import org.dromara.ecs.api.domain.dto.RemoteBatchSyncResultDto;
|
|
|
import org.dromara.ecs.api.domain.dto.RemoteSectionSyncBatchDto;
|
|
|
import org.dromara.ecs.api.domain.dto.RemoteSectionSyncDto;
|
|
|
+import org.dromara.server.common.constant.SyncResourceConstants;
|
|
|
import org.dromara.server.sync.business.batch.mapper.SectionRecordMapper;
|
|
|
import org.dromara.server.sync.domain.SyncBatchErrorRecord;
|
|
|
import org.dromara.server.sync.domain.dto.request.SectionRecordRequest;
|
|
|
+import org.dromara.server.sync.domain.dto.result.BatchChunkSyncResult;
|
|
|
import org.dromara.server.sync.domain.dto.result.BatchSyncResult;
|
|
|
import org.dromara.server.sync.domain.dto.wrapper.BatchSyncRequest;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
@@ -30,148 +32,73 @@ import java.util.List;
|
|
|
* @Version 1.0
|
|
|
* @since jdk17
|
|
|
*/
|
|
|
-
|
|
|
-@Service
|
|
|
+@Service(SyncResourceConstants.SECTION)
|
|
|
@Slf4j
|
|
|
@RequiredArgsConstructor
|
|
|
-public class SectionBatchSyncService {
|
|
|
+public class SectionBatchSyncService implements BatchSyncBusinessHandler<SectionRecordRequest, RemoteSectionSyncBatchDto> {
|
|
|
|
|
|
private static final String API_NAME = "sections";
|
|
|
private static final String RESOURCE_TYPE = "SECTION";
|
|
|
|
|
|
private final SectionRecordMapper sectionRecordMapper;
|
|
|
private final SyncBatchTrackerService syncBatchTrackerService;
|
|
|
+ private final CommonBatchSyncService commonBatchSyncService;
|
|
|
|
|
|
@DubboReference
|
|
|
private RemoteSectionSyncService remoteSectionSyncService;
|
|
|
|
|
|
/**
|
|
|
- * 批量同步课表节次,完成参数校验、远端调用、错误归集和批次结果汇总。
|
|
|
+ * 兼容原有入口,委托公共批量编排服务执行 section 同步。
|
|
|
*
|
|
|
* @param request 批量课表节次请求
|
|
|
* @return 批次同步结果
|
|
|
*/
|
|
|
public BatchSyncResult syncSections(BatchSyncRequest<SectionRecordRequest> request) {
|
|
|
- if (request == null || CollUtil.isEmpty(request.getRecords())) {
|
|
|
- throw new ServiceException("records 不能为空");
|
|
|
- }
|
|
|
-
|
|
|
- List<SectionRecordRequest> requestRecords = request.getRecords();
|
|
|
- String tenantId = requestRecords.stream().map(SectionRecordRequest::getTenantId).filter(StringUtils::isNotBlank).findFirst().orElse("default");
|
|
|
- String batchId = syncBatchTrackerService.start(API_NAME, RESOURCE_TYPE, tenantId, request, requestRecords.size());
|
|
|
-
|
|
|
- List<RemoteSectionSyncBatchDto> records = new ArrayList<>();
|
|
|
- List<SyncBatchErrorRecord> errors = new ArrayList<>();
|
|
|
-
|
|
|
- // 逐条校验并转换请求数据,校验失败的记录直接记入错误明细。
|
|
|
- for (int i = 0; i < requestRecords.size(); i++) {
|
|
|
- SectionRecordRequest record = requestRecords.get(i);
|
|
|
- try {
|
|
|
- validateRecord(record);
|
|
|
- records.add(sectionRecordMapper.toBatchDto(batchId, i + 1, record));
|
|
|
- } catch (Exception e) {
|
|
|
- errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, i + 1, getBizKey(record),
|
|
|
- SyncBatchTrackerService.ERROR_CODE_VALIDATION, e.getMessage(), record));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- String sectionIds = records.stream().map(RemoteSectionSyncBatchDto::getRecord).map(RemoteSectionSyncDto::getSectionId).filter(StringUtils::isNotBlank).limit(10).reduce((a, b) -> a + "," + b).orElse("");
|
|
|
- log.info("[batch-sync-sections-start]-[batchId:{}]-[tenant:{}]-[total:{}]-[valid:{}]-[invalid:{}]-[sampleSectionIds:{}]",
|
|
|
- batchId, tenantId, requestRecords.size(), records.size(), errors.size(), sectionIds);
|
|
|
-
|
|
|
- int successCount = 0;
|
|
|
- if (CollUtil.isNotEmpty(records)) {
|
|
|
- try {
|
|
|
- // 调用远端批量同步服务,并根据返回结果汇总成功数与业务失败明细。
|
|
|
- R<RemoteBatchSyncResultDto> result = remoteSectionSyncService.syncSectionBatch(records);
|
|
|
- if (R.isError(result)) {
|
|
|
- String msg = result.getMsg();
|
|
|
- log.error("[batch-sync-sections-fail]-[batchId:{}]-[msg:{}]", batchId, msg);
|
|
|
- for (RemoteSectionSyncBatchDto record : records) {
|
|
|
- errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, record.getStartIndex(), record.getRecord().getSectionId(),
|
|
|
- SyncBatchTrackerService.ERROR_CODE_BIZ, msg, record.getRecord()));
|
|
|
- }
|
|
|
- } else {
|
|
|
- RemoteBatchSyncResultDto data = result.getData();
|
|
|
- if (data != null) {
|
|
|
- successCount = data.getSuccess() == null ? 0 : data.getSuccess();
|
|
|
- addRemoteErrors(batchId, records, errors, data);
|
|
|
- log.info("[batch-sync-sections-done]-[batchId:{}]-[total:{}]-[成功:{}]-[失败:{}]",
|
|
|
- batchId, data.getTotal(), data.getSuccess(), data.getFailed());
|
|
|
- } else {
|
|
|
- successCount = records.size();
|
|
|
- log.info("[batch-sync-sections-done]-[batchId:{}]-[total:{}]-[远端未返回统计]", batchId, requestRecords.size());
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- // 远端调用异常时,将本次有效记录全部记为业务失败。
|
|
|
- log.error("[batch-sync-sections-exception]-[batchId:{}]-[msg:{}]", batchId, e.getMessage(), e);
|
|
|
- for (RemoteSectionSyncBatchDto record : records) {
|
|
|
- errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, record.getStartIndex(), record.getRecord().getSectionId(),
|
|
|
- SyncBatchTrackerService.ERROR_CODE_BIZ, e.getMessage(), record.getRecord()));
|
|
|
- }
|
|
|
- }
|
|
|
+ if (request != null) {
|
|
|
+ request.setBusinessType(getBusinessType());
|
|
|
}
|
|
|
+ return commonBatchSyncService.sync(request);
|
|
|
+ }
|
|
|
|
|
|
- // 持久化错误明细并结束批次,最终返回汇总结果。
|
|
|
- syncBatchTrackerService.saveErrors(errors);
|
|
|
- int errorCount = errors.size();
|
|
|
- syncBatchTrackerService.finish(batchId, requestRecords.size(), successCount, errorCount);
|
|
|
-
|
|
|
- BatchSyncResult batchSyncResult = new BatchSyncResult();
|
|
|
- batchSyncResult.setBatchId(batchId);
|
|
|
- batchSyncResult.setTotal(requestRecords.size());
|
|
|
- batchSyncResult.setSuccess(successCount);
|
|
|
- batchSyncResult.setError(errorCount);
|
|
|
- batchSyncResult.setStatus(resolveStatus(requestRecords.size(), successCount, errorCount));
|
|
|
- return batchSyncResult;
|
|
|
+ /**
|
|
|
+ * 返回当前处理器对应的业务类型。
|
|
|
+ *
|
|
|
+ * @return 业务类型
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public String getBusinessType() {
|
|
|
+ return SyncResourceConstants.SECTION;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 将远端返回的失败明细归集到本地批次错误列表。
|
|
|
+ * 返回批次跟踪中使用的接口名称。
|
|
|
*
|
|
|
- * @param batchId 批次ID
|
|
|
- * @param records 有效请求记录
|
|
|
- * @param errors 错误明细集合
|
|
|
- * @param result 远端批量同步结果
|
|
|
+ * @return 接口名称
|
|
|
*/
|
|
|
- private void addRemoteErrors(String batchId, List<RemoteSectionSyncBatchDto> records, List<SyncBatchErrorRecord> errors, RemoteBatchSyncResultDto result) {
|
|
|
- if (CollUtil.isNotEmpty(result.getErrors())) {
|
|
|
- for (RemoteBatchSyncErrorDto error : result.getErrors()) {
|
|
|
- RemoteSectionSyncBatchDto record = records.stream().filter(item -> item.getStartIndex().equals(error.getRecordIndex())).findFirst().orElse(null);
|
|
|
- errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, error.getRecordIndex(), error.getBizKey(),
|
|
|
- SyncBatchTrackerService.ERROR_CODE_BIZ, error.getErrorMessage(), record == null ? null : record.getRecord()));
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public String getApiName() {
|
|
|
+ return API_NAME;
|
|
|
+ }
|
|
|
|
|
|
- int remoteFailed = result.getFailed() == null ? 0 : result.getFailed();
|
|
|
- for (int i = 0; i < remoteFailed && i < records.size(); i++) {
|
|
|
- RemoteSectionSyncBatchDto record = records.get(i);
|
|
|
- errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, record.getStartIndex(), record.getRecord().getSectionId(),
|
|
|
- SyncBatchTrackerService.ERROR_CODE_BIZ, "远端处理失败,未返回具体失败明细", record.getRecord()));
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * 返回批次跟踪中使用的资源类型。
|
|
|
+ *
|
|
|
+ * @return 资源类型
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public String getResourceType() {
|
|
|
+ return RESOURCE_TYPE;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 根据总数、成功数和失败数计算批次状态。
|
|
|
+ * 从请求记录中解析租户编号。
|
|
|
*
|
|
|
- * @param total 请求总数
|
|
|
- * @param success 成功数
|
|
|
- * @param error 失败数
|
|
|
- * @return 批次状态
|
|
|
+ * @param records 课表节次记录列表
|
|
|
+ * @return 租户编号
|
|
|
*/
|
|
|
- private String resolveStatus(int total, int success, int error) {
|
|
|
- if (total != success + error) {
|
|
|
- return SyncBatchTrackerService.STATUS_COUNT_MISMATCH;
|
|
|
- }
|
|
|
- if (error == 0) {
|
|
|
- return SyncBatchTrackerService.STATUS_SUCCESS;
|
|
|
- }
|
|
|
- if (success == 0) {
|
|
|
- return SyncBatchTrackerService.STATUS_FAIL;
|
|
|
- }
|
|
|
- return SyncBatchTrackerService.STATUS_PARTIAL_FAIL;
|
|
|
+ @Override
|
|
|
+ public String resolveTenantId(List<SectionRecordRequest> records) {
|
|
|
+ return records.stream().map(SectionRecordRequest::getTenantId).filter(StringUtils::isNotBlank).findFirst().orElse("default");
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -179,7 +106,8 @@ public class SectionBatchSyncService {
|
|
|
*
|
|
|
* @param record 单条课表节次记录
|
|
|
*/
|
|
|
- private void validateRecord(SectionRecordRequest record) {
|
|
|
+ @Override
|
|
|
+ public void validateRecord(SectionRecordRequest record) {
|
|
|
if (record == null) {
|
|
|
throw new ServiceException("record 不能为空");
|
|
|
}
|
|
|
@@ -194,7 +122,100 @@ public class SectionBatchSyncService {
|
|
|
* @param record 单条课表节次记录
|
|
|
* @return 业务主键
|
|
|
*/
|
|
|
- private String getBizKey(SectionRecordRequest record) {
|
|
|
+ @Override
|
|
|
+ public String getBizKey(SectionRecordRequest record) {
|
|
|
return record == null ? null : record.getSectionId();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将单条课表节次记录转换为远端批量同步 DTO。
|
|
|
+ *
|
|
|
+ * @param batchId 批次ID
|
|
|
+ * @param recordIndex 原始记录序号
|
|
|
+ * @param record 单条课表节次记录
|
|
|
+ * @return 远端批量同步 DTO
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public RemoteSectionSyncBatchDto toDispatchDto(String batchId, int recordIndex, SectionRecordRequest record) {
|
|
|
+ return sectionRecordMapper.toBatchDto(batchId, recordIndex, record);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行单个分片的 section 同步,并返回分片处理结果。
|
|
|
+ *
|
|
|
+ * @param batchId 批次ID
|
|
|
+ * @param chunkRecords 分片记录列表
|
|
|
+ * @return 分片处理结果
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public BatchChunkSyncResult dispatchChunk(String batchId, List<RemoteSectionSyncBatchDto> chunkRecords) {
|
|
|
+ BatchChunkSyncResult chunkResult = new BatchChunkSyncResult();
|
|
|
+ chunkResult.setTotal(chunkRecords.size());
|
|
|
+ chunkResult.setSuccess(0);
|
|
|
+ chunkResult.setFailed(0);
|
|
|
+ chunkResult.setErrors(new ArrayList<>());
|
|
|
+
|
|
|
+ String sectionIds = chunkRecords.stream().map(RemoteSectionSyncBatchDto::getRecord).map(RemoteSectionSyncDto::getSectionId)
|
|
|
+ .filter(StringUtils::isNotBlank).limit(10).reduce((a, b) -> a + "," + b).orElse("");
|
|
|
+ log.info("[batch-sync-sections-chunk-start]-[batchId:{}]-[size:{}]-[sampleSectionIds:{}]", batchId, chunkRecords.size(), sectionIds);
|
|
|
+
|
|
|
+ try {
|
|
|
+ R<RemoteBatchSyncResultDto> result = remoteSectionSyncService.syncSectionBatch(chunkRecords);
|
|
|
+ if (R.isError(result)) {
|
|
|
+ String msg = result.getMsg();
|
|
|
+ log.error("[batch-sync-sections-chunk-fail]-[batchId:{}]-[msg:{}]", batchId, msg);
|
|
|
+ for (RemoteSectionSyncBatchDto record : chunkRecords) {
|
|
|
+ chunkResult.getErrors().add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, record.getStartIndex(), record.getRecord().getSectionId(),
|
|
|
+ SyncBatchTrackerService.ERROR_CODE_BIZ, msg, record.getRecord()));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ RemoteBatchSyncResultDto data = result.getData();
|
|
|
+ if (data != null) {
|
|
|
+ chunkResult.setTotal(data.getTotal() == null ? chunkRecords.size() : data.getTotal());
|
|
|
+ chunkResult.setSuccess(data.getSuccess() == null ? 0 : data.getSuccess());
|
|
|
+ addRemoteErrors(batchId, chunkRecords, chunkResult.getErrors(), data);
|
|
|
+ log.info("[batch-sync-sections-chunk-done]-[batchId:{}]-[total:{}]-[成功:{}]-[失败:{}]",
|
|
|
+ batchId, data.getTotal(), data.getSuccess(), data.getFailed());
|
|
|
+ } else {
|
|
|
+ chunkResult.setSuccess(chunkRecords.size());
|
|
|
+ log.info("[batch-sync-sections-chunk-done]-[batchId:{}]-[total:{}]-[远端未返回统计]", batchId, chunkRecords.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[batch-sync-sections-chunk-exception]-[batchId:{}]-[msg:{}]", batchId, e.getMessage(), e);
|
|
|
+ for (RemoteSectionSyncBatchDto record : chunkRecords) {
|
|
|
+ chunkResult.getErrors().add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, record.getStartIndex(), record.getRecord().getSectionId(),
|
|
|
+ SyncBatchTrackerService.ERROR_CODE_BIZ, e.getMessage(), record.getRecord()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ chunkResult.setFailed(chunkResult.getErrors().size());
|
|
|
+ return chunkResult;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将远端返回的失败明细归集到本地错误列表。
|
|
|
+ *
|
|
|
+ * @param batchId 批次ID
|
|
|
+ * @param records 分片有效记录
|
|
|
+ * @param errors 错误明细集合
|
|
|
+ * @param result 远端批量同步结果
|
|
|
+ */
|
|
|
+ private void addRemoteErrors(String batchId, List<RemoteSectionSyncBatchDto> records, List<SyncBatchErrorRecord> errors, RemoteBatchSyncResultDto result) {
|
|
|
+ if (CollUtil.isNotEmpty(result.getErrors())) {
|
|
|
+ for (RemoteBatchSyncErrorDto error : result.getErrors()) {
|
|
|
+ RemoteSectionSyncBatchDto record = records.stream().filter(item -> item.getStartIndex().equals(error.getRecordIndex())).findFirst().orElse(null);
|
|
|
+ errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, error.getRecordIndex(), error.getBizKey(),
|
|
|
+ SyncBatchTrackerService.ERROR_CODE_BIZ, error.getErrorMessage(), record == null ? null : record.getRecord()));
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ int remoteFailed = result.getFailed() == null ? 0 : result.getFailed();
|
|
|
+ for (int i = 0; i < remoteFailed && i < records.size(); i++) {
|
|
|
+ RemoteSectionSyncBatchDto record = records.get(i);
|
|
|
+ errors.add(syncBatchTrackerService.error(batchId, RESOURCE_TYPE, record.getStartIndex(), record.getRecord().getSectionId(),
|
|
|
+ SyncBatchTrackerService.ERROR_CODE_BIZ, "远端处理失败,未返回具体失败明细", record.getRecord()));
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|