细节优化

This commit is contained in:
zayac 2024-06-10 12:32:26 +08:00
parent 708182e67f
commit c0a15b6580

View File

@ -66,7 +66,7 @@ import java.util.stream.IntStream;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@Profile("dev") @Profile("prod")
public class DailyReport { public class DailyReport {
private final TeamService teamService; private final TeamService teamService;
private final DeptService deptService; private final DeptService deptService;
@ -95,12 +95,12 @@ public class DailyReport {
deptWithUsersAndAccounts.forEach(dept -> { deptWithUsersAndAccounts.forEach(dept -> {
//根据用户角色对部门用户进行分组 //根据用户角色对部门用户进行分组
Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers() Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers()
.stream()
.flatMap(user -> user.getRoles()
.stream() .stream()
.flatMap(user -> user.getRoles() .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.stream() .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) .mapping(Map.Entry::getValue, Collectors.toList())));
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
//获取账号不为空的用户 //获取账号不为空的用户
@ -130,40 +130,40 @@ public class DailyReport {
deptWithUsersAndAccounts.forEach(dept -> { deptWithUsersAndAccounts.forEach(dept -> {
//根据用户角色对部门用户进行分组 //根据用户角色对部门用户进行分组
Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers() Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers()
.stream()
.flatMap(user -> user.getRoles()
.stream() .stream()
.flatMap(user -> user.getRoles() .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.stream() .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) .mapping(Map.Entry::getValue, Collectors.toList())));
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
// 获取所有账号的username与用户的映射 // 获取所有账号的username与用户的映射
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap = dept.getUsers() Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap = dept.getUsers()
.stream()
.flatMap(user -> user.getAccounts()
.stream() .stream()
.flatMap(user -> user.getAccounts() .map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user)))
.stream() .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
.map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user)))
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
//建立团队之间账号的联系 //建立团队之间账号的联系
Map<String, String> accountNameWithTopAgentName = new HashMap<>(); Map<String, String> accountNameWithTopAgentName = new HashMap<>();
dept.getUsers() dept.getUsers()
.stream()
.flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream())
.forEach(accountResp -> ministerUser.getAccounts()
.stream() .stream()
.flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream()) .filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount
.forEach(accountResp -> ministerUser.getAccounts() .getPlatformId()))
.stream() .findFirst()
.filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount .ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp
.getPlatformId())) .getUsername(), ministerAccount.getUsername())));
.findFirst()
.ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp
.getUsername(), ministerAccount.getUsername())));
//获取账号不为空的用户 //获取账号不为空的用户
var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList(); var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList();
var assistants = usersByRole.get(ASSISTANT_ROLE_CODE); var assistants = usersByRole.get(ASSISTANT_ROLE_CODE);
sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime
.of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers);
//保存数据 //保存数据
saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName); saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName);
getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday);
@ -180,16 +180,16 @@ public class DailyReport {
deptWithUsersAndAccounts.forEach(dept -> { deptWithUsersAndAccounts.forEach(dept -> {
//根据用户角色对部门用户进行分组 //根据用户角色对部门用户进行分组
Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers() Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers()
.stream()
.flatMap(user -> user.getRoles()
.stream() .stream()
.flatMap(user -> user.getRoles() .map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.stream() .collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user))) .mapping(Map.Entry::getValue, Collectors.toList())));
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE); var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE);
userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate
.atStartOfDay(), nowDateTime, null)); .atStartOfDay(), nowDateTime, null));
}); });
} }
@ -198,8 +198,8 @@ public class DailyReport {
UserWithRolesAndAccountsResp ministerUser) { UserWithRolesAndAccountsResp ministerUser) {
List<FinanceDO> finances = financeService.getFinanceByDate(date); List<FinanceDO> finances = financeService.getFinanceByDate(date);
Map<UserWithRolesAndAccountsResp, List<FinanceDO>> userFinances = finances.stream() Map<UserWithRolesAndAccountsResp, List<FinanceDO>> userFinances = finances.stream()
.filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName())) .filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName()))
.collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName()))); .collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName())));
userFinances.forEach((user, userFinancesList) -> { userFinances.forEach((user, userFinancesList) -> {
if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) { if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) {
@ -215,25 +215,25 @@ public class DailyReport {
LocalDate date, LocalDate date,
int sumReg) { int sumReg) {
int pageSize = 100; int pageSize = 100;
int totalPages = (int) Math.ceil((double) sumReg / pageSize); int totalPages = (int)Math.ceil((double)sumReg / pageSize);
return IntStream.range(0, totalPages).mapToObj(page -> { return IntStream.range(0, totalPages).mapToObj(page -> {
int currentPageSize = page == totalPages - 1 int currentPageSize = page == totalPages - 1
? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize) ? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize)
: pageSize; : pageSize;
TeamMemberReq memberListReq = TeamMemberReq.builder() TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(date) .registerStartDate(date)
.registerEndDate(date) .registerEndDate(date)
.startDate(date) .startDate(date)
.endDate(date) .endDate(date)
.registerSort(1) .registerSort(1)
.status(1) .status(1)
.pageSize(currentPageSize) .pageSize(currentPageSize)
.pageNum(page + 1) .pageNum(page + 1)
.build(); .build();
return completableFutureWebClientService return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() { .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() {
}); });
}).toList(); }).toList();
} }
@ -253,88 +253,98 @@ public class DailyReport {
sumReg = (sumReg == 0) ? 100 : sumReg; sumReg = (sumReg == 0) ? 100 : sumReg;
List<CompletableFuture<MemberPagination<List<TeamMember>>>> paginationFutures = createFuturesForPagination(account, date, sumReg); List<CompletableFuture<MemberPagination<List<TeamMember>>>> paginationFutures = createFuturesForPagination(account, date, sumReg);
CompletableFuture<Void> allPaginationFutures = CompletableFuture.allOf(paginationFutures.toArray(new CompletableFuture[0])); CompletableFuture<Void> allPaginationFutures = CompletableFuture.allOf(paginationFutures
.toArray(new CompletableFuture[0]));
CompletableFuture<List<TeamMember>> aggregatedMembersFuture = allPaginationFutures.thenApply(v -> { CompletableFuture<List<TeamMember>> aggregatedMembersFuture = allPaginationFutures.thenApply(v -> {
return paginationFutures.stream() return paginationFutures.stream().map(CompletableFuture::join).flatMap(memberPagination -> {
.map(CompletableFuture::join) List<TeamMember> members = memberPagination.getList();
.flatMap(memberPagination -> { log.info("members size:{}", members.size());
List<TeamMember> members = memberPagination.getList(); return members.stream();
log.info("members size:{}", members.size()); }).collect(Collectors.toList());
return members.stream();
})
.collect(Collectors.toList());
}); });
CompletableFuture<List<TeamMember>> filteredMembersFuture = aggregatedMembersFuture.thenApplyAsync(members -> { CompletableFuture<List<TeamMember>> filteredMembersFuture = aggregatedMembersFuture
return members.stream() .thenApplyAsync(members -> members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit().compareTo(BigDecimal.ZERO) == 0) .filter(member -> member.getDeposit() != null && member.getDeposit()
.collect(Collectors.toList()); .compareTo(BigDecimal.ZERO) == 0)
}, asyncTaskExecutor); .collect(Collectors.toList()), asyncTaskExecutor);
CompletableFuture<List<TeamMember>> membersWithFailedPayFuture = filteredMembersFuture.thenComposeAsync(membersWithoutDep -> { CompletableFuture<List<TeamMember>> membersWithFailedPayFuture = filteredMembersFuture
List<CompletableFuture<TeamMember>> payRecordFutures = membersWithoutDep.stream().map(memberWithoutDep -> { .thenComposeAsync(membersWithoutDep -> {
PayRecordsListReq req = PayRecordsListReq.builder() List<CompletableFuture<TeamMember>> payRecordFutures = membersWithoutDep.stream()
.startDate(date) .map(memberWithoutDep -> {
.endDate(date) PayRecordsListReq req = PayRecordsListReq.builder()
.pageSize(10) .startDate(date)
.memberName(memberWithoutDep.getName()) .endDate(date)
.build(); .pageSize(10)
return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> { .memberName(memberWithoutDep.getName())
if (CollUtil.isNotEmpty(pagination.getList()) .build();
&& pagination.getList().stream().noneMatch(payRecord -> payRecord.getPayStatus() == 2)) { return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> {
return memberWithoutDep; if (CollUtil.isNotEmpty(pagination.getList()) && pagination.getList()
} else { .stream()
return null; .noneMatch(payRecord -> payRecord.getPayStatus() == 2)) {
} return memberWithoutDep;
}, asyncTaskExecutor); } else {
}).toList(); return null;
}
}, asyncTaskExecutor);
})
.toList();
return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0])) return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> payRecordFutures.stream() .thenApply(v -> payRecordFutures.stream()
.map(CompletableFuture::join) .map(CompletableFuture::join)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList())); .collect(Collectors.toList()));
}, asyncTaskExecutor); }, asyncTaskExecutor);
CompletableFuture<List<TeamMember>> notificationFuture = membersWithFailedPayFuture.thenApplyAsync(membersWithoutDep -> { CompletableFuture<List<TeamMember>> notificationFuture = membersWithFailedPayFuture
if (CollUtil.isNotEmpty(membersWithoutDep)) { .thenApplyAsync(membersWithoutDep -> {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream() if (CollUtil.isNotEmpty(membersWithoutDep)) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)); .collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> { groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); String notification = telegramMessageService
UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); .buildFailedPayMessage(accountName, accountMembers);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName);
String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? ministerUser.getBotToken() : currUser.getBotToken(); if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification); String botToken = StrUtil.isEmpty(currUser.getBotToken())
} ? ministerUser.getBotToken()
}); : currUser.getBotToken();
} telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification);
return membersWithoutDep; }
}, asyncTaskExecutor); });
}
return membersWithoutDep;
}, asyncTaskExecutor);
accountFutureList.add(notificationFuture); accountFutureList.add(notificationFuture);
}); });
CompletableFuture<Void> allAccountFutures = CompletableFuture.allOf(accountFutureList.toArray(new CompletableFuture[0])); CompletableFuture<Void> allAccountFutures = CompletableFuture.allOf(accountFutureList
.toArray(new CompletableFuture[0]));
allAccountFutures.thenRunAsync(() -> { allAccountFutures.thenRunAsync(() -> {
List<TeamMember> allTeamMembers = accountFutureList.stream() List<TeamMember> allTeamMembers = accountFutureList.stream()
.map(CompletableFuture::join) .map(CompletableFuture::join)
.flatMap(List::stream) .flatMap(List::stream)
.toList(); .toList();
log.info("All failed pay members size: {}", allTeamMembers.size()); log.info("All failed pay members size: {}", allTeamMembers.size());
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName))); .collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
StringBuilder combinedNotification = new StringBuilder(); StringBuilder combinedNotification = new StringBuilder();
groupByTopAgentName.forEach((accountName, accountMembers) -> { groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
combinedNotification.append(notification).append("\n"); combinedNotification.append(notification).append("\n");
}); });
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification.toString()); telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
.toString());
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), combinedNotification.toString()); telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser
.getReportIds(), combinedNotification.toString());
} }
}, asyncTaskExecutor).exceptionally(ex -> { }, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error collecting and processing data", ex); log.error("Error collecting and processing data", ex);
@ -342,15 +352,14 @@ public class DailyReport {
}); });
} }
private CompletableFuture<Pagination<List<PayRecord>>> fetchPaginationPayRecordWithRetry(AccountResp account, PayRecordsListReq req) { private CompletableFuture<Pagination<List<PayRecord>>> fetchPaginationPayRecordWithRetry(AccountResp account,
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService.fetchDataForAccount( PayRecordsListReq req) {
account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() { .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
}), asyncTaskExecutor).thenCompose(future -> future) }), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
.exceptionallyCompose(ex -> { log.error("Error fetching pay records, retrying...", ex);
log.error("Error fetching pay records, retrying...", ex); return fetchPaginationPayRecordWithRetry(account, req);
return fetchPaginationPayRecordWithRetry(account, req); });
});
} }
private CompletableFuture<PayRecordList<List<PayRecord>>> fetchPayRecordsWithRetry(AccountResp account, private CompletableFuture<PayRecordList<List<PayRecord>>> fetchPayRecordsWithRetry(AccountResp account,
@ -362,11 +371,11 @@ public class DailyReport {
rateLimiter.acquire(); // 通过限流器限流 rateLimiter.acquire(); // 通过限流器限流
} }
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<PayRecordList<List<PayRecord>>>>() { .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<PayRecordList<List<PayRecord>>>>() {
}), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> { }), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
log.error("Error fetching pay records", ex); log.error("Error fetching pay records", ex);
return null; return null;
}); });
} }
private void sendDailyReport(LocalDate reportDate, private void sendDailyReport(LocalDate reportDate,
@ -382,7 +391,7 @@ public class DailyReport {
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build(); AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build();
deptUsers.forEach(deptUser -> tasks deptUsers.forEach(deptUser -> tasks
.add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate))); .add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate)));
CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])); CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
allTasks.join(); allTasks.join();
@ -409,32 +418,32 @@ public class DailyReport {
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
List<CompletableFuture<Void>> futures = accounts.stream() List<CompletableFuture<Void>> futures = accounts.stream()
.map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq)
.thenAcceptAsync(team -> { .thenAcceptAsync(team -> {
int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int totalNewFirstDeposit = team.getList() int totalNewFirstDeposit = team.getList()
.stream() .stream()
.mapToInt(TeamAccount::getFirstDepositNum) .mapToInt(TeamAccount::getFirstDepositNum)
.sum(); .sum();
synchronized (totals) { synchronized (totals) {
totals[0] += totalNewMember; totals[0] += totalNewMember;
totals[1] += totalNewFirstDeposit; totals[1] += totalNewFirstDeposit;
} }
String percent = getPercent(totalNewFirstDeposit, totalNewMember); String percent = getPercent(totalNewFirstDeposit, totalNewMember);
synchronized (rows) { synchronized (rows) {
rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String
.valueOf(totalNewFirstDeposit), percent}); .valueOf(totalNewFirstDeposit), percent});
} }
}, asyncTaskExecutor)) }, asyncTaskExecutor))
.toList(); .toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join(); allFutures.join();
// rows 列表进行排序 // rows 列表进行排序
rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0])); rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
rows.add(new String[]{"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]),
getPercent(totals[1], totals[0])}); getPercent(totals[1], totals[0])});
String message = TableFormatter.formatTableAsHtml(rows); String message = TableFormatter.formatTableAsHtml(rows);
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message); telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
@ -472,23 +481,23 @@ public class DailyReport {
List<AccountResp> currUserAccounts = deptUser.getAccounts(); List<AccountResp> currUserAccounts = deptUser.getAccounts();
List<CompletableFuture<Statics>> futures = currUserAccounts.stream() List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
// 团队账号暂时不用具体的代理线数据 // 团队账号暂时不用具体的代理线数据
.filter(accountResp -> !accountResp.getIsTeam()) .filter(accountResp -> !accountResp.getIsTeam())
.map(currAccount -> agentDataVisualListService .map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq) .getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> agentData.getCurData() .thenApplyAsync(agentData -> agentData.getCurData()
.stream() .stream()
.filter(data -> data.getStaticsDate().equals(reportDate)) .filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst() .findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date"))) .orElseThrow(() -> new BusinessException("No data found for report date")))
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage());
return null; return null;
})) }))
.toList(); .toList();
CompletableFuture<Void> userStaticsFuture = CompletableFuture.allOf(futures CompletableFuture<Void> userStaticsFuture = CompletableFuture.allOf(futures
.toArray(new CompletableFuture[0])); .toArray(new CompletableFuture[0]));
userStaticsFuture.thenRunAsync(() -> { userStaticsFuture.thenRunAsync(() -> {
List<Statics> agentDataList = futures.stream().map(future -> { List<Statics> agentDataList = futures.stream().map(future -> {
try { try {
@ -504,8 +513,8 @@ public class DailyReport {
if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
String botToken = StrUtil.isEmpty(deptUser.getBotToken()) String botToken = StrUtil.isEmpty(deptUser.getBotToken())
? ministerUser.getBotToken() ? ministerUser.getBotToken()
: deptUser.getBotToken(); : deptUser.getBotToken();
telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message); telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message);
} }
}, asyncTaskExecutor).exceptionally(ex -> { }, asyncTaskExecutor).exceptionally(ex -> {
@ -533,34 +542,34 @@ public class DailyReport {
List<CompletableFuture<Void>> tasks = new ArrayList<>(); List<CompletableFuture<Void>> tasks = new ArrayList<>();
TeamFinanceReq teamFinanceReq = TeamFinanceReq.builder() TeamFinanceReq teamFinanceReq = TeamFinanceReq.builder()
.pageNum(1) .pageNum(1)
.pageSize(999) .pageSize(999)
.commissionDate(reportDate) .commissionDate(reportDate)
.build(); .build();
// 异步处理 ministerUserAccounts // 异步处理 ministerUserAccounts
CompletableFuture<Void> ministerAccountsFuture = CompletableFuture.runAsync(() -> { CompletableFuture<Void> ministerAccountsFuture = CompletableFuture.runAsync(() -> {
List<CompletableFuture<Void>> accountFutures = ministerUser.getAccounts() List<CompletableFuture<Void>> accountFutures = ministerUser.getAccounts()
.stream() .stream()
.map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq) .map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq)
.thenAcceptAsync(financePagination -> { .thenAcceptAsync(financePagination -> {
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> { List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
FinanceDO financeDO = new FinanceDO(); FinanceDO financeDO = new FinanceDO();
BeanUtil.copyProperties(finance, financeDO); BeanUtil.copyProperties(finance, financeDO);
financeDO.setTopAgentName(accountResp.getUsername()); financeDO.setTopAgentName(accountResp.getUsername());
return financeDO; return financeDO;
}).toList(); }).toList();
financeService.addAll(financeReqList); financeService.addAll(financeReqList);
FinanceSumReq financeSumReq = new FinanceSumReq(); FinanceSumReq financeSumReq = new FinanceSumReq();
BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq);
financeSumService.add(financeSumReq); financeSumService.add(financeSumReq);
}, asyncTaskExecutor) }, asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("Error processing minister accounts for account {}", accountResp log.error("Error processing minister accounts for account {}", accountResp
.getUsername(), ex); .getUsername(), ex);
return null; return null;
})) }))
.toList(); .toList();
CompletableFuture.allOf(accountFutures.toArray(new CompletableFuture[0])).join(); CompletableFuture.allOf(accountFutures.toArray(new CompletableFuture[0])).join();
}, asyncTaskExecutor).exceptionally(ex -> { }, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error processing minister accounts", ex); log.error("Error processing minister accounts", ex);
@ -570,37 +579,37 @@ public class DailyReport {
// 异步处理 deptUsers // 异步处理 deptUsers
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder()
.monthDate(reportDate) .monthDate(reportDate)
.build(); .build();
deptUsers.forEach(deptUser -> { deptUsers.forEach(deptUser -> {
CompletableFuture<Void> deptUserFuture = CompletableFuture.runAsync(() -> { CompletableFuture<Void> deptUserFuture = CompletableFuture.runAsync(() -> {
List<AccountResp> currUserAccounts = deptUser.getAccounts(); List<AccountResp> currUserAccounts = deptUser.getAccounts();
List<CompletableFuture<StatsDO>> futures = currUserAccounts.stream() List<CompletableFuture<StatsDO>> futures = currUserAccounts.stream()
.map(currAccount -> agentDataVisualListService .map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq) .getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> { .thenApplyAsync(agentData -> {
Statics statics = agentData.getCurData() Statics statics = agentData.getCurData()
.stream() .stream()
.filter(data -> data.getStaticsDate().equals(reportDate)) .filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst() .findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date")); .orElseThrow(() -> new BusinessException("No data found for report date"));
StatsDO statsDO = new StatsDO(); StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO); BeanUtil.copyProperties(statics, statsDO);
statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername())); statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername()));
return statsDO; return statsDO;
}, asyncTaskExecutor) }, asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex log.error("Error fetching data for account {}: {}", currAccount.getId(), ex
.getMessage()); .getMessage());
return null; return null;
})) }))
.toList(); .toList();
List<StatsDO> list = futures.stream() List<StatsDO> list = futures.stream()
.map(CompletableFuture::join) .map(CompletableFuture::join)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList()); .collect(Collectors.toList());
statsService.addAll(list); statsService.addAll(list);
}, asyncTaskExecutor).exceptionally(ex -> { }, asyncTaskExecutor).exceptionally(ex -> {