优化mq服务端代码,引入重试机制,防止服务器断联导致的服务中断
This commit is contained in:
parent
6a2cfd8362
commit
1ad5ae06d6
@ -1,8 +1,57 @@
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
import pika
|
import pika
|
||||||
import requests
|
import requests
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
import os
|
||||||
|
|
||||||
|
# 环境变量或配置文件中获取敏感信息
|
||||||
|
rabbitmq_user = os.getenv("RABBITMQ_USER", "bot")
|
||||||
|
rabbitmq_password = os.getenv("RABBITMQ_PASSWORD", "xiaomi@123")
|
||||||
|
rabbitmq_host = os.getenv("RABBITMQ_HOST", "mq.stupidpz.com")
|
||||||
|
rabbitmq_port = int(os.getenv("RABBITMQ_PORT", 5672))
|
||||||
|
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
|
||||||
|
|
||||||
|
|
||||||
|
class MessageServer:
|
||||||
|
def __init__(self):
|
||||||
|
self.credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
|
||||||
|
self.connection_params = pika.ConnectionParameters(
|
||||||
|
rabbitmq_host,
|
||||||
|
rabbitmq_port,
|
||||||
|
'/',
|
||||||
|
self.credentials,
|
||||||
|
heartbeat=600 # 设置心跳时间
|
||||||
|
)
|
||||||
|
self.connection = None
|
||||||
|
self.channel = None
|
||||||
|
|
||||||
|
def _ensure_connection(self):
|
||||||
|
if not self.connection or self.connection.is_closed:
|
||||||
|
self.connection = pika.BlockingConnection(self.connection_params)
|
||||||
|
if not self.channel or self.channel.is_closed:
|
||||||
|
self.channel = self.connection.channel()
|
||||||
|
self.channel.queue_declare(queue='message_queue')
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self._ensure_connection()
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
logger.info(body.decode())
|
||||||
|
bot_token, target_id, message = body.decode().split('|')
|
||||||
|
_send_message_to_user(bot_token, target_id, message)
|
||||||
|
|
||||||
|
self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True)
|
||||||
|
logger.info("开始消费消息...")
|
||||||
|
self.channel.start_consuming()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"发生异常,正在尝试重新连接...错误详情:{e}")
|
||||||
|
time.sleep(10) # 休眠一段时间后重试
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self.connection and not self.connection.is_closed:
|
||||||
|
self.connection.close()
|
||||||
|
|
||||||
|
|
||||||
def _send_message_to_user(bot_token, target_id, message):
|
def _send_message_to_user(bot_token, target_id, message):
|
||||||
@ -15,44 +64,26 @@ def _send_message_to_user(bot_token, target_id, message):
|
|||||||
max_retry = 3
|
max_retry = 3
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
while retry_count < max_retry:
|
while retry_count < max_retry:
|
||||||
response = requests.post(base_url, params=params)
|
try:
|
||||||
if response.status_code == 200:
|
response = requests.post(base_url, params=params)
|
||||||
logger.debug(f'消息发送成功:{message}')
|
if response.status_code == 200:
|
||||||
return # 如果发送成功,立即返回
|
logger.debug(f'消息发送成功:{message}')
|
||||||
else:
|
return
|
||||||
logger.debug('消息发送失败,重试中...')
|
else:
|
||||||
logger.error(response.text)
|
logger.debug('消息发送失败,重试中...')
|
||||||
time.sleep(10)
|
logger.error(response.text)
|
||||||
retry_count += 1
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f'网络异常,重试中... 错误详情: {e}')
|
||||||
|
time.sleep(10)
|
||||||
|
retry_count += 1
|
||||||
|
|
||||||
logger.debug('消息发送失败')
|
logger.debug('消息发送失败')
|
||||||
|
|
||||||
|
|
||||||
# 你的消息发送逻辑
|
if __name__ == "__main__":
|
||||||
|
server = MessageServer()
|
||||||
class MessageServer:
|
try:
|
||||||
def __init__(self):
|
server.start()
|
||||||
self.credentials = pika.PlainCredentials('bot', 'xiaom@i123')
|
except KeyboardInterrupt:
|
||||||
self.connection = pika.BlockingConnection(
|
logger.info("程序被手动停止")
|
||||||
pika.ConnectionParameters('mq.stupidpz.com', 5672, '/', credentials=self.credentials))
|
server.stop()
|
||||||
self.channel = self.connection.channel()
|
|
||||||
self.channel.queue_declare(queue='message_queue')
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
try:
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
logger.info(body.decode())
|
|
||||||
bot_token, target_id, message = body.decode().split('|')
|
|
||||||
_send_message_to_user(bot_token, target_id, message)
|
|
||||||
|
|
||||||
self.channel.basic_consume(queue='message_queue', on_message_callback=callback, auto_ack=True)
|
|
||||||
self.channel.start_consuming()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(e)
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.connection.close()
|
|
||||||
|
|
||||||
|
|
||||||
server = MessageServer()
|
|
||||||
server.start()
|
|
||||||
|
Loading…
Reference in New Issue
Block a user