diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java index d8f32f49..b68adc38 100644 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/schedule/TelegramTeamMessageSchedule.java @@ -65,13 +65,16 @@ public class TelegramTeamMessageSchedule { private final RoleService roleService; private final UserRoleService userRoleService; - @Scheduled(fixedDelay = 60000) + private static final String MINISTER_ROLE_CODE = "minister"; + private static final long FIXED_DELAY = 60000L; + + @Scheduled(fixedDelay = FIXED_DELAY) public void newCheckRegistrationAndNewDeposit() { // Get the current date and time LocalDate nowDate = LocalDate.now(); LocalDateTime nowDateTime = LocalDateTime.now(); //获取部长角色 - RoleDO minister = roleService.getByCode("minister"); + RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE); List users = userRoleService.listUserIdByRoleId(minister.getId()); //获取所有部长角色对应部门的用户 users.forEach(userId -> { @@ -102,8 +105,8 @@ public class TelegramTeamMessageSchedule { return CompletableFuture.allOf(teamFuture).thenCompose(v -> { Team currentTeamInfo = teamFuture.join(); - log.info("prev:{}", prevTeamInfo); - log.info("curr:{}", currentTeamInfo); + log.info("Previous Team Info: {}", prevTeamInfo); + log.info("Current Team Info: {}", currentTeamInfo); //获取代理线 代理对象映射 Map teamAccountMap = currentTeamInfo.getList() .stream() diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java index 563d1c57..c214244b 100644 --- a/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/CompletableFutureWebClientService.java @@ -1,19 +1,3 @@ -/* - * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.zayac.admin.service; import cn.hutool.core.lang.TypeReference; @@ -21,6 +5,7 @@ import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.databind.ObjectMapper; import com.zayac.admin.resp.ApiResponse; import com.zayac.admin.system.model.resp.AccountResp; +import com.zayac.admin.system.service.AccountService; import io.netty.handler.timeout.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -47,97 +32,91 @@ public class CompletableFutureWebClientService { private final WebClient webClient; private final Semaphore semaphore; private final ObjectMapper objectMapper; + private final LoginService loginService; + + private final AccountService accountService; public CompletableFutureWebClientService(WebClient.Builder webClientBuilder, @Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, + LoginService loginService, + AccountService accountService) { this.webClient = webClientBuilder.build(); this.semaphore = new Semaphore(maxConcurrentRequests); this.objectMapper = objectMapper; + this.loginService = loginService; + this.accountService = accountService; } public CompletableFuture fetchDataForAccount(AccountResp account, String apiPath, Object params, ParameterizedTypeReference> typeRef) { - // 创建一个CompletableFuture列表,每个元素对应一个API请求 - return this.fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef); + return this.fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture(); } - // public CompletableFuture fetchData(String url, String headers, Object params, ParameterizedTypeReference> typeRef) { - // webClient.mutate() - // .filters(filters -> filters.add(0, new ApiResponseHandler<>(typeRef))) // 添加到过滤器链的开始 - // .build(); - // System.out.println(webClient); - // return webClient.post() - // .uri(url) - // - // .headers(httpHeaders -> { - // Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - // }, true); - // headerMap.forEach(httpHeaders::add); - // }) - // .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - // .retrieve() - // .bodyToMono(typeRef) - // .map(ApiResponse::data) - // .doFinally(signal -> semaphore.release()) - // .toFuture(); - // } - - public CompletableFuture fetchData(String url, - String headers, - Object params, - ParameterizedTypeReference> typeRef) { + public Mono fetchData(String url, + String headers, + Object params, + ParameterizedTypeReference> typeRef, + AccountResp account) { return Mono.fromCallable(() -> { - if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { // 使用非阻塞方式获取许可 + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { throw new RuntimeException("Unable to acquire a permit"); } return true; - }).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> { - Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { - }, true); - headerMap.forEach(httpHeaders::add); - }) - .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) - .retrieve() - .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) - .bodyToMono(String.class) - .doOnNext(resStr -> { - log.info("request url:{}", url); - log.info("request headers :{}", headers); - log.info("request params:{}", params); - log.info("response {}", resStr); - }) - .flatMap(body -> { - try { - ApiResponse apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() - .constructType(typeRef.getType())); - return Mono.justOrEmpty(apiResponse); - } catch (Exception e) { - log.warn("JSON parsing exception: " + e.getMessage()); - return Mono.just(new ApiResponse(null, "Decoding error", 6008)); - } - }) - .flatMap(this::respHandler) - .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))) - .doFinally(signal -> semaphore.release()) - .toFuture(); + }).subscribeOn(Schedulers.boundedElastic()).then( + this.webClient.post().uri(url).headers(httpHeaders -> { + Map headerMap = JSONUtil.toBean(headers, new TypeReference<>() { + }, true); + headerMap.forEach(httpHeaders::add); + }) + .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty()) + .retrieve() + .onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed"))) + .bodyToMono(String.class) + .doOnNext(resStr -> { + log.info("request url:{}", url); + log.info("request headers :{}", headers); + log.info("request params:{}", params); + log.info("response {}", resStr); + }) + .flatMap(body -> { + try { + ApiResponse apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory() + .constructType(typeRef.getType())); + return Mono.justOrEmpty(apiResponse); + } catch (Exception e) { + log.warn("JSON parsing exception: " + e.getMessage()); + return Mono.just(new ApiResponse(null, "Decoding error", 6008)); + } + }) + .flatMap(response -> respHandler(response, url, params, typeRef, account)) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException)) + ).doFinally(signal -> semaphore.release()); } private boolean isRetryableException(Throwable throwable) { - // 判断错误类型,只对特定的错误进行重试 return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable; } - private Mono respHandler(ApiResponse response) { + private Mono respHandler(ApiResponse response, String url, Object params, + ParameterizedTypeReference> typeRef, AccountResp account) { if (response.getStatusCode().equals(6000)) { return Mono.just(response.getData()); } else if (response.getStatusCode().equals(6001)) { - // 重新登录逻辑或其他处理 - return Mono.error(new BusinessException("API token expired")); + return loginService.reLoginAndGetHeaders(account) + .flatMap(newHeaders -> { + // 更新 account 对象中的 headers + account.setHeaders(newHeaders); + accountService.updateHeaders(newHeaders, account.getId()); + // 重新发送原始请求 + return fetchData(url, newHeaders, params, typeRef, account) + .doOnNext(data -> log.info("Retried request successful")); + }) + .onErrorResume(e -> Mono.error(new BusinessException("Re-login failed: " + e.getMessage()))); } else { return Mono.error(new BusinessException("Error status code: " + response.getStatusCode())); } } -} \ No newline at end of file +} diff --git a/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java b/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java new file mode 100644 index 00000000..f475ce55 --- /dev/null +++ b/zayac-admin-backend/src/main/java/com/zayac/admin/service/LoginService.java @@ -0,0 +1,28 @@ +package com.zayac.admin.service; + +import com.zayac.admin.system.model.resp.AccountResp; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +/** + * api_token失效的时候执行的逻辑 + */ +@Service +public class LoginService { + private final WebClient webClient; + + public LoginService(WebClient.Builder webClientBuilder) { + this.webClient = webClientBuilder.baseUrl("http://localhost:8000").build(); // 替换为你的 FastAPI 地址 + } + + public Mono reLoginAndGetHeaders(AccountResp account) { + return webClient.post() + .uri("/login") + .body(BodyInserters.fromValue(account)) + .retrieve() + .bodyToMono(String.class) + .map(response -> response); + } +} diff --git a/zayac-admin-system/src/main/java/com/zayac/admin/system/service/AccountService.java b/zayac-admin-system/src/main/java/com/zayac/admin/system/service/AccountService.java index acae3141..fdb86f44 100644 --- a/zayac-admin-system/src/main/java/com/zayac/admin/system/service/AccountService.java +++ b/zayac-admin-system/src/main/java/com/zayac/admin/system/service/AccountService.java @@ -17,6 +17,7 @@ package com.zayac.admin.system.service; import com.zayac.admin.common.enums.DisEnableStatusEnum; +import com.zayac.admin.system.model.entity.AccountDO; import com.zayac.admin.system.model.entity.UserDO; import com.zayac.admin.system.model.query.AccountQuery; import com.zayac.admin.system.model.req.AccountReq; @@ -36,4 +37,6 @@ public interface AccountService extends BaseService getAccountsByUserId(Long id, DisEnableStatusEnum status); UserDO getUserByAccountUsername(String username); + + void updateHeaders(String headers,Long id); } \ No newline at end of file diff --git a/zayac-admin-system/src/main/java/com/zayac/admin/system/service/impl/AccountServiceImpl.java b/zayac-admin-system/src/main/java/com/zayac/admin/system/service/impl/AccountServiceImpl.java index 466b55c5..1acbcb05 100644 --- a/zayac-admin-system/src/main/java/com/zayac/admin/system/service/impl/AccountServiceImpl.java +++ b/zayac-admin-system/src/main/java/com/zayac/admin/system/service/impl/AccountServiceImpl.java @@ -33,6 +33,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; +import top.continew.starter.core.util.validate.CheckUtils; import top.continew.starter.extension.crud.service.impl.BaseServiceImpl; import java.util.List; @@ -67,4 +68,9 @@ public class AccountServiceImpl extends BaseServiceImpl userService.getById(account.getId())) .orElse(null); } + + @Override + public void updateHeaders(String headers, Long id) { + baseMapper.lambdaUpdate().set(AccountDO::getHeaders, headers).eq(AccountDO::getId, id).update(); + } } \ No newline at end of file