From f510000e10f5be688f1e0b5211de555442bf81f3 Mon Sep 17 00:00:00 2001 From: zayac Date: Mon, 3 Jun 2024 15:21:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=86=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=8F=91=E9=80=81=E7=9A=84=E9=80=BB=E8=BE=91,=E6=94=B9?= =?UTF-8?q?=E7=94=A8docker=E9=83=A8=E7=BD=B2=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/Dockerfile | 34 ++++++++++ src/core/health_check.sh | 9 +++ src/core/message_server.py | 134 +++++++++++++++++-------------------- 3 files changed, 105 insertions(+), 72 deletions(-) create mode 100644 src/core/Dockerfile create mode 100644 src/core/health_check.sh diff --git a/src/core/Dockerfile b/src/core/Dockerfile new file mode 100644 index 0000000..eae6576 --- /dev/null +++ b/src/core/Dockerfile @@ -0,0 +1,34 @@ +# 使用官方的Python基础镜像 +FROM python:3.11-slim + +# 安装必要的工具,包括 procps 包含 ps 命令 +RUN apt-get update && apt-get install -y procps + +# 设置工作目录 +WORKDIR /app + +# 将requirements.txt复制到工作目录 +COPY requirements.txt . + +# 安装Python依赖 +RUN pip install --no-cache-dir -r requirements.txt + +# 将当前目录下的所有文件复制到工作目录 +COPY ./message_server.py . +COPY ./health_check.sh . + +# 确保健康检查脚本有执行权限 +RUN chmod +x /app/health_check.sh + +# 设置环境变量 +ENV RABBITMQ_USER=bot +ENV RABBITMQ_PASSWORD=xiaomi@123 +ENV RABBITMQ_HOST=mq.stupidpz.com +ENV RABBITMQ_PORT=5672 +ENV TELEGRAM_BOT_TOKEN= + +# 配置健康检查 +HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 CMD ["sh", "/app/health_check.sh"] + +# 启动应用 +CMD ["python", "message_server.py"] diff --git a/src/core/health_check.sh b/src/core/health_check.sh new file mode 100644 index 0000000..fca472d --- /dev/null +++ b/src/core/health_check.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +# 检查python进程是否在运行 +if ps aux | grep "python message_server.py" | grep -v grep > /dev/null +then + exit 0 +else + exit 1 +fi diff --git a/src/core/message_server.py b/src/core/message_server.py index 83dbd0b..6708823 100644 --- a/src/core/message_server.py +++ b/src/core/message_server.py @@ -1,15 +1,10 @@ -import time -import pika -import requests +import asyncio +import aio_pika +import aiohttp from loguru import logger import os, sys from collections import defaultdict - -# 移除默认的日志处理器 -logger.remove() - -# 添加新的日志处理器,设置日志级别为 INFO -logger.add(sys.stderr, level="INFO") +import time # 环境变量或配置文件中获取敏感信息 rabbitmq_user = os.getenv("RABBITMQ_USER", "bot") @@ -18,6 +13,13 @@ rabbitmq_host = os.getenv("RABBITMQ_HOST", "mq.stupidpz.com") rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672)) telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN") +# 移除默认的日志处理器 +logger.remove() + +# 添加新的日志处理器,设置日志级别为 DEBUG + +logger.add(sys.stderr, level="INFO") + class RateLimiter: def __init__(self, rate, per): @@ -41,87 +43,75 @@ class RateLimiter: return True -class MessageServer: - def __init__(self): - self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) - self.connection_params = pika.ConnectionParameters( - rabbitmq_host, - rabbitmq_port, - '/', - self.credentials, - heartbeat=600 # 设置心跳时间 - ) - self.connection = None - self.channel = None - self.rate_limiters = defaultdict(lambda: RateLimiter(rate=30, per=1)) # 每秒最多30条消息 - self.group_rate_limiters = defaultdict(lambda: RateLimiter(rate=20, per=60)) # 每分钟最多20条消息 +async def _send_message_to_user(bot_token, target_id, message, rate_limiter, session): + if not rate_limiter.can_send(): + await asyncio.sleep(1) + return await _send_message_to_user(bot_token, target_id, message, rate_limiter, session) - def _ensure_connection(self): - if not self.connection or self.connection.is_closed: - self.connection = pika.BlockingConnection(self.connection_params) - if not self.channel or self.channel.is_closed: - self.channel = self.connection.channel() - self.channel.queue_declare(queue='message_queue') - - def start(self): - while True: - try: - self._ensure_connection() - - def callback(ch, method, properties, body): - logger.info(body.decode()) - bot_token, target_id, message = body.decode().split('|') - if target_id.startswith('-'): # 群组ID以'-'开头 - rate_limiter = self.group_rate_limiters[target_id] - else: - rate_limiter = self.rate_limiters[target_id] - _send_message_to_user(bot_token, target_id, message, rate_limiter) - - self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True) - logger.info("开始消费消息...") - self.channel.start_consuming() - except Exception as e: - logger.error(f"发生异常,正在尝试重新连接...错误详情:{e}") - time.sleep(10) # 休眠一段时间后重试 - - def stop(self): - if self.connection and not self.connection.is_closed: - self.connection.close() - - -def _send_message_to_user(bot_token, target_id, message, rate_limiter): base_url = f"https://api.telegram.org/bot{bot_token}/sendMessage" params = { "chat_id": target_id, "text": message, "parse_mode": "MarkdownV2" } + max_retry = 3 retry_count = 0 while retry_count < max_retry: - if rate_limiter.can_send(): - try: - response = requests.post(base_url, params=params) - if response.status_code == 200: - logger.debug(f'消息发送成功:{message}') + try: + async with session.post(base_url, params=params) as response: + if response.status == 200: + logger.debug(f'消息发送成功: {message}') return else: logger.debug('消息发送失败,重试中...') - logger.error(response.text) - except requests.exceptions.RequestException as e: - logger.error(f'网络异常,重试中... 错误详情: {e}') - time.sleep(10) - retry_count += 1 - else: - time.sleep(1) # 如果超出速率限制,休眠一秒钟 + logger.error(await response.text()) + except aiohttp.ClientError as e: + logger.error(f'网络异常,重试中... 错误详情: {e}') + await asyncio.sleep(10) + retry_count += 1 logger.debug('消息发送失败') +async def message_handler(message: aio_pika.IncomingMessage): + async with message.process(): + logger.info(message.body.decode()) + bot_token, target_id, message_text = message.body.decode().split('|') + if target_id.startswith('-'): # 群组ID以'-'开头 + rate_limiter = group_rate_limiters[target_id] + else: + rate_limiter = rate_limiters[target_id] + await _send_message_to_user(bot_token, target_id, message_text, rate_limiter, session) + + +async def main(): + connection = await aio_pika.connect_robust( + host=rabbitmq_host, + port=rabbitmq_port, + login=rabbitmq_user, + password=rabbitmq_password + ) + + channel = await connection.channel() + queue = await channel.declare_queue('message_queue') + + global session + async with aiohttp.ClientSession() as session: + await queue.consume(message_handler) + + logger.info("开始消费消息...") + try: + await asyncio.Future() + finally: + await connection.close() + + if __name__ == "__main__": - server = MessageServer() + rate_limiters = defaultdict(lambda: RateLimiter(rate=30, per=1)) # 每秒最多30条消息 + group_rate_limiters = defaultdict(lambda: RateLimiter(rate=20, per=60)) # 每分钟最多20条消息 + try: - server.start() + asyncio.run(main()) except KeyboardInterrupt: logger.info("程序被手动停止") - server.stop()