优化了部分设置,更新了新版验证码

This commit is contained in:
zayac 2024-05-31 14:42:41 +08:00
parent 85cfc9ae60
commit c79c832128
6 changed files with 189 additions and 132 deletions

View File

@ -1,12 +1,14 @@
# 使用官方 Python 3.11 镜像 # 使用官方 Python 3.11 镜像
FROM python:3.11 FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 将应用代码复制到 /app 目录下 # 将应用代码复制到 /app 目录下
COPY . /app COPY . /app
COPY requirements.txt /app/requirements.txt COPY requirements.txt /app/requirements.txt
# 设置工作目录
WORKDIR /app
# 安装 Python 依赖 # 安装 Python 依赖
RUN pip install -r requirements.txt RUN pip install -r requirements.txt

View File

@ -4,7 +4,7 @@ import time
import os import os
from loguru import logger from loguru import logger
BOT_TOKEN = os.getenv('BOT_TOKEN', default="6356456493:AAF2J03isyhlOFF6WgoovRCzuvHheTrTKmM") BOT_TOKEN = os.getenv('BOT_TOKEN', default="7153107488:AAEfeznAQzcvJhoEa0QqKN9baP4luQ4Xd1Y")
API_URL = os.getenv('API_URL', default="http://127.0.0.1:8080/changeurl") API_URL = os.getenv('API_URL', default="http://127.0.0.1:8080/changeurl")
logger.debug(f"bot_token:{BOT_TOKEN}") logger.debug(f"bot_token:{BOT_TOKEN}")

View File

@ -44,7 +44,7 @@ async def async_post(url: str, headers: dict, params: dict) -> ApiResponse[Any]:
def account_post(url: str, account: Account, params: dict) -> ApiResponse[Any]: def account_post(url: str, account: Account, params: dict) -> ApiResponse[Any]:
for _ in range(3): for _ in range(3):
try: try:
if account.headers is None: if not account.headers:
account = login(account) account = login(account)
api_res = post(url=account.url + url, headers=account.headers, params=params) api_res = post(url=account.url + url, headers=account.headers, params=params)
if api_res.status_code == 6000: if api_res.status_code == 6000:
@ -89,5 +89,6 @@ async def async_account_post(url: str, account: Account, params: dict) -> ApiRes
# Add a delay before retrying # Add a delay before retrying
await asyncio.sleep(10) await asyncio.sleep(10)
send_message(account.user.bot_token, account.user.group_id, f'{account.url}: Retry limit exceeded, please check the code') send_message(account.user.bot_token, account.user.group_id,
f'{account.url}: Retry limit exceeded, please check the code')
logger.error("Retry limit exceeded, please check the code") logger.error("Retry limit exceeded, please check the code")

View File

@ -1,156 +1,214 @@
import base64 import base64
import json
import re import re
import time import asyncio
from playwright.async_api import async_playwright, TimeoutError
import requests
from playwright.sync_api import Position, TimeoutError, sync_playwright
from loguru import logger from loguru import logger
from src.core.message_client import send_message
from src.entity.account import Account, AccountType from src.entity.account import Account, AccountType
import aiohttp
from src.entity.database import db from src.entity.database import db
from src.ui import global_signals
def on_request(request, account: Account): # 提取重复的请求和响应处理逻辑
if 'banner' in request.url: async def handle_event(page, event_type, callback, *args):
page.on(event_type, lambda event: asyncio.create_task(callback(event, *args)))
async def handle_request(request, result_container, request_event):
if 'perInfo' in request.url:
headers = request.headers headers = request.headers
account.headers = headers result_container['headers'] = headers
logger.info(f'Headers for account {account.name}: {headers}') logger.info(f'Successfully got headers: {headers}')
persistence(account, headers) request_event.set()
# 通知app数据更新了
global_signals.user_data_updated.emit()
def login(account: Account) -> Account: async def handle_response(response, response_container, response_event):
logger.info(f'Starting login for account: {account.name}') if 'loginFlow' in response.url:
data = await response.json()
response_container['data'] = data
response_event.set()
async def playwright_login(account: Account) -> dict:
result_container = {'headers': {}}
response_container = {'data': {}}
logger.info(f'Starting login for username: {account.username}')
try: try:
with sync_playwright() as playwright: async with async_playwright() as playwright:
account = perform_login(playwright, account) await perform_login(playwright, account, result_container, response_container)
except Exception as e: except Exception as e:
logger.error(f'Error during login for account {account.name}: {e}', exc_info=True) logger.error(f'Error during login for account {account.username}: {e}', exc_info=True)
handle_login_failure(account) await handle_login_failure(account)
return account return result_container['headers']
def perform_login(playwright, account: Account) -> Account: async def perform_login(playwright, account: Account, request_container: dict,
browser = playwright.chromium.launch(headless=False) response_container: dict):
context = browser.new_context() browser = await playwright.chromium.launch(headless=False)
page = context.new_page() context = await browser.new_context()
page.goto(account.url) page = await context.new_page()
fill_login_form(page, account)
if handle_captcha(page, account.type, retry_count=0): request_event = asyncio.Event()
account.headers = capture_request_headers(page, account) response_event = asyncio.Event()
logger.info('登录成功')
await page.goto(account.url)
await handle_event(page, 'request', handle_request, request_container, request_event)
await handle_event(page, 'response', handle_response, response_container, response_event)
# 填充基本的元素
await fill_form_common(page, account)
# 如果是九游 验证码滑动,并且滑动之后自动登录成功
if account.type == AccountType.jy:
await handle_jy_captcha(page, account, 0)
logger.info("验证码处理成功")
try:
# 等待请求发送 获取header返回值
await request_event.wait()
except TimeoutError:
# 超时 即验证码处理失败
logger.warning("登录处理失败")
else: else:
logger.error('登录失败了') # 验证码输入成功
close_resources(page, context, browser) max_retries = 3
return account for attempt in range(max_retries):
if await handle_captcha(page):
response_event.clear() # 重置事件
await page.locator("div").filter(has_text=re.compile(r"^登录$")).get_by_role("button").click()
try:
await response_event.wait()
if response_container['data'].get('status_code') != 6000:
logger.warning(f"登录发生错误, {response_container['data'].get('message', '未知错误')}")
close_button = page.locator("button.ant-modal-close")
await close_button.wait_for()
await close_button.click()
else:
break
except TimeoutError:
logger.warning(f"等待响应超时,重试 {attempt + 1}/{max_retries}")
else:
logger.warning(f"验证码输入失败,重试 {attempt + 1}/{max_retries}")
else:
logger.warning("登录失败,达到最大重试次数")
try:
# 等待请求发送 获取header返回值
await request_event.wait()
except TimeoutError:
# 超时 即验证码处理失败
logger.warning("登录处理失败")
await close_resources(page, context, browser)
def fill_login_form(page, account: Account): async def fill_form_common(page, account: Account):
username_input = page.get_by_placeholder('用户名') username_input = page.get_by_placeholder('用户名')
password_input = page.get_by_placeholder('密码') password_input = page.get_by_placeholder('密码')
username_input.click() await username_input.click()
username_input.fill(account.username) await username_input.fill(account.username)
password_input.click() await password_input.click()
password_input.fill(account.password) await password_input.fill(account.password)
page.locator("div").filter(has_text=re.compile(r"^登录$")).get_by_role("button").click()
logger.info(f'{account.name}登录ing...........')
def handle_captcha(page, account_type, retry_count) -> bool: async def handle_jy_captcha(page, account, retry_count=0):
if retry_count < 3: await page.locator("div").filter(has_text=re.compile(r"^登录$")).get_by_role("button").click()
try: validate_code = page.locator('.geetest_box')
validate_code = page.locator('.geetest_box') await validate_code.wait_for(state='visible')
validate_code.wait_for(state='visible') await asyncio.sleep(1)
time.sleep(1) validate_code_buffer = await validate_code.screenshot()
return process_validate_code(page, validate_code, account_type)
except TimeoutError:
retry_count += 1
logger.error(f'验证码识别失败,正在重试:{retry_count}')
return handle_captcha(page, account_type, retry_count)
return False
def process_validate_code(page, validate_code, account_type):
validate_code_buffer = validate_code.screenshot()
img = base64.b64encode(validate_code_buffer).decode('utf-8') img = base64.b64encode(validate_code_buffer).decode('utf-8')
# 九游滑动验证 需要单独处理 res = await base64_api(img=img, typeid=33)
if account_type == AccountType.jy: drag_btn = validate_code.locator(".geetest_btn")
res = base64_api(img=img, typeid=33) drag_btn_box = await drag_btn.bounding_box()
drag_btn = validate_code.locator(".geetest_btn") try:
drag_btn_box = drag_btn.bounding_box() distance = int(res) + 3
target_x = drag_btn_box['x'] + distance
await drag_btn.hover()
await page.mouse.down()
await page.mouse.move(target_x, drag_btn_box["y"], steps=25)
await page.mouse.up()
await validate_code.wait_for(state='hidden')
return True
except (ValueError, TimeoutError):
retry_count += 1
logger.error(f"验证码处理失败,重试: {res}")
return await handle_jy_captcha(page, account, retry_count)
try:
distance = int(res)
target_x = drag_btn_box['x'] + distance
drag_btn.hover()
page.mouse.down()
# 缓慢滑动元素
step = distance // 10 # 每次滑动的步长
for i in range(0, distance, step):
page.mouse.move(drag_btn_box["x"] + i, drag_btn_box["y"])
page.wait_for_timeout(1 // 10)
# 模拟鼠标移动操作,将元素向 x 轴滑动指定距离 async def handle_captcha(page):
page.mouse.move(target_x, drag_btn_box["y"]) validate_code_input = page.get_by_placeholder('请输入验证码')
page.mouse.up() validate_code_img = page.locator('#boxModle')
validate_code.wait_for(state='hidden') await validate_code_img.wait_for()
logger.debug('验证码点击成功') validate_code_buffer = await validate_code_img.screenshot()
return True img_base64 = base64.b64encode(validate_code_buffer).decode('utf-8')
except ValueError: res = await base64_api(img=img_base64, typeid=1003)
logger.error(f"获取移动距离失败,实际返回内容为:{res}") await validate_code_input.click()
return False await validate_code_input.fill(res)
except TimeoutError: return True
logger.debug("验证码滑动失败")
return process_validate_code(page, validate_code, account_type)
async def handle_standard_captcha(page, validate_code, img):
res = await base64_api(img=img)
if '|' in res:
await click_captcha_positions(validate_code, res)
await validate_code.locator('.geetest_submit').click()
await validate_code.wait_for(state='hidden')
logger.debug('验证码点击成功')
return True
else: else:
res = base64_api(img=img) logger.error(res)
if '|' in res: return False
click_captcha_positions(validate_code, res)
validate_code.locator('.geetest_submit').click()
validate_code.wait_for(state='hidden')
logger.debug('验证码点击成功')
return True
else:
logger.error(res)
return False
def click_captcha_positions(validate_code, positions_str): async def click_captcha_positions(validate_code, positions_str):
for part in positions_str.split('|'): for part in positions_str.split('|'):
x, y = part.split(',') x, y = part.split(',')
validate_code.click(position=Position(x=int(x), y=int(y))) await validate_code.click(position={"x": int(x), "y": int(y)})
time.sleep(.5) await asyncio.sleep(.5)
def capture_request_headers(page, account: Account): async def close_resources(page, context, browser):
page.on('request', lambda request: on_request(request, account)) try:
page.wait_for_url(f'{account.url}/app/home?showWelcome=false') await page.close()
return account.headers await context.close()
await browser.close()
except Exception as e:
logger.error(f'Error closing browser resources: {e}', exc_info=True)
def close_resources(page, context, browser): async def handle_login_failure(account: Account):
page.close() send_message(account.user.bot_token, account.user.group_id,
context.close() f'{account.url}:加载超时,请检查是否后台更换了链接')
browser.close() return
def handle_login_failure(account: Account): async def base64_api(uname='luffy230505', pwd='qwer12345', img='', typeid=20):
# 处理登录失败的情况 logger.info('Calling third part interfaces')
pass data = {
"username": uname,
"password": pwd,
"typeid": typeid,
"image": img,
'softid': '8d13df0efe074035b54ee9c2bef85106'
}
async with aiohttp.ClientSession() as session:
async with session.post("http://api.ttshitu.com/predict", json=data) as response:
result = await response.json()
if result['success']:
return result["data"]["result"]
else:
return result["message"]
def base64_api(uname='luffy230505', pwd='qwer12345', img='', typeid=20): def login(account: Account):
logger.info('Calling base64_api') headers = asyncio.run(playwright_login(account))
data = {"username": uname, "password": pwd, "typeid": typeid, "image": img, if headers:
'softid': '8d13df0efe074035b54ee9c2bef85106'} account.headers = headers
result = json.loads(requests.post("http://api.ttshitu.com/predict", json=data).text) persistence(account, headers)
if result['success']: return account
return result["data"]["result"]
else: else:
return result["message"] send_message(account.user.bot_token, account.user.group_id,
f'{account.url}:加载超时,请检查是否后台更换了链接')
return
def persistence(account: Account, headers: dict): def persistence(account: Account, headers: dict):
@ -161,10 +219,3 @@ def persistence(account: Account, headers: dict):
db_account.headers = headers db_account.headers = headers
session.commit() session.commit()
logger.info(f'Headers persisted for account {account.name}') logger.info(f'Headers persisted for account {account.name}')
# if __name__ == '__main__':
# with open('C:\\Users\\Administrator\\Desktop\\Snipaste_2024-03-26_21-46-35.png', 'rb') as f:
# base64_data = base64.b64encode(f.read())
# b64 = base64_data.decode('utf-8')
# res = base64_api(img= b64,typeid=33)
# print(res)

View File

@ -75,7 +75,7 @@ class Application(QMainWindow):
def set_window_properties(self): def set_window_properties(self):
self.resize(600, 400) self.resize(600, 400)
self.setWindowTitle("zayac的小工具") self.setWindowTitle(f'{self.username}的小工具')
self.setWindowIcon(QIcon("icons:icon.png")) self.setWindowIcon(QIcon("icons:icon.png"))
self.setWindowFlags(Qt.WindowType.FramelessWindowHint) self.setWindowFlags(Qt.WindowType.FramelessWindowHint)
@ -93,6 +93,7 @@ class Application(QMainWindow):
def init_tray_icon(self): def init_tray_icon(self):
self.tray_icon = SystemTrayIcon(QIcon("icons:icon.png"), self) # 替换为正确的图标路径 self.tray_icon = SystemTrayIcon(QIcon("icons:icon.png"), self) # 替换为正确的图标路径
self.tray_icon.setToolTip(self.windowTitle()) # 设置工具提示文本为窗口标题
self.tray_icon.show() self.tray_icon.show()
def load_config(self): def load_config(self):
@ -341,6 +342,7 @@ class Application(QMainWindow):
self.update_cell_color(table, row, col, count_change) self.update_cell_color(table, row, col, count_change)
# 确保传递 cell_data 和 count_change 参数 # 确保传递 cell_data 和 count_change 参数
self.generate_notifications(account_username, col, new_data, count_change, notifications) self.generate_notifications(account_username, col, new_data, count_change, notifications)
def update_cell_color(self, table, row, col, count_change): def update_cell_color(self, table, row, col, count_change):
"""更新单元格颜色""" """更新单元格颜色"""
cell = table.item(row, col) cell = table.item(row, col)
@ -482,7 +484,7 @@ class Application(QMainWindow):
return "right" return "right"
if pos.y() < top + border_width: if pos.y() < top + border_width:
return "top" return "top"
if pos.y() > bottom - border_width: if pos.y() > bottom + border_width:
return "bottom" return "bottom"
return None return None

View File

@ -2,6 +2,7 @@ from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont, QIcon, QPixmap from PyQt6.QtGui import QFont, QIcon, QPixmap
from PyQt6.QtWidgets import QHBoxLayout, QLabel, QToolButton, QWidget from PyQt6.QtWidgets import QHBoxLayout, QLabel, QToolButton, QWidget
class CustomTitleBar(QWidget): class CustomTitleBar(QWidget):
def __init__(self, parent): def __init__(self, parent):
super().__init__(parent) super().__init__(parent)
@ -30,7 +31,7 @@ class CustomTitleBar(QWidget):
def setup_title_label(self): def setup_title_label(self):
font = QFont() font = QFont()
font.setPointSize(11) font.setPointSize(11)
self.titleLabel = QLabel("zayacs Toolkit") self.titleLabel = QLabel(f"{self.parent.username}的小工具")
self.titleLabel.setFont(font) self.titleLabel.setFont(font)
self.titleLabel.setAlignment(Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter) self.titleLabel.setAlignment(Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter)
self.layout.addWidget(self.titleLabel, 1) self.layout.addWidget(self.titleLabel, 1)
@ -77,4 +78,4 @@ class CustomTitleBar(QWidget):
self.maximizeButton.setIcon(QIcon('icons:max.svg')) # Update icon if needed self.maximizeButton.setIcon(QIcon('icons:max.svg')) # Update icon if needed
else: else:
self.parent.showMaximized() self.parent.showMaximized()
self.maximizeButton.setIcon(QIcon('icons:restore.svg')) # Update icon if needed self.maximizeButton.setIcon(QIcon('icons:restore.svg')) # Update icon if needed