"""RabbitMQ -> Telegram relay.

Consumes payloads of the form ``bot_token|target_id|message`` from the
``message_queue`` queue and forwards each one to the Telegram Bot API
``sendMessage`` endpoint.
"""

import os
import time

import pika
import requests
from loguru import logger

# Sensitive settings are read from environment variables, with fallbacks.
# NOTE(review): the fallback password below is a credential committed to source
# control; prefer requiring RABBITMQ_PASSWORD to be set and rotating this value.
rabbitmq_user = os.getenv("RABBITMQ_USER", "bot")
rabbitmq_password = os.getenv("RABBITMQ_PASSWORD", "xiaomi@123")
rabbitmq_host = os.getenv("RABBITMQ_HOST", "mq.stupidpz.com")
rabbitmq_port = int(os.getenv("RABBITMQ_PORT", "5672"))
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN")


class MessageServer:
    """Blocking RabbitMQ consumer that relays queued messages to Telegram."""

    def __init__(self):
        """Build connection parameters; no network I/O happens here."""
        self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
        self.connection_params = pika.ConnectionParameters(
            rabbitmq_host,
            rabbitmq_port,
            '/',
            self.credentials,
            heartbeat=600,  # heartbeat interval in seconds
        )
        self.connection = None
        self.channel = None

    def _ensure_connection(self):
        """Open the connection and channel if either is missing or closed."""
        if not self.connection or self.connection.is_closed:
            self.connection = pika.BlockingConnection(self.connection_params)
        if not self.channel or self.channel.is_closed:
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue='message_queue')

    def _reset_connection(self):
        """Close and forget the current connection so a retry starts clean.

        Without this, a retry after a failure that left the channel open would
        call ``basic_consume`` again on the same channel and register a second
        consumer for the same queue.
        """
        connection = self.connection
        self.connection = None
        self.channel = None
        if connection is None or connection.is_closed:
            return
        try:
            connection.close()
        except Exception as e:
            # The connection is being discarded anyway; a failed close is not fatal.
            logger.debug(f"关闭连接时出错: {e}")

    @staticmethod
    def _handle_delivery(ch, method, properties, body):
        """Parse one delivery and forward it to Telegram.

        The payload is ``bot_token|target_id|message``. Only the first two
        separators are significant, so the message text may itself contain
        ``|``. Malformed payloads are logged and dropped instead of raising,
        which would otherwise abort the consume loop.
        """
        try:
            bot_token, target_id, message = body.decode().split('|', 2)
        except (UnicodeDecodeError, ValueError):
            # Do not log the raw payload: it may contain a bot token.
            logger.error(f"消息格式错误,已丢弃 (长度: {len(body)})")
            return
        # Log the routing target and text only; the bot token is a secret.
        logger.info(f"{target_id}|{message}")
        _send_message_to_user(bot_token, target_id, message)

    def start(self):
        """Consume forever, reconnecting 10 seconds after any failure.

        ``KeyboardInterrupt`` is not caught here and propagates to the caller.
        """
        while True:
            try:
                self._ensure_connection()
                self.channel.basic_consume(
                    queue='message_queue',
                    on_message_callback=self._handle_delivery,
                    auto_ack=True,
                )
                logger.info("开始消费消息...")
                self.channel.start_consuming()
            except Exception as e:
                logger.error(f"发生异常,正在尝试重新连接...错误详情:{e}")
                self._reset_connection()
                time.sleep(10)  # back off before retrying

    def stop(self):
        """Close the RabbitMQ connection if it is open."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()


def _send_message_to_user(bot_token, target_id, message):
    """Send ``message`` to chat ``target_id`` via the Telegram Bot API.

    Makes up to three attempts, waiting 10 seconds between them. Returns
    ``None`` in every case; success and failure are reported through the log.

    Args:
        bot_token: Telegram bot token used to build the endpoint URL.
        target_id: Value sent as ``chat_id``.
        message: Text to send, with ``parse_mode`` set to ``MarkdownV2``.
    """
    base_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = {
        "chat_id": target_id,
        "text": message,
        "parse_mode": "MarkdownV2",
    }
    max_retry = 3
    for attempt in range(1, max_retry + 1):
        try:
            # Send the fields in the form body rather than the query string so
            # long messages do not hit URL length limits, and bound the wait so
            # a stalled socket cannot block the consumer forever.
            response = requests.post(base_url, data=payload, timeout=30)
        except requests.exceptions.RequestException as e:
            # Exception text can embed the request URL, which contains the token.
            detail = str(e).replace(bot_token, "***") if bot_token else str(e)
            logger.error(f'网络异常,重试中... 错误详情: {detail}')
        else:
            status = response.status_code
            if status == 200:
                logger.debug(f'消息发送成功:{message}')
                return
            logger.debug('消息发送失败,重试中...')
            logger.error(response.text)
            # Client errors other than rate limiting (429) will fail the same
            # way on every attempt, so stop instead of stalling the queue.
            if 400 <= status < 500 and status != 429:
                break
        if attempt < max_retry:
            time.sleep(10)
    logger.debug('消息发送失败')


if __name__ == "__main__":
    server = MessageServer()
    try:
        server.start()
    except KeyboardInterrupt:
        logger.info("程序被手动停止")
    finally:
        # Always release the connection, whatever caused the exit.
        server.stop()