Compare commits

...

7 Commits

Author SHA1 Message Date
303db89f46 修复了不能显示首存金额的问题 2024-06-17 17:06:33 +08:00
0cd73df06a 调整线程池
优化部分代码逻辑,现在查询首存不用那么麻烦了
调整定时任务
2024-06-17 06:37:31 +08:00
584350f843 细节优化 2024-06-12 13:17:11 +08:00
c0a15b6580 细节优化 2024-06-10 12:32:26 +08:00
708182e67f 新增限流器
修复不能正常查询存款失败用户的问题
细节优化
2024-06-10 03:05:05 +08:00
4bbfdb4688 新增23:40报数的任务 2024-06-08 23:50:04 +08:00
92607bcaee 新增了负盈利消息通知 2024-06-08 23:31:26 +08:00
28 changed files with 840 additions and 377 deletions

View File

@ -37,13 +37,13 @@ spring.datasource:
lazy: true lazy: true
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
type: ${spring.datasource.type} type: ${spring.datasource.type}
# # PostgreSQL 库配置 # # PostgreSQL 库配置
# postgresql: # postgresql:
# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false # url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false
# username: ${DB_USER:root} # username: ${DB_USER:root}
# password: ${DB_PWD:123456} # password: ${DB_PWD:123456}
# driver-class-name: org.postgresql.Driver # driver-class-name: org.postgresql.Driver
# type: ${spring.datasource.type} # type: ${spring.datasource.type}
# Hikari 连接池配置完整配置请参阅https://github.com/brettwooldridge/HikariCP # Hikari 连接池配置完整配置请参阅https://github.com/brettwooldridge/HikariCP
hikari: hikari:
# 最大连接数量(默认 10根据实际环境调整 # 最大连接数量(默认 10根据实际环境调整
@ -273,7 +273,7 @@ avatar:
support-suffix: jpg,jpeg,png,gif support-suffix: jpg,jpeg,png,gif
webclient: webclient:
max-concurrent-requests: 60 max-requests-per-second: 10.0
spring: spring:
rabbitmq: rabbitmq:

View File

@ -229,7 +229,7 @@ management.health:
enabled: false enabled: false
webclient: webclient:
max-concurrent-requests: 60 max-requests-per-second: 10.0
spring: spring:
rabbitmq: rabbitmq:

View File

@ -26,6 +26,11 @@
<groupId>com.zayac</groupId> <groupId>com.zayac</groupId>
<artifactId>zayac-admin-system</artifactId> <artifactId>zayac-admin-system</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -16,13 +16,26 @@
package com.zayac.admin.agent.mapper; package com.zayac.admin.agent.mapper;
import org.apache.ibatis.annotations.Select;
import top.continew.starter.data.mybatis.plus.base.BaseMapper; import top.continew.starter.data.mybatis.plus.base.BaseMapper;
import com.zayac.admin.agent.model.entity.StatsDO; import com.zayac.admin.agent.model.entity.StatsDO;
import java.time.LocalDate;
/** /**
* 代理每日数据 Mapper * 代理每日数据 Mapper
* *
* @author zayac * @author zayac
* @since 2024/06/04 17:10 * @since 2024/06/04 17:10
*/ */
public interface DailyStatsMapper extends BaseMapper<StatsDO> {} public interface DailyStatsMapper extends BaseMapper<StatsDO> {
/**
* 查询查询新注册总数
*
* @param topAgentName 上级代理线
* @param date 日期
* @return int
*/
@Select("select COALESCE(sum(is_new), 0) from (select distinct agent_name, is_new from agent_stats where top_agent_name = #{topAgentName} and statics_date = #{date}) as distinct_agents")
int selectCountByTopAgentNameAndDate(String topAgentName, LocalDate date);
}

View File

@ -16,13 +16,20 @@
package com.zayac.admin.agent.mapper; package com.zayac.admin.agent.mapper;
import org.apache.ibatis.annotations.Select;
import top.continew.starter.data.mybatis.plus.base.BaseMapper; import top.continew.starter.data.mybatis.plus.base.BaseMapper;
import com.zayac.admin.agent.model.entity.FinanceDO; import com.zayac.admin.agent.model.entity.FinanceDO;
import java.time.LocalDate;
import java.util.List;
/** /**
* 代理线财务报 Mapper * 代理线财务报 Mapper
* *
* @author zayac * @author zayac
* @since 2024/06/04 17:14 * @since 2024/06/04 17:14
*/ */
public interface FinanceMapper extends BaseMapper<FinanceDO> {} public interface FinanceMapper extends BaseMapper<FinanceDO> {
@Select("SELECT * FROM agent_finance WHERE DATE(create_time) = #{date}")
List<FinanceDO> selectFinancesByCreateTime(LocalDate date);
}

View File

@ -314,4 +314,8 @@ public class FinanceDO extends BaseDO {
* 彩票利润 * 彩票利润
*/ */
private BigDecimal lotteryProfit; private BigDecimal lotteryProfit;
/**
* 上级代理线名称
*/
private String topAgentName;
} }

View File

@ -101,7 +101,7 @@ public class StatsDO extends BaseDO {
private BigDecimal promoDividend; private BigDecimal promoDividend;
/** /**
* *
*/ */
private BigDecimal rebate; private BigDecimal rebate;
@ -161,17 +161,22 @@ public class StatsDO extends BaseDO {
private LocalDateTime updatedAt; private LocalDateTime updatedAt;
/** /**
* *
*/ */
private BigDecimal oldDeposit; private BigDecimal oldDeposit;
/** /**
* *
*/ */
private Integer oldDepositCount; private Integer oldDepositCount;
/** /**
* *
*/ */
private BigDecimal newDeposit; private BigDecimal newDeposit;
/**
* 上级代理线名称
*/
private String topAgentName;
} }

View File

@ -23,6 +23,7 @@ import com.zayac.admin.agent.model.req.FinanceReq;
import com.zayac.admin.agent.model.resp.FinanceDetailResp; import com.zayac.admin.agent.model.resp.FinanceDetailResp;
import com.zayac.admin.agent.model.resp.FinanceResp; import com.zayac.admin.agent.model.resp.FinanceResp;
import java.time.LocalDate;
import java.util.List; import java.util.List;
/** /**
@ -33,4 +34,6 @@ import java.util.List;
*/ */
public interface FinanceService extends BaseService<FinanceResp, FinanceDetailResp, FinanceQuery, FinanceReq> { public interface FinanceService extends BaseService<FinanceResp, FinanceDetailResp, FinanceQuery, FinanceReq> {
void addAll(List<FinanceDO> financeReqList); void addAll(List<FinanceDO> financeReqList);
List<FinanceDO> getFinanceByDate(LocalDate yesterday);
} }

View File

@ -23,6 +23,7 @@ import com.zayac.admin.agent.model.req.StatsReq;
import com.zayac.admin.agent.model.resp.StatsDetailResp; import com.zayac.admin.agent.model.resp.StatsDetailResp;
import com.zayac.admin.agent.model.resp.StatsResp; import com.zayac.admin.agent.model.resp.StatsResp;
import java.time.LocalDate;
import java.util.List; import java.util.List;
/** /**
@ -33,4 +34,6 @@ import java.util.List;
*/ */
public interface StatsService extends BaseService<StatsResp, StatsDetailResp, StatsQuery, StatsReq> { public interface StatsService extends BaseService<StatsResp, StatsDetailResp, StatsQuery, StatsReq> {
void addAll(List<StatsDO> list); void addAll(List<StatsDO> list);
int countNewRegNum(String topAgentName, LocalDate date);
} }

View File

@ -29,6 +29,7 @@ import com.zayac.admin.agent.model.resp.StatsDetailResp;
import com.zayac.admin.agent.model.resp.StatsResp; import com.zayac.admin.agent.model.resp.StatsResp;
import com.zayac.admin.agent.service.StatsService; import com.zayac.admin.agent.service.StatsService;
import java.time.LocalDate;
import java.util.List; import java.util.List;
/** /**
@ -44,4 +45,15 @@ public class DailyStatsServiceImpl extends BaseServiceImpl<DailyStatsMapper, Sta
public void addAll(List<StatsDO> list) { public void addAll(List<StatsDO> list) {
baseMapper.insertBatch(list); baseMapper.insertBatch(list);
} }
/**
* 统计指定日期的注册情况
*
* @param date 日期
* @return int
*/
@Override
public int countNewRegNum(String topAgentName, LocalDate date) {
return baseMapper.selectCountByTopAgentNameAndDate(topAgentName, date);
}
} }

View File

@ -29,6 +29,7 @@ import com.zayac.admin.agent.model.resp.FinanceDetailResp;
import com.zayac.admin.agent.model.resp.FinanceResp; import com.zayac.admin.agent.model.resp.FinanceResp;
import com.zayac.admin.agent.service.FinanceService; import com.zayac.admin.agent.service.FinanceService;
import java.time.LocalDate;
import java.util.List; import java.util.List;
/** /**
@ -44,4 +45,16 @@ public class FinanceServiceImpl extends BaseServiceImpl<FinanceMapper, FinanceDO
baseMapper.insertBatch(financeReqList); baseMapper.insertBatch(financeReqList);
} }
/**
* 根据日期查询所有的财务信息
*
* @param date 日期
* @return List<FinanceDO>
*/
@Override
public List<FinanceDO> getFinanceByDate(LocalDate date) {
return baseMapper.selectFinancesByCreateTime(date);
}
} }

View File

@ -0,0 +1,45 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.config;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Component
@Slf4j
public class RateLimiterConfig {
private final ConcurrentMap<String, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
private final double maxRequestsPerSecond;
public RateLimiterConfig(@Value("${webclient.max-requests-per-second}") double maxRequestsPerSecond) {
this.maxRequestsPerSecond = maxRequestsPerSecond;
}
public RateLimiter getRateLimiter(String url) {
RateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(url, k -> RateLimiter.create(maxRequestsPerSecond));
return rateLimiter;
}
public RateLimiter getRateLimiter(String key, double maxRequests) {
return rateLimiterMap.computeIfAbsent(key, k -> RateLimiter.create(maxRequests));
}
}

View File

@ -33,9 +33,9 @@ public class ThreadPoolConfig {
@Bean(name = "asyncTaskExecutor") @Bean(name = "asyncTaskExecutor")
public Executor taskExecutor() { public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); executor.setCorePoolSize(20);
executor.setMaxPoolSize(20); executor.setMaxPoolSize(50);
executor.setQueueCapacity(500); executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("asyncTaskExecutor-"); executor.setThreadNamePrefix("asyncTaskExecutor-");
executor.initialize(); executor.initialize();
return executor; return executor;
@ -49,7 +49,7 @@ public class ThreadPoolConfig {
@Bean @Bean
public ThreadPoolTaskScheduler taskScheduler() { public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10); scheduler.setPoolSize(20);
scheduler.setThreadNamePrefix("ScheduledTask-"); scheduler.setThreadNamePrefix("ScheduledTask-");
scheduler.setWaitForTasksToCompleteOnShutdown(true); scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(60); scheduler.setAwaitTerminationSeconds(60);

View File

@ -74,4 +74,9 @@ public class ApiPathConstants {
* 团队财务报表 * 团队财务报表
*/ */
public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team"; public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team";
/**
* 活跃会员
*/
public static final String ACTIVE_LIST = "/agent/api/v1/member/activeList";
} }

View File

@ -0,0 +1,74 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.req;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDate;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ActiveListReq {
/**
* 活跃类型
*/
private Integer activeType;
/**
* 统计月份
*/
@JsonFormat(pattern = "yyyy-MM")
private LocalDate date;
/**
* 是否重置
*/
@Builder.Default
private Boolean isRest = false;
/**
* 会员名称
*/
private String name;
/**
* 页码
*/
@Builder.Default
private int pageNum = 1;
/**
* 每页显示数据
*/
@Builder.Default
private int pageSize = 10;
/**
* 注册结束日期
*/
private LocalDate registerEndDate;
/**
* 注册开始日期
*/
private LocalDate registerStartDate;
/**
* 上级代理线
*/
private String topAgentName;
}

View File

@ -0,0 +1,74 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.resp;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* "memberId": 509419585,
* "name": "zaq198014",
* "topId": 500007970,
* "topName": "ky3tg107032",
* "firstPayTime": "2024-06-15 18:20:49",
* "registerTime": "2023-08-18 19:18:30",
* "deposit": 1522,
* "bets": 1394,
* "activeStr": "首存会员"
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ActiveListResp {
/**
* 会员id
*/
private Long memberId;
/**
* 会员名称
*/
private String name;
/**
* 会员上级id
*/
private Long topId;
/**
* 首存时间
*/
private LocalDateTime firstPayTime;
/**
* 注册时间
*/
private LocalDateTime registerTime;
/**
* 存款
*/
private BigDecimal deposit;
/**
* 投注
*/
private BigDecimal bets;
/**
* 活跃类型
*/
private String activeStr;
}

View File

@ -18,7 +18,6 @@ package com.zayac.admin.schedule;
import com.zayac.admin.req.team.TeamInfoReq; import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.resp.team.Team; import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.service.*; import com.zayac.admin.service.*;
import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.model.resp.DeptUsersResp; import com.zayac.admin.system.model.resp.DeptUsersResp;
@ -39,7 +38,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -118,17 +116,12 @@ public class CheckRegAndDep {
return teamFuture.thenComposeAsync(currentTeamInfo -> { return teamFuture.thenComposeAsync(currentTeamInfo -> {
log.info("Previous Team Info: {}", prevTeamInfo); log.info("Previous Team Info: {}", prevTeamInfo);
log.info("Current Team Info: {}", currentTeamInfo); log.info("Current Team Info: {}", currentTeamInfo);
Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity()));
CompletableFuture<Void> registrationProcess = registrationService CompletableFuture<Void> registrationProcess = registrationService
.processRegistration(minister, account, accountUsernameToUserMap, teamAccountMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor) .processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor);
.thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor);
CompletableFuture<Void> depositProcess = depositService CompletableFuture<Void> depositProcess = depositService
.processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor) .processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor);
.thenRunAsync(() -> log.info("Deposit process completed"), asyncTaskExecutor);
return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> { return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> {
teamService.updateTeamInfo(account, currentTeamInfo); teamService.updateTeamInfo(account, currentTeamInfo);

View File

@ -19,6 +19,7 @@ package com.zayac.admin.schedule;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.RateLimiter;
import com.zayac.admin.agent.model.entity.FinanceDO; import com.zayac.admin.agent.model.entity.FinanceDO;
import com.zayac.admin.agent.model.entity.StatsDO; import com.zayac.admin.agent.model.entity.StatsDO;
import com.zayac.admin.agent.model.req.FinanceSumReq; import com.zayac.admin.agent.model.req.FinanceSumReq;
@ -26,9 +27,11 @@ import com.zayac.admin.agent.service.FinanceService;
import com.zayac.admin.agent.service.FinanceSumService; import com.zayac.admin.agent.service.FinanceSumService;
import com.zayac.admin.agent.service.StatsService; import com.zayac.admin.agent.service.StatsService;
import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.config.RateLimiterConfig;
import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.AgentDataVisualListReq; import com.zayac.admin.req.AgentDataVisualListReq;
import com.zayac.admin.req.PayRecordListReq; import com.zayac.admin.req.PayRecordListReq;
import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamFinanceReq; import com.zayac.admin.req.team.TeamFinanceReq;
import com.zayac.admin.req.team.TeamInfoReq; import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.req.team.TeamMemberReq; import com.zayac.admin.req.team.TeamMemberReq;
@ -56,13 +59,16 @@ import java.time.LocalTime;
import java.time.YearMonth; import java.time.YearMonth;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@Profile("dev") @Profile("prod")
public class DailyReport { public class DailyReport {
private final TeamService teamService; private final TeamService teamService;
private final DeptService deptService; private final DeptService deptService;
@ -74,15 +80,18 @@ public class DailyReport {
private final FinanceSumService financeSumService; private final FinanceSumService financeSumService;
private final CompletableFutureWebClientService completableFutureWebClientService; private final CompletableFutureWebClientService completableFutureWebClientService;
private final Executor asyncTaskExecutor; private final Executor asyncTaskExecutor;
private final RateLimiterConfig rateLimiterConfig;
private static final String MINISTER_ROLE_CODE = "minister"; private static final String MINISTER_ROLE_CODE = "minister";
private static final String SEO_TEAM_LEADER_ROLE_CODE = "seo_team_leader"; private static final String SEO_TEAM_LEADER_ROLE_CODE = "seo_team_leader";
private static final String ASSISTANT_ROLE_CODE = "assistant"; private static final String ASSISTANT_ROLE_CODE = "assistant";
private static final String SEO_ROLE_CODE = "seo"; private static final String SEO_ROLE_CODE = "seo";
private static final String PLATFORM_HTH = "华体会";
@Scheduled(cron = "0 40 11,14,17,21 * * ?") @Scheduled(cron = "0 40 11,14,17,21,23,1,3,5 * * ?")
public void teamAccountDailyReport() { public void teamAccountDailyReport() {
LocalDateTime nowDateTime = LocalDateTime.now(); LocalDateTime nowDateTime = LocalDateTime.now();
log.info("dailySummarize started at {}", nowDateTime);
LocalDate nowDate = LocalDate.now(); LocalDate nowDate = LocalDate.now();
//查询部门下的所有用户 //查询部门下的所有用户
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE); List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
@ -105,8 +114,24 @@ public class DailyReport {
} }
// @Scheduled(cron = "0 40 23 * * ?")
// public void ScheduledSendTeamDailyReport1() {
// log.info("ScheduledSendTeamDailyReport1 started at {}", LocalDateTime.now());
// sendTeamDailyReport();
// log.info("ScheduledSendTeamDailyReport1 finished at {}", LocalDateTime.now());
// }
@Scheduled(cron = "0 0/30 1-23 * * ?")
public void ScheduledSendTeamDailyReport2() {
log.info("ScheduledSendTeamDailyReport2 started at {}", LocalDateTime.now());
sendTeamDailyReport();
log.info("ScheduledSendTeamDailyReport2 finished at {}", LocalDateTime.now());
}
@Scheduled(cron = "0 15 0 * * ?") @Scheduled(cron = "0 15 0 * * ?")
public void dailySummarize() { public void dailySummarize() {
log.info("dailySummarize started at {}", LocalDateTime.now());
LocalDate yesterday = LocalDate.now().minusDays(1); LocalDate yesterday = LocalDate.now().minusDays(1);
//查询部门下的所有用户 //查询部门下的所有用户
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE); List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
@ -129,20 +154,34 @@ public class DailyReport {
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0); var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
//建立团队之间账号的联系
Map<String, String> accountNameWithTopAgentName = new HashMap<>();
dept.getUsers()
.stream()
.flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream())
.forEach(accountResp -> ministerUser.getAccounts()
.stream()
.filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount
.getPlatformId()))
.findFirst()
.ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp
.getUsername(), ministerAccount.getUsername())));
//获取账号不为空的用户 //获取账号不为空的用户
var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList(); var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList();
var assistants = usersByRole.get(ASSISTANT_ROLE_CODE); var assistants = usersByRole.get(ASSISTANT_ROLE_CODE);
sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime
.of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers);
getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); //保存数据
saveData(ministerUser, deptUsers, yesterday); saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName);
//getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday);
sendFinance(yesterday, accountUsernameToUserMap, ministerUser);
}); });
log.info("dailySummarize finished at {}", LocalDateTime.now());
} }
// 一小时发送一次 private void sendTeamDailyReport() {
@Scheduled(cron = "0 0 * * * ?")
public void generateTeamReportTask() {
LocalDateTime nowDateTime = LocalDateTime.now(); LocalDateTime nowDateTime = LocalDateTime.now();
LocalDate nowDate = LocalDate.now(); LocalDate nowDate = LocalDate.now();
//查询部门下的所有用户 //查询部门下的所有用户
@ -164,6 +203,51 @@ public class DailyReport {
}); });
} }
public void sendFinance(LocalDate date,
Map<String, UserWithRolesAndAccountsResp> userWithRolesAndAccountsRespMap,
UserWithRolesAndAccountsResp ministerUser) {
List<FinanceDO> finances = financeService.getFinanceByDate(date);
Map<UserWithRolesAndAccountsResp, List<FinanceDO>> userFinances = finances.stream()
.filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName()))
.collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName())));
userFinances.forEach((user, userFinancesList) -> {
if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify()) && !user.getUserId()
.equals(ministerUser.getUserId())) {
String message = telegramMessageService.buildFinanceMessage(userFinancesList);
String botToken = StrUtil.isEmpty(user.getBotToken()) ? ministerUser.getBotToken() : user.getBotToken();
telegramMessageService.sendMessage(botToken, user.getReportIds(), message);
}
//telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
});
}
private List<CompletableFuture<MemberPagination<List<TeamMember>>>> createFuturesForPagination(AccountResp account,
LocalDate date,
int sumReg) {
int pageSize = 100;
int totalPages = (int)Math.ceil((double)sumReg / pageSize);
return IntStream.range(0, totalPages).mapToObj(page -> {
int currentPageSize = page == totalPages - 1
? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize)
: pageSize;
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(currentPageSize)
.pageNum(page + 1)
.build();
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() {
});
}).toList();
}
/** /**
* 查询存款失败用户,并发送消息 * 查询存款失败用户,并发送消息
* *
@ -173,44 +257,43 @@ public class DailyReport {
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
LocalDate date) { LocalDate date) {
TeamMemberReq memberListReq = TeamMemberReq.builder() List<CompletableFuture<List<TeamMember>>> accountFutureList = new ArrayList<>();
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(100)
.build();
List<CompletableFuture<List<TeamMember>>> futureList = new ArrayList<>();
ministerUser.getAccounts().forEach(account -> { ministerUser.getAccounts().forEach(account -> {
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService int sumReg = statsService.countNewRegNum(account.getUsername(), date);
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { sumReg = (sumReg == 0) ? 100 : sumReg;
});
CompletableFuture<List<TeamMember>> teamMembersFuture = memberPaginationCompletableFuture List<CompletableFuture<MemberPagination<List<TeamMember>>>> paginationFutures = createFuturesForPagination(account, date, sumReg);
.thenApply(MemberPagination::getList) CompletableFuture<Void> allPaginationFutures = CompletableFuture.allOf(paginationFutures
.toArray(new CompletableFuture[0]));
CompletableFuture<List<TeamMember>> aggregatedMembersFuture = allPaginationFutures
.thenApply(v -> paginationFutures.stream().map(CompletableFuture::join).flatMap(memberPagination -> {
List<TeamMember> members = memberPagination.getList();
log.info("members size:{}", members.size());
return members.stream();
}).collect(Collectors.toList()));
CompletableFuture<List<TeamMember>> filteredMembersFuture = aggregatedMembersFuture
.thenApplyAsync(members -> members.stream() .thenApplyAsync(members -> members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit() .filter(member -> member.getDeposit() != null && member.getDeposit()
.compareTo(BigDecimal.ZERO) == 0) .compareTo(BigDecimal.ZERO) == 0)
.collect(Collectors.toList()), asyncTaskExecutor) .collect(Collectors.toList()), asyncTaskExecutor);
CompletableFuture<List<TeamMember>> membersWithFailedPayFuture = filteredMembersFuture
.thenComposeAsync(membersWithoutDep -> { .thenComposeAsync(membersWithoutDep -> {
List<CompletableFuture<TeamMember>> memberFutures = membersWithoutDep.stream() List<CompletableFuture<TeamMember>> payRecordFutures = membersWithoutDep.stream()
.map(memberWithoutDep -> { .map(memberWithoutDep -> {
PayRecordListReq req = PayRecordListReq.builder() PayRecordsListReq req = PayRecordsListReq.builder()
.startDate(date) .startDate(date)
.endDate(date) .endDate(date)
.pageSize(100) .pageSize(10)
.id(memberWithoutDep.getId()) .memberName(memberWithoutDep.getName())
.build(); .build();
CompletableFuture<PayRecordList<List<PayRecord>>> completableFuture = completableFutureWebClientService return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> {
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() { if (CollUtil.isNotEmpty(pagination.getList()) && pagination.getList()
}); .stream()
return completableFuture.thenApplyAsync(pagination -> { .noneMatch(payRecord -> payRecord.getPayStatus() == 2)) {
if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination
.getScoreAmountTotal()
.compareTo(BigDecimal.ZERO) == 0) {
return memberWithoutDep; return memberWithoutDep;
} else { } else {
return null; return null;
@ -219,15 +302,16 @@ public class DailyReport {
}) })
.toList(); .toList();
return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0])) return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> memberFutures.stream() .thenApply(v -> payRecordFutures.stream()
.map(CompletableFuture::join) .map(CompletableFuture::join)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList())); .collect(Collectors.toList()));
}, asyncTaskExecutor) }, asyncTaskExecutor);
CompletableFuture<List<TeamMember>> notificationFuture = membersWithFailedPayFuture
.thenApplyAsync(membersWithoutDep -> { .thenApplyAsync(membersWithoutDep -> {
// 发送给每个account关联的user用户 if (CollUtil.isNotEmpty(membersWithoutDep)) {
if (!membersWithoutDep.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream() Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)); .collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> { groupByTopAgentName.forEach((accountName, accountMembers) -> {
@ -238,23 +322,24 @@ public class DailyReport {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken() ? ministerUser.getBotToken()
: currUser.getBotToken(); : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification);
} }
}); });
} }
return membersWithoutDep; return membersWithoutDep;
}, asyncTaskExecutor); }, asyncTaskExecutor);
futureList.add(teamMembersFuture); accountFutureList.add(notificationFuture);
}); });
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])); CompletableFuture<Void> allAccountFutures = CompletableFuture.allOf(accountFutureList
allFutures.thenRunAsync(() -> { .toArray(new CompletableFuture[0]));
// 主线下的所有的存款失败用户 allAccountFutures.thenRunAsync(() -> {
List<TeamMember> allTeamMembers = futureList.stream() List<TeamMember> allTeamMembers = accountFutureList.stream()
.map(CompletableFuture::join) .map(CompletableFuture::join)
.flatMap(List::stream) .flatMap(List::stream)
.toList(); .toList();
log.info("All failed pay members size: {}", allTeamMembers.size());
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream() Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName))); .collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
StringBuilder combinedNotification = new StringBuilder(); StringBuilder combinedNotification = new StringBuilder();
@ -262,19 +347,45 @@ public class DailyReport {
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers); String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
combinedNotification.append(notification).append("\n"); combinedNotification.append(notification).append("\n");
}); });
telegramMessageService telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
.toString()); .toString());
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) { if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser
.getReportIds(), combinedNotification.toString()); .getReportIds(), combinedNotification.toString());
} }
}, asyncTaskExecutor).exceptionally(ex -> { }, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error collecting and processing data", ex); log.error("Error collecting and processing data", ex);
return null; return null;
}); });
}
private CompletableFuture<Pagination<List<PayRecord>>> fetchPaginationPayRecordWithRetry(AccountResp account,
PayRecordsListReq req) {
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
}), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
log.error("Error fetching pay records, retrying...", ex);
return fetchPaginationPayRecordWithRetry(account, req);
});
}
private CompletableFuture<PayRecordList<List<PayRecord>>> fetchPayRecordsWithRetry(AccountResp account,
PayRecordListReq req) {
//华体会接口限流严重,先加上
if (PLATFORM_HTH.equals(account.getPlatformName())) {
RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter("hth", 0.2);
log.info("限流器触发中....");
rateLimiter.acquire(); // 通过限流器限流
}
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<PayRecordList<List<PayRecord>>>>() {
}), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
log.error("Error fetching pay records", ex);
return null;
});
} }
private void sendDailyReport(LocalDate reportDate, private void sendDailyReport(LocalDate reportDate,
@ -309,11 +420,13 @@ public class DailyReport {
LocalDateTime startDateTime, LocalDateTime startDateTime,
LocalDateTime endDateTime, LocalDateTime endDateTime,
List<UserWithRolesAndAccountsResp> assistants) { List<UserWithRolesAndAccountsResp> assistants) {
log.info("Starting to generate and send team report for minister user: {}", ministerUser.getUsername());
return CompletableFuture.runAsync(() -> { return CompletableFuture.runAsync(() -> {
List<String[]> rows = new ArrayList<>(); ConcurrentLinkedQueue<String[]> rows = new ConcurrentLinkedQueue<>();
List<AccountResp> accounts = ministerUser.getAccounts(); List<AccountResp> accounts = ministerUser.getAccounts();
int[] totals = {0, 0}; AtomicIntegerArray totals = new AtomicIntegerArray(2);
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
List<CompletableFuture<Void>> futures = accounts.stream() List<CompletableFuture<Void>> futures = accounts.stream()
@ -324,35 +437,46 @@ public class DailyReport {
.stream() .stream()
.mapToInt(TeamAccount::getFirstDepositNum) .mapToInt(TeamAccount::getFirstDepositNum)
.sum(); .sum();
synchronized (totals) {
totals[0] += totalNewMember; totals.addAndGet(0, totalNewMember);
totals[1] += totalNewFirstDeposit; totals.addAndGet(1, totalNewFirstDeposit);
}
String percent = getPercent(totalNewFirstDeposit, totalNewMember); String percent = getPercent(totalNewFirstDeposit, totalNewMember);
synchronized (rows) { rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String
rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String .valueOf(totalNewFirstDeposit), percent});
.valueOf(totalNewFirstDeposit), percent}); }, asyncTaskExecutor)
} .exceptionally(ex -> {
}, asyncTaskExecutor)) log.error("Error fetching team info for account: {}", accountResp.getPlatformName(), ex);
return null;
}))
.toList(); .toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join(); allFutures.join();
// rows 列表进行排序
rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), log.info("Completed all async tasks for minister user: {}", ministerUser.getUsername());
getPercent(totals[1], totals[0])});
String message = TableFormatter.formatTableAsHtml(rows); // rows 列表进行排序
List<String[]> sortedRows = new ArrayList<>(rows);
sortedRows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
sortedRows.add(new String[] {"总计", String.valueOf(totals.get(0)), String.valueOf(totals.get(1)),
getPercent(totals.get(1), totals.get(0))});
String message = TableFormatter.formatTableAsHtml(sortedRows);
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message); telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message);
} }
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
//发送消息给助理 // 发送消息给助理
if (!CollUtil.isEmpty(assistants)) { if (!CollUtil.isEmpty(assistants)) {
assistants.forEach(assistant -> { assistants.forEach(assistant -> {
if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message); telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message);
log.info("Sent report to assistant: {}", assistant.getUsername());
} }
}); });
} }
@ -379,6 +503,8 @@ public class DailyReport {
List<AccountResp> currUserAccounts = deptUser.getAccounts(); List<AccountResp> currUserAccounts = deptUser.getAccounts();
List<CompletableFuture<Statics>> futures = currUserAccounts.stream() List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
// 团队账号暂时不用具体的代理线数据
.filter(accountResp -> !accountResp.getIsTeam())
.map(currAccount -> agentDataVisualListService .map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq) .getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> agentData.getCurData() .thenApplyAsync(agentData -> agentData.getCurData()
@ -425,7 +551,8 @@ public class DailyReport {
private void saveData(UserWithRolesAndAccountsResp ministerUser, private void saveData(UserWithRolesAndAccountsResp ministerUser,
List<UserWithRolesAndAccountsResp> deptUsers, List<UserWithRolesAndAccountsResp> deptUsers,
LocalDate reportDate) { LocalDate reportDate,
Map<String, String> accountUsernameWithTopAgentName) {
// 获取传入年月 // 获取传入年月
YearMonth inputYearMonth = YearMonth.from(reportDate); YearMonth inputYearMonth = YearMonth.from(reportDate);
@ -451,6 +578,7 @@ public class DailyReport {
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> { List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
FinanceDO financeDO = new FinanceDO(); FinanceDO financeDO = new FinanceDO();
BeanUtil.copyProperties(finance, financeDO); BeanUtil.copyProperties(finance, financeDO);
financeDO.setTopAgentName(accountResp.getUsername());
return financeDO; return financeDO;
}).toList(); }).toList();
financeService.addAll(financeReqList); financeService.addAll(financeReqList);
@ -490,6 +618,7 @@ public class DailyReport {
.orElseThrow(() -> new BusinessException("No data found for report date")); .orElseThrow(() -> new BusinessException("No data found for report date"));
StatsDO statsDO = new StatsDO(); StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO); BeanUtil.copyProperties(statics, statsDO);
statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername()));
return statsDO; return statsDO;
}, asyncTaskExecutor) }, asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {

View File

@ -19,16 +19,18 @@ package com.zayac.admin.service;
import cn.hutool.core.lang.TypeReference; import cn.hutool.core.lang.TypeReference;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import com.zayac.admin.config.RateLimiterConfig;
import com.zayac.admin.resp.ApiResponse; import com.zayac.admin.resp.ApiResponse;
import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.AccountResp;
import io.netty.handler.timeout.TimeoutException; import io.netty.handler.timeout.TimeoutException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatusCode; import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException; import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -38,21 +40,19 @@ import top.continew.starter.core.exception.BusinessException;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Service @Service
@Slf4j @Slf4j
public class CompletableFutureWebClientService { public class CompletableFutureWebClientService {
private final WebClient webClient; private final WebClient webClient;
private final Semaphore semaphore; private final RateLimiterConfig rateLimiterConfig;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder, public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests, RateLimiterConfig rateLimiterConfig,
ObjectMapper objectMapper) { ObjectMapper objectMapper) {
this.webClient = webClientBuilder.build(); this.webClient = webClientBuilder.build();
this.semaphore = new Semaphore(maxConcurrentRequests); this.rateLimiterConfig = rateLimiterConfig;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@ -60,56 +60,62 @@ public class CompletableFutureWebClientService {
String apiPath, String apiPath,
Object params, Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) { ParameterizedTypeReference<ApiResponse<T>> typeRef) {
return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture(); RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter(account.getPlatformUrl() + apiPath);
return fetchData(rateLimiter, account.getPlatformUrl() + apiPath, account
.getHeaders(), params, typeRef, account).toFuture();
} }
public <T> Mono<T> fetchData(String url, public <T> Mono<T> fetchData(RateLimiter rateLimiter,
String url,
String headers, String headers,
Object params, Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef, ParameterizedTypeReference<ApiResponse<T>> typeRef,
AccountResp account) { AccountResp account) {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { rateLimiter.acquire();
throw new RuntimeException("Unable to acquire a permit");
}
return true; return true;
}).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> {
try {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
} catch (Exception e) {
log.warn("Header conversion exception: " + e.getMessage());
throw new BusinessException("Header conversion failed", e);
}
}) })
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) .subscribeOn(Schedulers.boundedElastic())
.retrieve() .flatMap(ignored -> webClient.post().uri(url).headers(httpHeaders -> {
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) addHeaders(httpHeaders, headers);
.bodyToMono(String.class)
.doOnNext(resStr -> {
log.debug("request url:{}", url);
log.debug("request headers :{}", headers);
log.debug("request params:{}", params);
log.debug("response {}", resStr);
}) })
.flatMap(body -> { .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
try { .retrieve()
ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
.constructType(typeRef.getType())); .bodyToMono(String.class)
return Mono.justOrEmpty(apiResponse); .doOnNext(resStr -> {
} catch (Exception e) { log.debug("Request URL: {}", url);
log.warn("JSON parsing exception: " + e.getMessage()); log.debug("Request headers: {}", headers);
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008)); log.debug("Request params: {}", params);
} log.debug("Response: {}", resStr);
}) })
.flatMap(response -> respHandler(response, account)) .flatMap(body -> {
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))) try {
.doFinally(signal -> semaphore.release()); ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory()
.constructType(typeRef.getType()));
return Mono.justOrEmpty(apiResponse);
} catch (Exception e) {
log.warn("JSON parsing exception: " + e.getMessage());
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
}
})
.flatMap(response -> respHandler(response, account))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(this::isRetryableException)));
} }
private boolean isRetryableException(Throwable throwable) { private boolean isRetryableException(Throwable throwable) {
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable; return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable || throwable instanceof WebClientRequestException;
}
private void addHeaders(org.springframework.http.HttpHeaders httpHeaders, String headers) {
try {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<Map<String, String>>() {
}, true);
headerMap.forEach(httpHeaders::add);
} catch (Exception e) {
log.warn("Header conversion exception: " + e.getMessage());
throw new BusinessException("Header conversion failed", e);
}
} }
private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) { private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) {

View File

@ -16,19 +16,16 @@
package com.zayac.admin.service; package com.zayac.admin.service;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.ActiveListReq;
import com.zayac.admin.req.MemberDetailsReq; import com.zayac.admin.req.MemberDetailsReq;
import com.zayac.admin.req.PayRecordsListReq; import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamMemberListReq; import com.zayac.admin.req.team.TeamMemberListReq;
import com.zayac.admin.resp.Member; import com.zayac.admin.resp.*;
import com.zayac.admin.resp.MemberPagination;
import com.zayac.admin.resp.Pagination;
import com.zayac.admin.resp.PayRecord;
import com.zayac.admin.resp.team.Team; import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamAccountWithChange; import com.zayac.admin.resp.team.TeamAccountWithChange;
import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
@ -36,10 +33,15 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import top.continew.starter.cache.redisson.util.RedisUtils;
import top.continew.starter.core.exception.BusinessException; import top.continew.starter.core.exception.BusinessException;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -47,6 +49,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.zayac.admin.common.constant.CacheConstants.SUCCESSFULLY_PAYED_ACCOUNTNAME;
import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount;
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -98,34 +103,123 @@ public class DepositService {
LocalDate nowDate, LocalDate nowDate,
LocalDateTime nowDateTime, LocalDateTime nowDateTime,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
PayRecordsListReq req = createPayRecordsListReq(accountWithChange.getAgentName(), nowDate); ActiveListReq req = ActiveListReq.builder()
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService .pageNum(1)
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { .activeType(3)
}); .topAgentName(accountWithChange.getAgentName())
.date(nowDate)
.pageSize(9999)
.isRest(false)
.build();
StringBuilder depositResults = new StringBuilder(); return completableFutureWebClientService
AtomicInteger depositCounter = new AtomicInteger(0); .fetchDataForAccount(account, ApiPathConstants.ACTIVE_LIST, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<ActiveListResp>>>>() {
})
.thenApply(Pagination::getList)
.thenComposeAsync(activeListResps -> {
// 过滤并排序
List<ActiveListResp> sortedList = activeListResps.stream()
.filter(resp -> resp.getFirstPayTime() != null && resp.getFirstPayTime()
.isAfter(nowDateTime.minusMinutes(5)))
.sorted(Comparator.comparing(ActiveListResp::getFirstPayTime))
.collect(Collectors.toList());
return paginationCompletableFuture.thenApply(Pagination::getList) // 截取后N个元素
.thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor) List<ActiveListResp> activeListRespList = ListUtil.sub(sortedList, -accountWithChange
.thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults, depositCounter), asyncTaskExecutor) .getNewDepositNum(), sortedList.size());
// 异步处理每个响应
List<CompletableFuture<Void>> futures = activeListRespList.stream().map(resp -> {
if (resp.getDeposit().compareTo(BigDecimal.ZERO) == 0) {
LocalDate startDate = nowDateTime.minusHours(1).toLocalDate();
PayRecordsListReq payRecordsListReq = PayRecordsListReq.builder()
.startDate(startDate)
.endDate(nowDate)
.memberName(resp.getName())
.pageSize(10)
.payState(2)
.agentName(accountWithChange.getAgentName())
.build();
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, payRecordsListReq, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
})
.thenApply(Pagination::getList)
.thenAcceptAsync(payRecords -> {
if (payRecords != null && !payRecords.isEmpty()) {
resp.setDeposit(payRecords.get(0).getScoreAmount());
} else {
log.warn("No pay records found for member: {}", resp.getName());
}
}, asyncTaskExecutor);
}
return CompletableFuture.<Void>completedFuture(null);
}).toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> activeListRespList);
}, asyncTaskExecutor)
.thenAcceptAsync(activeListRespList -> {
String depositResults = buildDepositResults(activeListRespList);
String notification = buildNotificationMessage(accountWithChange, depositResults);
sendNotifications(accountWithChange, ministerUser, accountUsernameToUserMap, notification);
}, asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex log.error("Error processing account changes for account: {}", account, ex);
.getMessage());
return null; return null;
}); });
} }
private PayRecordsListReq createPayRecordsListReq(String agentName, LocalDate nowDate) { private String buildDepositResults(List<ActiveListResp> activeListRespList) {
return PayRecordsListReq.builder() StringBuilder depositResults = new StringBuilder();
.startDate(nowDate) activeListRespList.forEach(activeListResp -> depositResults.append(telegramMessageService
.endDate(nowDate) .buildDepositResultsMessage(activeListResp.getName(), activeListResp.getDeposit())));
.pageSize(100) return depositResults.toString();
.payState(2)
.agentName(agentName)
.build();
} }
private String buildNotificationMessage(TeamAccountWithChange accountWithChange, String depositResults) {
return telegramMessageService.buildDepositMessage(accountWithChange.getAgentName(), accountWithChange
.getNewDepositNum(), depositResults, accountWithChange.getFirstDepositNum());
}
private void sendNotifications(TeamAccountWithChange accountWithChange,
UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
String notification) {
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken()
: currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
}
// // 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题
// LocalDate startDate = nowDateTime.minusHours(1).toLocalDate();
// PayRecordsListReq req = PayRecordsListReq.builder()
// .startDate(startDate)
// .endDate(nowDate)
// .pageSize(100)
// .payState(2)
// .agentName(accountWithChange.getAgentName())
// .build();
//
// StringBuilder depositResults = new StringBuilder();
// AtomicInteger depositCounter = new AtomicInteger(0);
//
// return completableFutureWebClientService
// .fetchDataForAccount(account,ApiPathConstants.PAY_RECORDS_LIST_URL,req,new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>(){}).thenApply(Pagination::getList)
// .thenComposeAsync(payRecords-> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor),asyncTaskExecutor)
// .thenRunAsync(()->sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults),asyncTaskExecutor)
// .exceptionally(ex->{
// log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex
// .getMessage());
// return null;
// });
//}
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords, private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
TeamAccountWithChange accountWithChange, TeamAccountWithChange accountWithChange,
AccountResp account, AccountResp account,
@ -134,15 +228,30 @@ public class DepositService {
StringBuilder depositResults, StringBuilder depositResults,
AtomicInteger depositCounter, AtomicInteger depositCounter,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
Map<String, PayRecord> earliestPayRecords = payRecords.stream()
//根据用户名去重,保留时间最早的支付记录
List<PayRecord> sortedPayRecords = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
.collect(Collectors.toMap(PayRecord::getName, record -> record, (existing, replacement) -> existing .collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, (
.getCreatedAt() existingRecord,
.isBefore(replacement.getCreatedAt()) ? existing : replacement)); newRecord) -> existingRecord
.getCreatedAt()
.isBefore(newRecord
.getCreatedAt())
? existingRecord
: newRecord, LinkedHashMap::new), map -> map
.values()
.stream()
.sorted(Comparator
.comparing(PayRecord::getCreatedAt)
.reversed())
.filter(payRecord -> !RedisUtils
.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord
.getBillNo()))
.collect(Collectors
.toList())));
List<PayRecord> validPayRecords = earliestPayRecords.values().stream().toList(); List<CompletableFuture<Void>> fetchMemberFutures = sortedPayRecords.stream()
List<CompletableFuture<Void>> fetchMemberFutures = validPayRecords.stream()
.map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor) .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor)
.thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor) .thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {
@ -159,31 +268,38 @@ public class DepositService {
TeamAccountWithChange accountWithChange, TeamAccountWithChange accountWithChange,
StringBuilder depositResults, StringBuilder depositResults,
AtomicInteger depositCounter) { AtomicInteger depositCounter) {
if (payRecord.getCreatedAt().equals(member.getFirstPayAt()) && depositCounter synchronized (this) {
.getAndIncrement() < accountWithChange.getNewDepositNum()) { //如果从缓存中没有key,那就是新存款用户
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord if (!RedisUtils.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo())) {
.getScoreAmount())); //如果订单记录有存款成功但是会员的首存时间还为空,数据未同步,也是首存
if ((member.getFirstPayAt() == null || payRecord.getCreatedAt()
.equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange
.getNewDepositNum()) {
//把存款成功的用户保存起来,设置一个小时的缓存时间 防止重复用户
RedisUtils.set(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo(), member.getName(), Duration
.ofHours(1));
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord
.getScoreAmount()));
}
}
} }
} }
private void sendNotification(TeamAccountWithChange accountWithChange, private void sendNotification(TeamAccountWithChange accountWithChange,
UserWithRolesAndAccountsResp ministerUser, UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
StringBuilder depositResults, StringBuilder depositResults) {
AtomicInteger depositCounter) { String notification = telegramMessageService.buildDepositMessage(accountWithChange
if (depositCounter.get() > 0) { .getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange
String notification = telegramMessageService.buildDepositMessage(accountWithChange .getFirstDepositNum());
.getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
.getFirstDepositNum()); if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName()); String botToken = StrUtil.isEmpty(currUser.getBotToken())
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { ? ministerUser.getBotToken()
String botToken = StrUtil.isEmpty(currUser.getBotToken()) : currUser.getBotToken();
? ministerUser.getBotToken() telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
: currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
} }
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
} }
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, private CompletableFuture<Member> fetchMemberDetails(AccountResp account,
@ -219,27 +335,4 @@ public class DepositService {
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
}); });
} }
private List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
return prevTeam.getList()
.stream()
.filter(account1 -> team2AccountMap.containsKey(account1.getId()))
.map(account1 -> {
TeamAccount account2 = team2AccountMap.get(account1.getId());
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
BeanUtil.copyProperties(account2, changedAccount);
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
}
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
}
return changedAccount;
})
.collect(Collectors.toList());
}
} }

View File

@ -21,9 +21,10 @@ import cn.hutool.core.util.StrUtil;
import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.team.TeamMemberReq; import com.zayac.admin.req.team.TeamMemberReq;
import com.zayac.admin.resp.ApiResponse;
import com.zayac.admin.resp.MemberPagination; import com.zayac.admin.resp.MemberPagination;
import com.zayac.admin.resp.team.Team; import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamAccountWithChange;
import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.resp.team.TeamMember;
import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
@ -37,7 +38,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount;
/** /**
* 新注册处理相关逻辑 * 新注册处理相关逻辑
@ -52,35 +54,37 @@ public class RegistrationService {
public CompletableFuture<Void> processRegistration(UserWithRolesAndAccountsResp minister, public CompletableFuture<Void> processRegistration(UserWithRolesAndAccountsResp minister,
AccountResp account, AccountResp account,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
Map<String, TeamAccount> teamAccountMap,
Team currentTeamInfo, Team currentTeamInfo,
Team prevTeamInfo, Team prevTeamInfo,
LocalDate nowDate, LocalDate nowDate,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) { if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount(); List<TeamAccountWithChange> hasNewRegAccounts = findChangedTeamAccount(prevTeamInfo, currentTeamInfo)
TeamMemberReq memberListReq = TeamMemberReq.builder() .stream()
.registerStartDate(nowDate) .filter(teamAccountWithChange -> teamAccountWithChange.getNewRegisterNum() > 0)
.registerEndDate(nowDate) .toList();
.startDate(nowDate) hasNewRegAccounts.parallelStream().forEach(accountWithChange -> {
.endDate(nowDate) TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerSort(1) .registerStartDate(nowDate)
.pageSize(registerCount) .registerEndDate(nowDate)
.build(); .startDate(nowDate)
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService .endDate(nowDate)
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { .registerSort(1)
}); .topAgentName(accountWithChange.getAgentName())
return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) .pageSize(accountWithChange.getNewRegisterNum())
.thenAcceptAsync(members -> { .build();
log.info("Successfully get [{}] new registered members", members.size()); completableFutureWebClientService
if (CollUtil.isNotEmpty(members)) { .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() {
Map<String, List<TeamMember>> groupByTopAgentName = members.stream() })
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)); .thenApply(MemberPagination::getList)
groupByTopAgentName.forEach((accountName, accountMembers) -> { .thenAcceptAsync(members -> {
log.info("Successfully get [{}] new registered members for {}", members
.size(), accountWithChange.getAgentName());
if (CollUtil.isNotEmpty(members)) {
String notification = telegramMessageService String notification = telegramMessageService
.buildRegistrationMessage(accountName, accountMembers, teamAccountMap.get(accountName)); .buildRegistrationMessage(members, accountWithChange);
var currUser = accountUsernameToUserMap.get(accountName); var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) String botToken = StrUtil.isEmpty(currUser.getBotToken())
? minister.getBotToken() ? minister.getBotToken()
@ -89,9 +93,9 @@ public class RegistrationService {
} }
telegramMessageService telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}); }
} }, asyncTaskExecutor);
}, asyncTaskExecutor); });
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }

View File

@ -19,9 +19,12 @@ package com.zayac.admin.service;
import cn.hutool.core.convert.Convert; import cn.hutool.core.convert.Convert;
import cn.hutool.core.text.CharPool; import cn.hutool.core.text.CharPool;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.zayac.admin.agent.model.entity.FinanceDO;
import com.zayac.admin.resp.Statics; import com.zayac.admin.resp.Statics;
import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamAccountWithChange;
import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.resp.team.TeamMember;
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -49,6 +52,7 @@ public class TelegramMessageService {
} }
String fullMessage = String.format("%s|%s|%s", botToken, targetId, message); String fullMessage = String.format("%s|%s|%s", botToken, targetId, message);
this.rabbitTemplate.convertAndSend("message_queue", fullMessage); this.rabbitTemplate.convertAndSend("message_queue", fullMessage);
} }
public void sendMessage(String botToken, List<Long> targetIds, String message) { public void sendMessage(String botToken, List<Long> targetIds, String message) {
@ -81,11 +85,25 @@ public class TelegramMessageService {
return htmlTags.stream().anyMatch(text::contains); return htmlTags.stream().anyMatch(text::contains);
} }
public String buildRegistrationMessage(String accountName, public String buildRegistrationMessage(UserWithRolesAndAccountsResp currUser,
List<TeamMember> accountMembers, List<TeamMember> accountMembers,
TeamAccount teamAccount) { TeamAccount teamAccount) {
String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", ")); String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", "));
return String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers if (currUser != null) {
return String.format("👏 [%s] %s 注册: %d 会员: `%s` 总数:*%d*", currUser.getNickname(), teamAccount
.getAgentName(), accountMembers.size(), memberNames, teamAccount.getSubMemberNum());
}
return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers
.size(), memberNames, teamAccount.getSubMemberNum());
}
public String buildRegistrationMessage(List<TeamMember> accountMembers, TeamAccountWithChange teamAccount) {
String memberNames = accountMembers.stream()
.map(member -> "`" + member.getName() + "`")
.collect(Collectors.joining(", "));
return String.format("👏 %s 注册: %d 会员: %s 总数:*%d*", teamAccount.getAgentName(), accountMembers
.size(), memberNames, teamAccount.getSubMemberNum()); .size(), memberNames, teamAccount.getSubMemberNum());
} }
@ -94,7 +112,7 @@ public class TelegramMessageService {
} }
public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) { public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) {
return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount); return String.format("会员: `%s`, 首存金额: *%s*\n", name, scoreAmount);
} }
public String buildFailedPayMessage(String accountName, List<TeamMember> accountMembers) { public String buildFailedPayMessage(String accountName, List<TeamMember> accountMembers) {
@ -111,4 +129,14 @@ public class TelegramMessageService {
}); });
return message.toString(); return message.toString();
} }
public String buildFinanceMessage(List<FinanceDO> userFinancesList) {
StringBuilder message = new StringBuilder();
userFinancesList.forEach(financeDO -> {
String formattedFinance = String.format("%s: *%s*\n", financeDO.getAgentName(), financeDO.getNetProfit()
.toPlainString());
message.append(formattedFinance);
});
return message.toString();
}
} }

View File

@ -1,128 +0,0 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.service;
import cn.hutool.core.lang.TypeReference;
import cn.hutool.json.JSONUtil;
import com.zayac.admin.resp.ApiResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import top.continew.starter.core.exception.BusinessException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Semaphore;
/**
* 封装了一个简单可用的调用后端api的服务类
*/
@Service
public class WebClientService {
private final WebClient webClient;
private final Semaphore semaphore;
public WebClientService(@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests) {
this.webClient = WebClient.create();
this.semaphore = new Semaphore(maxConcurrentRequests);
}
public <T> T fetchData(String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
try {
semaphore.acquire(); // 尝试获取许可
return this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
.bodyToMono(String.class)
.map(body -> new BusinessException("Error response: " + body)))
.bodyToMono(typeRef)
.flatMap(this::respHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
.block(); // 遇到网络问题,每三秒重试一次
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException("Failed to acquire semaphore", e);
} finally {
semaphore.release(); // 释放许可
}
}
public <T> Mono<T> MonoFetchData(String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
System.out.println(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getId());
return Mono.fromCallable(() -> {
semaphore.acquire(); // 尝试获取许可
return true;
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
.bodyToMono(String.class)
.map(body -> new BusinessException("Error response: " + body)))
.bodyToMono(typeRef)
.flatMap(this::monoRespHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
.doFinally(signal -> semaphore.release()));
}
private <T> Mono<T> monoRespHandler(ApiResponse<T> apiResponse) {
// 根据响应状态码处理逻辑
if (apiResponse.getStatusCode() == 6000) {
return Mono.just(apiResponse.getData()); // 正常情况下返回数据
} else if (apiResponse.getStatusCode() == 6001) {
return Mono.error(new BusinessException("API token expired")); // API令牌过期
} else {
return Mono.error(new BusinessException("Error status code: " + apiResponse.getStatusCode())); // 其他错误
}
}
private <T> Mono<T> respHandler(ApiResponse<T> apiResponse) {
System.out.println(apiResponse);
//api接口只有在statusCode为6000的时候返回的数据才是正常数据
if (apiResponse.getStatusCode().equals(6000)) {
return Mono.just(apiResponse.getData());
} else if (apiResponse.getStatusCode().equals(6001)) {
// TODO 调用登录接口,刷新header信息重试
return Mono.error(new BusinessException("xApiToken失效")); // 状态码6001登录失效
} else {
return Mono.error(new BusinessException("错误状态码: " + apiResponse.getStatusCode())); // 其他状态码抛出异常
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.utils;
import cn.hutool.core.bean.BeanUtil;
import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamAccountWithChange;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CommonUtils {
public static List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
return prevTeam.getList()
.stream()
.filter(account1 -> team2AccountMap.containsKey(account1.getId()))
.map(account1 -> {
TeamAccount account2 = team2AccountMap.get(account1.getId());
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
BeanUtil.copyProperties(account2, changedAccount);
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
}
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
}
return changedAccount;
})
.collect(Collectors.toList());
}
public static <T> List<T> getLastNElements(List<T> list, int n) {
if (list == null || list.isEmpty() || n <= 0) {
return Collections.emptyList();
}
int size = list.size();
if (n > size) {
return new ArrayList<>(list);
}
return new ArrayList<>(list.subList(size - n, size));
}
}

View File

@ -76,6 +76,11 @@ public class CacheConstants {
*/ */
public static final String DEPT_USERS_ROLES_ACCOUNTS_KEY_PREFIX = "DEPT_USERS_ROLES_ACCOUNTS" + DELIMITER; public static final String DEPT_USERS_ROLES_ACCOUNTS_KEY_PREFIX = "DEPT_USERS_ROLES_ACCOUNTS" + DELIMITER;
/**
* 存款成功的用户名
*/
public static final String SUCCESSFULLY_PAYED_ACCOUNTNAME = "SUCCESSFULLY_PAYED_ACCOUNTNAME" + DELIMITER;
private CacheConstants() { private CacheConstants() {
} }
} }

View File

@ -32,6 +32,7 @@
a.id AS account_id, a.id AS account_id,
a.nickname AS account_nickname, a.nickname AS account_nickname,
a.username AS account_username, a.username AS account_username,
a.is_team As account_is_team,
a.status AS account_status, a.status AS account_status,
a.headers AS account_headers, a.headers AS account_headers,
a.platform_id AS account_platform_id, a.platform_id AS account_platform_id,
@ -82,6 +83,7 @@
<result column="account_platform_id" property="platformId"/> <result column="account_platform_id" property="platformId"/>
<result column="platform_name" property="platformName"/> <result column="platform_name" property="platformName"/>
<result column="platform_url" property="platformUrl"/> <result column="platform_url" property="platformUrl"/>
<result column="account_is_team" property="isTeam"/>
</collection> </collection>
</collection> </collection>
</resultMap> </resultMap>

View File

@ -37,13 +37,13 @@ spring.datasource:
lazy: true lazy: true
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
type: ${spring.datasource.type} type: ${spring.datasource.type}
# # PostgreSQL 库配置 # # PostgreSQL 库配置
# postgresql: # postgresql:
# url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false # url: jdbc:postgresql://${DB_HOST:127.0.0.1}:${DB_PORT:5432}/${DB_NAME:continew_admin}?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true&autoReconnect=true&maxReconnects=10&failOverReadOnly=false
# username: ${DB_USER:root} # username: ${DB_USER:root}
# password: ${DB_PWD:123456} # password: ${DB_PWD:123456}
# driver-class-name: org.postgresql.Driver # driver-class-name: org.postgresql.Driver
# type: ${spring.datasource.type} # type: ${spring.datasource.type}
# Hikari 连接池配置完整配置请参阅https://github.com/brettwooldridge/HikariCP # Hikari 连接池配置完整配置请参阅https://github.com/brettwooldridge/HikariCP
hikari: hikari:
# 最大连接数量(默认 10根据实际环境调整 # 最大连接数量(默认 10根据实际环境调整
@ -60,7 +60,7 @@ spring.datasource:
## Liquibase 配置 ## Liquibase 配置
spring.liquibase: spring.liquibase:
# 是否启用 # 是否启用
enabled: true enabled: false
# 配置文件路径 # 配置文件路径
change-log: classpath:/db/changelog/db.changelog-master.yaml change-log: classpath:/db/changelog/db.changelog-master.yaml
@ -273,7 +273,8 @@ avatar:
support-suffix: jpg,jpeg,png,gif support-suffix: jpg,jpeg,png,gif
webclient: webclient:
max-concurrent-requests: 60 max-requests-per-second: 10.0
spring: spring:
rabbitmq: rabbitmq:

View File

@ -228,8 +228,9 @@ management.health:
# 关闭邮箱健康检查(邮箱配置错误或邮箱服务器不可用时,健康检查会报错) # 关闭邮箱健康检查(邮箱配置错误或邮箱服务器不可用时,健康检查会报错)
enabled: false enabled: false
### 每秒钟接口调用的次数
webclient: webclient:
max-concurrent-requests: 60 max-requests-per-second: 10.0
spring: spring:
rabbitmq: rabbitmq: