移除调用登录接口,改用python接口维护token有效期

修复数据查询错误问题
This commit is contained in:
zayac 2024-05-29 17:49:29 +08:00
parent 60b7109419
commit 43131210f3
7 changed files with 288 additions and 327 deletions

View File

@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.NumberFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@ -84,7 +85,9 @@ public class DailyReport {
//获取当天总线的所有注册新增
int currentDayTotalNewMember = list.stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int currentDayTotalNewFirstDeposit = list.stream().mapToInt(TeamAccount::getFirstDepositNum).sum();
String message = String.format("截至%s 注册:*%d*,新增:*%d*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit);
//当日转化率
String percent = getPercent(currentDayTotalNewFirstDeposit, currentDayTotalNewMember);
String message = String.format("截至%s 注册:*%d*,新增:*%d* 转化率: *%s*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit, percent);
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getChatId(), message);
}
}
@ -112,6 +115,15 @@ public class DailyReport {
});
});
}
public static String getPercent(int x, int y) {
double d1 = x * 1.0;
double d2 = y * 1.0;
NumberFormat percentInstance = NumberFormat.getPercentInstance();
percentInstance.setMinimumFractionDigits(2);
return percentInstance.format(d1 / d2);
}
}

View File

@ -1,61 +1,31 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.schedule;
import cn.hutool.core.util.StrUtil;
import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.MemberDetailsReq;
import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.req.team.TeamMemberListReq;
import com.zayac.admin.req.team.TeamMemberReq;
import com.zayac.admin.resp.*;
import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamAccountWithChange;
import com.zayac.admin.resp.team.TeamMember;
import com.zayac.admin.service.CompletableFutureWebClientService;
import com.zayac.admin.service.TeamService;
import com.zayac.admin.service.TelegramMessageService;
import com.zayac.admin.service.*;
import com.zayac.admin.system.model.entity.RoleDO;
import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.*;
import com.zayac.admin.system.service.AccountService;
import com.zayac.admin.system.service.RoleService;
import com.zayac.admin.system.service.UserRoleService;
import com.zayac.admin.system.service.UserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import top.continew.starter.core.exception.BusinessException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.TemporalAdjusters;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 定时任务 查询注册 新增用 暂定一分钟执行一次
*/
@Slf4j
@Component
@RequiredArgsConstructor
@ -63,32 +33,27 @@ public class TelegramTeamMessageSchedule {
private final UserService userService;
private final AccountService accountService;
private final CompletableFutureWebClientService completableFutureWebClientService;
private final TelegramMessageService telegramMessageService;
private final TeamService teamService;
private final RoleService roleService;
private final UserRoleService userRoleService;
private final RegistrationService registrationService;
private final DepositService depositService;
private static final String MINISTER_ROLE_CODE = "minister";
private static final long FIXED_DELAY = 60000L;
@Scheduled(fixedDelay = FIXED_DELAY)
public void newCheckRegistrationAndNewDeposit() {
// Get the current date and time
public void CheckRegistrationAndNewDeposit() {
LocalDate nowDate = LocalDate.now();
LocalDateTime nowDateTime = LocalDateTime.now();
//获取部长角色
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> users = userRoleService.listUserIdByRoleId(minister.getId());
//获取所有部长角色对应部门的用户
users.forEach(userId -> {
//传入对应部长用户
processUser(userService.getById(userId), nowDate, nowDateTime).join();
});
}
private CompletableFuture<Void> processUser(UserDO user, LocalDate nowDate, LocalDateTime nowDateTime) {
//获取当前用户名下的所有团队账号
List<AccountResp> accounts = accountService.getAccountsByUserId(user.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(AccountResp::getIsTeam)
@ -101,9 +66,7 @@ public class TelegramTeamMessageSchedule {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private CompletableFuture<Void> processTeamAccount(AccountResp account,
LocalDate nowDate,
LocalDateTime nowDateTime) {
private CompletableFuture<Void> processTeamAccount(AccountResp account, LocalDate nowDate, LocalDateTime nowDateTime) {
TeamInfoReq teamInfoReq = TeamInfoReq.builder()
.startDate(nowDateTime.with(TemporalAdjusters.firstDayOfMonth()))
.endDate(nowDateTime)
@ -115,207 +78,15 @@ public class TelegramTeamMessageSchedule {
Team currentTeamInfo = teamFuture.join();
log.info("Previous Team Info: {}", prevTeamInfo);
log.info("Current Team Info: {}", currentTeamInfo);
//获取代理线 代理对象映射
Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity()));
CompletableFuture<Void> registrationProcess = processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap);
CompletableFuture<Void> depositProcess = processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime);
CompletableFuture<Void> registrationProcess = registrationService.processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap);
CompletableFuture<Void> depositProcess = depositService.processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime);
return CompletableFuture.allOf(registrationProcess, depositProcess)
.thenRun(() -> teamService.updateTeamInfo(account, currentTeamInfo));
});
}
private CompletableFuture<Void> processRegistration(AccountResp account,
Team currentTeamInfo,
Team prevTeamInfo,
LocalDate nowDate,
Map<String, TeamAccount> teamAccountMap) {
if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
//代理线总共的新增注册数量
int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount();
//构建查询参数
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(nowDate)
.registerEndDate(nowDate)
.startDate(nowDate)
.endDate(nowDate)
.registerSort(1)
.pageSize(registerCount)
.build();
//查询所有新注册的会员
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> {
log.info("Successfully get {} new registered members", members.size());
//获取所有的新注册的会员对象
if (!members.isEmpty()) {
//根据上级代理名分组
Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String memberNames = accountMembers.stream()
.map(TeamMember::getName)
.collect(Collectors.joining(", "));
log.info("Successfully get new registrations {}", memberNames);
String notification = String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers
.size(), memberNames, teamAccountMap.get(accountName).getSubMemberNum());
UserDO currUser = accountService.getUserByAccountUsername(accountName);
// if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
// telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), notification);
// }
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}
);
}
});
}
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> processDeposits(AccountResp account,
Team currentTeam,
Team previousTeam,
LocalDate nowDate,
LocalDateTime nowDateTime) {
return CompletableFuture.runAsync(() -> {
if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) {
PayRecordsListReq req = PayRecordsListReq.builder()
.startDate(nowDate)
.endDate(nowDate)
.pageSize(100)
.payState(2)
.build();
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
Set<String> changedAgentNames = changedTeamAccounts.stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.map(TeamAccountWithChange::getAgentName)
.collect(Collectors.toSet());
paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
Map<String, List<String>> agentNameWithNames = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName()))
.collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
.mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new))));
agentNameWithNames.forEach((agentName, names) -> {
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
.filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
.findFirst()
.orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
List<CompletableFuture<Void>> fetchFutures = names.stream()
.map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> {
payRecords.stream()
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
.findFirst()
.ifPresent(record -> {
if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
depositResults.append(String.format("用户: `%s`, 首存金额: *%s*\n", member.getName(), record.getScoreAmount()));
}
});
}).exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", name, ex.getMessage());
return null;
}))
.toList();
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0]));
allFetches.thenRun(() -> {
if (!depositResults.isEmpty()) {
String notification = String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, targetTeamAccount.getNewDepositNum(), depositResults, targetTeamAccount.getFirstDepositNum());
UserDO currUser = accountService.getUserByAccountUsername(agentName);
// if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
// telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), depResults);
// }
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}
}).exceptionally(ex -> {
log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage());
return null;
});
});
}).exceptionally(ex -> {
log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
return null;
}).join();
}
});
}
//根据用户名查询对应会员详情
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) {
TeamMemberListReq memberListReq = TeamMemberListReq.builder()
.name(name)
.startDate(nowDate)
.endDate(nowDate)
.status(1)
.build();
CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
return memberFuture.thenApply(MemberPagination::getList)
.thenApply(list -> list.stream().findFirst())
.thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member
.getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")));
}
private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
MemberDetailsReq detailsReq = MemberDetailsReq.builder()
.id(memberId)
.startDate(nowDate)
.endDate(nowDate)
.build();
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
});
}
/**
* 对比两个Team中发生变化的具体TeamAccount
*
* @param prevTeam 缓存中的Team
* @param currTeam 新查询到的Team
* @return TeamAccountWithChange
*/
public List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
return prevTeam.getList()
.stream()
.filter(account1 -> team2AccountMap.containsKey(account1.getId()))
.map(account1 -> {
TeamAccount account2 = team2AccountMap.get(account1.getId());
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
BeanUtils.copyProperties(account2, changedAccount);
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
}
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
}
return changedAccount;
})
.collect(Collectors.toList());
}
}

View File

@ -33,43 +33,22 @@ public class CompletableFutureWebClientService {
private final WebClient webClient;
private final Semaphore semaphore;
private final ObjectMapper objectMapper;
private final LoginService loginService;
private final AccountService accountService;
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests,
ObjectMapper objectMapper,
LoginService loginService,
AccountService accountService) {
ObjectMapper objectMapper) {
this.webClient = webClientBuilder.build();
this.semaphore = new Semaphore(maxConcurrentRequests);
this.objectMapper = objectMapper;
this.loginService = loginService;
this.accountService = accountService;
}
public <T> CompletableFuture<T> fetchDataForAccount(AccountResp account,
String apiPath,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
return ensureLoggedIn(account)
.thenCompose(ignored -> fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture());
return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture();
}
private CompletableFuture<Void> ensureLoggedIn(AccountResp account) {
if (StrUtil.isEmptyIfStr(account.getHeaders())) {
return loginService.reLoginAndGetHeaders(account)
.flatMap(newHeaders -> {
account.setHeaders(newHeaders);
accountService.updateHeaders(newHeaders, account.getId());
return Mono.empty();
})
.then()
.toFuture();
} else {
return CompletableFuture.completedFuture(null);
}
}
public <T> Mono<T> fetchData(String url,
String headers,
@ -112,7 +91,7 @@ public class CompletableFutureWebClientService {
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
}
})
.flatMap(response -> respHandler(response, url, params, typeRef, account))
.flatMap(response -> respHandler(response, account))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))
).doFinally(signal -> semaphore.release());
}
@ -121,21 +100,12 @@ public class CompletableFutureWebClientService {
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable;
}
private <T> Mono<T> respHandler(ApiResponse<T> response, String url, Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef, AccountResp account) {
private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) {
if (response.getStatusCode().equals(6000)) {
return Mono.just(response.getData());
} else if (response.getStatusCode().equals(6001)) {
return loginService.reLoginAndGetHeaders(account)
.flatMap(newHeaders -> {
// 更新 account 对象中的 headers
account.setHeaders(newHeaders);
accountService.updateHeaders(newHeaders, account.getId());
// 重新发送原始请求
return fetchData(url, newHeaders, params, typeRef, account)
.doOnNext(data -> log.info("Retried request successful"));
})
.onErrorResume(e -> Mono.error(new BusinessException("Re-login failed: " + e.getMessage())));
log.warn("[{}]登录失效,请检查token有效性", account.getUsername());
throw new BusinessException("登录失效,请检查token有效性");
} else {
return Mono.error(new BusinessException("Error status code: " + response.getStatusCode()));
}

View File

@ -0,0 +1,168 @@
package com.zayac.admin.service;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.MemberDetailsReq;
import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamMemberListReq;
import com.zayac.admin.resp.Member;
import com.zayac.admin.resp.MemberPagination;
import com.zayac.admin.resp.Pagination;
import com.zayac.admin.resp.PayRecord;
import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamAccountWithChange;
import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import top.continew.starter.core.exception.BusinessException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class DepositService {
private final CompletableFutureWebClientService completableFutureWebClientService;
private final TelegramMessageService telegramMessageService;
private final AccountService accountService;
public CompletableFuture<Void> processDeposits(AccountResp account,
Team currentTeam,
Team previousTeam,
LocalDate nowDate,
LocalDateTime nowDateTime) {
return CompletableFuture.runAsync(() -> {
if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) {
PayRecordsListReq req = PayRecordsListReq.builder()
.startDate(nowDate)
.endDate(nowDate)
.pageSize(100)
.payState(2)
.build();
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
Set<String> changedAgentNames = changedTeamAccounts.stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.map(TeamAccountWithChange::getAgentName)
.collect(Collectors.toSet());
paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
Map<String, List<String>> agentNameWithNames = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName()))
.collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
.mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new))));
agentNameWithNames.forEach((agentName, names) -> {
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
.filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
.findFirst()
.orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
List<CompletableFuture<Void>> fetchFutures = names.stream()
.map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> {
payRecords.stream()
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
.findFirst()
.ifPresent(record -> {
if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), record.getScoreAmount()));
}
});
}).exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", name, ex.getMessage());
return null;
}))
.toList();
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0]));
allFetches.thenRun(() -> {
if (!depositResults.isEmpty()) {
String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(), depositResults.toString(), targetTeamAccount.getFirstDepositNum());
UserDO currUser = accountService.getUserByAccountUsername(agentName);
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}
}).exceptionally(ex -> {
log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage());
return null;
});
});
}).exceptionally(ex -> {
log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
return null;
}).join();
}
});
}
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) {
TeamMemberListReq memberListReq = TeamMemberListReq.builder()
.name(name)
.startDate(nowDate)
.endDate(nowDate)
.status(1)
.build();
CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
return memberFuture.thenApply(MemberPagination::getList)
.thenApply(list -> list.stream().findFirst())
.thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member
.getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")));
}
private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
MemberDetailsReq detailsReq = MemberDetailsReq.builder()
.id(memberId)
.startDate(nowDate)
.endDate(nowDate)
.build();
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
});
}
private List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
return prevTeam.getList()
.stream()
.filter(account1 -> team2AccountMap.containsKey(account1.getId()))
.map(account1 -> {
TeamAccount account2 = team2AccountMap.get(account1.getId());
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
BeanUtil.copyProperties(account2, changedAccount);
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
}
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
}
return changedAccount;
})
.collect(Collectors.toList());
}
}

View File

@ -1,47 +0,0 @@
package com.zayac.admin.service;
import com.zayac.admin.system.model.resp.AccountResp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import top.continew.starter.core.exception.BusinessException;
import java.util.Map;
@Service
@Slf4j
public class LoginService {
private final WebClient webClient;
public LoginService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("http://localhost:8081").build(); // 替换为你的 FastAPI 地址
}
public Mono<String> reLoginAndGetHeaders(AccountResp account) {
return webClient.post()
.uri("/login")
.body(BodyInserters.fromValue(account))
.retrieve()
.onStatus(status -> !status.is2xxSuccessful(), response ->
Mono.error(new BusinessException("Login failed with status code: " + response.statusCode()))
)
.bodyToMono(Map.class)
.flatMap(response -> {
if (response.isEmpty() || !response.containsKey("headers")) {
log.error("Login response is empty or invalid");
return Mono.error(new BusinessException("Login failed: response is empty or invalid"));
}
String headers = (String) response.get("headers");
return Mono.just(headers);
})
.doOnError(WebClientResponseException.class, e ->
log.error("Login request failed with error: {}", e.getMessage(), e)
)
.onErrorMap(WebClientResponseException.class, e ->
new BusinessException("Login request failed", e)
);
}
}

View File

@ -0,0 +1,68 @@
package com.zayac.admin.service;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.team.TeamMemberReq;
import com.zayac.admin.resp.MemberPagination;
import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamMember;
import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class RegistrationService {
private final CompletableFutureWebClientService completableFutureWebClientService;
private final TelegramMessageService notificationService;
private final TelegramMessageService telegramMessageService;
private final AccountService accountService;
public CompletableFuture<Void> processRegistration(AccountResp account,
Team currentTeamInfo,
Team prevTeamInfo,
LocalDate nowDate,
Map<String, TeamAccount> teamAccountMap) {
if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount();
TeamMemberReq memberListReq = TeamMemberReq.builder()
.registerStartDate(nowDate)
.registerEndDate(nowDate)
.startDate(nowDate)
.endDate(nowDate)
.registerSort(1)
.pageSize(registerCount)
.build();
CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
});
return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> {
log.info("Successfully get {} new registered members", members.size());
if (!members.isEmpty()) {
Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
.collect(Collectors.groupingBy(TeamMember::getTopAgentName));
groupByTopAgentName.forEach((accountName, accountMembers) -> {
String notification = telegramMessageService.buildRegistrationMessage(accountName, accountMembers, teamAccountMap.get(accountName));
UserDO currUser = accountService.getUserByAccountUsername(accountName);
// if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
// telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), notification);
// }
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
});
}
});
}
return CompletableFuture.completedFuture(null);
}
}

View File

@ -16,12 +16,16 @@
package com.zayac.admin.service;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.resp.team.TeamMember;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@ -36,10 +40,25 @@ public class TelegramMessageService {
private static String escapeMarkdown(String text) {
List<Character> escapeChars = Arrays
.asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!');
.asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!');
for (Character charItem : escapeChars) {
text = text.replace(charItem.toString(), "\\" + charItem);
}
return text;
}
public String buildRegistrationMessage(String accountName, List<TeamMember> accountMembers, TeamAccount teamAccount) {
String memberNames = accountMembers.stream()
.map(TeamMember::getName)
.collect(Collectors.joining(", "));
return String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers.size(), memberNames, teamAccount.getSubMemberNum());
}
public String buildDepositMessage(String agentName, int newDepositNum, String depositResults, int firstDepositNum) {
return String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, newDepositNum, depositResults, firstDepositNum);
}
public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) {
return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount);
}
}