diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java b/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java index 36416cfc..f1ff8400 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/config/ThreadPoolConfig.java @@ -33,9 +33,9 @@ public class ThreadPoolConfig { @Bean(name = "asyncTaskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(10); - executor.setMaxPoolSize(20); - executor.setQueueCapacity(500); + executor.setCorePoolSize(20); + executor.setMaxPoolSize(50); + executor.setQueueCapacity(1000); executor.setThreadNamePrefix("asyncTaskExecutor-"); executor.initialize(); return executor; @@ -49,7 +49,7 @@ public class ThreadPoolConfig { @Bean public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - scheduler.setPoolSize(10); + scheduler.setPoolSize(20); scheduler.setThreadNamePrefix("ScheduledTask-"); scheduler.setWaitForTasksToCompleteOnShutdown(true); scheduler.setAwaitTerminationSeconds(60); diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/constant/ApiPathConstants.java b/zayac-admin-agent/src/main/java/com/zayac/admin/constant/ApiPathConstants.java index 3ba93ead..8dd9ec49 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/constant/ApiPathConstants.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/constant/ApiPathConstants.java @@ -74,4 +74,9 @@ public class ApiPathConstants { * 团队财务报表 */ public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team"; + + /** + * 活跃会员 + */ + public static final String ACTIVE_LIST = "/agent/api/v1/member/activeList"; } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/req/ActiveListReq.java b/zayac-admin-agent/src/main/java/com/zayac/admin/req/ActiveListReq.java new file mode 100644 index 00000000..9a50e796 --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/req/ActiveListReq.java @@ -0,0 +1,57 @@ +package com.zayac.admin.req; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDate; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class ActiveListReq { + /** + * 活跃类型 + */ + private Integer activeType; + /** + * 统计月份 + */ + @JsonFormat(pattern = "yyyy-MM") + private LocalDate date; + + /** + * 是否重置 + */ + private Boolean isRest = false; + + /** + * 会员名称 + */ + private String name; + /** + * 页码 + */ + @Builder.Default + private int pageNum = 1; + /** + * 每页显示数据 + */ + @Builder.Default + private int pageSize = 10; + /** + * 注册结束日期 + */ + private LocalDate registerEndDate; + /** + * 注册开始日期 + */ + private LocalDate registerStartDate; + /** + * 上级代理线 + */ + private String topAgentName; +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/resp/ActiveListResp.java b/zayac-admin-agent/src/main/java/com/zayac/admin/resp/ActiveListResp.java new file mode 100644 index 00000000..4b4495eb --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/resp/ActiveListResp.java @@ -0,0 +1,59 @@ +package com.zayac.admin.resp; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; + +/** + * "memberId": 509419585, + * "name": "zaq198014", + * "topId": 500007970, + * "topName": "ky3tg107032", + * "firstPayTime": "2024-06-15 18:20:49", + * "registerTime": "2023-08-18 19:18:30", + * "deposit": 1522, + * "bets": 1394, + * "activeStr": "首存会员" + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ActiveListResp { + /** + * 会员id + */ + private Long memberId; + /** + * 会员名称 + */ + private String name; + /** + * 会员上级id + */ + private Long topId; + + /** + * 首存时间 + */ + private LocalDateTime firstPayTime; + /** + * 注册时间 + */ + private LocalDateTime registerTime; + /** + * 存款 + */ + private BigDecimal deposit; + /** + * 投注 + */ + private BigDecimal bets; + /** + * 活跃类型 + */ + private String activeStr; +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java index f79e5043..b36a5735 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java @@ -118,12 +118,10 @@ public class CheckRegAndDep { log.info("Current Team Info: {}", currentTeamInfo); CompletableFuture registrationProcess = registrationService - .processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor) - .thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor); + .processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor); CompletableFuture depositProcess = depositService - .processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor) - .thenRunAsync(() -> log.info("Deposit process completed"), asyncTaskExecutor); + .processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor); return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> { teamService.updateTeamInfo(account, currentTeamInfo); diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index 325b6210..6898f602 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -59,7 +59,9 @@ import java.time.LocalTime; import java.time.YearMonth; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -86,9 +88,10 @@ public class DailyReport { private static final String SEO_ROLE_CODE = "seo"; private static final String PLATFORM_HTH = "华体会"; - @Scheduled(cron = "0 40 11,14,17,21 * * ?") + @Scheduled(cron = "0 40 11,14,17,21,23,1,3,5 * * ?") public void teamAccountDailyReport() { LocalDateTime nowDateTime = LocalDateTime.now(); + log.info("dailySummarize started at {}", nowDateTime); LocalDate nowDate = LocalDate.now(); //查询部门下的所有用户 List deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE); @@ -111,18 +114,24 @@ public class DailyReport { } - @Scheduled(cron = "0 40 23 * * ?") - public void ScheduledSendTeamDailyReport1() { - sendTeamDailyReport(); - } + // @Scheduled(cron = "0 40 23 * * ?") + // public void ScheduledSendTeamDailyReport1() { + // log.info("ScheduledSendTeamDailyReport1 started at {}", LocalDateTime.now()); + // sendTeamDailyReport(); + // log.info("ScheduledSendTeamDailyReport1 finished at {}", LocalDateTime.now()); + // } - @Scheduled(cron = "0 0 1-23 * * ?") + @Scheduled(cron = "0 0/30 1-23 * * ?") public void ScheduledSendTeamDailyReport2() { + log.info("ScheduledSendTeamDailyReport2 started at {}", LocalDateTime.now()); sendTeamDailyReport(); + log.info("ScheduledSendTeamDailyReport2 finished at {}", LocalDateTime.now()); } @Scheduled(cron = "0 15 0 * * ?") public void dailySummarize() { + + log.info("dailySummarize started at {}", LocalDateTime.now()); LocalDate yesterday = LocalDate.now().minusDays(1); //查询部门下的所有用户 List deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE); @@ -166,9 +175,10 @@ public class DailyReport { .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); //保存数据 saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName); - getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); + //getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); sendFinance(yesterday, accountUsernameToUserMap, ministerUser); }); + log.info("dailySummarize finished at {}", LocalDateTime.now()); } private void sendTeamDailyReport() { @@ -410,11 +420,13 @@ public class DailyReport { LocalDateTime startDateTime, LocalDateTime endDateTime, List assistants) { + log.info("Starting to generate and send team report for minister user: {}", ministerUser.getUsername()); + return CompletableFuture.runAsync(() -> { - List rows = new ArrayList<>(); + ConcurrentLinkedQueue rows = new ConcurrentLinkedQueue<>(); List accounts = ministerUser.getAccounts(); - int[] totals = {0, 0}; + AtomicIntegerArray totals = new AtomicIntegerArray(2); TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); List> futures = accounts.stream() @@ -425,36 +437,46 @@ public class DailyReport { .stream() .mapToInt(TeamAccount::getFirstDepositNum) .sum(); - synchronized (totals) { - totals[0] += totalNewMember; - totals[1] += totalNewFirstDeposit; - } + + totals.addAndGet(0, totalNewMember); + totals.addAndGet(1, totalNewFirstDeposit); + String percent = getPercent(totalNewFirstDeposit, totalNewMember); - synchronized (rows) { - rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String - .valueOf(totalNewFirstDeposit), percent}); - } - }, asyncTaskExecutor)) + rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String + .valueOf(totalNewFirstDeposit), percent}); + }, asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error fetching team info for account: {}", accountResp.getPlatformName(), ex); + return null; + })) .toList(); CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); - // 对 rows 列表进行排序 - rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0])); - rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), - getPercent(totals[1], totals[0])}); - String message = TableFormatter.formatTableAsHtml(rows); + log.info("Completed all async tasks for minister user: {}", ministerUser.getUsername()); + + // 对 rows 列表进行排序 + List sortedRows = new ArrayList<>(rows); + sortedRows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0])); + + sortedRows.add(new String[] {"总计", String.valueOf(totals.get(0)), String.valueOf(totals.get(1)), + getPercent(totals.get(1), totals.get(0))}); + + String message = TableFormatter.formatTableAsHtml(sortedRows); + telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message); + if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message); } - //发送消息给助理 + // 发送消息给助理 if (!CollUtil.isEmpty(assistants)) { assistants.forEach(assistant -> { if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) { telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message); + log.info("Sent report to assistant: {}", assistant.getUsername()); } }); } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java index 6f59d3af..e87a8e7b 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java @@ -19,6 +19,7 @@ package com.zayac.admin.service; import cn.hutool.core.util.StrUtil; import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.constant.ApiPathConstants; +import com.zayac.admin.req.ActiveListReq; import com.zayac.admin.req.MemberDetailsReq; import com.zayac.admin.req.PayRecordsListReq; import com.zayac.admin.req.team.TeamMemberListReq; @@ -48,6 +49,7 @@ import java.util.stream.Collectors; import static com.zayac.admin.common.constant.CacheConstants.SUCCESSFULLY_PAYED_ACCOUNTNAME; import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount; +import static com.zayac.admin.utils.CommonUtils.getLastNElements; @Slf4j @Service @@ -59,19 +61,19 @@ public class DepositService { private static final String BOT_TOKEN = "6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto"; private static final Long TELEGRAM_CHAT_ID = 6054562838L; + public CompletableFuture processDeposits(UserWithRolesAndAccountsResp ministerUser, Map accountUsernameToUserMap, AccountResp account, Team currentTeam, Team previousTeam, LocalDate nowDate, - LocalDateTime nowDateTime, Executor asyncTaskExecutor) { if (previousTeam == null || currentTeam.getFirstDepositNum() <= previousTeam.getFirstDepositNum()) { return CompletableFuture.completedFuture(null); } - return processDepositRecords(ministerUser, accountUsernameToUserMap, account, currentTeam, previousTeam, nowDate, nowDateTime, asyncTaskExecutor); + return processDepositRecords(ministerUser, accountUsernameToUserMap, account, currentTeam, previousTeam, nowDate, asyncTaskExecutor); } private CompletableFuture processDepositRecords(UserWithRolesAndAccountsResp ministerUser, @@ -80,15 +82,14 @@ public class DepositService { Team currentTeam, Team previousTeam, LocalDate nowDate, - LocalDateTime nowDateTime, Executor asyncTaskExecutor) { List hasNewDepositAccounts = findChangedTeamAccount(previousTeam, currentTeam).stream() - .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) - .toList(); + .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) + .toList(); List> allTasks = hasNewDepositAccounts.stream() - .map(accountWithChange -> processAccountChanges(accountWithChange, ministerUser, accountUsernameToUserMap, account, nowDate, nowDateTime, asyncTaskExecutor)) - .toList(); + .map(accountWithChange -> processAccountChanges(accountWithChange, ministerUser, accountUsernameToUserMap, account, nowDate, asyncTaskExecutor)) + .toList(); return CompletableFuture.allOf(allTasks.toArray(new CompletableFuture[0])); } @@ -98,34 +99,101 @@ public class DepositService { Map accountUsernameToUserMap, AccountResp account, LocalDate nowDate, - LocalDateTime nowDateTime, Executor asyncTaskExecutor) { - // 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题 - LocalDate startDate = nowDateTime.minusHours(1).toLocalDate(); - PayRecordsListReq req = PayRecordsListReq.builder() - .startDate(startDate) - .endDate(nowDate) - .pageSize(100) - .payState(2) - .agentName(accountWithChange.getAgentName()) - .build(); - - StringBuilder depositResults = new StringBuilder(); - AtomicInteger depositCounter = new AtomicInteger(0); + ActiveListReq req = ActiveListReq.builder() + .pageNum(1) + .activeType(3) + .topAgentName(accountWithChange.getAgentName()) + .date(nowDate) + .isRest(false) + .build(); return completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference>>>() { - }) - .thenApply(Pagination::getList) - .thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor) - .thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults), asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex - .getMessage()); - return null; - }); + .fetchDataForAccount(account, ApiPathConstants.ACTIVE_LIST, req, new ParameterizedTypeReference>>>() { + }) + .thenComposeAsync(page -> fetchAndProcessActiveList(account, req, page, accountWithChange, ministerUser, accountUsernameToUserMap, asyncTaskExecutor), asyncTaskExecutor) + .exceptionally(ex -> { + // 处理异常 + log.error("Error processing account changes for account: {}", account, ex); + return null; + }); } + private CompletableFuture fetchAndProcessActiveList(AccountResp account, ActiveListReq req, Pagination> page, + TeamAccountWithChange accountWithChange, UserWithRolesAndAccountsResp ministerUser, + Map accountUsernameToUserMap, Executor asyncTaskExecutor) { + req.setPageNum(page.getPageNum()); + return completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.ACTIVE_LIST, req, new ParameterizedTypeReference>>>() { + }) + .thenApply(Pagination::getList) + .thenAcceptAsync(activeListResps -> { + List sortedList = activeListResps.stream() + .sorted(Comparator.comparing(ActiveListResp::getFirstPayTime)) + .collect(Collectors.toList()); + List activeListRespList = getLastNElements(sortedList, accountWithChange.getNewDepositNum()); + String depositResults = buildDepositResults(activeListRespList); + String notification = buildNotificationMessage(accountWithChange, depositResults); + sendNotifications(accountWithChange, ministerUser, accountUsernameToUserMap, notification); + }, asyncTaskExecutor); + } + + private String buildDepositResults(List activeListRespList) { + StringBuilder depositResults = new StringBuilder(); + activeListRespList.forEach(activeListResp -> + depositResults.append(telegramMessageService.buildDepositResultsMessage(activeListResp.getName(), activeListResp.getDeposit())) + ); + return depositResults.toString(); + } + + private String buildNotificationMessage(TeamAccountWithChange accountWithChange, String depositResults) { + return telegramMessageService.buildDepositMessage( + accountWithChange.getAgentName(), + accountWithChange.getNewDepositNum(), + depositResults, + accountWithChange.getFirstDepositNum() + ); + } + + private void sendNotifications(TeamAccountWithChange accountWithChange, UserWithRolesAndAccountsResp ministerUser, + Map accountUsernameToUserMap, String notification) { + var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName()); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) + ? ministerUser.getBotToken() + : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); + } + telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification); + } + + +// // 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题 +// LocalDate startDate = nowDateTime.minusHours(1).toLocalDate(); +// PayRecordsListReq req = PayRecordsListReq.builder() +// .startDate(startDate) +// .endDate(nowDate) +// .pageSize(100) +// .payState(2) +// .agentName(accountWithChange.getAgentName()) +// .build(); +// +// StringBuilder depositResults = new StringBuilder(); +// AtomicInteger depositCounter = new AtomicInteger(0); +// +// return completableFutureWebClientService +// .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference>>>() { +// }) +// .thenApply(Pagination::getList) +// .thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor) +// .thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults), asyncTaskExecutor) +// .exceptionally(ex -> { +// log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex +// .getMessage()); +// return null; +// }); +// } + private CompletableFuture processPayRecords(List payRecords, TeamAccountWithChange accountWithChange, AccountResp account, @@ -137,31 +205,34 @@ public class DepositService { //根据用户名去重,保留时间最早的支付记录 List sortedPayRecords = payRecords.stream() - .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) - .collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, ( - existingRecord, - newRecord) -> existingRecord - .getCreatedAt() - .isBefore(newRecord - .getCreatedAt()) - ? existingRecord - : newRecord, LinkedHashMap::new), map -> map - .values() - .stream() - .sorted(Comparator - .comparing(PayRecord::getCreatedAt) - .reversed()) - .collect(Collectors - .toList()))); + .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) + .collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, ( + existingRecord, + newRecord) -> existingRecord + .getCreatedAt() + .isBefore(newRecord + .getCreatedAt()) + ? existingRecord + : newRecord, LinkedHashMap::new), map -> map + .values() + .stream() + .sorted(Comparator + .comparing(PayRecord::getCreatedAt) + .reversed()) + .filter(payRecord -> !RedisUtils + .hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord + .getBillNo())) + .collect(Collectors + .toList()))); List> fetchMemberFutures = sortedPayRecords.stream() - .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor) - .thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor) - .exceptionally(ex -> { - log.error("Error fetching details for member {}: {}", payRecord.getName(), ex.getMessage()); - return null; - })) - .toList(); + .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor) + .thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor) + .exceptionally(ex -> { + log.error("Error fetching details for member {}: {}", payRecord.getName(), ex.getMessage()); + return null; + })) + .toList(); return CompletableFuture.allOf(fetchMemberFutures.toArray(new CompletableFuture[0])); } @@ -176,13 +247,13 @@ public class DepositService { if (!RedisUtils.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo())) { //如果订单记录有存款成功但是会员的首存时间还为空,数据未同步,也是首存 if ((member.getFirstPayAt() == null || payRecord.getCreatedAt() - .equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange + .equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange .getNewDepositNum()) { //把存款成功的用户保存起来,设置一个小时的缓存时间 防止重复用户 RedisUtils.set(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo(), member.getName(), Duration - .ofHours(1)); + .ofHours(1)); depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord - .getScoreAmount())); + .getScoreAmount())); } } } @@ -193,49 +264,50 @@ public class DepositService { Map accountUsernameToUserMap, StringBuilder depositResults) { String notification = telegramMessageService.buildDepositMessage(accountWithChange - .getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange + .getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange .getFirstDepositNum()); var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName()); if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { String botToken = StrUtil.isEmpty(currUser.getBotToken()) - ? ministerUser.getBotToken() - : currUser.getBotToken(); + ? ministerUser.getBotToken() + : currUser.getBotToken(); telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); } telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification); } + private CompletableFuture fetchMemberDetails(AccountResp account, String name, LocalDate nowDate, Executor asyncTaskExecutor) { TeamMemberListReq memberListReq = TeamMemberListReq.builder() - .name(name) - .startDate(nowDate) - .endDate(nowDate) - .status(1) - .build(); + .name(name) + .startDate(nowDate) + .endDate(nowDate) + .status(1) + .build(); CompletableFuture>> memberFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { + }); return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) - .thenApplyAsync(list -> list.stream() - .findFirst() - .orElseThrow(() -> new BusinessException("没有找到匹配的成员信息")), asyncTaskExecutor) - .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor); + .thenApplyAsync(list -> list.stream() + .findFirst() + .orElseThrow(() -> new BusinessException("没有找到匹配的成员信息")), asyncTaskExecutor) + .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor); } private CompletableFuture fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { MemberDetailsReq detailsReq = MemberDetailsReq.builder() - .id(memberId) - .startDate(nowDate) - .endDate(nowDate) - .build(); + .id(memberId) + .startDate(nowDate) + .endDate(nowDate) + .build(); return completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { - }); + .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { + }); } } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java index a663de31..59ccc548 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java @@ -61,7 +61,7 @@ public class TelegramMessageService { public void sendMessage(String botToken, String targetIds, String message) { convertStringToList(targetIds).parallelStream() - .forEach(targetId -> this.sendMessage(botToken, targetId, message)); + .forEach(targetId -> this.sendMessage(botToken, targetId, message)); } public static List convertStringToList(String str) { @@ -73,7 +73,7 @@ public class TelegramMessageService { private static String escapeMarkdown(String text) { List escapeChars = Arrays - .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'); + .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'); for (Character charItem : escapeChars) { text = text.replace(charItem.toString(), "\\" + charItem); } @@ -92,17 +92,19 @@ public class TelegramMessageService { String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", ")); if (currUser != null) { return String.format("👏 [%s] %s 注册: %d 会员: `%s` 总数:*%d*", currUser.getNickname(), teamAccount - .getAgentName(), accountMembers.size(), memberNames, teamAccount.getSubMemberNum()); + .getAgentName(), accountMembers.size(), memberNames, teamAccount.getSubMemberNum()); } return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers - .size(), memberNames, teamAccount.getSubMemberNum()); + .size(), memberNames, teamAccount.getSubMemberNum()); } public String buildRegistrationMessage(List accountMembers, TeamAccountWithChange teamAccount) { - String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", ")); - return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers - .size(), memberNames, teamAccount.getSubMemberNum()); + String memberNames = accountMembers.stream() + .map(member -> "`" + member.getName() + "`") + .collect(Collectors.joining(", ")); + return String.format("👏 %s 注册: %d 会员: %s 总数:*%d*", teamAccount.getAgentName(), accountMembers + .size(), memberNames, teamAccount.getSubMemberNum()); } public String buildDepositMessage(String agentName, int newDepositNum, String depositResults, int firstDepositNum) { @@ -122,7 +124,7 @@ public class TelegramMessageService { StringBuilder message = new StringBuilder(); statics.forEach(stat -> { String formattedStat = String.format("%s\n注册: %s\n新增: %d\n日活: %d\n\n", stat.getAgentName(), stat - .getIsNew(), stat.getFirstCount(), stat.getCountBets()); + .getIsNew(), stat.getFirstCount(), stat.getCountBets()); message.append(formattedStat); }); return message.toString(); @@ -132,7 +134,7 @@ public class TelegramMessageService { StringBuilder message = new StringBuilder(); userFinancesList.forEach(financeDO -> { String formattedFinance = String.format("%s: *%s*\n", financeDO.getAgentName(), financeDO.getNetProfit() - .toPlainString()); + .toPlainString()); message.append(formattedFinance); }); return message.toString(); diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java b/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java index bf4037ae..a6b90726 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java @@ -21,6 +21,8 @@ import com.zayac.admin.resp.team.Team; import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamAccountWithChange; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -28,24 +30,38 @@ import java.util.stream.Collectors; public class CommonUtils { public static List findChangedTeamAccount(Team prevTeam, Team currTeam) { Map team2AccountMap = currTeam.getList() - .stream() - .collect(Collectors.toMap(TeamAccount::getId, account -> account)); + .stream() + .collect(Collectors.toMap(TeamAccount::getId, account -> account)); return prevTeam.getList() - .stream() - .filter(account1 -> team2AccountMap.containsKey(account1.getId())) - .map(account1 -> { - TeamAccount account2 = team2AccountMap.get(account1.getId()); - TeamAccountWithChange changedAccount = new TeamAccountWithChange(); - BeanUtil.copyProperties(account2, changedAccount); - if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { - changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); - } - if (account1.getSubMemberNum() != account2.getSubMemberNum()) { - changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); - } - return changedAccount; - }) - .collect(Collectors.toList()); + .stream() + .filter(account1 -> team2AccountMap.containsKey(account1.getId())) + .map(account1 -> { + TeamAccount account2 = team2AccountMap.get(account1.getId()); + TeamAccountWithChange changedAccount = new TeamAccountWithChange(); + BeanUtil.copyProperties(account2, changedAccount); + if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { + changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); + } + if (account1.getSubMemberNum() != account2.getSubMemberNum()) { + changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); + } + return changedAccount; + }) + .collect(Collectors.toList()); + } + + + public static List getLastNElements(List list, int n) { + if (list == null || list.isEmpty() || n <= 0) { + return Collections.emptyList(); + } + + int size = list.size(); + if (n > size) { + return new ArrayList<>(list); + } + + return new ArrayList<>(list.subList(size - n, size)); } }