加入了限流器,现在不会出现查询失败的情况了

This commit is contained in:
zayac 2024-06-02 16:58:22 +08:00
parent 7c25929383
commit 6975ef0041
7 changed files with 124 additions and 28 deletions

View File

@ -1,6 +1,9 @@
;[Credentials]
;username = zayac
;password = 123456
[Credentials]
username = zayac
password = 123456
username = luffy
password = luffy230505
[Minimum]
minimum = True

View File

@ -1,5 +1,5 @@
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List
@ -12,7 +12,7 @@ from src.entity.account import Account
from src.entity.member import get_today_new_member_list
from src.entity.pay_record import get_latest_deposit_user
from src.entity.user import User
from src.ui.thread_pool_manager import global_thread_pool
from src.ui.thread_pool_manager import global_thread_pool, rate_limiter
@dataclass
@ -84,5 +84,18 @@ def query_banner_info(account: Account):
def get_banner_info_by_user(user: User) -> List[BannerInfo]:
futures = [global_thread_pool.submit(get_banner_info, account) for account in user.accounts]
return [future.result() for future in futures]
futures = []
for account in user.accounts:
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_banner_info, account))
banner_info_list = []
for future in as_completed(futures):
try:
banner_info = future.result()
banner_info_list.append(banner_info)
logger.info(f"Task completed for account: {banner_info.agentCode}")
except Exception as e:
logger.error(f"Error in future result: {e}")
return banner_info_list

View File

@ -59,4 +59,5 @@ class Database:
session.commit()
db = Database('mysql+mysqlconnector://ky_tools:HJQY35seXen8patn@1panel.stupidpz.com:3306/ky_tools')
# db = Database('mysql+mysqlconnector://ky_tools:HJQY35seXen8patn@1panel.stupidpz.com:3306/ky_tools')
db = Database('mysql+mysqlconnector://www_luffy_tool:GrCDtSynbK5MHb48@110.40.20.148:3306/www_luffy_tool')

View File

@ -1,4 +1,4 @@
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from decimal import Decimal
from typing import List
@ -11,7 +11,7 @@ from src.core.constant import FINANCE_URL
from src.core.util import get_curr_day, get_first_day_by_str
from src.entity.account import Account
from src.entity.user import User, get_user_by_telegram_id
from src.ui.thread_pool_manager import global_thread_pool
from src.ui.thread_pool_manager import global_thread_pool, rate_limiter
'''
财务报表
@ -58,8 +58,20 @@ def get_finances_by_user(user: User, date) -> List[Finance]:
accounts = user.accounts
start_date = util.get_first_day_by_str(date)
futures = [global_thread_pool.submit(get_finance, account, start_date, date) for account in accounts]
return [future.result() for future in futures]
futures = []
for account in accounts:
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_finance, account, start_date, date))
finance_list = []
for future in as_completed(futures):
try:
finance = future.result()
finance_list.append(finance)
except Exception as e:
logger.error(f"Error in future result: {e}")
return finance_list
def get_net_win_by_user(user: User, date: str) -> str:

View File

@ -10,7 +10,7 @@ from src.entity.account import Account
from src.entity.member import (MemberList, async_get_member_detail_by_name,
get_member_by_name, get_member_list)
from src.entity.user import User, get_user_by_telegram_id, get_user_by_username_and_password
from src.ui.thread_pool_manager import global_thread_pool
from src.ui.thread_pool_manager import global_thread_pool, rate_limiter
@dataclass
@ -50,7 +50,7 @@ def get_pay_record(account: Account):
return [PayRecord(**item) for item in api_response.data['list']]
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import as_completed
from datetime import datetime
@ -68,7 +68,11 @@ def get_latest_deposit_user(account: Account, count: int):
# 开启多线程根据用户名查询所有数据
results = []
futures = [global_thread_pool.submit(get_member_by_name, account, name) for name in unique_names_within_time]
futures = []
for name in unique_names_within_time:
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_member_by_name, account, name))
for future in as_completed(futures):
try:
result = future.result()
@ -179,7 +183,10 @@ def get_pay_record_list(account: Account, date: str) -> Dict[str, List[str]]:
}
member_list = get_member_list(account, params)
if member_list is not None and len(member_list) > 0:
futures = [global_thread_pool.submit(get_pay_record_detail, account, member, date) for member in member_list]
futures = []
for member in member_list:
rate_limiter.acquire() # 确保每个任务的速率限制
futures.append(global_thread_pool.submit(get_pay_record_detail, account, member, date))
for future in futures:
result = future.result()
if result:
@ -206,7 +213,10 @@ def get_pay_record_detail(account: Account, member: MemberList, date: str) -> Op
def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
logger.info(f'Getting pay failed by user: {user.username}')
futures = [global_thread_pool.submit(get_pay_record_list, account, date) for account in user.accounts]
futures = []
for account in user.accounts:
rate_limiter.acquire() # 确保每个任务的速率限制
futures.append(global_thread_pool.submit(get_pay_record_list, account, date))
# 使用列表推导式构建结果字符串
text_lines = [
@ -214,7 +224,7 @@ def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
for future in futures if (res := future.result())['names']
]
text = '\n'.join(text_lines)
text = '\n\n'.join(text_lines)
if not text:
logger.info('无存款失败用户')
@ -224,9 +234,34 @@ def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
return text
# def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
# logger.info(f'Getting pay failed by user: {user.username}')
# rate_limiter.acquire()
# futures = [global_thread_pool.submit(get_pay_record_list, account, date) for account in user.accounts]
#
# # 使用列表推导式构建结果字符串
# text_lines = [
# "{}\n{}".format(res['name'], '\n'.join(res['names']))
# for future in futures if (res := future.result())['names']
# ]
#
# text = '\n'.join(text_lines)
#
# if not text:
# logger.info('无存款失败用户')
# return '无存款失败用户'
#
# logger.info(text)
# return text
def get_pay_failed_by_telegram_id(telegram_id: int) -> Optional[str]:
user = get_user_by_telegram_id(telegram_id)
futures = [global_thread_pool.submit(get_pay_record_list, account, get_curr_day()) for account in user.accounts]
futures = []
for account in user.accounts:
rate_limiter.acquire() # 确保每个任务的速率限制
futures.append(global_thread_pool.submit(get_pay_record_list, account, get_curr_day()))
# 使用列表推导式构建结果字符串
text_lines = [
@ -234,7 +269,7 @@ def get_pay_failed_by_telegram_id(telegram_id: int) -> Optional[str]:
for future in futures if (res := future.result())['names']
]
text = '\n'.join(text_lines)
text = '\n\n'.join(text_lines)
if not text:
logger.info('无存款失败用户')

View File

@ -9,7 +9,7 @@ from src.core.constant import VISUAL_LIST_URL
from src.core.util import get_curr_day, get_curr_month
from src.entity.account import Account
from src.entity.user import User, get_user_by_telegram_id
from src.ui.thread_pool_manager import global_thread_pool
from src.ui.thread_pool_manager import global_thread_pool, rate_limiter
# 视图列表对象 对应界面上的图表
@ -103,8 +103,10 @@ def get_statics(account, date=get_curr_day()) -> VisualInfo:
def count_by_user(user: User, date: str):
accounts = user.accounts
futures = [global_thread_pool.submit(get_statics, account, date) for account in accounts]
futures = []
for account in accounts:
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_statics, account, date))
return [future.result() for future in futures]

View File

@ -1,23 +1,53 @@
# threadpool_manager.py
from PyQt6.QtCore import QThreadPool
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 创建一个全局的线程池实例
pyqt_thread_pool = QThreadPool.globalInstance()
from concurrent.futures import ThreadPoolExecutor
class ThreadPoolManager:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ThreadPoolManager, cls).__new__(cls, *args, **kwargs)
cls._instance.thread_pool = ThreadPoolExecutor(max_workers=5)
with cls._lock:
if not cls._instance:
cls._instance = super(ThreadPoolManager, cls).__new__(cls)
max_workers = kwargs.get('max_workers', 5)
cls._instance.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
return cls._instance
def get_thread_pool(self):
return self.thread_pool
global_thread_pool = ThreadPoolManager().get_thread_pool()
class RateLimiter:
def __init__(self, rate: int, per: float):
self._rate = rate
self._per = per
self._allowance = rate
self._last_check = time.monotonic()
self._lock = threading.Lock()
def acquire(self):
with self._lock:
current = time.monotonic()
time_passed = current - self._last_check
self._last_check = current
self._allowance += time_passed * (self._rate / self._per)
if self._allowance > self._rate:
self._allowance = self._rate
if self._allowance < 1.0:
sleep_time = (1.0 - self._allowance) * (self._per / self._rate)
time.sleep(sleep_time)
self._allowance = 0
return
self._allowance -= 1.0
# 创建全局实例
global_thread_pool = ThreadPoolManager(max_workers=20).get_thread_pool()
rate_limiter = RateLimiter(rate=5, per=1.0)