# tools-pyqt/src/change_url/change_url.py
#
# NOTE: This file contains invisible Unicode characters that are
# indistinguishable to humans but may be processed differently by a computer.
# If this is intentional, the warning can be ignored; otherwise reveal and
# review them in an editor that can display such characters.

import re
import requests
from sqlalchemy import create_engine, text as sql_text
import portalocker
from loguru import logger
# Module-level database engine; update_agent_url() opens its own connections
# from ``engine``.
# NOTE(review): the connection URL embeds credentials directly in source —
# consider loading it from configuration or the environment.
try:
    engine = create_engine("mysql+mysqlconnector://ky_tools:HJQY35seXen8patn@1panel.stupidpz.com:3306/ky_tools")
    connection = engine.connect()
    logger.debug('链接成功')
except Exception as e:
    # Catch ``Exception`` rather than using a bare ``except:`` so that
    # KeyboardInterrupt/SystemExit still propagate, and record the reason
    # for the failure instead of discarding it.
    logger.debug('连接失败!')
    logger.debug(f"Database connection error: {e}")

# Marker string that check_urls() looks for in fetched page content.
# It is empty here.
ky_base64 = ''
def read_file(filename):
    """Return the full text content of ``filename``.

    Args:
        filename: Path of the file to read (opened in text mode).

    Returns:
        The file content as a single string.

    Raises:
        IOError: Logged and re-raised when the file cannot be opened or read.
    """
    try:
        with open(filename, 'r') as source:
            content = source.read()
    except IOError as err:
        logger.debug(f"Error reading file {filename}: {err}")
        raise
    return content
def write_file(filename, content):
    """Replace the content of ``filename`` with ``content`` under an exclusive lock.

    Args:
        filename: Path of the file to write (created if it does not exist).
        content: Text that becomes the new file content.

    Raises:
        IOError: Logged and re-raised when the file cannot be opened or written.
    """
    try:
        # Open in append mode so the existing content is NOT discarded before
        # the lock is held; mode 'w' would truncate the file at open() time,
        # i.e. before any other lock holder has finished with it.
        with open(filename, 'a') as file:
            portalocker.lock(file, portalocker.LOCK_EX)  # acquire exclusive lock
            try:
                # Now that the lock is held it is safe to drop the old content.
                file.seek(0)
                file.truncate(0)
                file.write(content)
                # Push buffered data out while the lock is still held;
                # otherwise it would only be written when the file is closed,
                # after the lock has been released.
                file.flush()
            finally:
                portalocker.unlock(file)  # release the lock even if writing failed
    except IOError as e:
        logger.debug(f"Error writing to file {filename}: {e}")
        raise
def get_url_mapping(urls_with_context, template_type):
    """Build the ``link.js`` key -> URL mapping for one template.

    Each ``(context, url)`` pair is grouped by its context label (surrounding
    whitespace stripped and ASCII spaces removed). Pairs whose label is not
    one of the known labels are ignored. The template then selects, for every
    JavaScript key, the n-th URL collected under a label (or under a fallback
    label when the primary one did not collect enough URLs).

    A key for which no URL is available is omitted from the result instead of
    raising ``IndexError``.

    Args:
        urls_with_context: Iterable of ``(context, url)`` tuples.
        template_type: One of ``'hth'``, ``'ky'``, ``'jy'`` or ``'aty'``.

    Returns:
        A dict mapping JavaScript link keys to URLs, or ``None`` when
        ``template_type`` is not recognised.
    """
    known_labels = ("WEB", "H5", "全站", "体育", "棋牌", "电子", "web", "h5",
                    "全站APP", "体育APP")
    categories = {label: [] for label in known_labels}

    # Group the URLs by their (normalised) context label.
    for context, url in urls_with_context:
        key = context.strip().replace(" ", "")
        if key in categories:
            categories[key].append(url)

    # template -> {js key: (labels in priority order, index within the label)}
    templates = {
        'hth': {
            "hthApp": (("全站", "全站APP"), 0),
            "hthtyApp": (("体育", "体育APP"), 0),
            "hthWeb": (("WEB", "web"), 0),
            "hthH5": (("H5", "h5"), 0),
        },
        'ky': {
            "kyWeb": (("WEB",), 0),
            "kyFljWeb": (("WEB",), 1),
            "kyH5": (("H5",), 0),
            "kyFljH5": (("H5",), 1),
            "kyApp": (("全站",), 0),
            "kyTyApp": (("体育",), 0),
        },
        'jy': {
            "jyWeb": (("WEB",), 0),
            "jyH5": (("H5",), 0),
            "jyApp": (("全站",), 0),
            "jyQp": (("棋牌",), 0),
            "jyDz": (("电子",), 0),
            "jyFljPc": (("WEB",), 1),
            "jyFljH5": (("H5",), 1),
        },
        'aty': {
            "atyApp": (("全站", "全站APP"), 0),
            "atytyApp": (("体育", "体育APP"), 0),
            "atyWeb": (("WEB", "web"), 0),
            "atyH5": (("H5", "h5"), 0),
        },
    }

    spec = templates.get(template_type)
    if spec is None:
        logger.debug('未知模板')
        return None

    url_mapping = {}
    for js_key, (labels, index) in spec.items():
        # Take the URL from the first label that collected enough entries.
        # The category lists start out empty, so indexing them blindly would
        # raise IndexError whenever the input lacks a label.
        for label in labels:
            urls = categories[label]
            if len(urls) > index:
                url_mapping[js_key] = urls[index]
                break
    return url_mapping
def extract_urls(text):
    """Extract ``(context, url)`` pairs from ``text`` and detect its type.

    For every URL found, the text preceding it on the same line is captured
    as its context. The type is determined via ``get_url_type``; when it is
    not ``'unknown'``, ``update_agent_url`` is invoked with the raw matches.

    Args:
        text: The raw message text to scan.

    Returns:
        A tuple ``(pairs, url_type, is_agent)`` where ``pairs`` is a list of
        ``(stripped context, url)`` tuples.
    """
    matcher = re.compile(r'(?:\n|^)\s*([^\n]*?)\s*(https?://[^\s]+)')
    matches = matcher.findall(text)

    detected_type, agent_flag = get_url_type(text, matches)
    if detected_type != 'unknown':
        update_agent_url(matches, detected_type)

    cleaned = []
    for label, link in matches:
        cleaned.append((label.strip(), link))
    return cleaned, detected_type, agent_flag
def get_url_type(text, url_list):
    """Determine the type code for a message.

    The text is checked for each known keyword in a fixed order; the first
    keyword found decides the type and the second tuple element is ``False``.
    When no keyword is present, the decision is delegated to ``check_urls``.

    Args:
        text: The raw message text.
        url_list: ``(context, url)`` pairs passed on to ``check_urls``.

    Returns:
        A ``(url_type, flag)`` tuple.
    """
    # Order matters: the first keyword present in the text wins.
    keyword_table = (
        ('ky', '开云'),
        ('hth', '华体会'),
        ('jy', '九游'),
        ('aty', '爱体育'),
    )
    matched = next((code for code, word in keyword_table if word in text), None)
    if matched is not None:
        return matched, False
    return check_urls(url_list)
def check_urls(url_list, timeout=10):
    """Fetch the URLs one by one and infer the type from the page content.

    The first page that answers with HTTP 200 and contains a known keyword
    (or the non-empty ``ky_base64`` marker) decides the result. URLs that
    cannot be fetched are logged and skipped.

    Args:
        url_list: Iterable of ``(context, url)`` tuples; only the URL is used.
        timeout: Seconds to wait for each request before giving up on it.

    Returns:
        ``(url_type, flag)`` where ``flag`` tells whether ``'马甲包'`` occurs
        in the matching page, or ``('unknown', False)`` when nothing matched.
    """
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    # Checked after the 'ky' test, in this order.
    other_keywords = (
        ('hth', '华体会'),
        ('jy', '九游'),
        ('aty', '爱体育'),
    )
    for _context, url in url_list:
        try:
            # Without a timeout a single unresponsive host would block forever;
            # a timeout raises a RequestException subclass, handled below.
            response = requests.get(url, headers=headers, timeout=timeout)
            if response.status_code == 200:
                content = response.text
                # An empty marker must not count: ``'' in content`` is always
                # True and would classify every reachable page as 'ky'.
                has_marker = bool(ky_base64) and ky_base64 in content
                if '开云' in content or has_marker:
                    return 'ky', '马甲包' in content
                for code, keyword in other_keywords:
                    if keyword in content:
                        return code, '马甲包' in content
        except requests.RequestException:
            logger.debug(f"URL '{url}' 不可用,跳过。")
    return 'unknown', False
def update_agent_url(url_list, url_type):
    """Store the first "代理web" URL in the database for ``url_type``.

    The pairs are scanned in order; the first one whose context (ASCII spaces
    removed, lower-cased) contains ``"代理web"`` triggers an ``UPDATE`` of
    ``ky_tools.ky_account`` and ends the scan. Database errors are logged and
    not propagated.

    Args:
        url_list: Iterable of ``(context, url)`` tuples.
        url_type: Value matched against the ``type`` column.
    """
    for label, link in url_list:
        normalized = label.replace(" ", "").lower()
        if "代理web" not in normalized:
            continue
        try:
            with engine.connect() as conn:
                statement = sql_text("UPDATE ky_tools.ky_account SET url = :url WHERE type = :url_type")
                params = {"url": link, "url_type": url_type}
                conn.execute(statement, params)
                conn.commit()
                logger.debug(f"{url_type}后台链接更新成功")
        except Exception as err:
            logger.debug(f"执行SQL语句出错: {err}")
        # Only the first matching entry is processed.
        break
def update_js_content(url_mapping, url_type):
    """Rewrite the hosts of the links in ``static/link.js``.

    The current file content is read (a built-in default is used when the
    file does not exist). For every ``key -> url`` entry of ``url_mapping``
    the scheme-and-host part of the matching ``'key': 'https://host/path'``
    entry is replaced by ``url`` while the path is kept. The result is written
    back to ``static/link.js``.

    Args:
        url_mapping: Dict mapping JavaScript link keys to the new base URL.
        url_type: Not used by this function; kept for interface compatibility.
    """
    js_file = 'static/link.js'
    # Read the current content of link.js.
    try:
        existing_content = read_file(js_file)
    except FileNotFoundError:
        # The file does not exist yet: start from the default JavaScript.
        existing_content = """
const codes = {
'hth': '3016341',
'ky': '97238304',
'jy': '90413847',
'aty': '123456'
};
const links = {
'hth': {
'hthWeb': 'https://www.3wmnr0.vip:8005/register?i_code='+codes['hth'],
'hthH5': 'https://www.95b8ns.vip:8002/entry/register?i_code='+codes['hth'],
'hthApp': 'https://www.jv4n7s1.com:8000/?i_code='+codes['hth'],
'hthtyApp': 'https://www.cmqd0v.vip:9515/?i_code='+codes['hth'],
},
'ky': {
'kyWeb': 'https://www.2ezmig.com:9501/register?i_code='+codes['ky'],
'kyH5': 'https://www.u3hnhg.com:9970/entry/register?i_code='+codes['ky'],
'kyApp': 'https://www.1wpeq.vip:8663/?i_code='+codes['ky'],
'kyTyApp': 'https://www.3djjf.vip:9512/?i_code='+codes['ky'],
'kyFljWeb': 'https://www.qpiao.vip:9006/register?i_code='+codes['ky'],
'kyFljH5': 'https://www.7ig4jf.com:8443/entry/register?i_code='+codes['ky'],
},
'jy': {
'jyWeb': 'https://www.1wv3jq.com:9008/register/?i_code='+codes['jy'],
'jyH5': 'https://www.1j5voy.vip:9113/entry/register?i_code='+codes['jy'],
'jyApp': 'https://www.shbb1.vip:9511/?i_code='+codes['jy'],
'jyQp': 'https://www.3a10en.vip:9077/?i_code='+codes['jy'],
'jyDz': 'https://www.3z9tsr.vip:9964/?i_code='+codes['jy'],
'jyFljPc': 'https://www.az8z32.vip:8002/register/?i_code='+codes['jy'],
'jyFljH5': 'https://www.c053i2.vip:6004/entry/register?i_code='+codes['jy'],
},
'aty': {
'atyWeb': 'https://www.3wmnr0.vip:8005/register?i_code='+codes['aty'],
'atyH5': 'https://www.95b8ns.vip:8002/entry/register?i_code='+codes['aty'],
'atyApp': 'https://www.jv4n7s1.com:8000/?i_code='+codes['aty'],
'atytyApp': 'https://www.cmqd0v.vip:9515/?i_code='+codes['aty'],
},
};
function visit(platform, key) {
window.open(links[platform][key]);
}
"""
    # Replace the scheme-and-host part of each mapped link via regex.
    for key, value in url_mapping.items():
        logger.debug(f"key: {key}, value: {value}")
        if not value:
            # An empty replacement would strip the host and leave a broken
            # link such as '/register?...'; keep the existing link instead.
            logger.debug(f"Skipping {key}: no replacement URL")
            continue
        # Escape the key so it is always matched literally inside the pattern.
        pattern = re.compile(r"'{}':?\s*'(https?://[^/]+)(/[^']*)'".format(re.escape(key)))
        existing_content = pattern.sub(lambda match: "'{0}': '{1}{2}'".format(key, value, match.group(2)),
                                       existing_content)
    # Write the updated content back to link.js.
    write_file(js_file, existing_content)
def change_url(text):
    """Process a message and update the stored links accordingly.

    The URLs and the type are extracted from ``text``. For a non-agent
    message the JavaScript link file is updated through ``get_url_mapping``
    and ``update_js_content``.

    Args:
        text: The raw message text.

    Returns:
        ``(True, url_type)`` after updating the link file,
        ``(True, '代理更新')`` when the message was flagged as an agent one,
        and ``(False, 'unknown.js')`` when the type is unknown or any
        exception occurred (the exception is logged).
    """
    failure = (False, 'unknown.js')
    try:
        urls_with_context, url_type, is_agent = extract_urls(text)
        if url_type == 'unknown':
            return failure
        if is_agent:
            return True, '代理更新'
        mapping = get_url_mapping(urls_with_context, url_type)
        update_js_content(mapping, url_type)
        return True, url_type
    except Exception as err:
        logger.debug(f"An error occurred: {err}")
        return failure
# Script entry point: post a sample message to the local /changeurl service
# and log the outcome.
if __name__ == '__main__':
    try:
        text = '''
九游娱乐
SEO 专用域名
WEB https://www.1zsaxz.com:9514
H5 https://www.1l3dee.vip:9192  【新】
全站 https://www.sjieu.vip:9013
棋牌   https://www.2w33ay.vip:8001
电子 https://www.3ku3z2.vip:9443
九游娱乐【SEO防拦截域名】
WEB https://www.as421r.vip:9520
H5 https://www.c0azge.vip:7988  【新】
'''
        # (connect, read) timeouts so the script cannot hang forever when the
        # service is down or never answers.
        res = requests.post("http://127.0.0.1:8000/changeurl", json={'text': text},
                            timeout=(5, 120))
        if res.status_code == 200:
            # Decode the JSON body once instead of once per field access.
            payload = res.json()
            if payload['res']:
                msg = f'{payload["msg"]}修改成功'
            else:
                msg = f'{payload["msg"]}修改失败'
        else:
            msg = '请求发生错误,请检查服务是否正常'
        logger.debug(msg)
    except Exception as e:
        logger.debug(f"An error occurred in the main program: {e}")