From 4bbfdb468880f3210a662321bd8bc6df77e0ca1d Mon Sep 17 00:00:00 2001 From: zayac Date: Sat, 8 Jun 2024 23:50:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E23:40=E6=8A=A5=E6=95=B0?= =?UTF-8?q?=E7=9A=84=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zayac/admin/schedule/DailyReport.java | 352 +++++++++--------- 1 file changed, 176 insertions(+), 176 deletions(-) diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index 392ce187..82d3c8ab 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -89,12 +89,12 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() - .stream() - .flatMap(user -> user.getRoles() .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .flatMap(user -> user.getRoles() + .stream() + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); //获取账号不为空的用户 @@ -114,19 +114,19 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() - .stream() - .flatMap(user -> user.getRoles() .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .flatMap(user -> user.getRoles() + .stream() + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); // 获取所有账号的username与用户的映射 Map accountUsernameToUserMap = dept.getUsers() - .stream() - .flatMap(user -> user.getAccounts() .stream() - .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user))) - .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); + .flatMap(user -> user.getAccounts() + .stream() + .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user))) + .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); //获取账号不为空的用户 @@ -134,7 +134,7 @@ public class DailyReport { var assistants = usersByRole.get(ASSISTANT_ROLE_CODE); sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime - .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); + .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); //保存金融数据 saveData(ministerUser, deptUsers, yesterday); @@ -143,7 +143,7 @@ public class DailyReport { } // 一小时发送一次 - @Scheduled(cron = "0 0 * * * ?") + @Scheduled(cron = "0 0 * * * ?, 0 40 23 * * ?") public void generateTeamReportTask() { LocalDateTime nowDateTime = LocalDateTime.now(); LocalDate nowDate = LocalDate.now(); @@ -153,16 +153,16 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() - .stream() - .flatMap(user -> user.getRoles() .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .flatMap(user -> user.getRoles() + .stream() + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE); userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate - .atStartOfDay(), nowDateTime, null)); + .atStartOfDay(), nowDateTime, null)); }); } @@ -171,8 +171,8 @@ public class DailyReport { UserWithRolesAndAccountsResp ministerUser) { List finances = financeService.getFinanceByDate(date); Map> userFinances = finances.stream() - .filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName())) - .collect(Collectors.groupingBy(FinanceDO::getAgentName)); + .filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName())) + .collect(Collectors.groupingBy(FinanceDO::getAgentName)); userFinances.forEach((agentName, userFinancesList) -> { UserWithRolesAndAccountsResp user = userWithRolesAndAccountsRespMap.get(agentName); if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) { @@ -193,76 +193,76 @@ public class DailyReport { LocalDate date) { TeamMemberReq memberListReq = TeamMemberReq.builder() - .registerStartDate(date) - .registerEndDate(date) - .startDate(date) - .endDate(date) - .registerSort(1) - .status(1) - .pageSize(100) - .build(); + .registerStartDate(date) + .registerEndDate(date) + .startDate(date) + .endDate(date) + .registerSort(1) + .status(1) + .pageSize(100) + .build(); List>> futureList = new ArrayList<>(); ministerUser.getAccounts().forEach(account -> { CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { + }); CompletableFuture> teamMembersFuture = memberPaginationCompletableFuture - .thenApply(MemberPagination::getList) - .thenApplyAsync(members -> members.stream() - .filter(member -> member.getDeposit() != null && member.getDeposit() - .compareTo(BigDecimal.ZERO) == 0) - .collect(Collectors.toList()), asyncTaskExecutor) - .thenComposeAsync(membersWithoutDep -> { - List> memberFutures = membersWithoutDep.stream() - .map(memberWithoutDep -> { - PayRecordListReq req = PayRecordListReq.builder() - .startDate(date) - .endDate(date) - .pageSize(100) - .id(memberWithoutDep.getId()) - .build(); - CompletableFuture>> completableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { - }); - return completableFuture.thenApplyAsync(pagination -> { - if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination - .getScoreAmountTotal() - .compareTo(BigDecimal.ZERO) == 0) { - return memberWithoutDep; - } else { - return null; - } - }, asyncTaskExecutor); - }) - .toList(); + .thenApply(MemberPagination::getList) + .thenApplyAsync(members -> members.stream() + .filter(member -> member.getDeposit() != null && member.getDeposit() + .compareTo(BigDecimal.ZERO) == 0) + .collect(Collectors.toList()), asyncTaskExecutor) + .thenComposeAsync(membersWithoutDep -> { + List> memberFutures = membersWithoutDep.stream() + .map(memberWithoutDep -> { + PayRecordListReq req = PayRecordListReq.builder() + .startDate(date) + .endDate(date) + .pageSize(100) + .id(memberWithoutDep.getId()) + .build(); + CompletableFuture>> completableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { + }); + return completableFuture.thenApplyAsync(pagination -> { + if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination + .getScoreAmountTotal() + .compareTo(BigDecimal.ZERO) == 0) { + return memberWithoutDep; + } else { + return null; + } + }, asyncTaskExecutor); + }) + .toList(); - return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> memberFutures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList())); - }, asyncTaskExecutor) - .thenApplyAsync(membersWithoutDep -> { - // 发送给每个account关联的user用户 - if (!membersWithoutDep.isEmpty()) { - Map> groupByTopAgentName = membersWithoutDep.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); - groupByTopAgentName.forEach((accountName, accountMembers) -> { - String notification = telegramMessageService - .buildFailedPayMessage(accountName, accountMembers); - UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); - if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - String botToken = StrUtil.isEmpty(currUser.getBotToken()) - ? ministerUser.getBotToken() - : currUser.getBotToken(); - telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); - } - }); - } - return membersWithoutDep; - }, asyncTaskExecutor); + return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) + .thenApply(v -> memberFutures.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + }, asyncTaskExecutor) + .thenApplyAsync(membersWithoutDep -> { + // 发送给每个account关联的user用户 + if (!membersWithoutDep.isEmpty()) { + Map> groupByTopAgentName = membersWithoutDep.stream() + .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService + .buildFailedPayMessage(accountName, accountMembers); + UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) + ? ministerUser.getBotToken() + : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); + } + }); + } + return membersWithoutDep; + }, asyncTaskExecutor); futureList.add(teamMembersFuture); }); @@ -271,22 +271,22 @@ public class DailyReport { allFutures.thenRunAsync(() -> { // 主线下的所有的存款失败用户 List allTeamMembers = futureList.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .toList(); + .map(CompletableFuture::join) + .flatMap(List::stream) + .toList(); Map> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); + .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); StringBuilder combinedNotification = new StringBuilder(); groupByTopAgentName.forEach((accountName, accountMembers) -> { String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); combinedNotification.append(notification).append("\n"); }); telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification - .toString()); + .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification + .toString()); if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser - .getReportIds(), combinedNotification.toString()); + .getReportIds(), combinedNotification.toString()); } }, asyncTaskExecutor).exceptionally(ex -> { @@ -309,7 +309,7 @@ public class DailyReport { AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build(); deptUsers.forEach(deptUser -> tasks - .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); + .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); CompletableFuture allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])); allTasks.join(); @@ -336,32 +336,32 @@ public class DailyReport { TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); List> futures = accounts.stream() - .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) - .thenAcceptAsync(team -> { - int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); - int totalNewFirstDeposit = team.getList() - .stream() - .mapToInt(TeamAccount::getFirstDepositNum) - .sum(); - synchronized (totals) { - totals[0] += totalNewMember; - totals[1] += totalNewFirstDeposit; - } - String percent = getPercent(totalNewFirstDeposit, totalNewMember); - synchronized (rows) { - rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String - .valueOf(totalNewFirstDeposit), percent}); - } - }, asyncTaskExecutor)) - .toList(); + .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) + .thenAcceptAsync(team -> { + int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); + int totalNewFirstDeposit = team.getList() + .stream() + .mapToInt(TeamAccount::getFirstDepositNum) + .sum(); + synchronized (totals) { + totals[0] += totalNewMember; + totals[1] += totalNewFirstDeposit; + } + String percent = getPercent(totalNewFirstDeposit, totalNewMember); + synchronized (rows) { + rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String + .valueOf(totalNewFirstDeposit), percent}); + } + }, asyncTaskExecutor)) + .toList(); CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); // 对 rows 列表进行排序 rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0])); - rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), - getPercent(totals[1], totals[0])}); + rows.add(new String[]{"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), + getPercent(totals[1], totals[0])}); String message = TableFormatter.formatTableAsHtml(rows); telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message); if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { @@ -399,21 +399,21 @@ public class DailyReport { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() - .map(currAccount -> agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq) - .thenApplyAsync(agentData -> agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(() -> new BusinessException("No data found for report date"))) - .exceptionally(ex -> { - log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); - return null; - })) - .toList(); + .map(currAccount -> agentDataVisualListService + .getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date"))) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); + return null; + })) + .toList(); CompletableFuture userStaticsFuture = CompletableFuture.allOf(futures - .toArray(new CompletableFuture[0])); + .toArray(new CompletableFuture[0])); userStaticsFuture.thenRunAsync(() -> { List agentDataList = futures.stream().map(future -> { try { @@ -429,8 +429,8 @@ public class DailyReport { if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { String botToken = StrUtil.isEmpty(deptUser.getBotToken()) - ? ministerUser.getBotToken() - : deptUser.getBotToken(); + ? ministerUser.getBotToken() + : deptUser.getBotToken(); telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message); } }, asyncTaskExecutor).exceptionally(ex -> { @@ -457,33 +457,33 @@ public class DailyReport { List> tasks = new ArrayList<>(); TeamFinanceReq teamFinanceReq = TeamFinanceReq.builder() - .pageNum(1) - .pageSize(999) - .commissionDate(reportDate) - .build(); + .pageNum(1) + .pageSize(999) + .commissionDate(reportDate) + .build(); // 异步处理 ministerUserAccounts CompletableFuture ministerAccountsFuture = CompletableFuture.runAsync(() -> { List> accountFutures = ministerUser.getAccounts() - .stream() - .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) - .thenAcceptAsync(financePagination -> { - List financeReqList = financePagination.getList().stream().map(finance -> { - FinanceDO financeDO = new FinanceDO(); - BeanUtil.copyProperties(finance, financeDO); - return financeDO; - }).toList(); - financeService.addAll(financeReqList); - FinanceSumReq financeSumReq = new FinanceSumReq(); - BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); - financeSumService.add(financeSumReq); - }, asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error processing minister accounts for account {}", accountResp - .getUsername(), ex); - return null; - })) - .toList(); + .stream() + .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) + .thenAcceptAsync(financePagination -> { + List financeReqList = financePagination.getList().stream().map(finance -> { + FinanceDO financeDO = new FinanceDO(); + BeanUtil.copyProperties(finance, financeDO); + return financeDO; + }).toList(); + financeService.addAll(financeReqList); + FinanceSumReq financeSumReq = new FinanceSumReq(); + BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); + financeSumService.add(financeSumReq); + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error processing minister accounts for account {}", accountResp + .getUsername(), ex); + return null; + })) + .toList(); CompletableFuture.allOf(accountFutures.toArray(new CompletableFuture[0])).join(); }, asyncTaskExecutor).exceptionally(ex -> { log.error("Error processing minister accounts", ex); @@ -493,36 +493,36 @@ public class DailyReport { // 异步处理 deptUsers AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() - .monthDate(reportDate) - .build(); + .monthDate(reportDate) + .build(); deptUsers.forEach(deptUser -> { CompletableFuture deptUserFuture = CompletableFuture.runAsync(() -> { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() - .map(currAccount -> agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq) - .thenApplyAsync(agentData -> { - Statics statics = agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(() -> new BusinessException("No data found for report date")); - StatsDO statsDO = new StatsDO(); - BeanUtil.copyProperties(statics, statsDO); - return statsDO; - }, asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error fetching data for account {}: {}", currAccount.getId(), ex - .getMessage()); - return null; - })) - .toList(); + .map(currAccount -> agentDataVisualListService + .getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> { + Statics statics = agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date")); + StatsDO statsDO = new StatsDO(); + BeanUtil.copyProperties(statics, statsDO); + return statsDO; + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex + .getMessage()); + return null; + })) + .toList(); List list = futures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList()); statsService.addAll(list); }, asyncTaskExecutor).exceptionally(ex -> {