diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java b/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java new file mode 100644 index 00000000..f4505a28 --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java @@ -0,0 +1,42 @@ +package com.zayac.admin.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import java.util.concurrent.Executor; + +@Configuration +public class ThreadPoolConfig { + /** + * 异步任务线程池 + * + * @return Executor + */ + @Bean(name = "asyncTaskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("asyncTaskExecutor-"); + executor.initialize(); + return executor; + } + + /** + * 定时任务线程池 + * + * @return ThreadPoolTaskScheduler + */ + @Bean + public ThreadPoolTaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(10); + scheduler.setThreadNamePrefix("ScheduledTask-"); + scheduler.setWaitForTasksToCompleteOnShutdown(true); + scheduler.setAwaitTerminationSeconds(60); + return scheduler; + } +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/req/PayRecordListReq.java b/zayac-admin-agent/src/main/java/com/zayac/admin/req/PayRecordListReq.java new file mode 100644 index 00000000..8786ee97 --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/req/PayRecordListReq.java @@ -0,0 +1,40 @@ +package com.zayac.admin.req; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDate; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class PayRecordListReq { + private Long id; + + /** + * 页码 + */ + @Builder.Default + private int pageNum = 1; + + /** + * 每页显示数量 + */ + private int pageSize; + + /** + * 起始日期 + */ + @JsonFormat(pattern = "yyyy-MM-dd") + private LocalDate startDate; + + /** + * 结束日期 + */ + @JsonFormat(pattern = "yyyy-MM-dd") + private LocalDate endDate; +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/resp/team/PayRecordList.java b/zayac-admin-agent/src/main/java/com/zayac/admin/resp/team/PayRecordList.java new file mode 100644 index 00000000..ba0879ce --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/resp/team/PayRecordList.java @@ -0,0 +1,13 @@ +package com.zayac.admin.resp.team; + +import com.zayac.admin.resp.Pagination; +import lombok.Data; + +import java.math.BigDecimal; + +@Data +public class PayRecordList extends Pagination { + private BigDecimal rebateAmountTotal; + private BigDecimal orderAmountTotal; + private BigDecimal scoreAmountTotal; +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java similarity index 83% rename from zayac-admin-agent/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java rename to zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java index fad699ab..bd73830d 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java @@ -40,13 +40,18 @@ import java.time.temporal.TemporalAdjusters; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; +/** + * 检测注册跟新增的定时任务 + */ + @Slf4j @Component @RequiredArgsConstructor -public class TelegramTeamMessageSchedule { +public class CheckRegAndDep { private final UserService userService; private final AccountService accountService; @@ -55,6 +60,7 @@ public class TelegramTeamMessageSchedule { private final UserRoleService userRoleService; private final RegistrationService registrationService; private final DepositService depositService; + private final Executor asyncTaskExecutor; private static final String MINISTER_ROLE_CODE = "minister"; private static final long FIXED_DELAY = 60000L; @@ -64,8 +70,8 @@ public class TelegramTeamMessageSchedule { LocalDate nowDate = LocalDate.now(); LocalDateTime nowDateTime = LocalDateTime.now(); RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); - List users = userRoleService.listUserIdByRoleId(minister.getId()); - users.forEach(userId -> { + List userIds = userRoleService.listUserIdByRoleId(minister.getId()); + userIds.forEach(userId -> { processUser(userService.getById(userId), nowDate, nowDateTime).join(); }); } @@ -94,8 +100,7 @@ public class TelegramTeamMessageSchedule { CompletableFuture teamFuture = teamService.getLatestTeamInfoAsync(account, teamInfoReq); Team prevTeamInfo = teamService.getPreviousTeamInfo(account); - return CompletableFuture.allOf(teamFuture).thenCompose(v -> { - Team currentTeamInfo = teamFuture.join(); + return teamFuture.thenComposeAsync(currentTeamInfo -> { log.info("Previous Team Info: {}", prevTeamInfo); log.info("Current Team Info: {}", currentTeamInfo); Map teamAccountMap = currentTeamInfo.getList() @@ -103,12 +108,18 @@ public class TelegramTeamMessageSchedule { .collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity())); CompletableFuture registrationProcess = registrationService - .processRegistration(minister, account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap); + .processRegistration(minister, account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap, asyncTaskExecutor) + .thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor); + CompletableFuture depositProcess = depositService - .processDeposits(minister, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime); + .processDeposits(minister, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor) + .thenRunAsync(() -> log.info("Deposit process completed"), asyncTaskExecutor); return CompletableFuture.allOf(registrationProcess, depositProcess) - .thenRun(() -> teamService.updateTeamInfo(account, currentTeamInfo)); - }); + .thenRunAsync(() -> { + teamService.updateTeamInfo(account, currentTeamInfo); + log.info("Team info updated"); + }, asyncTaskExecutor); + }, asyncTaskExecutor); } } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index df464ede..8ed8e92f 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -20,46 +20,46 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.zayac.admin.agent.model.entity.FinanceDO; import com.zayac.admin.agent.model.entity.StatsDO; -import com.zayac.admin.agent.model.req.FinanceReq; import com.zayac.admin.agent.model.req.FinanceSumReq; import com.zayac.admin.agent.service.FinanceService; import com.zayac.admin.agent.service.FinanceSumService; import com.zayac.admin.agent.service.StatsService; import com.zayac.admin.common.enums.DisEnableStatusEnum; +import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.req.AgentDataVisualListReq; +import com.zayac.admin.req.PayRecordListReq; import com.zayac.admin.req.team.TeamFinanceReq; import com.zayac.admin.req.team.TeamInfoReq; -import com.zayac.admin.resp.AgentDataVisualList; -import com.zayac.admin.resp.Statics; -import com.zayac.admin.resp.team.Team; -import com.zayac.admin.resp.team.TeamAccount; -import com.zayac.admin.resp.team.TeamAccountFinance; -import com.zayac.admin.resp.team.TeamFinancePagination; -import com.zayac.admin.service.AgentDataVisualListService; -import com.zayac.admin.service.CompletableFutureFinanceService; -import com.zayac.admin.service.TeamService; -import com.zayac.admin.service.TelegramMessageService; +import com.zayac.admin.req.team.TeamMemberReq; +import com.zayac.admin.resp.*; +import com.zayac.admin.resp.team.*; +import com.zayac.admin.service.*; import com.zayac.admin.system.model.entity.RoleDO; import com.zayac.admin.system.model.entity.UserDO; import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.service.*; import com.zayac.admin.utils.TableFormatter; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import top.continew.starter.core.exception.BusinessException; +import java.math.BigDecimal; import java.text.NumberFormat; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.YearMonth; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; @Component @RequiredArgsConstructor +@Slf4j public class DailyReport { private final UserService userService; private final TeamService teamService; @@ -72,6 +72,8 @@ public class DailyReport { private final CompletableFutureFinanceService completableFutureFinanceService; private final FinanceService financeService; private final FinanceSumService financeSumService; + private final CompletableFutureWebClientService completableFutureWebClientService; + private final Executor asyncTaskExecutor; private static final String MINISTER_ROLE_CODE = "minister"; @@ -86,7 +88,120 @@ public class DailyReport { public void dailySummarize() { LocalDate yesterday = LocalDate.now().minusDays(1); sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime.of(yesterday, LocalTime.MAX)); - saveData(yesterday, yesterday.atStartOfDay(), LocalDateTime.of(yesterday, LocalTime.MAX)); + getPayFailedMember(yesterday); + saveData(yesterday); + } + + /** + * 查询存款失败用户,并发送消息 + * + * @param date 日期 + */ + private void getPayFailedMember(LocalDate date) { + RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); + List userIds = userRoleService.listUserIdByRoleId(minister.getId()); + TeamMemberReq memberListReq = TeamMemberReq.builder() + .registerStartDate(date) + .registerEndDate(date) + .startDate(date) + .endDate(date) + .registerSort(1) + .status(1) + .pageSize(100) + .build(); + + userIds.forEach(userId -> { + List>> futureList = new ArrayList<>(); + UserDO ministerUser = userService.getById(userId); + List accounts = accountService.getAccountsByUserId(userId, DisEnableStatusEnum.ENABLE) + .stream() + .filter(AccountResp::getIsTeam) + .toList(); + + accounts.forEach(account -> { + CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { + }); + + CompletableFuture> teamMembersFuture = memberPaginationCompletableFuture.thenApply(MemberPagination::getList) + .thenApplyAsync(members -> members.stream() + .filter(member -> member.getDeposit() != null && member.getDeposit().compareTo(BigDecimal.ZERO) == 0) + .collect(Collectors.toList()), asyncTaskExecutor) + .thenComposeAsync(membersWithoutDep -> { + List> memberFutures = membersWithoutDep.stream() + .map(memberWithoutDep -> { + PayRecordListReq req = PayRecordListReq.builder() + .startDate(date) + .endDate(date) + .pageSize(100) + .id(memberWithoutDep.getId()) + .build(); + CompletableFuture>> completableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { + }); + return completableFuture.thenApplyAsync(pagination -> { + if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 + && pagination.getScoreAmountTotal().compareTo(BigDecimal.ZERO) == 0) { + return memberWithoutDep; + } else { + return null; + } + }, asyncTaskExecutor); + }).toList(); + + return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) + .thenApply(v -> memberFutures.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + }, asyncTaskExecutor) + .thenApplyAsync(membersWithoutDep -> { + // 发送给每个account关联的user用户 + if (!membersWithoutDep.isEmpty()) { + Map> groupByTopAgentName = membersWithoutDep.stream() + .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService + .buildFailedPayMessage(accountName, accountMembers); + UserDO currUser = accountService.getUserByAccountUsername(accountName); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? ministerUser.getBotToken() : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); + } + }); + } + return membersWithoutDep; + }, asyncTaskExecutor); + + futureList.add(teamMembersFuture); + }); + + CompletableFuture allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])); + allFutures.thenRunAsync(() -> { + // 主线下的所有的存款失败用户 + List allTeamMembers = futureList.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .toList(); + Map> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() + .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); + StringBuilder combinedNotification = new StringBuilder(); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService + .buildFailedPayMessage(accountName, accountMembers); + combinedNotification.append(notification).append("\n"); + }); + telegramMessageService + .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification.toString()); + if (ministerUser != null && DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { + telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), combinedNotification.toString()); + } + + }, asyncTaskExecutor).exceptionally(ex -> { + log.error("Error collecting and processing data", ex); + return null; + }); + }); } @@ -94,70 +209,145 @@ public class DailyReport { RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); List userIds = userRoleService.listUserIdByRoleId(minister.getId()); - userIds.parallelStream().forEach(userId -> { + userIds.forEach(userId -> { UserDO ministerUser = userService.getById(userId); List deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, ministerUser.getDeptId()); + + List> tasks = new ArrayList<>(); + if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { - List rows = generateRows(ministerUser.getId(), startDateTime, endDateTime); - String table = TableFormatter.formatTableAsHtml(rows); - //telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), table); - telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, table); + tasks.add(generateAndSendTeamReport(ministerUser, startDateTime, endDateTime)); } AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() .monthDate(reportDate) .build(); - //获取部长对应下级所有用户 - deptUsers.parallelStream().forEach(deptUser -> { - String message = getDeptUserMessage(deptUser, agentDataVisualListReq, reportDate); + + deptUsers.forEach(deptUser -> tasks.add(processDeptUser(deptUser, agentDataVisualListReq, reportDate, ministerUser))); + + CompletableFuture allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])); + allTasks.join(); + }); + } + + /** + * 构建表格 + * + * @param ministerUser 部长 + * @param startDateTime 起始时间 + * @param endDateTime 结束时间 + * @return CompletableFuture + */ + + private CompletableFuture generateAndSendTeamReport(UserDO ministerUser, LocalDateTime + startDateTime, LocalDateTime endDateTime) { + return CompletableFuture.runAsync(() -> { + List rows = new ArrayList<>(); + rows.add(new String[]{"平台", "注册", "新增", "转化率"}); + rows.add(new String[]{"----", "----", "----", "----"}); + + List accounts = accountService.getAccountsByUserId(ministerUser.getId(), DisEnableStatusEnum.ENABLE) + .stream() + .filter(AccountResp::getIsTeam) + .toList(); + + int[] totals = {0, 0}; + TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); + + List> futures = accounts.stream() + .map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq) + .thenAcceptAsync(team -> { + int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); + int totalNewFirstDeposit = team.getList().stream().mapToInt(TeamAccount::getFirstDepositNum).sum(); + synchronized (totals) { + totals[0] += totalNewMember; + totals[1] += totalNewFirstDeposit; + } + String percent = getPercent(totalNewFirstDeposit, totalNewMember); + synchronized (rows) { + rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String.valueOf(totalNewFirstDeposit), percent}); + } + }, asyncTaskExecutor) + ) + .toList(); + + CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + allFutures.join(); + + rows.add(new String[]{"总注册", String.valueOf(totals[0]), String.valueOf(totals[1]), getPercent(totals[1], totals[0])}); + String message = TableFormatter.formatTableAsHtml(rows); + telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message); + }, asyncTaskExecutor).exceptionally(ex -> { + log.error("Error generating and sending team report", ex); + return null; + }); + } + + /** + * 查询部门所有用户的报表 + * + * @param deptUser 部门下所有用户对象 + * @param agentDataVisualListReq 请求参数 + * @param reportDate 日期 + * @param ministerUser 上级 + * @return CompletableFuture + */ + private CompletableFuture processDeptUser(UserDO deptUser, AgentDataVisualListReq + agentDataVisualListReq, LocalDate reportDate, UserDO ministerUser) { + return CompletableFuture.runAsync(() -> { + List currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE) + .stream() + .filter(accountResp -> !accountResp.getIsTeam()) + .toList(); + + List> futures = currUserAccounts.stream() + .map(currAccount -> agentDataVisualListService.getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date")) + ) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); + return null; + }) + ) + .toList(); + + CompletableFuture userStaticsFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + userStaticsFuture.thenRunAsync(() -> { + List agentDataList = futures.stream() + .map(future -> { + try { + return future.join(); + } catch (Exception ex) { + log.error("Error joining future", ex); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + // 构造消息体 + String message = telegramMessageService.buildDailyReportMessage(agentDataList); + if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { String botToken = StrUtil.isEmpty(deptUser.getBotToken()) ? ministerUser.getBotToken() : deptUser.getBotToken(); telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message); } - }); + }, asyncTaskExecutor).exceptionally(ex -> { + log.error("Error collecting and processing data", ex); + return null; + }).join(); + }, asyncTaskExecutor).exceptionally(ex -> { + log.error("Error processing dept user", ex); + return null; }); } - //获取需要发送给用户的信息 - private String getDeptUserMessage(UserDO deptUser, - AgentDataVisualListReq agentDataVisualListReq, - LocalDate reportDate) { - StringBuilder message = new StringBuilder(); - // 主线注册新增不用报数,过滤 - List currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE) - .stream() - .filter(accountResp -> !accountResp.getIsTeam()) - .toList(); - - currUserAccounts.forEach(currAccount -> { - CompletableFuture future = agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq); - AgentDataVisualList agentData = future.join(); - Statics statics = agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(); - message.append(currAccount.getUsername()) - .append("\n") - .append("注册: ") - .append(statics.getIsNew()) - .append("\n") - .append("新增: ") - .append(statics.getFirstCount()) - .append("\n") - .append("日活: ") - .append(statics.getCountBets()) - .append("\n\n"); - }); - - return message.toString(); - } - - - private void saveData(LocalDate reportDate, LocalDateTime startDateTime, LocalDateTime endDateTime) { + private void saveData(LocalDate reportDate) { // 获取传入年月 YearMonth inputYearMonth = YearMonth.from(reportDate); @@ -169,6 +359,8 @@ public class DailyReport { RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); List userIds = userRoleService.listUserIdByRoleId(minister.getId()); + List> tasks = new ArrayList<>(); + userIds.forEach(userId -> { UserDO ministerUser = userService.getById(userId); List deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, ministerUser.getDeptId()); @@ -181,44 +373,73 @@ public class DailyReport { .stream() .filter(AccountResp::getIsTeam) .toList(); - ministerUserAccounts.parallelStream().forEach(accountResp -> { - TeamFinancePagination> financePagination = completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq).join(); - List financeReqList = financePagination.getList().stream().map(finance -> { - FinanceDO financeDO = new FinanceDO(); - BeanUtil.copyProperties(finance, financeDO); - return financeDO; - }).toList(); - financeService.addAll(financeReqList); - FinanceSumReq financeSumReq = new FinanceSumReq(); - BeanUtil.copyProperties(financePagination.getTotalSumVo(),financeSumReq); - financeSumService.add(financeSumReq); - }); + + // 异步处理 ministerUserAccounts + CompletableFuture ministerAccountsFuture = CompletableFuture.runAsync(() -> + ministerUserAccounts.parallelStream().forEach(accountResp -> { + TeamFinancePagination> financePagination = completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq).join(); + List financeReqList = financePagination.getList().stream().map(finance -> { + FinanceDO financeDO = new FinanceDO(); + BeanUtil.copyProperties(finance, financeDO); + return financeDO; + }).toList(); + financeService.addAll(financeReqList); + FinanceSumReq financeSumReq = new FinanceSumReq(); + BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq); + financeSumService.add(financeSumReq); + }), asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error processing minister accounts", ex); + return null; + }); + tasks.add(ministerAccountsFuture); + + // 异步处理 deptUsers AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() .monthDate(reportDate) .build(); - deptUsers.parallelStream().forEach(deptUser -> { - List currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE) - .stream() - .filter(accountResp -> !accountResp.getIsTeam()) - .toList(); - List list = currUserAccounts.parallelStream().map(currAccount -> { - CompletableFuture future = agentDataVisualListService - .getAgentDataVisualList(currAccount, agentDataVisualListReq); - AgentDataVisualList agentData = future.join(); - Statics statics = agentData.getCurData() - .stream() - .filter(data -> data.getStaticsDate().equals(reportDate)) - .findFirst() - .orElseThrow(); - StatsDO statsDO = new StatsDO(); - BeanUtil.copyProperties(statics, statsDO); - return statsDO; - } - ).toList(); - statsService.addAll(list); - }); + deptUsers.forEach(deptUser -> { + CompletableFuture deptUserFuture = CompletableFuture.runAsync(() -> { + List currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE) + .stream() + .filter(accountResp -> !accountResp.getIsTeam()) + .toList(); + + List> futures = currUserAccounts.stream() + .map(currAccount -> agentDataVisualListService.getAgentDataVisualList(currAccount, agentDataVisualListReq) + .thenApplyAsync(agentData -> { + Statics statics = agentData.getCurData() + .stream() + .filter(data -> data.getStaticsDate().equals(reportDate)) + .findFirst() + .orElseThrow(() -> new BusinessException("No data found for report date")); + StatsDO statsDO = new StatsDO(); + BeanUtil.copyProperties(statics, statsDO); + return statsDO; + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage()); + return null; + }) + ).toList(); + + List list = futures.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + statsService.addAll(list); + }, asyncTaskExecutor).exceptionally(ex -> { + log.error("Error processing dept user accounts", ex); + return null; + }); + tasks.add(deptUserFuture); + }); }); + + CompletableFuture allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])); + allTasks.join(); } else { throw new BusinessException("只允许查询当月以及上个月的数据"); } @@ -232,29 +453,4 @@ public class DailyReport { percentInstance.setMinimumFractionDigits(2); return percentInstance.format(d1 / d2); } - - private List generateRows(Long userId, LocalDateTime startDateTime, LocalDateTime endDateTime) { - List rows = new ArrayList<>(); - rows.add(new String[]{"平台", "注册", "新增", "转化率"}); - rows.add(new String[]{"----", "----", "----", "----"}); - - List accounts = accountService.getAccountsByUserId(userId, DisEnableStatusEnum.ENABLE) - .stream() - .filter(AccountResp::getIsTeam) - .toList(); - int[] totals = {0, 0}; - TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); - accounts.forEach(accountResp -> { - Team team = teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq).join(); - int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum(); - int totalNewFirstDeposit = team.getList().stream().mapToInt(TeamAccount::getFirstDepositNum).sum(); - totals[0] += totalNewMember; - totals[1] += totalNewFirstDeposit; - String percent = getPercent(totalNewFirstDeposit, totalNewMember); - rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String.valueOf(totalNewFirstDeposit), percent}); - }); - - rows.add(new String[]{"总注册", String.valueOf(totals[0]), String.valueOf(totals[1]), getPercent(totals[1], totals[0])}); - return rows; - } } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/MessageSchedule.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/MessageSchedule.java deleted file mode 100644 index f60776c9..00000000 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/MessageSchedule.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.zayac.admin.schedule; - -import com.zayac.admin.common.enums.DisEnableStatusEnum; -import com.zayac.admin.constant.ApiPathConstants; -import com.zayac.admin.req.MemberDetailsReq; -import com.zayac.admin.req.MemberListReq; -import com.zayac.admin.req.PayRecordsListReq; -import com.zayac.admin.resp.*; -import com.zayac.admin.service.BannerService; -import com.zayac.admin.service.CompletableFutureWebClientService; -import com.zayac.admin.service.TelegramMessageService; -import com.zayac.admin.system.model.entity.UserDO; -import com.zayac.admin.system.model.resp.AccountResp; -import com.zayac.admin.system.service.AccountService; -import com.zayac.admin.system.service.UserService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.stereotype.Component; - -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -/** - * 定时任务 查询注册 新增用 暂定一分钟执行一次 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class MessageSchedule { - - private final UserService userService; - private final AccountService accountService; - private final CompletableFutureWebClientService completableFutureWebClientService; - private final TelegramMessageService telegramMessageService; - private final BannerService bannerService; - - //@Scheduled(fixedDelay = 60000) - public void newCheckRegistrationAndNewDeposit() { - // Get the current date and time - LocalDate nowDate = LocalDate.now(); - LocalDateTime nowDateTime = LocalDateTime.now(); - // Fetch all enabled users - List allEnableUser = userService.getAllEnabledUsers(); - - allEnableUser.forEach(user -> processUser(user, nowDate, nowDateTime).join()); - } - - private CompletableFuture processUser(UserDO user, LocalDate nowDate, LocalDateTime nowDateTime) { - List accounts = accountService.getAccountsByUserId(user.getId(), DisEnableStatusEnum.ENABLE); - List> futures = accounts.stream() - .map(account -> processAccount(account, user, nowDate, nowDateTime)) - .toList(); - - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - } - - private CompletableFuture processAccount(AccountResp account, - UserDO user, - LocalDate nowDate, - LocalDateTime nowDateTime) { - CompletableFuture bannerFuture = bannerService.getLatestBannerInfoAsync(account); - Banner prevBanner = bannerService.getPreviousBanner(account); - - return CompletableFuture.allOf(bannerFuture).thenCompose(v -> { - Banner currentBanner = bannerFuture.join(); - - CompletableFuture registrationProcess = processRegistration(account, user, currentBanner, prevBanner, nowDate); - CompletableFuture depositProcess = processDeposits(account, user, currentBanner, prevBanner, nowDate, nowDateTime); - - return CompletableFuture.allOf(registrationProcess, depositProcess) - .thenRun(() -> bannerService.updateBanner(account, currentBanner)); - }); - } - - private CompletableFuture processRegistration(AccountResp account, - UserDO currUser, - Banner banner, - Banner prevBanner, - LocalDate nowDate) { - if (prevBanner != null && banner.getRegisterMembers() > prevBanner.getRegisterMembers()) { - int registerCount = banner.getRegisterMembers() - prevBanner.getRegisterMembers(); - - MemberListReq memberListReq = MemberListReq.builder() - .registerStartDate(nowDate) - .registerEndDate(nowDate) - .startDate(nowDate) - .endDate(nowDate) - .registerSort(1) - .pageSize(registerCount) - .build(); - CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); - return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> { - if (!members.isEmpty()) { - String memberNames = members.stream().map(Member::getName).collect(Collectors.joining(", ")); - - String notification = String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", account - .getNickname(), registerCount, memberNames, banner.getRegisterMembers()); - if (DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getRegAndDepIds(), notification); - } - } - }); - } - return CompletableFuture.completedFuture(null); - } - - private CompletableFuture processDeposits(AccountResp account, - UserDO user, - Banner current, - Banner previous, - LocalDate nowDate, - LocalDateTime nowDateTime) { - return CompletableFuture.runAsync(() -> { - if (previous != null && current.getFirstDepositNum() > previous.getFirstDepositNum()) { - int newDeposits = current.getFirstDepositNum() - previous.getFirstDepositNum(); - - PayRecordsListReq req = PayRecordsListReq.builder() - .startDate(nowDate) - .endDate(nowDate) - .pageSize(100) - .payState(2) - .build(); - CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { - }); - paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> { - List names = payRecords.stream() - .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) - .map(PayRecord::getName) - .distinct() - .toList(); - - names.forEach(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> { - Optional matchingRecord = payRecords.stream() - .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) - .findFirst(); - matchingRecord.ifPresent(record -> { - String depositResults = String.format("用户: `%s`, 首存金额: *%s*", member.getName(), record - .getScoreAmount()); - String depResults = "🎉 " + account - .getNickname() + " 首存: " + newDeposits + depositResults + " 总数:*" + current - .getFirstDepositNum() + "*"; - if (DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) { - telegramMessageService.sendMessage(user.getBotToken(), user.getRegAndDepIds(), depResults); - } - }); - }).exceptionally(ex -> { - log.error("Error fetching details for member {}: {}", name, ex.getMessage()); - return null; - }).join()); - }).exceptionally(ex -> { - log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage()); - return null; - }).join(); - } - }); - } - - private CompletableFuture fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) { - MemberListReq memberListReq = MemberListReq.builder() - .name(name) - .startDate(nowDate) - .endDate(nowDate) - .status(1) - .build(); - - CompletableFuture>> memberFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); - - return memberFuture.thenApply(MemberPagination::getList) - .thenApply(list -> list.stream().findFirst()) - .thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member - .getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息"))); - } - - private CompletableFuture fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { - MemberDetailsReq detailsReq = MemberDetailsReq.builder() - .id(memberId) - .startDate(nowDate) - .endDate(nowDate) - .build(); - - return completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { - }); - } -} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java index 1c9e08b5..61791bfa 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java @@ -69,43 +69,43 @@ public class CompletableFutureWebClientService { ParameterizedTypeReference> typeRef, AccountResp account) { return Mono.fromCallable(() -> { - if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { - throw new RuntimeException("Unable to acquire a permit"); - } - return true; - }).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> { - try { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - } catch (Exception e) { - log.warn("Header conversion exception: " + e.getMessage()); - throw new BusinessException("Header conversion failed", e); - } - }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) - .bodyToMono(String.class) - .doOnNext(resStr -> { - log.info("request url:{}", url); - log.info("request headers :{}", headers); - log.info("request params:{}", params); - log.info("response {}", resStr); - }) - .flatMap(body -> { - try { - ApiResponse apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() - .constructType(typeRef.getType())); - return Mono.justOrEmpty(apiResponse); - } catch (Exception e) { - log.warn("JSON parsing exception: " + e.getMessage()); - return Mono.just(new ApiResponse(null, "Decoding error", 6008)); - } - }) - .flatMap(response -> respHandler(response, account)) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))) - .doFinally(signal -> semaphore.release()); + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { + throw new RuntimeException("Unable to acquire a permit"); + } + return true; + }).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> { + try { + Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { + }, true); + headerMap.forEach(httpHeaders::add); + } catch (Exception e) { + log.warn("Header conversion exception: " + e.getMessage()); + throw new BusinessException("Header conversion failed", e); + } + }) + .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) + .retrieve() + .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) + .bodyToMono(String.class) + .doOnNext(resStr -> { + log.info("request url:{}", url); + log.info("request headers :{}", headers); + log.info("request params:{}", params); + log.info("response {}", resStr); + }) + .flatMap(body -> { + try { + ApiResponse apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() + .constructType(typeRef.getType())); + return Mono.justOrEmpty(apiResponse); + } catch (Exception e) { + log.warn("JSON parsing exception: " + e.getMessage()); + return Mono.just(new ApiResponse(null, "Decoding error", 6008)); + } + }) + .flatMap(response -> respHandler(response, account)) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))) + .doFinally(signal -> semaphore.release()); } private boolean isRetryableException(Throwable throwable) { diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java index 0f4f5475..804ff9e3 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java @@ -1,19 +1,3 @@ -/* - * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.zayac.admin.service; import cn.hutool.core.bean.BeanUtil; @@ -46,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.ArrayList; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -62,89 +47,117 @@ public class DepositService { Team currentTeam, Team previousTeam, LocalDate nowDate, - LocalDateTime nowDateTime) { - return CompletableFuture.runAsync(() -> { - if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) { - PayRecordsListReq req = PayRecordsListReq.builder() - .startDate(nowDate) - .endDate(nowDate) - .pageSize(100) - .payState(2) - .build(); - CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { - }); - List changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam); - Set changedAgentNames = changedTeamAccounts.stream() - .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) - .map(TeamAccountWithChange::getAgentName) - .collect(Collectors.toSet()); - paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> { - Map> agentNameWithNames = payRecords.stream() - .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames - .contains(record.getAgentName())) - .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors - .mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors - .toSet(), ArrayList::new)))); + LocalDateTime nowDateTime, + Executor asyncTaskExecutor) { + if (previousTeam == null || currentTeam.getFirstDepositNum() <= previousTeam.getFirstDepositNum()) { + return CompletableFuture.completedFuture(null); + } - agentNameWithNames.forEach((agentName, names) -> { - StringBuilder depositResults = new StringBuilder(); - AtomicInteger depositCounter = new AtomicInteger(0); + return processDepositRecords(minister, account, currentTeam, previousTeam, nowDate, nowDateTime, asyncTaskExecutor); + } - TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream() - .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName)) - .findFirst() - .orElseThrow(() -> new BusinessException(String - .format("can not find agent name %s", agentName))); - - List> fetchFutures = names.stream() - .map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> { - payRecords.stream() - .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) - .findFirst() - .ifPresent(record -> { - if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) { - depositResults.append(telegramMessageService - .buildDepositResultsMessage(member.getName(), record.getScoreAmount())); - } - }); - }).exceptionally(ex -> { - log.error("Error fetching details for member {}: {}", name, ex.getMessage()); - return null; - })) - .toList(); - - CompletableFuture allFetches = CompletableFuture.allOf(fetchFutures - .toArray(new CompletableFuture[0])); - - allFetches.thenRun(() -> { - if (!depositResults.isEmpty()) { - String notification = telegramMessageService - .buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(), depositResults - .toString(), targetTeamAccount.getFirstDepositNum()); - UserDO currUser = accountService.getUserByAccountUsername(agentName); - if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? minister.getBotToken() : currUser.getBotToken(); - telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); - } - telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); - } - }).exceptionally(ex -> { - log.error("Error sending notification for account {}: {}", account.getId(), ex - .getMessage()); - return null; - }); - }); - }).exceptionally(ex -> { + private CompletableFuture processDepositRecords(UserDO minister, + AccountResp account, + Team currentTeam, + Team previousTeam, + LocalDate nowDate, + LocalDateTime nowDateTime, + Executor asyncTaskExecutor) { + PayRecordsListReq req = PayRecordsListReq.builder() + .startDate(nowDate) + .endDate(nowDate) + .pageSize(100) + .payState(2) + .build(); + CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { + }); + List changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam); + Set changedAgentNames = changedTeamAccounts.stream() + .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) + .map(TeamAccountWithChange::getAgentName) + .collect(Collectors.toSet()); + return paginationCompletableFuture.thenApplyAsync(Pagination::getList, asyncTaskExecutor) + .thenComposeAsync(payRecords -> + processPayRecords(payRecords, changedTeamAccounts, changedAgentNames, minister, account, nowDateTime, asyncTaskExecutor), asyncTaskExecutor) + .exceptionally(ex -> { log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage()); return null; - }).join(); + }); + } + + private CompletableFuture processPayRecords(List payRecords, + List changedTeamAccounts, + Set changedAgentNames, + UserDO minister, + AccountResp account, + LocalDateTime nowDateTime, + Executor asyncTaskExecutor) { + Map> agentNameWithNames = payRecords.stream() + .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames + .contains(record.getAgentName())) + .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors + .mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors + .toSet(), ArrayList::new)))); + + List> futures = agentNameWithNames.entrySet().stream() + .map(entry -> processAgentRecords(entry.getKey(), entry.getValue(), changedTeamAccounts, payRecords, minister, account, asyncTaskExecutor)) + .toList(); + + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } + + private CompletableFuture processAgentRecords(String agentName, + List names, + List changedTeamAccounts, + List payRecords, + UserDO minister, + AccountResp account, + Executor asyncTaskExecutor) { + StringBuilder depositResults = new StringBuilder(); + AtomicInteger depositCounter = new AtomicInteger(0); + + TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream() + .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName)) + .findFirst() + .orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName))); + + List> fetchFutures = names.stream() + .map(name -> fetchMemberDetails(account, name, LocalDate.now(), asyncTaskExecutor).thenAcceptAsync(member -> + payRecords.stream() + .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) + .findFirst() + .ifPresent(record -> { + if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) { + depositResults.append(telegramMessageService + .buildDepositResultsMessage(member.getName(), record.getScoreAmount())); + } + }), asyncTaskExecutor).exceptionally(ex -> { + log.error("Error fetching details for member {}: {}", name, ex.getMessage()); + return null; + })) + .toList(); + + CompletableFuture allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0])); + + return allFetches.thenRunAsync(() -> { + if (!depositResults.isEmpty()) { + String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(), + depositResults.toString(), targetTeamAccount.getFirstDepositNum()); + UserDO currUser = accountService.getUserByAccountUsername(agentName); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? minister.getBotToken() : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); + } + telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); } + }, asyncTaskExecutor).exceptionally(ex -> { + log.error("Error sending notification for account {}: {}", account.getUsername(), ex.getMessage()); + return null; }); } - private CompletableFuture fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) { + private CompletableFuture fetchMemberDetails(AccountResp account, String name, LocalDate nowDate, Executor asyncTaskExecutor) { TeamMemberListReq memberListReq = TeamMemberListReq.builder() .name(name) .startDate(nowDate) @@ -156,10 +169,9 @@ public class DepositService { .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { }); - return memberFuture.thenApply(MemberPagination::getList) - .thenApply(list -> list.stream().findFirst()) - .thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member - .getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息"))); + return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) + .thenApplyAsync(list -> list.stream().findFirst().orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")), asyncTaskExecutor) + .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor); } private CompletableFuture fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { @@ -196,4 +208,4 @@ public class DepositService { }) .collect(Collectors.toList()); } -} \ No newline at end of file +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java index 690ed5dd..0a77e985 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java @@ -36,8 +36,12 @@ import java.time.LocalDate; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.stream.Collectors; +/** + * 新注册处理相关逻辑 + */ @Slf4j @Service @RequiredArgsConstructor @@ -51,7 +55,8 @@ public class RegistrationService { Team currentTeamInfo, Team prevTeamInfo, LocalDate nowDate, - Map teamAccountMap) { + Map teamAccountMap, + Executor asyncTaskExecutor) { if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) { int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount(); TeamMemberReq memberListReq = TeamMemberReq.builder() @@ -65,8 +70,8 @@ public class RegistrationService { CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { }); - return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> { - log.info("Successfully get {} new registered members", members.size()); + return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor).thenAcceptAsync(members -> { + log.info("Successfully get [{}] new registered members", members.size()); if (!members.isEmpty()) { Map> groupByTopAgentName = members.stream() .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); @@ -82,7 +87,7 @@ public class RegistrationService { .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); }); } - }); + }, asyncTaskExecutor); } return CompletableFuture.completedFuture(null); } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java index 740ee857..034d7c08 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java @@ -19,6 +19,7 @@ package com.zayac.admin.service; import cn.hutool.core.convert.Convert; import cn.hutool.core.text.CharPool; import cn.hutool.core.util.StrUtil; +import com.zayac.admin.resp.Statics; import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamMember; import lombok.RequiredArgsConstructor; @@ -97,4 +98,26 @@ public class TelegramMessageService { public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) { return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount); } + + public String buildFailedPayMessage(String accountName, + List accountMembers) { + String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining("\n")); + return String.format("%s\n%s\n", accountName, memberNames); + } + + + public String buildDailyReportMessage(List statics) { + StringBuilder message = new StringBuilder(); + statics.forEach(stat -> { + String formattedStat = String.format( + "%s\n注册: %s\n新增: %d\n日活: %d\n\n", + stat.getAgentName(), + stat.getIsNew(), + stat.getFirstCount(), + stat.getCountBets() + ); + message.append(formattedStat); + }); + return message.toString(); + } } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java index f4f770da..4587af52 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java @@ -54,19 +54,19 @@ public class WebClientService { try { semaphore.acquire(); // 尝试获取许可 return this.webClient.post().uri(url).headers(httpHeaders -> { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response - .bodyToMono(String.class) - .map(body -> new BusinessException("Error response: " + body))) - .bodyToMono(typeRef) - .flatMap(this::respHandler) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) - .block(); // 遇到网络问题,每三秒重试一次 + Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { + }, true); + headerMap.forEach(httpHeaders::add); + }) + .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) + .retrieve() + .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response + .bodyToMono(String.class) + .map(body -> new BusinessException("Error response: " + body))) + .bodyToMono(typeRef) + .flatMap(this::respHandler) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) + .block(); // 遇到网络问题,每三秒重试一次 } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException("Failed to acquire semaphore", e); @@ -82,24 +82,24 @@ public class WebClientService { System.out.println(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getId()); return Mono.fromCallable(() -> { - semaphore.acquire(); // 尝试获取许可 - return true; - }) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response - .bodyToMono(String.class) - .map(body -> new BusinessException("Error response: " + body))) - .bodyToMono(typeRef) - .flatMap(this::monoRespHandler) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) - .doFinally(signal -> semaphore.release())); + semaphore.acquire(); // 尝试获取许可 + return true; + }) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> { + Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { + }, true); + headerMap.forEach(httpHeaders::add); + }) + .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) + .retrieve() + .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response + .bodyToMono(String.class) + .map(body -> new BusinessException("Error response: " + body))) + .bodyToMono(typeRef) + .flatMap(this::monoRespHandler) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) + .doFinally(signal -> semaphore.release())); } private Mono monoRespHandler(ApiResponse apiResponse) {