修复了一系列查询数据错误的问题

新增了日报功能
完善了api-token过期之后调用登录的接口
This commit is contained in:
zayac 2024-05-27 15:01:28 +08:00
parent eb3f82d7c3
commit 60b7109419
8 changed files with 245 additions and 53 deletions

View File

@ -0,0 +1,117 @@
package com.zayac.admin.schedule;
import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.req.AgentDataVisualListReq;
import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.resp.AgentDataVisualList;
import com.zayac.admin.resp.Banner;
import com.zayac.admin.resp.Statics;
import com.zayac.admin.resp.team.Team;
import com.zayac.admin.resp.team.TeamAccount;
import com.zayac.admin.service.AgentDataVisualListService;
import com.zayac.admin.service.BannerService;
import com.zayac.admin.service.TeamService;
import com.zayac.admin.service.TelegramMessageService;
import com.zayac.admin.system.model.entity.RoleDO;
import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.*;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.TemporalAdjusters;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
/**
* 日报
*/
@Component
@RequiredArgsConstructor
public class DailyReport {
private final UserService userService;
private final TeamService teamService;
private final RoleService roleService;
private final UserRoleService userRoleService;
private final AccountService accountService;
private final TelegramMessageService telegramMessageService;
private final RoleDeptService roleDeptService;
private final DeptService deptService;
private final BannerService bannerService;
private final AgentDataVisualListService agentDataVisualListService;
private static final String MINISTER_ROLE_CODE = "minister";
//每天11:40 14:40 17:40 21:40 23:59开启发送日报
@Scheduled(cron = "0 40 11,14,17,21 * * ?")
@Scheduled(cron = "0 59 23 * * ?")
public void TeamAccountDailyReport() {
LocalDate nowDate = LocalDate.now();
LocalDateTime nowDateTime = LocalDateTime.now();
//获取部长角色
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> userIds = userRoleService.listUserIdByRoleId(minister.getId());
//获取所有部长角色对应部门的用户
userIds.forEach(userId -> {
//传入对应部长用户
UserDO ministerUser = userService.getById(userId);
//获取当前用户名下的所有团队账号
List<AccountResp> accounts = accountService.getAccountsByUserId(ministerUser.getId(), DisEnableStatusEnum.ENABLE)
.stream()
.filter(AccountResp::getIsTeam)
.toList();
Long deptId = ministerUser.getDeptId();
List<UserDO> deptUsers = userService.getByDeptId(DisEnableStatusEnum.ENABLE, deptId);
accounts.forEach(
accountResp -> {
TeamInfoReq teamInfoReq = TeamInfoReq.builder()
.startDate(nowDate.atStartOfDay())
.endDate(nowDateTime)
.build();
Team team = teamService.getLatestTeamInfoAsync(accountResp, teamInfoReq).join();
List<TeamAccount> list = team.getList();
//发送消息给部长
if (ministerUser.getNeedNotify() == DisEnableStatusEnum.ENABLE) {
//获取当天总线的所有注册新增
int currentDayTotalNewMember = list.stream().mapToInt(TeamAccount::getSubMemberNum).sum();
int currentDayTotalNewFirstDeposit = list.stream().mapToInt(TeamAccount::getFirstDepositNum).sum();
String message = String.format("截至%s 注册:*%d*,新增:*%d*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit);
telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getChatId(), message);
}
}
);
AgentDataVisualListReq agentDataVisualListReq = AgentDataVisualListReq.builder().build();
deptUsers.forEach(deptUser -> {
StringBuilder message = new StringBuilder();
List<AccountResp> currUserAccounts = accountService.getAccountsByUserId(deptUser.getId(), DisEnableStatusEnum.ENABLE);
currUserAccounts.forEach(currAccount -> {
CompletableFuture<AgentDataVisualList> agentDataVisualListCompletableFuture = agentDataVisualListService.getAgentDataVisualList(currAccount, agentDataVisualListReq);
AgentDataVisualList agentDataVisualList = agentDataVisualListCompletableFuture.join();
Statics statics = agentDataVisualList.getCurData().stream().filter(data -> data.getStaticsDate().equals(nowDate)).findFirst().orElseThrow();
message.append(currAccount.getUsername()).append("\n");
message.append("注册: ").append(statics.getIsNew()).append("\n");
message.append("新增: ").append(statics.getFirstCount()).append("\n");
message.append("日活: ").append(statics.getCountBets()).append("\n\n");
});
// if (DisEnableStatusEnum.ENABLE.equals(deptUser.getNeedNotify())) {
// if (DisEnableStatusEnum.ENABLE.equals(deptUser.getNeedNotify())) {
// telegramMessageService.sendMessage(user.getBotToken(), user.getGroupId(), message.toString());
// }
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, message.toString());
});
});
}
}

View File

@ -16,11 +16,12 @@
package com.zayac.admin.schedule;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.MemberDetailsReq;
import com.zayac.admin.req.PayRecordsListReq;
import com.zayac.admin.req.team.TeamInfoReq;
import com.zayac.admin.req.team.TeamMemberListReq;
import com.zayac.admin.req.team.TeamMemberReq;
import com.zayac.admin.resp.*;
@ -41,11 +42,14 @@ import org.springframework.beans.BeanUtils;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import top.continew.starter.core.exception.BusinessException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.TemporalAdjusters;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -100,7 +104,11 @@ public class TelegramTeamMessageSchedule {
private CompletableFuture<Void> processTeamAccount(AccountResp account,
LocalDate nowDate,
LocalDateTime nowDateTime) {
CompletableFuture<Team> teamFuture = teamService.getLatestTeamInfoAsync(account, nowDate, nowDateTime);
TeamInfoReq teamInfoReq = TeamInfoReq.builder()
.startDate(nowDateTime.with(TemporalAdjusters.firstDayOfMonth()))
.endDate(nowDateTime)
.build();
CompletableFuture<Team> teamFuture = teamService.getLatestTeamInfoAsync(account, teamInfoReq);
Team prevTeamInfo = teamService.getPreviousTeamInfo(account);
return CompletableFuture.allOf(teamFuture).thenCompose(v -> {
@ -113,7 +121,7 @@ public class TelegramTeamMessageSchedule {
.collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity()));
CompletableFuture<Void> registrationProcess = processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap);
CompletableFuture<Void> depositProcess = processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime, teamAccountMap);
CompletableFuture<Void> depositProcess = processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime);
return CompletableFuture.allOf(registrationProcess, depositProcess)
.thenRun(() -> teamService.updateTeamInfo(account, currentTeamInfo));
@ -175,8 +183,7 @@ public class TelegramTeamMessageSchedule {
Team currentTeam,
Team previousTeam,
LocalDate nowDate,
LocalDateTime nowDateTime,
Map<String, TeamAccount> teamAccountMap) {
LocalDateTime nowDateTime) {
return CompletableFuture.runAsync(() -> {
if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) {
PayRecordsListReq req = PayRecordsListReq.builder()
@ -188,15 +195,26 @@ public class TelegramTeamMessageSchedule {
CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
});
List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
Set<String> changedAgentNames = changedTeamAccounts.stream()
.filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
.map(TeamAccountWithChange::getAgentName)
.collect(Collectors.toSet());
paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
Map<String, List<String>> agentNameWithNames = payRecords.stream()
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)))
.filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName()))
.collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
.mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors
.toSet(), ArrayList::new))));
.mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new))));
agentNameWithNames.forEach((agentName, names) -> {
StringBuilder depositResults = new StringBuilder();
AtomicInteger depositCounter = new AtomicInteger(0);
TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
.filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
.findFirst()
.orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
List<CompletableFuture<Void>> fetchFutures = names.stream()
.map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> {
@ -204,8 +222,9 @@ public class TelegramTeamMessageSchedule {
.filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
.findFirst()
.ifPresent(record -> {
depositResults.append(String.format("用户: `%s`, 首存金额: *%s*\n", member
.getName(), record.getScoreAmount()));
if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
depositResults.append(String.format("用户: `%s`, 首存金额: *%s*\n", member.getName(), record.getScoreAmount()));
}
});
}).exceptionally(ex -> {
log.error("Error fetching details for member {}: {}", name, ex.getMessage());
@ -213,25 +232,20 @@ public class TelegramTeamMessageSchedule {
}))
.toList();
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures
.toArray(new CompletableFuture[0]));
CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0]));
allFetches.thenRun(() -> {
if (!depositResults.isEmpty()) {
String notification = String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, fetchFutures
.size(), depositResults, teamAccountMap.get(agentName).getFirstDepositNum());
String notification = String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, targetTeamAccount.getNewDepositNum(), depositResults, targetTeamAccount.getFirstDepositNum());
UserDO currUser = accountService.getUserByAccountUsername(agentName);
// if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
// telegramMessageService.sendMessage(currUser.getBotToken(), currUser
// .getGroupId(), depResults);
// }
telegramMessageService
.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
// if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
// telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), depResults);
// }
telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
}
}).exceptionally(ex -> {
log.error("Error sending notification for account {}: {}", account.getId(), ex
.getMessage());
log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage());
return null;
});
});
@ -243,6 +257,7 @@ public class TelegramTeamMessageSchedule {
});
}
//根据用户名查询对应会员详情
private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) {
TeamMemberListReq memberListReq = TeamMemberListReq.builder()
@ -282,11 +297,6 @@ public class TelegramTeamMessageSchedule {
* @return TeamAccountWithChange
*/
public List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
if (prevTeam == null || currTeam == null || CollUtil.isEmpty(prevTeam.getList()) || CollUtil.isEmpty(currTeam
.getList())) {
return CollUtil.newArrayList();
}
Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
.stream()
.collect(Collectors.toMap(TeamAccount::getId, account -> account));
@ -297,12 +307,12 @@ public class TelegramTeamMessageSchedule {
.map(account1 -> {
TeamAccount account2 = team2AccountMap.get(account1.getId());
TeamAccountWithChange changedAccount = new TeamAccountWithChange();
BeanUtils.copyProperties(account1, changedAccount);
BeanUtils.copyProperties(account2, changedAccount);
if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
changedAccount.setNewDepositNum(account1.getFirstDepositNum() - account2.getFirstDepositNum());
changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
}
if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
changedAccount.setNewRegisterNum(account1.getSubMemberNum() - account2.getSubMemberNum());
changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
}
return changedAccount;
})

View File

@ -0,0 +1,23 @@
package com.zayac.admin.service;
import com.zayac.admin.constant.ApiPathConstants;
import com.zayac.admin.req.AgentDataVisualListReq;
import com.zayac.admin.resp.AgentDataVisualList;
import com.zayac.admin.resp.Banner;
import com.zayac.admin.system.model.resp.AccountResp;
import lombok.RequiredArgsConstructor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
@RequiredArgsConstructor
public class AgentDataVisualListService {
private final CompletableFutureWebClientService completableFutureWebClientService;
public CompletableFuture<AgentDataVisualList> getAgentDataVisualList(AccountResp account, AgentDataVisualListReq agentDataVisualListReq) {
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.VISUAL_LIST_URL, agentDataVisualListReq, new ParameterizedTypeReference<>() {});
}
}

View File

@ -1,6 +1,7 @@
package com.zayac.admin.service;
import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zayac.admin.resp.ApiResponse;
@ -33,7 +34,6 @@ public class CompletableFutureWebClientService {
private final Semaphore semaphore;
private final ObjectMapper objectMapper;
private final LoginService loginService;
private final AccountService accountService;
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
@ -52,7 +52,23 @@ public class CompletableFutureWebClientService {
String apiPath,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
return this.fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture();
return ensureLoggedIn(account)
.thenCompose(ignored -> fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture());
}
private CompletableFuture<Void> ensureLoggedIn(AccountResp account) {
if (StrUtil.isEmptyIfStr(account.getHeaders())) {
return loginService.reLoginAndGetHeaders(account)
.flatMap(newHeaders -> {
account.setHeaders(newHeaders);
accountService.updateHeaders(newHeaders, account.getId());
return Mono.empty();
})
.then()
.toFuture();
} else {
return CompletableFuture.completedFuture(null);
}
}
public <T> Mono<T> fetchData(String url,
@ -67,9 +83,14 @@ public class CompletableFutureWebClientService {
return true;
}).subscribeOn(Schedulers.boundedElastic()).then(
this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
try {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
} catch (Exception e) {
log.warn("Header conversion exception: " + e.getMessage());
throw new BusinessException("Header conversion failed", e);
}
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()

View File

@ -1,20 +1,23 @@
package com.zayac.admin.service;
import com.zayac.admin.system.model.resp.AccountResp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import top.continew.starter.core.exception.BusinessException;
import java.util.Map;
/**
* api_token失效的时候执行的逻辑
*/
@Service
@Slf4j
public class LoginService {
private final WebClient webClient;
public LoginService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("http://localhost:8000").build(); // 替换为你的 FastAPI 地址
this.webClient = webClientBuilder.baseUrl("http://localhost:8081").build(); // 替换为你的 FastAPI 地址
}
public Mono<String> reLoginAndGetHeaders(AccountResp account) {
@ -22,7 +25,23 @@ public class LoginService {
.uri("/login")
.body(BodyInserters.fromValue(account))
.retrieve()
.bodyToMono(String.class)
.map(response -> response);
.onStatus(status -> !status.is2xxSuccessful(), response ->
Mono.error(new BusinessException("Login failed with status code: " + response.statusCode()))
)
.bodyToMono(Map.class)
.flatMap(response -> {
if (response.isEmpty() || !response.containsKey("headers")) {
log.error("Login response is empty or invalid");
return Mono.error(new BusinessException("Login failed: response is empty or invalid"));
}
String headers = (String) response.get("headers");
return Mono.just(headers);
})
.doOnError(WebClientResponseException.class, e ->
log.error("Login request failed with error: {}", e.getMessage(), e)
)
.onErrorMap(WebClientResponseException.class, e ->
new BusinessException("Login request failed", e)
);
}
}

View File

@ -38,16 +38,18 @@ public class TeamService {
private final CompletableFutureWebClientService completableFutureWebClientService;
public CompletableFuture<Team> getLatestTeamInfoAsync(AccountResp account,
LocalDate nowDate,
LocalDateTime nowDateTime) {
TeamInfoReq.TeamInfoReqBuilder teamInfoReqBuilder = TeamInfoReq.builder()
.pageNum(1)
.pageSize(100)
.startDate(nowDateTime.with(TemporalAdjusters.firstDayOfMonth()))
.endDate(nowDateTime);
TeamInfoReq teamInfoReq) {
//设置一个超大的分页参数 确保一次查询到所有的代理线
if (teamInfoReq.getPageSize() == 0) {
teamInfoReq.setPageSize(200);
}
if (teamInfoReq.getPageNum() == 0) {
teamInfoReq.setPageNum(1);
}
return completableFutureWebClientService
.fetchDataForAccount(account, ApiPathConstants.TEAM_LIST_URL, teamInfoReqBuilder, new ParameterizedTypeReference<>() {
});
.fetchDataForAccount(account, ApiPathConstants.TEAM_LIST_URL, teamInfoReq, new ParameterizedTypeReference<>() {
});
}
@Cached(name = "TEAM_CACHE", key = "#account.id", expire = 90, cacheType = CacheType.BOTH, syncLocal = true)

View File

@ -61,7 +61,7 @@ public class AccountResp extends BaseResp {
* 密码
*/
@Schema(description = "密码", example = "tg666888")
@JsonMask(MaskType.PASSWORD)
// @JsonMask(MaskType.PASSWORD)
private String password;
/**

View File

@ -65,7 +65,7 @@ public class AccountServiceImpl extends BaseServiceImpl<AccountMapper, AccountDO
.eq(AccountDO::getUsername, username)
.eq(AccountDO::getStatus, DisEnableStatusEnum.ENABLE)
.oneOpt()
.map(account -> userService.getById(account.getId()))
.map(account -> userService.getById(account.getUserId()))
.orElse(null);
}