From c0a15b6580666881f76e85332d075914dc1fd0a7 Mon Sep 17 00:00:00 2001 From: zayac Date: Mon, 10 Jun 2024 12:32:26 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=86=E8=8A=82=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zayac/admin/schedule/DailyReport.java | 415 +++++++++--------- 1 file changed, 212 insertions(+), 203 deletions(-) diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index 6119107d..73b502c6 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -66,7 +66,7 @@ import java.util.stream.IntStream; @Component @RequiredArgsConstructor @Slf4j -@Profile("dev") +@Profile("prod") public class DailyReport { private final TeamService teamService; private final DeptService deptService; @@ -95,12 +95,12 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() + .stream() + .flatMap(user -> user.getRoles() .stream() - .flatMap(user -> user.getRoles() - .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); //获取账号不为空的用户 @@ -130,40 +130,40 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() + .stream() + .flatMap(user -> user.getRoles() .stream() - .flatMap(user -> user.getRoles() - .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); // 获取所有账号的username与用户的映射 Map accountUsernameToUserMap = dept.getUsers() + .stream() + .flatMap(user -> user.getAccounts() .stream() - .flatMap(user -> user.getAccounts() - .stream() - .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user))) - .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); + .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user))) + .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); //建立团队之间账号的联系 Map accountNameWithTopAgentName = new HashMap<>(); dept.getUsers() + .stream() + .flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream()) + .forEach(accountResp -> ministerUser.getAccounts() .stream() - .flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream()) - .forEach(accountResp -> ministerUser.getAccounts() - .stream() - .filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount - .getPlatformId())) - .findFirst() - .ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp - .getUsername(), ministerAccount.getUsername()))); + .filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount + .getPlatformId())) + .findFirst() + .ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp + .getUsername(), ministerAccount.getUsername()))); //获取账号不为空的用户 var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList(); var assistants = usersByRole.get(ASSISTANT_ROLE_CODE); sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime - .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); + .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); //保存数据 saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName); getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); @@ -180,16 +180,16 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() + .stream() + .flatMap(user -> user.getRoles() .stream() - .flatMap(user -> user.getRoles() - .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE); userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate - .atStartOfDay(), nowDateTime, null)); + .atStartOfDay(), nowDateTime, null)); }); } @@ -198,8 +198,8 @@ public class DailyReport { UserWithRolesAndAccountsResp ministerUser) { List finances = financeService.getFinanceByDate(date); Map> userFinances = finances.stream() - .filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName())) - .collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName()))); + .filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName())) + .collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName()))); userFinances.forEach((user, userFinancesList) -> { if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) { @@ -215,25 +215,25 @@ public class DailyReport { LocalDate date, int sumReg) { int pageSize = 100; - int totalPages = (int) Math.ceil((double) sumReg / pageSize); + int totalPages = (int)Math.ceil((double)sumReg / pageSize); return IntStream.range(0, totalPages).mapToObj(page -> { int currentPageSize = page == totalPages - 1 - ? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize) - : pageSize; + ? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize) + : pageSize; TeamMemberReq memberListReq = TeamMemberReq.builder() - .registerStartDate(date) - .registerEndDate(date) - .startDate(date) - .endDate(date) - .registerSort(1) - .status(1) - .pageSize(currentPageSize) - .pageNum(page + 1) - .build(); + .registerStartDate(date) + .registerEndDate(date) + .startDate(date) + .endDate(date) + .registerSort(1) + .status(1) + .pageSize(currentPageSize) + .pageNum(page + 1) + .build(); return completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference>>>() { - }); + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference>>>() { + }); }).toList(); } @@ -253,88 +253,98 @@ public class DailyReport { sumReg = (sumReg == 0) ? 100 : sumReg; List>>> paginationFutures = createFuturesForPagination(account, date, sumReg); - CompletableFuture allPaginationFutures = CompletableFuture.allOf(paginationFutures.toArray(new CompletableFuture[0])); + CompletableFuture allPaginationFutures = CompletableFuture.allOf(paginationFutures + .toArray(new CompletableFuture[0])); CompletableFuture> aggregatedMembersFuture = allPaginationFutures.thenApply(v -> { - return paginationFutures.stream() - .map(CompletableFuture::join) - .flatMap(memberPagination -> { - List members = memberPagination.getList(); - log.info("members size:{}", members.size()); - return members.stream(); - }) - .collect(Collectors.toList()); + return paginationFutures.stream().map(CompletableFuture::join).flatMap(memberPagination -> { + List members = memberPagination.getList(); + log.info("members size:{}", members.size()); + return members.stream(); + }).collect(Collectors.toList()); }); - CompletableFuture> filteredMembersFuture = aggregatedMembersFuture.thenApplyAsync(members -> { - return members.stream() - .filter(member -> member.getDeposit() != null && member.getDeposit().compareTo(BigDecimal.ZERO) == 0) - .collect(Collectors.toList()); - }, asyncTaskExecutor); + CompletableFuture> filteredMembersFuture = aggregatedMembersFuture + .thenApplyAsync(members -> members.stream() + .filter(member -> member.getDeposit() != null && member.getDeposit() + .compareTo(BigDecimal.ZERO) == 0) + .collect(Collectors.toList()), asyncTaskExecutor); - CompletableFuture> membersWithFailedPayFuture = filteredMembersFuture.thenComposeAsync(membersWithoutDep -> { - List> payRecordFutures = membersWithoutDep.stream().map(memberWithoutDep -> { - PayRecordsListReq req = PayRecordsListReq.builder() - .startDate(date) - .endDate(date) - .pageSize(10) - .memberName(memberWithoutDep.getName()) - .build(); - return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> { - if (CollUtil.isNotEmpty(pagination.getList()) - && pagination.getList().stream().noneMatch(payRecord -> payRecord.getPayStatus() == 2)) { - return memberWithoutDep; - } else { - return null; - } - }, asyncTaskExecutor); - }).toList(); + CompletableFuture> membersWithFailedPayFuture = filteredMembersFuture + .thenComposeAsync(membersWithoutDep -> { + List> payRecordFutures = membersWithoutDep.stream() + .map(memberWithoutDep -> { + PayRecordsListReq req = PayRecordsListReq.builder() + .startDate(date) + .endDate(date) + .pageSize(10) + .memberName(memberWithoutDep.getName()) + .build(); + return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> { + if (CollUtil.isNotEmpty(pagination.getList()) && pagination.getList() + .stream() + .noneMatch(payRecord -> payRecord.getPayStatus() == 2)) { + return memberWithoutDep; + } else { + return null; + } + }, asyncTaskExecutor); + }) + .toList(); - return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0])) + return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0])) .thenApply(v -> payRecordFutures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList())); - }, asyncTaskExecutor); + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + }, asyncTaskExecutor); - CompletableFuture> notificationFuture = membersWithFailedPayFuture.thenApplyAsync(membersWithoutDep -> { - if (CollUtil.isNotEmpty(membersWithoutDep)) { - Map> groupByTopAgentName = membersWithoutDep.stream() + CompletableFuture> notificationFuture = membersWithFailedPayFuture + .thenApplyAsync(membersWithoutDep -> { + if (CollUtil.isNotEmpty(membersWithoutDep)) { + Map> groupByTopAgentName = membersWithoutDep.stream() .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); - groupByTopAgentName.forEach((accountName, accountMembers) -> { - String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); - UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); - if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? ministerUser.getBotToken() : currUser.getBotToken(); - telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification); - } - }); - } - return membersWithoutDep; - }, asyncTaskExecutor); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService + .buildFailedPayMessage(accountName, accountMembers); + UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) + ? ministerUser.getBotToken() + : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification); + } + }); + } + return membersWithoutDep; + }, asyncTaskExecutor); accountFutureList.add(notificationFuture); }); - CompletableFuture allAccountFutures = CompletableFuture.allOf(accountFutureList.toArray(new CompletableFuture[0])); + CompletableFuture allAccountFutures = CompletableFuture.allOf(accountFutureList + .toArray(new CompletableFuture[0])); allAccountFutures.thenRunAsync(() -> { List allTeamMembers = accountFutureList.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .toList(); + .map(CompletableFuture::join) + .flatMap(List::stream) + .toList(); log.info("All failed pay members size: {}", allTeamMembers.size()); Map> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); + .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); StringBuilder combinedNotification = new StringBuilder(); groupByTopAgentName.forEach((accountName, accountMembers) -> { String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); combinedNotification.append(notification).append("\n"); }); - telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification.toString()); + telegramMessageService + .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification + .toString()); if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { - telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), combinedNotification.toString()); + telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser + .getReportIds(), combinedNotification.toString()); } }, asyncTaskExecutor).exceptionally(ex -> { log.error("Error collecting and processing data", ex); @@ -342,15 +352,14 @@ public class DailyReport { }); } - private CompletableFuture>> fetchPaginationPayRecordWithRetry(AccountResp account, PayRecordsListReq req) { - return CompletableFuture.supplyAsync(() -> completableFutureWebClientService.fetchDataForAccount( - account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, - new ParameterizedTypeReference>>>() { - }), asyncTaskExecutor).thenCompose(future -> future) - .exceptionallyCompose(ex -> { - log.error("Error fetching pay records, retrying...", ex); - return fetchPaginationPayRecordWithRetry(account, req); - }); + private CompletableFuture>> fetchPaginationPayRecordWithRetry(AccountResp account, + PayRecordsListReq req) { + return CompletableFuture.supplyAsync(() -> completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference>>>() { + }), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> { + log.error("Error fetching pay records, retrying...", ex); + return fetchPaginationPayRecordWithRetry(account, req); + }); } private CompletableFuture>> fetchPayRecordsWithRetry(AccountResp account, @@ -362,11 +371,11 @@ public class DailyReport { rateLimiter.acquire(); // 通过限流器限流 } return CompletableFuture.supplyAsync(() -> completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference>>>() { - }), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> { - log.error("Error fetching pay records", ex); - return null; - }); + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference>>>() { + }), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> { + log.error("Error fetching pay records", ex); + return null; + }); } private void sendDailyReport(LocalDate reportDate, @@ -382,7 +391,7 @@ public class DailyReport { AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build(); deptUsers.forEach(deptUser -> tasks - .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); + .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); CompletableFuture allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])); allTasks.join(); @@ -409,32 +418,32 @@ public class DailyReport { TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); List> futures = accounts.stream() - .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) - .thenAcceptAsync(team -> { - int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); - int totalNewFirstDeposit = team.getList() - .stream() - .mapToInt(TeamAccount::getFirstDepositNum) - .sum(); - synchronized (totals) { - totals[0] += totalNewMember; - totals[1] += totalNewFirstDeposit; - } - String percent = getPercent(totalNewFirstDeposit, totalNewMember); - synchronized (rows) { - rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String - .valueOf(totalNewFirstDeposit), percent}); - } - }, asyncTaskExecutor)) - .toList(); + .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) + .thenAcceptAsync(team -> { + int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); + int totalNewFirstDeposit = team.getList() + .stream() + .mapToInt(TeamAccount::getFirstDepositNum) + .sum(); + synchronized (totals) { + totals[0] += totalNewMember; + totals[1] += totalNewFirstDeposit; + } + String percent = getPercent(totalNewFirstDeposit, totalNewMember); + synchronized (rows) { + rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String + .valueOf(totalNewFirstDeposit), percent}); + } + }, asyncTaskExecutor)) + .toList(); CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); // 对 rows 列表进行排序 rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0])); - rows.add(new String[]{"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), - getPercent(totals[1], totals[0])}); + rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), + getPercent(totals[1], totals[0])}); String message = TableFormatter.formatTableAsHtml(rows); telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message); if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { @@ -472,23 +481,23 @@ public class DailyReport { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() - // 团队账号暂时不用具体的代理线数据 - .filter(accountResp -> !accountResp.getIsTeam()) - .map(currAccount -> agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq) - .thenApplyAsync(agentData -> agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(() -> new BusinessException("No data found for report date"))) - .exceptionally(ex -> { - log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); - return null; - })) - .toList(); + // 团队账号暂时不用具体的代理线数据 + .filter(accountResp -> !accountResp.getIsTeam()) + .map(currAccount -> agentDataVisualListService + .getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date"))) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); + return null; + })) + .toList(); CompletableFuture userStaticsFuture = CompletableFuture.allOf(futures - .toArray(new CompletableFuture[0])); + .toArray(new CompletableFuture[0])); userStaticsFuture.thenRunAsync(() -> { List agentDataList = futures.stream().map(future -> { try { @@ -504,8 +513,8 @@ public class DailyReport { if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { String botToken = StrUtil.isEmpty(deptUser.getBotToken()) - ? ministerUser.getBotToken() - : deptUser.getBotToken(); + ? ministerUser.getBotToken() + : deptUser.getBotToken(); telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message); } }, asyncTaskExecutor).exceptionally(ex -> { @@ -533,34 +542,34 @@ public class DailyReport { List> tasks = new ArrayList<>(); TeamFinanceReq teamFinanceReq = TeamFinanceReq.builder() - .pageNum(1) - .pageSize(999) - .commissionDate(reportDate) - .build(); + .pageNum(1) + .pageSize(999) + .commissionDate(reportDate) + .build(); // 异步处理 ministerUserAccounts CompletableFuture ministerAccountsFuture = CompletableFuture.runAsync(() -> { List> accountFutures = ministerUser.getAccounts() - .stream() - .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) - .thenAcceptAsync(financePagination -> { - List financeReqList = financePagination.getList().stream().map(finance -> { - FinanceDO financeDO = new FinanceDO(); - BeanUtil.copyProperties(finance, financeDO); - financeDO.setTopAgentName(accountResp.getUsername()); - return financeDO; - }).toList(); - financeService.addAll(financeReqList); - FinanceSumReq financeSumReq = new FinanceSumReq(); - BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); - financeSumService.add(financeSumReq); - }, asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error processing minister accounts for account {}", accountResp - .getUsername(), ex); - return null; - })) - .toList(); + .stream() + .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) + .thenAcceptAsync(financePagination -> { + List financeReqList = financePagination.getList().stream().map(finance -> { + FinanceDO financeDO = new FinanceDO(); + BeanUtil.copyProperties(finance, financeDO); + financeDO.setTopAgentName(accountResp.getUsername()); + return financeDO; + }).toList(); + financeService.addAll(financeReqList); + FinanceSumReq financeSumReq = new FinanceSumReq(); + BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); + financeSumService.add(financeSumReq); + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error processing minister accounts for account {}", accountResp + .getUsername(), ex); + return null; + })) + .toList(); CompletableFuture.allOf(accountFutures.toArray(new CompletableFuture[0])).join(); }, asyncTaskExecutor).exceptionally(ex -> { log.error("Error processing minister accounts", ex); @@ -570,37 +579,37 @@ public class DailyReport { // 异步处理 deptUsers AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() - .monthDate(reportDate) - .build(); + .monthDate(reportDate) + .build(); deptUsers.forEach(deptUser -> { CompletableFuture deptUserFuture = CompletableFuture.runAsync(() -> { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() - .map(currAccount -> agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq) - .thenApplyAsync(agentData -> { - Statics statics = agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(() -> new BusinessException("No data found for report date")); - StatsDO statsDO = new StatsDO(); - BeanUtil.copyProperties(statics, statsDO); - statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername())); - return statsDO; - }, asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error fetching data for account {}: {}", currAccount.getId(), ex - .getMessage()); - return null; - })) - .toList(); + .map(currAccount -> agentDataVisualListService + .getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> { + Statics statics = agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date")); + StatsDO statsDO = new StatsDO(); + BeanUtil.copyProperties(statics, statsDO); + statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername())); + return statsDO; + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex + .getMessage()); + return null; + })) + .toList(); List list = futures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList()); statsService.addAll(list); }, asyncTaskExecutor).exceptionally(ex -> {