tools-pyqt/src/ui/app.py

555 lines
21 KiB
Python

import configparser
import os
from loguru import logger
from PyQt6.QtCore import QDate, QDateTime, Qt, QThreadPool, QTime, QTimer
from PyQt6.QtGui import QColor, QIcon, QAction
from PyQt6.QtWidgets import (QApplication, QCheckBox, QDateEdit, QHBoxLayout,
QHeaderView, QMainWindow, QPushButton,
QSizePolicy, QTableWidget, QTableWidgetItem,
QTabWidget, QTextEdit, QVBoxLayout, QWidget, QMessageBox, QSystemTrayIcon, QMenu)
from src.core.message_client import send_message
from src.core.util import convert_data, resource_path
from src.entity.member import get_today_new_member_list
from src.entity.pay_record import get_latest_deposit_user
from src.entity.user import get_user_by_username_and_password
from src.ui import global_signals
from src.ui.data_query import ButtonTask, ReportTask
from src.ui.title_bar import CustomTitleBar
# Frameless main window: builds the UI and tray icon, keeps one table per
# account, and runs the timers that refresh the date picker and the reports.
class Application(QMainWindow):
def __init__(self):
    """Initialise the default window state and start the application setup."""
    super().__init__()
    # Maps an account username to the table widget shown on its tab.
    self.tables = {}
    # State for the manual drag/resize handling of the frameless window.
    self.is_dragging = self.is_resizing = False
    self.drag_position = self.resize_direction = None
    # Both notification channels start enabled.
    self.toaster_notify_enabled = self.telegram_notify_enabled = True
    self.initialize_application()
def initialize_application(self):
    """Run every start-up step in order: config, UI, tray, data, timers."""
    # Configuration comes first because the tabs are built from self.user.
    self.load_config()
    # Pool that runs the query and report tasks.
    self.thread_pool = QThreadPool()
    self.setup_ui()
    self.init_tray_icon()
    # Give every account table its first row.
    self.init_table_data()
    # Date roll-over timer and periodic report timer.
    self.setup_date_update_timer()
    self.setup_report_timer()
    # Reload the user whenever the global "user data updated" signal fires.
    global_signals.user_data_updated.connect(self.refresh_user_data)
    # Notification target: the group id when it is set, otherwise the chat id.
    self.chat_id = self.user.group_id or self.user.chat_id
def setup_ui(self):
    """Build the window: stylesheet, window properties, central widget, panels."""
    build_steps = (
        self.apply_stylesheet,
        self.set_window_properties,
        self.create_central_widget,
        self.setup_layouts,
    )
    for step in build_steps:
        step()
def set_window_properties(self):
    """Set the initial size, title, icon and frameless flag of the window."""
    window_icon = QIcon("icons:icon.png")
    self.resize(600, 400)
    self.setWindowTitle("zayac的小工具")
    self.setWindowIcon(window_icon)
    # No native frame: the window handles dragging and resizing itself.
    self.setWindowFlags(Qt.WindowType.FramelessWindowHint)
def create_central_widget(self):
    """Install an empty central widget that owns the vertical main layout."""
    container = QWidget()
    self.central_widget = container
    self.setCentralWidget(container)
    self.main_layout = QVBoxLayout(container)
def setup_layouts(self):
    """Create the three panels, then install the custom title bar."""
    panel_builders = (
        self.setup_top_panel,
        self.setup_middle_panel,
        self.setup_bottom_panel,
    )
    for build_panel in panel_builders:
        build_panel()
    title_bar = CustomTitleBar(self)
    self.customTitleBar = title_bar
    # The title bar occupies the main window's menu-widget slot.
    self.setMenuWidget(title_bar)
def init_tray_icon(self):
    """Create the tray icon with an exit menu entry and show it."""
    tray = QSystemTrayIcon(QIcon("icons:icon.png"), self)
    self.tray_icon = tray

    # Context menu with a single entry that quits the application.
    menu = QMenu()
    quit_action = QAction("退出", self)
    quit_action.triggered.connect(self.exit_application)
    menu.addAction(quit_action)
    tray.setContextMenu(menu)

    tray.activated.connect(self.tray_icon_clicked)
    tray.show()
def init_data_and_timers(self):
    """Load the initial table rows, then create both timers.

    NOTE(review): nothing in the visible source calls this method;
    ``initialize_application`` performs the same three steps itself.
    """
    steps = (
        self.init_table_data,
        self.setup_date_update_timer,
        self.setup_report_timer,
    )
    for step in steps:
        step()
def load_config(self):
    """Read ``config.ini`` and load the user it identifies.

    Sets ``self.username``, ``self.password``, ``self.user`` and
    ``self.minimum``.  When the file is missing a warning dialog is shown
    and none of those attributes are set.

    Returns:
        ``(None, None)`` when the config file is missing, otherwise ``None``.
    """
    config = configparser.ConfigParser()
    config_file = 'config.ini'
    if not os.path.exists(config_file):
        QMessageBox.warning(None, "警告", "用户信息获取失败!")
        return None, None
    config.read(config_file)
    username = config.get('Credentials', 'username')
    password = config.get('Credentials', 'password')
    # Parse the flag as a boolean.  The raw option is a string, and any
    # non-empty string (including "false" or "0") is truthy, which made
    # ``closeEvent`` always hide the window instead of closing it.
    try:
        minimum = config.getboolean('Minimum', 'minimum')
    except ValueError:
        # Unrecognised spelling: keep the previous "non-empty means on" rule.
        minimum = bool(config.get('Minimum', 'minimum'))
    self.username = username
    self.password = password
    self.user = get_user_by_username_and_password(username, password)
    self.minimum = minimum
def init_table_data(self):
    """Query the starting data of every account and add it to its table."""
    for acct in self.user.accounts:
        first_row = self.query_initial_data(acct)
        self.update_table(acct.username, first_row)
def query_initial_data(self, account):
    """Return the initial report data for ``account`` via ``ReportTask``."""
    initial_data = ReportTask.query_data_for_account(account)
    return initial_data
def setup_report_timer(self):
    """Create the report timer and start it with a 60-second interval."""
    timer = QTimer(self)
    timer.timeout.connect(self.update_reports)
    self.report_timer = timer
    timer.start(60000)  # milliseconds
def update_reports(self):
    """Start one background ``ReportTask`` per account."""
    pool = self.thread_pool
    for acct in self.user.accounts:
        task = ReportTask(acct)
        # The task hands its result to update_table through this signal.
        task.signals.table_updated.connect(self.update_table)
        pool.start(task)
def apply_stylesheet(self):
    """Load ``ui/style.qss`` and apply it to the window."""
    qss_path = resource_path('ui/style.qss')
    self.setStyleSheet(self.load_stylesheet(qss_path))
def refresh_user_data(self):
    """Reload ``self.user`` using the stored username and password."""
    credentials = (self.username, self.password)
    self.user = get_user_by_username_and_password(*credentials)
@staticmethod
def load_stylesheet(file_path):
try:
with open(file_path, "r", encoding='utf-8') as file:
return file.read()
except FileNotFoundError:
print(f"无法找到样式文件: {file_path}")
return ""
def get_account_by_account_username(self, username: str):
    """Return the first account whose username equals ``username``, or None."""
    matches = (acct for acct in self.user.accounts if acct.username == username)
    return next(matches, None)
def setup_top_panel(self):
    """Build the top row of controls and add it to the main layout.

    Any exception raised while building is logged and not re-raised.
    """
    try:
        panel = QHBoxLayout()
        self.top_panel = panel
        # Left to right: query buttons, date picker, checkboxes, report button.
        self.add_buttons_to_top_panel()
        self.add_date_picker_to_top_panel()
        self.add_checkboxes_to_top_panel()
        self.setup_report_button()
        self.main_layout.addLayout(panel)
    except Exception as e:
        logger.error(f"Error setting up top panel: {e}")
def add_date_picker_to_top_panel(self):
    """Create the date picker, limit its range, and add it to the top panel."""
    picker = QDateEdit()
    self.dateEdit = picker
    picker.setCalendarPopup(True)
    picker.setDate(QDate.currentDate())
    # Selectable range: first day of the previous month through today.
    newest = QDate.currentDate()
    oldest = QDate(newest.year(), newest.month(), 1).addMonths(-1)
    picker.setMinimumDate(oldest)
    picker.setMaximumDate(newest)
    self.top_panel.addWidget(picker)
def setup_date_update_timer(self):
    """Create the timer that refreshes the date picker, then arm it."""
    timer = QTimer(self)
    timer.timeout.connect(self.update_date_edit)
    self.date_update_timer = timer
    self.start_date_update_timer()
def start_date_update_timer(self):
    """Start the date timer with the time remaining until the next midnight."""
    current = QDateTime.currentDateTime()
    midnight = QDateTime(current.date().addDays(1), QTime(0, 0))
    delay_ms = current.msecsTo(midnight)
    if delay_ms <= 0:
        delay_ms = 86400000  # fall back to 24 hours
    self.date_update_timer.start(delay_ms)
def update_date_edit(self):
    """Advance the date picker to today and re-arm the timer for midnight.

    The allowed range is refreshed before the date is set: while the old
    maximum (the previous day) is still in force, the picker keeps the
    value within that range and cannot move on to today's date.
    """
    self.update_date_range()
    self.dateEdit.setDate(QDate.currentDate())
    logger.debug(f"Date picker set to {self.dateEdit.date()}")
    # Aim at the next midnight again instead of a fixed 24 hours, so an
    # early or late tick, or a clock change, does not shift later updates.
    self.start_date_update_timer()
def update_date_range(self):
    """Limit the picker to the first day of the previous month through today."""
    newest = QDate.currentDate()
    oldest = QDate(newest.year(), newest.month(), 1).addMonths(-1)
    picker = self.dateEdit
    picker.setMinimumDate(oldest)
    picker.setMaximumDate(newest)
def setup_report_button(self):
    """Add the checkable button that starts and stops the report timer."""
    button = QPushButton("停止喜报")
    self.report_button = button
    button.setCheckable(True)
    button.setChecked(True)  # starts in the checked state
    button.setObjectName("Warning")
    button.clicked.connect(self.on_report_clicked)
    self.top_panel.addWidget(button)
def add_buttons_to_top_panel(self):
    """Create one query button for every entry of ``get_buttons_info``."""
    for button_spec in self.get_buttons_info():
        label, style_name = button_spec
        self.create_and_add_button(label, style_name)
def get_buttons_info(self):
    """Return the ``(label, style name)`` pairs of the query buttons."""
    labels = ("报数", "存款失败用户", "负盈利", "薪资")
    style_names = ("Primary", "Danger", "Success", "Light")
    return list(zip(labels, style_names))
def create_and_add_button(self, name, style):
    """Create a query button labelled ``name`` and add it to the top panel.

    ``style`` becomes the button's object name.
    """
    def run_query(_checked, label=name):
        # ``label`` is bound when the handler is defined.
        self.query_data(label)

    button = QPushButton(name)
    button.setObjectName(style)
    button.clicked.connect(run_query)
    self.top_panel.addWidget(button)
def add_checkboxes_to_top_panel(self):
    """Add the two notification checkboxes, stacked vertically."""
    column = QVBoxLayout()

    # Each box is checked before its handler is connected, so the initial
    # state does not trigger the handler.
    system_box = QCheckBox("系统通知")
    telegram_box = QCheckBox("飞机通知")
    self.system_notification_checkbox = system_box
    self.telegram_notification_checkbox = telegram_box

    system_box.setChecked(True)
    telegram_box.setChecked(True)
    system_box.stateChanged.connect(self.toggle_system_notification)
    telegram_box.stateChanged.connect(self.toggle_telegram_notification)

    for box in (system_box, telegram_box):
        column.addWidget(box)
    self.top_panel.addLayout(column)
def toggle_system_notification(self, state):
    """Enable or disable tray notifications from the checkbox state.

    Args:
        state: value delivered by the checkbox's ``stateChanged`` signal;
            a plain integer or a ``Qt.CheckState`` member.
    """
    # Convert before comparing: a plain integer never compares equal to a
    # Qt.CheckState member, which left this flag False after any toggle.
    self.toaster_notify_enabled = Qt.CheckState(state) == Qt.CheckState.Checked
def toggle_telegram_notification(self, state):
    """Enable or disable Telegram notifications from the checkbox state.

    Args:
        state: value delivered by the checkbox's ``stateChanged`` signal;
            a plain integer or a ``Qt.CheckState`` member.
    """
    # Convert before comparing: a plain integer never compares equal to a
    # Qt.CheckState member, which left this flag False after any toggle.
    self.telegram_notify_enabled = Qt.CheckState(state) == Qt.CheckState.Checked
def query_data(self, btn_name):
    """Announce and start the query named ``btn_name`` for the picked date."""
    # The date is passed on as a "yyyy-MM-dd" string.
    day = self.dateEdit.date().toString("yyyy-MM-dd")
    # Tell the user that the query is running.
    self.txt.append(f"正在查询{day}{btn_name},请等待...\n")
    self.start_data_query(btn_name, day)
def start_data_query(self, query_type, selected_date_str):
    """Run a ``ButtonTask`` for the query on the thread pool."""
    worker = ButtonTask(query_type, selected_date_str, self.user)
    # The result is delivered to display_query_result through this signal.
    worker.signals.query_completed.connect(self.display_query_result)
    self.thread_pool.start(worker)
def display_query_result(self, result, auto_clipboard, need_notify):
    """Append a query result to the text box, optionally copying it first.

    Args:
        result: text to display.
        auto_clipboard: when true, ``result`` is copied to the clipboard.
        need_notify: when true, a tray message may follow the copy (see the
            note in the body).
    """
    if auto_clipboard:
        copy_to_clipboard(result)
        # NOTE(review): for a string ``result`` the test ``'' not in result``
        # is always False, so this message is never shown.  The marker that
        # was meant to be searched for appears to be missing - confirm.
        if need_notify and not ('' in result):
            self.tray_icon.showMessage("", "自动复制成功", QSystemTrayIcon.MessageIcon.Information, 500)
    self.txt.append(result)
def send_notification(self, emoji, account_name, title, count, results, total):
    """Send one change notification through every enabled channel.

    The same text goes to the tray icon (when ``toaster_notify_enabled``)
    and to ``send_message`` (when ``telegram_notify_enabled``).
    """
    text = f'{emoji} {account_name} {title}:{count} {results} 总数:*{total}*'
    if self.toaster_notify_enabled:
        heading = f"{title}通知"
        self.tray_icon.showMessage(heading, text, QSystemTrayIcon.MessageIcon.Information, 2000)
    if self.telegram_notify_enabled:
        send_message(self.user.bot_token, self.chat_id, text)
def on_report_clicked(self):
    """Start or stop the report timer to match the report button's state.

    The button's text and object name are switched, then the stylesheet is
    re-applied so the new object name takes effect.  Exceptions are logged
    at debug level and not re-raised.
    """
    try:
        button = self.report_button
        if button.isChecked():
            self.report_timer.start(60000)
            label, style_name = "停止喜报", "Warning"
        else:
            self.report_timer.stop()
            label, style_name = "启动喜报", "Success"
        button.setText(label)
        button.setObjectName(style_name)
        self.apply_stylesheet()
    except Exception as e:
        logger.debug(e)
def update_table(self, account_username, data):
    """Append ``data`` as a new row of the table for ``account_username``.

    Does nothing when no table is registered under that username.
    Exceptions are logged and not re-raised.
    """
    try:
        target = self.tables.get(account_username)
        if target:
            self.ensure_table_row_limit(target)
            self.insert_data_in_table(target, data, account_username)
    except Exception as e:
        logger.error(f"Error updating table for {account_username}: {e}")
def ensure_table_row_limit(self, table, row_limit=20):
    """Remove the first row when ``table`` already holds ``row_limit`` rows.

    At most one row is removed per call.
    """
    if table.rowCount() < row_limit:
        return
    table.removeRow(0)
def insert_data_in_table(self, table, data, account_username):
    """Append ``data`` as the last row and send notifications for changes."""
    # The new row goes after the existing ones.
    new_row = table.rowCount()
    table.insertRow(new_row)
    pending = []
    for column, value in enumerate(data):
        item = self.create_table_cell(value, table, column)
        table.setItem(new_row, column, item)
        # May add notification tuples to ``pending``.
        self.handle_data_change(table, value, column, account_username, pending)
    self.send_all_notifications(pending)
def create_table_cell(self, cell_data, table, col):
    """Return a centre-aligned table item showing ``str(cell_data)``.

    ``table`` and ``col`` are accepted but not used.
    """
    item = QTableWidgetItem(str(cell_data))
    item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
    return item
def handle_data_change(self, table, cell_data, col, account_username, notifications):
    """Compare a new cell with the row above it and react to a change.

    On a change the cell colour is updated and notifications may be added
    to ``notifications``.
    """
    row_total = table.rowCount()
    # Column 0 is skipped, and a first row has no previous row to compare.
    if row_total <= 1 or col == 0:
        return
    previous = convert_data(table.item(row_total - 2, col).text())
    current = convert_data(cell_data)
    if previous == current:
        return
    difference = current - previous
    self.update_cell_color(table, col, difference)
    self.generate_notifications(account_username, col, cell_data, difference, notifications)
def update_cell_color(self, table, col, count_change):
    """Colour the newest row's cell in ``col`` by the sign of the change.

    Green for an increase, red for a decrease, unchanged for zero.  New
    rows are appended at the bottom of the table, so the cell that just
    changed is in the last row (row 0 holds the oldest data).

    Args:
        table: table widget that already contains the new row.
        col: column index of the changed cell.
        count_change: new value minus previous value.
    """
    cell = table.item(table.rowCount() - 1, col)
    if cell is None:
        # Nothing to colour when the cell has no item.
        return
    if count_change > 0:
        cell.setForeground(QColor(Qt.GlobalColor.green))
    elif count_change < 0:
        cell.setForeground(QColor(Qt.GlobalColor.red))
def generate_notifications(self, account_username, col, cell_data, count_change, notifications):
    """Queue a notification tuple when column 1 or 2 has increased.

    Tuples are appended to ``notifications`` in the argument order of
    ``send_notification``.  Nothing is queued unless ``count_change > 0``.
    """
    account = self.get_account_by_account_username(account_username)
    if not count_change > 0:
        return
    total = str(cell_data)
    if col == 1:  # column 1: number of registered users
        new_members = get_today_new_member_list(account, count_change)
        names = ','.join([f'`{member.name}`' for member in new_members])
        notifications.append(
            ('👏', account.name, '注册', count_change, f'用户: {names}', total))
    elif col == 2:  # column 2: number of first-deposit users
        depositors = get_latest_deposit_user(account, count_change)
        lines = '\n'.join(
            [f"用户: `{member.name}`, 首存金额: *{member.deposit}*" for member in depositors])
        notifications.append(
            ('🎉', account.name, '首存', count_change, lines, total))
def send_all_notifications(self, notifications):
    """Send each queued notification tuple through ``send_notification``."""
    for fields in notifications:
        self.send_notification(*fields)
def addToggleButton(self, text, style):
    """Add a checkable button with its own stylesheet to the top panel.

    ``style`` is passed to ``setStyleSheet``, so it is stylesheet text, not
    an object name.
    """
    button = QPushButton(text)
    button.setCheckable(True)
    button.setStyleSheet(style)
    self.top_panel.addWidget(button)
def setup_middle_panel(self):
    """Add the text box ``self.txt`` to the main layout.

    Despite the method name, the layout is stored as ``self.bottom_panel``.
    """
    text_layout = QVBoxLayout()
    self.bottom_panel = text_layout
    self.txt = QTextEdit()
    text_layout.addWidget(self.txt)
    self.main_layout.addLayout(text_layout)
def setup_bottom_panel(self):
    """Add the tab widget holding the account tables to the main layout.

    Despite the method name, the layout is stored as ``self.middle_panel``.
    """
    tabs_layout = QVBoxLayout()
    self.middle_panel = tabs_layout
    self.notebook = QTabWidget()
    tabs_layout.addWidget(self.notebook)
    self.add_tabs(self.notebook)
    self.main_layout.addLayout(tabs_layout)
def add_tabs(self, notebook):
    """Add one tab per account, each holding a read-only table.

    Every table is registered in ``self.tables`` under the account's
    username so later updates can find it.
    """
    headers = ["时间", "注册", "首存", "负盈利", "有效", "活跃"]
    for acct in self.user.accounts:
        # Tab page and its layout.
        page = QWidget()
        page_layout = QVBoxLayout(page)
        notebook.addTab(page, acct.name)

        # Table with one column per header; cells cannot be edited.
        grid = QTableWidget()
        grid.setColumnCount(len(headers))
        grid.setHorizontalHeaderLabels(headers)
        grid.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)

        # Every column stretches, except the first which fits its contents.
        header = grid.horizontalHeader()
        for section, _ in enumerate(headers):
            header.setSectionResizeMode(section, QHeaderView.ResizeMode.Stretch)
        header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)

        # Let the table expand to fill the tab.
        expanding = QSizePolicy.Policy.Expanding
        grid.setSizePolicy(QSizePolicy(expanding, expanding))
        page_layout.addWidget(grid)

        self.tables[acct.username] = grid
# Begin a window resize or a window drag on a left-button press.
# NOTE(review): the indentation of this file was lost, so it cannot be told
# from here whether the `else: return` below pairs with the inner if/elif
# (a left press outside every active area) or with the button test (any
# other button). Confirm against the original file before re-indenting.
def mousePressEvent(self, event):
if event.button() == Qt.MouseButton.LeftButton:
# Remember the global press position; the move handler works from it.
self.drag_position = event.globalPosition().toPoint()
# A press on a window edge takes priority over a title-bar drag.
self.resize_direction = self.get_resize_direction(event.pos())
if self.resize_direction:
self.is_resizing = True
elif self.is_draggable_area(event.pos()):
self.is_dragging = True
else:
return
super().mousePressEvent(event)
def is_draggable_area(self, pos):
    """Return True when ``pos`` lies inside the custom title bar."""
    bar = self.customTitleBar
    # Convert the point to the title bar's local coordinates first.
    local_point = bar.mapFromParent(pos)
    inside = bar.rect().contains(local_point)
    return inside
def reset_cursor_style(self):
    """Restore the arrow cursor unless a drag or resize is in progress."""
    if self.is_dragging or self.is_resizing:
        return
    self.setCursor(Qt.CursorShape.ArrowCursor)
def mouseMoveEvent(self, event):
    """Update the cursor for the hovered edge and continue a drag or resize."""
    cursor_for = {
        "left": Qt.CursorShape.SizeHorCursor,
        "right": Qt.CursorShape.SizeHorCursor,
        "top": Qt.CursorShape.SizeVerCursor,
        "bottom": Qt.CursorShape.SizeVerCursor,
        "top-left": Qt.CursorShape.SizeFDiagCursor,
        "bottom-right": Qt.CursorShape.SizeFDiagCursor,
        "top-right": Qt.CursorShape.SizeBDiagCursor,
        "bottom-left": Qt.CursorShape.SizeBDiagCursor,
    }
    # Cursor shape follows the edge or corner under the pointer.
    shape = cursor_for.get(self.get_resize_direction(event.pos()))
    if shape is None:
        self.reset_cursor_style()
    else:
        self.setCursor(shape)

    if self.is_dragging:
        # Move the window by the pointer's travel since the last event.
        pointer = event.globalPosition().toPoint()
        self.move(self.pos() + (pointer - self.drag_position))
        self.drag_position = pointer
    elif self.is_resizing:
        self.resize_window(event.globalPosition().toPoint())
    super().mouseMoveEvent(event)
def mouseReleaseEvent(self, event):
    """End any drag or resize that is in progress."""
    self.is_dragging = self.is_resizing = False
    super().mouseReleaseEvent(event)
def get_resize_direction(self, pos):
    """Return which edge or corner of the window ``pos`` is close to.

    Returns one of "left", "right", "top", "bottom", "top-left",
    "top-right", "bottom-left", "bottom-right", or ``None`` when ``pos`` is
    not within the border zone.
    """
    margin = 10  # width of the edge-sensitive zone
    bounds = self.rect()
    x, y = pos.x(), pos.y()

    # Left wins over right and top wins over bottom when both apply.
    if x < bounds.left() + margin:
        horizontal = "left"
    elif x > bounds.right() - margin:
        horizontal = "right"
    else:
        horizontal = None

    if y < bounds.top() + margin:
        vertical = "top"
    elif y > bounds.bottom() - margin:
        vertical = "bottom"
    else:
        vertical = None

    if horizontal and vertical:
        return f"{vertical}-{horizontal}"
    return horizontal or vertical
def resize_window(self, current_pos):
    """Move the grabbed edges of the window by the pointer's travel.

    Does nothing when no resize direction is set.  ``current_pos`` becomes
    the new reference point for the next call.
    """
    direction = self.resize_direction
    if not direction:
        return
    travel = current_pos - self.drag_position
    frame = self.geometry()
    # A corner direction such as "top-left" names two edges, so each edge
    # is tested separately.
    if "left" in direction:
        frame.setLeft(frame.left() + travel.x())
    if "right" in direction:
        frame.setRight(frame.right() + travel.x())
    if "top" in direction:
        frame.setTop(frame.top() + travel.y())
    if "bottom" in direction:
        frame.setBottom(frame.bottom() + travel.y())
    self.setGeometry(frame)
    self.drag_position = current_pos
def closeEvent(self, event):
    """Hide the window instead of closing it when ``self.minimum`` is set."""
    if not self.minimum:
        super().closeEvent(event)
        return
    # Refuse the close request and just hide the window.
    event.ignore()
    self.hide()
def exit_application(self):
    """Hide the tray icon, then quit the application."""
    tray = self.tray_icon
    tray.hide()
    QApplication.quit()
def tray_icon_clicked(self, reason):
    """Toggle the window's visibility on a ``Trigger`` tray activation.

    Other activation reasons are ignored.
    """
    if reason != QSystemTrayIcon.ActivationReason.Trigger:
        return
    if self.isVisible():
        self.hide()
    else:
        self.showNormal()
def copy_to_clipboard(text):
    """Put ``text`` on the application clipboard."""
    QApplication.clipboard().setText(text)