diff --git a/xiaomusic/api/app.py b/xiaomusic/api/app.py index 27a8947..c04cfe6 100644 --- a/xiaomusic/api/app.py +++ b/xiaomusic/api/app.py @@ -25,24 +25,26 @@ import xiaomusic.api.dependencies as deps @asynccontextmanager async def app_lifespan(app): """应用生命周期管理""" + task = None if deps.xiaomusic is not None: - await asyncio.create_task(deps.xiaomusic.run_forever()) + task = asyncio.create_task(deps.xiaomusic.run_forever()) try: yield - except Exception as e: - deps.log.exception(f"Execption {e}") + except asyncio.CancelledError: + # 正常关闭时的取消,不需要记录 + pass finally: - # 应用关闭时的清理工作 - if ( - deps.xiaomusic is not None - and hasattr(deps.xiaomusic, "js_plugin_manager") - and deps.xiaomusic.js_plugin_manager - ): + # 关闭时取消后台任务 + if task is not None and not task.done(): + task.cancel() try: - deps.log.info("Shutting down application, cleaning up resources...") - deps.xiaomusic.js_plugin_manager.shutdown() + await task + except asyncio.CancelledError: + if deps.log: + deps.log.info("Background task cleanup: CancelledError") except Exception as e: - deps.log.error(f"Error during shutdown: {e}") + if deps.log: + deps.log.error(f"Background task cleanup error: {e}") # 创建 FastAPI 应用实例 diff --git a/xiaomusic/cli.py b/xiaomusic/cli.py index 14c9f0d..1e13ba5 100644 --- a/xiaomusic/cli.py +++ b/xiaomusic/cli.py @@ -99,6 +99,15 @@ def main(): options = parser.parse_args() config = Config.from_options(options) + # 自定义过滤器,过滤掉关闭时的 CancelledError + class CancelledErrorFilter(logging.Filter): + def filter(self, record): + if record.exc_info: + exc_type = record.exc_info[0] + if exc_type and exc_type.__name__ == 'CancelledError': + return False + return True + LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": False, @@ -113,11 +122,17 @@ def main(): "datefmt": "[%X]", }, }, + "filters": { + "cancelled_error": { + "()": CancelledErrorFilter, + }, + }, "handlers": { "default": { "formatter": "default", "class": "logging.StreamHandler", "stream": "ext://sys.stderr", + "filters": ["cancelled_error"], }, "access": { "formatter": "access", @@ -131,6 +146,7 @@ def main(): "filename": config.log_file, "maxBytes": 10 * 1024 * 1024, "backupCount": 1, + "filters": ["cancelled_error"], }, }, "loggers": { @@ -163,78 +179,36 @@ def main(): except Exception as e: print(f"Execption {e}") - xiaomusic_instance = None - original_sigint_handler = None - - def signal_handler(sig, frame): - """信号处理函数 - 立即关闭并退出""" - print("\n收到中断信号,正在关闭...") - # 设置退出标志,避免重复处理 - signal_handler.has_been_called = getattr(signal_handler, 'has_been_called', False) - if signal_handler.has_been_called: - print("程序正在关闭中,请稍候...") - return - signal_handler.has_been_called = True - - if xiaomusic_instance and hasattr(xiaomusic_instance, "js_plugin_manager") and xiaomusic_instance.js_plugin_manager: - try: - xiaomusic_instance.js_plugin_manager.shutdown() - print("JS 插件管理器已关闭") - except Exception as e: - print(f"关闭 JS 插件管理器时出错: {e}") - - # 强制退出程序 - print("程序已退出") - os._exit(0) - - # 提前注册信号处理函数,确保在 uvicorn 启动前生效 - original_sigint_handler = signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - print("[DEBUG] 信号处理函数已注册") - - def run_server(port): - nonlocal xiaomusic_instance, original_sigint_handler - xiaomusic_instance = XiaoMusic(config) - HttpInit(xiaomusic_instance) - - try: - # 使用 Uvicorn 的 Config 类来配置服务器,避免信号处理冲突 - from uvicorn import Config, Server - - uvicorn_config = Config( - app=HttpApp, - host="0.0.0.0", - port=port, - log_config=LOGGING_CONFIG, - ) - server = Server(config=uvicorn_config) - - # 运行服务器 - import asyncio - asyncio.run(server.serve()) - except ImportError: - # 如果无法导入 Config 和 Server,则回退到原来的 uvicorn.run 方式 - uvicorn.run( - HttpApp, - host="0.0.0.0", - port=port, - log_config=LOGGING_CONFIG, - ) - finally: - # uvicorn 关闭后清理资源 - if ( - xiaomusic_instance - and hasattr(xiaomusic_instance, "js_plugin_manager") - and xiaomusic_instance.js_plugin_manager - ): - try: - xiaomusic_instance.js_plugin_manager.shutdown() - print("JS 插件管理器已关闭") - except Exception as e: - print(f"关闭 JS 插件管理器时出错: {e}") + import asyncio + from uvicorn import Config as UvicornConfig, Server + xiaomusic = XiaoMusic(config) + HttpInit(xiaomusic) port = int(config.port) - run_server(port) + + # 创建 uvicorn 配置,禁用其信号处理 + uvicorn_config = UvicornConfig( + HttpApp, + host="0.0.0.0", + port=port, + log_config=LOGGING_CONFIG, + ) + server = Server(uvicorn_config) + + # 自定义信号处理 + shutdown_initiated = False + def handle_exit(signum, frame): + nonlocal shutdown_initiated + if not shutdown_initiated: + shutdown_initiated = True + print("\n正在关闭服务器...") + server.should_exit = True + + signal.signal(signal.SIGINT, handle_exit) + signal.signal(signal.SIGTERM, handle_exit) + + # 运行服务器 + asyncio.run(server.serve()) if __name__ == "__main__": diff --git a/xiaomusic/conversation.py b/xiaomusic/conversation.py index 78c2761..b502268 100644 --- a/xiaomusic/conversation.py +++ b/xiaomusic/conversation.py @@ -84,23 +84,32 @@ class ConversationPoller: task = asyncio.create_task(self.poll_latest_ask(session)) assert task is not None # to keep the reference to task, do not remove this - while True: - self.polling_event.set() - await self.new_record_event.wait() - self.new_record_event.clear() - new_record = self.last_record - self.polling_event.clear() # stop polling when processing the question + try: + while True: + self.polling_event.set() + await self.new_record_event.wait() + self.new_record_event.clear() + new_record = self.last_record + self.polling_event.clear() # stop polling when processing the question - query = new_record.get("query", "").strip() - did = new_record.get("did", "").strip() - await do_check_cmd_callback(did, query, False) + query = new_record.get("query", "").strip() + did = new_record.get("did", "").strip() + await do_check_cmd_callback(did, query, False) - answer = new_record.get("answer") - answers = new_record.get("answers", [{}]) - if answers: - answer = answers[0].get("tts", {}).get("text", "").strip() - await reset_timer_callback(len(answer), did) - self.log.debug(f"query:{query} did:{did} answer:{answer}") + answer = new_record.get("answer") + answers = new_record.get("answers", [{}]) + if answers: + answer = answers[0].get("tts", {}).get("text", "").strip() + await reset_timer_callback(len(answer), did) + self.log.debug(f"query:{query} did:{did} answer:{answer}") + except asyncio.CancelledError: + self.log.info("Conversation loop cancelled, cleaning up...") + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + raise async def poll_latest_ask(self, session): """轮询最新对话记录 @@ -111,46 +120,50 @@ class ConversationPoller: Args: session: aiohttp客户端会话 """ - while True: - if not self.config.enable_pull_ask: - self.log.debug("Listening new message disabled") - await asyncio.sleep(5) - continue + try: + while True: + if not self.config.enable_pull_ask: + self.log.debug("Listening new message disabled") + await asyncio.sleep(5) + continue - self.log.debug(f"Listening new message, timestamp: {self.last_timestamp}") - # 动态获取最新的 cookie_jar - if self.auth_manager.cookie_jar is not None: - session._cookie_jar = self.auth_manager.cookie_jar + self.log.debug(f"Listening new message, timestamp: {self.last_timestamp}") + # 动态获取最新的 cookie_jar + if self.auth_manager.cookie_jar is not None: + session._cookie_jar = self.auth_manager.cookie_jar - # 拉取所有音箱的对话记录 - tasks = [] - for device_id in self.device_id_did: - # 首次用当前时间初始化 - did = self.get_did(device_id) - if did not in self.last_timestamp: - self.last_timestamp[did] = int(time.time() * 1000) + # 拉取所有音箱的对话记录 + tasks = [] + for device_id in self.device_id_did: + # 首次用当前时间初始化 + did = self.get_did(device_id) + if did not in self.last_timestamp: + self.last_timestamp[did] = int(time.time() * 1000) - hardware = self.get_hardward(device_id) - if ( - hardware in self.get_ask_by_mina_list - ) or self.config.get_ask_by_mina: - tasks.append(self.get_latest_ask_by_mina(device_id)) + hardware = self.get_hardward(device_id) + if ( + hardware in self.get_ask_by_mina_list + ) or self.config.get_ask_by_mina: + tasks.append(self.get_latest_ask_by_mina(device_id)) + else: + tasks.append(self.get_latest_ask_from_xiaoai(session, device_id)) + await asyncio.gather(*tasks) + + start = time.perf_counter() + await self.polling_event.wait() + if self.config.pull_ask_sec <= 1: + if (d := time.perf_counter() - start) < 1: + await asyncio.sleep(1 - d) else: - tasks.append(self.get_latest_ask_from_xiaoai(session, device_id)) - await asyncio.gather(*tasks) - - start = time.perf_counter() - await self.polling_event.wait() - if self.config.pull_ask_sec <= 1: - if (d := time.perf_counter() - start) < 1: - await asyncio.sleep(1 - d) - else: - sleep_sec = 0 - while True: - await asyncio.sleep(1) - sleep_sec = sleep_sec + 1 - if sleep_sec >= self.config.pull_ask_sec: - break + sleep_sec = 0 + while True: + await asyncio.sleep(1) + sleep_sec = sleep_sec + 1 + if sleep_sec >= self.config.pull_ask_sec: + break + except asyncio.CancelledError: + self.log.info("Polling task cancelled") + raise async def get_latest_ask_from_xiaoai(self, session, device_id): """从小爱API获取最新对话