新增了线程池配置

现在定时任务都是异步多线程的了
This commit is contained in:
zayac 2024-06-05 20:17:44 +08:00
parent 693f4e5f0f
commit 6d05b3b95d
11 changed files with 639 additions and 508 deletions

View File

@ -0,0 +1,42 @@
package com.zayac.admin.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.Executor;
@Configuration
public class ThreadPoolConfig {
/**
* 异步任务线程池
*
* @return Executor
*/
@Bean(name = "asyncTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("asyncTaskExecutor-");
executor.initialize();
return executor;
}
/**
* 定时任务线程池
*
* @return ThreadPoolTaskScheduler
*/
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("ScheduledTask-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(60);
return scheduler;
}
}

View File

@ -0,0 +1,40 @@
package com.zayac.admin.req;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDate;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class PayRecordListReq {
private Long id;
/**
* 页码
*/
@Builder.Default
private int pageNum = 1;
/**
* 每页显示数量
*/
private int pageSize;
/**
* 起始日期
*/
@JsonFormat(pattern = "yyyy-MM-dd")
private LocalDate startDate;
/**
* 结束日期
*/
@JsonFormat(pattern = "yyyy-MM-dd")
private LocalDate endDate;
}

View File

@ -0,0 +1,13 @@
package com.zayac.admin.resp.team;
import com.zayac.admin.resp.Pagination;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class PayRecordList<T> extends Pagination<T> {
private BigDecimal rebateAmountTotal;
private BigDecimal orderAmountTotal;
private BigDecimal scoreAmountTotal;
}

View File

@ -40,13 +40,18 @@ import java.time.temporal.TemporalAdjusters;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* 检测注册跟新增的定时任务
*/
@Slf4j @Slf4j
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class TelegramTeamMessageSchedule { public class CheckRegAndDep {
private final UserService userService; private final UserService userService;
private final AccountService accountService; private final AccountService accountService;
@ -55,6 +60,7 @@ public class TelegramTeamMessageSchedule {
private final UserRoleService userRoleService; private final UserRoleService userRoleService;
private final RegistrationService registrationService; private final RegistrationService registrationService;
private final DepositService depositService; private final DepositService depositService;
private final Executor asyncTaskExecutor;
private static final String MINISTER_ROLE_CODE = "minister"; private static final String MINISTER_ROLE_CODE = "minister";
private static final long FIXED_DELAY = 60000L; private static final long FIXED_DELAY = 60000L;
@ -64,8 +70,8 @@ public class TelegramTeamMessageSchedule {
LocalDate nowDate = LocalDate.now(); LocalDate nowDate = LocalDate.now();
LocalDateTime nowDateTime = LocalDateTime.now(); LocalDateTime nowDateTime = LocalDateTime.now();
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> users = userRoleService.listUserIdByRoleId(minister.getId()); List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId());
users.forEach(userId -> { userIds.forEach(userId -> {
processUser(userService.getById(userId), nowDate, nowDateTime).join(); processUser(userService.getById(userId), nowDate, nowDateTime).join();
}); });
} }
@ -94,8 +100,7 @@ public class TelegramTeamMessageSchedule {
CompletableFuture<Team> teamFuture = teamService.getLatestTeamInfoAsync(account, teamInfoReq); CompletableFuture<Team> teamFuture = teamService.getLatestTeamInfoAsync(account, teamInfoReq);
Team prevTeamInfo = teamService.getPreviousTeamInfo(account); Team prevTeamInfo = teamService.getPreviousTeamInfo(account);
return CompletableFuture.allOf(teamFuture).thenCompose(v -> { return teamFuture.thenComposeAsync(currentTeamInfo -> {
Team currentTeamInfo = teamFuture.join();
log.info("Previous Team Info: {}", prevTeamInfo); log.info("Previous Team Info: {}", prevTeamInfo);
log.info("Current Team Info: {}", currentTeamInfo); log.info("Current Team Info: {}", currentTeamInfo);
Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList() Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList()
@ -103,12 +108,18 @@ public class TelegramTeamMessageSchedule {
.collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity())); .collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity()));
CompletableFuture<Void> registrationProcess = registrationService CompletableFuture<Void> registrationProcess = registrationService
.processRegistration(minister, account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap); .processRegistration(minister, account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap, asyncTaskExecutor)
.thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor);
CompletableFuture<Void> depositProcess = depositService CompletableFuture<Void> depositProcess = depositService
.processDeposits(minister, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime); .processDeposits(minister, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor)
.thenRunAsync(() -> log.info("Deposit process completed"), asyncTaskExecutor);
return CompletableFuture.allOf(registrationProcess, depositProcess) return CompletableFuture.allOf(registrationProcess, depositProcess)
.thenRun(() -> teamService.updateTeamInfo(account, currentTeamInfo)); .thenRunAsync(() -> {
}); teamService.updateTeamInfo(account, currentTeamInfo);
log.info("Team info updated");
}, asyncTaskExecutor);
}, asyncTaskExecutor);
} }
} }

View File

@ -20,46 +20,46 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.zayac.admin.agent.model.entity.FinanceDO; import com.zayac.admin.agent.model.entity.FinanceDO;
import com.zayac.admin.agent.model.entity.StatsDO; import com.zayac.admin.agent.model.entity.StatsDO;
import com.zayac.admin.agent.model.req.FinanceReq;
import com.zayac.admin.agent.model.req.FinanceSumReq; import com.zayac.admin.agent.model.req.FinanceSumReq;
import com.zayac.admin.agent.service.FinanceService; import com.zayac.admin.agent.service.FinanceService;
import com.zayac.admin.agent.service.FinanceSumService; import com.zayac.admin.agent.service.FinanceSumService;
import com.zayac.admin.agent.service.StatsService; import com.zayac.admin.agent.service.StatsService;
import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.AgentDataVisualListReq; import com.zayac.admin.req.AgentDataVisualListReq;
import com.zayac.admin.req.PayRecordListReq;
import com.zayac.admin.req.team.TeamFinanceReq; import com.zayac.admin.req.team.TeamFinanceReq;
import com.zayac.admin.req.team.TeamInfoReq; import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.resp.AgentDataVisualList; import com.zayac.admin.req.team.TeamMemberReq;
import com.zayac.admin.resp.Statics; import com.zayac.admin.resp.*;
import com.zayac.admin.resp.team.Team; import com.zayac.admin.resp.team.*;
import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.service.*;
import com.zayac.admin.resp.team.TeamAccountFinance;
import com.zayac.admin.resp.team.TeamFinancePagination;
import com.zayac.admin.service.AgentDataVisualListService;
import com.zayac.admin.service.CompletableFutureFinanceService;
import com.zayac.admin.service.TeamService;
import com.zayac.admin.service.TelegramMessageService;
import com.zayac.admin.system.model.entity.RoleDO; import com.zayac.admin.system.model.entity.RoleDO;
import com.zayac.admin.system.model.entity.UserDO; import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.*; import com.zayac.admin.system.service.*;
import com.zayac.admin.utils.TableFormatter; import com.zayac.admin.utils.TableFormatter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import top.continew.starter.core.exception.BusinessException; import top.continew.starter.core.exception.BusinessException;
import java.math.BigDecimal;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.YearMonth; import java.time.YearMonth;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class DailyReport { public class DailyReport {
private final UserService userService; private final UserService userService;
private final TeamService teamService; private final TeamService teamService;
@ -72,6 +72,8 @@ public class DailyReport {
private final CompletableFutureFinanceService completableFutureFinanceService; private final CompletableFutureFinanceService completableFutureFinanceService;
private final FinanceService financeService; private final FinanceService financeService;
private final FinanceSumService financeSumService; private final FinanceSumService financeSumService;
private final CompletableFutureWebClientService completableFutureWebClientService;
private final Executor asyncTaskExecutor;
private static final String MINISTER_ROLE_CODE = "minister"; private static final String MINISTER_ROLE_CODE = "minister";
@ -86,7 +88,120 @@ public class DailyReport {
public void dailySummarize() { public void dailySummarize() {
LocalDate yesterday = LocalDate.now().minusDays(1); LocalDate yesterday = LocalDate.now().minusDays(1);
sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime.of(yesterday, LocalTime.MAX)); sendDailyReport(yesterday, yesterday.atStartOfDay(), LocalDateTime.of(yesterday, LocalTime.MAX));
saveData(yesterday, yesterday.atStartOfDay(), LocalDateTime.of(yesterday, LocalTime.MAX)); getPayFailedMember(yesterday);
saveData(yesterday);
}
/**
* 查询存款失败用户,并发送消息
*
* @param date 日期
*/
private void getPayFailedMember(LocalDate date) {
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId());
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(100)
.build();
userIds.forEach(userId -> {
List<CompletableFuture<List<TeamMember>>> futureList = new ArrayList<>();
UserDO ministerUser = userService.getById(userId);
List<AccountResp> accounts = accountService.getAccountsByUserId(userId, DisEnableStatusEnum.ENABLE)
.stream()
.filter(AccountResp::getIsTeam)
.toList();
accounts.forEach(account -> {
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
CompletableFuture<List<TeamMember>> teamMembersFuture = memberPaginationCompletableFuture.thenApply(MemberPagination::getList)
.thenApplyAsync(members -> members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit().compareTo(BigDecimal.ZERO) == 0)
.collect(Collectors.toList()), asyncTaskExecutor)
.thenComposeAsync(membersWithoutDep -> {
List<CompletableFuture<TeamMember>> memberFutures = membersWithoutDep.stream()
.map(memberWithoutDep -> {
PayRecordListReq req = PayRecordListReq.builder()
.startDate(date)
.endDate(date)
.pageSize(100)
.id(memberWithoutDep.getId())
.build();
CompletableFuture<PayRecordList<List<PayRecord>>> completableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() {
});
return completableFuture.thenApplyAsync(pagination -> {
if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0
&& pagination.getScoreAmountTotal().compareTo(BigDecimal.ZERO) == 0) {
return memberWithoutDep;
} else {
return null;
}
}, asyncTaskExecutor);
}).toList();
return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> memberFutures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}, asyncTaskExecutor)
.thenApplyAsync(membersWithoutDep -> {
// 发送给每个account关联的user用户
if (!membersWithoutDep.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService
.buildFailedPayMessage(accountName, accountMembers);
UserDO currUser = accountService.getUserByAccountUsername(accountName);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? ministerUser.getBotToken() : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
});
}
return membersWithoutDep;
}, asyncTaskExecutor);
futureList.add(teamMembersFuture);
});
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allFutures.thenRunAsync(() -> {
// 主线下的所有的存款失败用户
List<TeamMember> allTeamMembers = futureList.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.toList();
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
StringBuilder combinedNotification = new StringBuilder();
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService
.buildFailedPayMessage(accountName, accountMembers);
combinedNotification.append(notification).append("\n");
});
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification.toString());
if (ministerUser != null && DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), combinedNotification.toString());
}
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error collecting and processing data", ex);
return null;
});
});
} }
@ -94,70 +209,145 @@ public class DailyReport {
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId()); List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId());
userIds.parallelStream().forEach(userId -> { userIds.forEach(userId -> {
UserDO ministerUser = userService.getById(userId); UserDO ministerUser = userService.getById(userId);
List<UserDO> deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, ministerUser.getDeptId()); List<UserDO> deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, ministerUser.getDeptId());
List<CompletableFuture<Void>> tasks = new ArrayList<>();
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
List<String[]> rows = generateRows(ministerUser.getId(), startDateTime, endDateTime); tasks.add(generateAndSendTeamReport(ministerUser, startDateTime, endDateTime));
String table = TableFormatter.formatTableAsHtml(rows);
//telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), table);
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, table);
} }
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder()
.monthDate(reportDate) .monthDate(reportDate)
.build(); .build();
//获取部长对应下级所有用户
deptUsers.parallelStream().forEach(deptUser -> { deptUsers.forEach(deptUser -> tasks.add(processDeptUser(deptUser, agentDataVisualListReq, reportDate, ministerUser)));
String message = getDeptUserMessage(deptUser, agentDataVisualListReq, reportDate);
CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
allTasks.join();
});
}
/**
* 构建表格
*
* @param ministerUser 部长
* @param startDateTime 起始时间
* @param endDateTime 结束时间
* @return CompletableFuture<Void>
*/
private CompletableFuture<Void> generateAndSendTeamReport(UserDO ministerUser, LocalDateTime
startDateTime, LocalDateTime endDateTime) {
return CompletableFuture.runAsync(() -> {
List<String[]> rows = new ArrayList<>();
rows.add(new String[]{"平台", "注册", "新增", "转化率"});
rows.add(new String[]{"----", "----", "----", "----"});
List<AccountResp> accounts = accountService.getAccountsByUserId(ministerUser.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(AccountResp::getIsTeam)
.toList();
int[] totals = {0, 0};
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
List<CompletableFuture<Void>> futures = accounts.stream()
.map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq)
.thenAcceptAsync(team -> {
int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int totalNewFirstDeposit = team.getList().stream().mapToInt(TeamAccount::getFirstDepositNum).sum();
synchronized (totals) {
totals[0] += totalNewMember;
totals[1] += totalNewFirstDeposit;
}
String percent = getPercent(totalNewFirstDeposit, totalNewMember);
synchronized (rows) {
rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String.valueOf(totalNewFirstDeposit), percent});
}
}, asyncTaskExecutor)
)
.toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
rows.add(new String[]{"总注册", String.valueOf(totals[0]), String.valueOf(totals[1]), getPercent(totals[1], totals[0])});
String message = TableFormatter.formatTableAsHtml(rows);
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message);
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error generating and sending team report", ex);
return null;
});
}
/**
* 查询部门所有用户的报表
*
* @param deptUser 部门下所有用户对象
* @param agentDataVisualListReq 请求参数
* @param reportDate 日期
* @param ministerUser 上级
* @return CompletableFuture<Void>
*/
private CompletableFuture<Void> processDeptUser(UserDO deptUser, AgentDataVisualListReq
agentDataVisualListReq, LocalDate reportDate, UserDO ministerUser) {
return CompletableFuture.runAsync(() -> {
List<AccountResp> currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(accountResp -> !accountResp.getIsTeam())
.toList();
List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
.map(currAccount -> agentDataVisualListService.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date"))
)
.exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage());
return null;
})
)
.toList();
CompletableFuture<Void> userStaticsFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
userStaticsFuture.thenRunAsync(() -> {
List<Statics> agentDataList = futures.stream()
.map(future -> {
try {
return future.join();
} catch (Exception ex) {
log.error("Error joining future", ex);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
// 构造消息体
String message = telegramMessageService.buildDailyReportMessage(agentDataList);
if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
String botToken = StrUtil.isEmpty(deptUser.getBotToken()) ? ministerUser.getBotToken() : deptUser.getBotToken(); String botToken = StrUtil.isEmpty(deptUser.getBotToken()) ? ministerUser.getBotToken() : deptUser.getBotToken();
telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message); telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message);
} }
}); }, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error collecting and processing data", ex);
return null;
}).join();
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error processing dept user", ex);
return null;
}); });
} }
//获取需要发送给用户的信息 private void saveData(LocalDate reportDate) {
private String getDeptUserMessage(UserDO deptUser,
AgentDataVisualListReq agentDataVisualListReq,
LocalDate reportDate) {
StringBuilder message = new StringBuilder();
// 主线注册新增不用报数,过滤
List<AccountResp> currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(accountResp -> !accountResp.getIsTeam())
.toList();
currUserAccounts.forEach(currAccount -> {
CompletableFuture<AgentDataVisualList> future = agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq);
AgentDataVisualList agentData = future.join();
Statics statics = agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow();
message.append(currAccount.getUsername())
.append("\n")
.append("注册: ")
.append(statics.getIsNew())
.append("\n")
.append("新增: ")
.append(statics.getFirstCount())
.append("\n")
.append("日活: ")
.append(statics.getCountBets())
.append("\n\n");
});
return message.toString();
}
private void saveData(LocalDate reportDate, LocalDateTime startDateTime, LocalDateTime endDateTime) {
// 获取传入年月 // 获取传入年月
YearMonth inputYearMonth = YearMonth.from(reportDate); YearMonth inputYearMonth = YearMonth.from(reportDate);
@ -169,6 +359,8 @@ public class DailyReport {
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId()); List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId());
List<CompletableFuture<Void>> tasks = new ArrayList<>();
userIds.forEach(userId -> { userIds.forEach(userId -> {
UserDO ministerUser = userService.getById(userId); UserDO ministerUser = userService.getById(userId);
List<UserDO> deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, ministerUser.getDeptId()); List<UserDO> deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, ministerUser.getDeptId());
@ -181,44 +373,73 @@ public class DailyReport {
.stream() .stream()
.filter(AccountResp::getIsTeam) .filter(AccountResp::getIsTeam)
.toList(); .toList();
ministerUserAccounts.parallelStream().forEach(accountResp -> {
TeamFinancePagination<List<TeamAccountFinance>> financePagination = completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq).join(); // 异步处理 ministerUserAccounts
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> { CompletableFuture<Void> ministerAccountsFuture = CompletableFuture.runAsync(() ->
FinanceDO financeDO = new FinanceDO(); ministerUserAccounts.parallelStream().forEach(accountResp -> {
BeanUtil.copyProperties(finance, financeDO); TeamFinancePagination<List<TeamAccountFinance>> financePagination = completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq).join();
return financeDO; List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
}).toList(); FinanceDO financeDO = new FinanceDO();
financeService.addAll(financeReqList); BeanUtil.copyProperties(finance, financeDO);
FinanceSumReq financeSumReq = new FinanceSumReq(); return financeDO;
BeanUtil.copyProperties(financePagination.getTotalSumVo(),financeSumReq); }).toList();
financeSumService.add(financeSumReq); financeService.addAll(financeReqList);
}); FinanceSumReq financeSumReq = new FinanceSumReq();
BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq);
financeSumService.add(financeSumReq);
}), asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error processing minister accounts", ex);
return null;
});
tasks.add(ministerAccountsFuture);
// 异步处理 deptUsers
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder() AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder()
.monthDate(reportDate) .monthDate(reportDate)
.build(); .build();
deptUsers.parallelStream().forEach(deptUser -> {
List<AccountResp> currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(accountResp -> !accountResp.getIsTeam())
.toList();
List<StatsDO> list = currUserAccounts.parallelStream().map(currAccount -> {
CompletableFuture<AgentDataVisualList> future = agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq);
AgentDataVisualList agentData = future.join();
Statics statics = agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow();
StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO);
return statsDO;
}
).toList();
statsService.addAll(list);
});
deptUsers.forEach(deptUser -> {
CompletableFuture<Void> deptUserFuture = CompletableFuture.runAsync(() -> {
List<AccountResp> currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(accountResp -> !accountResp.getIsTeam())
.toList();
List<CompletableFuture<StatsDO>> futures = currUserAccounts.stream()
.map(currAccount -> agentDataVisualListService.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> {
Statics statics = agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date"));
StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO);
return statsDO;
}, asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage());
return null;
})
).toList();
List<StatsDO> list = futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
statsService.addAll(list);
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error processing dept user accounts", ex);
return null;
});
tasks.add(deptUserFuture);
});
}); });
CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
allTasks.join();
} else { } else {
throw new BusinessException("只允许查询当月以及上个月的数据"); throw new BusinessException("只允许查询当月以及上个月的数据");
} }
@ -232,29 +453,4 @@ public class DailyReport {
percentInstance.setMinimumFractionDigits(2); percentInstance.setMinimumFractionDigits(2);
return percentInstance.format(d1 / d2); return percentInstance.format(d1 / d2);
} }
private List<String[]> generateRows(Long userId, LocalDateTime startDateTime, LocalDateTime endDateTime) {
List<String[]> rows = new ArrayList<>();
rows.add(new String[]{"平台", "注册", "新增", "转化率"});
rows.add(new String[]{"----", "----", "----", "----"});
List<AccountResp> accounts = accountService.getAccountsByUserId(userId, DisEnableStatusEnum.ENABLE)
.stream()
.filter(AccountResp::getIsTeam)
.toList();
int[] totals = {0, 0};
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
accounts.forEach(accountResp -> {
Team team = teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq).join();
int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int totalNewFirstDeposit = team.getList().stream().mapToInt(TeamAccount::getFirstDepositNum).sum();
totals[0] += totalNewMember;
totals[1] += totalNewFirstDeposit;
String percent = getPercent(totalNewFirstDeposit, totalNewMember);
rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String.valueOf(totalNewFirstDeposit), percent});
});
rows.add(new String[]{"总注册", String.valueOf(totals[0]), String.valueOf(totals[1]), getPercent(totals[1], totals[0])});
return rows;
}
} }

View File

@ -1,211 +0,0 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.schedule;
import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.MemberDetailsReq;
import com.zayac.admin.req.MemberListReq;
import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.resp.*;
import com.zayac.admin.service.BannerService;
import com.zayac.admin.service.CompletableFutureWebClientService;
import com.zayac.admin.service.TelegramMessageService;
import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.AccountService;
import com.zayac.admin.system.service.UserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* 定时任务 查询注册 新增用 暂定一分钟执行一次
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageSchedule {
private final UserService userService;
private final AccountService accountService;
private final CompletableFutureWebClientService completableFutureWebClientService;
private final TelegramMessageService telegramMessageService;
private final BannerService bannerService;
//@Scheduled(fixedDelay = 60000)
public void newCheckRegistrationAndNewDeposit() {
// Get the current date and time
LocalDate nowDate = LocalDate.now();
LocalDateTime nowDateTime = LocalDateTime.now();
// Fetch all enabled users
List<UserDO> allEnableUser = userService.getAllEnabledUsers();
allEnableUser.forEach(user -> processUser(user, nowDate, nowDateTime).join());
}
private CompletableFuture<Void> processUser(UserDO user, LocalDate nowDate, LocalDateTime nowDateTime) {
List<AccountResp> accounts = accountService.getAccountsByUserId(user.getId(), DisEnableStatusEnum.ENABLE);
List<CompletableFuture<Void>> futures = accounts.stream()
.map(account -> processAccount(account, user, nowDate, nowDateTime))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private CompletableFuture<Void> processAccount(AccountResp account,
UserDO user,
LocalDate nowDate,
LocalDateTime nowDateTime) {
CompletableFuture<Banner> bannerFuture = bannerService.getLatestBannerInfoAsync(account);
Banner prevBanner = bannerService.getPreviousBanner(account);
return CompletableFuture.allOf(bannerFuture).thenCompose(v -> {
Banner currentBanner = bannerFuture.join();
CompletableFuture<Void> registrationProcess = processRegistration(account, user, currentBanner, prevBanner, nowDate);
CompletableFuture<Void> depositProcess = processDeposits(account, user, currentBanner, prevBanner, nowDate, nowDateTime);
return CompletableFuture.allOf(registrationProcess, depositProcess)
.thenRun(() -> bannerService.updateBanner(account, currentBanner));
});
}
private CompletableFuture<Void> processRegistration(AccountResp account,
UserDO currUser,
Banner banner,
Banner prevBanner,
LocalDate nowDate) {
if (prevBanner != null && banner.getRegisterMembers() > prevBanner.getRegisterMembers()) {
int registerCount = banner.getRegisterMembers() - prevBanner.getRegisterMembers();
MemberListReq memberListReq = MemberListReq.builder()
.registerStartDate(nowDate)
.registerEndDate(nowDate)
.startDate(nowDate)
.endDate(nowDate)
.registerSort(1)
.pageSize(registerCount)
.build();
CompletableFuture<MemberPagination<List<Member>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> {
if (!members.isEmpty()) {
String memberNames = members.stream().map(Member::getName).collect(Collectors.joining(", "));
String notification = String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", account
.getNickname(), registerCount, memberNames, banner.getRegisterMembers());
if (DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getRegAndDepIds(), notification);
}
}
});
}
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> processDeposits(AccountResp account,
UserDO user,
Banner current,
Banner previous,
LocalDate nowDate,
LocalDateTime nowDateTime) {
return CompletableFuture.runAsync(() -> {
if (previous != null && current.getFirstDepositNum() > previous.getFirstDepositNum()) {
int newDeposits = current.getFirstDepositNum() - previous.getFirstDepositNum();
PayRecordsListReq req = PayRecordsListReq.builder()
.startDate(nowDate)
.endDate(nowDate)
.pageSize(100)
.payState(2)
.build();
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
List<String> names = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
.map(PayRecord::getName)
.distinct()
.toList();
names.forEach(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> {
Optional<PayRecord> matchingRecord = payRecords.stream()
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
.findFirst();
matchingRecord.ifPresent(record -> {
String depositResults = String.format("用户: `%s`, 首存金额: *%s*", member.getName(), record
.getScoreAmount());
String depResults = "🎉 " + account
.getNickname() + " 首存: " + newDeposits + depositResults + " 总数:*" + current
.getFirstDepositNum() + "*";
if (DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) {
telegramMessageService.sendMessage(user.getBotToken(), user.getRegAndDepIds(), depResults);
}
});
}).exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", name, ex.getMessage());
return null;
}).join());
}).exceptionally(ex -> {
log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
return null;
}).join();
}
});
}
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) {
MemberListReq memberListReq = MemberListReq.builder()
.name(name)
.startDate(nowDate)
.endDate(nowDate)
.status(1)
.build();
CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
return memberFuture.thenApply(MemberPagination::getList)
.thenApply(list -> list.stream().findFirst())
.thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member
.getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")));
}
private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
MemberDetailsReq detailsReq = MemberDetailsReq.builder()
.id(memberId)
.startDate(nowDate)
.endDate(nowDate)
.build();
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
});
}
}

View File

@ -69,43 +69,43 @@ public class CompletableFutureWebClientService {
ParameterizedTypeReference<ApiResponse<T>> typeRef, ParameterizedTypeReference<ApiResponse<T>> typeRef,
AccountResp account) { AccountResp account) {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
throw new RuntimeException("Unable to acquire a permit"); throw new RuntimeException("Unable to acquire a permit");
} }
return true; return true;
}).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> { }).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> {
try { try {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() { Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true); }, true);
headerMap.forEach(httpHeaders::add); headerMap.forEach(httpHeaders::add);
} catch (Exception e) { } catch (Exception e) {
log.warn("Header conversion exception: " + e.getMessage()); log.warn("Header conversion exception: " + e.getMessage());
throw new BusinessException("Header conversion failed", e); throw new BusinessException("Header conversion failed", e);
} }
}) })
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve() .retrieve()
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
.bodyToMono(String.class) .bodyToMono(String.class)
.doOnNext(resStr -> { .doOnNext(resStr -> {
log.info("request url:{}", url); log.info("request url:{}", url);
log.info("request headers :{}", headers); log.info("request headers :{}", headers);
log.info("request params:{}", params); log.info("request params:{}", params);
log.info("response {}", resStr); log.info("response {}", resStr);
}) })
.flatMap(body -> { .flatMap(body -> {
try { try {
ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory()
.constructType(typeRef.getType())); .constructType(typeRef.getType()));
return Mono.justOrEmpty(apiResponse); return Mono.justOrEmpty(apiResponse);
} catch (Exception e) { } catch (Exception e) {
log.warn("JSON parsing exception: " + e.getMessage()); log.warn("JSON parsing exception: " + e.getMessage());
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008)); return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
} }
}) })
.flatMap(response -> respHandler(response, account)) .flatMap(response -> respHandler(response, account))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))) .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException)))
.doFinally(signal -> semaphore.release()); .doFinally(signal -> semaphore.release());
} }
private boolean isRetryableException(Throwable throwable) { private boolean isRetryableException(Throwable throwable) {

View File

@ -1,19 +1,3 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.service; package com.zayac.admin.service;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
@ -46,6 +30,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -62,89 +47,117 @@ public class DepositService {
Team currentTeam, Team currentTeam,
Team previousTeam, Team previousTeam,
LocalDate nowDate, LocalDate nowDate,
LocalDateTime nowDateTime) { LocalDateTime nowDateTime,
return CompletableFuture.runAsync(() -> { Executor asyncTaskExecutor) {
if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) { if (previousTeam == null || currentTeam.getFirstDepositNum() <= previousTeam.getFirstDepositNum()) {
PayRecordsListReq req = PayRecordsListReq.builder() return CompletableFuture.completedFuture(null);
.startDate(nowDate) }
.endDate(nowDate)
.pageSize(100)
.payState(2)
.build();
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
Set<String> changedAgentNames = changedTeamAccounts.stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.map(TeamAccountWithChange::getAgentName)
.collect(Collectors.toSet());
paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
Map<String, List<String>> agentNameWithNames = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames
.contains(record.getAgentName()))
.collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
.mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors
.toSet(), ArrayList::new))));
agentNameWithNames.forEach((agentName, names) -> { return processDepositRecords(minister, account, currentTeam, previousTeam, nowDate, nowDateTime, asyncTaskExecutor);
StringBuilder depositResults = new StringBuilder(); }
AtomicInteger depositCounter = new AtomicInteger(0);
TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream() private CompletableFuture<Void> processDepositRecords(UserDO minister,
.filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName)) AccountResp account,
.findFirst() Team currentTeam,
.orElseThrow(() -> new BusinessException(String Team previousTeam,
.format("can not find agent name %s", agentName))); LocalDate nowDate,
LocalDateTime nowDateTime,
List<CompletableFuture<Void>> fetchFutures = names.stream() Executor asyncTaskExecutor) {
.map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> { PayRecordsListReq req = PayRecordsListReq.builder()
payRecords.stream() .startDate(nowDate)
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) .endDate(nowDate)
.findFirst() .pageSize(100)
.ifPresent(record -> { .payState(2)
if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) { .build();
depositResults.append(telegramMessageService CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.buildDepositResultsMessage(member.getName(), record.getScoreAmount())); .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
} });
}); List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
}).exceptionally(ex -> { Set<String> changedAgentNames = changedTeamAccounts.stream()
log.error("Error fetching details for member {}: {}", name, ex.getMessage()); .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
return null; .map(TeamAccountWithChange::getAgentName)
})) .collect(Collectors.toSet());
.toList(); return paginationCompletableFuture.thenApplyAsync(Pagination::getList, asyncTaskExecutor)
.thenComposeAsync(payRecords ->
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures processPayRecords(payRecords, changedTeamAccounts, changedAgentNames, minister, account, nowDateTime, asyncTaskExecutor), asyncTaskExecutor)
.toArray(new CompletableFuture[0])); .exceptionally(ex -> {
allFetches.thenRun(() -> {
if (!depositResults.isEmpty()) {
String notification = telegramMessageService
.buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(), depositResults
.toString(), targetTeamAccount.getFirstDepositNum());
UserDO currUser = accountService.getUserByAccountUsername(agentName);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? minister.getBotToken() : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}
}).exceptionally(ex -> {
log.error("Error sending notification for account {}: {}", account.getId(), ex
.getMessage());
return null;
});
});
}).exceptionally(ex -> {
log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage()); log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
return null; return null;
}).join(); });
}
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
List<TeamAccountWithChange> changedTeamAccounts,
Set<String> changedAgentNames,
UserDO minister,
AccountResp account,
LocalDateTime nowDateTime,
Executor asyncTaskExecutor) {
Map<String, List<String>> agentNameWithNames = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames
.contains(record.getAgentName()))
.collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
.mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors
.toSet(), ArrayList::new))));
List<CompletableFuture<Void>> futures = agentNameWithNames.entrySet().stream()
.map(entry -> processAgentRecords(entry.getKey(), entry.getValue(), changedTeamAccounts, payRecords, minister, account, asyncTaskExecutor))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private CompletableFuture<Void> processAgentRecords(String agentName,
List<String> names,
List<TeamAccountWithChange> changedTeamAccounts,
List<PayRecord> payRecords,
UserDO minister,
AccountResp account,
Executor asyncTaskExecutor) {
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
.filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
.findFirst()
.orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
List<CompletableFuture<Void>> fetchFutures = names.stream()
.map(name -> fetchMemberDetails(account, name, LocalDate.now(), asyncTaskExecutor).thenAcceptAsync(member ->
payRecords.stream()
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
.findFirst()
.ifPresent(record -> {
if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
depositResults.append(telegramMessageService
.buildDepositResultsMessage(member.getName(), record.getScoreAmount()));
}
}), asyncTaskExecutor).exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", name, ex.getMessage());
return null;
}))
.toList();
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0]));
return allFetches.thenRunAsync(() -> {
if (!depositResults.isEmpty()) {
String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(),
depositResults.toString(), targetTeamAccount.getFirstDepositNum());
UserDO currUser = accountService.getUserByAccountUsername(agentName);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? minister.getBotToken() : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
} }
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error sending notification for account {}: {}", account.getUsername(), ex.getMessage());
return null;
}); });
} }
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) { private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate, Executor asyncTaskExecutor) {
TeamMemberListReq memberListReq = TeamMemberListReq.builder() TeamMemberListReq memberListReq = TeamMemberListReq.builder()
.name(name) .name(name)
.startDate(nowDate) .startDate(nowDate)
@ -156,10 +169,9 @@ public class DepositService {
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
}); });
return memberFuture.thenApply(MemberPagination::getList) return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor)
.thenApply(list -> list.stream().findFirst()) .thenApplyAsync(list -> list.stream().findFirst().orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")), asyncTaskExecutor)
.thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor);
.getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")));
} }
private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
@ -196,4 +208,4 @@ public class DepositService {
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
} }

View File

@ -36,8 +36,12 @@ import java.time.LocalDate;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* 新注册处理相关逻辑
*/
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -51,7 +55,8 @@ public class RegistrationService {
Team currentTeamInfo, Team currentTeamInfo,
Team prevTeamInfo, Team prevTeamInfo,
LocalDate nowDate, LocalDate nowDate,
Map<String, TeamAccount> teamAccountMap) { Map<String, TeamAccount> teamAccountMap,
Executor asyncTaskExecutor) {
if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) { if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount(); int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount();
TeamMemberReq memberListReq = TeamMemberReq.builder() TeamMemberReq memberListReq = TeamMemberReq.builder()
@ -65,8 +70,8 @@ public class RegistrationService {
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
}); });
return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> { return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor).thenAcceptAsync(members -> {
log.info("Successfully get {} new registered members", members.size()); log.info("Successfully get [{}] new registered members", members.size());
if (!members.isEmpty()) { if (!members.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = members.stream() Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)); .collect(Collectors.groupingBy(TeamMember::getTopAgentName));
@ -82,7 +87,7 @@ public class RegistrationService {
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}); });
} }
}); }, asyncTaskExecutor);
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }

View File

@ -19,6 +19,7 @@ package com.zayac.admin.service;
import cn.hutool.core.convert.Convert; import cn.hutool.core.convert.Convert;
import cn.hutool.core.text.CharPool; import cn.hutool.core.text.CharPool;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.zayac.admin.resp.Statics;
import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.resp.team.TeamMember;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -97,4 +98,26 @@ public class TelegramMessageService {
public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) { public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) {
return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount); return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount);
} }
public String buildFailedPayMessage(String accountName,
List<TeamMember> accountMembers) {
String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining("\n"));
return String.format("%s\n%s\n", accountName, memberNames);
}
public String buildDailyReportMessage(List<Statics> statics) {
StringBuilder message = new StringBuilder();
statics.forEach(stat -> {
String formattedStat = String.format(
"%s\n注册: %s\n新增: %d\n日活: %d\n\n",
stat.getAgentName(),
stat.getIsNew(),
stat.getFirstCount(),
stat.getCountBets()
);
message.append(formattedStat);
});
return message.toString();
}
} }

View File

@ -54,19 +54,19 @@ public class WebClientService {
try { try {
semaphore.acquire(); // 尝试获取许可 semaphore.acquire(); // 尝试获取许可
return this.webClient.post().uri(url).headers(httpHeaders -> { return this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() { Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true); }, true);
headerMap.forEach(httpHeaders::add); headerMap.forEach(httpHeaders::add);
}) })
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve() .retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
.bodyToMono(String.class) .bodyToMono(String.class)
.map(body -> new BusinessException("Error response: " + body))) .map(body -> new BusinessException("Error response: " + body)))
.bodyToMono(typeRef) .bodyToMono(typeRef)
.flatMap(this::respHandler) .flatMap(this::respHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
.block(); // 遇到网络问题,每三秒重试一次 .block(); // 遇到网络问题,每三秒重试一次
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new BusinessException("Failed to acquire semaphore", e); throw new BusinessException("Failed to acquire semaphore", e);
@ -82,24 +82,24 @@ public class WebClientService {
System.out.println(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getId()); System.out.println(Thread.currentThread().getId());
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
semaphore.acquire(); // 尝试获取许可 semaphore.acquire(); // 尝试获取许可
return true; return true;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> { .flatMap(acquired -> this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() { Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true); }, true);
headerMap.forEach(httpHeaders::add); headerMap.forEach(httpHeaders::add);
}) })
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve() .retrieve()
.onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response .onStatus(status -> status.is4xxClientError() || status.is5xxServerError(), response -> response
.bodyToMono(String.class) .bodyToMono(String.class)
.map(body -> new BusinessException("Error response: " + body))) .map(body -> new BusinessException("Error response: " + body)))
.bodyToMono(typeRef) .bodyToMono(typeRef)
.flatMap(this::monoRespHandler) .flatMap(this::monoRespHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3))) .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)))
.doFinally(signal -> semaphore.release())); .doFinally(signal -> semaphore.release()));
} }
private <T> Mono<T> monoRespHandler(ApiResponse<T> apiResponse) { private <T> Mono<T> monoRespHandler(ApiResponse<T> apiResponse) {