diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java index e1e73e64..32f3e114 100644 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java @@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.text.NumberFormat; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -84,7 +85,9 @@ public class DailyReport { //获取当天总线的所有注册新增 int currentDayTotalNewMember = list.stream().mapToInt(TeamAccount::getSubMemberNum).sum(); int currentDayTotalNewFirstDeposit = list.stream().mapToInt(TeamAccount::getFirstDepositNum).sum(); - String message = String.format("截至%s 注册:*%d*,新增:*%d*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit); + //当日转化率 + String percent = getPercent(currentDayTotalNewFirstDeposit, currentDayTotalNewMember); + String message = String.format("截至%s 注册:*%d*,新增:*%d* 转化率: *%s*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit, percent); telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getChatId(), message); } } @@ -112,6 +115,15 @@ public class DailyReport { }); }); } + + public static String getPercent(int x, int y) { + double d1 = x * 1.0; + double d2 = y * 1.0; + NumberFormat percentInstance = NumberFormat.getPercentInstance(); + percentInstance.setMinimumFractionDigits(2); + return percentInstance.format(d1 / d2); + } } + diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java index 3c2358f4..466ac502 100644 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java @@ -1,61 +1,31 @@ -/* - * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.zayac.admin.schedule; -import cn.hutool.core.util.StrUtil; import com.zayac.admin.common.enums.DisEnableStatusEnum; -import com.zayac.admin.constant.ApiPathConstants; -import com.zayac.admin.req.MemberDetailsReq; -import com.zayac.admin.req.PayRecordsListReq; import com.zayac.admin.req.team.TeamInfoReq; -import com.zayac.admin.req.team.TeamMemberListReq; -import com.zayac.admin.req.team.TeamMemberReq; -import com.zayac.admin.resp.*; import com.zayac.admin.resp.team.Team; import com.zayac.admin.resp.team.TeamAccount; -import com.zayac.admin.resp.team.TeamAccountWithChange; -import com.zayac.admin.resp.team.TeamMember; -import com.zayac.admin.service.CompletableFutureWebClientService; -import com.zayac.admin.service.TeamService; -import com.zayac.admin.service.TelegramMessageService; +import com.zayac.admin.service.*; import com.zayac.admin.system.model.entity.RoleDO; import com.zayac.admin.system.model.entity.UserDO; import com.zayac.admin.system.model.resp.AccountResp; -import com.zayac.admin.system.service.*; +import com.zayac.admin.system.service.AccountService; +import com.zayac.admin.system.service.RoleService; +import com.zayac.admin.system.service.UserRoleService; +import com.zayac.admin.system.service.UserService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.core.ParameterizedTypeReference; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import top.continew.starter.core.exception.BusinessException; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.temporal.TemporalAdjusters; -import java.util.*; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; -/** - * 定时任务 查询注册 新增用 暂定一分钟执行一次 - */ @Slf4j @Component @RequiredArgsConstructor @@ -63,32 +33,27 @@ public class TelegramTeamMessageSchedule { private final UserService userService; private final AccountService accountService; - private final CompletableFutureWebClientService completableFutureWebClientService; - private final TelegramMessageService telegramMessageService; private final TeamService teamService; private final RoleService roleService; private final UserRoleService userRoleService; + private final RegistrationService registrationService; + private final DepositService depositService; private static final String MINISTER_ROLE_CODE = "minister"; private static final long FIXED_DELAY = 60000L; @Scheduled(fixedDelay = FIXED_DELAY) - public void newCheckRegistrationAndNewDeposit() { - // Get the current date and time + public void CheckRegistrationAndNewDeposit() { LocalDate nowDate = LocalDate.now(); LocalDateTime nowDateTime = LocalDateTime.now(); - //获取部长角色 RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); List users = userRoleService.listUserIdByRoleId(minister.getId()); - //获取所有部长角色对应部门的用户 users.forEach(userId -> { - //传入对应部长用户 processUser(userService.getById(userId), nowDate, nowDateTime).join(); }); } private CompletableFuture processUser(UserDO user, LocalDate nowDate, LocalDateTime nowDateTime) { - //获取当前用户名下的所有团队账号 List accounts = accountService.getAccountsByUserId(user.getId(), DisEnableStatusEnum.ENABLE) .stream() .filter(AccountResp::getIsTeam) @@ -101,9 +66,7 @@ public class TelegramTeamMessageSchedule { return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } - private CompletableFuture processTeamAccount(AccountResp account, - LocalDate nowDate, - LocalDateTime nowDateTime) { + private CompletableFuture processTeamAccount(AccountResp account, LocalDate nowDate, LocalDateTime nowDateTime) { TeamInfoReq teamInfoReq = TeamInfoReq.builder() .startDate(nowDateTime.with(TemporalAdjusters.firstDayOfMonth())) .endDate(nowDateTime) @@ -115,207 +78,15 @@ public class TelegramTeamMessageSchedule { Team currentTeamInfo = teamFuture.join(); log.info("Previous Team Info: {}", prevTeamInfo); log.info("Current Team Info: {}", currentTeamInfo); - //获取代理线 代理对象映射 Map teamAccountMap = currentTeamInfo.getList() .stream() .collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity())); - CompletableFuture registrationProcess = processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap); - CompletableFuture depositProcess = processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime); + CompletableFuture registrationProcess = registrationService.processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap); + CompletableFuture depositProcess = depositService.processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime); return CompletableFuture.allOf(registrationProcess, depositProcess) .thenRun(() -> teamService.updateTeamInfo(account, currentTeamInfo)); }); } - - private CompletableFuture processRegistration(AccountResp account, - Team currentTeamInfo, - Team prevTeamInfo, - LocalDate nowDate, - Map teamAccountMap) { - if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) { - //代理线总共的新增注册数量 - int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount(); - //构建查询参数 - TeamMemberReq memberListReq = TeamMemberReq.builder() - .registerStartDate(nowDate) - .registerEndDate(nowDate) - .startDate(nowDate) - .endDate(nowDate) - .registerSort(1) - .pageSize(registerCount) - .build(); - //查询所有新注册的会员 - CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); - return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> { - log.info("Successfully get {} new registered members", members.size()); - //获取所有的新注册的会员对象 - if (!members.isEmpty()) { - //根据上级代理名分组 - Map> groupByTopAgentName = members.stream() - .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); - groupByTopAgentName.forEach((accountName, accountMembers) -> { - String memberNames = accountMembers.stream() - .map(TeamMember::getName) - .collect(Collectors.joining(", ")); - log.info("Successfully get new registrations {}", memberNames); - String notification = String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers - .size(), memberNames, teamAccountMap.get(accountName).getSubMemberNum()); - UserDO currUser = accountService.getUserByAccountUsername(accountName); - // if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { - // telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), notification); - // } - - telegramMessageService - .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); - } - - ); - } - }); - } - return CompletableFuture.completedFuture(null); - } - - private CompletableFuture processDeposits(AccountResp account, - Team currentTeam, - Team previousTeam, - LocalDate nowDate, - LocalDateTime nowDateTime) { - return CompletableFuture.runAsync(() -> { - if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) { - PayRecordsListReq req = PayRecordsListReq.builder() - .startDate(nowDate) - .endDate(nowDate) - .pageSize(100) - .payState(2) - .build(); - CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { - }); - List changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam); - Set changedAgentNames = changedTeamAccounts.stream() - .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) - .map(TeamAccountWithChange::getAgentName) - .collect(Collectors.toSet()); - paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> { - - Map> agentNameWithNames = payRecords.stream() - .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName())) - .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors - .mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new)))); - - agentNameWithNames.forEach((agentName, names) -> { - StringBuilder depositResults = new StringBuilder(); - AtomicInteger depositCounter = new AtomicInteger(0); - - TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream() - .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName)) - .findFirst() - .orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName))); - - List> fetchFutures = names.stream() - .map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> { - payRecords.stream() - .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) - .findFirst() - .ifPresent(record -> { - if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) { - depositResults.append(String.format("用户: `%s`, 首存金额: *%s*\n", member.getName(), record.getScoreAmount())); - } - }); - }).exceptionally(ex -> { - log.error("Error fetching details for member {}: {}", name, ex.getMessage()); - return null; - })) - .toList(); - - CompletableFuture allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0])); - - allFetches.thenRun(() -> { - if (!depositResults.isEmpty()) { - String notification = String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, targetTeamAccount.getNewDepositNum(), depositResults, targetTeamAccount.getFirstDepositNum()); - - UserDO currUser = accountService.getUserByAccountUsername(agentName); -// if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { -// telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), depResults); -// } - telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); - } - }).exceptionally(ex -> { - log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage()); - return null; - }); - }); - }).exceptionally(ex -> { - log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage()); - return null; - }).join(); - } - }); - } - - - //根据用户名查询对应会员详情 - private CompletableFuture fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) { - TeamMemberListReq memberListReq = TeamMemberListReq.builder() - .name(name) - .startDate(nowDate) - .endDate(nowDate) - .status(1) - .build(); - - CompletableFuture>> memberFuture = completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { - }); - - return memberFuture.thenApply(MemberPagination::getList) - .thenApply(list -> list.stream().findFirst()) - .thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member - .getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息"))); - } - - private CompletableFuture fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { - MemberDetailsReq detailsReq = MemberDetailsReq.builder() - .id(memberId) - .startDate(nowDate) - .endDate(nowDate) - .build(); - - return completableFutureWebClientService - .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { - }); - } - - /** - * 对比两个Team中发生变化的具体TeamAccount - * - * @param prevTeam 缓存中的Team - * @param currTeam 新查询到的Team - * @return TeamAccountWithChange - */ - public List findChangedTeamAccount(Team prevTeam, Team currTeam) { - Map team2AccountMap = currTeam.getList() - .stream() - .collect(Collectors.toMap(TeamAccount::getId, account -> account)); - - return prevTeam.getList() - .stream() - .filter(account1 -> team2AccountMap.containsKey(account1.getId())) - .map(account1 -> { - TeamAccount account2 = team2AccountMap.get(account1.getId()); - TeamAccountWithChange changedAccount = new TeamAccountWithChange(); - BeanUtils.copyProperties(account2, changedAccount); - if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { - changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); - } - if (account1.getSubMemberNum() != account2.getSubMemberNum()) { - changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); - } - return changedAccount; - }) - .collect(Collectors.toList()); - } } diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java index 0c75b166..d64465d9 100644 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java @@ -33,43 +33,22 @@ public class CompletableFutureWebClientService { private final WebClient webClient; private final Semaphore semaphore; private final ObjectMapper objectMapper; - private final LoginService loginService; - private final AccountService accountService; public CompletableFutureWebClientService(WebClient.Builder webClientBuilder, @Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests, - ObjectMapper objectMapper, - LoginService loginService, - AccountService accountService) { + ObjectMapper objectMapper) { this.webClient = webClientBuilder.build(); this.semaphore = new Semaphore(maxConcurrentRequests); this.objectMapper = objectMapper; - this.loginService = loginService; - this.accountService = accountService; } public CompletableFuture fetchDataForAccount(AccountResp account, String apiPath, Object params, ParameterizedTypeReference> typeRef) { - return ensureLoggedIn(account) - .thenCompose(ignored -> fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture()); + return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture(); } - private CompletableFuture ensureLoggedIn(AccountResp account) { - if (StrUtil.isEmptyIfStr(account.getHeaders())) { - return loginService.reLoginAndGetHeaders(account) - .flatMap(newHeaders -> { - account.setHeaders(newHeaders); - accountService.updateHeaders(newHeaders, account.getId()); - return Mono.empty(); - }) - .then() - .toFuture(); - } else { - return CompletableFuture.completedFuture(null); - } - } public Mono fetchData(String url, String headers, @@ -112,7 +91,7 @@ public class CompletableFutureWebClientService { return Mono.just(new ApiResponse(null, "Decoding error", 6008)); } }) - .flatMap(response -> respHandler(response, url, params, typeRef, account)) + .flatMap(response -> respHandler(response, account)) .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException)) ).doFinally(signal -> semaphore.release()); } @@ -121,21 +100,12 @@ public class CompletableFutureWebClientService { return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable; } - private Mono respHandler(ApiResponse response, String url, Object params, - ParameterizedTypeReference> typeRef, AccountResp account) { + private Mono respHandler(ApiResponse response, AccountResp account) { if (response.getStatusCode().equals(6000)) { return Mono.just(response.getData()); } else if (response.getStatusCode().equals(6001)) { - return loginService.reLoginAndGetHeaders(account) - .flatMap(newHeaders -> { - // 更新 account 对象中的 headers - account.setHeaders(newHeaders); - accountService.updateHeaders(newHeaders, account.getId()); - // 重新发送原始请求 - return fetchData(url, newHeaders, params, typeRef, account) - .doOnNext(data -> log.info("Retried request successful")); - }) - .onErrorResume(e -> Mono.error(new BusinessException("Re-login failed: " + e.getMessage()))); + log.warn("[{}]登录失效,请检查token有效性", account.getUsername()); + throw new BusinessException("登录失效,请检查token有效性"); } else { return Mono.error(new BusinessException("Error status code: " + response.getStatusCode())); } diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java new file mode 100644 index 00000000..8f477ff3 --- /dev/null +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java @@ -0,0 +1,168 @@ +package com.zayac.admin.service; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.StrUtil; +import com.zayac.admin.constant.ApiPathConstants; +import com.zayac.admin.req.MemberDetailsReq; +import com.zayac.admin.req.PayRecordsListReq; +import com.zayac.admin.req.team.TeamMemberListReq; +import com.zayac.admin.resp.Member; +import com.zayac.admin.resp.MemberPagination; +import com.zayac.admin.resp.Pagination; +import com.zayac.admin.resp.PayRecord; +import com.zayac.admin.resp.team.Team; +import com.zayac.admin.resp.team.TeamAccount; +import com.zayac.admin.resp.team.TeamAccountWithChange; +import com.zayac.admin.system.model.entity.UserDO; +import com.zayac.admin.system.model.resp.AccountResp; +import com.zayac.admin.system.service.AccountService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.stereotype.Service; +import top.continew.starter.core.exception.BusinessException; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DepositService { + private final CompletableFutureWebClientService completableFutureWebClientService; + private final TelegramMessageService telegramMessageService; + private final AccountService accountService; + + public CompletableFuture processDeposits(AccountResp account, + Team currentTeam, + Team previousTeam, + LocalDate nowDate, + LocalDateTime nowDateTime) { + return CompletableFuture.runAsync(() -> { + if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) { + PayRecordsListReq req = PayRecordsListReq.builder() + .startDate(nowDate) + .endDate(nowDate) + .pageSize(100) + .payState(2) + .build(); + CompletableFuture>> paginationCompletableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() { + }); + List changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam); + Set changedAgentNames = changedTeamAccounts.stream() + .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0) + .map(TeamAccountWithChange::getAgentName) + .collect(Collectors.toSet()); + paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> { + Map> agentNameWithNames = payRecords.stream() + .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName())) + .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors + .mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new)))); + + agentNameWithNames.forEach((agentName, names) -> { + StringBuilder depositResults = new StringBuilder(); + AtomicInteger depositCounter = new AtomicInteger(0); + + TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream() + .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName)) + .findFirst() + .orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName))); + + List> fetchFutures = names.stream() + .map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> { + payRecords.stream() + .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt())) + .findFirst() + .ifPresent(record -> { + if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) { + depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), record.getScoreAmount())); + } + }); + }).exceptionally(ex -> { + log.error("Error fetching details for member {}: {}", name, ex.getMessage()); + return null; + })) + .toList(); + + CompletableFuture allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0])); + + allFetches.thenRun(() -> { + if (!depositResults.isEmpty()) { + String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(), depositResults.toString(), targetTeamAccount.getFirstDepositNum()); + UserDO currUser = accountService.getUserByAccountUsername(agentName); + telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); + } + }).exceptionally(ex -> { + log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage()); + return null; + }); + }); + }).exceptionally(ex -> { + log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage()); + return null; + }).join(); + } + }); + } + + private CompletableFuture fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) { + TeamMemberListReq memberListReq = TeamMemberListReq.builder() + .name(name) + .startDate(nowDate) + .endDate(nowDate) + .status(1) + .build(); + + CompletableFuture>> memberFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { + }); + + return memberFuture.thenApply(MemberPagination::getList) + .thenApply(list -> list.stream().findFirst()) + .thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member + .getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息"))); + } + + private CompletableFuture fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) { + MemberDetailsReq detailsReq = MemberDetailsReq.builder() + .id(memberId) + .startDate(nowDate) + .endDate(nowDate) + .build(); + + return completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() { + }); + } + + private List findChangedTeamAccount(Team prevTeam, Team currTeam) { + Map team2AccountMap = currTeam.getList() + .stream() + .collect(Collectors.toMap(TeamAccount::getId, account -> account)); + + return prevTeam.getList() + .stream() + .filter(account1 -> team2AccountMap.containsKey(account1.getId())) + .map(account1 -> { + TeamAccount account2 = team2AccountMap.get(account1.getId()); + TeamAccountWithChange changedAccount = new TeamAccountWithChange(); + BeanUtil.copyProperties(account2, changedAccount); + if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) { + changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum()); + } + if (account1.getSubMemberNum() != account2.getSubMemberNum()) { + changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum()); + } + return changedAccount; + }) + .collect(Collectors.toList()); + } +} \ No newline at end of file diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java deleted file mode 100644 index b23252e6..00000000 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.zayac.admin.service; - -import com.zayac.admin.system.model.resp.AccountResp; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.Mono; -import top.continew.starter.core.exception.BusinessException; - -import java.util.Map; - -@Service -@Slf4j -public class LoginService { - private final WebClient webClient; - - public LoginService(WebClient.Builder webClientBuilder) { - this.webClient = webClientBuilder.baseUrl("http://localhost:8081").build(); // 替换为你的 FastAPI 地址 - } - - public Mono reLoginAndGetHeaders(AccountResp account) { - return webClient.post() - .uri("/login") - .body(BodyInserters.fromValue(account)) - .retrieve() - .onStatus(status -> !status.is2xxSuccessful(), response -> - Mono.error(new BusinessException("Login failed with status code: " + response.statusCode())) - ) - .bodyToMono(Map.class) - .flatMap(response -> { - if (response.isEmpty() || !response.containsKey("headers")) { - log.error("Login response is empty or invalid"); - return Mono.error(new BusinessException("Login failed: response is empty or invalid")); - } - String headers = (String) response.get("headers"); - return Mono.just(headers); - }) - .doOnError(WebClientResponseException.class, e -> - log.error("Login request failed with error: {}", e.getMessage(), e) - ) - .onErrorMap(WebClientResponseException.class, e -> - new BusinessException("Login request failed", e) - ); - } -} diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java new file mode 100644 index 00000000..268fd05e --- /dev/null +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java @@ -0,0 +1,68 @@ +package com.zayac.admin.service; + +import com.zayac.admin.constant.ApiPathConstants; +import com.zayac.admin.req.team.TeamMemberReq; +import com.zayac.admin.resp.MemberPagination; +import com.zayac.admin.resp.team.Team; +import com.zayac.admin.resp.team.TeamAccount; +import com.zayac.admin.resp.team.TeamMember; +import com.zayac.admin.system.model.entity.UserDO; +import com.zayac.admin.system.model.resp.AccountResp; +import com.zayac.admin.system.service.AccountService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.stereotype.Service; + +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +@Slf4j +@Service +@RequiredArgsConstructor +public class RegistrationService { + private final CompletableFutureWebClientService completableFutureWebClientService; + private final TelegramMessageService notificationService; + private final TelegramMessageService telegramMessageService; + private final AccountService accountService; + + public CompletableFuture processRegistration(AccountResp account, + Team currentTeamInfo, + Team prevTeamInfo, + LocalDate nowDate, + Map teamAccountMap) { + if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) { + int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount(); + TeamMemberReq memberListReq = TeamMemberReq.builder() + .registerStartDate(nowDate) + .registerEndDate(nowDate) + .startDate(nowDate) + .endDate(nowDate) + .registerSort(1) + .pageSize(registerCount) + .build(); + CompletableFuture>> memberPaginationCompletableFuture = completableFutureWebClientService + .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() { + }); + return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> { + log.info("Successfully get {} new registered members", members.size()); + if (!members.isEmpty()) { + Map> groupByTopAgentName = members.stream() + .collect(Collectors.groupingBy(TeamMember::getTopAgentName)); + groupByTopAgentName.forEach((accountName, accountMembers) -> { + String notification = telegramMessageService.buildRegistrationMessage(accountName, accountMembers, teamAccountMap.get(accountName)); + UserDO currUser = accountService.getUserByAccountUsername(accountName); + // if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) { + // telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), notification); + // } + telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification); + }); + } + }); + } + return CompletableFuture.completedFuture(null); + } +} diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java index 2b156aab..9e74c8b1 100644 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java @@ -16,12 +16,16 @@ package com.zayac.admin.service; +import com.zayac.admin.resp.team.TeamAccount; +import com.zayac.admin.resp.team.TeamMember; import lombok.RequiredArgsConstructor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; +import java.math.BigDecimal; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; @Service @RequiredArgsConstructor @@ -36,10 +40,25 @@ public class TelegramMessageService { private static String escapeMarkdown(String text) { List escapeChars = Arrays - .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'); + .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'); for (Character charItem : escapeChars) { text = text.replace(charItem.toString(), "\\" + charItem); } return text; } + + public String buildRegistrationMessage(String accountName, List accountMembers, TeamAccount teamAccount) { + String memberNames = accountMembers.stream() + .map(TeamMember::getName) + .collect(Collectors.joining(", ")); + return String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers.size(), memberNames, teamAccount.getSubMemberNum()); + } + + public String buildDepositMessage(String agentName, int newDepositNum, String depositResults, int firstDepositNum) { + return String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, newDepositNum, depositResults, firstDepositNum); + } + + public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) { + return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount); + } }