新增限流器

修复不能正常查询存款失败用户的问题
细节优化
This commit is contained in:
zayac 2024-06-10 03:05:05 +08:00
parent 4bbfdb4688
commit 708182e67f
17 changed files with 340 additions and 294 deletions

View File

@ -37,13 +37,13 @@ spring.datasource:
lazy: true
driver-class-name: com.mysql.cj.jdbc.Driver
type: ${spring.datasource.type}
# # PostgreSQL 库配置
# postgresql:
# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false
# username: ${DB_USER:root}
# password: ${DB_PWD:123456}
# driver-class-name: org.postgresql.Driver
# type: ${spring.datasource.type}
# # PostgreSQL 库配置
# postgresql:
# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false
# username: ${DB_USER:root}
# password: ${DB_PWD:123456}
# driver-class-name: org.postgresql.Driver
# type: ${spring.datasource.type}
# Hikari 连接池配置完整配置请参阅https://github.com/brettwooldridge/HikariCP
hikari:
# 最大连接数量(默认 10根据实际环境调整
@ -273,7 +273,7 @@ avatar:
support-suffix: jpg,jpeg,png,gif
webclient:
max-concurrent-requests: 60
max-requests-per-second: 10.0
spring:
rabbitmq:

View File

@ -229,7 +229,7 @@ management.health:
enabled: false
webclient:
max-concurrent-requests: 60
max-requests-per-second: 10.0
spring:
rabbitmq:

View File

@ -26,6 +26,11 @@
<groupId>com.zayac</groupId>
<artifactId>zayac-admin-system</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>
</dependencies>
</project>

View File

@ -16,13 +16,26 @@
package com.zayac.admin.agent.mapper;
import org.apache.ibatis.annotations.Select;
import top.continew.starter.data.mybatis.plus.base.BaseMapper;
import com.zayac.admin.agent.model.entity.StatsDO;
import java.time.LocalDate;
/**
* 代理每日数据 Mapper
*
* @author zayac
* @since 2024/06/04 17:10
*/
public interface DailyStatsMapper extends BaseMapper<StatsDO> {}
public interface DailyStatsMapper extends BaseMapper<StatsDO> {
/**
* 查询查询新注册总数
*
* @param topAgentName 上级代理线
* @param date 日期
* @return int
*/
@Select("select COALESCE(sum(is_new), 0) from (select distinct agent_name, is_new from agent_stats where top_agent_name = #{topAgentName} and statics_date = #{date}) as distinct_agents")
int selectCountByTopAgentNameAndDate(String topAgentName, LocalDate date);
}

View File

@ -314,4 +314,8 @@ public class FinanceDO extends BaseDO {
* 彩票利润
*/
private BigDecimal lotteryProfit;
/**
* 上级代理线名称
*/
private String topAgentName;
}

View File

@ -101,7 +101,7 @@ public class StatsDO extends BaseDO {
private BigDecimal promoDividend;
/**
*
*
*/
private BigDecimal rebate;
@ -161,17 +161,22 @@ public class StatsDO extends BaseDO {
private LocalDateTime updatedAt;
/**
*
*
*/
private BigDecimal oldDeposit;
/**
*
*
*/
private Integer oldDepositCount;
/**
*
*
*/
private BigDecimal newDeposit;
/**
* 上级代理线名称
*/
private String topAgentName;
}

View File

@ -23,6 +23,7 @@ import com.zayac.admin.agent.model.req.StatsReq;
import com.zayac.admin.agent.model.resp.StatsDetailResp;
import com.zayac.admin.agent.model.resp.StatsResp;
import java.time.LocalDate;
import java.util.List;
/**
@ -33,4 +34,6 @@ import java.util.List;
*/
public interface StatsService extends BaseService<StatsResp, StatsDetailResp, StatsQuery, StatsReq> {
void addAll(List<StatsDO> list);
int countNewRegNum(String topAgentName, LocalDate date);
}

View File

@ -29,6 +29,7 @@ import com.zayac.admin.agent.model.resp.StatsDetailResp;
import com.zayac.admin.agent.model.resp.StatsResp;
import com.zayac.admin.agent.service.StatsService;
import java.time.LocalDate;
import java.util.List;
/**
@ -44,4 +45,15 @@ public class DailyStatsServiceImpl extends BaseServiceImpl<DailyStatsMapper, Sta
public void addAll(List<StatsDO> list) {
baseMapper.insertBatch(list);
}
/**
* 统计指定日期的注册情况
*
* @param date 日期
* @return int
*/
@Override
public int countNewRegNum(String topAgentName, LocalDate date) {
return baseMapper.selectCountByTopAgentNameAndDate(topAgentName, date);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.config;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Component
@Slf4j
public class RateLimiterConfig {
private final ConcurrentMap<String, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
private final double maxRequestsPerSecond;
public RateLimiterConfig(@Value("${webclient.max-requests-per-second}") double maxRequestsPerSecond) {
this.maxRequestsPerSecond = maxRequestsPerSecond;
}
public RateLimiter getRateLimiter(String url) {
RateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(url, k -> RateLimiter.create(maxRequestsPerSecond));
return rateLimiter;
}
public RateLimiter getRateLimiter(String key, double maxRequests) {
return rateLimiterMap.computeIfAbsent(key, k -> RateLimiter.create(maxRequests));
}
}

View File

@ -19,6 +19,7 @@ package com.zayac.admin.schedule;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.RateLimiter;
import com.zayac.admin.agent.model.entity.FinanceDO;
import com.zayac.admin.agent.model.entity.StatsDO;
import com.zayac.admin.agent.model.req.FinanceSumReq;
@ -26,9 +27,11 @@ import com.zayac.admin.agent.service.FinanceService;
import com.zayac.admin.agent.service.FinanceSumService;
import com.zayac.admin.agent.service.StatsService;
import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.config.RateLimiterConfig;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.AgentDataVisualListReq;
import com.zayac.admin.req.PayRecordListReq;
import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamFinanceReq;
import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.req.team.TeamMemberReq;
@ -58,11 +61,12 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Component
@RequiredArgsConstructor
@Slf4j
@Profile("prod")
@Profile("dev")
public class DailyReport {
private final TeamService teamService;
private final DeptService deptService;
@ -74,11 +78,13 @@ public class DailyReport {
private final FinanceSumService financeSumService;
private final CompletableFutureWebClientService completableFutureWebClientService;
private final Executor asyncTaskExecutor;
private final RateLimiterConfig rateLimiterConfig;
private static final String MINISTER_ROLE_CODE = "minister";
private static final String SEO_TEAM_LEADER_ROLE_CODE = "seo_team_leader";
private static final String ASSISTANT_ROLE_CODE = "assistant";
private static final String SEO_ROLE_CODE = "seo";
private static final String PLATFORM_HTH = "华体会";
@Scheduled(cron = "0 40 11,14,17,21 * * ?")
public void teamAccountDailyReport() {
@ -105,6 +111,16 @@ public class DailyReport {
}
@Scheduled(cron = "0 40 23 * * ?")
public void ScheduledSendTeamDailyReport1() {
sendTeamDailyReport();
}
@Scheduled(cron = "0 0 1-23 * * ?")
public void ScheduledSendTeamDailyReport2() {
sendTeamDailyReport();
}
@Scheduled(cron = "0 15 0 * * ?")
public void dailySummarize() {
LocalDate yesterday = LocalDate.now().minusDays(1);
@ -129,22 +145,33 @@ public class DailyReport {
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
//建立团队之间账号的联系
Map<String, String> accountNameWithTopAgentName = new HashMap<>();
dept.getUsers()
.stream()
.flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream())
.forEach(accountResp -> ministerUser.getAccounts()
.stream()
.filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount
.getPlatformId()))
.findFirst()
.ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp
.getUsername(), ministerAccount.getUsername())));
//获取账号不为空的用户
var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList();
var assistants = usersByRole.get(ASSISTANT_ROLE_CODE);
sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime
.of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers);
//保存数据
saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName);
getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday);
//保存金融数据
saveData(ministerUser, deptUsers, yesterday);
sendFinance(yesterday, accountUsernameToUserMap, ministerUser);
});
}
// 一小时发送一次
@Scheduled(cron = "0 0 * * * ?, 0 40 23 * * ?")
public void generateTeamReportTask() {
private void sendTeamDailyReport() {
LocalDateTime nowDateTime = LocalDateTime.now();
LocalDate nowDate = LocalDate.now();
//查询部门下的所有用户
@ -170,19 +197,46 @@ public class DailyReport {
Map<String, UserWithRolesAndAccountsResp> userWithRolesAndAccountsRespMap,
UserWithRolesAndAccountsResp ministerUser) {
List<FinanceDO> finances = financeService.getFinanceByDate(date);
Map<String, List<FinanceDO>> userFinances = finances.stream()
Map<UserWithRolesAndAccountsResp, List<FinanceDO>> userFinances = finances.stream()
.filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName()))
.collect(Collectors.groupingBy(FinanceDO::getAgentName));
userFinances.forEach((agentName, userFinancesList) -> {
UserWithRolesAndAccountsResp user = userWithRolesAndAccountsRespMap.get(agentName);
.collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName())));
userFinances.forEach((user, userFinancesList) -> {
if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) {
String message = telegramMessageService.buildFinanceMessage(userFinancesList);
String botToken = StrUtil.isEmpty(user.getBotToken()) ? ministerUser.getBotToken() : user.getBotToken();
telegramMessageService.sendMessage(botToken, user.getReportIds(), message);
}
//telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
});
}
private List<CompletableFuture<MemberPagination<List<TeamMember>>>> createFuturesForPagination(AccountResp account,
LocalDate date,
int sumReg) {
int pageSize = 100;
int totalPages = (int) Math.ceil((double) sumReg / pageSize);
return IntStream.range(0, totalPages).mapToObj(page -> {
int currentPageSize = page == totalPages - 1
? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize)
: pageSize;
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(currentPageSize)
.pageNum(page + 1)
.build();
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() {
});
}).toList();
}
/**
* 查询存款失败用户,并发送消息
*
@ -192,88 +246,83 @@ public class DailyReport {
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
LocalDate date) {
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(100)
.build();
List<CompletableFuture<List<TeamMember>>> accountFutureList = new ArrayList<>();
List<CompletableFuture<List<TeamMember>>> futureList = new ArrayList<>();
ministerUser.getAccounts().forEach(account -> {
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
int sumReg = statsService.countNewRegNum(account.getUsername(), date);
sumReg = (sumReg == 0) ? 100 : sumReg;
CompletableFuture<List<TeamMember>> teamMembersFuture = memberPaginationCompletableFuture
.thenApply(MemberPagination::getList)
.thenApplyAsync(members -> members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit()
.compareTo(BigDecimal.ZERO) == 0)
.collect(Collectors.toList()), asyncTaskExecutor)
.thenComposeAsync(membersWithoutDep -> {
List<CompletableFuture<TeamMember>> memberFutures = membersWithoutDep.stream()
.map(memberWithoutDep -> {
PayRecordListReq req = PayRecordListReq.builder()
.startDate(date)
.endDate(date)
.pageSize(100)
.id(memberWithoutDep.getId())
.build();
CompletableFuture<PayRecordList<List<PayRecord>>> completableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() {
});
return completableFuture.thenApplyAsync(pagination -> {
if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination
.getScoreAmountTotal()
.compareTo(BigDecimal.ZERO) == 0) {
return memberWithoutDep;
} else {
return null;
}
}, asyncTaskExecutor);
})
.toList();
List<CompletableFuture<MemberPagination<List<TeamMember>>>> paginationFutures = createFuturesForPagination(account, date, sumReg);
CompletableFuture<Void> allPaginationFutures = CompletableFuture.allOf(paginationFutures.toArray(new CompletableFuture[0]));
return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> memberFutures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}, asyncTaskExecutor)
.thenApplyAsync(membersWithoutDep -> {
// 发送给每个account关联的user用户
if (!membersWithoutDep.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService
.buildFailedPayMessage(accountName, accountMembers);
UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken()
: currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
});
CompletableFuture<List<TeamMember>> aggregatedMembersFuture = allPaginationFutures.thenApply(v -> {
return paginationFutures.stream()
.map(CompletableFuture::join)
.flatMap(memberPagination -> {
List<TeamMember> members = memberPagination.getList();
log.info("members size:{}", members.size());
return members.stream();
})
.collect(Collectors.toList());
});
CompletableFuture<List<TeamMember>> filteredMembersFuture = aggregatedMembersFuture.thenApplyAsync(members -> {
return members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit().compareTo(BigDecimal.ZERO) == 0)
.collect(Collectors.toList());
}, asyncTaskExecutor);
CompletableFuture<List<TeamMember>> membersWithFailedPayFuture = filteredMembersFuture.thenComposeAsync(membersWithoutDep -> {
List<CompletableFuture<TeamMember>> payRecordFutures = membersWithoutDep.stream().map(memberWithoutDep -> {
PayRecordsListReq req = PayRecordsListReq.builder()
.startDate(date)
.endDate(date)
.pageSize(10)
.memberName(memberWithoutDep.getName())
.build();
return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> {
if (CollUtil.isNotEmpty(pagination.getList())
&& pagination.getList().stream().noneMatch(payRecord -> payRecord.getPayStatus() == 2)) {
return memberWithoutDep;
} else {
return null;
}
return membersWithoutDep;
}, asyncTaskExecutor);
}).toList();
futureList.add(teamMembersFuture);
return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> payRecordFutures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}, asyncTaskExecutor);
CompletableFuture<List<TeamMember>> notificationFuture = membersWithFailedPayFuture.thenApplyAsync(membersWithoutDep -> {
if (CollUtil.isNotEmpty(membersWithoutDep)) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? ministerUser.getBotToken() : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification);
}
});
}
return membersWithoutDep;
}, asyncTaskExecutor);
accountFutureList.add(notificationFuture);
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allFutures.thenRunAsync(() -> {
// 主线下的所有的存款失败用户
List<TeamMember> allTeamMembers = futureList.stream()
CompletableFuture<Void> allAccountFutures = CompletableFuture.allOf(accountFutureList.toArray(new CompletableFuture[0]));
allAccountFutures.thenRunAsync(() -> {
List<TeamMember> allTeamMembers = accountFutureList.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.toList();
log.info("All failed pay members size: {}", allTeamMembers.size());
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
StringBuilder combinedNotification = new StringBuilder();
@ -281,19 +330,43 @@ public class DailyReport {
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
combinedNotification.append(notification).append("\n");
});
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
.toString());
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser
.getReportIds(), combinedNotification.toString());
}
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification.toString());
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), combinedNotification.toString());
}
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error collecting and processing data", ex);
return null;
});
}
private CompletableFuture<Pagination<List<PayRecord>>> fetchPaginationPayRecordWithRetry(AccountResp account, PayRecordsListReq req) {
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService.fetchDataForAccount(
account, ApiPathConstants.PAY_RECORDS_LIST_URL, req,
new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
}), asyncTaskExecutor).thenCompose(future -> future)
.exceptionallyCompose(ex -> {
log.error("Error fetching pay records, retrying...", ex);
return fetchPaginationPayRecordWithRetry(account, req);
});
}
private CompletableFuture<PayRecordList<List<PayRecord>>> fetchPayRecordsWithRetry(AccountResp account,
PayRecordListReq req) {
//华体会接口限流严重,先加上
if (PLATFORM_HTH.equals(account.getPlatformName())) {
RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter("hth", 0.5);
log.info("限流器触发中....");
rateLimiter.acquire(); // 通过限流器限流
}
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<PayRecordList<List<PayRecord>>>>() {
}), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
log.error("Error fetching pay records", ex);
return null;
});
}
private void sendDailyReport(LocalDate reportDate,
@ -399,6 +472,8 @@ public class DailyReport {
List<AccountResp> currUserAccounts = deptUser.getAccounts();
List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
// 团队账号暂时不用具体的代理线数据
.filter(accountResp -> !accountResp.getIsTeam())
.map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> agentData.getCurData()
@ -445,7 +520,8 @@ public class DailyReport {
private void saveData(UserWithRolesAndAccountsResp ministerUser,
List<UserWithRolesAndAccountsResp> deptUsers,
LocalDate reportDate) {
LocalDate reportDate,
Map<String, String> accountUsernameWithTopAgentName) {
// 获取传入年月
YearMonth inputYearMonth = YearMonth.from(reportDate);
@ -471,6 +547,7 @@ public class DailyReport {
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
FinanceDO financeDO = new FinanceDO();
BeanUtil.copyProperties(finance, financeDO);
financeDO.setTopAgentName(accountResp.getUsername());
return financeDO;
}).toList();
financeService.addAll(financeReqList);
@ -510,6 +587,7 @@ public class DailyReport {
.orElseThrow(() -> new BusinessException("No data found for report date"));
StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO);
statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername()));
return statsDO;
}, asyncTaskExecutor)
.exceptionally(ex -> {

View File

@ -19,16 +19,18 @@ package com.zayac.admin.service;
import cn.hutool.core.lang.TypeReference;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import com.zayac.admin.config.RateLimiterConfig;
import com.zayac.admin.resp.ApiResponse;
import com.zayac.admin.system.model.resp.AccountResp;
import io.netty.handler.timeout.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -38,21 +40,19 @@ import top.continew.starter.core.exception.BusinessException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class CompletableFutureWebClientService {
private final WebClient webClient;
private final Semaphore semaphore;
private final RateLimiterConfig rateLimiterConfig;
private final ObjectMapper objectMapper;
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests,
RateLimiterConfig rateLimiterConfig,
ObjectMapper objectMapper) {
this.webClient = webClientBuilder.build();
this.semaphore = new Semaphore(maxConcurrentRequests);
this.rateLimiterConfig = rateLimiterConfig;
this.objectMapper = objectMapper;
}
@ -60,56 +60,62 @@ public class CompletableFutureWebClientService {
String apiPath,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture();
RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter(account.getPlatformUrl() + apiPath);
return fetchData(rateLimiter, account.getPlatformUrl() + apiPath, account
.getHeaders(), params, typeRef, account).toFuture();
}
public <T> Mono<T> fetchData(String url,
public <T> Mono<T> fetchData(RateLimiter rateLimiter,
String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef,
AccountResp account) {
return Mono.fromCallable(() -> {
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
throw new RuntimeException("Unable to acquire a permit");
}
rateLimiter.acquire();
return true;
}).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> {
try {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
} catch (Exception e) {
log.warn("Header conversion exception: " + e.getMessage());
throw new BusinessException("Header conversion failed", e);
}
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
.bodyToMono(String.class)
.doOnNext(resStr -> {
log.debug("request url:{}", url);
log.debug("request headers :{}", headers);
log.debug("request params:{}", params);
log.debug("response {}", resStr);
.subscribeOn(Schedulers.boundedElastic())
.flatMap(ignored -> webClient.post().uri(url).headers(httpHeaders -> {
addHeaders(httpHeaders, headers);
})
.flatMap(body -> {
try {
ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory()
.constructType(typeRef.getType()));
return Mono.justOrEmpty(apiResponse);
} catch (Exception e) {
log.warn("JSON parsing exception: " + e.getMessage());
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
}
})
.flatMap(response -> respHandler(response, account))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException)))
.doFinally(signal -> semaphore.release());
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
.bodyToMono(String.class)
.doOnNext(resStr -> {
log.debug("Request URL: {}", url);
log.debug("Request headers: {}", headers);
log.debug("Request params: {}", params);
log.debug("Response: {}", resStr);
})
.flatMap(body -> {
try {
ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory()
.constructType(typeRef.getType()));
return Mono.justOrEmpty(apiResponse);
} catch (Exception e) {
log.warn("JSON parsing exception: " + e.getMessage());
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
}
})
.flatMap(response -> respHandler(response, account))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(this::isRetryableException)));
}
private boolean isRetryableException(Throwable throwable) {
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable;
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable || throwable instanceof WebClientRequestException;
}
private void addHeaders(org.springframework.http.HttpHeaders httpHeaders, String headers) {
try {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<Map<String, String>>() {
}, true);
headerMap.forEach(httpHeaders::add);
} catch (Exception e) {
log.warn("Header conversion exception: " + e.getMessage());
throw new BusinessException("Header conversion failed", e);
}
}
private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) {

View File

@ -98,7 +98,15 @@ public class DepositService {
LocalDate nowDate,
LocalDateTime nowDateTime,
Executor asyncTaskExecutor) {
PayRecordsListReq req = createPayRecordsListReq(accountWithChange.getAgentName(), nowDate);
// 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题
LocalDate startDate = nowDateTime.minusHours(1).toLocalDate();
PayRecordsListReq req = PayRecordsListReq.builder()
.startDate(startDate)
.endDate(nowDate)
.pageSize(100)
.payState(2)
.agentName(accountWithChange.getAgentName())
.build();
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
@ -116,16 +124,6 @@ public class DepositService {
});
}
private PayRecordsListReq createPayRecordsListReq(String agentName, LocalDate nowDate) {
return PayRecordsListReq.builder()
.startDate(nowDate)
.endDate(nowDate)
.pageSize(100)
.payState(2)
.agentName(agentName)
.build();
}
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
TeamAccountWithChange accountWithChange,
AccountResp account,

View File

@ -51,6 +51,7 @@ public class TelegramMessageService {
}
String fullMessage = String.format("%s|%s|%s", botToken, targetId, message);
this.rabbitTemplate.convertAndSend("message_queue", fullMessage);
}
public void sendMessage(String botToken, List<Long> targetIds, String message) {

View File

@ -1,128 +0,0 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.service;
import cn.hutool.core.lang.TypeReference;
import cn.hutool.json.JSONUtil;
import com.zayac.admin.resp.ApiResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import top.continew.starter.core.exception.BusinessException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Semaphore;
/**
* 封装了一个简单可用的调用后端api的服务类
*/
@Service
public class WebClientService {
private final WebClient webClient;
private final Semaphore semaphore;
public WebClientService(@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests) {
this.webClient = WebClient.create();
this.semaphore = new Semaphore(maxConcurrentRequests);
}
public <T> T fetchData(String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
try {
semaphore.acquire(); // 尝试获取许可
return this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
.bodyToMono(String.class)
.map(body -> new BusinessException("Error response: " + body)))
.bodyToMono(typeRef)
.flatMap(this::respHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
.block(); // 遇到网络问题,每三秒重试一次
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException("Failed to acquire semaphore", e);
} finally {
semaphore.release(); // 释放许可
}
}
public <T> Mono<T> MonoFetchData(String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
System.out.println(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getId());
return Mono.fromCallable(() -> {
semaphore.acquire(); // 尝试获取许可
return true;
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
.bodyToMono(String.class)
.map(body -> new BusinessException("Error response: " + body)))
.bodyToMono(typeRef)
.flatMap(this::monoRespHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
.doFinally(signal -> semaphore.release()));
}
private <T> Mono<T> monoRespHandler(ApiResponse<T> apiResponse) {
// 根据响应状态码处理逻辑
if (apiResponse.getStatusCode() == 6000) {
return Mono.just(apiResponse.getData()); // 正常情况下返回数据
} else if (apiResponse.getStatusCode() == 6001) {
return Mono.error(new BusinessException("API token expired")); // API令牌过期
} else {
return Mono.error(new BusinessException("Error status code: " + apiResponse.getStatusCode())); // 其他错误
}
}
private <T> Mono<T> respHandler(ApiResponse<T> apiResponse) {
System.out.println(apiResponse);
//api接口只有在statusCode为6000的时候返回的数据才是正常数据
if (apiResponse.getStatusCode().equals(6000)) {
return Mono.just(apiResponse.getData());
} else if (apiResponse.getStatusCode().equals(6001)) {
// TODO 调用登录接口,刷新header信息重试
return Mono.error(new BusinessException("xApiToken失效")); // 状态码6001登录失效
} else {
return Mono.error(new BusinessException("错误状态码: " + apiResponse.getStatusCode())); // 其他状态码抛出异常
}
}
}

View File

@ -32,6 +32,7 @@
a.id AS account_id,
a.nickname AS account_nickname,
a.username AS account_username,
a.is_team As account_is_team,
a.status AS account_status,
a.headers AS account_headers,
a.platform_id AS account_platform_id,
@ -82,6 +83,7 @@
<result column="account_platform_id" property="platformId"/>
<result column="platform_name" property="platformName"/>
<result column="platform_url" property="platformUrl"/>
<result column="account_is_team" property="isTeam"/>
</collection>
</collection>
</resultMap>

View File

@ -37,13 +37,13 @@ spring.datasource:
lazy: true
driver-class-name: com.mysql.cj.jdbc.Driver
type: ${spring.datasource.type}
# # PostgreSQL 库配置
# postgresql:
# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false
# username: ${DB_USER:root}
# password: ${DB_PWD:123456}
# driver-class-name: org.postgresql.Driver
# type: ${spring.datasource.type}
# # PostgreSQL 库配置
# postgresql:
# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false
# username: ${DB_USER:root}
# password: ${DB_PWD:123456}
# driver-class-name: org.postgresql.Driver
# type: ${spring.datasource.type}
# Hikari 连接池配置完整配置请参阅https://github.com/brettwooldridge/HikariCP
hikari:
# 最大连接数量(默认 10根据实际环境调整
@ -60,7 +60,7 @@ spring.datasource:
## Liquibase 配置
spring.liquibase:
# 是否启用
enabled: true
enabled: false
# 配置文件路径
change-log: classpath:/db/changelog/db.changelog-master.yaml
@ -273,7 +273,8 @@ avatar:
support-suffix: jpg,jpeg,png,gif
webclient:
max-concurrent-requests: 60
max-requests-per-second: 10.0
spring:
rabbitmq:

View File

@ -228,8 +228,9 @@ management.health:
# 关闭邮箱健康检查(邮箱配置错误或邮箱服务器不可用时,健康检查会报错)
enabled: false
### 每秒钟接口调用的次数
webclient:
max-concurrent-requests: 60
max-requests-per-second: 10.0
spring:
rabbitmq: