From 43131210f36819f334f7bdaf5309647076f7c747 Mon Sep 17 00:00:00 2001
From: zayac <stupidzayac@gmail.com>
Date: Wed, 29 May 2024 17:49:29 +0800
Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E8=B0=83=E7=94=A8=E7=99=BB?=
 =?UTF-8?q?=E5=BD=95=E6=8E=A5=E5=8F=A3,=E6=94=B9=E7=94=A8python=E6=8E=A5?=
 =?UTF-8?q?=E5=8F=A3=E7=BB=B4=E6=8A=A4token=E6=9C=89=E6=95=88=E6=9C=9F=20?=
 =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2=E9=94=99?=
 =?UTF-8?q?=E8=AF=AF=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../com/zayac/admin/schedule/DailyReport.java |  14 +-
 .../schedule/TelegramTeamMessageSchedule.java | 255 +-----------------
 .../CompletableFutureWebClientService.java    |  42 +--
 .../zayac/admin/service/DepositService.java   | 168 ++++++++++++
 .../com/zayac/admin/service/LoginService.java |  47 ----
 .../admin/service/RegistrationService.java    |  68 +++++
 .../admin/service/TelegramMessageService.java |  21 +-
 7 files changed, 288 insertions(+), 327 deletions(-)
 create mode 100644 zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java
 delete mode 100644 zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java
 create mode 100644 zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java

diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java
index e1e73e64..32f3e114 100644
--- a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java
+++ b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/DailyReport.java
@@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.text.NumberFormat;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -84,7 +85,9 @@ public class DailyReport {
                             //获取当天总线的所有注册新增
                             int currentDayTotalNewMember = list.stream().mapToInt(TeamAccount::getSubMemberNum).sum();
                             int currentDayTotalNewFirstDeposit = list.stream().mapToInt(TeamAccount::getFirstDepositNum).sum();
-                            String message = String.format("截至%s 注册:*%d*,新增:*%d*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit);
+                            //当日转化率
+                            String percent = getPercent(currentDayTotalNewFirstDeposit, currentDayTotalNewMember);
+                            String message = String.format("截至%s 注册:*%d*,新增:*%d* 转化率: *%s*", nowDateTime, currentDayTotalNewMember, currentDayTotalNewFirstDeposit, percent);
                             telegramMessageService.sendMessage(ministerUser.getBotToken(), ministerUser.getChatId(), message);
                         }
                     }
@@ -112,6 +115,15 @@ public class DailyReport {
             });
         });
     }
+
+    public static String getPercent(int x, int y) {
+        double d1 = x * 1.0;
+        double d2 = y * 1.0;
+        NumberFormat percentInstance = NumberFormat.getPercentInstance();
+        percentInstance.setMinimumFractionDigits(2);
+        return percentInstance.format(d1 / d2);
+    }
 }
 
 
+
diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java
index 3c2358f4..466ac502 100644
--- a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java
+++ b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java
@@ -1,61 +1,31 @@
-/*
- * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package com.zayac.admin.schedule;
 
-import cn.hutool.core.util.StrUtil;
 import com.zayac.admin.common.enums.DisEnableStatusEnum;
-import com.zayac.admin.constant.ApiPathConstants;
-import com.zayac.admin.req.MemberDetailsReq;
-import com.zayac.admin.req.PayRecordsListReq;
 import com.zayac.admin.req.team.TeamInfoReq;
-import com.zayac.admin.req.team.TeamMemberListReq;
-import com.zayac.admin.req.team.TeamMemberReq;
-import com.zayac.admin.resp.*;
 import com.zayac.admin.resp.team.Team;
 import com.zayac.admin.resp.team.TeamAccount;
-import com.zayac.admin.resp.team.TeamAccountWithChange;
-import com.zayac.admin.resp.team.TeamMember;
-import com.zayac.admin.service.CompletableFutureWebClientService;
-import com.zayac.admin.service.TeamService;
-import com.zayac.admin.service.TelegramMessageService;
+import com.zayac.admin.service.*;
 import com.zayac.admin.system.model.entity.RoleDO;
 import com.zayac.admin.system.model.entity.UserDO;
 import com.zayac.admin.system.model.resp.AccountResp;
-import com.zayac.admin.system.service.*;
+import com.zayac.admin.system.service.AccountService;
+import com.zayac.admin.system.service.RoleService;
+import com.zayac.admin.system.service.UserRoleService;
+import com.zayac.admin.system.service.UserService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.BeanUtils;
-import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
-import top.continew.starter.core.exception.BusinessException;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.temporal.TemporalAdjusters;
-import java.util.*;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/**
- * 定时任务 查询注册 新增用 暂定一分钟执行一次
- */
 @Slf4j
 @Component
 @RequiredArgsConstructor
@@ -63,32 +33,27 @@ public class TelegramTeamMessageSchedule {
 
     private final UserService userService;
     private final AccountService accountService;
-    private final CompletableFutureWebClientService completableFutureWebClientService;
-    private final TelegramMessageService telegramMessageService;
     private final TeamService teamService;
     private final RoleService roleService;
     private final UserRoleService userRoleService;
+    private final RegistrationService registrationService;
+    private final DepositService depositService;
 
     private static final String MINISTER_ROLE_CODE = "minister";
     private static final long FIXED_DELAY = 60000L;
 
     @Scheduled(fixedDelay = FIXED_DELAY)
-    public void newCheckRegistrationAndNewDeposit() {
-        // Get the current date and time
+    public void CheckRegistrationAndNewDeposit() {
         LocalDate nowDate = LocalDate.now();
         LocalDateTime nowDateTime = LocalDateTime.now();
-        //获取部长角色
         RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
         List<Long> users = userRoleService.listUserIdByRoleId(minister.getId());
-        //获取所有部长角色对应部门的用户
         users.forEach(userId -> {
-            //传入对应部长用户
             processUser(userService.getById(userId), nowDate, nowDateTime).join();
         });
     }
 
     private CompletableFuture<Void> processUser(UserDO user, LocalDate nowDate, LocalDateTime nowDateTime) {
-        //获取当前用户名下的所有团队账号
         List<AccountResp> accounts = accountService.getAccountsByUserId(user.getId(), DisEnableStatusEnum.ENABLE)
                 .stream()
                 .filter(AccountResp::getIsTeam)
@@ -101,9 +66,7 @@ public class TelegramTeamMessageSchedule {
         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
-    private CompletableFuture<Void> processTeamAccount(AccountResp account,
-                                                       LocalDate nowDate,
-                                                       LocalDateTime nowDateTime) {
+    private CompletableFuture<Void> processTeamAccount(AccountResp account, LocalDate nowDate, LocalDateTime nowDateTime) {
         TeamInfoReq teamInfoReq = TeamInfoReq.builder()
                 .startDate(nowDateTime.with(TemporalAdjusters.firstDayOfMonth()))
                 .endDate(nowDateTime)
@@ -115,207 +78,15 @@ public class TelegramTeamMessageSchedule {
             Team currentTeamInfo = teamFuture.join();
             log.info("Previous Team Info: {}", prevTeamInfo);
             log.info("Current Team Info: {}", currentTeamInfo);
-            //获取代理线 代理对象映射
             Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList()
                     .stream()
                     .collect(Collectors.toMap(TeamAccount::getAgentName, Function.identity()));
 
-            CompletableFuture<Void> registrationProcess = processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap);
-            CompletableFuture<Void> depositProcess = processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime);
+            CompletableFuture<Void> registrationProcess = registrationService.processRegistration(account, currentTeamInfo, prevTeamInfo, nowDate, teamAccountMap);
+            CompletableFuture<Void> depositProcess = depositService.processDeposits(account, currentTeamInfo, prevTeamInfo, nowDate, nowDateTime);
 
             return CompletableFuture.allOf(registrationProcess, depositProcess)
                     .thenRun(() -> teamService.updateTeamInfo(account, currentTeamInfo));
         });
     }
-
-    private CompletableFuture<Void> processRegistration(AccountResp account,
-                                                        Team currentTeamInfo,
-                                                        Team prevTeamInfo,
-                                                        LocalDate nowDate,
-                                                        Map<String, TeamAccount> teamAccountMap) {
-        if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
-            //代理线总共的新增注册数量
-            int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount();
-            //构建查询参数
-            TeamMemberReq memberListReq = TeamMemberReq.builder()
-                    .registerStartDate(nowDate)
-                    .registerEndDate(nowDate)
-                    .startDate(nowDate)
-                    .endDate(nowDate)
-                    .registerSort(1)
-                    .pageSize(registerCount)
-                    .build();
-            //查询所有新注册的会员
-            CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
-                    .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
-                    });
-            return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> {
-                log.info("Successfully get {} new registered members", members.size());
-                //获取所有的新注册的会员对象
-                if (!members.isEmpty()) {
-                    //根据上级代理名分组
-                    Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
-                            .collect(Collectors.groupingBy(TeamMember::getTopAgentName));
-                    groupByTopAgentName.forEach((accountName, accountMembers) -> {
-                                String memberNames = accountMembers.stream()
-                                        .map(TeamMember::getName)
-                                        .collect(Collectors.joining(", "));
-                                log.info("Successfully get new registrations {}", memberNames);
-                                String notification = String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers
-                                        .size(), memberNames, teamAccountMap.get(accountName).getSubMemberNum());
-                                UserDO currUser = accountService.getUserByAccountUsername(accountName);
-                                //                                if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
-                                //                                    telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), notification);
-                                //                                }
-
-                                telegramMessageService
-                                        .sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
-                            }
-
-                    );
-                }
-            });
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private CompletableFuture<Void> processDeposits(AccountResp account,
-                                                    Team currentTeam,
-                                                    Team previousTeam,
-                                                    LocalDate nowDate,
-                                                    LocalDateTime nowDateTime) {
-        return CompletableFuture.runAsync(() -> {
-            if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) {
-                PayRecordsListReq req = PayRecordsListReq.builder()
-                        .startDate(nowDate)
-                        .endDate(nowDate)
-                        .pageSize(100)
-                        .payState(2)
-                        .build();
-                CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
-                        .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
-                        });
-                List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
-                Set<String> changedAgentNames = changedTeamAccounts.stream()
-                        .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
-                        .map(TeamAccountWithChange::getAgentName)
-                        .collect(Collectors.toSet());
-                paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
-
-                    Map<String, List<String>> agentNameWithNames = payRecords.stream()
-                            .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName()))
-                            .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
-                                    .mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new))));
-
-                    agentNameWithNames.forEach((agentName, names) -> {
-                        StringBuilder depositResults = new StringBuilder();
-                        AtomicInteger depositCounter = new AtomicInteger(0);
-
-                        TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
-                                .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
-                                .findFirst()
-                                .orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
-
-                        List<CompletableFuture<Void>> fetchFutures = names.stream()
-                                .map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> {
-                                    payRecords.stream()
-                                            .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
-                                            .findFirst()
-                                            .ifPresent(record -> {
-                                                if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
-                                                    depositResults.append(String.format("用户: `%s`, 首存金额: *%s*\n", member.getName(), record.getScoreAmount()));
-                                                }
-                                            });
-                                }).exceptionally(ex -> {
-                                    log.error("Error fetching details for member {}: {}", name, ex.getMessage());
-                                    return null;
-                                }))
-                                .toList();
-
-                        CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0]));
-
-                        allFetches.thenRun(() -> {
-                            if (!depositResults.isEmpty()) {
-                                String notification = String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, targetTeamAccount.getNewDepositNum(), depositResults, targetTeamAccount.getFirstDepositNum());
-
-                                UserDO currUser = accountService.getUserByAccountUsername(agentName);
-//                            if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
-//                                telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), depResults);
-//                            }
-                                telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
-                            }
-                        }).exceptionally(ex -> {
-                            log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage());
-                            return null;
-                        });
-                    });
-                }).exceptionally(ex -> {
-                    log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
-                    return null;
-                }).join();
-            }
-        });
-    }
-
-
-    //根据用户名查询对应会员详情
-    private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) {
-        TeamMemberListReq memberListReq = TeamMemberListReq.builder()
-                .name(name)
-                .startDate(nowDate)
-                .endDate(nowDate)
-                .status(1)
-                .build();
-
-        CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService
-                .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
-                });
-
-        return memberFuture.thenApply(MemberPagination::getList)
-                .thenApply(list -> list.stream().findFirst())
-                .thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member
-                        .getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")));
-    }
-
-    private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
-        MemberDetailsReq detailsReq = MemberDetailsReq.builder()
-                .id(memberId)
-                .startDate(nowDate)
-                .endDate(nowDate)
-                .build();
-
-        return completableFutureWebClientService
-                .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
-                });
-    }
-
-    /**
-     * 对比两个Team中发生变化的具体TeamAccount
-     *
-     * @param prevTeam 缓存中的Team
-     * @param currTeam 新查询到的Team
-     * @return TeamAccountWithChange
-     */
-    public List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
-        Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
-                .stream()
-                .collect(Collectors.toMap(TeamAccount::getId, account -> account));
-
-        return prevTeam.getList()
-                .stream()
-                .filter(account1 -> team2AccountMap.containsKey(account1.getId()))
-                .map(account1 -> {
-                    TeamAccount account2 = team2AccountMap.get(account1.getId());
-                    TeamAccountWithChange changedAccount = new TeamAccountWithChange();
-                    BeanUtils.copyProperties(account2, changedAccount);
-                    if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
-                        changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
-                    }
-                    if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
-                        changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
-                    }
-                    return changedAccount;
-                })
-                .collect(Collectors.toList());
-    }
 }
diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java
index 0c75b166..d64465d9 100644
--- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java
+++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java
@@ -33,43 +33,22 @@ public class CompletableFutureWebClientService {
     private final WebClient webClient;
     private final Semaphore semaphore;
     private final ObjectMapper objectMapper;
-    private final LoginService loginService;
-    private final AccountService accountService;
 
     public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
                                              @Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests,
-                                             ObjectMapper objectMapper,
-                                             LoginService loginService,
-                                             AccountService accountService) {
+                                             ObjectMapper objectMapper) {
         this.webClient = webClientBuilder.build();
         this.semaphore = new Semaphore(maxConcurrentRequests);
         this.objectMapper = objectMapper;
-        this.loginService = loginService;
-        this.accountService = accountService;
     }
 
     public <T> CompletableFuture<T> fetchDataForAccount(AccountResp account,
                                                         String apiPath,
                                                         Object params,
                                                         ParameterizedTypeReference<ApiResponse<T>> typeRef) {
-        return ensureLoggedIn(account)
-                .thenCompose(ignored -> fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture());
+        return fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture();
     }
 
-    private CompletableFuture<Void> ensureLoggedIn(AccountResp account) {
-        if (StrUtil.isEmptyIfStr(account.getHeaders())) {
-            return loginService.reLoginAndGetHeaders(account)
-                    .flatMap(newHeaders -> {
-                        account.setHeaders(newHeaders);
-                        accountService.updateHeaders(newHeaders, account.getId());
-                        return Mono.empty();
-                    })
-                    .then()
-                    .toFuture();
-        } else {
-            return CompletableFuture.completedFuture(null);
-        }
-    }
 
     public <T> Mono<T> fetchData(String url,
                                  String headers,
@@ -112,7 +91,7 @@ public class CompletableFutureWebClientService {
                                 return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
                             }
                         })
-                        .flatMap(response -> respHandler(response, url, params, typeRef, account))
+                        .flatMap(response -> respHandler(response, account))
                         .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))
         ).doFinally(signal -> semaphore.release());
     }
@@ -121,21 +100,12 @@ public class CompletableFutureWebClientService {
         return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable;
     }
 
-    private <T> Mono<T> respHandler(ApiResponse<T> response, String url, Object params,
-                                    ParameterizedTypeReference<ApiResponse<T>> typeRef, AccountResp account) {
+    private <T> Mono<T> respHandler(ApiResponse<T> response, AccountResp account) {
         if (response.getStatusCode().equals(6000)) {
             return Mono.just(response.getData());
         } else if (response.getStatusCode().equals(6001)) {
-            return loginService.reLoginAndGetHeaders(account)
-                    .flatMap(newHeaders -> {
-                        // 更新 account 对象中的 headers
-                        account.setHeaders(newHeaders);
-                        accountService.updateHeaders(newHeaders, account.getId());
-                        // 重新发送原始请求
-                        return fetchData(url, newHeaders, params, typeRef, account)
-                                .doOnNext(data -> log.info("Retried request successful"));
-                    })
-                    .onErrorResume(e -> Mono.error(new BusinessException("Re-login failed: " + e.getMessage())));
+            log.warn("[{}]登录失效,请检查token有效性", account.getUsername());
+            throw new BusinessException("登录失效,请检查token有效性");
         } else {
             return Mono.error(new BusinessException("Error status code: " + response.getStatusCode()));
         }
diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java
new file mode 100644
index 00000000..8f477ff3
--- /dev/null
+++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/DepositService.java
@@ -0,0 +1,168 @@
+package com.zayac.admin.service;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.StrUtil;
+import com.zayac.admin.constant.ApiPathConstants;
+import com.zayac.admin.req.MemberDetailsReq;
+import com.zayac.admin.req.PayRecordsListReq;
+import com.zayac.admin.req.team.TeamMemberListReq;
+import com.zayac.admin.resp.Member;
+import com.zayac.admin.resp.MemberPagination;
+import com.zayac.admin.resp.Pagination;
+import com.zayac.admin.resp.PayRecord;
+import com.zayac.admin.resp.team.Team;
+import com.zayac.admin.resp.team.TeamAccount;
+import com.zayac.admin.resp.team.TeamAccountWithChange;
+import com.zayac.admin.system.model.entity.UserDO;
+import com.zayac.admin.system.model.resp.AccountResp;
+import com.zayac.admin.system.service.AccountService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.stereotype.Service;
+import top.continew.starter.core.exception.BusinessException;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class DepositService {
+    private final CompletableFutureWebClientService completableFutureWebClientService;
+    private final TelegramMessageService telegramMessageService;
+    private final AccountService accountService;
+
+    public CompletableFuture<Void> processDeposits(AccountResp account,
+                                                   Team currentTeam,
+                                                   Team previousTeam,
+                                                   LocalDate nowDate,
+                                                   LocalDateTime nowDateTime) {
+        return CompletableFuture.runAsync(() -> {
+            if (previousTeam != null && currentTeam.getFirstDepositNum() > previousTeam.getFirstDepositNum()) {
+                PayRecordsListReq req = PayRecordsListReq.builder()
+                        .startDate(nowDate)
+                        .endDate(nowDate)
+                        .pageSize(100)
+                        .payState(2)
+                        .build();
+                CompletableFuture<Pagination<List<PayRecord>>> paginationCompletableFuture = completableFutureWebClientService
+                        .fetchDataForAccount(account, ApiPathConstants.PAY_RECORDS_LIST_URL, req, new ParameterizedTypeReference<>() {
+                        });
+                List<TeamAccountWithChange> changedTeamAccounts = findChangedTeamAccount(previousTeam, currentTeam);
+                Set<String> changedAgentNames = changedTeamAccounts.stream()
+                        .filter(teamAccountWithChange -> teamAccountWithChange.getNewDepositNum() > 0)
+                        .map(TeamAccountWithChange::getAgentName)
+                        .collect(Collectors.toSet());
+                paginationCompletableFuture.thenApply(Pagination::getList).thenAccept(payRecords -> {
+                    Map<String, List<String>> agentNameWithNames = payRecords.stream()
+                            .filter(record -> record.getCreatedAt().isAfter(nowDateTime.minusHours(1)) && changedAgentNames.contains(record.getAgentName()))
+                            .collect(Collectors.groupingBy(PayRecord::getAgentName, Collectors
+                                    .mapping(PayRecord::getName, Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new))));
+
+                    agentNameWithNames.forEach((agentName, names) -> {
+                        StringBuilder depositResults = new StringBuilder();
+                        AtomicInteger depositCounter = new AtomicInteger(0);
+
+                        TeamAccountWithChange targetTeamAccount = changedTeamAccounts.stream()
+                                .filter(teamAccount -> StrUtil.equals(teamAccount.getAgentName(), agentName))
+                                .findFirst()
+                                .orElseThrow(() -> new BusinessException(String.format("can not find agent name %s", agentName)));
+
+                        List<CompletableFuture<Void>> fetchFutures = names.stream()
+                                .map(name -> fetchMemberDetails(account, name, nowDate).thenAccept(member -> {
+                                    payRecords.stream()
+                                            .filter(record -> record.getCreatedAt().equals(member.getFirstPayAt()))
+                                            .findFirst()
+                                            .ifPresent(record -> {
+                                                if (depositCounter.getAndIncrement() < targetTeamAccount.getNewDepositNum()) {
+                                                    depositResults.append(telegramMessageService.buildDepositResultsMessage(member.getName(), record.getScoreAmount()));
+                                                }
+                                            });
+                                }).exceptionally(ex -> {
+                                    log.error("Error fetching details for member {}: {}", name, ex.getMessage());
+                                    return null;
+                                }))
+                                .toList();
+
+                        CompletableFuture<Void> allFetches = CompletableFuture.allOf(fetchFutures.toArray(new CompletableFuture[0]));
+
+                        allFetches.thenRun(() -> {
+                            if (!depositResults.isEmpty()) {
+                                String notification = telegramMessageService.buildDepositMessage(agentName, targetTeamAccount.getNewDepositNum(), depositResults.toString(), targetTeamAccount.getFirstDepositNum());
+                                UserDO currUser = accountService.getUserByAccountUsername(agentName);
+                                telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
+                            }
+                        }).exceptionally(ex -> {
+                            log.error("Error sending notification for account {}: {}", account.getId(), ex.getMessage());
+                            return null;
+                        });
+                    });
+                }).exceptionally(ex -> {
+                    log.error("Error processing deposits for account {}: {}", account.getId(), ex.getMessage());
+                    return null;
+                }).join();
+            }
+        });
+    }
+
+    private CompletableFuture<Member> fetchMemberDetails(AccountResp account, String name, LocalDate nowDate) {
+        TeamMemberListReq memberListReq = TeamMemberListReq.builder()
+                .name(name)
+                .startDate(nowDate)
+                .endDate(nowDate)
+                .status(1)
+                .build();
+
+        CompletableFuture<MemberPagination<List<Member>>> memberFuture = completableFutureWebClientService
+                .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
+                });
+
+        return memberFuture.thenApply(MemberPagination::getList)
+                .thenApply(list -> list.stream().findFirst())
+                .thenCompose(optionalMember -> optionalMember.map(member -> fetchDetailedMemberInfo(account, member
+                        .getId(), nowDate)).orElseThrow(() -> new RuntimeException("没有找到匹配的成员信息")));
+    }
+
+    private CompletableFuture<Member> fetchDetailedMemberInfo(AccountResp account, Long memberId, LocalDate nowDate) {
+        MemberDetailsReq detailsReq = MemberDetailsReq.builder()
+                .id(memberId)
+                .startDate(nowDate)
+                .endDate(nowDate)
+                .build();
+
+        return completableFutureWebClientService
+                .fetchDataForAccount(account, ApiPathConstants.MEMBER_DETAIL_URL, detailsReq, new ParameterizedTypeReference<>() {
+                });
+    }
+
+    private List<TeamAccountWithChange> findChangedTeamAccount(Team prevTeam, Team currTeam) {
+        Map<Long, TeamAccount> team2AccountMap = currTeam.getList()
+                .stream()
+                .collect(Collectors.toMap(TeamAccount::getId, account -> account));
+
+        return prevTeam.getList()
+                .stream()
+                .filter(account1 -> team2AccountMap.containsKey(account1.getId()))
+                .map(account1 -> {
+                    TeamAccount account2 = team2AccountMap.get(account1.getId());
+                    TeamAccountWithChange changedAccount = new TeamAccountWithChange();
+                    BeanUtil.copyProperties(account2, changedAccount);
+                    if (account1.getFirstDepositNum() != account2.getFirstDepositNum()) {
+                        changedAccount.setNewDepositNum(account2.getFirstDepositNum() - account1.getFirstDepositNum());
+                    }
+                    if (account1.getSubMemberNum() != account2.getSubMemberNum()) {
+                        changedAccount.setNewRegisterNum(account2.getSubMemberNum() - account1.getSubMemberNum());
+                    }
+                    return changedAccount;
+                })
+                .collect(Collectors.toList());
+    }
+}
\ No newline at end of file
diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java
deleted file mode 100644
index b23252e6..00000000
--- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.zayac.admin.service;
-
-import com.zayac.admin.system.model.resp.AccountResp;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
-import reactor.core.publisher.Mono;
-import top.continew.starter.core.exception.BusinessException;
-
-import java.util.Map;
-
-@Service
-@Slf4j
-public class LoginService {
-    private final WebClient webClient;
-
-    public LoginService(WebClient.Builder webClientBuilder) {
-        this.webClient = webClientBuilder.baseUrl("http://localhost:8081").build();  // 替换为你的 FastAPI 地址
-    }
-
-    public Mono<String> reLoginAndGetHeaders(AccountResp account) {
-        return webClient.post()
-                .uri("/login")
-                .body(BodyInserters.fromValue(account))
-                .retrieve()
-                .onStatus(status -> !status.is2xxSuccessful(), response ->
-                        Mono.error(new BusinessException("Login failed with status code: " + response.statusCode()))
-                )
-                .bodyToMono(Map.class)
-                .flatMap(response -> {
-                    if (response.isEmpty() || !response.containsKey("headers")) {
-                        log.error("Login response is empty or invalid");
-                        return Mono.error(new BusinessException("Login failed: response is empty or invalid"));
-                    }
-                    String headers = (String) response.get("headers");
-                    return Mono.just(headers);
-                })
-                .doOnError(WebClientResponseException.class, e ->
-                        log.error("Login request failed with error: {}", e.getMessage(), e)
-                )
-                .onErrorMap(WebClientResponseException.class, e ->
-                        new BusinessException("Login request failed", e)
-                );
-    }
-}
diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java
new file mode 100644
index 00000000..268fd05e
--- /dev/null
+++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/RegistrationService.java
@@ -0,0 +1,68 @@
+package com.zayac.admin.service;
+
+import com.zayac.admin.constant.ApiPathConstants;
+import com.zayac.admin.req.team.TeamMemberReq;
+import com.zayac.admin.resp.MemberPagination;
+import com.zayac.admin.resp.team.Team;
+import com.zayac.admin.resp.team.TeamAccount;
+import com.zayac.admin.resp.team.TeamMember;
+import com.zayac.admin.system.model.entity.UserDO;
+import com.zayac.admin.system.model.resp.AccountResp;
+import com.zayac.admin.system.service.AccountService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class RegistrationService {
+    private final CompletableFutureWebClientService completableFutureWebClientService;
+    private final TelegramMessageService notificationService;
+    private final TelegramMessageService telegramMessageService;
+    private final AccountService accountService;
+
+    public CompletableFuture<Void> processRegistration(AccountResp account,
+                                                       Team currentTeamInfo,
+                                                       Team prevTeamInfo,
+                                                       LocalDate nowDate,
+                                                       Map<String, TeamAccount> teamAccountMap) {
+        if (prevTeamInfo != null && currentTeamInfo.getSubMemberCount() > prevTeamInfo.getSubMemberCount()) {
+            int registerCount = currentTeamInfo.getSubMemberCount() - prevTeamInfo.getSubMemberCount();
+            TeamMemberReq memberListReq = TeamMemberReq.builder()
+                    .registerStartDate(nowDate)
+                    .registerEndDate(nowDate)
+                    .startDate(nowDate)
+                    .endDate(nowDate)
+                    .registerSort(1)
+                    .pageSize(registerCount)
+                    .build();
+            CompletableFuture<MemberPagination<List<TeamMember>>> memberPaginationCompletableFuture = completableFutureWebClientService
+                    .fetchDataForAccount(account, ApiPathConstants.MEMBER_TEAM_LIST_URL, memberListReq, new ParameterizedTypeReference<>() {
+                    });
+            return memberPaginationCompletableFuture.thenApply(MemberPagination::getList).thenAccept(members -> {
+                log.info("Successfully get {} new registered members", members.size());
+                if (!members.isEmpty()) {
+                    Map<String, List<TeamMember>> groupByTopAgentName = members.stream()
+                            .collect(Collectors.groupingBy(TeamMember::getTopAgentName));
+                    groupByTopAgentName.forEach((accountName, accountMembers) -> {
+                        String notification = telegramMessageService.buildRegistrationMessage(accountName, accountMembers, teamAccountMap.get(accountName));
+                        UserDO currUser = accountService.getUserByAccountUsername(accountName);
+                        //                                if (currUser != null && DisEnableStatusEnum.ENABLE.equals(currUser.getNeedNotify())) {
+                        //                                    telegramMessageService.sendMessage(currUser.getBotToken(), currUser.getGroupId(), notification);
+                        //                                }
+                        telegramMessageService.sendMessage("6013830443:AAHUOS4v6Ln19ziZkH-L28-HZQLJrGcvhto", 6054562838L, notification);
+                    });
+                }
+            });
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java
index 2b156aab..9e74c8b1 100644
--- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java
+++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/TelegramMessageService.java
@@ -16,12 +16,16 @@
 
 package com.zayac.admin.service;
 
+import com.zayac.admin.resp.team.TeamAccount;
+import com.zayac.admin.resp.team.TeamMember;
 import lombok.RequiredArgsConstructor;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Service;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @Service
 @RequiredArgsConstructor
@@ -36,10 +40,25 @@ public class TelegramMessageService {
     private static String escapeMarkdown(String text) {
 
         List<Character> escapeChars = Arrays
-            .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!');
+                .asList('_', '[', ']', '(', ')', '~', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!');
         for (Character charItem : escapeChars) {
             text = text.replace(charItem.toString(), "\\" + charItem);
         }
         return text;
     }
+
+    public String buildRegistrationMessage(String accountName, List<TeamMember> accountMembers, TeamAccount teamAccount) {
+        String memberNames = accountMembers.stream()
+                .map(TeamMember::getName)
+                .collect(Collectors.joining(", "));
+        return String.format("👏 %s 注册: %d 用户: `%s` 总数:*%d*", accountName, accountMembers.size(), memberNames, teamAccount.getSubMemberNum());
+    }
+
+    public String buildDepositMessage(String agentName, int newDepositNum, String depositResults, int firstDepositNum) {
+        return String.format("🎉 %s 首存: *%d* %s 总数: *%d*", agentName, newDepositNum, depositResults, firstDepositNum);
+    }
+
+    public String buildDepositResultsMessage(String name, BigDecimal scoreAmount) {
+        return String.format("用户: `%s`, 首存金额: *%s*\n", name, scoreAmount);
+    }
 }