diff --git a/src/core/message_server.py b/src/core/message_server.py index 66cacf5..83dbd0b 100644 --- a/src/core/message_server.py +++ b/src/core/message_server.py @@ -2,7 +2,14 @@ import time import pika import requests from loguru import logger -import os +import os, sys +from collections import defaultdict + +# 移除默认的日志处理器 +logger.remove() + +# 添加新的日志处理器,设置日志级别为 INFO +logger.add(sys.stderr, level="INFO") # 环境变量或配置文件中获取敏感信息 rabbitmq_user = os.getenv("RABBITMQ_USER", "bot") @@ -12,6 +19,28 @@ rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672)) telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN") +class RateLimiter: + def __init__(self, rate, per): + self.rate = rate # 短时间内允许的最大请求数 + self.per = per # 时间窗口,单位秒 + self.allowance = rate + self.last_check = time.time() + + def can_send(self): + current = time.time() + time_passed = current - self.last_check + self.last_check = current + self.allowance += time_passed * (self.rate / self.per) + if self.allowance > self.rate: + self.allowance = self.rate + + if self.allowance < 1.0: + return False + else: + self.allowance -= 1.0 + return True + + class MessageServer: def __init__(self): self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) @@ -24,6 +53,8 @@ class MessageServer: ) self.connection = None self.channel = None + self.rate_limiters = defaultdict(lambda: RateLimiter(rate=30, per=1)) # 每秒最多30条消息 + self.group_rate_limiters = defaultdict(lambda: RateLimiter(rate=20, per=60)) # 每分钟最多20条消息 def _ensure_connection(self): if not self.connection or self.connection.is_closed: @@ -40,7 +71,11 @@ class MessageServer: def callback(ch, method, properties, body): logger.info(body.decode()) bot_token, target_id, message = body.decode().split('|') - _send_message_to_user(bot_token, target_id, message) + if target_id.startswith('-'): # 群组ID以'-'开头 + rate_limiter = self.group_rate_limiters[target_id] + else: + rate_limiter = self.rate_limiters[target_id] + _send_message_to_user(bot_token, target_id, message, rate_limiter) self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True) logger.info("开始消费消息...") @@ -54,7 +89,7 @@ class MessageServer: self.connection.close() -def _send_message_to_user(bot_token, target_id, message): +def _send_message_to_user(bot_token, target_id, message, rate_limiter): base_url = f"https://api.telegram.org/bot{bot_token}/sendMessage" params = { "chat_id": target_id, @@ -64,18 +99,21 @@ def _send_message_to_user(bot_token, target_id, message): max_retry = 3 retry_count = 0 while retry_count < max_retry: - try: - response = requests.post(base_url, params=params) - if response.status_code == 200: - logger.debug(f'消息发送成功:{message}') - return - else: - logger.debug('消息发送失败,重试中...') - logger.error(response.text) - except requests.exceptions.RequestException as e: - logger.error(f'网络异常,重试中... 错误详情: {e}') - time.sleep(10) - retry_count += 1 + if rate_limiter.can_send(): + try: + response = requests.post(base_url, params=params) + if response.status_code == 200: + logger.debug(f'消息发送成功:{message}') + return + else: + logger.debug('消息发送失败,重试中...') + logger.error(response.text) + except requests.exceptions.RequestException as e: + logger.error(f'网络异常,重试中... 错误详情: {e}') + time.sleep(10) + retry_count += 1 + else: + time.sleep(1) # 如果超出速率限制,休眠一秒钟 logger.debug('消息发送失败')