import base64 import json import re import time import requests from playwright.sync_api import Position, TimeoutError, sync_playwright from src import logger from src.entity.account import Account from src.entity.database import db from src.ui import global_signals def on_request(request, account: Account): if 'banner' in request.url: headers = request.headers account.headers = headers logger.info(f'Headers for account {account.name}: {headers}') persistence(account, headers) # 通知app数据更新了 global_signals.user_data_updated.emit() def login(account: Account) -> Account: logger.info(f'Starting login for account: {account.name}') try: with sync_playwright() as playwright: account = perform_login(playwright, account) except Exception as e: logger.error(f'Error during login for account {account.name}: {e}', exc_info=True) handle_login_failure(account) return account def perform_login(playwright, account: Account) -> Account: browser = playwright.chromium.launch(headless=False) context = browser.new_context() page = context.new_page() page.goto(account.url) fill_login_form(page, account) if handle_captcha(page): account.headers = capture_request_headers(page, account) logger.info('登录成功') else: logger.error('登录失败或验证码处理失败') close_resources(page, context, browser) return account def fill_login_form(page, account: Account): username_input = page.get_by_placeholder('用户名') password_input = page.get_by_placeholder('密码') username_input.click() username_input.fill(account.username) password_input.click() password_input.fill(account.password) page.locator("div").filter(has_text=re.compile(r"^登录$")).get_by_role("button").click() logger.info(f'{account.name}登录ing...........') def handle_captcha(page) -> bool: try: validate_code = page.wait_for_selector('.geetest_box', state='visible') time.sleep(1) return process_validate_code(validate_code) except TimeoutError: logger.error('超时了') return False def process_validate_code(validate_code): validate_code_buffer = validate_code.screenshot() img = base64.b64encode(validate_code_buffer).decode('utf-8') res = base64_api(img=img) if '|' in res: click_captcha_positions(validate_code, res) validate_code.query_selector('.geetest_submit').click() validate_code.wait_for_element_state('hidden') logger.debug('验证码点击成功') return True else: logger.error(res) return False def click_captcha_positions(validate_code, positions_str): for part in positions_str.split('|'): x, y = part.split(',') validate_code.click(position=Position(x=int(x), y=int(y))) time.sleep(.5) def capture_request_headers(page, account: Account): page.on('request', lambda request: on_request(request, account)) page.wait_for_url(f'{account.url}/app/home?showWelcome=false') return account.headers def close_resources(page, context, browser): page.close() context.close() browser.close() def handle_login_failure(account: Account): # 处理登录失败的情况 pass def base64_api(uname='luffy230505', pwd='qwer12345', img='', typeid=20): logger.info('Calling base64_api') data = {"username": uname, "password": pwd, "typeid": typeid, "image": img, 'softid': '8d13df0efe074035b54ee9c2bef85106'} result = json.loads(requests.post("http://api.ttshitu.com/predict", json=data).text) if result['success']: return result["data"]["result"] else: return result["message"] def persistence(account: Account, headers: dict): logger.info(f'Persisting headers for account {account.name}') with db.Session() as session: db_account = session.query(Account).filter(Account.username == account.username, Account.password == account.password).one() db_account.headers = headers db_account.x_api_token = headers['x-api-token'] session.commit() logger.info(f'Headers persisted for account {account.name}')