mirror of
https://github.com/hanxi/xiaomusic.git
synced 2026-03-15 08:13:16 +08:00
修复无法关闭的问题
This commit is contained in:
@@ -25,24 +25,26 @@ import xiaomusic.api.dependencies as deps
|
||||
@asynccontextmanager
|
||||
async def app_lifespan(app):
|
||||
"""应用生命周期管理"""
|
||||
task = None
|
||||
if deps.xiaomusic is not None:
|
||||
await asyncio.create_task(deps.xiaomusic.run_forever())
|
||||
task = asyncio.create_task(deps.xiaomusic.run_forever())
|
||||
try:
|
||||
yield
|
||||
except Exception as e:
|
||||
deps.log.exception(f"Execption {e}")
|
||||
except asyncio.CancelledError:
|
||||
# 正常关闭时的取消,不需要记录
|
||||
pass
|
||||
finally:
|
||||
# 应用关闭时的清理工作
|
||||
if (
|
||||
deps.xiaomusic is not None
|
||||
and hasattr(deps.xiaomusic, "js_plugin_manager")
|
||||
and deps.xiaomusic.js_plugin_manager
|
||||
):
|
||||
# 关闭时取消后台任务
|
||||
if task is not None and not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
deps.log.info("Shutting down application, cleaning up resources...")
|
||||
deps.xiaomusic.js_plugin_manager.shutdown()
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
if deps.log:
|
||||
deps.log.info("Background task cleanup: CancelledError")
|
||||
except Exception as e:
|
||||
deps.log.error(f"Error during shutdown: {e}")
|
||||
if deps.log:
|
||||
deps.log.error(f"Background task cleanup error: {e}")
|
||||
|
||||
|
||||
# 创建 FastAPI 应用实例
|
||||
|
||||
114
xiaomusic/cli.py
114
xiaomusic/cli.py
@@ -99,6 +99,15 @@ def main():
|
||||
options = parser.parse_args()
|
||||
config = Config.from_options(options)
|
||||
|
||||
# 自定义过滤器,过滤掉关闭时的 CancelledError
|
||||
class CancelledErrorFilter(logging.Filter):
|
||||
def filter(self, record):
|
||||
if record.exc_info:
|
||||
exc_type = record.exc_info[0]
|
||||
if exc_type and exc_type.__name__ == 'CancelledError':
|
||||
return False
|
||||
return True
|
||||
|
||||
LOGGING_CONFIG = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
@@ -113,11 +122,17 @@ def main():
|
||||
"datefmt": "[%X]",
|
||||
},
|
||||
},
|
||||
"filters": {
|
||||
"cancelled_error": {
|
||||
"()": CancelledErrorFilter,
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
"default": {
|
||||
"formatter": "default",
|
||||
"class": "logging.StreamHandler",
|
||||
"stream": "ext://sys.stderr",
|
||||
"filters": ["cancelled_error"],
|
||||
},
|
||||
"access": {
|
||||
"formatter": "access",
|
||||
@@ -131,6 +146,7 @@ def main():
|
||||
"filename": config.log_file,
|
||||
"maxBytes": 10 * 1024 * 1024,
|
||||
"backupCount": 1,
|
||||
"filters": ["cancelled_error"],
|
||||
},
|
||||
},
|
||||
"loggers": {
|
||||
@@ -163,78 +179,36 @@ def main():
|
||||
except Exception as e:
|
||||
print(f"Execption {e}")
|
||||
|
||||
xiaomusic_instance = None
|
||||
original_sigint_handler = None
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
"""信号处理函数 - 立即关闭并退出"""
|
||||
print("\n收到中断信号,正在关闭...")
|
||||
# 设置退出标志,避免重复处理
|
||||
signal_handler.has_been_called = getattr(signal_handler, 'has_been_called', False)
|
||||
if signal_handler.has_been_called:
|
||||
print("程序正在关闭中,请稍候...")
|
||||
return
|
||||
signal_handler.has_been_called = True
|
||||
|
||||
if xiaomusic_instance and hasattr(xiaomusic_instance, "js_plugin_manager") and xiaomusic_instance.js_plugin_manager:
|
||||
try:
|
||||
xiaomusic_instance.js_plugin_manager.shutdown()
|
||||
print("JS 插件管理器已关闭")
|
||||
except Exception as e:
|
||||
print(f"关闭 JS 插件管理器时出错: {e}")
|
||||
|
||||
# 强制退出程序
|
||||
print("程序已退出")
|
||||
os._exit(0)
|
||||
|
||||
# 提前注册信号处理函数,确保在 uvicorn 启动前生效
|
||||
original_sigint_handler = signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
print("[DEBUG] 信号处理函数已注册")
|
||||
|
||||
def run_server(port):
|
||||
nonlocal xiaomusic_instance, original_sigint_handler
|
||||
xiaomusic_instance = XiaoMusic(config)
|
||||
HttpInit(xiaomusic_instance)
|
||||
|
||||
try:
|
||||
# 使用 Uvicorn 的 Config 类来配置服务器,避免信号处理冲突
|
||||
from uvicorn import Config, Server
|
||||
|
||||
uvicorn_config = Config(
|
||||
app=HttpApp,
|
||||
host="0.0.0.0",
|
||||
port=port,
|
||||
log_config=LOGGING_CONFIG,
|
||||
)
|
||||
server = Server(config=uvicorn_config)
|
||||
|
||||
# 运行服务器
|
||||
import asyncio
|
||||
asyncio.run(server.serve())
|
||||
except ImportError:
|
||||
# 如果无法导入 Config 和 Server,则回退到原来的 uvicorn.run 方式
|
||||
uvicorn.run(
|
||||
HttpApp,
|
||||
host="0.0.0.0",
|
||||
port=port,
|
||||
log_config=LOGGING_CONFIG,
|
||||
)
|
||||
finally:
|
||||
# uvicorn 关闭后清理资源
|
||||
if (
|
||||
xiaomusic_instance
|
||||
and hasattr(xiaomusic_instance, "js_plugin_manager")
|
||||
and xiaomusic_instance.js_plugin_manager
|
||||
):
|
||||
try:
|
||||
xiaomusic_instance.js_plugin_manager.shutdown()
|
||||
print("JS 插件管理器已关闭")
|
||||
except Exception as e:
|
||||
print(f"关闭 JS 插件管理器时出错: {e}")
|
||||
import asyncio
|
||||
from uvicorn import Config as UvicornConfig, Server
|
||||
|
||||
xiaomusic = XiaoMusic(config)
|
||||
HttpInit(xiaomusic)
|
||||
port = int(config.port)
|
||||
run_server(port)
|
||||
|
||||
# 创建 uvicorn 配置,禁用其信号处理
|
||||
uvicorn_config = UvicornConfig(
|
||||
HttpApp,
|
||||
host="0.0.0.0",
|
||||
port=port,
|
||||
log_config=LOGGING_CONFIG,
|
||||
)
|
||||
server = Server(uvicorn_config)
|
||||
|
||||
# 自定义信号处理
|
||||
shutdown_initiated = False
|
||||
def handle_exit(signum, frame):
|
||||
nonlocal shutdown_initiated
|
||||
if not shutdown_initiated:
|
||||
shutdown_initiated = True
|
||||
print("\n正在关闭服务器...")
|
||||
server.should_exit = True
|
||||
|
||||
signal.signal(signal.SIGINT, handle_exit)
|
||||
signal.signal(signal.SIGTERM, handle_exit)
|
||||
|
||||
# 运行服务器
|
||||
asyncio.run(server.serve())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -84,23 +84,32 @@ class ConversationPoller:
|
||||
task = asyncio.create_task(self.poll_latest_ask(session))
|
||||
assert task is not None # to keep the reference to task, do not remove this
|
||||
|
||||
while True:
|
||||
self.polling_event.set()
|
||||
await self.new_record_event.wait()
|
||||
self.new_record_event.clear()
|
||||
new_record = self.last_record
|
||||
self.polling_event.clear() # stop polling when processing the question
|
||||
try:
|
||||
while True:
|
||||
self.polling_event.set()
|
||||
await self.new_record_event.wait()
|
||||
self.new_record_event.clear()
|
||||
new_record = self.last_record
|
||||
self.polling_event.clear() # stop polling when processing the question
|
||||
|
||||
query = new_record.get("query", "").strip()
|
||||
did = new_record.get("did", "").strip()
|
||||
await do_check_cmd_callback(did, query, False)
|
||||
query = new_record.get("query", "").strip()
|
||||
did = new_record.get("did", "").strip()
|
||||
await do_check_cmd_callback(did, query, False)
|
||||
|
||||
answer = new_record.get("answer")
|
||||
answers = new_record.get("answers", [{}])
|
||||
if answers:
|
||||
answer = answers[0].get("tts", {}).get("text", "").strip()
|
||||
await reset_timer_callback(len(answer), did)
|
||||
self.log.debug(f"query:{query} did:{did} answer:{answer}")
|
||||
answer = new_record.get("answer")
|
||||
answers = new_record.get("answers", [{}])
|
||||
if answers:
|
||||
answer = answers[0].get("tts", {}).get("text", "").strip()
|
||||
await reset_timer_callback(len(answer), did)
|
||||
self.log.debug(f"query:{query} did:{did} answer:{answer}")
|
||||
except asyncio.CancelledError:
|
||||
self.log.info("Conversation loop cancelled, cleaning up...")
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
raise
|
||||
|
||||
async def poll_latest_ask(self, session):
|
||||
"""轮询最新对话记录
|
||||
@@ -111,46 +120,50 @@ class ConversationPoller:
|
||||
Args:
|
||||
session: aiohttp客户端会话
|
||||
"""
|
||||
while True:
|
||||
if not self.config.enable_pull_ask:
|
||||
self.log.debug("Listening new message disabled")
|
||||
await asyncio.sleep(5)
|
||||
continue
|
||||
try:
|
||||
while True:
|
||||
if not self.config.enable_pull_ask:
|
||||
self.log.debug("Listening new message disabled")
|
||||
await asyncio.sleep(5)
|
||||
continue
|
||||
|
||||
self.log.debug(f"Listening new message, timestamp: {self.last_timestamp}")
|
||||
# 动态获取最新的 cookie_jar
|
||||
if self.auth_manager.cookie_jar is not None:
|
||||
session._cookie_jar = self.auth_manager.cookie_jar
|
||||
self.log.debug(f"Listening new message, timestamp: {self.last_timestamp}")
|
||||
# 动态获取最新的 cookie_jar
|
||||
if self.auth_manager.cookie_jar is not None:
|
||||
session._cookie_jar = self.auth_manager.cookie_jar
|
||||
|
||||
# 拉取所有音箱的对话记录
|
||||
tasks = []
|
||||
for device_id in self.device_id_did:
|
||||
# 首次用当前时间初始化
|
||||
did = self.get_did(device_id)
|
||||
if did not in self.last_timestamp:
|
||||
self.last_timestamp[did] = int(time.time() * 1000)
|
||||
# 拉取所有音箱的对话记录
|
||||
tasks = []
|
||||
for device_id in self.device_id_did:
|
||||
# 首次用当前时间初始化
|
||||
did = self.get_did(device_id)
|
||||
if did not in self.last_timestamp:
|
||||
self.last_timestamp[did] = int(time.time() * 1000)
|
||||
|
||||
hardware = self.get_hardward(device_id)
|
||||
if (
|
||||
hardware in self.get_ask_by_mina_list
|
||||
) or self.config.get_ask_by_mina:
|
||||
tasks.append(self.get_latest_ask_by_mina(device_id))
|
||||
hardware = self.get_hardward(device_id)
|
||||
if (
|
||||
hardware in self.get_ask_by_mina_list
|
||||
) or self.config.get_ask_by_mina:
|
||||
tasks.append(self.get_latest_ask_by_mina(device_id))
|
||||
else:
|
||||
tasks.append(self.get_latest_ask_from_xiaoai(session, device_id))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
start = time.perf_counter()
|
||||
await self.polling_event.wait()
|
||||
if self.config.pull_ask_sec <= 1:
|
||||
if (d := time.perf_counter() - start) < 1:
|
||||
await asyncio.sleep(1 - d)
|
||||
else:
|
||||
tasks.append(self.get_latest_ask_from_xiaoai(session, device_id))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
start = time.perf_counter()
|
||||
await self.polling_event.wait()
|
||||
if self.config.pull_ask_sec <= 1:
|
||||
if (d := time.perf_counter() - start) < 1:
|
||||
await asyncio.sleep(1 - d)
|
||||
else:
|
||||
sleep_sec = 0
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
sleep_sec = sleep_sec + 1
|
||||
if sleep_sec >= self.config.pull_ask_sec:
|
||||
break
|
||||
sleep_sec = 0
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
sleep_sec = sleep_sec + 1
|
||||
if sleep_sec >= self.config.pull_ask_sec:
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
self.log.info("Polling task cancelled")
|
||||
raise
|
||||
|
||||
async def get_latest_ask_from_xiaoai(self, session, device_id):
|
||||
"""从小爱API获取最新对话
|
||||
|
||||
Reference in New Issue
Block a user