调整线程池

优化部分代码逻辑,现在查询首存不用那么麻烦了
调整定时任务
This commit is contained in:
zayac 2024-06-17 06:37:31 +08:00
parent 584350f843
commit 0cd73df06a
9 changed files with 365 additions and 134 deletions

View File

@ -33,9 +33,9 @@ public class ThreadPoolConfig {
@Bean(name = "asyncTaskExecutor") @Bean(name = "asyncTaskExecutor")
public Executor taskExecutor() { public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); executor.setCorePoolSize(20);
executor.setMaxPoolSize(20); executor.setMaxPoolSize(50);
executor.setQueueCapacity(500); executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("asyncTaskExecutor-"); executor.setThreadNamePrefix("asyncTaskExecutor-");
executor.initialize(); executor.initialize();
return executor; return executor;
@ -49,7 +49,7 @@ public class ThreadPoolConfig {
@Bean @Bean
public ThreadPoolTaskScheduler taskScheduler() { public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10); scheduler.setPoolSize(20);
scheduler.setThreadNamePrefix("ScheduledTask-"); scheduler.setThreadNamePrefix("ScheduledTask-");
scheduler.setWaitForTasksToCompleteOnShutdown(true); scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(60); scheduler.setAwaitTerminationSeconds(60);

View File

@ -74,4 +74,9 @@ public class ApiPathConstants {
* 团队财务报表 * 团队财务报表
*/ */
public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team"; public static final String TEAM_FINANCE_EXCEL = "/agent/api/v1/finance/excel/team";
/**
* 活跃会员
*/
public static final String ACTIVE_LIST = "/agent/api/v1/member/activeList";
} }

View File

@ -0,0 +1,57 @@
package com.zayac.admin.req;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDate;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ActiveListReq {
/**
* 活跃类型
*/
private Integer activeType;
/**
* 统计月份
*/
@JsonFormat(pattern = "yyyy-MM")
private LocalDate date;
/**
* 是否重置
*/
private Boolean isRest = false;
/**
* 会员名称
*/
private String name;
/**
* 页码
*/
@Builder.Default
private int pageNum = 1;
/**
* 每页显示数据
*/
@Builder.Default
private int pageSize = 10;
/**
* 注册结束日期
*/
private LocalDate registerEndDate;
/**
* 注册开始日期
*/
private LocalDate registerStartDate;
/**
* 上级代理线
*/
private String topAgentName;
}

View File

@ -0,0 +1,59 @@
package com.zayac.admin.resp;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* "memberId": 509419585,
* "name": "zaq198014",
* "topId": 500007970,
* "topName": "ky3tg107032",
* "firstPayTime": "2024-06-15 18:20:49",
* "registerTime": "2023-08-18 19:18:30",
* "deposit": 1522,
* "bets": 1394,
* "activeStr": "首存会员"
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ActiveListResp {
/**
* 会员id
*/
private Long memberId;
/**
* 会员名称
*/
private String name;
/**
* 会员上级id
*/
private Long topId;
/**
* 首存时间
*/
private LocalDateTime firstPayTime;
/**
* 注册时间
*/
private LocalDateTime registerTime;
/**
* 存款
*/
private BigDecimal deposit;
/**
* 投注
*/
private BigDecimal bets;
/**
* 活跃类型
*/
private String activeStr;
}

View File

@ -118,12 +118,10 @@ public class CheckRegAndDep {
log.info("Current Team Info: {}", currentTeamInfo); log.info("Current Team Info: {}", currentTeamInfo);
CompletableFuture<Void> registrationProcess = registrationService CompletableFuture<Void> registrationProcess = registrationService
.processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor) .processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor);
.thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor);
CompletableFuture<Void> depositProcess = depositService CompletableFuture<Void> depositProcess = depositService
.processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, asyncTaskExecutor) .processDeposits(minister, accountUsernameToUserMap, account, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor);
.thenRunAsync(() -> log.info("Deposit process completed"), asyncTaskExecutor);
return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> { return CompletableFuture.allOf(registrationProcess, depositProcess).thenRunAsync(() -> {
teamService.updateTeamInfo(account, currentTeamInfo); teamService.updateTeamInfo(account, currentTeamInfo);

View File

@ -59,7 +59,9 @@ import java.time.LocalTime;
import java.time.YearMonth; import java.time.YearMonth;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -86,9 +88,10 @@ public class DailyReport {
private static final String SEO_ROLE_CODE = "seo"; private static final String SEO_ROLE_CODE = "seo";
private static final String PLATFORM_HTH = "华体会"; private static final String PLATFORM_HTH = "华体会";
@Scheduled(cron = "0 40 11,14,17,21 * * ?") @Scheduled(cron = "0 40 11,14,17,21,23,1,3,5 * * ?")
public void teamAccountDailyReport() { public void teamAccountDailyReport() {
LocalDateTime nowDateTime = LocalDateTime.now(); LocalDateTime nowDateTime = LocalDateTime.now();
log.info("dailySummarize started at {}", nowDateTime);
LocalDate nowDate = LocalDate.now(); LocalDate nowDate = LocalDate.now();
//查询部门下的所有用户 //查询部门下的所有用户
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE); List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
@ -111,18 +114,24 @@ public class DailyReport {
} }
@Scheduled(cron = "0 40 23 * * ?") // @Scheduled(cron = "0 40 23 * * ?")
public void ScheduledSendTeamDailyReport1() { // public void ScheduledSendTeamDailyReport1() {
sendTeamDailyReport(); // log.info("ScheduledSendTeamDailyReport1 started at {}", LocalDateTime.now());
} // sendTeamDailyReport();
// log.info("ScheduledSendTeamDailyReport1 finished at {}", LocalDateTime.now());
// }
@Scheduled(cron = "0 0 1-23 * * ?") @Scheduled(cron = "0 0/30 1-23 * * ?")
public void ScheduledSendTeamDailyReport2() { public void ScheduledSendTeamDailyReport2() {
log.info("ScheduledSendTeamDailyReport2 started at {}", LocalDateTime.now());
sendTeamDailyReport(); sendTeamDailyReport();
log.info("ScheduledSendTeamDailyReport2 finished at {}", LocalDateTime.now());
} }
@Scheduled(cron = "0 15 0 * * ?") @Scheduled(cron = "0 15 0 * * ?")
public void dailySummarize() { public void dailySummarize() {
log.info("dailySummarize started at {}", LocalDateTime.now());
LocalDate yesterday = LocalDate.now().minusDays(1); LocalDate yesterday = LocalDate.now().minusDays(1);
//查询部门下的所有用户 //查询部门下的所有用户
List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE); List<DeptUsersResp> deptWithUsersAndAccounts = deptService.getDeptWithUsersAndAccounts(MINISTER_ROLE_CODE);
@ -166,9 +175,10 @@ public class DailyReport {
.of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers); .of(yesterday, LocalTime.MAX), ministerUser, assistants, deptUsers);
//保存数据 //保存数据
saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName); saveData(ministerUser, deptUsers, yesterday, accountNameWithTopAgentName);
getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday); //getPayFailedMember(ministerUser, accountUsernameToUserMap, yesterday);
sendFinance(yesterday, accountUsernameToUserMap, ministerUser); sendFinance(yesterday, accountUsernameToUserMap, ministerUser);
}); });
log.info("dailySummarize finished at {}", LocalDateTime.now());
} }
private void sendTeamDailyReport() { private void sendTeamDailyReport() {
@ -410,11 +420,13 @@ public class DailyReport {
LocalDateTime startDateTime, LocalDateTime startDateTime,
LocalDateTime endDateTime, LocalDateTime endDateTime,
List<UserWithRolesAndAccountsResp> assistants) { List<UserWithRolesAndAccountsResp> assistants) {
log.info("Starting to generate and send team report for minister user: {}", ministerUser.getUsername());
return CompletableFuture.runAsync(() -> { return CompletableFuture.runAsync(() -> {
List<String[]> rows = new ArrayList<>(); ConcurrentLinkedQueue<String[]> rows = new ConcurrentLinkedQueue<>();
List<AccountResp> accounts = ministerUser.getAccounts(); List<AccountResp> accounts = ministerUser.getAccounts();
int[] totals = {0, 0}; AtomicIntegerArray totals = new AtomicIntegerArray(2);
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build(); TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
List<CompletableFuture<Void>> futures = accounts.stream() List<CompletableFuture<Void>> futures = accounts.stream()
@ -425,36 +437,46 @@ public class DailyReport {
.stream() .stream()
.mapToInt(TeamAccount::getFirstDepositNum) .mapToInt(TeamAccount::getFirstDepositNum)
.sum(); .sum();
synchronized (totals) {
totals[0] += totalNewMember; totals.addAndGet(0, totalNewMember);
totals[1] += totalNewFirstDeposit; totals.addAndGet(1, totalNewFirstDeposit);
}
String percent = getPercent(totalNewFirstDeposit, totalNewMember); String percent = getPercent(totalNewFirstDeposit, totalNewMember);
synchronized (rows) { rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String
rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String .valueOf(totalNewFirstDeposit), percent});
.valueOf(totalNewFirstDeposit), percent}); }, asyncTaskExecutor)
} .exceptionally(ex -> {
}, asyncTaskExecutor)) log.error("Error fetching team info for account: {}", accountResp.getPlatformName(), ex);
return null;
}))
.toList(); .toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join(); allFutures.join();
// rows 列表进行排序
rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]), log.info("Completed all async tasks for minister user: {}", ministerUser.getUsername());
getPercent(totals[1], totals[0])});
String message = TableFormatter.formatTableAsHtml(rows); // rows 列表进行排序
List<String[]> sortedRows = new ArrayList<>(rows);
sortedRows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
sortedRows.add(new String[] {"总计", String.valueOf(totals.get(0)), String.valueOf(totals.get(1)),
getPercent(totals.get(1), totals.get(0))});
String message = TableFormatter.formatTableAsHtml(sortedRows);
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message); telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message);
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message); telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message);
} }
//发送消息给助理 // 发送消息给助理
if (!CollUtil.isEmpty(assistants)) { if (!CollUtil.isEmpty(assistants)) {
assistants.forEach(assistant -> { assistants.forEach(assistant -> {
if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) { if (assistant.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message); telegramMessageService.sendMessage(assistant.getBotToken(), assistant.getReportIds(), message);
log.info("Sent report to assistant: {}", assistant.getUsername());
} }
}); });
} }

View File

@ -19,6 +19,7 @@ package com.zayac.admin.service;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.ActiveListReq;
import com.zayac.admin.req.MemberDetailsReq; import com.zayac.admin.req.MemberDetailsReq;
import com.zayac.admin.req.PayRecordsListReq; import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamMemberListReq; import com.zayac.admin.req.team.TeamMemberListReq;
@ -48,6 +49,7 @@ import java.util.stream.Collectors;
import static com.zayac.admin.common.constant.CacheConstants.SUCCESSFULLY_PAYED_ACCOUNTNAME; import static com.zayac.admin.common.constant.CacheConstants.SUCCESSFULLY_PAYED_ACCOUNTNAME;
import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount; import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount;
import static com.zayac.admin.utils.CommonUtils.getLastNElements;
@Slf4j @Slf4j
@Service @Service
@ -59,19 +61,19 @@ public class DepositService {
private static final String BOT_TOKEN = "6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto"; private static final String BOT_TOKEN = "6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto";
private static final Long TELEGRAM_CHAT_ID = 6054562838L; private static final Long TELEGRAM_CHAT_ID = 6054562838L;
public CompletableFuture<Void> processDeposits(UserWithRolesAndAccountsResp ministerUser, public CompletableFuture<Void> processDeposits(UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
AccountResp account, AccountResp account,
Team currentTeam, Team currentTeam,
Team previousTeam, Team previousTeam,
LocalDate nowDate, LocalDate nowDate,
LocalDateTime nowDateTime,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
if (previousTeam == null || currentTeam.getFirstDepositNum() <= previousTeam.getFirstDepositNum()) { if (previousTeam == null || currentTeam.getFirstDepositNum() <= previousTeam.getFirstDepositNum()) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
return processDepositRecords(ministerUser, accountUsernameToUserMap, account, currentTeam, previousTeam, nowDate, nowDateTime, asyncTaskExecutor); return processDepositRecords(ministerUser, accountUsernameToUserMap, account, currentTeam, previousTeam, nowDate, asyncTaskExecutor);
} }
private CompletableFuture<Void> processDepositRecords(UserWithRolesAndAccountsResp ministerUser, private CompletableFuture<Void> processDepositRecords(UserWithRolesAndAccountsResp ministerUser,
@ -80,15 +82,14 @@ public class DepositService {
Team currentTeam, Team currentTeam,
Team previousTeam, Team previousTeam,
LocalDate nowDate, LocalDate nowDate,
LocalDateTime nowDateTime,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
List<TeamAccountWithChange> hasNewDepositAccounts = findChangedTeamAccount(previousTeam, currentTeam).stream() List<TeamAccountWithChange> hasNewDepositAccounts = findChangedTeamAccount(previousTeam, currentTeam).stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.toList(); .toList();
List<CompletableFuture<Void>> allTasks = hasNewDepositAccounts.stream() List<CompletableFuture<Void>> allTasks = hasNewDepositAccounts.stream()
.map(accountWithChange -> processAccountChanges(accountWithChange, ministerUser, accountUsernameToUserMap, account, nowDate, nowDateTime, asyncTaskExecutor)) .map(accountWithChange -> processAccountChanges(accountWithChange, ministerUser, accountUsernameToUserMap, account, nowDate, asyncTaskExecutor))
.toList(); .toList();
return CompletableFuture.allOf(allTasks.toArray(new CompletableFuture[0])); return CompletableFuture.allOf(allTasks.toArray(new CompletableFuture[0]));
} }
@ -98,34 +99,101 @@ public class DepositService {
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
AccountResp account, AccountResp account,
LocalDate nowDate, LocalDate nowDate,
LocalDateTime nowDateTime,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
// 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题 ActiveListReq req = ActiveListReq.builder()
LocalDate startDate = nowDateTime.minusHours(1).toLocalDate(); .pageNum(1)
PayRecordsListReq req = PayRecordsListReq.builder() .activeType(3)
.startDate(startDate) .topAgentName(accountWithChange.getAgentName())
.endDate(nowDate) .date(nowDate)
.pageSize(100) .isRest(false)
.payState(2) .build();
.agentName(accountWithChange.getAgentName())
.build();
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
return completableFutureWebClientService return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() { .fetchDataForAccount(account, ApiPathConstants.ACTIVE_LIST, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<ActiveListResp>>>>() {
}) })
.thenApply(Pagination::getList) .thenComposeAsync(page -> fetchAndProcessActiveList(account, req, page, accountWithChange, ministerUser, accountUsernameToUserMap, asyncTaskExecutor), asyncTaskExecutor)
.thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor) .exceptionally(ex -> {
.thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults), asyncTaskExecutor) // 处理异常
.exceptionally(ex -> { log.error("Error processing account changes for account: {}", account, ex);
log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex return null;
.getMessage()); });
return null;
});
} }
private CompletableFuture<Void> fetchAndProcessActiveList(AccountResp account, ActiveListReq req, Pagination<List<ActiveListResp>> page,
TeamAccountWithChange accountWithChange, UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Executor asyncTaskExecutor) {
req.setPageNum(page.getPageNum());
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.ACTIVE_LIST, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<ActiveListResp>>>>() {
})
.thenApply(Pagination::getList)
.thenAcceptAsync(activeListResps -> {
List<ActiveListResp> sortedList = activeListResps.stream()
.sorted(Comparator.comparing(ActiveListResp::getFirstPayTime))
.collect(Collectors.toList());
List<ActiveListResp> activeListRespList = getLastNElements(sortedList, accountWithChange.getNewDepositNum());
String depositResults = buildDepositResults(activeListRespList);
String notification = buildNotificationMessage(accountWithChange, depositResults);
sendNotifications(accountWithChange, ministerUser, accountUsernameToUserMap, notification);
}, asyncTaskExecutor);
}
private String buildDepositResults(List<ActiveListResp> activeListRespList) {
StringBuilder depositResults = new StringBuilder();
activeListRespList.forEach(activeListResp ->
depositResults.append(telegramMessageService.buildDepositResultsMessage(activeListResp.getName(), activeListResp.getDeposit()))
);
return depositResults.toString();
}
private String buildNotificationMessage(TeamAccountWithChange accountWithChange, String depositResults) {
return telegramMessageService.buildDepositMessage(
accountWithChange.getAgentName(),
accountWithChange.getNewDepositNum(),
depositResults,
accountWithChange.getFirstDepositNum()
);
}
private void sendNotifications(TeamAccountWithChange accountWithChange, UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, String notification) {
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken()
: currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
}
// // 凌晨的时候查询存款记录时往前减一天 防止凌晨的时候出现查询不到存款记录的问题
// LocalDate startDate = nowDateTime.minusHours(1).toLocalDate();
// PayRecordsListReq req = PayRecordsListReq.builder()
// .startDate(startDate)
// .endDate(nowDate)
// .pageSize(100)
// .payState(2)
// .agentName(accountWithChange.getAgentName())
// .build();
//
// StringBuilder depositResults = new StringBuilder();
// AtomicInteger depositCounter = new AtomicInteger(0);
//
// return completableFutureWebClientService
// .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<ApiResponse<Pagination<List<PayRecord>>>>() {
// })
// .thenApply(Pagination::getList)
// .thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor)
// .thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults), asyncTaskExecutor)
// .exceptionally(ex -> {
// log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex
// .getMessage());
// return null;
// });
// }
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords, private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
TeamAccountWithChange accountWithChange, TeamAccountWithChange accountWithChange,
AccountResp account, AccountResp account,
@ -137,31 +205,34 @@ public class DepositService {
//根据用户名去重,保留时间最早的支付记录 //根据用户名去重,保留时间最早的支付记录
List<PayRecord> sortedPayRecords = payRecords.stream() List<PayRecord> sortedPayRecords = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
.collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, ( .collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, (
existingRecord, existingRecord,
newRecord) -> existingRecord newRecord) -> existingRecord
.getCreatedAt() .getCreatedAt()
.isBefore(newRecord .isBefore(newRecord
.getCreatedAt()) .getCreatedAt())
? existingRecord ? existingRecord
: newRecord, LinkedHashMap::new), map -> map : newRecord, LinkedHashMap::new), map -> map
.values() .values()
.stream() .stream()
.sorted(Comparator .sorted(Comparator
.comparing(PayRecord::getCreatedAt) .comparing(PayRecord::getCreatedAt)
.reversed()) .reversed())
.collect(Collectors .filter(payRecord -> !RedisUtils
.toList()))); .hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord
.getBillNo()))
.collect(Collectors
.toList())));
List<CompletableFuture<Void>> fetchMemberFutures = sortedPayRecords.stream() List<CompletableFuture<Void>> fetchMemberFutures = sortedPayRecords.stream()
.map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor) .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor)
.thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor) .thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", payRecord.getName(), ex.getMessage()); log.error("Error fetching details for member {}: {}", payRecord.getName(), ex.getMessage());
return null; return null;
})) }))
.toList(); .toList();
return CompletableFuture.allOf(fetchMemberFutures.toArray(new CompletableFuture[0])); return CompletableFuture.allOf(fetchMemberFutures.toArray(new CompletableFuture[0]));
} }
@ -176,13 +247,13 @@ public class DepositService {
if (!RedisUtils.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo())) { if (!RedisUtils.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo())) {
//如果订单记录有存款成功但是会员的首存时间还为空,数据未同步,也是首存 //如果订单记录有存款成功但是会员的首存时间还为空,数据未同步,也是首存
if ((member.getFirstPayAt() == null || payRecord.getCreatedAt() if ((member.getFirstPayAt() == null || payRecord.getCreatedAt()
.equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange .equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange
.getNewDepositNum()) { .getNewDepositNum()) {
//把存款成功的用户保存起来,设置一个小时的缓存时间 防止重复用户 //把存款成功的用户保存起来,设置一个小时的缓存时间 防止重复用户
RedisUtils.set(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo(), member.getName(), Duration RedisUtils.set(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo(), member.getName(), Duration
.ofHours(1)); .ofHours(1));
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord
.getScoreAmount())); .getScoreAmount()));
} }
} }
} }
@ -193,49 +264,50 @@ public class DepositService {
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
StringBuilder depositResults) { StringBuilder depositResults) {
String notification = telegramMessageService.buildDepositMessage(accountWithChange String notification = telegramMessageService.buildDepositMessage(accountWithChange
.getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange .getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange
.getFirstDepositNum()); .getFirstDepositNum());
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName()); var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken() ? ministerUser.getBotToken()
: currUser.getBotToken(); : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
} }
telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification); telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
} }
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, private CompletableFuture<Member> fetchMemberDetails(AccountResp account,
String name, String name,
LocalDate nowDate, LocalDate nowDate,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
TeamMemberListReq memberListReq = TeamMemberListReq.builder() TeamMemberListReq memberListReq = TeamMemberListReq.builder()
.name(name) .name(name)
.startDate(nowDate) .startDate(nowDate)
.endDate(nowDate) .endDate(nowDate)
.status(1) .status(1)
.build(); .build();
CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
}); });
return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor)
.thenApplyAsync(list -> list.stream() .thenApplyAsync(list -> list.stream()
.findFirst() .findFirst()
.orElseThrow(() -> new BusinessException("没有找到匹配的成员信息")), asyncTaskExecutor) .orElseThrow(() -> new BusinessException("没有找到匹配的成员信息")), asyncTaskExecutor)
.thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor); .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor);
} }
private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
MemberDetailsReq detailsReq = MemberDetailsReq.builder() MemberDetailsReq detailsReq = MemberDetailsReq.builder()
.id(memberId) .id(memberId)
.startDate(nowDate) .startDate(nowDate)
.endDate(nowDate) .endDate(nowDate)
.build(); .build();
return completableFutureWebClientService return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
}); });
} }
} }

View File

@ -61,7 +61,7 @@ public class TelegramMessageService {
public void sendMessage(String botToken, String targetIds, String message) { public void sendMessage(String botToken, String targetIds, String message) {
convertStringToList(targetIds).parallelStream() convertStringToList(targetIds).parallelStream()
.forEach(targetId -> this.sendMessage(botToken, targetId, message)); .forEach(targetId -> this.sendMessage(botToken, targetId, message));
} }
public static List<Long> convertStringToList(String str) { public static List<Long> convertStringToList(String str) {
@ -73,7 +73,7 @@ public class TelegramMessageService {
private static String escapeMarkdown(String text) { private static String escapeMarkdown(String text) {
List<Character> escapeChars = Arrays List<Character> escapeChars = Arrays
.asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'); .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!');
for (Character charItem : escapeChars) { for (Character charItem : escapeChars) {
text = text.replace(charItem.toString(), "\\" + charItem); text = text.replace(charItem.toString(), "\\" + charItem);
} }
@ -92,17 +92,19 @@ public class TelegramMessageService {
String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", ")); String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", "));
if (currUser != null) { if (currUser != null) {
return String.format("👏 [%s] %s 注册: %d 会员: `%s` 总数:*%d*", currUser.getNickname(), teamAccount return String.format("👏 [%s] %s 注册: %d 会员: `%s` 总数:*%d*", currUser.getNickname(), teamAccount
.getAgentName(), accountMembers.size(), memberNames, teamAccount.getSubMemberNum()); .getAgentName(), accountMembers.size(), memberNames, teamAccount.getSubMemberNum());
} }
return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers
.size(), memberNames, teamAccount.getSubMemberNum()); .size(), memberNames, teamAccount.getSubMemberNum());
} }
public String buildRegistrationMessage(List<TeamMember> accountMembers, TeamAccountWithChange teamAccount) { public String buildRegistrationMessage(List<TeamMember> accountMembers, TeamAccountWithChange teamAccount) {
String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", ")); String memberNames = accountMembers.stream()
return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers .map(member -> "`" + member.getName() + "`")
.size(), memberNames, teamAccount.getSubMemberNum()); .collect(Collectors.joining(", "));
return String.format("👏 %s 注册: %d 会员: %s 总数:*%d*", teamAccount.getAgentName(), accountMembers
.size(), memberNames, teamAccount.getSubMemberNum());
} }
public String buildDepositMessage(String agentName, int newDepositNum, String depositResults, int firstDepositNum) { public String buildDepositMessage(String agentName, int newDepositNum, String depositResults, int firstDepositNum) {
@ -122,7 +124,7 @@ public class TelegramMessageService {
StringBuilder message = new StringBuilder(); StringBuilder message = new StringBuilder();
statics.forEach(stat -> { statics.forEach(stat -> {
String formattedStat = String.format("%s\n注册: %s\n新增: %d\n日活: %d\n\n", stat.getAgentName(), stat String formattedStat = String.format("%s\n注册: %s\n新增: %d\n日活: %d\n\n", stat.getAgentName(), stat
.getIsNew(), stat.getFirstCount(), stat.getCountBets()); .getIsNew(), stat.getFirstCount(), stat.getCountBets());
message.append(formattedStat); message.append(formattedStat);
}); });
return message.toString(); return message.toString();
@ -132,7 +134,7 @@ public class TelegramMessageService {
StringBuilder message = new StringBuilder(); StringBuilder message = new StringBuilder();
userFinancesList.forEach(financeDO -> { userFinancesList.forEach(financeDO -> {
String formattedFinance = String.format("%s: *%s*\n", financeDO.getAgentName(), financeDO.getNetProfit() String formattedFinance = String.format("%s: *%s*\n", financeDO.getAgentName(), financeDO.getNetProfit()
.toPlainString()); .toPlainString());
message.append(formattedFinance); message.append(formattedFinance);
}); });
return message.toString(); return message.toString();

View File

@ -21,6 +21,8 @@ import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamAccountWithChange; import com.zayac.admin.resp.team.TeamAccountWithChange;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -28,24 +30,38 @@ import java.util.stream.Collectors;
public class CommonUtils { public class CommonUtils {
public static List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) { public static List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
Map<Long, TeamAccount> team2AccountMap = currTeam.getList() Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
.stream() .stream()
.collect(Collectors.toMap(TeamAccount::getId, account -> account)); .collect(Collectors.toMap(TeamAccount::getId, account -> account));
return prevTeam.getList() return prevTeam.getList()
.stream() .stream()
.filter(account1 -> team2AccountMap.containsKey(account1.getId())) .filter(account1 -> team2AccountMap.containsKey(account1.getId()))
.map(account1 -> { .map(account1 -> {
TeamAccount account2 = team2AccountMap.get(account1.getId()); TeamAccount account2 = team2AccountMap.get(account1.getId());
TeamAccountWithChange changedAccount = new TeamAccountWithChange(); TeamAccountWithChange changedAccount = new TeamAccountWithChange();
BeanUtil.copyProperties(account2, changedAccount); BeanUtil.copyProperties(account2, changedAccount);
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
} }
if (account1.getSubMemberNum() != account2.getSubMemberNum()) { if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
} }
return changedAccount; return changedAccount;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
}
public static <T> List<T> getLastNElements(List<T> list, int n) {
if (list == null || list.isEmpty() || n <= 0) {
return Collections.emptyList();
}
int size = list.size();
if (n > size) {
return new ArrayList<>(list);
}
return new ArrayList<>(list.subList(size - n, size));
} }
} }