修复存款失败用户不能正常发送消息的问题

This commit is contained in:
zayac 2024-06-08 16:12:29 +08:00
parent 616d55d1a7
commit 8326e316fb
4 changed files with 173 additions and 178 deletions

View File

@ -62,7 +62,7 @@ import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
@Slf4j
@Profile("prod")
@Profile("dev")
public class DailyReport {
private final TeamService teamService;
private final DeptService deptService;
@ -89,12 +89,12 @@ public class DailyReport {
deptWithUsersAndAccounts.forEach(dept -> {
//根据用户角色对部门用户进行分组
Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers()
.stream()
.flatMap(user -> user.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
.flatMap(user -> user.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
//获取账号不为空的用户
@ -114,19 +114,19 @@ public class DailyReport {
deptWithUsersAndAccounts.forEach(dept -> {
//根据用户角色对部门用户进行分组
Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers()
.stream()
.flatMap(user -> user.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
.flatMap(user -> user.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
// 获取所有账号的username与用户的映射
Map<String, UserWithRolesAndAccountsResp> accountUsernameToUserMap = dept.getUsers()
.stream()
.flatMap(user -> user.getAccounts()
.stream()
.map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user)))
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
.flatMap(user -> user.getAccounts()
.stream()
.map(account -> new AbstractMap.SimpleEntry<>(account.getUsername(), user)))
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
var ministerUser = usersByRole.get(MINISTER_ROLE_CODE).get(0);
//获取账号不为空的用户
@ -142,7 +142,6 @@ public class DailyReport {
// 一小时发送一次
@Scheduled(cron = "0 0 * * * ?")
//@Scheduled(fixedDelay = 6000L)
public void generateTeamReportTask() {
LocalDateTime nowDateTime = LocalDateTime.now();
LocalDate nowDate = LocalDate.now();
@ -152,16 +151,16 @@ public class DailyReport {
deptWithUsersAndAccounts.forEach(dept -> {
//根据用户角色对部门用户进行分组
Map<String, List<UserWithRolesAndAccountsResp>> usersByRole = dept.getUsers()
.stream()
.flatMap(user -> user.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
.flatMap(user -> user.getRoles()
.stream()
.map(role -> new AbstractMap.SimpleEntry<>(role.getCode(), user)))
.collect(Collectors.groupingByConcurrent(Map.Entry::getKey, Collectors
.mapping(Map.Entry::getValue, Collectors.toList())));
var userWithRolesAndAccountsResps = usersByRole.get(MINISTER_ROLE_CODE);
userWithRolesAndAccountsResps.forEach(ministerUser -> generateAndSendTeamReport(ministerUser, nowDate
.atStartOfDay(), nowDateTime, null));
.atStartOfDay(), nowDateTime, null));
});
}
@ -175,76 +174,76 @@ public class DailyReport {
LocalDate date) {
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(100)
.build();
.registerStartDate(date)
.registerEndDate(date)
.startDate(date)
.endDate(date)
.registerSort(1)
.status(1)
.pageSize(100)
.build();
List<CompletableFuture<List<TeamMember>>> futureList = new ArrayList<>();
ministerUser.getAccounts().forEach(account -> {
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
CompletableFuture<List<TeamMember>> teamMembersFuture = memberPaginationCompletableFuture
.thenApply(MemberPagination::getList)
.thenApplyAsync(members -> members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit()
.compareTo(BigDecimal.ZERO) == 0)
.collect(Collectors.toList()), asyncTaskExecutor)
.thenComposeAsync(membersWithoutDep -> {
List<CompletableFuture<TeamMember>> memberFutures = membersWithoutDep.stream()
.map(memberWithoutDep -> {
PayRecordListReq req = PayRecordListReq.builder()
.startDate(date)
.endDate(date)
.pageSize(100)
.id(memberWithoutDep.getId())
.build();
CompletableFuture<PayRecordList<List<PayRecord>>> completableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() {
});
return completableFuture.thenApplyAsync(pagination -> {
if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination
.getScoreAmountTotal()
.compareTo(BigDecimal.ZERO) == 0) {
return memberWithoutDep;
} else {
return null;
}
}, asyncTaskExecutor);
})
.toList();
.thenApply(MemberPagination::getList)
.thenApplyAsync(members -> members.stream()
.filter(member -> member.getDeposit() != null && member.getDeposit()
.compareTo(BigDecimal.ZERO) == 0)
.collect(Collectors.toList()), asyncTaskExecutor)
.thenComposeAsync(membersWithoutDep -> {
List<CompletableFuture<TeamMember>> memberFutures = membersWithoutDep.stream()
.map(memberWithoutDep -> {
PayRecordListReq req = PayRecordListReq.builder()
.startDate(date)
.endDate(date)
.pageSize(100)
.id(memberWithoutDep.getId())
.build();
CompletableFuture<PayRecordList<List<PayRecord>>> completableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORD_LIST_URL, req, new ParameterizedTypeReference<>() {
});
return completableFuture.thenApplyAsync(pagination -> {
if (pagination.getOrderAmountTotal().compareTo(BigDecimal.ZERO) > 0 && pagination
.getScoreAmountTotal()
.compareTo(BigDecimal.ZERO) == 0) {
return memberWithoutDep;
} else {
return null;
}
}, asyncTaskExecutor);
})
.toList();
return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> memberFutures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}, asyncTaskExecutor)
.thenApplyAsync(membersWithoutDep -> {
// 发送给每个account关联的user用户
if (!membersWithoutDep.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService
.buildFailedPayMessage(accountName, accountMembers);
UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName);
if (DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken()
: currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
});
}
return membersWithoutDep;
}, asyncTaskExecutor);
return CompletableFuture.allOf(memberFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> memberFutures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}, asyncTaskExecutor)
.thenApplyAsync(membersWithoutDep -> {
// 发送给每个account关联的user用户
if (!membersWithoutDep.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = membersWithoutDep.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService
.buildFailedPayMessage(accountName, accountMembers);
UserWithRolesAndAccountsResp currUser = accountUsernameToUserMap.get(accountName);
if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
String botToken = StrUtil.isEmpty(currUser.getBotToken())
? ministerUser.getBotToken()
: currUser.getBotToken();
telegramMessageService.sendMessage(botToken, currUser.getRegAndDepIds(), notification);
}
});
}
return membersWithoutDep;
}, asyncTaskExecutor);
futureList.add(teamMembersFuture);
});
@ -253,22 +252,22 @@ public class DailyReport {
allFutures.thenRunAsync(() -> {
// 主线下的所有的存款失败用户
List<TeamMember> allTeamMembers = futureList.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.toList();
.map(CompletableFuture::join)
.flatMap(List::stream)
.toList();
Map<String, List<TeamMember>> groupByTopAgentName = new TreeMap<>(allTeamMembers.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
.collect(Collectors.groupingBy(TeamMember::getTopAgentName)));
StringBuilder combinedNotification = new StringBuilder();
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService.buildFailedPayMessage(accountName, accountMembers);
combinedNotification.append(notification).append("\n");
});
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
.toString());
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, combinedNotification
.toString());
if (DisEnableStatusEnum.ENABLE.equals(ministerUser.getNeedNotify())) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser
.getReportIds(), combinedNotification.toString());
.getReportIds(), combinedNotification.toString());
}
}, asyncTaskExecutor).exceptionally(ex -> {
@ -291,7 +290,7 @@ public class DailyReport {
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().monthDate(reportDate).build();
deptUsers.forEach(deptUser -> tasks
.add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate)));
.add(processDeptUser(deptUser, ministerUser, agentDataVisualListReq, reportDate)));
CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
allTasks.join();
@ -318,32 +317,32 @@ public class DailyReport {
TeamInfoReq teamInfoReq = TeamInfoReq.builder().startDate(startDateTime).endDate(endDateTime).build();
List<CompletableFuture<Void>> futures = accounts.stream()
.map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq)
.thenAcceptAsync(team -> {
int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int totalNewFirstDeposit = team.getList()
.stream()
.mapToInt(TeamAccount::getFirstDepositNum)
.sum();
synchronized (totals) {
totals[0] += totalNewMember;
totals[1] += totalNewFirstDeposit;
}
String percent = getPercent(totalNewFirstDeposit, totalNewMember);
synchronized (rows) {
rows.add(new String[] {accountResp.getPlatformName(), String.valueOf(totalNewMember), String
.valueOf(totalNewFirstDeposit), percent});
}
}, asyncTaskExecutor))
.toList();
.map(accountResp -> teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq)
.thenAcceptAsync(team -> {
int totalNewMember = team.getList().stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int totalNewFirstDeposit = team.getList()
.stream()
.mapToInt(TeamAccount::getFirstDepositNum)
.sum();
synchronized (totals) {
totals[0] += totalNewMember;
totals[1] += totalNewFirstDeposit;
}
String percent = getPercent(totalNewFirstDeposit, totalNewMember);
synchronized (rows) {
rows.add(new String[]{accountResp.getPlatformName(), String.valueOf(totalNewMember), String
.valueOf(totalNewFirstDeposit), percent});
}
}, asyncTaskExecutor))
.toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
// rows 列表进行排序
rows.sort(Comparator.comparing((String[] row) -> row[0].length()).thenComparing(row -> row[0]));
rows.add(new String[] {"总计", String.valueOf(totals[0]), String.valueOf(totals[1]),
getPercent(totals[1], totals[0])});
rows.add(new String[]{"总计", String.valueOf(totals[0]), String.valueOf(totals[1]),
getPercent(totals[1], totals[0])});
String message = TableFormatter.formatTableAsHtml(rows);
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getReportIds(), message);
@ -380,21 +379,21 @@ public class DailyReport {
List<AccountResp> currUserAccounts = deptUser.getAccounts();
List<CompletableFuture<Statics>> futures = currUserAccounts.stream()
.map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date")))
.exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage());
return null;
}))
.toList();
.map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date")))
.exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex.getMessage());
return null;
}))
.toList();
CompletableFuture<Void> userStaticsFuture = CompletableFuture.allOf(futures
.toArray(new CompletableFuture[0]));
.toArray(new CompletableFuture[0]));
userStaticsFuture.thenRunAsync(() -> {
List<Statics> agentDataList = futures.stream().map(future -> {
try {
@ -410,8 +409,8 @@ public class DailyReport {
if (StrUtil.isNotBlank(message) && deptUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
String botToken = StrUtil.isEmpty(deptUser.getBotToken())
? ministerUser.getBotToken()
: deptUser.getBotToken();
? ministerUser.getBotToken()
: deptUser.getBotToken();
telegramMessageService.sendMessage(botToken, deptUser.getReportIds(), message);
}
}, asyncTaskExecutor).exceptionally(ex -> {
@ -438,33 +437,33 @@ public class DailyReport {
List<CompletableFuture<Void>> tasks = new ArrayList<>();
TeamFinanceReq teamFinanceReq = TeamFinanceReq.builder()
.pageNum(1)
.pageSize(999)
.commissionDate(reportDate)
.build();
.pageNum(1)
.pageSize(999)
.commissionDate(reportDate)
.build();
// 异步处理 ministerUserAccounts
CompletableFuture<Void> ministerAccountsFuture = CompletableFuture.runAsync(() -> {
List<CompletableFuture<Void>> accountFutures = ministerUser.getAccounts()
.stream()
.map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq)
.thenAcceptAsync(financePagination -> {
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
FinanceDO financeDO = new FinanceDO();
BeanUtil.copyProperties(finance, financeDO);
return financeDO;
}).toList();
financeService.addAll(financeReqList);
FinanceSumReq financeSumReq = new FinanceSumReq();
BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq);
financeSumService.add(financeSumReq);
}, asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error processing minister accounts for account {}", accountResp
.getUsername(), ex);
return null;
}))
.toList();
.stream()
.map(accountResp -> completableFutureFinanceService.getTeamFinance(accountResp, teamFinanceReq)
.thenAcceptAsync(financePagination -> {
List<FinanceDO> financeReqList = financePagination.getList().stream().map(finance -> {
FinanceDO financeDO = new FinanceDO();
BeanUtil.copyProperties(finance, financeDO);
return financeDO;
}).toList();
financeService.addAll(financeReqList);
FinanceSumReq financeSumReq = new FinanceSumReq();
BeanUtil.copyProperties(financePagination.getTotalSumVo(), financeSumReq);
financeSumService.add(financeSumReq);
}, asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error processing minister accounts for account {}", accountResp
.getUsername(), ex);
return null;
}))
.toList();
CompletableFuture.allOf(accountFutures.toArray(new CompletableFuture[0])).join();
}, asyncTaskExecutor).exceptionally(ex -> {
log.error("Error processing minister accounts", ex);
@ -474,36 +473,36 @@ public class DailyReport {
// 异步处理 deptUsers
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder()
.monthDate(reportDate)
.build();
.monthDate(reportDate)
.build();
deptUsers.forEach(deptUser -> {
CompletableFuture<Void> deptUserFuture = CompletableFuture.runAsync(() -> {
List<AccountResp> currUserAccounts = deptUser.getAccounts();
List<CompletableFuture<StatsDO>> futures = currUserAccounts.stream()
.map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> {
Statics statics = agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date"));
StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO);
return statsDO;
}, asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex
.getMessage());
return null;
}))
.toList();
.map(currAccount -> agentDataVisualListService
.getAgentDataVisualList(currAccount, agentDataVisualListReq)
.thenApplyAsync(agentData -> {
Statics statics = agentData.getCurData()
.stream()
.filter(data -> data.getStaticsDate().equals(reportDate))
.findFirst()
.orElseThrow(() -> new BusinessException("No data found for report date"));
StatsDO statsDO = new StatsDO();
BeanUtil.copyProperties(statics, statsDO);
return statsDO;
}, asyncTaskExecutor)
.exceptionally(ex -> {
log.error("Error fetching data for account {}: {}", currAccount.getId(), ex
.getMessage());
return null;
}))
.toList();
List<StatsDO> list = futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
statsService.addAll(list);
}, asyncTaskExecutor).exceptionally(ex -> {

View File

@ -80,7 +80,6 @@ public class LoginServiceImpl implements LoginService {
private final PasswordEncoder passwordEncoder;
private final OptionService optionService;
@Override
public String accountLogin(String username, String password) {
UserDO user = userService.getByUsername(username);
boolean isError = ObjectUtil.isNull(user) || !passwordEncoder.matches(password, user.getPassword());

View File

@ -90,5 +90,4 @@ public class OptionServiceImpl implements OptionService {
RedisUtils.set(CacheConstants.OPTION_KEY_PREFIX + code.getValue(), value);
return mapper.apply(value);
}
}

View File

@ -95,8 +95,6 @@ public class UserCenterController {
String rawNewPassword = ExceptionUtils.exToNull(() -> SecureUtils.decryptByRsaPrivateKey(updateReq
.getNewPassword()));
ValidationUtils.throwIfNull(rawNewPassword, "新密码解密失败");
ValidationUtils.throwIf(!ReUtil
.isMatch(RegexConstants.PASSWORD, rawNewPassword), "密码长度为 6 到 32 位,可以包含字母、数字、下划线,特殊字符,同时包含字母和数字");
userService.updatePassword(rawOldPassword, rawNewPassword, LoginHelper.getUserId());
return R.ok("修改成功,请牢记你的新密码");
}