Compare commits
No commits in common. "303db89f46eeee1f71821258a644037d2dc1eabc" and "daa6b2957949c84fec81564e2bac69c685f74aa8" have entirely different histories.
303db89f46
...
daa6b29579
@ -273,7 +273,7 @@ avatar:
|
|||||||
support-suffix: jpg,jpeg,png,gif
|
support-suffix: jpg,jpeg,png,gif
|
||||||
|
|
||||||
webclient:
|
webclient:
|
||||||
max-requests-per-second: 10.0
|
max-concurrent-requests: 60
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
|
@ -229,7 +229,7 @@ management.health:
|
|||||||
enabled: false
|
enabled: false
|
||||||
|
|
||||||
webclient:
|
webclient:
|
||||||
max-requests-per-second: 10.0
|
max-concurrent-requests: 60
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
|
@ -26,11 +26,6 @@
|
|||||||
<groupId>com.zayac</groupId>
|
<groupId>com.zayac</groupId>
|
||||||
<artifactId>zayac-admin-system</artifactId>
|
<artifactId>zayac-admin-system</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>com.google.guava</groupId>
|
|
||||||
<artifactId>guava</artifactId>
|
|
||||||
<version>33.2.1-jre</version>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
@ -16,26 +16,13 @@
|
|||||||
|
|
||||||
package com.zayac.admin.agent.mapper;
|
package com.zayac.admin.agent.mapper;
|
||||||
|
|
||||||
import org.apache.ibatis.annotations.Select;
|
|
||||||
import top.continew.starter.data.mybatis.plus.base.BaseMapper;
|
import top.continew.starter.data.mybatis.plus.base.BaseMapper;
|
||||||
import com.zayac.admin.agent.model.entity.StatsDO;
|
import com.zayac.admin.agent.model.entity.StatsDO;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 代理每日数据 Mapper
|
* 代理每日数据 Mapper
|
||||||
*
|
*
|
||||||
* @author zayac
|
* @author zayac
|
||||||
* @since 2024/06/04 17:10
|
* @since 2024/06/04 17:10
|
||||||
*/
|
*/
|
||||||
public interface DailyStatsMapper extends BaseMapper<StatsDO> {
|
public interface DailyStatsMapper extends BaseMapper<StatsDO> {}
|
||||||
/**
|
|
||||||
* 查询查询新注册总数
|
|
||||||
*
|
|
||||||
* @param topAgentName 上级代理线
|
|
||||||
* @param date 日期
|
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
@Select("select COALESCE(sum(is_new), 0) from (select distinct agent_name, is_new from agent_stats where top_agent_name = #{topAgentName} and statics_date = #{date}) as distinct_agents")
|
|
||||||
int selectCountByTopAgentNameAndDate(String topAgentName, LocalDate date);
|
|
||||||
}
|
|
@ -16,20 +16,13 @@
|
|||||||
|
|
||||||
package com.zayac.admin.agent.mapper;
|
package com.zayac.admin.agent.mapper;
|
||||||
|
|
||||||
import org.apache.ibatis.annotations.Select;
|
|
||||||
import top.continew.starter.data.mybatis.plus.base.BaseMapper;
|
import top.continew.starter.data.mybatis.plus.base.BaseMapper;
|
||||||
import com.zayac.admin.agent.model.entity.FinanceDO;
|
import com.zayac.admin.agent.model.entity.FinanceDO;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 代理线财务报 Mapper
|
* 代理线财务报 Mapper
|
||||||
*
|
*
|
||||||
* @author zayac
|
* @author zayac
|
||||||
* @since 2024/06/04 17:14
|
* @since 2024/06/04 17:14
|
||||||
*/
|
*/
|
||||||
public interface FinanceMapper extends BaseMapper<FinanceDO> {
|
public interface FinanceMapper extends BaseMapper<FinanceDO> {}
|
||||||
@Select("SELECT * FROM agent_finance WHERE DATE(create_time) = #{date}")
|
|
||||||
List<FinanceDO> selectFinancesByCreateTime(LocalDate date);
|
|
||||||
}
|
|
@ -314,8 +314,4 @@ public class FinanceDO extends BaseDO {
|
|||||||
* 彩票利润
|
* 彩票利润
|
||||||
*/
|
*/
|
||||||
private BigDecimal lotteryProfit;
|
private BigDecimal lotteryProfit;
|
||||||
/**
|
|
||||||
* 上级代理线名称
|
|
||||||
*/
|
|
||||||
private String topAgentName;
|
|
||||||
}
|
}
|
@ -174,9 +174,4 @@ public class StatsDO extends BaseDO {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private BigDecimal newDeposit;
|
private BigDecimal newDeposit;
|
||||||
|
|
||||||
/**
|
|
||||||
* 上级代理线名称
|
|
||||||
*/
|
|
||||||
private String topAgentName;
|
|
||||||
}
|
}
|
@ -23,7 +23,6 @@ import com.zayac.admin.agent.model.req.FinanceReq;
|
|||||||
import com.zayac.admin.agent.model.resp.FinanceDetailResp;
|
import com.zayac.admin.agent.model.resp.FinanceDetailResp;
|
||||||
import com.zayac.admin.agent.model.resp.FinanceResp;
|
import com.zayac.admin.agent.model.resp.FinanceResp;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -34,6 +33,4 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public interface FinanceService extends BaseService<FinanceResp, FinanceDetailResp, FinanceQuery, FinanceReq> {
|
public interface FinanceService extends BaseService<FinanceResp, FinanceDetailResp, FinanceQuery, FinanceReq> {
|
||||||
void addAll(List<FinanceDO> financeReqList);
|
void addAll(List<FinanceDO> financeReqList);
|
||||||
|
|
||||||
List<FinanceDO> getFinanceByDate(LocalDate yesterday);
|
|
||||||
}
|
}
|
@ -23,7 +23,6 @@ import com.zayac.admin.agent.model.req.StatsReq;
|
|||||||
import com.zayac.admin.agent.model.resp.StatsDetailResp;
|
import com.zayac.admin.agent.model.resp.StatsDetailResp;
|
||||||
import com.zayac.admin.agent.model.resp.StatsResp;
|
import com.zayac.admin.agent.model.resp.StatsResp;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -34,6 +33,4 @@ import java.util.List;
|
|||||||
*/
|
*/
|
||||||
public interface StatsService extends BaseService<StatsResp, StatsDetailResp, StatsQuery, StatsReq> {
|
public interface StatsService extends BaseService<StatsResp, StatsDetailResp, StatsQuery, StatsReq> {
|
||||||
void addAll(List<StatsDO> list);
|
void addAll(List<StatsDO> list);
|
||||||
|
|
||||||
int countNewRegNum(String topAgentName, LocalDate date);
|
|
||||||
}
|
}
|
@ -29,7 +29,6 @@ import com.zayac.admin.agent.model.resp.StatsDetailResp;
|
|||||||
import com.zayac.admin.agent.model.resp.StatsResp;
|
import com.zayac.admin.agent.model.resp.StatsResp;
|
||||||
import com.zayac.admin.agent.service.StatsService;
|
import com.zayac.admin.agent.service.StatsService;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -45,15 +44,4 @@ public class DailyStatsServiceImpl extends BaseServiceImpl<DailyStatsMapper, Sta
|
|||||||
public void addAll(List<StatsDO> list) {
|
public void addAll(List<StatsDO> list) {
|
||||||
baseMapper.insertBatch(list);
|
baseMapper.insertBatch(list);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 统计指定日期的注册情况
|
|
||||||
*
|
|
||||||
* @param date 日期
|
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public int countNewRegNum(String topAgentName, LocalDate date) {
|
|
||||||
return baseMapper.selectCountByTopAgentNameAndDate(topAgentName, date);
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -29,7 +29,6 @@ import com.zayac.admin.agent.model.resp.FinanceDetailResp;
|
|||||||
import com.zayac.admin.agent.model.resp.FinanceResp;
|
import com.zayac.admin.agent.model.resp.FinanceResp;
|
||||||
import com.zayac.admin.agent.service.FinanceService;
|
import com.zayac.admin.agent.service.FinanceService;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -45,16 +44,4 @@ public class FinanceServiceImpl extends BaseServiceImpl<FinanceMapper, FinanceDO
|
|||||||
baseMapper.insertBatch(financeReqList);
|
baseMapper.insertBatch(financeReqList);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据日期查询所有的财务信息
|
|
||||||
*
|
|
||||||
* @param date 日期
|
|
||||||
* @return List<FinanceDO>
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public List<FinanceDO> getFinanceByDate(LocalDate date) {
|
|
||||||
|
|
||||||
return baseMapper.selectFinancesByCreateTime(date);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
@ -1,45 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.zayac.admin.config;
|
|
||||||
|
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
public class RateLimiterConfig {
|
|
||||||
private final ConcurrentMap<String, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
|
|
||||||
private final double maxRequestsPerSecond;
|
|
||||||
|
|
||||||
public RateLimiterConfig(@Value("${webclient.max-requests-per-second}") double maxRequestsPerSecond) {
|
|
||||||
this.maxRequestsPerSecond = maxRequestsPerSecond;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RateLimiter getRateLimiter(String url) {
|
|
||||||
RateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(url, k -> RateLimiter.create(maxRequestsPerSecond));
|
|
||||||
return rateLimiter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RateLimiter getRateLimiter(String key, double maxRequests) {
|
|
||||||
return rateLimiterMap.computeIfAbsent(key, k -> RateLimiter.create(maxRequests));
|
|
||||||
}
|
|
||||||
}
|
|
@ -33,9 +33,9 @@ public class ThreadPoolConfig {
|
|||||||
@Bean(name = "asyncTaskExecutor")
|
@Bean(name = "asyncTaskExecutor")
|
||||||
public Executor taskExecutor() {
|
public Executor taskExecutor() {
|
||||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
executor.setCorePoolSize(20);
|
executor.setCorePoolSize(10);
|
||||||
executor.setMaxPoolSize(50);
|
executor.setMaxPoolSize(20);
|
||||||
executor.setQueueCapacity(1000);
|
executor.setQueueCapacity(500);
|
||||||
executor.setThreadNamePrefix("asyncTaskExecutor-");
|
executor.setThreadNamePrefix("asyncTaskExecutor-");
|
||||||
executor.initialize();
|
executor.initialize();
|
||||||
return executor;
|
return executor;
|
||||||
@ -49,7 +49,7 @@ public class ThreadPoolConfig {
|
|||||||
@Bean
|
@Bean
|
||||||
public ThreadPoolTaskScheduler taskScheduler() {
|
public ThreadPoolTaskScheduler taskScheduler() {
|
||||||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||||
scheduler.setPoolSize(20);
|
scheduler.setPoolSize(10);
|
||||||
scheduler.setThreadNamePrefix("ScheduledTask-");
|
scheduler.setThreadNamePrefix("ScheduledTask-");
|
||||||
scheduler.setWaitForTasksToCompleteOnShutdown(true);
|
scheduler.setWaitForTasksToCompleteOnShutdown(true);
|
||||||
scheduler.setAwaitTerminationSeconds(60);
|
scheduler.setAwaitTerminationSeconds(60);
|
||||||
|
@ -74,9 +74,4 @@ public class ApiPathConstants {
|
|||||||
* 团队财务报表
|
* 团队财务报表
|
||||||
*/
|
*/
|
||||||
public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team";
|
public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team";
|
||||||
|
|
||||||
/**
|
|
||||||
* 活跃会员
|
|
||||||
*/
|
|
||||||
public static final String ACTIVE_LIST = "/agent/api/v1/member/activeList";
|
|
||||||
}
|
}
|
||||||
|
@ -1,74 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.zayac.admin.req;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
@Builder
|
|
||||||
public class ActiveListReq {
|
|
||||||
/**
|
|
||||||
* 活跃类型
|
|
||||||
*/
|
|
||||||
private Integer activeType;
|
|
||||||
/**
|
|
||||||
* 统计月份
|
|
||||||
*/
|
|
||||||
@JsonFormat(pattern = "yyyy-MM")
|
|
||||||
private LocalDate date;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 是否重置
|
|
||||||
*/
|
|
||||||
@Builder.Default
|
|
||||||
private Boolean isRest = false;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 会员名称
|
|
||||||
*/
|
|
||||||
private String name;
|
|
||||||
/**
|
|
||||||
* 页码
|
|
||||||
*/
|
|
||||||
@Builder.Default
|
|
||||||
private int pageNum = 1;
|
|
||||||
/**
|
|
||||||
* 每页显示数据
|
|
||||||
*/
|
|
||||||
@Builder.Default
|
|
||||||
private int pageSize = 10;
|
|
||||||
/**
|
|
||||||
* 注册结束日期
|
|
||||||
*/
|
|
||||||
private LocalDate registerEndDate;
|
|
||||||
/**
|
|
||||||
* 注册开始日期
|
|
||||||
*/
|
|
||||||
private LocalDate registerStartDate;
|
|
||||||
/**
|
|
||||||
* 上级代理线
|
|
||||||
*/
|
|
||||||
private String topAgentName;
|
|
||||||
}
|
|
@ -1,74 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.zayac.admin.resp;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* "memberId": 509419585,
|
|
||||||
* "name": "zaq198014",
|
|
||||||
* "topId": 500007970,
|
|
||||||
* "topName": "ky3tg107032",
|
|
||||||
* "firstPayTime": "2024-06-15 18:20:49",
|
|
||||||
* "registerTime": "2023-08-18 19:18:30",
|
|
||||||
* "deposit": 1522,
|
|
||||||
* "bets": 1394,
|
|
||||||
* "activeStr": "首存会员"
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class ActiveListResp {
|
|
||||||
/**
|
|
||||||
* 会员id
|
|
||||||
*/
|
|
||||||
private Long memberId;
|
|
||||||
/**
|
|
||||||
* 会员名称
|
|
||||||
*/
|
|
||||||
private String name;
|
|
||||||
/**
|
|
||||||
* 会员上级id
|
|
||||||
*/
|
|
||||||
private Long topId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 首存时间
|
|
||||||
*/
|
|
||||||
private LocalDateTime firstPayTime;
|
|
||||||
/**
|
|
||||||
* 注册时间
|
|
||||||
*/
|
|
||||||
private LocalDateTime registerTime;
|
|
||||||
/**
|
|
||||||
* 存款
|
|
||||||
*/
|
|
||||||
private BigDecimal deposit;
|
|
||||||
/**
|
|
||||||
* 投注
|
|
||||||
*/
|
|
||||||
private BigDecimal bets;
|
|
||||||
/**
|
|
||||||
* 活跃类型
|
|
||||||
*/
|
|
||||||
private String activeStr;
|
|
||||||
}
|
|
@ -18,6 +18,7 @@ package com.zayac.admin.schedule;
|
|||||||
|
|
||||||
import com.zayac.admin.req.team.TeamInfoReq;
|
import com.zayac.admin.req.team.TeamInfoReq;
|
||||||
import com.zayac.admin.resp.team.Team;
|
import com.zayac.admin.resp.team.Team;
|
||||||
|
import com.zayac.admin.resp.team.TeamAccount;
|
||||||
import com.zayac.admin.service.*;
|
import com.zayac.admin.service.*;
|
||||||
import com.zayac.admin.system.model.resp.AccountResp;
|
import com.zayac.admin.system.model.resp.AccountResp;
|
||||||
import com.zayac.admin.system.model.resp.DeptUsersResp;
|
import com.zayac.admin.system.model.resp.DeptUsersResp;
|
||||||
@ -38,6 +39,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -116,12 +118,17 @@ public class CheckRegAndDep {
|
|||||||
return teamFuture.thenComposeAsync(currentTeamInfo -> {
|
return teamFuture.thenComposeAsync(currentTeamInfo -> {
|
||||||
log.info("Previous Team Info: {}", prevTeamInfo);
|
log.info("Previous Team Info: {}", prevTeamInfo);
|
||||||
log.info("Current Team Info: {}", currentTeamInfo);
|
log.info("Current Team Info: {}", currentTeamInfo);
|
||||||
|
Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity()));
|
||||||
|
|
||||||
CompletableFuture<Void> registrationProcess = registrationService
|
CompletableFuture<Void> registrationProcess = registrationService
|
||||||
.processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor);
|
.processRegistration(minister, account, accountUsernameToUserMap, teamAccountMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor)
|
||||||
|
.thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor);
|
||||||
|
|
||||||
CompletableFuture<Void> depositProcess = depositService
|
CompletableFuture<Void> depositProcess = depositService
|
||||||
.processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor);
|
.processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor)
|
||||||
|
.thenRunAsync(() -> log.info("Deposit process completed"), asyncTaskExecutor);
|
||||||
|
|
||||||
return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> {
|
return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> {
|
||||||
teamService.updateTeamInfo(account, currentTeamInfo);
|
teamService.updateTeamInfo(account, currentTeamInfo);
|
||||||
|
@ -19,7 +19,6 @@ package com.zayac.admin.schedule;
|
|||||||
import cn.hutool.core.bean.BeanUtil;
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
|
||||||
import com.zayac.admin.agent.model.entity.FinanceDO;
|
import com.zayac.admin.agent.model.entity.FinanceDO;
|
||||||
import com.zayac.admin.agent.model.entity.StatsDO;
|
import com.zayac.admin.agent.model.entity.StatsDO;
|
||||||
import com.zayac.admin.agent.model.req.FinanceSumReq;
|
import com.zayac.admin.agent.model.req.FinanceSumReq;
|
||||||
@ -27,11 +26,9 @@ import com.zayac.admin.agent.service.FinanceService;
|
|||||||
import com.zayac.admin.agent.service.FinanceSumService;
|
import com.zayac.admin.agent.service.FinanceSumService;
|
||||||
import com.zayac.admin.agent.service.StatsService;
|
import com.zayac.admin.agent.service.StatsService;
|
||||||
import com.zayac.admin.common.enums.DisEnableStatusEnum;
|
import com.zayac.admin.common.enums.DisEnableStatusEnum;
|
||||||
import com.zayac.admin.config.RateLimiterConfig;
|
|
||||||
import com.zayac.admin.constant.ApiPathConstants;
|
import com.zayac.admin.constant.ApiPathConstants;
|
||||||
import com.zayac.admin.req.AgentDataVisualListReq;
|
import com.zayac.admin.req.AgentDataVisualListReq;
|
||||||
import com.zayac.admin.req.PayRecordListReq;
|
import com.zayac.admin.req.PayRecordListReq;
|
||||||
import com.zayac.admin.req.PayRecordsListReq;
|
|
||||||
import com.zayac.admin.req.team.TeamFinanceReq;
|
import com.zayac.admin.req.team.TeamFinanceReq;
|
||||||
import com.zayac.admin.req.team.TeamInfoReq;
|
import com.zayac.admin.req.team.TeamInfoReq;
|
||||||
import com.zayac.admin.req.team.TeamMemberReq;
|
import com.zayac.admin.req.team.TeamMemberReq;
|
||||||
@ -59,16 +56,13 @@ import java.time.LocalTime;
|
|||||||
import java.time.YearMonth;
|
import java.time.YearMonth;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Profile("prod")
|
@Profile("dev")
|
||||||
public class DailyReport {
|
public class DailyReport {
|
||||||
private final TeamService teamService;
|
private final TeamService teamService;
|
||||||
private final DeptService deptService;
|
private final DeptService deptService;
|
||||||
@ -80,18 +74,15 @@ public class DailyReport {
|
|||||||
private final FinanceSumService financeSumService;
|
private final FinanceSumService financeSumService;
|
||||||
private final CompletableFutureWebClientService completableFutureWebClientService;
|
private final CompletableFutureWebClientService completableFutureWebClientService;
|
||||||
private final Executor asyncTaskExecutor;
|
private final Executor asyncTaskExecutor;
|
||||||
private final RateLimiterConfig rateLimiterConfig;
|
|
||||||
|
|
||||||
private static final String MINISTER_ROLE_CODE = "minister";
|
private static final String MINISTER_ROLE_CODE = "minister";
|
||||||
private static final String SEO_TEAM_LEADER_ROLE_CODE = "seo_team_leader";
|
private static final String SEO_TEAM_LEADER_ROLE_CODE = "seo_team_leader";
|
||||||
private static final String ASSISTANT_ROLE_CODE = "assistant";
|
private static final String ASSISTANT_ROLE_CODE = "assistant";
|
||||||
private static final String SEO_ROLE_CODE = "seo";
|
private static final String SEO_ROLE_CODE = "seo";
|
||||||
private static final String PLATFORM_HTH = "华体会";
|
|
||||||
|
|
||||||
@Scheduled(cron = "0 40 11,14,17,21,23,1,3,5 * * ?")
|
@Scheduled(cron = "0 40 11,14,17,21 * * ?")
|
||||||
public void teamAccountDailyReport() {
|
public void teamAccountDailyReport() {
|
||||||
LocalDateTime nowDateTime = LocalDateTime.now();
|
LocalDateTime nowDateTime = LocalDateTime.now();
|
||||||
log.info("dailySummarize started at {}", nowDateTime);
|
|
||||||
LocalDate nowDate = LocalDate.now();
|
LocalDate nowDate = LocalDate.now();
|
||||||
//查询部门下的所有用户
|
//查询部门下的所有用户
|
||||||
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
|
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
|
||||||
@ -114,24 +105,8 @@ public class DailyReport {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Scheduled(cron = "0 40 23 * * ?")
|
|
||||||
// public void ScheduledSendTeamDailyReport1() {
|
|
||||||
// log.info("ScheduledSendTeamDailyReport1 started at {}", LocalDateTime.now());
|
|
||||||
// sendTeamDailyReport();
|
|
||||||
// log.info("ScheduledSendTeamDailyReport1 finished at {}", LocalDateTime.now());
|
|
||||||
// }
|
|
||||||
|
|
||||||
@Scheduled(cron = "0 0/30 1-23 * * ?")
|
|
||||||
public void ScheduledSendTeamDailyReport2() {
|
|
||||||
log.info("ScheduledSendTeamDailyReport2 started at {}", LocalDateTime.now());
|
|
||||||
sendTeamDailyReport();
|
|
||||||
log.info("ScheduledSendTeamDailyReport2 finished at {}", LocalDateTime.now());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Scheduled(cron = "0 15 0 * * ?")
|
@Scheduled(cron = "0 15 0 * * ?")
|
||||||
public void dailySummarize() {
|
public void dailySummarize() {
|
||||||
|
|
||||||
log.info("dailySummarize started at {}", LocalDateTime.now());
|
|
||||||
LocalDate yesterday = LocalDate.now().minusDays(1);
|
LocalDate yesterday = LocalDate.now().minusDays(1);
|
||||||
//查询部门下的所有用户
|
//查询部门下的所有用户
|
||||||
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
|
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
|
||||||
@ -154,34 +129,20 @@ public class DailyReport {
|
|||||||
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
|
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
|
||||||
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
|
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
|
||||||
//建立团队之间账号的联系
|
|
||||||
Map<String, String> accountNameWithTopAgentName = new HashMap<>();
|
|
||||||
dept.getUsers()
|
|
||||||
.stream()
|
|
||||||
.flatMap(userWithRolesAndAccountsResp -> userWithRolesAndAccountsResp.getAccounts().stream())
|
|
||||||
.forEach(accountResp -> ministerUser.getAccounts()
|
|
||||||
.stream()
|
|
||||||
.filter(ministerAccount -> Objects.equals(accountResp.getPlatformId(), ministerAccount
|
|
||||||
.getPlatformId()))
|
|
||||||
.findFirst()
|
|
||||||
.ifPresent(ministerAccount -> accountNameWithTopAgentName.put(accountResp
|
|
||||||
.getUsername(), ministerAccount.getUsername())));
|
|
||||||
|
|
||||||
//获取账号不为空的用户
|
//获取账号不为空的用户
|
||||||
var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList();
|
var deptUsers = dept.getUsers().stream().filter(user -> CollUtil.isNotEmpty(user.getAccounts())).toList();
|
||||||
var assistants = usersByRole.get(ASSISTANT_ROLE_CODE);
|
var assistants = usersByRole.get(ASSISTANT_ROLE_CODE);
|
||||||
|
|
||||||
sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime
|
sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime
|
||||||
.of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers);
|
.of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers);
|
||||||
//保存数据
|
getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday);
|
||||||
saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName);
|
saveData(ministerUser, deptUsers, yesterday);
|
||||||
//getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday);
|
|
||||||
sendFinance(yesterday, accountUsernameToUserMap, ministerUser);
|
|
||||||
});
|
});
|
||||||
log.info("dailySummarize finished at {}", LocalDateTime.now());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendTeamDailyReport() {
|
// 一小时发送一次
|
||||||
|
@Scheduled(cron = "0 0 * * * ?")
|
||||||
|
public void generateTeamReportTask() {
|
||||||
LocalDateTime nowDateTime = LocalDateTime.now();
|
LocalDateTime nowDateTime = LocalDateTime.now();
|
||||||
LocalDate nowDate = LocalDate.now();
|
LocalDate nowDate = LocalDate.now();
|
||||||
//查询部门下的所有用户
|
//查询部门下的所有用户
|
||||||
@ -203,51 +164,6 @@ public class DailyReport {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendFinance(LocalDate date,
|
|
||||||
Map<String, UserWithRolesAndAccountsResp> userWithRolesAndAccountsRespMap,
|
|
||||||
UserWithRolesAndAccountsResp ministerUser) {
|
|
||||||
List<FinanceDO> finances = financeService.getFinanceByDate(date);
|
|
||||||
Map<UserWithRolesAndAccountsResp, List<FinanceDO>> userFinances = finances.stream()
|
|
||||||
.filter(finance -> userWithRolesAndAccountsRespMap.containsKey(finance.getAgentName()))
|
|
||||||
.collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName())));
|
|
||||||
|
|
||||||
userFinances.forEach((user, userFinancesList) -> {
|
|
||||||
if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify()) && !user.getUserId()
|
|
||||||
.equals(ministerUser.getUserId())) {
|
|
||||||
String message = telegramMessageService.buildFinanceMessage(userFinancesList);
|
|
||||||
String botToken = StrUtil.isEmpty(user.getBotToken()) ? ministerUser.getBotToken() : user.getBotToken();
|
|
||||||
telegramMessageService.sendMessage(botToken, user.getReportIds(), message);
|
|
||||||
}
|
|
||||||
//telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<CompletableFuture<MemberPagination<List<TeamMember>>>> createFuturesForPagination(AccountResp account,
|
|
||||||
LocalDate date,
|
|
||||||
int sumReg) {
|
|
||||||
int pageSize = 100;
|
|
||||||
int totalPages = (int)Math.ceil((double)sumReg / pageSize);
|
|
||||||
|
|
||||||
return IntStream.range(0, totalPages).mapToObj(page -> {
|
|
||||||
int currentPageSize = page == totalPages - 1
|
|
||||||
? (sumReg % pageSize == 0 ? pageSize : sumReg % pageSize)
|
|
||||||
: pageSize;
|
|
||||||
TeamMemberReq memberListReq = TeamMemberReq.builder()
|
|
||||||
.registerStartDate(date)
|
|
||||||
.registerEndDate(date)
|
|
||||||
.startDate(date)
|
|
||||||
.endDate(date)
|
|
||||||
.registerSort(1)
|
|
||||||
.status(1)
|
|
||||||
.pageSize(currentPageSize)
|
|
||||||
.pageNum(page + 1)
|
|
||||||
.build();
|
|
||||||
return completableFutureWebClientService
|
|
||||||
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() {
|
|
||||||
});
|
|
||||||
}).toList();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 查询存款失败用户,并发送消息
|
* 查询存款失败用户,并发送消息
|
||||||
*
|
*
|
||||||
@ -257,43 +173,44 @@ public class DailyReport {
|
|||||||
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
||||||
LocalDate date) {
|
LocalDate date) {
|
||||||
|
|
||||||
List<CompletableFuture<List<TeamMember>>> accountFutureList = new ArrayList<>();
|
TeamMemberReq memberListReq = TeamMemberReq.builder()
|
||||||
|
.registerStartDate(date)
|
||||||
|
.registerEndDate(date)
|
||||||
|
.startDate(date)
|
||||||
|
.endDate(date)
|
||||||
|
.registerSort(1)
|
||||||
|
.status(1)
|
||||||
|
.pageSize(100)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<CompletableFuture<List<TeamMember>>> futureList = new ArrayList<>();
|
||||||
ministerUser.getAccounts().forEach(account -> {
|
ministerUser.getAccounts().forEach(account -> {
|
||||||
int sumReg = statsService.countNewRegNum(account.getUsername(), date);
|
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
|
||||||
sumReg = (sumReg == 0) ? 100 : sumReg;
|
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
|
||||||
|
});
|
||||||
|
|
||||||
List<CompletableFuture<MemberPagination<List<TeamMember>>>> paginationFutures = createFuturesForPagination(account, date, sumReg);
|
CompletableFuture<List<TeamMember>> teamMembersFuture = memberPaginationCompletableFuture
|
||||||
CompletableFuture<Void> allPaginationFutures = CompletableFuture.allOf(paginationFutures
|
.thenApply(MemberPagination::getList)
|
||||||
.toArray(new CompletableFuture[0]));
|
|
||||||
|
|
||||||
CompletableFuture<List<TeamMember>> aggregatedMembersFuture = allPaginationFutures
|
|
||||||
.thenApply(v -> paginationFutures.stream().map(CompletableFuture::join).flatMap(memberPagination -> {
|
|
||||||
List<TeamMember> members = memberPagination.getList();
|
|
||||||
log.info("members size:{}", members.size());
|
|
||||||
return members.stream();
|
|
||||||
}).collect(Collectors.toList()));
|
|
||||||
|
|
||||||
CompletableFuture<List<TeamMember>> filteredMembersFuture = aggregatedMembersFuture
|
|
||||||
.thenApplyAsync(members -> members.stream()
|
.thenApplyAsync(members -> members.stream()
|
||||||
.filter(member -> member.getDeposit() != null && member.getDeposit()
|
.filter(member -> member.getDeposit() != null && member.getDeposit()
|
||||||
.compareTo(BigDecimal.ZERO) == 0)
|
.compareTo(BigDecimal.ZERO) == 0)
|
||||||
.collect(Collectors.toList()), asyncTaskExecutor);
|
.collect(Collectors.toList()), asyncTaskExecutor)
|
||||||
|
|
||||||
CompletableFuture<List<TeamMember>> membersWithFailedPayFuture = filteredMembersFuture
|
|
||||||
.thenComposeAsync(membersWithoutDep -> {
|
.thenComposeAsync(membersWithoutDep -> {
|
||||||
List<CompletableFuture<TeamMember>> payRecordFutures = membersWithoutDep.stream()
|
List<CompletableFuture<TeamMember>> memberFutures = membersWithoutDep.stream()
|
||||||
.map(memberWithoutDep -> {
|
.map(memberWithoutDep -> {
|
||||||
PayRecordsListReq req = PayRecordsListReq.builder()
|
PayRecordListReq req = PayRecordListReq.builder()
|
||||||
.startDate(date)
|
.startDate(date)
|
||||||
.endDate(date)
|
.endDate(date)
|
||||||
.pageSize(10)
|
.pageSize(100)
|
||||||
.memberName(memberWithoutDep.getName())
|
.id(memberWithoutDep.getId())
|
||||||
.build();
|
.build();
|
||||||
return fetchPaginationPayRecordWithRetry(account, req).thenApplyAsync(pagination -> {
|
CompletableFuture<PayRecordList<List<PayRecord>>> completableFuture = completableFutureWebClientService
|
||||||
if (CollUtil.isNotEmpty(pagination.getList()) && pagination.getList()
|
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() {
|
||||||
.stream()
|
});
|
||||||
.noneMatch(payRecord -> payRecord.getPayStatus() == 2)) {
|
return completableFuture.thenApplyAsync(pagination -> {
|
||||||
|
if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination
|
||||||
|
.getScoreAmountTotal()
|
||||||
|
.compareTo(BigDecimal.ZERO) == 0) {
|
||||||
return memberWithoutDep;
|
return memberWithoutDep;
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
@ -302,16 +219,15 @@ public class DailyReport {
|
|||||||
})
|
})
|
||||||
.toList();
|
.toList();
|
||||||
|
|
||||||
return CompletableFuture.allOf(payRecordFutures.toArray(new CompletableFuture[0]))
|
return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0]))
|
||||||
.thenApply(v -> payRecordFutures.stream()
|
.thenApply(v -> memberFutures.stream()
|
||||||
.map(CompletableFuture::join)
|
.map(CompletableFuture::join)
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
}, asyncTaskExecutor);
|
}, asyncTaskExecutor)
|
||||||
|
|
||||||
CompletableFuture<List<TeamMember>> notificationFuture = membersWithFailedPayFuture
|
|
||||||
.thenApplyAsync(membersWithoutDep -> {
|
.thenApplyAsync(membersWithoutDep -> {
|
||||||
if (CollUtil.isNotEmpty(membersWithoutDep)) {
|
// 发送给每个account关联的user用户
|
||||||
|
if (!membersWithoutDep.isEmpty()) {
|
||||||
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
|
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
|
||||||
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
|
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
|
||||||
groupByTopAgentName.forEach((accountName, accountMembers) -> {
|
groupByTopAgentName.forEach((accountName, accountMembers) -> {
|
||||||
@ -322,24 +238,23 @@ public class DailyReport {
|
|||||||
String botToken = StrUtil.isEmpty(currUser.getBotToken())
|
String botToken = StrUtil.isEmpty(currUser.getBotToken())
|
||||||
? ministerUser.getBotToken()
|
? ministerUser.getBotToken()
|
||||||
: currUser.getBotToken();
|
: currUser.getBotToken();
|
||||||
telegramMessageService.sendMessage(botToken, currUser.getReportIds(), notification);
|
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return membersWithoutDep;
|
return membersWithoutDep;
|
||||||
}, asyncTaskExecutor);
|
}, asyncTaskExecutor);
|
||||||
|
|
||||||
accountFutureList.add(notificationFuture);
|
futureList.add(teamMembersFuture);
|
||||||
});
|
});
|
||||||
|
|
||||||
CompletableFuture<Void> allAccountFutures = CompletableFuture.allOf(accountFutureList
|
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
|
||||||
.toArray(new CompletableFuture[0]));
|
allFutures.thenRunAsync(() -> {
|
||||||
allAccountFutures.thenRunAsync(() -> {
|
// 主线下的所有的存款失败用户
|
||||||
List<TeamMember> allTeamMembers = accountFutureList.stream()
|
List<TeamMember> allTeamMembers = futureList.stream()
|
||||||
.map(CompletableFuture::join)
|
.map(CompletableFuture::join)
|
||||||
.flatMap(List::stream)
|
.flatMap(List::stream)
|
||||||
.toList();
|
.toList();
|
||||||
log.info("All failed pay members size: {}", allTeamMembers.size());
|
|
||||||
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
|
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
|
||||||
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
|
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
|
||||||
StringBuilder combinedNotification = new StringBuilder();
|
StringBuilder combinedNotification = new StringBuilder();
|
||||||
@ -347,45 +262,19 @@ public class DailyReport {
|
|||||||
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
|
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
|
||||||
combinedNotification.append(notification).append("\n");
|
combinedNotification.append(notification).append("\n");
|
||||||
});
|
});
|
||||||
|
|
||||||
telegramMessageService
|
telegramMessageService
|
||||||
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
|
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
|
||||||
.toString());
|
.toString());
|
||||||
|
|
||||||
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
|
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
|
||||||
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser
|
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser
|
||||||
.getReportIds(), combinedNotification.toString());
|
.getReportIds(), combinedNotification.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
}, asyncTaskExecutor).exceptionally(ex -> {
|
}, asyncTaskExecutor).exceptionally(ex -> {
|
||||||
log.error("Error collecting and processing data", ex);
|
log.error("Error collecting and processing data", ex);
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<Pagination<List<PayRecord>>> fetchPaginationPayRecordWithRetry(AccountResp account,
|
|
||||||
PayRecordsListReq req) {
|
|
||||||
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
|
|
||||||
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
|
|
||||||
}), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
|
|
||||||
log.error("Error fetching pay records, retrying...", ex);
|
|
||||||
return fetchPaginationPayRecordWithRetry(account, req);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<PayRecordList<List<PayRecord>>> fetchPayRecordsWithRetry(AccountResp account,
|
|
||||||
PayRecordListReq req) {
|
|
||||||
//华体会接口限流严重,先加上
|
|
||||||
if (PLATFORM_HTH.equals(account.getPlatformName())) {
|
|
||||||
RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter("hth", 0.2);
|
|
||||||
log.info("限流器触发中....");
|
|
||||||
rateLimiter.acquire(); // 通过限流器限流
|
|
||||||
}
|
|
||||||
return CompletableFuture.supplyAsync(() -> completableFutureWebClientService
|
|
||||||
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<PayRecordList<List<PayRecord>>>>() {
|
|
||||||
}), asyncTaskExecutor).thenCompose(future -> future).exceptionallyCompose(ex -> {
|
|
||||||
log.error("Error fetching pay records", ex);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendDailyReport(LocalDate reportDate,
|
private void sendDailyReport(LocalDate reportDate,
|
||||||
@ -420,13 +309,11 @@ public class DailyReport {
|
|||||||
LocalDateTime startDateTime,
|
LocalDateTime startDateTime,
|
||||||
LocalDateTime endDateTime,
|
LocalDateTime endDateTime,
|
||||||
List<UserWithRolesAndAccountsResp> assistants) {
|
List<UserWithRolesAndAccountsResp> assistants) {
|
||||||
log.info("Starting to generate and send team report for minister user: {}", ministerUser.getUsername());
|
|
||||||
|
|
||||||
return CompletableFuture.runAsync(() -> {
|
return CompletableFuture.runAsync(() -> {
|
||||||
ConcurrentLinkedQueue<String[]> rows = new ConcurrentLinkedQueue<>();
|
List<String[]> rows = new ArrayList<>();
|
||||||
List<AccountResp> accounts = ministerUser.getAccounts();
|
List<AccountResp> accounts = ministerUser.getAccounts();
|
||||||
|
|
||||||
AtomicIntegerArray totals = new AtomicIntegerArray(2);
|
int[] totals = {0, 0};
|
||||||
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
|
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
|
||||||
|
|
||||||
List<CompletableFuture<Void>> futures = accounts.stream()
|
List<CompletableFuture<Void>> futures = accounts.stream()
|
||||||
@ -437,46 +324,35 @@ public class DailyReport {
|
|||||||
.stream()
|
.stream()
|
||||||
.mapToInt(TeamAccount::getFirstDepositNum)
|
.mapToInt(TeamAccount::getFirstDepositNum)
|
||||||
.sum();
|
.sum();
|
||||||
|
synchronized (totals) {
|
||||||
totals.addAndGet(0, totalNewMember);
|
totals[0] += totalNewMember;
|
||||||
totals.addAndGet(1, totalNewFirstDeposit);
|
totals[1] += totalNewFirstDeposit;
|
||||||
|
}
|
||||||
String percent = getPercent(totalNewFirstDeposit, totalNewMember);
|
String percent = getPercent(totalNewFirstDeposit, totalNewMember);
|
||||||
|
synchronized (rows) {
|
||||||
rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String
|
rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String
|
||||||
.valueOf(totalNewFirstDeposit), percent});
|
.valueOf(totalNewFirstDeposit), percent});
|
||||||
}, asyncTaskExecutor)
|
}
|
||||||
.exceptionally(ex -> {
|
}, asyncTaskExecutor))
|
||||||
log.error("Error fetching team info for account: {}", accountResp.getPlatformName(), ex);
|
|
||||||
return null;
|
|
||||||
}))
|
|
||||||
.toList();
|
.toList();
|
||||||
|
|
||||||
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
||||||
allFutures.join();
|
allFutures.join();
|
||||||
|
|
||||||
log.info("Completed all async tasks for minister user: {}", ministerUser.getUsername());
|
|
||||||
|
|
||||||
// 对 rows 列表进行排序
|
// 对 rows 列表进行排序
|
||||||
List<String[]> sortedRows = new ArrayList<>(rows);
|
rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
|
||||||
sortedRows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
|
|
||||||
|
|
||||||
sortedRows.add(new String[] {"总计", String.valueOf(totals.get(0)), String.valueOf(totals.get(1)),
|
|
||||||
getPercent(totals.get(1), totals.get(0))});
|
|
||||||
|
|
||||||
String message = TableFormatter.formatTableAsHtml(sortedRows);
|
|
||||||
|
|
||||||
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
|
|
||||||
|
|
||||||
|
rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]),
|
||||||
|
getPercent(totals[1], totals[0])});
|
||||||
|
String message = TableFormatter.formatTableAsHtml(rows);
|
||||||
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
|
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
|
||||||
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message);
|
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message);
|
||||||
}
|
}
|
||||||
|
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
|
||||||
//发送消息给助理
|
//发送消息给助理
|
||||||
if (!CollUtil.isEmpty(assistants)) {
|
if (!CollUtil.isEmpty(assistants)) {
|
||||||
assistants.forEach(assistant -> {
|
assistants.forEach(assistant -> {
|
||||||
if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
|
if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
|
||||||
telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message);
|
telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message);
|
||||||
log.info("Sent report to assistant: {}", assistant.getUsername());
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -503,8 +379,6 @@ public class DailyReport {
|
|||||||
List<AccountResp> currUserAccounts = deptUser.getAccounts();
|
List<AccountResp> currUserAccounts = deptUser.getAccounts();
|
||||||
|
|
||||||
List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
|
List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
|
||||||
// 团队账号暂时不用具体的代理线数据
|
|
||||||
.filter(accountResp -> !accountResp.getIsTeam())
|
|
||||||
.map(currAccount -> agentDataVisualListService
|
.map(currAccount -> agentDataVisualListService
|
||||||
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
|
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
|
||||||
.thenApplyAsync(agentData -> agentData.getCurData()
|
.thenApplyAsync(agentData -> agentData.getCurData()
|
||||||
@ -551,8 +425,7 @@ public class DailyReport {
|
|||||||
|
|
||||||
private void saveData(UserWithRolesAndAccountsResp ministerUser,
|
private void saveData(UserWithRolesAndAccountsResp ministerUser,
|
||||||
List<UserWithRolesAndAccountsResp> deptUsers,
|
List<UserWithRolesAndAccountsResp> deptUsers,
|
||||||
LocalDate reportDate,
|
LocalDate reportDate) {
|
||||||
Map<String, String> accountUsernameWithTopAgentName) {
|
|
||||||
// 获取传入年月
|
// 获取传入年月
|
||||||
YearMonth inputYearMonth = YearMonth.from(reportDate);
|
YearMonth inputYearMonth = YearMonth.from(reportDate);
|
||||||
|
|
||||||
@ -578,7 +451,6 @@ public class DailyReport {
|
|||||||
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
|
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
|
||||||
FinanceDO financeDO = new FinanceDO();
|
FinanceDO financeDO = new FinanceDO();
|
||||||
BeanUtil.copyProperties(finance, financeDO);
|
BeanUtil.copyProperties(finance, financeDO);
|
||||||
financeDO.setTopAgentName(accountResp.getUsername());
|
|
||||||
return financeDO;
|
return financeDO;
|
||||||
}).toList();
|
}).toList();
|
||||||
financeService.addAll(financeReqList);
|
financeService.addAll(financeReqList);
|
||||||
@ -618,7 +490,6 @@ public class DailyReport {
|
|||||||
.orElseThrow(() -> new BusinessException("No data found for report date"));
|
.orElseThrow(() -> new BusinessException("No data found for report date"));
|
||||||
StatsDO statsDO = new StatsDO();
|
StatsDO statsDO = new StatsDO();
|
||||||
BeanUtil.copyProperties(statics, statsDO);
|
BeanUtil.copyProperties(statics, statsDO);
|
||||||
statsDO.setTopAgentName(accountUsernameWithTopAgentName.get(currAccount.getUsername()));
|
|
||||||
return statsDO;
|
return statsDO;
|
||||||
}, asyncTaskExecutor)
|
}, asyncTaskExecutor)
|
||||||
.exceptionally(ex -> {
|
.exceptionally(ex -> {
|
||||||
|
@ -19,18 +19,16 @@ package com.zayac.admin.service;
|
|||||||
import cn.hutool.core.lang.TypeReference;
|
import cn.hutool.core.lang.TypeReference;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
|
||||||
import com.zayac.admin.config.RateLimiterConfig;
|
|
||||||
import com.zayac.admin.resp.ApiResponse;
|
import com.zayac.admin.resp.ApiResponse;
|
||||||
import com.zayac.admin.system.model.resp.AccountResp;
|
import com.zayac.admin.system.model.resp.AccountResp;
|
||||||
import io.netty.handler.timeout.TimeoutException;
|
import io.netty.handler.timeout.TimeoutException;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.core.ParameterizedTypeReference;
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
import org.springframework.http.HttpStatusCode;
|
import org.springframework.http.HttpStatusCode;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.reactive.function.BodyInserters;
|
import org.springframework.web.reactive.function.BodyInserters;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
import org.springframework.web.reactive.function.client.WebClientRequestException;
|
|
||||||
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
@ -40,19 +38,21 @@ import top.continew.starter.core.exception.BusinessException;
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class CompletableFutureWebClientService {
|
public class CompletableFutureWebClientService {
|
||||||
private final WebClient webClient;
|
private final WebClient webClient;
|
||||||
private final RateLimiterConfig rateLimiterConfig;
|
private final Semaphore semaphore;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
|
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
|
||||||
RateLimiterConfig rateLimiterConfig,
|
@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests,
|
||||||
ObjectMapper objectMapper) {
|
ObjectMapper objectMapper) {
|
||||||
this.webClient = webClientBuilder.build();
|
this.webClient = webClientBuilder.build();
|
||||||
this.rateLimiterConfig = rateLimiterConfig;
|
this.semaphore = new Semaphore(maxConcurrentRequests);
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,34 +60,38 @@ public class CompletableFutureWebClientService {
|
|||||||
String apiPath,
|
String apiPath,
|
||||||
Object params,
|
Object params,
|
||||||
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
|
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
|
||||||
RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter(account.getPlatformUrl() + apiPath);
|
return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture();
|
||||||
return fetchData(rateLimiter, account.getPlatformUrl() + apiPath, account
|
|
||||||
.getHeaders(), params, typeRef, account).toFuture();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> Mono<T> fetchData(RateLimiter rateLimiter,
|
public <T> Mono<T> fetchData(String url,
|
||||||
String url,
|
|
||||||
String headers,
|
String headers,
|
||||||
Object params,
|
Object params,
|
||||||
ParameterizedTypeReference<ApiResponse<T>> typeRef,
|
ParameterizedTypeReference<ApiResponse<T>> typeRef,
|
||||||
AccountResp account) {
|
AccountResp account) {
|
||||||
return Mono.fromCallable(() -> {
|
return Mono.fromCallable(() -> {
|
||||||
rateLimiter.acquire();
|
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
|
||||||
|
throw new RuntimeException("Unable to acquire a permit");
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
})
|
}).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> {
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
try {
|
||||||
.flatMap(ignored -> webClient.post().uri(url).headers(httpHeaders -> {
|
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
|
||||||
addHeaders(httpHeaders, headers);
|
}, true);
|
||||||
|
headerMap.forEach(httpHeaders::add);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Header conversion exception: " + e.getMessage());
|
||||||
|
throw new BusinessException("Header conversion failed", e);
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
|
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
|
||||||
.retrieve()
|
.retrieve()
|
||||||
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
|
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
|
||||||
.bodyToMono(String.class)
|
.bodyToMono(String.class)
|
||||||
.doOnNext(resStr -> {
|
.doOnNext(resStr -> {
|
||||||
log.debug("Request URL: {}", url);
|
log.debug("request url:{}", url);
|
||||||
log.debug("Request headers: {}", headers);
|
log.debug("request headers :{}", headers);
|
||||||
log.debug("Request params: {}", params);
|
log.debug("request params:{}", params);
|
||||||
log.debug("Response: {}", resStr);
|
log.debug("response {}", resStr);
|
||||||
})
|
})
|
||||||
.flatMap(body -> {
|
.flatMap(body -> {
|
||||||
try {
|
try {
|
||||||
@ -100,22 +104,12 @@ public class CompletableFutureWebClientService {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
.flatMap(response -> respHandler(response, account))
|
.flatMap(response -> respHandler(response, account))
|
||||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(this::isRetryableException)));
|
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException)))
|
||||||
|
.doFinally(signal -> semaphore.release());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isRetryableException(Throwable throwable) {
|
private boolean isRetryableException(Throwable throwable) {
|
||||||
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable || throwable instanceof WebClientRequestException;
|
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable;
|
||||||
}
|
|
||||||
|
|
||||||
private void addHeaders(org.springframework.http.HttpHeaders httpHeaders, String headers) {
|
|
||||||
try {
|
|
||||||
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<Map<String, String>>() {
|
|
||||||
}, true);
|
|
||||||
headerMap.forEach(httpHeaders::add);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("Header conversion exception: " + e.getMessage());
|
|
||||||
throw new BusinessException("Header conversion failed", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) {
|
private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) {
|
||||||
|
@ -16,16 +16,19 @@
|
|||||||
|
|
||||||
package com.zayac.admin.service;
|
package com.zayac.admin.service;
|
||||||
|
|
||||||
import cn.hutool.core.collection.ListUtil;
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.zayac.admin.common.enums.DisEnableStatusEnum;
|
import com.zayac.admin.common.enums.DisEnableStatusEnum;
|
||||||
import com.zayac.admin.constant.ApiPathConstants;
|
import com.zayac.admin.constant.ApiPathConstants;
|
||||||
import com.zayac.admin.req.ActiveListReq;
|
|
||||||
import com.zayac.admin.req.MemberDetailsReq;
|
import com.zayac.admin.req.MemberDetailsReq;
|
||||||
import com.zayac.admin.req.PayRecordsListReq;
|
import com.zayac.admin.req.PayRecordsListReq;
|
||||||
import com.zayac.admin.req.team.TeamMemberListReq;
|
import com.zayac.admin.req.team.TeamMemberListReq;
|
||||||
import com.zayac.admin.resp.*;
|
import com.zayac.admin.resp.Member;
|
||||||
|
import com.zayac.admin.resp.MemberPagination;
|
||||||
|
import com.zayac.admin.resp.Pagination;
|
||||||
|
import com.zayac.admin.resp.PayRecord;
|
||||||
import com.zayac.admin.resp.team.Team;
|
import com.zayac.admin.resp.team.Team;
|
||||||
|
import com.zayac.admin.resp.team.TeamAccount;
|
||||||
import com.zayac.admin.resp.team.TeamAccountWithChange;
|
import com.zayac.admin.resp.team.TeamAccountWithChange;
|
||||||
import com.zayac.admin.system.model.resp.AccountResp;
|
import com.zayac.admin.system.model.resp.AccountResp;
|
||||||
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
|
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
|
||||||
@ -33,15 +36,10 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.core.ParameterizedTypeReference;
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import top.continew.starter.cache.redisson.util.RedisUtils;
|
|
||||||
import top.continew.starter.core.exception.BusinessException;
|
import top.continew.starter.core.exception.BusinessException;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
@ -49,9 +47,6 @@ import java.util.concurrent.Executor;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static com.zayac.admin.common.constant.CacheConstants.SUCCESSFULLY_PAYED_ACCOUNTNAME;
|
|
||||||
import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@ -103,123 +98,34 @@ public class DepositService {
|
|||||||
LocalDate nowDate,
|
LocalDate nowDate,
|
||||||
LocalDateTime nowDateTime,
|
LocalDateTime nowDateTime,
|
||||||
Executor asyncTaskExecutor) {
|
Executor asyncTaskExecutor) {
|
||||||
ActiveListReq req = ActiveListReq.builder()
|
PayRecordsListReq req = createPayRecordsListReq(accountWithChange.getAgentName(), nowDate);
|
||||||
.pageNum(1)
|
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
|
||||||
.activeType(3)
|
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
|
||||||
.topAgentName(accountWithChange.getAgentName())
|
});
|
||||||
.date(nowDate)
|
|
||||||
.pageSize(9999)
|
|
||||||
.isRest(false)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
return completableFutureWebClientService
|
StringBuilder depositResults = new StringBuilder();
|
||||||
.fetchDataForAccount(account, ApiPathConstants.ACTIVE_LIST, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<ActiveListResp>>>>() {
|
AtomicInteger depositCounter = new AtomicInteger(0);
|
||||||
})
|
|
||||||
.thenApply(Pagination::getList)
|
|
||||||
.thenComposeAsync(activeListResps -> {
|
|
||||||
// 过滤并排序
|
|
||||||
List<ActiveListResp> sortedList = activeListResps.stream()
|
|
||||||
.filter(resp -> resp.getFirstPayTime() != null && resp.getFirstPayTime()
|
|
||||||
.isAfter(nowDateTime.minusMinutes(5)))
|
|
||||||
.sorted(Comparator.comparing(ActiveListResp::getFirstPayTime))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
// 截取后N个元素
|
return paginationCompletableFuture.thenApply(Pagination::getList)
|
||||||
List<ActiveListResp> activeListRespList = ListUtil.sub(sortedList, -accountWithChange
|
.thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor)
|
||||||
.getNewDepositNum(), sortedList.size());
|
.thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults, depositCounter), asyncTaskExecutor)
|
||||||
|
|
||||||
// 异步处理每个响应
|
|
||||||
List<CompletableFuture<Void>> futures = activeListRespList.stream().map(resp -> {
|
|
||||||
if (resp.getDeposit().compareTo(BigDecimal.ZERO) == 0) {
|
|
||||||
LocalDate startDate = nowDateTime.minusHours(1).toLocalDate();
|
|
||||||
PayRecordsListReq payRecordsListReq = PayRecordsListReq.builder()
|
|
||||||
.startDate(startDate)
|
|
||||||
.endDate(nowDate)
|
|
||||||
.memberName(resp.getName())
|
|
||||||
.pageSize(10)
|
|
||||||
.payState(2)
|
|
||||||
.agentName(accountWithChange.getAgentName())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
return completableFutureWebClientService
|
|
||||||
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, payRecordsListReq, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
|
|
||||||
})
|
|
||||||
.thenApply(Pagination::getList)
|
|
||||||
.thenAcceptAsync(payRecords -> {
|
|
||||||
if (payRecords != null && !payRecords.isEmpty()) {
|
|
||||||
resp.setDeposit(payRecords.get(0).getScoreAmount());
|
|
||||||
} else {
|
|
||||||
log.warn("No pay records found for member: {}", resp.getName());
|
|
||||||
}
|
|
||||||
}, asyncTaskExecutor);
|
|
||||||
}
|
|
||||||
return CompletableFuture.<Void>completedFuture(null);
|
|
||||||
}).toList();
|
|
||||||
|
|
||||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
|
||||||
.thenApply(v -> activeListRespList);
|
|
||||||
}, asyncTaskExecutor)
|
|
||||||
.thenAcceptAsync(activeListRespList -> {
|
|
||||||
String depositResults = buildDepositResults(activeListRespList);
|
|
||||||
String notification = buildNotificationMessage(accountWithChange, depositResults);
|
|
||||||
sendNotifications(accountWithChange, ministerUser, accountUsernameToUserMap, notification);
|
|
||||||
}, asyncTaskExecutor)
|
|
||||||
.exceptionally(ex -> {
|
.exceptionally(ex -> {
|
||||||
log.error("Error processing account changes for account: {}", account, ex);
|
log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex
|
||||||
|
.getMessage());
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildDepositResults(List<ActiveListResp> activeListRespList) {
|
private PayRecordsListReq createPayRecordsListReq(String agentName, LocalDate nowDate) {
|
||||||
StringBuilder depositResults = new StringBuilder();
|
return PayRecordsListReq.builder()
|
||||||
activeListRespList.forEach(activeListResp -> depositResults.append(telegramMessageService
|
.startDate(nowDate)
|
||||||
.buildDepositResultsMessage(activeListResp.getName(), activeListResp.getDeposit())));
|
.endDate(nowDate)
|
||||||
return depositResults.toString();
|
.pageSize(100)
|
||||||
|
.payState(2)
|
||||||
|
.agentName(agentName)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildNotificationMessage(TeamAccountWithChange accountWithChange, String depositResults) {
|
|
||||||
return telegramMessageService.buildDepositMessage(accountWithChange.getAgentName(), accountWithChange
|
|
||||||
.getNewDepositNum(), depositResults, accountWithChange.getFirstDepositNum());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendNotifications(TeamAccountWithChange accountWithChange,
|
|
||||||
UserWithRolesAndAccountsResp ministerUser,
|
|
||||||
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
|
||||||
String notification) {
|
|
||||||
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
|
|
||||||
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
|
|
||||||
String botToken = StrUtil.isEmpty(currUser.getBotToken())
|
|
||||||
? ministerUser.getBotToken()
|
|
||||||
: currUser.getBotToken();
|
|
||||||
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
|
|
||||||
}
|
|
||||||
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
|
|
||||||
}
|
|
||||||
|
|
||||||
// // 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题
|
|
||||||
// LocalDate startDate = nowDateTime.minusHours(1).toLocalDate();
|
|
||||||
// PayRecordsListReq req = PayRecordsListReq.builder()
|
|
||||||
// .startDate(startDate)
|
|
||||||
// .endDate(nowDate)
|
|
||||||
// .pageSize(100)
|
|
||||||
// .payState(2)
|
|
||||||
// .agentName(accountWithChange.getAgentName())
|
|
||||||
// .build();
|
|
||||||
//
|
|
||||||
// StringBuilder depositResults = new StringBuilder();
|
|
||||||
// AtomicInteger depositCounter = new AtomicInteger(0);
|
|
||||||
//
|
|
||||||
// return completableFutureWebClientService
|
|
||||||
// .fetchDataForAccount(account,ApiPathConstants.PAY_RECORDS_LIST_URL,req,new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>(){}).thenApply(Pagination::getList)
|
|
||||||
// .thenComposeAsync(payRecords-> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor),asyncTaskExecutor)
|
|
||||||
// .thenRunAsync(()->sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults),asyncTaskExecutor)
|
|
||||||
// .exceptionally(ex->{
|
|
||||||
// log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex
|
|
||||||
// .getMessage());
|
|
||||||
// return null;
|
|
||||||
// });
|
|
||||||
//}
|
|
||||||
|
|
||||||
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
|
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
|
||||||
TeamAccountWithChange accountWithChange,
|
TeamAccountWithChange accountWithChange,
|
||||||
AccountResp account,
|
AccountResp account,
|
||||||
@ -228,30 +134,15 @@ public class DepositService {
|
|||||||
StringBuilder depositResults,
|
StringBuilder depositResults,
|
||||||
AtomicInteger depositCounter,
|
AtomicInteger depositCounter,
|
||||||
Executor asyncTaskExecutor) {
|
Executor asyncTaskExecutor) {
|
||||||
|
Map<String, PayRecord> earliestPayRecords = payRecords.stream()
|
||||||
//根据用户名去重,保留时间最早的支付记录
|
|
||||||
List<PayRecord> sortedPayRecords = payRecords.stream()
|
|
||||||
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
|
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
|
||||||
.collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, (
|
.collect(Collectors.toMap(PayRecord::getName, record -> record, (existing, replacement) -> existing
|
||||||
existingRecord,
|
|
||||||
newRecord) -> existingRecord
|
|
||||||
.getCreatedAt()
|
.getCreatedAt()
|
||||||
.isBefore(newRecord
|
.isBefore(replacement.getCreatedAt()) ? existing : replacement));
|
||||||
.getCreatedAt())
|
|
||||||
? existingRecord
|
|
||||||
: newRecord, LinkedHashMap::new), map -> map
|
|
||||||
.values()
|
|
||||||
.stream()
|
|
||||||
.sorted(Comparator
|
|
||||||
.comparing(PayRecord::getCreatedAt)
|
|
||||||
.reversed())
|
|
||||||
.filter(payRecord -> !RedisUtils
|
|
||||||
.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord
|
|
||||||
.getBillNo()))
|
|
||||||
.collect(Collectors
|
|
||||||
.toList())));
|
|
||||||
|
|
||||||
List<CompletableFuture<Void>> fetchMemberFutures = sortedPayRecords.stream()
|
List<PayRecord> validPayRecords = earliestPayRecords.values().stream().toList();
|
||||||
|
|
||||||
|
List<CompletableFuture<Void>> fetchMemberFutures = validPayRecords.stream()
|
||||||
.map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor)
|
.map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor)
|
||||||
.thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor)
|
.thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor)
|
||||||
.exceptionally(ex -> {
|
.exceptionally(ex -> {
|
||||||
@ -268,27 +159,19 @@ public class DepositService {
|
|||||||
TeamAccountWithChange accountWithChange,
|
TeamAccountWithChange accountWithChange,
|
||||||
StringBuilder depositResults,
|
StringBuilder depositResults,
|
||||||
AtomicInteger depositCounter) {
|
AtomicInteger depositCounter) {
|
||||||
synchronized (this) {
|
if (payRecord.getCreatedAt().equals(member.getFirstPayAt()) && depositCounter
|
||||||
//如果从缓存中没有key,那就是新存款用户
|
.getAndIncrement() < accountWithChange.getNewDepositNum()) {
|
||||||
if (!RedisUtils.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo())) {
|
|
||||||
//如果订单记录有存款成功但是会员的首存时间还为空,数据未同步,也是首存
|
|
||||||
if ((member.getFirstPayAt() == null || payRecord.getCreatedAt()
|
|
||||||
.equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange
|
|
||||||
.getNewDepositNum()) {
|
|
||||||
//把存款成功的用户保存起来,设置一个小时的缓存时间 防止重复用户
|
|
||||||
RedisUtils.set(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo(), member.getName(), Duration
|
|
||||||
.ofHours(1));
|
|
||||||
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord
|
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord
|
||||||
.getScoreAmount()));
|
.getScoreAmount()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendNotification(TeamAccountWithChange accountWithChange,
|
private void sendNotification(TeamAccountWithChange accountWithChange,
|
||||||
UserWithRolesAndAccountsResp ministerUser,
|
UserWithRolesAndAccountsResp ministerUser,
|
||||||
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
||||||
StringBuilder depositResults) {
|
StringBuilder depositResults,
|
||||||
|
AtomicInteger depositCounter) {
|
||||||
|
if (depositCounter.get() > 0) {
|
||||||
String notification = telegramMessageService.buildDepositMessage(accountWithChange
|
String notification = telegramMessageService.buildDepositMessage(accountWithChange
|
||||||
.getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange
|
.getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange
|
||||||
.getFirstDepositNum());
|
.getFirstDepositNum());
|
||||||
@ -301,6 +184,7 @@ public class DepositService {
|
|||||||
}
|
}
|
||||||
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
|
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private CompletableFuture<Member> fetchMemberDetails(AccountResp account,
|
private CompletableFuture<Member> fetchMemberDetails(AccountResp account,
|
||||||
String name,
|
String name,
|
||||||
@ -335,4 +219,27 @@ public class DepositService {
|
|||||||
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
|
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
|
||||||
|
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
|
||||||
|
|
||||||
|
return prevTeam.getList()
|
||||||
|
.stream()
|
||||||
|
.filter(account1 -> team2AccountMap.containsKey(account1.getId()))
|
||||||
|
.map(account1 -> {
|
||||||
|
TeamAccount account2 = team2AccountMap.get(account1.getId());
|
||||||
|
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
|
||||||
|
BeanUtil.copyProperties(account2, changedAccount);
|
||||||
|
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
|
||||||
|
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
|
||||||
|
}
|
||||||
|
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
|
||||||
|
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
|
||||||
|
}
|
||||||
|
return changedAccount;
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,10 +21,9 @@ import cn.hutool.core.util.StrUtil;
|
|||||||
import com.zayac.admin.common.enums.DisEnableStatusEnum;
|
import com.zayac.admin.common.enums.DisEnableStatusEnum;
|
||||||
import com.zayac.admin.constant.ApiPathConstants;
|
import com.zayac.admin.constant.ApiPathConstants;
|
||||||
import com.zayac.admin.req.team.TeamMemberReq;
|
import com.zayac.admin.req.team.TeamMemberReq;
|
||||||
import com.zayac.admin.resp.ApiResponse;
|
|
||||||
import com.zayac.admin.resp.MemberPagination;
|
import com.zayac.admin.resp.MemberPagination;
|
||||||
import com.zayac.admin.resp.team.Team;
|
import com.zayac.admin.resp.team.Team;
|
||||||
import com.zayac.admin.resp.team.TeamAccountWithChange;
|
import com.zayac.admin.resp.team.TeamAccount;
|
||||||
import com.zayac.admin.resp.team.TeamMember;
|
import com.zayac.admin.resp.team.TeamMember;
|
||||||
import com.zayac.admin.system.model.resp.AccountResp;
|
import com.zayac.admin.system.model.resp.AccountResp;
|
||||||
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
|
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
|
||||||
@ -38,8 +37,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 新注册处理相关逻辑
|
* 新注册处理相关逻辑
|
||||||
@ -54,37 +52,35 @@ public class RegistrationService {
|
|||||||
public CompletableFuture<Void> processRegistration(UserWithRolesAndAccountsResp minister,
|
public CompletableFuture<Void> processRegistration(UserWithRolesAndAccountsResp minister,
|
||||||
AccountResp account,
|
AccountResp account,
|
||||||
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
|
||||||
|
Map<String, TeamAccount> teamAccountMap,
|
||||||
Team currentTeamInfo,
|
Team currentTeamInfo,
|
||||||
Team prevTeamInfo,
|
Team prevTeamInfo,
|
||||||
LocalDate nowDate,
|
LocalDate nowDate,
|
||||||
|
|
||||||
Executor asyncTaskExecutor) {
|
Executor asyncTaskExecutor) {
|
||||||
if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
|
if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
|
||||||
List<TeamAccountWithChange> hasNewRegAccounts = findChangedTeamAccount(prevTeamInfo, currentTeamInfo)
|
int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount();
|
||||||
.stream()
|
|
||||||
.filter(teamAccountWithChange -> teamAccountWithChange.getNewRegisterNum() > 0)
|
|
||||||
.toList();
|
|
||||||
hasNewRegAccounts.parallelStream().forEach(accountWithChange -> {
|
|
||||||
TeamMemberReq memberListReq = TeamMemberReq.builder()
|
TeamMemberReq memberListReq = TeamMemberReq.builder()
|
||||||
.registerStartDate(nowDate)
|
.registerStartDate(nowDate)
|
||||||
.registerEndDate(nowDate)
|
.registerEndDate(nowDate)
|
||||||
.startDate(nowDate)
|
.startDate(nowDate)
|
||||||
.endDate(nowDate)
|
.endDate(nowDate)
|
||||||
.registerSort(1)
|
.registerSort(1)
|
||||||
.topAgentName(accountWithChange.getAgentName())
|
.pageSize(registerCount)
|
||||||
.pageSize(accountWithChange.getNewRegisterNum())
|
|
||||||
.build();
|
.build();
|
||||||
completableFutureWebClientService
|
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
|
||||||
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<ApiResponse<MemberPagination<List<TeamMember>>>>() {
|
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
|
||||||
})
|
});
|
||||||
.thenApply(MemberPagination::getList)
|
return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor)
|
||||||
.thenAcceptAsync(members -> {
|
.thenAcceptAsync(members -> {
|
||||||
log.info("Successfully get [{}] new registered members for {}", members
|
log.info("Successfully get [{}] new registered members", members.size());
|
||||||
.size(), accountWithChange.getAgentName());
|
|
||||||
if (CollUtil.isNotEmpty(members)) {
|
if (CollUtil.isNotEmpty(members)) {
|
||||||
|
Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
|
||||||
|
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
|
||||||
|
groupByTopAgentName.forEach((accountName, accountMembers) -> {
|
||||||
String notification = telegramMessageService
|
String notification = telegramMessageService
|
||||||
.buildRegistrationMessage(members, accountWithChange);
|
.buildRegistrationMessage(accountName, accountMembers, teamAccountMap.get(accountName));
|
||||||
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
|
var currUser = accountUsernameToUserMap.get(accountName);
|
||||||
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
|
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
|
||||||
String botToken = StrUtil.isEmpty(currUser.getBotToken())
|
String botToken = StrUtil.isEmpty(currUser.getBotToken())
|
||||||
? minister.getBotToken()
|
? minister.getBotToken()
|
||||||
@ -93,9 +89,9 @@ public class RegistrationService {
|
|||||||
}
|
}
|
||||||
telegramMessageService
|
telegramMessageService
|
||||||
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
|
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}, asyncTaskExecutor);
|
}, asyncTaskExecutor);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
return CompletableFuture.completedFuture(null);
|
return CompletableFuture.completedFuture(null);
|
||||||
}
|
}
|
||||||
|
@ -19,12 +19,9 @@ package com.zayac.admin.service;
|
|||||||
import cn.hutool.core.convert.Convert;
|
import cn.hutool.core.convert.Convert;
|
||||||
import cn.hutool.core.text.CharPool;
|
import cn.hutool.core.text.CharPool;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.zayac.admin.agent.model.entity.FinanceDO;
|
|
||||||
import com.zayac.admin.resp.Statics;
|
import com.zayac.admin.resp.Statics;
|
||||||
import com.zayac.admin.resp.team.TeamAccount;
|
import com.zayac.admin.resp.team.TeamAccount;
|
||||||
import com.zayac.admin.resp.team.TeamAccountWithChange;
|
|
||||||
import com.zayac.admin.resp.team.TeamMember;
|
import com.zayac.admin.resp.team.TeamMember;
|
||||||
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@ -52,7 +49,6 @@ public class TelegramMessageService {
|
|||||||
}
|
}
|
||||||
String fullMessage = String.format("%s|%s|%s", botToken, targetId, message);
|
String fullMessage = String.format("%s|%s|%s", botToken, targetId, message);
|
||||||
this.rabbitTemplate.convertAndSend("message_queue", fullMessage);
|
this.rabbitTemplate.convertAndSend("message_queue", fullMessage);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(String botToken, List<Long> targetIds, String message) {
|
public void sendMessage(String botToken, List<Long> targetIds, String message) {
|
||||||
@ -85,25 +81,11 @@ public class TelegramMessageService {
|
|||||||
return htmlTags.stream().anyMatch(text::contains);
|
return htmlTags.stream().anyMatch(text::contains);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String buildRegistrationMessage(UserWithRolesAndAccountsResp currUser,
|
public String buildRegistrationMessage(String accountName,
|
||||||
List<TeamMember> accountMembers,
|
List<TeamMember> accountMembers,
|
||||||
TeamAccount teamAccount) {
|
TeamAccount teamAccount) {
|
||||||
|
|
||||||
String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", "));
|
String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", "));
|
||||||
if (currUser != null) {
|
return String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers
|
||||||
return String.format("👏 [%s] %s 注册: %d 会员: `%s` 总数:*%d*", currUser.getNickname(), teamAccount
|
|
||||||
.getAgentName(), accountMembers.size(), memberNames, teamAccount.getSubMemberNum());
|
|
||||||
}
|
|
||||||
return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers
|
|
||||||
.size(), memberNames, teamAccount.getSubMemberNum());
|
|
||||||
}
|
|
||||||
|
|
||||||
public String buildRegistrationMessage(List<TeamMember> accountMembers, TeamAccountWithChange teamAccount) {
|
|
||||||
|
|
||||||
String memberNames = accountMembers.stream()
|
|
||||||
.map(member -> "`" + member.getName() + "`")
|
|
||||||
.collect(Collectors.joining(", "));
|
|
||||||
return String.format("👏 %s 注册: %d 会员: %s 总数:*%d*", teamAccount.getAgentName(), accountMembers
|
|
||||||
.size(), memberNames, teamAccount.getSubMemberNum());
|
.size(), memberNames, teamAccount.getSubMemberNum());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -112,7 +94,7 @@ public class TelegramMessageService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) {
|
public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) {
|
||||||
return String.format("会员: `%s`, 首存金额: *%s*\n", name, scoreAmount);
|
return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String buildFailedPayMessage(String accountName, List<TeamMember> accountMembers) {
|
public String buildFailedPayMessage(String accountName, List<TeamMember> accountMembers) {
|
||||||
@ -129,14 +111,4 @@ public class TelegramMessageService {
|
|||||||
});
|
});
|
||||||
return message.toString();
|
return message.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String buildFinanceMessage(List<FinanceDO> userFinancesList) {
|
|
||||||
StringBuilder message = new StringBuilder();
|
|
||||||
userFinancesList.forEach(financeDO -> {
|
|
||||||
String formattedFinance = String.format("%s: *%s*\n", financeDO.getAgentName(), financeDO.getNetProfit()
|
|
||||||
.toPlainString());
|
|
||||||
message.append(formattedFinance);
|
|
||||||
});
|
|
||||||
return message.toString();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,128 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.zayac.admin.service;
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.TypeReference;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.zayac.admin.resp.ApiResponse;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.web.reactive.function.BodyInserters;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
import reactor.util.retry.Retry;
|
||||||
|
import top.continew.starter.core.exception.BusinessException;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 封装了一个简单可用的调用后端api的服务类
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class WebClientService {
|
||||||
|
private final WebClient webClient;
|
||||||
|
|
||||||
|
private final Semaphore semaphore;
|
||||||
|
|
||||||
|
public WebClientService(@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests) {
|
||||||
|
this.webClient = WebClient.create();
|
||||||
|
this.semaphore = new Semaphore(maxConcurrentRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T fetchData(String url,
|
||||||
|
String headers,
|
||||||
|
Object params,
|
||||||
|
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
|
||||||
|
try {
|
||||||
|
semaphore.acquire(); // 尝试获取许可
|
||||||
|
return this.webClient.post().uri(url).headers(httpHeaders -> {
|
||||||
|
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
|
||||||
|
}, true);
|
||||||
|
headerMap.forEach(httpHeaders::add);
|
||||||
|
})
|
||||||
|
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
|
||||||
|
.retrieve()
|
||||||
|
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
|
||||||
|
.bodyToMono(String.class)
|
||||||
|
.map(body -> new BusinessException("Error response: " + body)))
|
||||||
|
.bodyToMono(typeRef)
|
||||||
|
.flatMap(this::respHandler)
|
||||||
|
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
|
||||||
|
.block(); // 遇到网络问题,每三秒重试一次
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new BusinessException("Failed to acquire semaphore", e);
|
||||||
|
} finally {
|
||||||
|
semaphore.release(); // 释放许可
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> Mono<T> MonoFetchData(String url,
|
||||||
|
String headers,
|
||||||
|
Object params,
|
||||||
|
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
|
||||||
|
System.out.println(Thread.currentThread().getName());
|
||||||
|
System.out.println(Thread.currentThread().getId());
|
||||||
|
return Mono.fromCallable(() -> {
|
||||||
|
semaphore.acquire(); // 尝试获取许可
|
||||||
|
return true;
|
||||||
|
})
|
||||||
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
|
.flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> {
|
||||||
|
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
|
||||||
|
}, true);
|
||||||
|
headerMap.forEach(httpHeaders::add);
|
||||||
|
})
|
||||||
|
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
|
||||||
|
.retrieve()
|
||||||
|
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
|
||||||
|
.bodyToMono(String.class)
|
||||||
|
.map(body -> new BusinessException("Error response: " + body)))
|
||||||
|
.bodyToMono(typeRef)
|
||||||
|
.flatMap(this::monoRespHandler)
|
||||||
|
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
|
||||||
|
.doFinally(signal -> semaphore.release()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> Mono<T> monoRespHandler(ApiResponse<T> apiResponse) {
|
||||||
|
// 根据响应状态码处理逻辑
|
||||||
|
if (apiResponse.getStatusCode() == 6000) {
|
||||||
|
return Mono.just(apiResponse.getData()); // 正常情况下返回数据
|
||||||
|
} else if (apiResponse.getStatusCode() == 6001) {
|
||||||
|
return Mono.error(new BusinessException("API token expired")); // API令牌过期
|
||||||
|
} else {
|
||||||
|
return Mono.error(new BusinessException("Error status code: " + apiResponse.getStatusCode())); // 其他错误
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> Mono<T> respHandler(ApiResponse<T> apiResponse) {
|
||||||
|
System.out.println(apiResponse);
|
||||||
|
//api接口只有在statusCode为6000的时候返回的数据才是正常数据
|
||||||
|
if (apiResponse.getStatusCode().equals(6000)) {
|
||||||
|
return Mono.just(apiResponse.getData());
|
||||||
|
} else if (apiResponse.getStatusCode().equals(6001)) {
|
||||||
|
// TODO 调用登录接口,刷新header信息重试
|
||||||
|
return Mono.error(new BusinessException("xApiToken失效")); // 状态码6001,登录失效
|
||||||
|
} else {
|
||||||
|
return Mono.error(new BusinessException("错误状态码: " + apiResponse.getStatusCode())); // 其他状态码,抛出异常
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,66 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.zayac.admin.utils;
|
|
||||||
|
|
||||||
import cn.hutool.core.bean.BeanUtil;
|
|
||||||
import com.zayac.admin.resp.team.Team;
|
|
||||||
import com.zayac.admin.resp.team.TeamAccount;
|
|
||||||
import com.zayac.admin.resp.team.TeamAccountWithChange;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class CommonUtils {
|
|
||||||
public static List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
|
|
||||||
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
|
|
||||||
.stream()
|
|
||||||
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
|
|
||||||
|
|
||||||
return prevTeam.getList()
|
|
||||||
.stream()
|
|
||||||
.filter(account1 -> team2AccountMap.containsKey(account1.getId()))
|
|
||||||
.map(account1 -> {
|
|
||||||
TeamAccount account2 = team2AccountMap.get(account1.getId());
|
|
||||||
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
|
|
||||||
BeanUtil.copyProperties(account2, changedAccount);
|
|
||||||
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
|
|
||||||
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
|
|
||||||
}
|
|
||||||
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
|
|
||||||
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
|
|
||||||
}
|
|
||||||
return changedAccount;
|
|
||||||
})
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> List<T> getLastNElements(List<T> list, int n) {
|
|
||||||
if (list == null || list.isEmpty() || n <= 0) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
int size = list.size();
|
|
||||||
if (n > size) {
|
|
||||||
return new ArrayList<>(list);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new ArrayList<>(list.subList(size - n, size));
|
|
||||||
}
|
|
||||||
}
|
|
@ -76,11 +76,6 @@ public class CacheConstants {
|
|||||||
*/
|
*/
|
||||||
public static final String DEPT_USERS_ROLES_ACCOUNTS_KEY_PREFIX = "DEPT_USERS_ROLES_ACCOUNTS" + DELIMITER;
|
public static final String DEPT_USERS_ROLES_ACCOUNTS_KEY_PREFIX = "DEPT_USERS_ROLES_ACCOUNTS" + DELIMITER;
|
||||||
|
|
||||||
/**
|
|
||||||
* 存款成功的用户名
|
|
||||||
*/
|
|
||||||
public static final String SUCCESSFULLY_PAYED_ACCOUNTNAME = "SUCCESSFULLY_PAYED_ACCOUNTNAME" + DELIMITER;
|
|
||||||
|
|
||||||
private CacheConstants() {
|
private CacheConstants() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,6 @@
|
|||||||
a.id AS account_id,
|
a.id AS account_id,
|
||||||
a.nickname AS account_nickname,
|
a.nickname AS account_nickname,
|
||||||
a.username AS account_username,
|
a.username AS account_username,
|
||||||
a.is_team As account_is_team,
|
|
||||||
a.status AS account_status,
|
a.status AS account_status,
|
||||||
a.headers AS account_headers,
|
a.headers AS account_headers,
|
||||||
a.platform_id AS account_platform_id,
|
a.platform_id AS account_platform_id,
|
||||||
@ -83,7 +82,6 @@
|
|||||||
<result column="account_platform_id" property="platformId"/>
|
<result column="account_platform_id" property="platformId"/>
|
||||||
<result column="platform_name" property="platformName"/>
|
<result column="platform_name" property="platformName"/>
|
||||||
<result column="platform_url" property="platformUrl"/>
|
<result column="platform_url" property="platformUrl"/>
|
||||||
<result column="account_is_team" property="isTeam"/>
|
|
||||||
</collection>
|
</collection>
|
||||||
</collection>
|
</collection>
|
||||||
</resultMap>
|
</resultMap>
|
||||||
|
@ -60,7 +60,7 @@ spring.datasource:
|
|||||||
## Liquibase 配置
|
## Liquibase 配置
|
||||||
spring.liquibase:
|
spring.liquibase:
|
||||||
# 是否启用
|
# 是否启用
|
||||||
enabled: false
|
enabled: true
|
||||||
# 配置文件路径
|
# 配置文件路径
|
||||||
change-log: classpath:/db/changelog/db.changelog-master.yaml
|
change-log: classpath:/db/changelog/db.changelog-master.yaml
|
||||||
|
|
||||||
@ -273,8 +273,7 @@ avatar:
|
|||||||
support-suffix: jpg,jpeg,png,gif
|
support-suffix: jpg,jpeg,png,gif
|
||||||
|
|
||||||
webclient:
|
webclient:
|
||||||
max-requests-per-second: 10.0
|
max-concurrent-requests: 60
|
||||||
|
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
|
@ -228,9 +228,8 @@ management.health:
|
|||||||
# 关闭邮箱健康检查(邮箱配置错误或邮箱服务器不可用时,健康检查会报错)
|
# 关闭邮箱健康检查(邮箱配置错误或邮箱服务器不可用时,健康检查会报错)
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
||||||
### 每秒钟接口调用的次数
|
|
||||||
webclient:
|
webclient:
|
||||||
max-requests-per-second: 10.0
|
max-concurrent-requests: 60
|
||||||
|
|
||||||
spring:
|
spring:
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
|
Loading…
Reference in New Issue
Block a user