优化了线程部分,防止接口请求次数太多

This commit is contained in:
zayac 2024-06-02 14:58:26 +08:00
parent ac960fae29
commit 7c25929383
8 changed files with 346 additions and 390 deletions

View File

@ -12,6 +12,7 @@ from src.entity.account import Account
from src.entity.member import get_today_new_member_list
from src.entity.pay_record import get_latest_deposit_user
from src.entity.user import User
from src.ui.thread_pool_manager import global_thread_pool
@dataclass
@ -83,6 +84,5 @@ def query_banner_info(account: Account):
def get_banner_info_by_user(user: User) -> List[BannerInfo]:
with ThreadPoolExecutor(max_workers=len(user.accounts)) as executor:
futures = [executor.submit(get_banner_info, account) for account in user.accounts]
return [future.result() for future in futures]
futures = [global_thread_pool.submit(get_banner_info, account) for account in user.accounts]
return [future.result() for future in futures]

View File

@ -11,6 +11,7 @@ from src.core.constant import FINANCE_URL
from src.core.util import get_curr_day, get_first_day_by_str
from src.entity.account import Account
from src.entity.user import User, get_user_by_telegram_id
from src.ui.thread_pool_manager import global_thread_pool
'''
财务报表
@ -56,9 +57,9 @@ def get_finance(account: Account, start_date=util.get_first_day_month(), end_dat
def get_finances_by_user(user: User, date) -> List[Finance]:
accounts = user.accounts
start_date = util.get_first_day_by_str(date)
with ThreadPoolExecutor(max_workers=len(accounts)) as t:
futures = [t.submit(get_finance, account, start_date, date) for account in accounts]
return [future.result() for future in futures]
futures = [global_thread_pool.submit(get_finance, account, start_date, date) for account in accounts]
return [future.result() for future in futures]
def get_net_win_by_user(user: User, date: str) -> str:

View File

@ -10,6 +10,7 @@ from src.entity.account import Account
from src.entity.member import (MemberList, async_get_member_detail_by_name,
get_member_by_name, get_member_list)
from src.entity.user import User, get_user_by_telegram_id, get_user_by_username_and_password
from src.ui.thread_pool_manager import global_thread_pool
@dataclass
@ -67,14 +68,13 @@ def get_latest_deposit_user(account: Account, count: int):
# 开启多线程根据用户名查询所有数据
results = []
with ThreadPoolExecutor(max_workers=min(len(unique_names_within_time), 20)) as executor: # 限制最大工作线程数
futures = [executor.submit(get_member_by_name, account, name) for name in unique_names_within_time]
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.debug(f'查询失败:{e}')
futures = [global_thread_pool.submit(get_member_by_name, account, name) for name in unique_names_within_time]
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.debug(f'查询失败:{e}')
# 筛选出有有效结果的成员,并按首次付款时间降序排序
valid_results = [result for result in results if result is not None]
@ -179,12 +179,11 @@ def get_pay_record_list(account: Account, date: str) -> Dict[str, List[str]]:
}
member_list = get_member_list(account, params)
if member_list is not None and len(member_list) > 0:
with ThreadPoolExecutor(max_workers=len(member_list)) as executor:
futures = [executor.submit(get_pay_record_detail, account, member, date) for member in member_list]
for future in futures:
result = future.result()
if result:
_names['names'].append(result)
futures = [global_thread_pool.submit(get_pay_record_detail, account, member, date) for member in member_list]
for future in futures:
result = future.result()
if result:
_names['names'].append(result)
logger.info(f'Finished getting pay record list for account: {account.name} and date: {date}')
return _names
@ -207,14 +206,13 @@ def get_pay_record_detail(account: Account, member: MemberList, date: str) -> Op
def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
logger.info(f'Getting pay failed by user: {user.username}')
with ThreadPoolExecutor(max_workers=len(user.accounts)) as executor:
futures = [executor.submit(get_pay_record_list, account, date) for account in user.accounts]
futures = [global_thread_pool.submit(get_pay_record_list, account, date) for account in user.accounts]
# 使用列表推导式构建结果字符串
text_lines = [
"{}\n{}".format(res['name'], '\n'.join(res['names']))
for future in futures if (res := future.result())['names']
]
# 使用列表推导式构建结果字符串
text_lines = [
"{}\n{}".format(res['name'], '\n'.join(res['names']))
for future in futures if (res := future.result())['names']
]
text = '\n'.join(text_lines)
@ -228,14 +226,13 @@ def get_pay_failed_by_user(user: User, date: str) -> Optional[str]:
def get_pay_failed_by_telegram_id(telegram_id: int) -> Optional[str]:
user = get_user_by_telegram_id(telegram_id)
with ThreadPoolExecutor(max_workers=len(user.accounts)) as executor:
futures = [executor.submit(get_pay_record_list, account, get_curr_day()) for account in user.accounts]
futures = [global_thread_pool.submit(get_pay_record_list, account, get_curr_day()) for account in user.accounts]
# 使用列表推导式构建结果字符串
text_lines = [
"{}\n{}".format(res['name'], '\n'.join(res['names']))
for future in futures if (res := future.result())['names']
]
# 使用列表推导式构建结果字符串
text_lines = [
"{}\n{}".format(res['name'], '\n'.join(res['names']))
for future in futures if (res := future.result())['names']
]
text = '\n'.join(text_lines)

View File

@ -9,6 +9,7 @@ from src.core.constant import VISUAL_LIST_URL
from src.core.util import get_curr_day, get_curr_month
from src.entity.account import Account
from src.entity.user import User, get_user_by_telegram_id
from src.ui.thread_pool_manager import global_thread_pool
# 视图列表对象 对应界面上的图表
@ -102,9 +103,9 @@ def get_statics(account, date=get_curr_day()) -> VisualInfo:
def count_by_user(user: User, date: str):
accounts = user.accounts
with ThreadPoolExecutor(max_workers=len(accounts)) as t:
futures = [t.submit(get_statics, account, date) for account in accounts]
return [future.result() for future in futures]
futures = [global_thread_pool.submit(get_statics, account, date) for account in accounts]
return [future.result() for future in futures]
def text_count_by_user(user: User, date: str) -> str:
@ -125,4 +126,4 @@ def text_count_by_telegram_id(telegram_id: int) -> str:
for result in visual_list
)
logger.info(f'Generated text: {text}')
return text
return text

View File

@ -2,80 +2,70 @@ import time
from enum import Enum
from loguru import logger
from PyQt6.QtCore import QDate, QDateTime, Qt, QThreadPool, QTime, QTimer
from PyQt6.QtGui import QAction, QColor, QIcon
from PyQt6.QtWidgets import (QApplication, QCheckBox, QDateEdit, QHBoxLayout,
QHeaderView, QMainWindow, QMenu, QMessageBox,
QPushButton, QSizePolicy, QSystemTrayIcon,
QTableWidget, QTableWidgetItem, QTabWidget,
QTextEdit, QVBoxLayout, QWidget)
from PyQt6.QtCore import QDate, QDateTime, Qt, QThread, QTimer, pyqtSignal, QTime
from PyQt6.QtGui import QAction, QIcon, QColor
from PyQt6.QtWidgets import (
QCheckBox, QDateEdit, QHBoxLayout, QHeaderView, QMainWindow, QMenu,
QMessageBox, QPushButton, QSystemTrayIcon, QTableWidget, QTableWidgetItem,
QTabWidget, QTextEdit, QVBoxLayout, QWidget, QSizePolicy, QApplication
)
from src.ui.config import ConfigManager
from src.core.message_client import send_message
from src.core.util import convert_data, resource_path
from src.ui.config import ConfigManager
from src.core.util import resource_path, convert_data
from src.entity.user import get_user_by_username_and_password
from src.entity.member import get_today_new_member_list
from src.entity.pay_record import get_latest_deposit_user
from src.entity.user import get_user_by_username_and_password
from src.ui import global_signals
from src.ui.data_query import ButtonTask, ReportTask
from src.ui.thread_pool_manager import pyqt_thread_pool
from src.ui.title_bar import CustomTitleBar
class DataLoaderThread(QThread):
tab_ready = pyqtSignal(str, list)
def __init__(self, user):
super().__init__()
self.user = user
def run(self):
data = ReportTask.get_banner_info_by_user(self.user)
for row in data:
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
self.tab_ready.emit(row.agentCode, [time_str, row.registerMembers, row.firstDepositNum, row.netWinLose,
row.effectiveNew, row.activeMembers])
class Application(QMainWindow):
def __init__(self):
super().__init__()
self.config_manager = ConfigManager()
self.thread_pool = QThreadPool()
self.report_timer = None
self.date_update_timer = None
self.username = None
self.password = None
self.minimum = False
self.customTitleBar = None
self.tray_icon = None
self.user = None
self.tables = {}
self.toaster_notify_enabled = True
self.telegram_notify_enabled = True
self.is_dragging = False
self.drag_position = None
self.is_resizing = False
self.resize_direction = None
self.toaster_notify_enabled = True
self.telegram_notify_enabled = True
self.initialize_application()
def initialize_application(self):
# 1. 加载配置文件
self.init_ui()
self.load_config()
# 2. 设置 UI
self.setup_ui()
# 4. 初始化系统托盘图标
self.init_tray_icon()
# 5. 初始化表格数据
self.init_table_data()
self.setup_timers()
self.connect_signals()
self.load_data()
def setup_ui(self):
self.apply_stylesheet()
def init_ui(self):
self.set_window_properties()
self.create_central_widget()
self.setup_layouts()
def setup_timers(self):
self.setup_date_update_timer()
self.setup_report_timer()
def connect_signals(self):
self.tray_icon.activated.connect(self.tray_icon_clicked)
global_signals.user_data_updated.connect(self.refresh_user_data)
self.apply_stylesheet()
def set_window_properties(self):
self.resize(600, 400)
self.setWindowTitle(f'{self.username}的小工具')
self.resize(800, 600)
self.setWindowIcon(QIcon("icons:icon.png"))
self.setWindowFlags(Qt.WindowType.FramelessWindowHint)
@ -85,58 +75,243 @@ class Application(QMainWindow):
self.main_layout = QVBoxLayout(self.central_widget)
def setup_layouts(self):
self.top_panel = QHBoxLayout()
self.setup_top_panel()
self.setup_middle_panel()
self.setup_bottom_panel()
self.customTitleBar = CustomTitleBar(self)
self.setMenuWidget(self.customTitleBar)
self.main_layout.addLayout(self.top_panel)
self.notebook = QTabWidget()
self.main_layout.addWidget(self.notebook)
self.bottom_panel = QVBoxLayout()
self.txt = QTextEdit()
self.bottom_panel.addWidget(self.txt)
self.main_layout.addLayout(self.bottom_panel)
def setup_top_panel(self):
self.add_buttons_to_top_panel()
self.add_date_picker_to_top_panel()
self.add_checkboxes_to_top_panel()
self.setup_report_button()
def add_buttons_to_top_panel(self):
buttons_info = [
("报数", "Primary"),
("存款失败用户", "Danger"),
("负盈利", "Success"),
("薪资", "Light"),
]
for name, style in buttons_info:
button = QPushButton(name)
button.setObjectName(style)
button.clicked.connect(lambda _, n=name: self.query_data(n))
self.top_panel.addWidget(button)
def add_date_picker_to_top_panel(self):
self.dateEdit = QDateEdit()
self.dateEdit.setCalendarPopup(True)
self.dateEdit.setDate(QDate.currentDate())
today = QDate.currentDate()
first_day_last_month = QDate(today.year(), today.month(), 1).addMonths(-1)
self.dateEdit.setMinimumDate(first_day_last_month)
self.dateEdit.setMaximumDate(today)
self.top_panel.addWidget(self.dateEdit)
def add_checkboxes_to_top_panel(self):
checkbox_layout = QVBoxLayout()
self.system_notification_checkbox = QCheckBox("系统通知")
self.telegram_notification_checkbox = QCheckBox("飞机通知")
self.system_notification_checkbox.setChecked(True)
self.telegram_notification_checkbox.setChecked(True)
self.system_notification_checkbox.stateChanged.connect(self.toggle_system_notification)
self.telegram_notification_checkbox.stateChanged.connect(self.toggle_telegram_notification)
checkbox_layout.addWidget(self.system_notification_checkbox)
checkbox_layout.addWidget(self.telegram_notification_checkbox)
self.top_panel.addLayout(checkbox_layout)
def setup_report_button(self):
self.report_button = QPushButton("停止喜报")
self.report_button.setCheckable(True)
self.report_button.setChecked(True)
self.report_button.setObjectName("Warning")
self.report_button.clicked.connect(self.on_report_clicked)
self.top_panel.addWidget(self.report_button)
def init_tray_icon(self):
self.tray_icon = SystemTrayIcon(QIcon("icons:icon.png"), self) # 替换为正确的图标路径
self.tray_icon.setToolTip(self.windowTitle()) # 设置工具提示文本为窗口标题
self.tray_icon = SystemTrayIcon(QIcon("icons:icon.png"), self)
self.tray_icon.setToolTip(self.windowTitle())
self.tray_icon.show()
def load_config(self):
# 使用
try:
self.username = self.config_manager.get('Credentials', 'username')
self.password = self.config_manager.get('Credentials', 'password')
self.minimum = self.config_manager.get('Minimum', 'minimum')
self.user = get_user_by_username_and_password(self.username, self.password)
except FileNotFoundError as e:
QMessageBox.warning(None, "警告", "配置文件信息加载失败!请检查文件是否存在")
return None, None
self.setWindowTitle(f'{self.username}的小工具')
self.customTitleBar = CustomTitleBar(self)
self.setMenuWidget(self.customTitleBar)
self.init_tray_icon() # 在设置标题后初始化托盘图标
except FileNotFoundError:
QMessageBox.warning(self, "警告", "配置文件信息加载失败!请检查文件是否存在")
def init_table_data(self):
# 初始化表格数据
data = self.query_initial_data(self.user)
for row in data:
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
self.update_table(row.agentCode, [time_str, row.registerMembers, row.firstDepositNum, row.netWinLose,
row.effectiveNew, row.activeMembers])
def query_initial_data(self, user):
return ReportTask.get_banner_info_by_user(user)
def setup_report_timer(self):
def setup_timers(self):
self.report_timer = QTimer(self)
self.report_timer.timeout.connect(self.update_reports)
self.report_timer.start(60000)
self.date_update_timer = QTimer(self)
self.date_update_timer.timeout.connect(self.update_date_edit)
self.start_date_update_timer()
def update_reports(self):
def start_date_update_timer(self):
now = QDateTime.currentDateTime()
next_midnight = QDateTime(now.date().addDays(1), QTime(0, 0))
interval = now.msecsTo(next_midnight)
self.date_update_timer.start(interval if interval > 0 else 86400000)
def update_date_edit(self):
self.dateEdit.setDate(QDate.currentDate())
self.update_date_range()
self.start_date_update_timer()
def update_date_range(self):
today = QDate.currentDate()
first_day_last_month = QDate(today.year(), today.month(), 1).addMonths(-1)
self.dateEdit.setMinimumDate(first_day_last_month)
self.dateEdit.setMaximumDate(today)
def connect_signals(self):
self.tray_icon.activated.connect(self.tray_icon_clicked)
global_signals.user_data_updated.connect(self.refresh_user_data)
def load_data(self):
self.data_loader_thread = DataLoaderThread(self.user)
self.data_loader_thread.tab_ready.connect(self.add_tab)
self.data_loader_thread.start()
def add_tab(self, account_username, data):
if account_username not in self.tables:
tab = QWidget()
tab_layout = QVBoxLayout(tab)
table = self.create_table_for_tab()
tab_layout.addWidget(table)
self.tables[account_username] = table
account_name = self.get_account_name(account_username)
self.notebook.addTab(tab, account_name)
self.update_table(account_username, data)
def get_account_name(self, account_username):
for account in self.user.accounts:
report_task = ReportTask(account)
report_task.signals.table_updated.connect(self.update_table)
self.thread_pool.start(report_task)
if account.username == account_username:
return account.name
return account_username
def create_table_for_tab(self):
table = QTableWidget()
column_headers = ColumnHeaders.list()
table.setColumnCount(len(column_headers))
table.setHorizontalHeaderLabels(column_headers)
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
header = table.horizontalHeader()
header.setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
table.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding))
return table
def update_table(self, account_username, data):
try:
table = self.tables[account_username]
self.ensure_table_row_limit(table)
self.insert_data_in_table(table, data, account_username)
except Exception as e:
logger.error(f"Error updating table for {account_username}: {e}")
def ensure_table_row_limit(self, table, row_limit=20):
while table.rowCount() >= row_limit:
table.removeRow(0)
def insert_data_in_table(self, table, data, account_username):
row_count = table.rowCount()
table.insertRow(row_count)
notifications = []
for col, cell_data in enumerate(data):
self.set_table_item(table, row_count, col, cell_data)
self.check_data_change_for_notifications(table, row_count, col, cell_data, account_username, notifications)
self.send_all_notifications(notifications)
def set_table_item(self, table, row, col, cell_data):
cell = QTableWidgetItem(str(cell_data))
cell.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
table.setItem(row, col, cell)
def check_data_change_for_notifications(self, table, row, col, new_data, account_username, notifications):
if row > 0 and col != 0:
old_data_str = table.item(row - 1, col).text()
old_data = convert_data(old_data_str)
new_data = convert_data(new_data)
if old_data != new_data:
count_change = new_data - old_data
self.update_cell_color(table, row, col, count_change)
self.generate_notifications(account_username, col, new_data, count_change, notifications)
def update_cell_color(self, table, row, col, count_change):
cell = table.item(row, col)
if cell:
if count_change > 0:
cell.setForeground(QColor(Qt.GlobalColor.green))
elif count_change < 0:
cell.setForeground(QColor(Qt.GlobalColor.red))
def generate_notifications(self, account_username, col, cell_data, count_change, notifications):
account = self.get_account(account_username)
if count_change > 0:
if col == 1:
reg_results = ','.join(
[f'`{member.name}`' for member in get_today_new_member_list(account, count_change)])
notifications.append(
('👏', account.name, '注册', count_change, f'用户: {reg_results}', str(cell_data)))
elif col == 2:
deposit_results = '\n'.join(
[f"用户: `{member.name}`, 首存金额: *{member.deposit}*" for member in
get_latest_deposit_user(account, count_change)])
notifications.append(
('🎉', account.name, '首存', count_change, deposit_results, str(cell_data)))
def get_account(self, account_username):
for account in self.user.accounts:
if account.username == account_username:
return account
return None
def send_all_notifications(self, notifications):
for notification in notifications:
self.send_notification(*notification)
def on_report_clicked(self):
if self.report_button.isChecked():
self.report_timer.start(60000)
self.report_button.setText("停止喜报")
self.report_button.setObjectName("Warning")
else:
self.report_timer.stop()
self.report_button.setText("启动喜报")
self.report_button.setObjectName("Success")
self.apply_stylesheet()
def tray_icon_clicked(self, reason):
if reason == QSystemTrayIcon.ActivationReason.Trigger:
if self.isMinimized() or not self.isVisible():
self.showNormal()
self.activateWindow()
else:
self.hide()
def refresh_user_data(self):
self.user = get_user_by_username_and_password(self.username, self.password)
self.setWindowTitle(f'{self.username}的小工具')
def apply_stylesheet(self):
style_sheet = self.load_stylesheet(resource_path('ui/style.qss'))
self.setStyleSheet(style_sheet)
def refresh_user_data(self):
# 刷新用户数据的逻辑
self.user = get_user_by_username_and_password(self.username, self.password) # 重新加载用户
@staticmethod
def load_stylesheet(file_path):
try:
@ -146,123 +321,25 @@ class Application(QMainWindow):
print(f"无法找到样式文件: {file_path}")
return ""
def get_account_by_account_username(self, username: str):
for account in self.user.accounts:
if account.username == username:
return account
def setup_top_panel(self):
try:
self.top_panel = QHBoxLayout()
self.add_buttons_to_top_panel()
self.add_date_picker_to_top_panel()
self.add_checkboxes_to_top_panel()
self.setup_report_button()
self.main_layout.addLayout(self.top_panel)
except Exception as e:
logger.error(f"Error setting up top panel: {e}")
def add_date_picker_to_top_panel(self):
# 创建日期选择器
self.dateEdit = QDateEdit()
self.dateEdit.setCalendarPopup(True)
self.dateEdit.setDate(QDate.currentDate())
# 设置日期范围
today = QDate.currentDate()
first_day_last_month = QDate(today.year(), today.month(), 1).addMonths(-1)
self.dateEdit.setMinimumDate(first_day_last_month)
self.dateEdit.setMaximumDate(today)
# 将日期选择器添加到顶部面板
self.top_panel.addWidget(self.dateEdit)
def setup_date_update_timer(self):
self.date_update_timer = QTimer(self)
self.date_update_timer.timeout.connect(self.update_date_edit)
self.start_date_update_timer()
def start_date_update_timer(self):
now = QDateTime.currentDateTime()
next_midnight = QDateTime(now.date().addDays(1), QTime(0, 0)) # 设置为次日的午夜 00:00
interval = now.msecsTo(next_midnight)
self.date_update_timer.start(interval if interval > 0 else 86400000) # 86400000ms = 24小时
def update_date_edit(self):
self.dateEdit.setDate(QDate.currentDate()) # 设置 QDateEdit 控件的日期为当前日期
self.update_date_range()
self.start_date_update_timer()
def update_date_range(self):
today = QDate.currentDate()
first_day_last_month = QDate(today.year(), today.month(), 1).addMonths(-1)
self.dateEdit.setMinimumDate(first_day_last_month)
self.dateEdit.setMaximumDate(today)
def setup_report_button(self):
self.report_button = QPushButton("停止喜报")
self.report_button.setCheckable(True)
self.report_button.setChecked(True) # 默认设置为选中状态
self.report_button.setObjectName("Warning")
self.report_button.clicked.connect(self.on_report_clicked)
self.top_panel.addWidget(self.report_button)
def add_buttons_to_top_panel(self):
for name, style in self.get_buttons_info():
self.create_and_add_button(name, style)
def get_buttons_info(self):
return [
("报数", "Primary"),
("存款失败用户", "Danger"),
("负盈利", "Success"),
("薪资", "Light"),
]
def create_and_add_button(self, name, style):
button = QPushButton(name)
button.setObjectName(style)
button.clicked.connect(lambda _, n=name: self.query_data(n))
self.top_panel.addWidget(button)
def add_checkboxes_to_top_panel(self):
# 创建垂直布局来放置复选框
checkbox_layout = QVBoxLayout()
# 添加复选框
self.system_notification_checkbox = QCheckBox("系统通知")
self.telegram_notification_checkbox = QCheckBox("飞机通知")
self.system_notification_checkbox.setChecked(True)
self.telegram_notification_checkbox.setChecked(True)
self.system_notification_checkbox.stateChanged.connect(self.toggle_system_notification)
self.telegram_notification_checkbox.stateChanged.connect(self.toggle_telegram_notification)
checkbox_layout.addWidget(self.system_notification_checkbox)
checkbox_layout.addWidget(self.telegram_notification_checkbox)
# 将复选框布局添加到顶部面板
self.top_panel.addLayout(checkbox_layout)
def toggle_system_notification(self, state):
self.toaster_notify_enabled = state == Qt.CheckState.Checked
def toggle_telegram_notification(self, state):
self.telegram_notify_enabled = state == Qt.CheckState.Checked
def update_reports(self):
for account in self.user.accounts:
report_task = ReportTask(account)
report_task.signals.table_updated.connect(self.update_table)
pyqt_thread_pool.start(report_task)
def query_data(self, btn_name):
# 获取日期控件的当前值
selected_date = self.dateEdit.date()
# 转换为所需的格式
selected_date_str = selected_date.toString("yyyy-MM-dd")
# 在文本框中显示查询中的消息
self.txt.append(f"正在查询{selected_date_str}{btn_name},请等待...\n")
self.start_data_query(btn_name, selected_date_str)
def start_data_query(self, query_type, selected_date_str):
task = ButtonTask(query_type, selected_date_str, self.user)
task = ButtonTask(btn_name, selected_date_str, self.user)
task.signals.query_completed.connect(self.display_query_result)
self.thread_pool.start(task)
pyqt_thread_pool.start(task)
def display_query_result(self, result, auto_clipboard, need_notify):
if auto_clipboard:
@ -278,139 +355,19 @@ class Application(QMainWindow):
if self.telegram_notify_enabled:
send_message(self.user.bot_token, self.user.group_id, msg)
def on_report_clicked(self):
try:
if self.report_button.isChecked():
self.report_timer.start(60000) # 启动定时器
self.report_button.setText("停止喜报") # 更改按钮文本
# 更改按钮样式为 "Warning"
self.report_button.setObjectName("Warning")
else:
self.report_timer.stop() # 停止定时器
self.report_button.setText("启动喜报") # 恢复按钮文本
self.report_button.setObjectName("Success")
# 重新应用样式来更新按钮外观
self.apply_stylesheet()
except Exception as e:
logger.debug(e)
def exit_application(self):
self.tray_icon.hide()
QApplication.quit()
def update_table(self, account_username, data):
try:
table = self.tables.get(account_username)
if not table:
return
self.ensure_table_row_limit(table)
self.insert_data_in_table(table, data, account_username)
except Exception as e:
logger.error(f"Error updating table for {account_username}: {e}")
def ensure_table_row_limit(self, table, row_limit=20):
if table.rowCount() >= row_limit:
table.removeRow(0)
def insert_data_in_table(self, table, data, account_username):
# 获取当前的行数
row_count = table.rowCount()
table.insertRow(row_count)
notifications = []
for col, cell_data in enumerate(data):
self.set_table_item(table, row_count, col, cell_data)
self.check_data_change_for_notifications(table, row_count, col, cell_data, account_username, notifications)
self.send_all_notifications(notifications)
def send_all_notifications(self, notifications):
"""发送所有通知"""
for notification in notifications:
self.send_notification(*notification)
def set_table_item(self, table, row, col, cell_data):
"""在表格指定行列设置单元格数据"""
cell = QTableWidgetItem(str(cell_data))
cell.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
table.setItem(row, col, cell)
def check_data_change_for_notifications(self, table, row, col, new_data, account_username, notifications):
"""检查数据变更并准备通知"""
if row > 0 and col != 0: # 忽略第一列,通常是时间戳
old_data_str = table.item(row - 1, col).text()
old_data = convert_data(old_data_str)
new_data = convert_data(new_data)
if old_data != new_data:
count_change = new_data - old_data
self.update_cell_color(table, row, col, count_change)
# 确保传递 cell_data 和 count_change 参数
self.generate_notifications(account_username, col, new_data, count_change, notifications)
def update_cell_color(self, table, row, col, count_change):
"""更新单元格颜色"""
cell = table.item(row, col)
if count_change > 0:
cell.setForeground(QColor(Qt.GlobalColor.green))
elif count_change < 0:
cell.setForeground(QColor(Qt.GlobalColor.red))
def generate_notifications(self, account_username, col, cell_data, count_change, notifications):
# 生成通知
account = self.get_account_by_account_username(account_username)
if count_change > 0:
if col == 1: # 第1列是注册用户数量
reg_results = ','.join(
[f'`{member.name}`' for member in get_today_new_member_list(account, count_change)])
notifications.append(
('👏', account.name, '注册', count_change, f'用户: {reg_results}', str(cell_data)))
elif col == 2: # 第2列是首存用户数量
deposit_results = '\n'.join(
[f"用户: `{member.name}`, 首存金额: *{member.deposit}*" for member in
get_latest_deposit_user(account, count_change)])
notifications.append(
('🎉', account.name, '首存', count_change, deposit_results, str(cell_data)))
def addToggleButton(self, text, style):
toggleButton = QPushButton(text)
toggleButton.setCheckable(True)
toggleButton.setStyleSheet(style)
self.top_panel.addWidget(toggleButton)
def setup_middle_panel(self):
# 底部面板,包括文本框
self.bottom_panel = QVBoxLayout()
self.txt = QTextEdit()
self.bottom_panel.addWidget(self.txt)
self.main_layout.addLayout(self.bottom_panel)
def setup_bottom_panel(self):
# 中间面板,包括笔记本(标签页)
self.middle_panel = QVBoxLayout()
self.notebook = QTabWidget()
self.middle_panel.addWidget(self.notebook)
self.add_tabs(self.notebook)
self.main_layout.addLayout(self.middle_panel)
def add_tabs(self, notebook):
column_headers = ColumnHeaders.list()
for account in self.user.accounts:
tab = QWidget()
tab_layout = QVBoxLayout(tab)
notebook.addTab(tab, account.name)
table = self.create_table_for_tab(column_headers)
tab_layout.addWidget(table)
self.tables[account.username] = table
def create_table_for_tab(self, column_headers):
table = QTableWidget()
table.setColumnCount(len(column_headers))
table.setHorizontalHeaderLabels(column_headers)
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
header = table.horizontalHeader()
header.setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
table.setSizePolicy(QSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding))
return table
def closeEvent(self, event):
event.ignore()
self.hide()
self.tray_icon.showMessage(
"最小化到托盘",
"应用程序已最小化到托盘。要退出,请使用托盘菜单。",
QSystemTrayIcon.MessageIcon.Information,
2000
)
def mousePressEvent(self, event):
if event.button() == Qt.MouseButton.LeftButton:
@ -425,20 +382,15 @@ class Application(QMainWindow):
super().mousePressEvent(event)
def is_draggable_area(self, pos):
# 将点转换为标题栏的局部坐标
title_bar_pos = self.customTitleBar.mapFromParent(pos)
# 检查点是否在标题栏内
return self.customTitleBar.rect().contains(title_bar_pos)
# 重置鼠标样式
def reset_cursor_style(self):
if not (self.is_dragging or self.is_resizing):
self.setCursor(Qt.CursorShape.ArrowCursor)
def mouseMoveEvent(self, event):
direction = self.get_resize_direction(event.pos())
# 更新鼠标样式
if direction == "left" or direction == "right":
self.setCursor(Qt.CursorShape.SizeHorCursor)
elif direction == "top" or direction == "bottom":
@ -449,12 +401,11 @@ class Application(QMainWindow):
self.setCursor(Qt.CursorShape.SizeBDiagCursor)
else:
self.reset_cursor_style()
# 处理窗口拖动
if self.is_dragging:
self.move(self.pos() + (event.globalPosition().toPoint() - self.drag_position))
self.drag_position = event.globalPosition().toPoint()
# 处理窗口调整大小
elif self.is_resizing:
self.resize_window(event.globalPosition().toPoint())
@ -466,7 +417,7 @@ class Application(QMainWindow):
super().mouseReleaseEvent(event)
def get_resize_direction(self, pos):
border_width = 10 # 边缘感应区的宽度
border_width = 10
rect = self.rect()
left, right, top, bottom = rect.left(), rect.right(), rect.top(), rect.bottom()
@ -507,30 +458,6 @@ class Application(QMainWindow):
self.setGeometry(rect)
self.drag_position = current_pos
def closeEvent(self, event):
if self.minimum:
event.ignore()
self.hide()
else:
super().closeEvent(event)
def exit_application(self):
self.tray_icon.hide() # 隐藏托盘图标
QApplication.quit()
def tray_icon_clicked(self, reason):
if reason == QSystemTrayIcon.ActivationReason.Trigger:
if self.isMinimized() or not self.isVisible():
self.showNormal() # 恢复窗口大小
self.activateWindow() # 激活窗口
else:
self.hide() # 点击托盘图标再次隐藏窗口
def copy_to_clipboard(text):
clipboard = QApplication.clipboard()
clipboard.setText(text)
class SystemTrayIcon(QSystemTrayIcon):
def __init__(self, icon, parent=None):
@ -557,3 +484,9 @@ class ColumnHeaders(Enum):
def list(cls):
return [cls.TIME.value, cls.REGISTER.value, cls.FIRST_DEPOSIT.value,
cls.NET_PROFIT.value, cls.EFFECTIVE.value, cls.ACTIVE.value]
def copy_to_clipboard(text):
clipboard = QApplication.clipboard()
clipboard.setText(text)

View File

@ -5,7 +5,7 @@ from PyQt6.QtCore import QObject, QRunnable, pyqtSignal
from src.entity.banner_info import get_banner_info, get_banner_info_by_user
from src.entity.finance import get_adjusted_salary, get_net_win_by_user
from src.entity.pay_record import get_pay_failed_by_user
from src.entity.visual_list import text_count_by_user, get_statics
from src.entity.visual_list import text_count_by_user
class TaskSignals(QObject):

View File

@ -0,0 +1,23 @@
# threadpool_manager.py
from PyQt6.QtCore import QThreadPool
# 创建一个全局的线程池实例
pyqt_thread_pool = QThreadPool.globalInstance()
from concurrent.futures import ThreadPoolExecutor
class ThreadPoolManager:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ThreadPoolManager, cls).__new__(cls, *args, **kwargs)
cls._instance.thread_pool = ThreadPoolExecutor(max_workers=5)
return cls._instance
def get_thread_pool(self):
return self.thread_pool
global_thread_pool = ThreadPoolManager().get_thread_pool()

View File

@ -65,12 +65,13 @@ class CustomTitleBar(QWidget):
self.mousePressed = False
def toggle_stay_on_top(self):
current_flags = self.parent.windowFlags()
if self.stayOnTopButton.isChecked():
self.parent.setWindowFlags(current_flags | Qt.WindowType.WindowStaysOnTopHint)
if self.parent.windowFlags() & Qt.WindowType.WindowStaysOnTopHint:
self.parent.setWindowFlags(self.parent.windowFlags() & ~Qt.WindowType.WindowStaysOnTopHint)
self.stayOnTopButton.setIcon(QIcon('icons:top.svg'))
else:
self.parent.setWindowFlags(current_flags & ~Qt.WindowType.WindowStaysOnTopHint)
self.parent.show() # Re-show the window to apply new window flags
self.parent.setWindowFlags(self.parent.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
self.stayOnTopButton.setIcon(QIcon('icons:normal.svg'))
self.parent.show()
def toggle_maximize(self):
if self.parent.isMaximized():
@ -78,4 +79,4 @@ class CustomTitleBar(QWidget):
self.maximizeButton.setIcon(QIcon('icons:max.svg')) # Update icon if needed
else:
self.parent.showMaximized()
self.maximizeButton.setIcon(QIcon('icons:restore.svg')) # Update icon if needed
self.maximizeButton.setIcon(QIcon('icons:max.svg')) # Update icon if needed