加入了telegram官方对机器人发送消息频率的限制,防止消息丢失
This commit is contained in:
parent
1b18df5258
commit
1b0f7ea4e0
@ -2,7 +2,14 @@ import time
|
|||||||
import pika
|
import pika
|
||||||
import requests
|
import requests
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
import os
|
import os, sys
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
# 移除默认的日志处理器
|
||||||
|
logger.remove()
|
||||||
|
|
||||||
|
# 添加新的日志处理器,设置日志级别为 INFO
|
||||||
|
logger.add(sys.stderr, level="INFO")
|
||||||
|
|
||||||
# 环境变量或配置文件中获取敏感信息
|
# 环境变量或配置文件中获取敏感信息
|
||||||
rabbitmq_user = os.getenv("RABBITMQ_USER", "bot")
|
rabbitmq_user = os.getenv("RABBITMQ_USER", "bot")
|
||||||
@ -12,6 +19,28 @@ rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672))
|
|||||||
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
|
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
|
||||||
|
|
||||||
|
|
||||||
|
class RateLimiter:
|
||||||
|
def __init__(self, rate, per):
|
||||||
|
self.rate = rate # 短时间内允许的最大请求数
|
||||||
|
self.per = per # 时间窗口,单位秒
|
||||||
|
self.allowance = rate
|
||||||
|
self.last_check = time.time()
|
||||||
|
|
||||||
|
def can_send(self):
|
||||||
|
current = time.time()
|
||||||
|
time_passed = current - self.last_check
|
||||||
|
self.last_check = current
|
||||||
|
self.allowance += time_passed * (self.rate / self.per)
|
||||||
|
if self.allowance > self.rate:
|
||||||
|
self.allowance = self.rate
|
||||||
|
|
||||||
|
if self.allowance < 1.0:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
self.allowance -= 1.0
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class MessageServer:
|
class MessageServer:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
|
self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
|
||||||
@ -24,6 +53,8 @@ class MessageServer:
|
|||||||
)
|
)
|
||||||
self.connection = None
|
self.connection = None
|
||||||
self.channel = None
|
self.channel = None
|
||||||
|
self.rate_limiters = defaultdict(lambda: RateLimiter(rate=30, per=1)) # 每秒最多30条消息
|
||||||
|
self.group_rate_limiters = defaultdict(lambda: RateLimiter(rate=20, per=60)) # 每分钟最多20条消息
|
||||||
|
|
||||||
def _ensure_connection(self):
|
def _ensure_connection(self):
|
||||||
if not self.connection or self.connection.is_closed:
|
if not self.connection or self.connection.is_closed:
|
||||||
@ -40,7 +71,11 @@ class MessageServer:
|
|||||||
def callback(ch, method, properties, body):
|
def callback(ch, method, properties, body):
|
||||||
logger.info(body.decode())
|
logger.info(body.decode())
|
||||||
bot_token, target_id, message = body.decode().split('|')
|
bot_token, target_id, message = body.decode().split('|')
|
||||||
_send_message_to_user(bot_token, target_id, message)
|
if target_id.startswith('-'): # 群组ID以'-'开头
|
||||||
|
rate_limiter = self.group_rate_limiters[target_id]
|
||||||
|
else:
|
||||||
|
rate_limiter = self.rate_limiters[target_id]
|
||||||
|
_send_message_to_user(bot_token, target_id, message, rate_limiter)
|
||||||
|
|
||||||
self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True)
|
self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True)
|
||||||
logger.info("开始消费消息...")
|
logger.info("开始消费消息...")
|
||||||
@ -54,7 +89,7 @@ class MessageServer:
|
|||||||
self.connection.close()
|
self.connection.close()
|
||||||
|
|
||||||
|
|
||||||
def _send_message_to_user(bot_token, target_id, message):
|
def _send_message_to_user(bot_token, target_id, message, rate_limiter):
|
||||||
base_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
|
base_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
|
||||||
params = {
|
params = {
|
||||||
"chat_id": target_id,
|
"chat_id": target_id,
|
||||||
@ -64,18 +99,21 @@ def _send_message_to_user(bot_token, target_id, message):
|
|||||||
max_retry = 3
|
max_retry = 3
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
while retry_count < max_retry:
|
while retry_count < max_retry:
|
||||||
try:
|
if rate_limiter.can_send():
|
||||||
response = requests.post(base_url, params=params)
|
try:
|
||||||
if response.status_code == 200:
|
response = requests.post(base_url, params=params)
|
||||||
logger.debug(f'消息发送成功:{message}')
|
if response.status_code == 200:
|
||||||
return
|
logger.debug(f'消息发送成功:{message}')
|
||||||
else:
|
return
|
||||||
logger.debug('消息发送失败,重试中...')
|
else:
|
||||||
logger.error(response.text)
|
logger.debug('消息发送失败,重试中...')
|
||||||
except requests.exceptions.RequestException as e:
|
logger.error(response.text)
|
||||||
logger.error(f'网络异常,重试中... 错误详情: {e}')
|
except requests.exceptions.RequestException as e:
|
||||||
time.sleep(10)
|
logger.error(f'网络异常,重试中... 错误详情: {e}')
|
||||||
retry_count += 1
|
time.sleep(10)
|
||||||
|
retry_count += 1
|
||||||
|
else:
|
||||||
|
time.sleep(1) # 如果超出速率限制,休眠一秒钟
|
||||||
|
|
||||||
logger.debug('消息发送失败')
|
logger.debug('消息发送失败')
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user