diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index 7556ec80..3d506134 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -62,7 +62,7 @@ import java.util.stream.Collectors; @Component @RequiredArgsConstructor @Slf4j -//@Profile("prod") +@Profile("prod") public class DailyReport { private final TeamService teamService; private final DeptService deptService; @@ -80,7 +80,7 @@ public class DailyReport { private static final String ASSISTANT_ROLE_CODE = "assistant"; private static final String SEO_ROLE_CODE = "seo"; - @Scheduled(cron = "0 51 11,14,17,21 * * ?") + @Scheduled(cron = "0 40 11,14,17,21 * * ?") public void teamAccountDailyReport() { LocalDateTime nowDateTime = LocalDateTime.now(); LocalDate nowDate = LocalDate.now(); @@ -89,12 +89,12 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() + .stream() + .flatMap(user -> user.getRoles() .stream() - .flatMap(user -> user.getRoles() - .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); //获取账号不为空的用户 @@ -114,19 +114,19 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() + .stream() + .flatMap(user -> user.getRoles() .stream() - .flatMap(user -> user.getRoles() - .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); // 获取所有账号的username与用户的映射 Map accountUsernameToUserMap = dept.getUsers() + .stream() + .flatMap(user -> user.getAccounts() .stream() - .flatMap(user -> user.getAccounts() - .stream() - .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user))) - .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); + .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user))) + .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); //获取账号不为空的用户 @@ -134,7 +134,7 @@ public class DailyReport { var assistants = usersByRole.get(ASSISTANT_ROLE_CODE); sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime - .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); + .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); saveData(ministerUser, deptUsers, yesterday); }); @@ -152,16 +152,16 @@ public class DailyReport { deptWithUsersAndAccounts.forEach(dept -> { //根据用户角色对部门用户进行分组 Map> usersByRole = dept.getUsers() + .stream() + .flatMap(user -> user.getRoles() .stream() - .flatMap(user -> user.getRoles() - .stream() - .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) - .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors - .mapping(Map.Entry::getValue, Collectors.toList()))); + .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) + .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors + .mapping(Map.Entry::getValue, Collectors.toList()))); var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE); - userWithRolesAndAccountsResps.forEach(ministerUser -> - generateAndSendTeamReport(ministerUser, nowDate.atStartOfDay(), nowDateTime, null)); + userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate + .atStartOfDay(), nowDateTime, null)); }); } @@ -175,76 +175,76 @@ public class DailyReport { LocalDate date) { TeamMemberReq memberListReq = TeamMemberReq.builder() - .registerStartDate(date) - .registerEndDate(date) - .startDate(date) - .endDate(date) - .registerSort(1) - .status(1) - .pageSize(100) - .build(); + .registerStartDate(date) + .registerEndDate(date) + .startDate(date) + .endDate(date) + .registerSort(1) + .status(1) + .pageSize(100) + .build(); List>> futureList = new ArrayList<>(); ministerUser.getAccounts().forEach(account -> { CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { + }); CompletableFuture> teamMembersFuture = memberPaginationCompletableFuture - .thenApply(MemberPagination::getList) - .thenApplyAsync(members -> members.stream() - .filter(member -> member.getDeposit() != null && member.getDeposit() - .compareTo(BigDecimal.ZERO) == 0) - .collect(Collectors.toList()), asyncTaskExecutor) - .thenComposeAsync(membersWithoutDep -> { - List> memberFutures = membersWithoutDep.stream() - .map(memberWithoutDep -> { - PayRecordListReq req = PayRecordListReq.builder() - .startDate(date) - .endDate(date) - .pageSize(100) - .id(memberWithoutDep.getId()) - .build(); - CompletableFuture>> completableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { - }); - return completableFuture.thenApplyAsync(pagination -> { - if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination - .getScoreAmountTotal() - .compareTo(BigDecimal.ZERO) == 0) { - return memberWithoutDep; - } else { - return null; - } - }, asyncTaskExecutor); - }) - .toList(); - - return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> memberFutures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList())); - }, asyncTaskExecutor) - .thenApplyAsync(membersWithoutDep -> { - // 发送给每个account关联的user用户 - if (!membersWithoutDep.isEmpty()) { - Map> groupByTopAgentName = membersWithoutDep.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); - groupByTopAgentName.forEach((accountName, accountMembers) -> { - String notification = telegramMessageService - .buildFailedPayMessage(accountName, accountMembers); - UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); - if (DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - String botToken = StrUtil.isEmpty(currUser.getBotToken()) - ? ministerUser.getBotToken() - : currUser.getBotToken(); - telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); + .thenApply(MemberPagination::getList) + .thenApplyAsync(members -> members.stream() + .filter(member -> member.getDeposit() != null && member.getDeposit() + .compareTo(BigDecimal.ZERO) == 0) + .collect(Collectors.toList()), asyncTaskExecutor) + .thenComposeAsync(membersWithoutDep -> { + List> memberFutures = membersWithoutDep.stream() + .map(memberWithoutDep -> { + PayRecordListReq req = PayRecordListReq.builder() + .startDate(date) + .endDate(date) + .pageSize(100) + .id(memberWithoutDep.getId()) + .build(); + CompletableFuture>> completableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { + }); + return completableFuture.thenApplyAsync(pagination -> { + if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination + .getScoreAmountTotal() + .compareTo(BigDecimal.ZERO) == 0) { + return memberWithoutDep; + } else { + return null; } - }); - } - return membersWithoutDep; - }, asyncTaskExecutor); + }, asyncTaskExecutor); + }) + .toList(); + + return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) + .thenApply(v -> memberFutures.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + }, asyncTaskExecutor) + .thenApplyAsync(membersWithoutDep -> { + // 发送给每个account关联的user用户 + if (!membersWithoutDep.isEmpty()) { + Map> groupByTopAgentName = membersWithoutDep.stream() + .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService + .buildFailedPayMessage(accountName, accountMembers); + UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); + if (DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) + ? ministerUser.getBotToken() + : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); + } + }); + } + return membersWithoutDep; + }, asyncTaskExecutor); futureList.add(teamMembersFuture); }); @@ -253,22 +253,22 @@ public class DailyReport { allFutures.thenRunAsync(() -> { // 主线下的所有的存款失败用户 List allTeamMembers = futureList.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .toList(); + .map(CompletableFuture::join) + .flatMap(List::stream) + .toList(); Map> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); + .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); StringBuilder combinedNotification = new StringBuilder(); groupByTopAgentName.forEach((accountName, accountMembers) -> { String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); combinedNotification.append(notification).append("\n"); }); telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification - .toString()); + .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification + .toString()); if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser - .getReportIds(), combinedNotification.toString()); + .getReportIds(), combinedNotification.toString()); } }, asyncTaskExecutor).exceptionally(ex -> { @@ -286,12 +286,12 @@ public class DailyReport { List deptUsers) { List> tasks = new ArrayList<>(); - //tasks.add(generateAndSendTeamReport(ministerUser, startDateTime, endDateTime, assistants)); + tasks.add(generateAndSendTeamReport(ministerUser, startDateTime, endDateTime, assistants)); AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build(); deptUsers.forEach(deptUser -> tasks - .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); + .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); CompletableFuture allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])); allTasks.join(); @@ -318,32 +318,32 @@ public class DailyReport { TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); List> futures = accounts.stream() - .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) - .thenAcceptAsync(team -> { - int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); - int totalNewFirstDeposit = team.getList() - .stream() - .mapToInt(TeamAccount::getFirstDepositNum) - .sum(); - synchronized (totals) { - totals[0] += totalNewMember; - totals[1] += totalNewFirstDeposit; - } - String percent = getPercent(totalNewFirstDeposit, totalNewMember); - synchronized (rows) { - rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String - .valueOf(totalNewFirstDeposit), percent}); - } - }, asyncTaskExecutor)) - .toList(); + .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) + .thenAcceptAsync(team -> { + int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); + int totalNewFirstDeposit = team.getList() + .stream() + .mapToInt(TeamAccount::getFirstDepositNum) + .sum(); + synchronized (totals) { + totals[0] += totalNewMember; + totals[1] += totalNewFirstDeposit; + } + String percent = getPercent(totalNewFirstDeposit, totalNewMember); + synchronized (rows) { + rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String + .valueOf(totalNewFirstDeposit), percent}); + } + }, asyncTaskExecutor)) + .toList(); CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); // 对 rows 列表进行排序 rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0])); - rows.add(new String[]{"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), - getPercent(totals[1], totals[0])}); + rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), + getPercent(totals[1], totals[0])}); String message = TableFormatter.formatTableAsHtml(rows); if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message); @@ -380,21 +380,21 @@ public class DailyReport { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() - .map(currAccount -> agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq) - .thenApplyAsync(agentData -> agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(() -> new BusinessException("No data found for report date"))) - .exceptionally(ex -> { - log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); - return null; - })) - .toList(); + .map(currAccount -> agentDataVisualListService + .getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date"))) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); + return null; + })) + .toList(); CompletableFuture userStaticsFuture = CompletableFuture.allOf(futures - .toArray(new CompletableFuture[0])); + .toArray(new CompletableFuture[0])); userStaticsFuture.thenRunAsync(() -> { List agentDataList = futures.stream().map(future -> { try { @@ -410,8 +410,8 @@ public class DailyReport { if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { String botToken = StrUtil.isEmpty(deptUser.getBotToken()) - ? ministerUser.getBotToken() - : deptUser.getBotToken(); + ? ministerUser.getBotToken() + : deptUser.getBotToken(); telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message); } }, asyncTaskExecutor).exceptionally(ex -> { @@ -438,33 +438,33 @@ public class DailyReport { List> tasks = new ArrayList<>(); TeamFinanceReq teamFinanceReq = TeamFinanceReq.builder() - .pageNum(1) - .pageSize(999) - .commissionDate(reportDate) - .build(); + .pageNum(1) + .pageSize(999) + .commissionDate(reportDate) + .build(); // 异步处理 ministerUserAccounts CompletableFuture ministerAccountsFuture = CompletableFuture.runAsync(() -> { List> accountFutures = ministerUser.getAccounts() - .stream() - .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) - .thenAcceptAsync(financePagination -> { - List financeReqList = financePagination.getList().stream().map(finance -> { - FinanceDO financeDO = new FinanceDO(); - BeanUtil.copyProperties(finance, financeDO); - return financeDO; - }).toList(); - financeService.addAll(financeReqList); - FinanceSumReq financeSumReq = new FinanceSumReq(); - BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); - financeSumService.add(financeSumReq); - }, asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error processing minister accounts for account {}", accountResp - .getUsername(), ex); - return null; - })) - .toList(); + .stream() + .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) + .thenAcceptAsync(financePagination -> { + List financeReqList = financePagination.getList().stream().map(finance -> { + FinanceDO financeDO = new FinanceDO(); + BeanUtil.copyProperties(finance, financeDO); + return financeDO; + }).toList(); + financeService.addAll(financeReqList); + FinanceSumReq financeSumReq = new FinanceSumReq(); + BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); + financeSumService.add(financeSumReq); + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error processing minister accounts for account {}", accountResp + .getUsername(), ex); + return null; + })) + .toList(); CompletableFuture.allOf(accountFutures.toArray(new CompletableFuture[0])).join(); }, asyncTaskExecutor).exceptionally(ex -> { log.error("Error processing minister accounts", ex); @@ -474,36 +474,36 @@ public class DailyReport { // 异步处理 deptUsers AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() - .monthDate(reportDate) - .build(); + .monthDate(reportDate) + .build(); deptUsers.forEach(deptUser -> { CompletableFuture deptUserFuture = CompletableFuture.runAsync(() -> { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() - .map(currAccount -> agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq) - .thenApplyAsync(agentData -> { - Statics statics = agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(() -> new BusinessException("No data found for report date")); - StatsDO statsDO = new StatsDO(); - BeanUtil.copyProperties(statics, statsDO); - return statsDO; - }, asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error fetching data for account {}: {}", currAccount.getId(), ex - .getMessage()); - return null; - })) - .toList(); + .map(currAccount -> agentDataVisualListService + .getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> { + Statics statics = agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date")); + StatsDO statsDO = new StatsDO(); + BeanUtil.copyProperties(statics, statsDO); + return statsDO; + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex + .getMessage()); + return null; + })) + .toList(); List list = futures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList()); statsService.addAll(list); }, asyncTaskExecutor).exceptionally(ex -> { diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java index 2aaeadb6..f31d4aba 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java @@ -42,8 +42,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +54,9 @@ public class DepositService { private final CompletableFutureWebClientService completableFutureWebClientService; private final TelegramMessageService telegramMessageService; + private static final String BOT_TOKEN = "6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto"; + private static final Long TELEGRAM_CHAT_ID = 6054562838L; + public CompletableFuture processDeposits(UserWithRolesAndAccountsResp ministerUser, Map accountUsernameToUserMap, AccountResp account, @@ -79,104 +80,110 @@ public class DepositService { LocalDate nowDate, LocalDateTime nowDateTime, Executor asyncTaskExecutor) { - PayRecordsListReq req = PayRecordsListReq.builder() - .startDate(nowDate) - .endDate(nowDate) - .pageSize(100) - .payState(2) - .build(); + List hasNewDepositAccounts = findChangedTeamAccount(previousTeam, currentTeam).stream() + .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) + .toList(); + + List> allTasks = hasNewDepositAccounts.stream() + .map(accountWithChange -> processAccountChanges(accountWithChange, ministerUser, accountUsernameToUserMap, account, nowDate, nowDateTime, asyncTaskExecutor)) + .toList(); + + return CompletableFuture.allOf(allTasks.toArray(new CompletableFuture[0])); + } + + private CompletableFuture processAccountChanges(TeamAccountWithChange accountWithChange, + UserWithRolesAndAccountsResp ministerUser, + Map accountUsernameToUserMap, + AccountResp account, + LocalDate nowDate, + LocalDateTime nowDateTime, + Executor asyncTaskExecutor) { + PayRecordsListReq req = createPayRecordsListReq(accountWithChange.getAgentName(), nowDate); CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { }); - List changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam); - Set changedAgentNames = changedTeamAccounts.stream() - .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) - .map(TeamAccountWithChange::getAgentName) - .collect(Collectors.toSet()); - return paginationCompletableFuture.thenApplyAsync(Pagination::getList, asyncTaskExecutor) - .thenComposeAsync(payRecords -> processPayRecords(payRecords, changedTeamAccounts, changedAgentNames, ministerUser, accountUsernameToUserMap, account, nowDateTime, asyncTaskExecutor), asyncTaskExecutor) + + StringBuilder depositResults = new StringBuilder(); + AtomicInteger depositCounter = new AtomicInteger(0); + + return paginationCompletableFuture.thenApply(Pagination::getList) + .thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor) + .thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults, depositCounter), asyncTaskExecutor) .exceptionally(ex -> { - log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage()); + log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex + .getMessage()); return null; }); } - private CompletableFuture processPayRecords(List payRecords, - List changedTeamAccounts, - Set changedAgentNames, - UserWithRolesAndAccountsResp minister, - Map accountUsernameToUserMap, - AccountResp account, - LocalDateTime nowDateTime, - Executor asyncTaskExecutor) { - Map> agentNameWithNames = payRecords.stream() - .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames - .contains(record.getAgentName())) - .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors.mapping(PayRecord::getName, Collectors - .collectingAndThen(Collectors.toSet(), ArrayList::new)))); - - List> futures = agentNameWithNames.entrySet() - .stream() - .map(entry -> processAgentRecords(entry.getKey(), entry - .getValue(), changedTeamAccounts, payRecords, minister, accountUsernameToUserMap, account, asyncTaskExecutor)) - .toList(); - - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + private PayRecordsListReq createPayRecordsListReq(String agentName, LocalDate nowDate) { + return PayRecordsListReq.builder() + .startDate(nowDate) + .endDate(nowDate) + .pageSize(100) + .payState(2) + .agentName(agentName) + .build(); } - private CompletableFuture processAgentRecords(String agentName, - List names, - List changedTeamAccounts, - List payRecords, - UserWithRolesAndAccountsResp minister, - Map accountUsernameToUserMap, - AccountResp account, - Executor asyncTaskExecutor) { - StringBuilder depositResults = new StringBuilder(); - AtomicInteger depositCounter = new AtomicInteger(0); + private CompletableFuture processPayRecords(List payRecords, + TeamAccountWithChange accountWithChange, + AccountResp account, + LocalDate nowDate, + LocalDateTime nowDateTime, + StringBuilder depositResults, + AtomicInteger depositCounter, + Executor asyncTaskExecutor) { + Map earliestPayRecords = payRecords.stream() + .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) + .collect(Collectors.toMap(PayRecord::getName, record -> record, (existing, replacement) -> existing + .getCreatedAt() + .isBefore(replacement.getCreatedAt()) ? existing : replacement)); - TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream() - .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName)) - .findFirst() - .orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName))); + List validPayRecords = earliestPayRecords.values().stream().toList(); - List> fetchFutures = names.stream() - .map(name -> fetchMemberDetails(account, name, LocalDate.now(), asyncTaskExecutor) - .thenAcceptAsync(member -> payRecords.stream() - .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) - .findFirst() - .ifPresent(record -> { - if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) { - depositResults.append(telegramMessageService.buildDepositResultsMessage(member - .getName(), record.getScoreAmount())); - } - }), asyncTaskExecutor) + List> fetchMemberFutures = validPayRecords.stream() + .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor) + .thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor) .exceptionally(ex -> { - log.error("Error fetching details for member {}: {}", name, ex.getMessage()); + log.error("Error fetching details for member {}: {}", payRecord.getName(), ex.getMessage()); return null; })) .toList(); - CompletableFuture allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0])); + return CompletableFuture.allOf(fetchMemberFutures.toArray(new CompletableFuture[0])); + } - return allFetches.thenRunAsync(() -> { - if (!depositResults.isEmpty()) { - String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount - .getNewDepositNum(), depositResults.toString(), targetTeamAccount.getFirstDepositNum()); - var currUser = accountUsernameToUserMap.get(agentName); - if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - String botToken = StrUtil.isEmpty(currUser.getBotToken()) - ? minister.getBotToken() - : currUser.getBotToken(); - telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); - } - telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); + private void processMemberDetails(Member member, + PayRecord payRecord, + TeamAccountWithChange accountWithChange, + StringBuilder depositResults, + AtomicInteger depositCounter) { + if (payRecord.getCreatedAt().equals(member.getFirstPayAt()) && depositCounter + .getAndIncrement() < accountWithChange.getNewDepositNum()) { + depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord + .getScoreAmount())); + } + } + + private void sendNotification(TeamAccountWithChange accountWithChange, + UserWithRolesAndAccountsResp ministerUser, + Map accountUsernameToUserMap, + StringBuilder depositResults, + AtomicInteger depositCounter) { + if (depositCounter.get() > 0) { + String notification = telegramMessageService.buildDepositMessage(accountWithChange + .getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange + .getFirstDepositNum()); + var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName()); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) + ? ministerUser.getBotToken() + : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); } - }, asyncTaskExecutor).exceptionally(ex -> { - log.error("Error sending notification for account {}: {}", account.getUsername(), ex.getMessage()); - return null; - }); + telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification); + } } private CompletableFuture fetchMemberDetails(AccountResp account, @@ -197,7 +204,7 @@ public class DepositService { return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) .thenApplyAsync(list -> list.stream() .findFirst() - .orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")), asyncTaskExecutor) + .orElseThrow(() -> new BusinessException("没有找到匹配的成员信息")), asyncTaskExecutor) .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor); } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java index 70cd558a..3ed8ed17 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java @@ -16,6 +16,7 @@ package com.zayac.admin.service; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.constant.ApiPathConstants; @@ -26,7 +27,6 @@ import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; -import com.zayac.admin.system.service.AccountService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; @@ -48,7 +48,6 @@ import java.util.stream.Collectors; public class RegistrationService { private final CompletableFutureWebClientService completableFutureWebClientService; private final TelegramMessageService telegramMessageService; - private final AccountService accountService; public CompletableFuture processRegistration(UserWithRolesAndAccountsResp minister, AccountResp account, @@ -75,7 +74,7 @@ public class RegistrationService { return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) .thenAcceptAsync(members -> { log.info("Successfully get [{}] new registered members", members.size()); - if (!members.isEmpty()) { + if (CollUtil.isNotEmpty(members)) { Map> groupByTopAgentName = members.stream() .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); groupByTopAgentName.forEach((accountName, accountMembers) -> {