|
|
@@ -21,8 +21,11 @@ import org.dromara.common.core.constant.CacheNames;
|
|
|
import org.dromara.common.core.domain.R;
|
|
|
import org.dromara.common.core.enums.*;
|
|
|
import org.dromara.common.core.exception.consume.ConsumeException;
|
|
|
+import org.dromara.common.core.utils.SpringUtils;
|
|
|
+import org.dromara.common.json.utils.JsonUtils;
|
|
|
import org.dromara.common.redis.utils.CacheUtils;
|
|
|
import org.dromara.consume.api.RemoteConsumeService;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
@@ -30,9 +33,11 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
import java.math.BigDecimal;
|
|
|
import java.text.MessageFormat;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.atomic.LongAdder;
|
|
|
|
|
|
/**
|
|
|
* name: PayOrderBusiness
|
|
|
@@ -53,6 +58,7 @@ public class PayOrderBusiness {
|
|
|
|
|
|
@DubboReference
|
|
|
private final RemoteConsumeService remoteConsumeService;
|
|
|
+ private final SpringUtils springUtils;
|
|
|
|
|
|
/**
|
|
|
* 生成收支原始订单信息,写t_xf_creditAccountBack表
|
|
|
@@ -145,26 +151,65 @@ public class PayOrderBusiness {
|
|
|
*
|
|
|
* @return 入账结果
|
|
|
*/
|
|
|
- @Transactional(rollbackFor = ConsumeException.class)
|
|
|
- public R<Object> createSubsidyOrder() {
|
|
|
- List<java.lang.String> infoList = new ArrayList<>();
|
|
|
- Map<java.lang.String, Integer> mapCount = new HashMap<>();
|
|
|
- mapCount.put("successCount", 0);
|
|
|
- mapCount.put("errorCount", 0);
|
|
|
+ @Async("threadPoolTaskExecutor")
|
|
|
+ public void createSubsidyOrder() {
|
|
|
// 获取需要入账的补助明细
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
List<PtSubsidyitemVo> list = payBaseBusiness.selectPostSubsidyItem();
|
|
|
if (ObjUtil.isEmpty(list) || list.isEmpty()) {
|
|
|
- return R.fail("[补助到账]-[没有需要入账的补助明细]");
|
|
|
+ log.info("[补助到账]-[没有需要入账的补助明细]");
|
|
|
+ }
|
|
|
+ // 使用线程安全的计数器和集合
|
|
|
+ LongAdder successCount = new LongAdder();
|
|
|
+ LongAdder failCount = new LongAdder();
|
|
|
+ ConcurrentLinkedQueue<String> errInfo = new ConcurrentLinkedQueue<>();
|
|
|
+
|
|
|
+ // 动态计算最优批次大小
|
|
|
+ int batchSize = calculateOptimalBatchSize(list.size());
|
|
|
+ List<List<PtSubsidyitemVo>> batches = partitionList(new ArrayList<>(list), batchSize);
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<CompletableFuture<Void>> batchFutures = batches.stream()
|
|
|
+ .map(batch -> CompletableFuture.runAsync(() -> {
|
|
|
+ batch.parallelStream().forEach(item -> {
|
|
|
+
|
|
|
+ R<PurseInOutBo> result = doSubsidyOrderHandle(item);
|
|
|
+
|
|
|
+ if (R.isSuccess(result)) {
|
|
|
+ successCount.increment();
|
|
|
+ } else {
|
|
|
+ failCount.increment();
|
|
|
+ errInfo.add(new StringBuilder()
|
|
|
+ .append('[').append(result.getMsg()).append("]-[")
|
|
|
+ .append(JsonUtils.toJsonString(item)).append(']')
|
|
|
+ .toString());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }))
|
|
|
+ .toList();
|
|
|
+
|
|
|
+ // 等待所有批次完成
|
|
|
+ CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).join();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("补助到账处理异常", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("补助到账处理完成,待入账数数:{},成功数:{},失败数:{},耗时:{}ms", list.size(), successCount.sum(),
|
|
|
+ failCount.sum(), System.currentTimeMillis() - startTime);
|
|
|
+
|
|
|
+ if (!errInfo.isEmpty()) {
|
|
|
+ log.error("补助到账失败详情: {}", String.join("; ", errInfo));
|
|
|
}
|
|
|
- list.parallelStream().forEach(item -> {
|
|
|
- // doSubsidyOrderHandle(item, mapCount, infoList);
|
|
|
- // 使用线程池进行补助入账
|
|
|
- threadPoolTaskExecutor.execute(() -> doSubsidyOrderHandle(item, mapCount, infoList));
|
|
|
- });
|
|
|
- java.lang.String message = MessageFormat.format("补助到账完成,成功[{0}]条,失败[{1}]条", mapCount.get("successCount"), mapCount.get("errorCount"));
|
|
|
- infoList.forEach(log::error);
|
|
|
- log.info(message);
|
|
|
- return R.ok(message);
|
|
|
+
|
|
|
+ //list.parallelStream().forEach(item -> {
|
|
|
+ // // doSubsidyOrderHandle(item, mapCount, infoList);
|
|
|
+ // // 使用线程池进行补助入账
|
|
|
+ // threadPoolTaskExecutor.execute(() -> doSubsidyOrderHandle(item, mapCount, infoList));
|
|
|
+ //});
|
|
|
+ //java.lang.String message = MessageFormat.format("补助到账完成,成功[{0}]条,失败[{1}]条", mapCount.get("successCount"), mapCount.get("errorCount"));
|
|
|
+ //infoList.forEach(log::error);
|
|
|
+ //log.info(message);
|
|
|
+ //return R.ok(message);
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -290,7 +335,23 @@ public class PayOrderBusiness {
|
|
|
infoList.add(MessageFormat.format("[补助到账失败]-[补助信息:{0}]-[错误信息:{1}]", JSONUtil.toJsonStr(orderBo), e.getMessage()));
|
|
|
}
|
|
|
}
|
|
|
+ private R<PurseInOutBo> doSubsidyOrderHandle(PtSubsidyitemVo item) {
|
|
|
+ PurseInOutBo orderBo = createSubsidyPostBo(item);
|
|
|
|
|
|
+ try {
|
|
|
+ R<PurseInOutBo> result = SpringUtils.getAopProxy(this).createNormalOrder(orderBo);
|
|
|
+ if(R.isSuccess(result)) {
|
|
|
+ // 到账成功,更新补助明细的 到账状态
|
|
|
+ item.setGetDate(DateUtil.date());
|
|
|
+ item.setFillStatus("Y");
|
|
|
+ PtSubsidyitemBo bo = BeanUtil.copyProperties(item, PtSubsidyitemBo.class);
|
|
|
+ payBaseBusiness.updatePostSubsidyItemStatus(bo);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ } catch (Exception e) {
|
|
|
+ return R.fail(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
private void updateCacheBalanceByUserId(Long id, BigDecimal dealMoney) {
|
|
|
BigDecimal balance = remoteConsumeService.getUserTotalBalance(id);
|
|
|
balance = balance.add(dealMoney);
|
|
|
@@ -308,4 +369,25 @@ public class PayOrderBusiness {
|
|
|
CacheUtils.put(CacheNames.USER_BAG_BALANCE, key, balance);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 计算最优批次大小
|
|
|
+ */
|
|
|
+ private int calculateOptimalBatchSize(int totalSize) {
|
|
|
+ if (totalSize <= 100) return Math.max(10, totalSize);
|
|
|
+ if (totalSize <= 1000) return 50;
|
|
|
+ if (totalSize <= 10000) return 100;
|
|
|
+ return 200; // 最大批次大小
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将列表分割成固定大小的子列表
|
|
|
+ */
|
|
|
+ private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
|
|
|
+ List<List<T>> partitions = new ArrayList<>();
|
|
|
+ for (int i = 0; i < list.size(); i += batchSize) {
|
|
|
+ int endIndex = Math.min(i + batchSize, list.size());
|
|
|
+ partitions.add(list.subList(i, endIndex));
|
|
|
+ }
|
|
|
+ return partitions;
|
|
|
+ }
|
|
|
}
|