From 1ad5ae06d671675c99c3b87b63d3c9123299c9fb Mon Sep 17 00:00:00 2001 From: stupidzayac Date: Sun, 3 Mar 2024 13:06:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mq=E6=9C=8D=E5=8A=A1=E7=AB=AF?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E5=BC=95=E5=85=A5=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=EF=BC=8C=E9=98=B2=E6=AD=A2=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E6=96=AD=E8=81=94=E5=AF=BC=E8=87=B4=E7=9A=84=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E4=B8=AD=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/message_server.py | 107 ++++++++++++++++++++++++------------- 1 file changed, 69 insertions(+), 38 deletions(-) diff --git a/src/core/message_server.py b/src/core/message_server.py index 1f3f8ce..66cacf5 100644 --- a/src/core/message_server.py +++ b/src/core/message_server.py @@ -1,8 +1,57 @@ import time - import pika import requests from loguru import logger +import os + +# 环境变量或配置文件中获取敏感信息 +rabbitmq_user = os.getenv("RABBITMQ_USER", "bot") +rabbitmq_password = os.getenv("RABBITMQ_PASSWORD", "xiaomi@123") +rabbitmq_host = os.getenv("RABBITMQ_HOST", "mq.stupidpz.com") +rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672)) +telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN") + + +class MessageServer: + def __init__(self): + self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) + self.connection_params = pika.ConnectionParameters( + rabbitmq_host, + rabbitmq_port, + '/', + self.credentials, + heartbeat=600 # 设置心跳时间 + ) + self.connection = None + self.channel = None + + def _ensure_connection(self): + if not self.connection or self.connection.is_closed: + self.connection = pika.BlockingConnection(self.connection_params) + if not self.channel or self.channel.is_closed: + self.channel = self.connection.channel() + self.channel.queue_declare(queue='message_queue') + + def start(self): + while True: + try: + self._ensure_connection() + + def callback(ch, method, properties, body): + logger.info(body.decode()) + bot_token, target_id, message = body.decode().split('|') + _send_message_to_user(bot_token, target_id, message) + + self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True) + logger.info("开始消费消息...") + self.channel.start_consuming() + except Exception as e: + logger.error(f"发生异常,正在尝试重新连接...错误详情:{e}") + time.sleep(10) # 休眠一段时间后重试 + + def stop(self): + if self.connection and not self.connection.is_closed: + self.connection.close() def _send_message_to_user(bot_token, target_id, message): @@ -15,44 +64,26 @@ def _send_message_to_user(bot_token, target_id, message): max_retry = 3 retry_count = 0 while retry_count < max_retry: - response = requests.post(base_url, params=params) - if response.status_code == 200: - logger.debug(f'消息发送成功:{message}') - return # 如果发送成功,立即返回 - else: - logger.debug('消息发送失败,重试中...') - logger.error(response.text) - time.sleep(10) - retry_count += 1 + try: + response = requests.post(base_url, params=params) + if response.status_code == 200: + logger.debug(f'消息发送成功:{message}') + return + else: + logger.debug('消息发送失败,重试中...') + logger.error(response.text) + except requests.exceptions.RequestException as e: + logger.error(f'网络异常,重试中... 错误详情: {e}') + time.sleep(10) + retry_count += 1 logger.debug('消息发送失败') -# 你的消息发送逻辑 - -class MessageServer: - def __init__(self): - self.credentials = pika.PlainCredentials('bot', 'xiaom@i123') - self.connection = pika.BlockingConnection( - pika.ConnectionParameters('mq.stupidpz.com', 5672, '/', credentials=self.credentials)) - self.channel = self.connection.channel() - self.channel.queue_declare(queue='message_queue') - - def start(self): - try: - def callback(ch, method, properties, body): - logger.info(body.decode()) - bot_token, target_id, message = body.decode().split('|') - _send_message_to_user(bot_token, target_id, message) - - self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True) - self.channel.start_consuming() - except Exception as e: - logger.error(e) - - def stop(self): - self.connection.close() - - -server = MessageServer() -server.start() +if __name__ == "__main__": + server = MessageServer() + try: + server.start() + except KeyboardInterrupt: + logger.info("程序被手动停止") + server.stop()