diff --git a/docker/zayac-admin/config/application-prod.yml b/docker/zayac-admin/config/application-prod.yml index e2e24b45..1d0f70bb 100644 --- a/docker/zayac-admin/config/application-prod.yml +++ b/docker/zayac-admin/config/application-prod.yml @@ -37,13 +37,13 @@ spring.datasource: lazy: true driver-class-name: com.mysql.cj.jdbc.Driver type: ${spring.datasource.type} -# # PostgreSQL 库配置 -# postgresql: -# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false -# username: ${DB_USER:root} -# password: ${DB_PWD:123456} -# driver-class-name: org.postgresql.Driver -# type: ${spring.datasource.type} + # # PostgreSQL 库配置 + # postgresql: + # url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false + # username: ${DB_USER:root} + # password: ${DB_PWD:123456} + # driver-class-name: org.postgresql.Driver + # type: ${spring.datasource.type} # Hikari 连接池配置(完整配置请参阅:https://github.com/brettwooldridge/HikariCP) hikari: # 最大连接数量(默认 10,根据实际环境调整) @@ -273,7 +273,7 @@ avatar: support-suffix: jpg,jpeg,png,gif webclient: - max-concurrent-requests: 60 + max-requests-per-second: 10.0 spring: rabbitmq: diff --git a/docker/zayac-admin/config/application.yml b/docker/zayac-admin/config/application.yml index ffc3f3f2..a13ba8b7 100644 --- a/docker/zayac-admin/config/application.yml +++ b/docker/zayac-admin/config/application.yml @@ -229,7 +229,7 @@ management.health: enabled: false webclient: - max-concurrent-requests: 60 + max-requests-per-second: 10.0 spring: rabbitmq: diff --git a/zayac-admin-agent/pom.xml b/zayac-admin-agent/pom.xml index 9dfff1b5..f78c15d2 100644 --- a/zayac-admin-agent/pom.xml +++ b/zayac-admin-agent/pom.xml @@ -26,6 +26,11 @@ com.zayac zayac-admin-system + + com.google.guava + guava + 33.2.1-jre + \ No newline at end of file diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/mapper/DailyStatsMapper.java b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/mapper/DailyStatsMapper.java index c5993934..493cde90 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/mapper/DailyStatsMapper.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/mapper/DailyStatsMapper.java @@ -16,13 +16,26 @@ package com.zayac.admin.agent.mapper; +import org.apache.ibatis.annotations.Select; import top.continew.starter.data.mybatis.plus.base.BaseMapper; import com.zayac.admin.agent.model.entity.StatsDO; +import java.time.LocalDate; + /** * 代理每日数据 Mapper * * @author zayac * @since 2024/06/04 17:10 */ -public interface DailyStatsMapper extends BaseMapper {} \ No newline at end of file +public interface DailyStatsMapper extends BaseMapper { + /** + * 查询查询新注册总数 + * + * @param topAgentName 上级代理线 + * @param date 日期 + * @return int + */ + @Select("select COALESCE(sum(is_new), 0) from (select distinct agent_name, is_new from agent_stats where top_agent_name = #{topAgentName} and statics_date = #{date}) as distinct_agents") + int selectCountByTopAgentNameAndDate(String topAgentName, LocalDate date); +} \ No newline at end of file diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/FinanceDO.java b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/FinanceDO.java index 03d0f3d4..2be92ab2 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/FinanceDO.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/FinanceDO.java @@ -314,4 +314,8 @@ public class FinanceDO extends BaseDO { * 彩票利润 */ private BigDecimal lotteryProfit; + /** + * 上级代理线名称 + */ + private String topAgentName; } \ No newline at end of file diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/StatsDO.java b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/StatsDO.java index f1ed1e42..8e11de37 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/StatsDO.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/model/entity/StatsDO.java @@ -101,7 +101,7 @@ public class StatsDO extends BaseDO { private BigDecimal promoDividend; /** - * + * */ private BigDecimal rebate; @@ -161,17 +161,22 @@ public class StatsDO extends BaseDO { private LocalDateTime updatedAt; /** - * + * */ private BigDecimal oldDeposit; /** - * + * */ private Integer oldDepositCount; /** - * + * */ private BigDecimal newDeposit; + + /** + * 上级代理线名称 + */ + private String topAgentName; } \ No newline at end of file diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/StatsService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/StatsService.java index d2d98a88..3cf80ec6 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/StatsService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/StatsService.java @@ -23,6 +23,7 @@ import com.zayac.admin.agent.model.req.StatsReq; import com.zayac.admin.agent.model.resp.StatsDetailResp; import com.zayac.admin.agent.model.resp.StatsResp; +import java.time.LocalDate; import java.util.List; /** @@ -33,4 +34,6 @@ import java.util.List; */ public interface StatsService extends BaseService { void addAll(List list); + + int countNewRegNum(String topAgentName, LocalDate date); } \ No newline at end of file diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/impl/DailyStatsServiceImpl.java b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/impl/DailyStatsServiceImpl.java index 20b5c1a2..a866de4c 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/impl/DailyStatsServiceImpl.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/agent/service/impl/DailyStatsServiceImpl.java @@ -29,6 +29,7 @@ import com.zayac.admin.agent.model.resp.StatsDetailResp; import com.zayac.admin.agent.model.resp.StatsResp; import com.zayac.admin.agent.service.StatsService; +import java.time.LocalDate; import java.util.List; /** @@ -44,4 +45,15 @@ public class DailyStatsServiceImpl extends BaseServiceImpl list) { baseMapper.insertBatch(list); } + + /** + * 统计指定日期的注册情况 + * + * @param date 日期 + * @return int + */ + @Override + public int countNewRegNum(String topAgentName, LocalDate date) { + return baseMapper.selectCountByTopAgentNameAndDate(topAgentName, date); + } } \ No newline at end of file diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/config/RateLimiterConfig.java b/zayac-admin-agent/src/main/java/com/zayac/admin/config/RateLimiterConfig.java new file mode 100644 index 00000000..4d00a337 --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/config/RateLimiterConfig.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.zayac.admin.config; + +import com.google.common.util.concurrent.RateLimiter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Component +@Slf4j +public class RateLimiterConfig { + private final ConcurrentMap rateLimiterMap = new ConcurrentHashMap<>(); + private final double maxRequestsPerSecond; + + public RateLimiterConfig(@Value("${webclient.max-requests-per-second}") double maxRequestsPerSecond) { + this.maxRequestsPerSecond = maxRequestsPerSecond; + } + + public RateLimiter getRateLimiter(String url) { + RateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(url, k -> RateLimiter.create(maxRequestsPerSecond)); + return rateLimiter; + } + + public RateLimiter getRateLimiter(String key, double maxRequests) { + return rateLimiterMap.computeIfAbsent(key, k -> RateLimiter.create(maxRequests)); + } +} diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index 82d3c8ab..6119107d 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -19,6 +19,7 @@ package com.zayac.admin.schedule; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; +import com.google.common.util.concurrent.RateLimiter; import com.zayac.admin.agent.model.entity.FinanceDO; import com.zayac.admin.agent.model.entity.StatsDO; import com.zayac.admin.agent.model.req.FinanceSumReq; @@ -26,9 +27,11 @@ import com.zayac.admin.agent.service.FinanceService; import com.zayac.admin.agent.service.FinanceSumService; import com.zayac.admin.agent.service.StatsService; import com.zayac.admin.common.enums.DisEnableStatusEnum; +import com.zayac.admin.config.RateLimiterConfig; import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.req.AgentDataVisualListReq; import com.zayac.admin.req.PayRecordListReq; +import com.zayac.admin.req.PayRecordsListReq; import com.zayac.admin.req.team.TeamFinanceReq; import com.zayac.admin.req.team.TeamInfoReq; import com.zayac.admin.req.team.TeamMemberReq; @@ -58,11 +61,12 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import java.util.stream.IntStream; @Component @RequiredArgsConstructor @Slf4j -@Profile("prod") +@Profile("dev") public class DailyReport { private final TeamService teamService; private final DeptService deptService; @@ -74,11 +78,13 @@ public class DailyReport { private final FinanceSumService financeSumService; private final CompletableFutureWebClientService completableFutureWebClientService; private final Executor asyncTaskExecutor; + private final RateLimiterConfig rateLimiterConfig; private static final String MINISTER_ROLE_CODE = "minister"; private static final String SEO_TEAM_LEADER_ROLE_CODE = "seo_team_leader"; private static final String ASSISTANT_ROLE_CODE = "assistant"; private static final String SEO_ROLE_CODE = "seo"; + private static final String PLATFORM_HTH = "华体会"; @Scheduled(cron = "0 40 11,14,17,21 * * ?") public void teamAccountDailyReport() { @@ -105,6 +111,16 @@ public class DailyReport { } + @Scheduled(cron = "0 40 23 * * ?") + public void ScheduledSendTeamDailyReport1() { + sendTeamDailyReport(); + } + + @Scheduled(cron = "0 0 1-23 * * ?") + public void ScheduledSendTeamDailyReport2() { + sendTeamDailyReport(); + } + @Scheduled(cron = "0 15 0 * * ?") public void dailySummarize() { LocalDate yesterday = LocalDate.now().minusDays(1); @@ -129,22 +145,33 @@ public class DailyReport { .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); + //建立团队之间账号的联系 + Map accountNameWithTopAgentName = new HashMap<>(); + dept.getUsers() + .stream() + .flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream()) + .forEach(accountResp -> ministerUser.getAccounts() + .stream() + .filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount + .getPlatformId())) + .findFirst() + .ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp + .getUsername(), ministerAccount.getUsername()))); + //获取账号不为空的用户 var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList(); var assistants = usersByRole.get(ASSISTANT_ROLE_CODE); sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); + //保存数据 + saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName); getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); - //保存金融数据 - saveData(ministerUser, deptUsers, yesterday); sendFinance(yesterday, accountUsernameToUserMap, ministerUser); }); } - // 一小时发送一次 - @Scheduled(cron = "0 0 * * * ?, 0 40 23 * * ?") - public void generateTeamReportTask() { + private void sendTeamDailyReport() { LocalDateTime nowDateTime = LocalDateTime.now(); LocalDate nowDate = LocalDate.now(); //查询部门下的所有用户 @@ -170,19 +197,46 @@ public class DailyReport { Map userWithRolesAndAccountsRespMap, UserWithRolesAndAccountsResp ministerUser) { List finances = financeService.getFinanceByDate(date); - Map> userFinances = finances.stream() + Map> userFinances = finances.stream() .filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName())) - .collect(Collectors.groupingBy(FinanceDO::getAgentName)); - userFinances.forEach((agentName, userFinancesList) -> { - UserWithRolesAndAccountsResp user = userWithRolesAndAccountsRespMap.get(agentName); + .collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName()))); + + userFinances.forEach((user, userFinancesList) -> { if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) { String message = telegramMessageService.buildFinanceMessage(userFinancesList); String botToken = StrUtil.isEmpty(user.getBotToken()) ? ministerUser.getBotToken() : user.getBotToken(); telegramMessageService.sendMessage(botToken, user.getReportIds(), message); } + //telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message); }); } + private List>>> createFuturesForPagination(AccountResp account, + LocalDate date, + int sumReg) { + int pageSize = 100; + int totalPages = (int) Math.ceil((double) sumReg / pageSize); + + return IntStream.range(0, totalPages).mapToObj(page -> { + int currentPageSize = page == totalPages - 1 + ? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize) + : pageSize; + TeamMemberReq memberListReq = TeamMemberReq.builder() + .registerStartDate(date) + .registerEndDate(date) + .startDate(date) + .endDate(date) + .registerSort(1) + .status(1) + .pageSize(currentPageSize) + .pageNum(page + 1) + .build(); + return completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference>>>() { + }); + }).toList(); + } + /** * 查询存款失败用户,并发送消息 * @@ -192,88 +246,83 @@ public class DailyReport { Map accountUsernameToUserMap, LocalDate date) { - TeamMemberReq memberListReq = TeamMemberReq.builder() - .registerStartDate(date) - .registerEndDate(date) - .startDate(date) - .endDate(date) - .registerSort(1) - .status(1) - .pageSize(100) - .build(); + List>> accountFutureList = new ArrayList<>(); - List>> futureList = new ArrayList<>(); ministerUser.getAccounts().forEach(account -> { - CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); + int sumReg = statsService.countNewRegNum(account.getUsername(), date); + sumReg = (sumReg == 0) ? 100 : sumReg; - CompletableFuture> teamMembersFuture = memberPaginationCompletableFuture - .thenApply(MemberPagination::getList) - .thenApplyAsync(members -> members.stream() - .filter(member -> member.getDeposit() != null && member.getDeposit() - .compareTo(BigDecimal.ZERO) == 0) - .collect(Collectors.toList()), asyncTaskExecutor) - .thenComposeAsync(membersWithoutDep -> { - List> memberFutures = membersWithoutDep.stream() - .map(memberWithoutDep -> { - PayRecordListReq req = PayRecordListReq.builder() - .startDate(date) - .endDate(date) - .pageSize(100) - .id(memberWithoutDep.getId()) - .build(); - CompletableFuture>> completableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { - }); - return completableFuture.thenApplyAsync(pagination -> { - if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination - .getScoreAmountTotal() - .compareTo(BigDecimal.ZERO) == 0) { - return memberWithoutDep; - } else { - return null; - } - }, asyncTaskExecutor); - }) - .toList(); + List>>> paginationFutures = createFuturesForPagination(account, date, sumReg); + CompletableFuture allPaginationFutures = CompletableFuture.allOf(paginationFutures.toArray(new CompletableFuture[0])); - return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> memberFutures.stream() - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toList())); - }, asyncTaskExecutor) - .thenApplyAsync(membersWithoutDep -> { - // 发送给每个account关联的user用户 - if (!membersWithoutDep.isEmpty()) { - Map> groupByTopAgentName = membersWithoutDep.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); - groupByTopAgentName.forEach((accountName, accountMembers) -> { - String notification = telegramMessageService - .buildFailedPayMessage(accountName, accountMembers); - UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); - if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - String botToken = StrUtil.isEmpty(currUser.getBotToken()) - ? ministerUser.getBotToken() - : currUser.getBotToken(); - telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); - } - }); + CompletableFuture> aggregatedMembersFuture = allPaginationFutures.thenApply(v -> { + return paginationFutures.stream() + .map(CompletableFuture::join) + .flatMap(memberPagination -> { + List members = memberPagination.getList(); + log.info("members size:{}", members.size()); + return members.stream(); + }) + .collect(Collectors.toList()); + }); + + CompletableFuture> filteredMembersFuture = aggregatedMembersFuture.thenApplyAsync(members -> { + return members.stream() + .filter(member -> member.getDeposit() != null && member.getDeposit().compareTo(BigDecimal.ZERO) == 0) + .collect(Collectors.toList()); + }, asyncTaskExecutor); + + CompletableFuture> membersWithFailedPayFuture = filteredMembersFuture.thenComposeAsync(membersWithoutDep -> { + List> payRecordFutures = membersWithoutDep.stream().map(memberWithoutDep -> { + PayRecordsListReq req = PayRecordsListReq.builder() + .startDate(date) + .endDate(date) + .pageSize(10) + .memberName(memberWithoutDep.getName()) + .build(); + return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> { + if (CollUtil.isNotEmpty(pagination.getList()) + && pagination.getList().stream().noneMatch(payRecord -> payRecord.getPayStatus() == 2)) { + return memberWithoutDep; + } else { + return null; } - return membersWithoutDep; }, asyncTaskExecutor); + }).toList(); - futureList.add(teamMembersFuture); + return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0])) + .thenApply(v -> payRecordFutures.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + }, asyncTaskExecutor); + + CompletableFuture> notificationFuture = membersWithFailedPayFuture.thenApplyAsync(membersWithoutDep -> { + if (CollUtil.isNotEmpty(membersWithoutDep)) { + Map> groupByTopAgentName = membersWithoutDep.stream() + .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); + UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName); + if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? ministerUser.getBotToken() : currUser.getBotToken(); + telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification); + } + }); + } + return membersWithoutDep; + }, asyncTaskExecutor); + + accountFutureList.add(notificationFuture); }); - CompletableFuture allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])); - allFutures.thenRunAsync(() -> { - // 主线下的所有的存款失败用户 - List allTeamMembers = futureList.stream() + CompletableFuture allAccountFutures = CompletableFuture.allOf(accountFutureList.toArray(new CompletableFuture[0])); + allAccountFutures.thenRunAsync(() -> { + List allTeamMembers = accountFutureList.stream() .map(CompletableFuture::join) .flatMap(List::stream) .toList(); + log.info("All failed pay members size: {}", allTeamMembers.size()); Map> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() .collect(Collectors.groupingBy(TeamMember::getTopAgentName))); StringBuilder combinedNotification = new StringBuilder(); @@ -281,19 +330,43 @@ public class DailyReport { String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); combinedNotification.append(notification).append("\n"); }); - telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification - .toString()); - if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { - telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser - .getReportIds(), combinedNotification.toString()); - } + telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification.toString()); + + if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { + telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), combinedNotification.toString()); + } }, asyncTaskExecutor).exceptionally(ex -> { log.error("Error collecting and processing data", ex); return null; }); + } + private CompletableFuture>> fetchPaginationPayRecordWithRetry(AccountResp account, PayRecordsListReq req) { + return CompletableFuture.supplyAsync(() -> completableFutureWebClientService.fetchDataForAccount( + account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, + new ParameterizedTypeReference>>>() { + }), asyncTaskExecutor).thenCompose(future -> future) + .exceptionallyCompose(ex -> { + log.error("Error fetching pay records, retrying...", ex); + return fetchPaginationPayRecordWithRetry(account, req); + }); + } + + private CompletableFuture>> fetchPayRecordsWithRetry(AccountResp account, + PayRecordListReq req) { + //华体会接口限流严重,先加上 + if (PLATFORM_HTH.equals(account.getPlatformName())) { + RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter("hth", 0.5); + log.info("限流器触发中...."); + rateLimiter.acquire(); // 通过限流器限流 + } + return CompletableFuture.supplyAsync(() -> completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference>>>() { + }), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> { + log.error("Error fetching pay records", ex); + return null; + }); } private void sendDailyReport(LocalDate reportDate, @@ -399,6 +472,8 @@ public class DailyReport { List currUserAccounts = deptUser.getAccounts(); List> futures = currUserAccounts.stream() + // 团队账号暂时不用具体的代理线数据 + .filter(accountResp -> !accountResp.getIsTeam()) .map(currAccount -> agentDataVisualListService .getAgentDataVisualList(currAccount, agentDataVisualListReq) .thenApplyAsync(agentData -> agentData.getCurData() @@ -445,7 +520,8 @@ public class DailyReport { private void saveData(UserWithRolesAndAccountsResp ministerUser, List deptUsers, - LocalDate reportDate) { + LocalDate reportDate, + Map accountUsernameWithTopAgentName) { // 获取传入年月 YearMonth inputYearMonth = YearMonth.from(reportDate); @@ -471,6 +547,7 @@ public class DailyReport { List financeReqList = financePagination.getList().stream().map(finance -> { FinanceDO financeDO = new FinanceDO(); BeanUtil.copyProperties(finance, financeDO); + financeDO.setTopAgentName(accountResp.getUsername()); return financeDO; }).toList(); financeService.addAll(financeReqList); @@ -510,6 +587,7 @@ public class DailyReport { .orElseThrow(() -> new BusinessException("No data found for report date")); StatsDO statsDO = new StatsDO(); BeanUtil.copyProperties(statics, statsDO); + statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername())); return statsDO; }, asyncTaskExecutor) .exceptionally(ex -> { diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java index a33f4f24..d4835c8d 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java @@ -19,16 +19,18 @@ package com.zayac.admin.service; import cn.hutool.core.lang.TypeReference; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.RateLimiter; +import com.zayac.admin.config.RateLimiterConfig; import com.zayac.admin.resp.ApiResponse; import com.zayac.admin.system.model.resp.AccountResp; import io.netty.handler.timeout.TimeoutException; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpStatusCode; import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -38,21 +40,19 @@ import top.continew.starter.core.exception.BusinessException; import java.time.Duration; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; @Service @Slf4j public class CompletableFutureWebClientService { private final WebClient webClient; - private final Semaphore semaphore; + private final RateLimiterConfig rateLimiterConfig; private final ObjectMapper objectMapper; public CompletableFutureWebClientService(WebClient.Builder webClientBuilder, - @Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests, + RateLimiterConfig rateLimiterConfig, ObjectMapper objectMapper) { this.webClient = webClientBuilder.build(); - this.semaphore = new Semaphore(maxConcurrentRequests); + this.rateLimiterConfig = rateLimiterConfig; this.objectMapper = objectMapper; } @@ -60,56 +60,62 @@ public class CompletableFutureWebClientService { String apiPath, Object params, ParameterizedTypeReference> typeRef) { - return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture(); + RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter(account.getPlatformUrl() + apiPath); + return fetchData(rateLimiter, account.getPlatformUrl() + apiPath, account + .getHeaders(), params, typeRef, account).toFuture(); } - public Mono fetchData(String url, + public Mono fetchData(RateLimiter rateLimiter, + String url, String headers, Object params, ParameterizedTypeReference> typeRef, AccountResp account) { return Mono.fromCallable(() -> { - if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { - throw new RuntimeException("Unable to acquire a permit"); - } + rateLimiter.acquire(); return true; - }).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> { - try { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - } catch (Exception e) { - log.warn("Header conversion exception: " + e.getMessage()); - throw new BusinessException("Header conversion failed", e); - } }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) - .bodyToMono(String.class) - .doOnNext(resStr -> { - log.debug("request url:{}", url); - log.debug("request headers :{}", headers); - log.debug("request params:{}", params); - log.debug("response {}", resStr); + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(ignored -> webClient.post().uri(url).headers(httpHeaders -> { + addHeaders(httpHeaders, headers); }) - .flatMap(body -> { - try { - ApiResponse apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() - .constructType(typeRef.getType())); - return Mono.justOrEmpty(apiResponse); - } catch (Exception e) { - log.warn("JSON parsing exception: " + e.getMessage()); - return Mono.just(new ApiResponse(null, "Decoding error", 6008)); - } - }) - .flatMap(response -> respHandler(response, account)) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))) - .doFinally(signal -> semaphore.release()); + .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) + .retrieve() + .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) + .bodyToMono(String.class) + .doOnNext(resStr -> { + log.debug("Request URL: {}", url); + log.debug("Request headers: {}", headers); + log.debug("Request params: {}", params); + log.debug("Response: {}", resStr); + }) + .flatMap(body -> { + try { + ApiResponse apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() + .constructType(typeRef.getType())); + return Mono.justOrEmpty(apiResponse); + } catch (Exception e) { + log.warn("JSON parsing exception: " + e.getMessage()); + return Mono.just(new ApiResponse(null, "Decoding error", 6008)); + } + }) + .flatMap(response -> respHandler(response, account)) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(this::isRetryableException))); } private boolean isRetryableException(Throwable throwable) { - return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable; + return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable || throwable instanceof WebClientRequestException; + } + + private void addHeaders(org.springframework.http.HttpHeaders httpHeaders, String headers) { + try { + Map headerMap = JSONUtil.toBean(headers, new TypeReference>() { + }, true); + headerMap.forEach(httpHeaders::add); + } catch (Exception e) { + log.warn("Header conversion exception: " + e.getMessage()); + throw new BusinessException("Header conversion failed", e); + } } private Mono respHandler(ApiResponse response, AccountResp account) { diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java index 36d928fb..a42ee6e6 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java @@ -98,7 +98,15 @@ public class DepositService { LocalDate nowDate, LocalDateTime nowDateTime, Executor asyncTaskExecutor) { - PayRecordsListReq req = createPayRecordsListReq(accountWithChange.getAgentName(), nowDate); + // 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题 + LocalDate startDate = nowDateTime.minusHours(1).toLocalDate(); + PayRecordsListReq req = PayRecordsListReq.builder() + .startDate(startDate) + .endDate(nowDate) + .pageSize(100) + .payState(2) + .agentName(accountWithChange.getAgentName()) + .build(); CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { }); @@ -116,16 +124,6 @@ public class DepositService { }); } - private PayRecordsListReq createPayRecordsListReq(String agentName, LocalDate nowDate) { - return PayRecordsListReq.builder() - .startDate(nowDate) - .endDate(nowDate) - .pageSize(100) - .payState(2) - .agentName(agentName) - .build(); - } - private CompletableFuture processPayRecords(List payRecords, TeamAccountWithChange accountWithChange, AccountResp account, diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java index a1217684..a4fef1c1 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java @@ -51,6 +51,7 @@ public class TelegramMessageService { } String fullMessage = String.format("%s|%s|%s", botToken, targetId, message); this.rabbitTemplate.convertAndSend("message_queue", fullMessage); + } public void sendMessage(String botToken, List targetIds, String message) { diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java deleted file mode 100644 index f4f770da..00000000 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/WebClientService.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.zayac.admin.service; - -import cn.hutool.core.lang.TypeReference; -import cn.hutool.json.JSONUtil; -import com.zayac.admin.resp.ApiResponse; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.stereotype.Service; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.util.retry.Retry; -import top.continew.starter.core.exception.BusinessException; - -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.Semaphore; - -/** - * 封装了一个简单可用的调用后端api的服务类 - */ -@Service -public class WebClientService { - private final WebClient webClient; - - private final Semaphore semaphore; - - public WebClientService(@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests) { - this.webClient = WebClient.create(); - this.semaphore = new Semaphore(maxConcurrentRequests); - } - - public T fetchData(String url, - String headers, - Object params, - ParameterizedTypeReference> typeRef) { - try { - semaphore.acquire(); // 尝试获取许可 - return this.webClient.post().uri(url).headers(httpHeaders -> { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response - .bodyToMono(String.class) - .map(body -> new BusinessException("Error response: " + body))) - .bodyToMono(typeRef) - .flatMap(this::respHandler) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) - .block(); // 遇到网络问题,每三秒重试一次 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new BusinessException("Failed to acquire semaphore", e); - } finally { - semaphore.release(); // 释放许可 - } - } - - public Mono MonoFetchData(String url, - String headers, - Object params, - ParameterizedTypeReference> typeRef) { - System.out.println(Thread.currentThread().getName()); - System.out.println(Thread.currentThread().getId()); - return Mono.fromCallable(() -> { - semaphore.acquire(); // 尝试获取许可 - return true; - }) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response - .bodyToMono(String.class) - .map(body -> new BusinessException("Error response: " + body))) - .bodyToMono(typeRef) - .flatMap(this::monoRespHandler) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) - .doFinally(signal -> semaphore.release())); - } - - private Mono monoRespHandler(ApiResponse apiResponse) { - // 根据响应状态码处理逻辑 - if (apiResponse.getStatusCode() == 6000) { - return Mono.just(apiResponse.getData()); // 正常情况下返回数据 - } else if (apiResponse.getStatusCode() == 6001) { - return Mono.error(new BusinessException("API token expired")); // API令牌过期 - } else { - return Mono.error(new BusinessException("Error status code: " + apiResponse.getStatusCode())); // 其他错误 - } - } - - private Mono respHandler(ApiResponse apiResponse) { - System.out.println(apiResponse); - //api接口只有在statusCode为6000的时候返回的数据才是正常数据 - if (apiResponse.getStatusCode().equals(6000)) { - return Mono.just(apiResponse.getData()); - } else if (apiResponse.getStatusCode().equals(6001)) { - // TODO 调用登录接口,刷新header信息重试 - return Mono.error(new BusinessException("xApiToken失效")); // 状态码6001,登录失效 - } else { - return Mono.error(new BusinessException("错误状态码: " + apiResponse.getStatusCode())); // 其他状态码,抛出异常 - } - } -} \ No newline at end of file diff --git a/zayac-admin-system/src/main/resources/mapper/DeptMapper.xml b/zayac-admin-system/src/main/resources/mapper/DeptMapper.xml index 562db1c6..585b9f16 100644 --- a/zayac-admin-system/src/main/resources/mapper/DeptMapper.xml +++ b/zayac-admin-system/src/main/resources/mapper/DeptMapper.xml @@ -32,6 +32,7 @@ a.id AS account_id, a.nickname AS account_nickname, a.username AS account_username, + a.is_team As account_is_team, a.status AS account_status, a.headers AS account_headers, a.platform_id AS account_platform_id, @@ -82,6 +83,7 @@ + diff --git a/zayac-admin-webapi/src/main/resources/config/application-prod.yml b/zayac-admin-webapi/src/main/resources/config/application-prod.yml index 216b5de9..28e29f76 100644 --- a/zayac-admin-webapi/src/main/resources/config/application-prod.yml +++ b/zayac-admin-webapi/src/main/resources/config/application-prod.yml @@ -37,13 +37,13 @@ spring.datasource: lazy: true driver-class-name: com.mysql.cj.jdbc.Driver type: ${spring.datasource.type} -# # PostgreSQL 库配置 -# postgresql: -# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false -# username: ${DB_USER:root} -# password: ${DB_PWD:123456} -# driver-class-name: org.postgresql.Driver -# type: ${spring.datasource.type} + # # PostgreSQL 库配置 + # postgresql: + # url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false + # username: ${DB_USER:root} + # password: ${DB_PWD:123456} + # driver-class-name: org.postgresql.Driver + # type: ${spring.datasource.type} # Hikari 连接池配置(完整配置请参阅:https://github.com/brettwooldridge/HikariCP) hikari: # 最大连接数量(默认 10,根据实际环境调整) @@ -60,7 +60,7 @@ spring.datasource: ## Liquibase 配置 spring.liquibase: # 是否启用 - enabled: true + enabled: false # 配置文件路径 change-log: classpath:/db/changelog/db.changelog-master.yaml @@ -273,7 +273,8 @@ avatar: support-suffix: jpg,jpeg,png,gif webclient: - max-concurrent-requests: 60 + max-requests-per-second: 10.0 + spring: rabbitmq: diff --git a/zayac-admin-webapi/src/main/resources/config/application.yml b/zayac-admin-webapi/src/main/resources/config/application.yml index ffc3f3f2..0c6bc026 100644 --- a/zayac-admin-webapi/src/main/resources/config/application.yml +++ b/zayac-admin-webapi/src/main/resources/config/application.yml @@ -228,8 +228,9 @@ management.health: # 关闭邮箱健康检查(邮箱配置错误或邮箱服务器不可用时,健康检查会报错) enabled: false +### 每秒钟接口调用的次数 webclient: - max-concurrent-requests: 60 + max-requests-per-second: 10.0 spring: rabbitmq: