tools-pyqt/src/change_url/change_url.py

172 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import requests
from sqlalchemy import create_engine
from sqlalchemy import text as sql_text
# Module-level database setup. A single engine/connection pair is created at
# import time and a status message is printed either way.
#
# NOTE(review): the connection string embeds the database user and password
# in source control; consider loading it from the environment or a config
# file that is not committed.
#
# Both names are pre-bound so that later code sees ``None`` instead of hitting
# a NameError when the initial connection attempt fails.
engine = None
connection = None
try:
    engine = create_engine("mysql+mysqlconnector://ky_tools:HJQY35seXen8patn@1panel.stupidpz.com:3306/ky_tools")
    connection = engine.connect()
    print('链接成功')
except Exception:
    # ``Exception`` (not a bare ``except``) so Ctrl-C / SystemExit still work.
    print('连接失败!')

# Marker string looked up in fetched pages to recognise the 'ky' template.
# While it is empty, the marker check in extract_urls cannot match anything
# meaningful.
ky_base64 = ''
def read_file(filename):
    """Return the entire text content of *filename*.

    The file is opened in text mode with the platform default encoding.

    Raises:
        IOError: re-raised after an error line has been printed.
    """
    try:
        with open(filename, mode='r') as source:
            contents = source.read()
    except OSError as err:  # IOError is an alias of OSError
        print(f"Error reading file {filename}: {err}")
        raise
    return contents
def write_file(filename, content):
    """Overwrite *filename* with the string *content*.

    The file is opened in text mode with the platform default encoding and is
    truncated if it already exists.

    Raises:
        IOError: re-raised after an error line has been printed.
    """
    try:
        with open(filename, mode='w') as target:
            target.write(content)
    except OSError as err:  # IOError is an alias of OSError
        print(f"Error writing to file {filename}: {err}")
        raise
def replace_urls(js_content, url_mapping):
    """Substitute every placeholder key of *url_mapping* in *js_content*.

    Replacements are applied one after another in the mapping's iteration
    order, each on the result of the previous one. Returns the new string.
    """
    rendered = js_content
    for token in url_mapping:
        rendered = rendered.replace(token, url_mapping[token])
    return rendered
def get_url_mapping(urls_with_context, template_type):
    """Build the placeholder -> URL mapping for one JS template.

    Args:
        urls_with_context: iterable of ``(context, url)`` pairs. The context
            label is stripped and has its spaces removed before it is matched
            against the known category names; unknown labels are ignored.
        template_type: ``'hth'``, ``'ky'`` or ``'jy'``.

    Returns:
        A dict from template placeholder to URL. A placeholder whose category
        has no (or not enough) URLs maps to ``""``. For an unknown
        *template_type* a message is printed and ``None`` is returned.
    """
    category_names = ("WEB", "H5", "全站", "体育", "棋牌", "电子", "web", "h5",
                      "全站APP", "体育APP")
    categories = {name: [] for name in category_names}

    # Group the URLs by their normalised context label.
    for context, url in urls_with_context:
        key = context.strip().replace(" ", "")
        if key in categories:
            categories[key].append(url)

    def pick(*names, index=0):
        """Return entry *index* of the first non-empty category in *names*.

        Falls back to "" instead of raising IndexError: every category is
        pre-seeded with an empty list, so a plain ``[0]`` / ``[1]`` lookup
        fails whenever the input lacks that category.
        """
        for name in names:
            urls = categories[name]
            if urls:
                return urls[index] if index < len(urls) else ""
        return ""

    def second_of_pair(name):
        """Return the second URL only when the category has exactly two."""
        # NOTE(review): the exact-two rule is kept from the previous
        # implementation; confirm whether "at least two" was intended.
        urls = categories[name]
        return urls[1] if len(urls) == 2 else ''

    if template_type == 'hth':
        return {
            "{{hthApp}}": pick("全站", "全站APP"),
            "{{hthtyApp}}": pick("体育", "体育APP"),
            "{{hthPc}}": pick("WEB", "web"),
            "{{hthH5}}": pick("H5", "h5"),
        }
    if template_type == 'ky':  # ky template
        return {
            "{{kyPc}}": pick("WEB"),
            "{{kyPc2}}": pick("WEB", index=1),
            "{{kyH51}}": pick("H5"),
            "{{kyH52}}": pick("H5", index=1),
            # The app and sports slots cross-reference each other's first URL.
            "{{kyApp1}}": pick("全站"),
            "{{kyApp2}}": pick("体育"),
            "{{kyTy1}}": pick("体育"),
            "{{kyTy2}}": pick("全站"),
        }
    if template_type == 'jy':  # jy template
        return {
            "{{jyPc}}": pick("WEB"),
            "{{jyH5}}": pick("H5"),
            "{{jyApp}}": pick("全站"),
            "{{jyQp}}": pick("棋牌"),
            "{{jyDz}}": pick("电子"),
            "{{jyFljPc}}": second_of_pair("WEB"),
            "{{jyFljH5}}": second_of_pair("H5"),
        }

    print('未知模板')
    return None
def extract_urls(text):
    """Extract labelled URLs from *text* and detect which template they belong to.

    Every ``http(s)://`` URL is captured together with the text that precedes
    it on the same line. The URLs are then fetched in order until one page
    reveals the template type; at that point the agent URL is stored through
    ``update_agent_url`` and probing stops.

    Args:
        text: free-form text, typically one "<label> <url>" entry per line.

    Returns:
        A 3-tuple ``(pairs, url_type, is_agent)``:
        ``pairs`` is a list of ``(label, url)`` with the label stripped,
        ``url_type`` is ``'ky'``, ``'hth'``, ``'jy'`` or ``'unknown'``, and
        ``is_agent`` is True when a fetched page contained the agent marker.
    """
    pattern = r'(?:\n|^)\s*([^\n]*?)\s*(https?://[^\s]+)'
    request_headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}
    # Without a timeout a single unresponsive host blocks the caller forever.
    request_timeout = 10

    url_type = 'unknown'
    is_agent = False
    url_list = re.findall(pattern, text)
    # The input text itself may name the brand; this does not depend on the page.
    ky_named_in_text = "开云" in text

    for _context, url in url_list:
        try:
            response = requests.get(url, headers=request_headers, timeout=request_timeout)
        except requests.RequestException:
            print(f"URL '{url}' 不可用,跳过。")
            continue
        if response.status_code != 200:
            continue

        page = response.text
        # An empty marker is a substring of every page, so it must be ignored;
        # otherwise every reachable URL would be classified as 'ky'.
        ky_marker_found = bool(ky_base64) and ky_base64 in page
        if "开云" in page or ky_marker_found or ky_named_in_text:
            url_type = 'ky'
        elif '华体会' in page:
            url_type = 'hth'
        elif '九游' in page:
            url_type = 'jy'

        # Detect a back-office agent link.
        if '马甲包' in page:
            is_agent = True

        if url_type != 'unknown':
            update_agent_url(url_list, url_type)
            break
    else:
        # Reached only when no URL produced a template type.
        print(f"未找到匹配的关键词。")

    return [(context.strip(), url) for context, url in url_list], url_type, is_agent
def update_agent_url(url_list, url_type):
    """Store the first agent web URL of *url_list* for the given account type.

    The first entry whose label, with spaces removed and lower-cased, contains
    ``"代理web"`` is written to ``ky_tools.ky_account.url`` for all rows whose
    ``type`` equals *url_type*. Later matching entries are ignored.

    Args:
        url_list: iterable of ``(label, url)`` pairs.
        url_type: value matched against the ``type`` column.

    Database errors are printed, not raised.
    """
    for context, url in url_list:
        if "代理web" not in context.replace(" ", "").lower():
            continue
        try:
            update_stmt = sql_text("UPDATE ky_tools.ky_account SET url = :url WHERE type = :url_type")
            # Use a short-lived connection from the engine. Closing the shared
            # module-level connection here would make every later call fail.
            with engine.connect() as conn:
                conn.execute(update_stmt, {"url": url, "url_type": url_type})
                conn.commit()
        except Exception as e:
            print(f"执行SQL语句出错{e}")
        # Only the first matching entry is used.
        break
def change_url(text):
    """Regenerate the JS file of the template detected from *text*.

    The URLs in *text* are extracted and classified by ``extract_urls``. For a
    regular (non-agent) link set, the matching
    ``/www/wwwroot/change_url/<type>.js.template`` file is read, its
    placeholders are replaced and the result is written next to it without the
    ``.template`` suffix.

    Args:
        text: free-form text containing labelled URLs.

    Returns:
        ``(ok, name)``:
        ``(True, <written js path>)`` when a template was rendered,
        ``(True, '代理更新')`` for an agent link set (no file is written here),
        ``(False, 'unknown.js')`` when the type is unknown or an error occurred
        before the output path was determined, and
        ``(False, <js path>)`` when writing that path failed.
    """
    # Bound before the try block so the except handler can always read it,
    # even when extract_urls itself raises.
    expected_filename = None
    try:
        urls_with_context, url_type, is_agent = extract_urls(text)
        if url_type == 'unknown':
            return False, 'unknown.js'
        if is_agent:
            return True, '代理更新'

        template_path = f'/www/wwwroot/change_url/{url_type}.js.template'
        js_content = read_file(template_path)
        url_mapping = get_url_mapping(urls_with_context, url_type)
        updated_content = replace_urls(js_content, url_mapping)
        expected_filename = template_path.replace('.template', '')
        write_file(expected_filename, updated_content)
        return True, expected_filename
    except Exception as e:
        print(f"An error occurred: {e}")
        return False, expected_filename if expected_filename else 'unknown.js'
# Manual run: feeds a sample block of labelled URLs through change_url when the
# module is executed as a script. The return value is discarded.
if __name__ == '__main__':
    # Sample input: one "<label> <url>" entry per line. The amount of
    # whitespace around the labels and after the URLs varies between lines.
    text = '''代理 web https://www.krb01.com:8002  
代理 web https://www.k7hhk.com:9015   
代理 web https://www.ku1fs.com:9966    
代理H5 https://www.lwa5x.com:8004
代理H5 https://www.libsz.com:9009  
代理H5   https://www.lqqgl.com:9152'''
    # Disabled ad-hoc probe: fetch one URL and check which brand keyword the
    # page contains.
    # res = requests.get("https://www.e5vj65s.com:9961")
    # print(res)
    # print("华体会" in res.text)
    # print("开云" in res.text)
    # print("九游" in res.text)
    change_url(text)