重构查询首存用户方法,现在查询的结果会更准确,接口调用也会更少了

This commit is contained in:
zayac 2024-06-08 01:05:40 +08:00
parent ac6e89af79
commit e17a63b96d
3 changed files with 268 additions and 262 deletions

View File

@ -62,7 +62,7 @@ import java.util.stream.Collectors;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
//@Profile("prod") @Profile("prod")
public class DailyReport { public class DailyReport {
private final TeamService teamService; private final TeamService teamService;
private final DeptService deptService; private final DeptService deptService;
@ -80,7 +80,7 @@ public class DailyReport {
private static final String ASSISTANT_ROLE_CODE = "assistant"; private static final String ASSISTANT_ROLE_CODE = "assistant";
private static final String SEO_ROLE_CODE = "seo"; private static final String SEO_ROLE_CODE = "seo";
@Scheduled(cron = "0 51 11,14,17,21 * * ?") @Scheduled(cron = "0 40 11,14,17,21 * * ?")
public void teamAccountDailyReport() { public void teamAccountDailyReport() {
LocalDateTime nowDateTime = LocalDateTime.now(); LocalDateTime nowDateTime = LocalDateTime.now();
LocalDate nowDate = LocalDate.now(); LocalDate nowDate = LocalDate.now();
@ -160,8 +160,8 @@ public class DailyReport {
.mapping(Map.Entry::getValue, Collectors.toList()))); .mapping(Map.Entry::getValue, Collectors.toList())));
var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE); var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE);
userWithRolesAndAccountsResps.forEach(ministerUser -> userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate
generateAndSendTeamReport(ministerUser, nowDate.atStartOfDay(), nowDateTime, null)); .atStartOfDay(), nowDateTime, null));
}); });
} }
@ -286,7 +286,7 @@ public class DailyReport {
List<UserWithRolesAndAccountsResp> deptUsers) { List<UserWithRolesAndAccountsResp> deptUsers) {
List<CompletableFuture<Void>> tasks = new ArrayList<>(); List<CompletableFuture<Void>> tasks = new ArrayList<>();
//tasks.add(generateAndSendTeamReport(ministerUser, startDateTime, endDateTime, assistants)); tasks.add(generateAndSendTeamReport(ministerUser, startDateTime, endDateTime, assistants));
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build(); AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build();

View File

@ -42,8 +42,6 @@ import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -56,6 +54,9 @@ public class DepositService {
private final CompletableFutureWebClientService completableFutureWebClientService; private final CompletableFutureWebClientService completableFutureWebClientService;
private final TelegramMessageService telegramMessageService; private final TelegramMessageService telegramMessageService;
private static final String BOT_TOKEN = "6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto";
private static final Long TELEGRAM_CHAT_ID = 6054562838L;
public CompletableFuture<Void> processDeposits(UserWithRolesAndAccountsResp ministerUser, public CompletableFuture<Void> processDeposits(UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap, Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
AccountResp account, AccountResp account,
@ -79,104 +80,110 @@ public class DepositService {
LocalDate nowDate, LocalDate nowDate,
LocalDateTime nowDateTime, LocalDateTime nowDateTime,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
PayRecordsListReq req = PayRecordsListReq.builder() List<TeamAccountWithChange> hasNewDepositAccounts = findChangedTeamAccount(previousTeam, currentTeam).stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.toList();
List<CompletableFuture<Void>> allTasks = hasNewDepositAccounts.stream()
.map(accountWithChange -> processAccountChanges(accountWithChange, ministerUser, accountUsernameToUserMap, account, nowDate, nowDateTime, asyncTaskExecutor))
.toList();
return CompletableFuture.allOf(allTasks.toArray(new CompletableFuture[0]));
}
private CompletableFuture<Void> processAccountChanges(TeamAccountWithChange accountWithChange,
UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
AccountResp account,
LocalDate nowDate,
LocalDateTime nowDateTime,
Executor asyncTaskExecutor) {
PayRecordsListReq req = createPayRecordsListReq(accountWithChange.getAgentName(), nowDate);
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
return paginationCompletableFuture.thenApply(Pagination::getList)
.thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor)
.thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults, depositCounter), asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error processing account changes for agent {}: {}", accountWithChange.getAgentName(), ex
.getMessage());
return null;
});
}
private PayRecordsListReq createPayRecordsListReq(String agentName, LocalDate nowDate) {
return PayRecordsListReq.builder()
.startDate(nowDate) .startDate(nowDate)
.endDate(nowDate) .endDate(nowDate)
.pageSize(100) .pageSize(100)
.payState(2) .payState(2)
.agentName(agentName)
.build(); .build();
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
Set<String> changedAgentNames = changedTeamAccounts.stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.map(TeamAccountWithChange::getAgentName)
.collect(Collectors.toSet());
return paginationCompletableFuture.thenApplyAsync(Pagination::getList, asyncTaskExecutor)
.thenComposeAsync(payRecords -> processPayRecords(payRecords, changedTeamAccounts, changedAgentNames, ministerUser, accountUsernameToUserMap, account, nowDateTime, asyncTaskExecutor), asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
return null;
});
} }
private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords, private CompletableFuture<Void> processPayRecords(List<PayRecord> payRecords,
List<TeamAccountWithChange> changedTeamAccounts, TeamAccountWithChange accountWithChange,
Set<String> changedAgentNames,
UserWithRolesAndAccountsResp minister,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
AccountResp account, AccountResp account,
LocalDate nowDate,
LocalDateTime nowDateTime, LocalDateTime nowDateTime,
StringBuilder depositResults,
AtomicInteger depositCounter,
Executor asyncTaskExecutor) { Executor asyncTaskExecutor) {
Map<String, List<String>> agentNameWithNames = payRecords.stream() Map<String, PayRecord> earliestPayRecords = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
.contains(record.getAgentName())) .collect(Collectors.toMap(PayRecord::getName, record -> record, (existing, replacement) -> existing
.collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors.mapping(PayRecord::getName, Collectors .getCreatedAt()
.collectingAndThen(Collectors.toSet(), ArrayList::new)))); .isBefore(replacement.getCreatedAt()) ? existing : replacement));
List<CompletableFuture<Void>> futures = agentNameWithNames.entrySet() List<PayRecord> validPayRecords = earliestPayRecords.values().stream().toList();
.stream()
.map(entry -> processAgentRecords(entry.getKey(), entry
.getValue(), changedTeamAccounts, payRecords, minister, accountUsernameToUserMap, account, asyncTaskExecutor))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); List<CompletableFuture<Void>> fetchMemberFutures = validPayRecords.stream()
} .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor)
.thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor)
private CompletableFuture<Void> processAgentRecords(String agentName,
List<String> names,
List<TeamAccountWithChange> changedTeamAccounts,
List<PayRecord> payRecords,
UserWithRolesAndAccountsResp minister,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
AccountResp account,
Executor asyncTaskExecutor) {
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
.filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
.findFirst()
.orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
List<CompletableFuture<Void>> fetchFutures = names.stream()
.map(name -> fetchMemberDetails(account, name, LocalDate.now(), asyncTaskExecutor)
.thenAcceptAsync(member -> payRecords.stream()
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
.findFirst()
.ifPresent(record -> {
if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
depositResults.append(telegramMessageService.buildDepositResultsMessage(member
.getName(), record.getScoreAmount()));
}
}), asyncTaskExecutor)
.exceptionally(ex -> { .exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", name, ex.getMessage()); log.error("Error fetching details for member {}: {}", payRecord.getName(), ex.getMessage());
return null; return null;
})) }))
.toList(); .toList();
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0])); return CompletableFuture.allOf(fetchMemberFutures.toArray(new CompletableFuture[0]));
}
return allFetches.thenRunAsync(() -> { private void processMemberDetails(Member member,
if (!depositResults.isEmpty()) { PayRecord payRecord,
String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount TeamAccountWithChange accountWithChange,
.getNewDepositNum(), depositResults.toString(), targetTeamAccount.getFirstDepositNum()); StringBuilder depositResults,
var currUser = accountUsernameToUserMap.get(agentName); AtomicInteger depositCounter) {
if (payRecord.getCreatedAt().equals(member.getFirstPayAt()) && depositCounter
.getAndIncrement() < accountWithChange.getNewDepositNum()) {
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord
.getScoreAmount()));
}
}
private void sendNotification(TeamAccountWithChange accountWithChange,
UserWithRolesAndAccountsResp ministerUser,
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap,
StringBuilder depositResults,
AtomicInteger depositCounter) {
if (depositCounter.get() > 0) {
String notification = telegramMessageService.buildDepositMessage(accountWithChange
.getAgentName(), accountWithChange.getNewDepositNum(), depositResults.toString(), accountWithChange
.getFirstDepositNum());
var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName());
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken()) String botToken = StrUtil.isEmpty(currUser.getBotToken())
? minister.getBotToken() ? ministerUser.getBotToken()
: currUser.getBotToken(); : currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification); telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
} }
telegramMessageService telegramMessageService.sendMessage(BOT_TOKEN, TELEGRAM_CHAT_ID, notification);
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
} }
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error sending notification for account {}: {}", account.getUsername(), ex.getMessage());
return null;
});
} }
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, private CompletableFuture<Member> fetchMemberDetails(AccountResp account,
@ -197,7 +204,7 @@ public class DepositService {
return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) return memberFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor)
.thenApplyAsync(list -> list.stream() .thenApplyAsync(list -> list.stream()
.findFirst() .findFirst()
.orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")), asyncTaskExecutor) .orElseThrow(() -> new BusinessException("没有找到匹配的成员信息")), asyncTaskExecutor)
.thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor); .thenComposeAsync(member -> fetchDetailedMemberInfo(account, member.getId(), nowDate), asyncTaskExecutor);
} }

View File

@ -16,6 +16,7 @@
package com.zayac.admin.service; package com.zayac.admin.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.constant.ApiPathConstants;
@ -26,7 +27,6 @@ import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.resp.team.TeamMember;
import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp;
import com.zayac.admin.system.service.AccountService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
@ -48,7 +48,6 @@ import java.util.stream.Collectors;
public class RegistrationService { public class RegistrationService {
private final CompletableFutureWebClientService completableFutureWebClientService; private final CompletableFutureWebClientService completableFutureWebClientService;
private final TelegramMessageService telegramMessageService; private final TelegramMessageService telegramMessageService;
private final AccountService accountService;
public CompletableFuture<Void> processRegistration(UserWithRolesAndAccountsResp minister, public CompletableFuture<Void> processRegistration(UserWithRolesAndAccountsResp minister,
AccountResp account, AccountResp account,
@ -75,7 +74,7 @@ public class RegistrationService {
return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor)
.thenAcceptAsync(members -> { .thenAcceptAsync(members -> {
log.info("Successfully get [{}] new registered members", members.size()); log.info("Successfully get [{}] new registered members", members.size());
if (!members.isEmpty()) { if (CollUtil.isNotEmpty(members)) {
Map<String, List<TeamMember>> groupByTopAgentName = members.stream() Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)); .collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> { groupByTopAgentName.forEach((accountName, accountMembers) -> {