diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java index df8f0f1b..f79e5043 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/CheckRegAndDep.java @@ -18,7 +18,6 @@ package com.zayac.admin.schedule; import com.zayac.admin.req.team.TeamInfoReq; import com.zayac.admin.resp.team.Team; -import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.service.*; import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.DeptUsersResp; @@ -39,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -118,12 +116,9 @@ public class CheckRegAndDep { return teamFuture.thenComposeAsync(currentTeamInfo -> { log.info("Previous Team Info: {}", prevTeamInfo); log.info("Current Team Info: {}", currentTeamInfo); - Map teamAccountMap = currentTeamInfo.getList() - .stream() - .collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity())); CompletableFuture registrationProcess = registrationService - .processRegistration(minister, account, accountUsernameToUserMap, teamAccountMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor) + .processRegistration(minister, account, accountUsernameToUserMap, currentTeamInfo, prevTeamInfo, nowDate, asyncTaskExecutor) .thenRunAsync(() -> log.info("Registration process completed"), asyncTaskExecutor); CompletableFuture depositProcess = depositService diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java index 73b502c6..325b6210 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -202,7 +202,8 @@ public class DailyReport { .collect(Collectors.groupingBy(finance -> userWithRolesAndAccountsRespMap.get(finance.getAgentName()))); userFinances.forEach((user, userFinancesList) -> { - if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify())) { + if (user != null && DisEnableStatusEnum.ENABLE.equals(user.getNeedNotify()) && !user.getUserId() + .equals(ministerUser.getUserId())) { String message = telegramMessageService.buildFinanceMessage(userFinancesList); String botToken = StrUtil.isEmpty(user.getBotToken()) ? ministerUser.getBotToken() : user.getBotToken(); telegramMessageService.sendMessage(botToken, user.getReportIds(), message); @@ -256,13 +257,12 @@ public class DailyReport { CompletableFuture allPaginationFutures = CompletableFuture.allOf(paginationFutures .toArray(new CompletableFuture[0])); - CompletableFuture> aggregatedMembersFuture = allPaginationFutures.thenApply(v -> { - return paginationFutures.stream().map(CompletableFuture::join).flatMap(memberPagination -> { + CompletableFuture> aggregatedMembersFuture = allPaginationFutures + .thenApply(v -> paginationFutures.stream().map(CompletableFuture::join).flatMap(memberPagination -> { List members = memberPagination.getList(); log.info("members size:{}", members.size()); return members.stream(); - }).collect(Collectors.toList()); - }); + }).collect(Collectors.toList())); CompletableFuture> filteredMembersFuture = aggregatedMembersFuture .thenApplyAsync(members -> members.stream() @@ -366,7 +366,7 @@ public class DailyReport { PayRecordListReq req) { //华体会接口限流严重,先加上 if (PLATFORM_HTH.equals(account.getPlatformName())) { - RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter("hth", 0.5); + RateLimiter rateLimiter = rateLimiterConfig.getRateLimiter("hth", 0.2); log.info("限流器触发中...."); rateLimiter.acquire(); // 通过限流器限流 } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java index a42ee6e6..6f59d3af 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/DepositService.java @@ -16,19 +16,14 @@ package com.zayac.admin.service; -import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.req.MemberDetailsReq; import com.zayac.admin.req.PayRecordsListReq; import com.zayac.admin.req.team.TeamMemberListReq; -import com.zayac.admin.resp.Member; -import com.zayac.admin.resp.MemberPagination; -import com.zayac.admin.resp.Pagination; -import com.zayac.admin.resp.PayRecord; +import com.zayac.admin.resp.*; import com.zayac.admin.resp.team.Team; -import com.zayac.admin.resp.team.TeamAccount; import com.zayac.admin.resp.team.TeamAccountWithChange; import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; @@ -36,10 +31,14 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.stereotype.Service; +import top.continew.starter.cache.redisson.util.RedisUtils; import top.continew.starter.core.exception.BusinessException; +import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -47,6 +46,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static com.zayac.admin.common.constant.CacheConstants.SUCCESSFULLY_PAYED_ACCOUNTNAME; +import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount; + @Slf4j @Service @RequiredArgsConstructor @@ -107,14 +109,14 @@ public class DepositService { .payState(2) .agentName(accountWithChange.getAgentName()) .build(); - CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { - }); StringBuilder depositResults = new StringBuilder(); AtomicInteger depositCounter = new AtomicInteger(0); - return paginationCompletableFuture.thenApply(Pagination::getList) + return completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference>>>() { + }) + .thenApply(Pagination::getList) .thenComposeAsync(payRecords -> processPayRecords(payRecords, accountWithChange, account, nowDate, nowDateTime, depositResults, depositCounter, asyncTaskExecutor), asyncTaskExecutor) .thenRunAsync(() -> sendNotification(accountWithChange, ministerUser, accountUsernameToUserMap, depositResults), asyncTaskExecutor) .exceptionally(ex -> { @@ -132,15 +134,27 @@ public class DepositService { StringBuilder depositResults, AtomicInteger depositCounter, Executor asyncTaskExecutor) { - Map earliestPayRecords = payRecords.stream() + + //根据用户名去重,保留时间最早的支付记录 + List sortedPayRecords = payRecords.stream() .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1))) - .collect(Collectors.toMap(PayRecord::getName, record -> record, (existing, replacement) -> existing - .getCreatedAt() - .isBefore(replacement.getCreatedAt()) ? existing : replacement)); + .collect(Collectors.collectingAndThen(Collectors.toMap(PayRecord::getName, record -> record, ( + existingRecord, + newRecord) -> existingRecord + .getCreatedAt() + .isBefore(newRecord + .getCreatedAt()) + ? existingRecord + : newRecord, LinkedHashMap::new), map -> map + .values() + .stream() + .sorted(Comparator + .comparing(PayRecord::getCreatedAt) + .reversed()) + .collect(Collectors + .toList()))); - List validPayRecords = earliestPayRecords.values().stream().toList(); - - List> fetchMemberFutures = validPayRecords.stream() + List> fetchMemberFutures = sortedPayRecords.stream() .map(payRecord -> fetchMemberDetails(account, payRecord.getName(), nowDate, asyncTaskExecutor) .thenAcceptAsync(member -> processMemberDetails(member, payRecord, accountWithChange, depositResults, depositCounter), asyncTaskExecutor) .exceptionally(ex -> { @@ -157,10 +171,20 @@ public class DepositService { TeamAccountWithChange accountWithChange, StringBuilder depositResults, AtomicInteger depositCounter) { - if (payRecord.getCreatedAt().equals(member.getFirstPayAt()) && depositCounter - .getAndIncrement() < accountWithChange.getNewDepositNum()) { - depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord - .getScoreAmount())); + synchronized (this) { + //如果从缓存中没有key,那就是新存款用户 + if (!RedisUtils.hasKey(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo())) { + //如果订单记录有存款成功但是会员的首存时间还为空,数据未同步,也是首存 + if ((member.getFirstPayAt() == null || payRecord.getCreatedAt() + .equals(member.getFirstPayAt())) && depositCounter.getAndIncrement() < accountWithChange + .getNewDepositNum()) { + //把存款成功的用户保存起来,设置一个小时的缓存时间 防止重复用户 + RedisUtils.set(SUCCESSFULLY_PAYED_ACCOUNTNAME + payRecord.getBillNo(), member.getName(), Duration + .ofHours(1)); + depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), payRecord + .getScoreAmount())); + } + } } } @@ -214,27 +238,4 @@ public class DepositService { .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { }); } - - private List findChangedTeamAccount(Team prevTeam, Team currTeam) { - Map team2AccountMap = currTeam.getList() - .stream() - .collect(Collectors.toMap(TeamAccount::getId, account -> account)); - - return prevTeam.getList() - .stream() - .filter(account1 -> team2AccountMap.containsKey(account1.getId())) - .map(account1 -> { - TeamAccount account2 = team2AccountMap.get(account1.getId()); - TeamAccountWithChange changedAccount = new TeamAccountWithChange(); - BeanUtil.copyProperties(account2, changedAccount); - if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { - changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); - } - if (account1.getSubMemberNum() != account2.getSubMemberNum()) { - changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); - } - return changedAccount; - }) - .collect(Collectors.toList()); - } } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java index e20935e5..b6dff2df 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/RegistrationService.java @@ -21,9 +21,10 @@ import cn.hutool.core.util.StrUtil; import com.zayac.admin.common.enums.DisEnableStatusEnum; import com.zayac.admin.constant.ApiPathConstants; import com.zayac.admin.req.team.TeamMemberReq; +import com.zayac.admin.resp.ApiResponse; import com.zayac.admin.resp.MemberPagination; import com.zayac.admin.resp.team.Team; -import com.zayac.admin.resp.team.TeamAccount; +import com.zayac.admin.resp.team.TeamAccountWithChange; import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.system.model.resp.AccountResp; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; @@ -37,7 +38,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.stream.Collectors; + +import static com.zayac.admin.utils.CommonUtils.findChangedTeamAccount; /** * 新注册处理相关逻辑 @@ -52,35 +54,37 @@ public class RegistrationService { public CompletableFuture processRegistration(UserWithRolesAndAccountsResp minister, AccountResp account, Map accountUsernameToUserMap, - Map teamAccountMap, Team currentTeamInfo, Team prevTeamInfo, LocalDate nowDate, Executor asyncTaskExecutor) { if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) { - int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount(); - TeamMemberReq memberListReq = TeamMemberReq.builder() - .registerStartDate(nowDate) - .registerEndDate(nowDate) - .startDate(nowDate) - .endDate(nowDate) - .registerSort(1) - .pageSize(registerCount) - .build(); - CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); - return memberPaginationCompletableFuture.thenApplyAsync(MemberPagination::getList, asyncTaskExecutor) - .thenAcceptAsync(members -> { - log.info("Successfully get [{}] new registered members", members.size()); - if (CollUtil.isNotEmpty(members)) { - Map> groupByTopAgentName = members.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); - groupByTopAgentName.forEach((accountName, accountMembers) -> { + List hasNewRegAccounts = findChangedTeamAccount(prevTeamInfo, currentTeamInfo) + .stream() + .filter(teamAccountWithChange -> teamAccountWithChange.getNewRegisterNum() > 0) + .toList(); + hasNewRegAccounts.parallelStream().forEach(accountWithChange -> { + TeamMemberReq memberListReq = TeamMemberReq.builder() + .registerStartDate(nowDate) + .registerEndDate(nowDate) + .startDate(nowDate) + .endDate(nowDate) + .registerSort(1) + .topAgentName(accountWithChange.getAgentName()) + .pageSize(accountWithChange.getNewRegisterNum()) + .build(); + completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference>>>() { + }) + .thenApply(MemberPagination::getList) + .thenAcceptAsync(members -> { + log.info("Successfully get [{}] new registered members for {}", members + .size(), accountWithChange.getAgentName()); + if (CollUtil.isNotEmpty(members)) { String notification = telegramMessageService - .buildRegistrationMessage(accountMembers, teamAccountMap.get(accountName)); - var currUser = accountUsernameToUserMap.get(accountName); + .buildRegistrationMessage(members, accountWithChange); + var currUser = accountUsernameToUserMap.get(accountWithChange.getAgentName()); if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { String botToken = StrUtil.isEmpty(currUser.getBotToken()) ? minister.getBotToken() @@ -89,9 +93,9 @@ public class RegistrationService { } telegramMessageService .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); - }); - } - }, asyncTaskExecutor); + } + }, asyncTaskExecutor); + }); } return CompletableFuture.completedFuture(null); } diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java index a4fef1c1..a663de31 100644 --- a/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/service/TelegramMessageService.java @@ -22,6 +22,7 @@ import cn.hutool.core.util.StrUtil; import com.zayac.admin.agent.model.entity.FinanceDO; import com.zayac.admin.resp.Statics; import com.zayac.admin.resp.team.TeamAccount; +import com.zayac.admin.resp.team.TeamAccountWithChange; import com.zayac.admin.resp.team.TeamMember; import com.zayac.admin.system.model.resp.UserWithRolesAndAccountsResp; import lombok.RequiredArgsConstructor; @@ -97,7 +98,7 @@ public class TelegramMessageService { .size(), memberNames, teamAccount.getSubMemberNum()); } - public String buildRegistrationMessage(List accountMembers, TeamAccount teamAccount) { + public String buildRegistrationMessage(List accountMembers, TeamAccountWithChange teamAccount) { String memberNames = accountMembers.stream().map(TeamMember::getName).collect(Collectors.joining(", ")); return String.format("👏 %s 注册: %d 会员: `%s` 总数:*%d*", teamAccount.getAgentName(), accountMembers diff --git a/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java b/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java new file mode 100644 index 00000000..bf4037ae --- /dev/null +++ b/zayac-admin-agent/src/main/java/com/zayac/admin/utils/CommonUtils.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.zayac.admin.utils; + +import cn.hutool.core.bean.BeanUtil; +import com.zayac.admin.resp.team.Team; +import com.zayac.admin.resp.team.TeamAccount; +import com.zayac.admin.resp.team.TeamAccountWithChange; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CommonUtils { + public static List findChangedTeamAccount(Team prevTeam, Team currTeam) { + Map team2AccountMap = currTeam.getList() + .stream() + .collect(Collectors.toMap(TeamAccount::getId, account -> account)); + + return prevTeam.getList() + .stream() + .filter(account1 -> team2AccountMap.containsKey(account1.getId())) + .map(account1 -> { + TeamAccount account2 = team2AccountMap.get(account1.getId()); + TeamAccountWithChange changedAccount = new TeamAccountWithChange(); + BeanUtil.copyProperties(account2, changedAccount); + if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { + changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); + } + if (account1.getSubMemberNum() != account2.getSubMemberNum()) { + changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); + } + return changedAccount; + }) + .collect(Collectors.toList()); + } +} diff --git a/zayac-admin-common/src/main/java/com/zayac/admin/common/constant/CacheConstants.java b/zayac-admin-common/src/main/java/com/zayac/admin/common/constant/CacheConstants.java index 628f11c6..05aa6f16 100644 --- a/zayac-admin-common/src/main/java/com/zayac/admin/common/constant/CacheConstants.java +++ b/zayac-admin-common/src/main/java/com/zayac/admin/common/constant/CacheConstants.java @@ -76,6 +76,11 @@ public class CacheConstants { */ public static final String DEPT_USERS_ROLES_ACCOUNTS_KEY_PREFIX = "DEPT_USERS_ROLES_ACCOUNTS" + DELIMITER; + /** + * 存款成功的用户名 + */ + public static final String SUCCESSFULLY_PAYED_ACCOUNTNAME = "SUCCESSFULLY_PAYED_ACCOUNTNAME" + DELIMITER; + private CacheConstants() { } }