添加了自动登录模块

This commit is contained in:
zayac 2024-05-25 21:07:56 +08:00
parent 1837a7c1ae
commit eb3f82d7c3
5 changed files with 102 additions and 83 deletions

View File

@ -65,13 +65,16 @@ public class TelegramTeamMessageSchedule {
private final RoleService roleService;
private final UserRoleService userRoleService;
@Scheduled(fixedDelay = 60000)
private static final String MINISTER_ROLE_CODE = "minister";
private static final long FIXED_DELAY = 60000L;
@Scheduled(fixedDelay = FIXED_DELAY)
public void newCheckRegistrationAndNewDeposit() {
// Get the current date and time
LocalDate nowDate = LocalDate.now();
LocalDateTime nowDateTime = LocalDateTime.now();
//获取部长角色
RoleDO minister = roleService.getByCode("minister");
RoleDO minister = roleService.getByCode(MINISTER_ROLE_CODE);
List<Long> users = userRoleService.listUserIdByRoleId(minister.getId());
//获取所有部长角色对应部门的用户
users.forEach(userId -> {
@ -102,8 +105,8 @@ public class TelegramTeamMessageSchedule {
return CompletableFuture.allOf(teamFuture).thenCompose(v -> {
Team currentTeamInfo = teamFuture.join();
log.info("prev:{}", prevTeamInfo);
log.info("curr:{}", currentTeamInfo);
log.info("Previous Team Info: {}", prevTeamInfo);
log.info("Current Team Info: {}", currentTeamInfo);
//获取代理线 代理对象映射
Map<String, TeamAccount> teamAccountMap = currentTeamInfo.getList()
.stream()

View File

@ -1,19 +1,3 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zayac.admin.service;
import cn.hutool.core.lang.TypeReference;
@ -21,6 +5,7 @@ import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zayac.admin.resp.ApiResponse;
import com.zayac.admin.system.model.resp.AccountResp;
import com.zayac.admin.system.service.AccountService;
import io.netty.handler.timeout.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -47,97 +32,91 @@ public class CompletableFutureWebClientService {
private final WebClient webClient;
private final Semaphore semaphore;
private final ObjectMapper objectMapper;
private final LoginService loginService;
private final AccountService accountService;
public CompletableFutureWebClientService(WebClient.Builder webClientBuilder,
@Value("${webclient.max-concurrent-requests}") int maxConcurrentRequests,
ObjectMapper objectMapper) {
ObjectMapper objectMapper,
LoginService loginService,
AccountService accountService) {
this.webClient = webClientBuilder.build();
this.semaphore = new Semaphore(maxConcurrentRequests);
this.objectMapper = objectMapper;
this.loginService = loginService;
this.accountService = accountService;
}
public <T> CompletableFuture<T> fetchDataForAccount(AccountResp account,
String apiPath,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
// 创建一个CompletableFuture列表每个元素对应一个API请求
return this.fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef);
return this.fetchData(account.getPlatformUrl() + apiPath, account.getHeaders(), params, typeRef, account).toFuture();
}
// public <T> CompletableFuture<T> fetchData(String url, String headers, Object params, ParameterizedTypeReference<ApiResponse<T>> typeRef) {
// webClient.mutate()
// .filters(filters -> filters.add(0, new ApiResponseHandler<>(typeRef))) // 添加到过滤器链的开始
// .build();
// System.out.println(webClient);
// return webClient.post()
// .uri(url)
//
// .headers(httpHeaders -> {
// Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
// }, true);
// headerMap.forEach(httpHeaders::add);
// })
// .body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
// .retrieve()
// .bodyToMono(typeRef)
// .map(ApiResponse::data)
// .doFinally(signal -> semaphore.release())
// .toFuture();
// }
public <T> CompletableFuture<T> fetchData(String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef) {
public <T> Mono<T> fetchData(String url,
String headers,
Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef,
AccountResp account) {
return Mono.fromCallable(() -> {
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { // 使用非阻塞方式获取许可
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
throw new RuntimeException("Unable to acquire a permit");
}
return true;
}).subscribeOn(Schedulers.boundedElastic()).then(this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
.bodyToMono(String.class)
.doOnNext(resStr -> {
log.info("request url:{}", url);
log.info("request headers :{}", headers);
log.info("request params:{}", params);
log.info("response {}", resStr);
})
.flatMap(body -> {
try {
ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory()
.constructType(typeRef.getType()));
return Mono.justOrEmpty(apiResponse);
} catch (Exception e) {
log.warn("JSON parsing exception: " + e.getMessage());
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
}
})
.flatMap(this::respHandler)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException)))
.doFinally(signal -> semaphore.release())
.toFuture();
}).subscribeOn(Schedulers.boundedElastic()).then(
this.webClient.post().uri(url).headers(httpHeaders -> {
Map<String, String> headerMap = JSONUtil.toBean(headers, new TypeReference<>() {
}, true);
headerMap.forEach(httpHeaders::add);
})
.body(params != null ? BodyInserters.fromValue(params) : BodyInserters.empty())
.retrieve()
.onStatus(HttpStatusCode::isError, response -> Mono.error(new BusinessException("API call failed")))
.bodyToMono(String.class)
.doOnNext(resStr -> {
log.info("request url:{}", url);
log.info("request headers :{}", headers);
log.info("request params:{}", params);
log.info("response {}", resStr);
})
.flatMap(body -> {
try {
ApiResponse<T> apiResponse = objectMapper.readValue(body, objectMapper.getTypeFactory()
.constructType(typeRef.getType()));
return Mono.justOrEmpty(apiResponse);
} catch (Exception e) {
log.warn("JSON parsing exception: " + e.getMessage());
return Mono.just(new ApiResponse<T>(null, "Decoding error", 6008));
}
})
.flatMap(response -> respHandler(response, url, params, typeRef, account))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(this::isRetryableException))
).doFinally(signal -> semaphore.release());
}
private boolean isRetryableException(Throwable throwable) {
// 判断错误类型只对特定的错误进行重试
return throwable instanceof TimeoutException || throwable instanceof WebClientResponseException.ServiceUnavailable;
}
private <T> Mono<T> respHandler(ApiResponse<T> response) {
private <T> Mono<T> respHandler(ApiResponse<T> response, String url, Object params,
ParameterizedTypeReference<ApiResponse<T>> typeRef, AccountResp account) {
if (response.getStatusCode().equals(6000)) {
return Mono.just(response.getData());
} else if (response.getStatusCode().equals(6001)) {
// 重新登录逻辑或其他处理
return Mono.error(new BusinessException("API token expired"));
return loginService.reLoginAndGetHeaders(account)
.flatMap(newHeaders -> {
// 更新 account 对象中的 headers
account.setHeaders(newHeaders);
accountService.updateHeaders(newHeaders, account.getId());
// 重新发送原始请求
return fetchData(url, newHeaders, params, typeRef, account)
.doOnNext(data -> log.info("Retried request successful"));
})
.onErrorResume(e -> Mono.error(new BusinessException("Re-login failed: " + e.getMessage())));
} else {
return Mono.error(new BusinessException("Error status code: " + response.getStatusCode()));
}
}
}
}

View File

@ -0,0 +1,28 @@
package com.zayac.admin.service;
import com.zayac.admin.system.model.resp.AccountResp;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
/**
* api_token失效的时候执行的逻辑
*/
@Service
public class LoginService {
private final WebClient webClient;
public LoginService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("http://localhost:8000").build(); // 替换为你的 FastAPI 地址
}
public Mono<String> reLoginAndGetHeaders(AccountResp account) {
return webClient.post()
.uri("/login")
.body(BodyInserters.fromValue(account))
.retrieve()
.bodyToMono(String.class)
.map(response -> response);
}
}

View File

@ -17,6 +17,7 @@
package com.zayac.admin.system.service;
import com.zayac.admin.common.enums.DisEnableStatusEnum;
import com.zayac.admin.system.model.entity.AccountDO;
import com.zayac.admin.system.model.entity.UserDO;
import com.zayac.admin.system.model.query.AccountQuery;
import com.zayac.admin.system.model.req.AccountReq;
@ -36,4 +37,6 @@ public interface AccountService extends BaseService<AccountResp, AccountDetailRe
List<AccountResp> getAccountsByUserId(Long id, DisEnableStatusEnum status);
UserDO getUserByAccountUsername(String username);
void updateHeaders(String headers,Long id);
}

View File

@ -33,6 +33,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import top.continew.starter.core.util.validate.CheckUtils;
import top.continew.starter.extension.crud.service.impl.BaseServiceImpl;
import java.util.List;
@ -67,4 +68,9 @@ public class AccountServiceImpl extends BaseServiceImpl<AccountMapper, AccountDO
.map(account -> userService.getById(account.getId()))
.orElse(null);
}
@Override
public void updateHeaders(String headers, Long id) {
baseMapper.lambdaUpdate().set(AccountDO::getHeaders, headers).eq(AccountDO::getId, id).update();
}
}