import asyncio from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import datetime from typing import Dict, List, Optional from src import logger from src.core import api_request, util from src.core.constant import PAY_RECORD_LIST_URL, PAY_RECORD_URL from src.core.util import get_curr_day, get_first_day_month from src.entity.account import Account from src.entity.member import (MemberList, async_get_member_detail_by_name, get_member_by_name, get_member_list) from src.entity.user import User, get_user_by_telegram_id, get_user_by_username_and_password @dataclass class PayRecord(object): billNo: str name: str agentName: str orderAmount: str rebateAmount: str flowRatio: str payType: int payName: str recipientAccount: str createdAt: str payStatus: int payStatusName: str whetherGetCard: str topId: int scoreAmount: str # 根据用户查询最新存款信息 def get_pay_record(account: Account): logger.info(f'Getting pay record list for account: {account.name}') # 获取当前成功存款的用户 params = { "memberName": "", "payState": 2, "isRest": False, "pageNum": 1, "pageSize": 100, "startDate": get_curr_day(), "endDate": get_curr_day() } api_response = api_request.account_post(PAY_RECORD_URL, account, params) logger.info(f'Finished getting pay record list for account: {account.name}') return [PayRecord(**item) for item in api_response.data['list']] from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime def get_latest_deposit_user(account: Account, count: int): logger.info(f'Getting latest deposit user for account: {account.name} and count: {count}') pay_record_list = get_pay_record(account) # 提取所有用户名,并确保在指定时间内 now = datetime.now() unique_names_within_time = { item.name for item in pay_record_list if util.get_difference(item.createdAt, now) < 7200 } logger.debug(f"获取到{len(unique_names_within_time)}个成功存款人数") # 开启多线程根据用户名查询所有数据 results = [] with ThreadPoolExecutor(max_workers=min(len(unique_names_within_time), 20)) as executor: # 限制最大工作线程数 futures = [executor.submit(get_member_by_name, account, name) for name in unique_names_within_time] for future in as_completed(futures): try: result = future.result() results.append(result) except Exception as e: logger.debug(f'查询失败:{e}') # 筛选出有有效结果的成员,并按首次付款时间降序排序 valid_results = [result for result in results if result is not None] sorted_members = sorted(valid_results, key=lambda member_detail: member_detail.get_first_pay_datetime(), reverse=True) # 截取前n个 selected_members = sorted_members[:count] # 构建时间和金额的映射字典 record_dict = {record.createdAt: record.scoreAmount for record in pay_record_list} member_details = [] for detail in selected_members: # 检查 detail.firstPayAt 是否在 record_dict 中,避免 KeyError if detail.firstPayAt in record_dict: detail.deposit = record_dict[detail.firstPayAt] member_details.append(detail) logger.info(f'Finished getting latest deposit user for account: {account.name} and count: {count}') return member_details # 根据用户查询最新存款信息 async def async_get_pay_record_list(account: Account): logger.info(f'Async getting pay record list for account: {account.name}') today = get_curr_day() params = { "memberName": "", "payState": 2, "isRest": False, "pageNum": 1, "pageSize": 100, "startDate": today, "endDate": today } res = await api_request.async_account_post(PAY_RECORD_URL, account, params) logger.info(f'Finished async getting pay record list for account: {account.name}') return [PayRecord(**item) for item in res.data['list']] # 获取最新存款用户 async def async_get_latest_deposit_user(account: Account, count: int): logger.info(f'Async getting latest deposit user for account: {account.name} and count: {count}') pay_record_list = await async_get_pay_record_list(account) # 提取所有用户名 names = [] seen = set() now = datetime.now() for item in pay_record_list: name = item.name # 存款订单有效期一般两个小时左右,所以默认只查询两个小时以内的订单即可 if name not in seen and util.get_difference(item.createdAt, now) < 7200: names.append(name) seen.add(name) logger.debug(f"获取到{len(names)}个成功存款人数") tasks = [] for name in names: task = asyncio.create_task(async_get_member_detail_by_name(account, name)) tasks.append(task) results = await asyncio.gather(*tasks) # logger.info(f'查询成功:{results}') sorted_members = sorted(results, key=lambda member_detail: member_detail.get_first_pay_datetime(), reverse=True) # logger.debug(f'首存金额:{[member_detail.deposit for member_detail in sorted_members]}') # 因为会员列表的存款是统计用户当月的所有金额 所以有延迟 所以需要将订单中的金额赋值给存款金额 然后再返回 details = sorted_members[:count] record_dict = {record.createdAt: record.scoreAmount for record in pay_record_list} for detail in details: if detail.firstPayAt in record_dict: detail.deposit = record_dict[detail.firstPayAt] logger.info(f'Finished async getting latest deposit user for account: {account.name} and count: {count}') return details def get_pay_record_list(account: Account, date: str) -> Dict[str, List[str]]: logger.info(f'Getting pay record list for account: {account.username} and date: {date}') _names = {'name': account.username, 'names': []} params = { "pageNum": 1, "pageSize": 100, "registerSort": 1, "drawSort": -1, "depositSort": -1, "lastLoginTimeSort": -1, "name": "", "minPay": None, "maxPay": None, "startDate": get_first_day_month(), "registerStartDate": date, "endDate": date, "registerEndDate": date, "firstPayStartTime": "", "firstPayEndTime": "", "isBet": "0", "tagsFlag": "1" } member_list = get_member_list(account, params) if member_list is not None and len(member_list) > 0: with ThreadPoolExecutor(max_workers=len(member_list)) as executor: futures = [executor.submit(get_pay_record_detail, account, member, date) for member in member_list] for future in futures: result = future.result() if result: _names['names'].append(result) logger.info(f'Finished getting pay record list for account: {account.name} and date: {date}') return _names def get_pay_record_detail(account: Account, member: MemberList, date: str) -> Optional[str]: logger.info(f'Getting pay record for account: {account.name}, member: {member.name}, and date: {date}') params = { "pageNum": 1, "pageSize": 15, "id": member.id, "startDate": get_first_day_month(), "endDate": date } res = api_request.account_post(PAY_RECORD_LIST_URL, account=account, params=params) if int(res.data['orderAmountTotal']) > 0 and int(res.data['scoreAmountTotal']) == 0: return member.name return "" def get_pay_failed_by_user(user: User, date: str) -> Optional[str]: logger.info(f'Getting pay failed by user: {user.username}') with ThreadPoolExecutor(max_workers=len(user.accounts)) as executor: futures = [executor.submit(get_pay_record_list, account, date) for account in user.accounts] # 使用列表推导式构建结果字符串 text_lines = [ "{}\n{}".format(res['name'], '\n'.join(res['names'])) for future in futures if (res := future.result())['names'] ] text = '\n'.join(text_lines) if not text: logger.info('无存款失败用户') return '无存款失败用户' logger.info(text) return text def get_pay_failed_by_telegram_id(telegram_id: int) -> Optional[str]: user = get_user_by_telegram_id(telegram_id) with ThreadPoolExecutor(max_workers=len(user.accounts)) as executor: futures = [executor.submit(get_pay_record_list, account, get_curr_day()) for account in user.accounts] # 使用列表推导式构建结果字符串 text_lines = [ "{}\n{}".format(res['name'], '\n'.join(res['names'])) for future in futures if (res := future.result())['names'] ] text = '\n'.join(text_lines) if not text: logger.info('无存款失败用户') return '无存款失败用户' logger.info(text) return text if __name__ == '__main__': deposit_results = '\n'.join( [f"用户: `{member.name}`, 首存金额: *{member.deposit}*" for member in get_latest_deposit_user(get_user_by_username_and_password('zayac', 'password').accounts[0], 1)]) print(deposit_results)