优化了消息发送的逻辑,改用docker部署了

This commit is contained in:
zayac 2024-06-03 15:21:27 +08:00
parent 1b0f7ea4e0
commit f510000e10
3 changed files with 105 additions and 72 deletions

34
src/core/Dockerfile Normal file
View File

@ -0,0 +1,34 @@
# 使用官方的Python基础镜像
FROM python:3.11-slim
# 安装必要的工具,包括 procps 包含 ps 命令
RUN apt-get update && apt-get install -y procps
# 设置工作目录
WORKDIR /app
# 将requirements.txt复制到工作目录
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 将当前目录下的所有文件复制到工作目录
COPY ./message_server.py .
COPY ./health_check.sh .
# 确保健康检查脚本有执行权限
RUN chmod +x /app/health_check.sh
# 设置环境变量
ENV RABBITMQ_USER=bot
ENV RABBITMQ_PASSWORD=xiaomi@123
ENV RABBITMQ_HOST=mq.stupidpz.com
ENV RABBITMQ_PORT=5672
ENV TELEGRAM_BOT_TOKEN=<your_telegram_bot_token>
# 配置健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 CMD ["sh", "/app/health_check.sh"]
# 启动应用
CMD ["python", "message_server.py"]

9
src/core/health_check.sh Normal file
View File

@ -0,0 +1,9 @@
#!/bin/bash
# 检查python进程是否在运行
if ps aux | grep "python message_server.py" | grep -v grep > /dev/null
then
exit 0
else
exit 1
fi

View File

@ -1,15 +1,10 @@
import time
import pika
import requests
import asyncio
import aio_pika
import aiohttp
from loguru import logger
import os, sys
from collections import defaultdict
# 移除默认的日志处理器
logger.remove()
# 添加新的日志处理器,设置日志级别为 INFO
logger.add(sys.stderr, level="INFO")
import time
# 环境变量或配置文件中获取敏感信息
rabbitmq_user = os.getenv("RABBITMQ_USER", "bot")
@ -18,6 +13,13 @@ rabbitmq_host = os.getenv("RABBITMQ_HOST", "mq.stupidpz.com")
rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672))
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
# 移除默认的日志处理器
logger.remove()
# 添加新的日志处理器,设置日志级别为 DEBUG
logger.add(sys.stderr, level="INFO")
class RateLimiter:
def __init__(self, rate, per):
@ -41,87 +43,75 @@ class RateLimiter:
return True
class MessageServer:
def __init__(self):
self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
self.connection_params = pika.ConnectionParameters(
rabbitmq_host,
rabbitmq_port,
'/',
self.credentials,
heartbeat=600 # 设置心跳时间
)
self.connection = None
self.channel = None
self.rate_limiters = defaultdict(lambda: RateLimiter(rate=30, per=1)) # 每秒最多30条消息
self.group_rate_limiters = defaultdict(lambda: RateLimiter(rate=20, per=60)) # 每分钟最多20条消息
async def _send_message_to_user(bot_token, target_id, message, rate_limiter, session):
if not rate_limiter.can_send():
await asyncio.sleep(1)
return await _send_message_to_user(bot_token, target_id, message, rate_limiter, session)
def _ensure_connection(self):
if not self.connection or self.connection.is_closed:
self.connection = pika.BlockingConnection(self.connection_params)
if not self.channel or self.channel.is_closed:
self.channel = self.connection.channel()
self.channel.queue_declare(queue='message_queue')
def start(self):
while True:
try:
self._ensure_connection()
def callback(ch, method, properties, body):
logger.info(body.decode())
bot_token, target_id, message = body.decode().split('|')
if target_id.startswith('-'): # 群组ID以'-'开头
rate_limiter = self.group_rate_limiters[target_id]
else:
rate_limiter = self.rate_limiters[target_id]
_send_message_to_user(bot_token, target_id, message, rate_limiter)
self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True)
logger.info("开始消费消息...")
self.channel.start_consuming()
except Exception as e:
logger.error(f"发生异常,正在尝试重新连接...错误详情:{e}")
time.sleep(10) # 休眠一段时间后重试
def stop(self):
if self.connection and not self.connection.is_closed:
self.connection.close()
def _send_message_to_user(bot_token, target_id, message, rate_limiter):
base_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
params = {
"chat_id": target_id,
"text": message,
"parse_mode": "MarkdownV2"
}
max_retry = 3
retry_count = 0
while retry_count < max_retry:
if rate_limiter.can_send():
try:
response = requests.post(base_url, params=params)
if response.status_code == 200:
logger.debug(f'消息发送成功:{message}')
try:
async with session.post(base_url, params=params) as response:
if response.status == 200:
logger.debug(f'消息发送成功: {message}')
return
else:
logger.debug('消息发送失败,重试中...')
logger.error(response.text)
except requests.exceptions.RequestException as e:
logger.error(f'网络异常,重试中... 错误详情: {e}')
time.sleep(10)
retry_count += 1
else:
time.sleep(1) # 如果超出速率限制,休眠一秒钟
logger.error(await response.text())
except aiohttp.ClientError as e:
logger.error(f'网络异常,重试中... 错误详情: {e}')
await asyncio.sleep(10)
retry_count += 1
logger.debug('消息发送失败')
async def message_handler(message: aio_pika.IncomingMessage):
async with message.process():
logger.info(message.body.decode())
bot_token, target_id, message_text = message.body.decode().split('|')
if target_id.startswith('-'): # 群组ID以'-'开头
rate_limiter = group_rate_limiters[target_id]
else:
rate_limiter = rate_limiters[target_id]
await _send_message_to_user(bot_token, target_id, message_text, rate_limiter, session)
async def main():
connection = await aio_pika.connect_robust(
host=rabbitmq_host,
port=rabbitmq_port,
login=rabbitmq_user,
password=rabbitmq_password
)
channel = await connection.channel()
queue = await channel.declare_queue('message_queue')
global session
async with aiohttp.ClientSession() as session:
await queue.consume(message_handler)
logger.info("开始消费消息...")
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
server = MessageServer()
rate_limiters = defaultdict(lambda: RateLimiter(rate=30, per=1)) # 每秒最多30条消息
group_rate_limiters = defaultdict(lambda: RateLimiter(rate=20, per=60)) # 每分钟最多20条消息
try:
server.start()
asyncio.run(main())
except KeyboardInterrupt:
logger.info("程序被手动停止")
server.stop()