优化了输出结果顺序不一致的问题

This commit is contained in:
zayac 2024-06-02 23:32:21 +08:00
parent 6975ef0041
commit 1b18df5258
4 changed files with 62 additions and 28 deletions

View File

@ -85,17 +85,22 @@ def query_banner_info(account: Account):
def get_banner_info_by_user(user: User) -> List[BannerInfo]:
futures = []
for account in user.accounts:
for index, account in enumerate(user.accounts):
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_banner_info, account))
future = global_thread_pool.submit(get_banner_info, account)
futures.append((index, future))
banner_info_list = []
for future in as_completed(futures):
# 初始化一个与 futures 长度相同的列表来存储结果
results = [None] * len(futures)
for index, future in futures:
try:
banner_info = future.result()
banner_info_list.append(banner_info)
results[index] = banner_info
logger.info(f"Task completed for account: {banner_info.agentCode}")
except Exception as e:
logger.error(f"Error in future result: {e}")
# 过滤掉 None 结果
banner_info_list = [result for result in results if result is not None]
return banner_info_list

View File

@ -54,22 +54,27 @@ def get_finance(account: Account, start_date=util.get_first_day_month(), end_dat
return finance
def get_finances_by_user(user: User, date) -> List[Finance]:
def get_finances_by_user(user: User, date: str) -> List[Finance]:
accounts = user.accounts
start_date = util.get_first_day_by_str(date)
futures = []
for account in accounts:
# 使用字典来映射 future 到其索引位置
future_to_index = {}
for index, account in enumerate(accounts):
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_finance, account, start_date, date))
future = global_thread_pool.submit(get_finance, account, start_date, date)
future_to_index[future] = index
finance_list = []
for future in as_completed(futures):
# 初始化一个与 accounts 大小相同的列表,用于按顺序存储结果
finance_list = [None] * len(accounts)
for future in as_completed(future_to_index):
index = future_to_index[future]
try:
finance = future.result()
finance_list.append(finance)
finance_list[index] = finance
except Exception as e:
logger.error(f"Error in future result: {e}")
finance_list[index] = Finance() # 或者其他默认值
return finance_list

View File

@ -67,16 +67,18 @@ def get_latest_deposit_user(account: Account, count: int):
logger.debug(f"获取到{len(unique_names_within_time)}个成功存款人数")
# 开启多线程根据用户名查询所有数据
results = []
futures = []
for name in unique_names_within_time:
for index, name in enumerate(unique_names_within_time):
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_member_by_name, account, name))
future = global_thread_pool.submit(get_member_by_name, account, name)
futures.append((index, future))
for future in as_completed(futures):
# 初始化一个与 futures 长度相同的列表来存储结果
results = [None] * len(futures)
for index, future in futures:
try:
result = future.result()
results.append(result)
results[index] = result
except Exception as e:
logger.debug(f'查询失败:{e}')
@ -184,13 +186,21 @@ def get_pay_record_list(account: Account, date: str) -> Dict[str, List[str]]:
member_list = get_member_list(account, params)
if member_list is not None and len(member_list) > 0:
futures = []
for member in member_list:
for index, member in enumerate(member_list):
rate_limiter.acquire() # 确保每个任务的速率限制
futures.append(global_thread_pool.submit(get_pay_record_detail, account, member, date))
for future in futures:
future = global_thread_pool.submit(get_pay_record_detail, account, member, date)
futures.append((index, future))
# 按照索引顺序收集结果
results = [None] * len(futures)
for index, future in futures:
result = future.result()
if result:
_names['names'].append(result)
results[index] = result
# 过滤掉 None 结果
_names['names'] = [res for res in results if res is not None]
logger.info(f'Finished getting pay record list for account: {account.name} and date: {date}')
return _names
@ -214,14 +224,22 @@ def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
logger.info(f'Getting pay failed by user: {user.username}')
futures = []
for account in user.accounts:
for index, account in enumerate(user.accounts):
rate_limiter.acquire() # 确保每个任务的速率限制
futures.append(global_thread_pool.submit(get_pay_record_list, account, date))
future = global_thread_pool.submit(get_pay_record_list, account, date)
futures.append((index, future))
# 初始化一个与 futures 长度相同的列表来存储结果
results = [None] * len(futures)
for index, future in futures:
res = future.result()
if res['names']:
results[index] = res
# 使用列表推导式构建结果字符串
text_lines = [
"{}\n{}".format(res['name'], '\n'.join(res['names']))
for future in futures if (res := future.result())['names']
for res in results if res is not None
]
text = '\n\n'.join(text_lines)
@ -230,7 +248,6 @@ def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
logger.info('无存款失败用户')
return '无存款失败用户'
logger.info(text)
return text

View File

@ -104,10 +104,17 @@ def get_statics(account, date=get_curr_day()) -> VisualInfo:
def count_by_user(user: User, date: str):
accounts = user.accounts
futures = []
for account in accounts:
for index, account in enumerate(accounts):
rate_limiter.acquire()
futures.append(global_thread_pool.submit(get_statics, account, date))
return [future.result() for future in futures]
future = global_thread_pool.submit(get_statics, account, date)
futures.append((index, future))
# 初始化一个与 futures 长度相同的列表来存储结果
results = [None] * len(futures)
for index, future in futures:
results[index] = future.result()
return results
def text_count_by_user(user: User, date: str) -> str: