From 4a418e20202ae0cc47d0bd3be55cd1409c2f6350 Mon Sep 17 00:00:00 2001 From: lilingfengdev Date: 2025年10月29日 10:20:15 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E7=AE=80=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/v1/chat.py | 303 ++++++++++++++++++----------------------- api/v1/models.py | 17 +-- config.py | 123 ++++++++++------- engine/deep_think.py | 85 +++++------- engine/ultra_think.py | 3 +- utils/openai_client.py | 8 +- utils/summary_think.py | 292 +++++---------------------------------- 7 files changed, 273 insertions(+), 558 deletions(-) diff --git a/api/v1/chat.py b/api/v1/chat.py index f2abf5a..a61f71f 100644 --- a/api/v1/chat.py +++ b/api/v1/chat.py @@ -122,6 +122,126 @@ def verify_auth(authorization: str = Header(None)) -> bool: return True +async def _run_engine_with_streaming( + engine, + request_id: str, + created: int, + model: str, + thinking_generator +) -> AsyncIterator[str]: + """运行引擎并流式输出结果""" + progress_queue = [] + + def on_progress(event): + """捕获进度事件""" + progress_queue.append(event) + + # 设置进度回调 + engine.on_progress = on_progress + + # UltraThink 特殊处理 + if hasattr(engine, 'on_agent_update'): + def on_agent_update(agent_id: str, update: Dict[str, Any]): + """捕获 Agent 更新""" + from models import ProgressEvent + progress_queue.append(ProgressEvent( + type="agent-update", + data={"agentId": agent_id, **update} + )) + engine.on_agent_update = on_agent_update + + # 在后台运行引擎 + engine_task = asyncio.create_task(engine.run()) + + try: + # 流式发送进度 + while not engine_task.done(): + # 处理队列中的进度事件 + while progress_queue: + event = progress_queue.pop(0) + # 如果启用了 summary_think,将事件转换为思维链 + if thinking_generator: + thinking_text = thinking_generator.process_event(event) + if thinking_text: + # 使用 reasoning_content 字段输出推理过程 + delta = {"reasoning_content": thinking_text} + chunk_data = { + "id": request_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{ + "index": 0, + "delta": delta, + "finish_reason": None + }] + } + yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n" + await asyncio.sleep(0.1) # 短暂等待避免busy loop + + # 获取最终结果 + result = await engine_task + + # 处理剩余的进度事件 + while progress_queue: + event = progress_queue.pop(0) + if thinking_generator: + thinking_text = thinking_generator.process_event(event) + if thinking_text: + delta = {"reasoning_content": thinking_text} + chunk_data = { + "id": request_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{ + "index": 0, + "delta": delta, + "finish_reason": None + }] + } + yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n" + + # 流式发送最终答案 + final_text = result.summary or result.final_solution + for i in range(0, len(final_text), 50): + chunk = final_text[i:i+50] + delta = {"content": chunk} + chunk_data = { + "id": request_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{ + "index": 0, + "delta": delta, + "finish_reason": None + }] + } + yield f"data: {json.dumps(chunk_data)}\n\n" + + except GeneratorExit: + # 客户端断开连接,取消引擎任务 + logger.info(f"Client disconnected for request {request_id}, cancelling engine task") + if engine_task and not engine_task.done(): + engine_task.cancel() + try: + await engine_task + except asyncio.CancelledError: + pass # 预期的取消异常 + # 不重新抛出 GeneratorExit,让生成器正常结束 + except (asyncio.CancelledError, Exception) as e: + # 其他异常情况,记录日志并取消任务 + logger.error(f"Error during streaming for request {request_id}: {e}") + if engine_task and not engine_task.done(): + engine_task.cancel() + try: + await engine_task + except asyncio.CancelledError: + pass + raise # 重新抛出异常 + + async def stream_chat_completion( request: ChatCompletionRequest, model_config, @@ -130,7 +250,6 @@ async def stream_chat_completion( """流式聊天补全""" request_id = f"chatcmpl-{uuid.uuid4().hex[:8]}" created = int(time.time()) - engine_task = None # 用于跟踪引擎任务以便在断开时取消 # 提取 LLM 参数 llm_params = extract_llm_params(request) @@ -178,205 +297,43 @@ async def stream_chat_completion( else: thinking_generator = ThinkingSummaryGenerator(mode="deepthink") - # 定义进度处理器 - 将进度事件转换为流式输出 - async def stream_progress(event): - """处理进度事件并流式发送""" - # 如果启用了 summary_think,将事件转换为思维链 - if thinking_generator: - thinking_text = thinking_generator.process_event(event) - if thinking_text: - # 使用 reasoning_content 字段输出推理过程 - delta = {"reasoning_content": thinking_text} - chunk_data = { - "id": request_id, - "object": "chat.completion.chunk", - "created": created, - "model": request.model, - "choices": [{ - "index": 0, - "delta": delta, - "finish_reason": None - }] - } - yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n" # 根据模型级别选择引擎 if model_config.level == "ultrathink": # UltraThink 模式 - # 使用生成器来捕获进度并流式输出 - progress_queue = [] - - def on_progress(event): - """捕获进度事件""" - progress_queue.append(event) - - def on_agent_update(agent_id: str, update: Dict[str, Any]): - """捕获 Agent 更新""" - from models import ProgressEvent - progress_queue.append(ProgressEvent( - type="agent-update", - data={"agentId": agent_id, **update} - )) - - # 运行引擎 - 传递结构化的对话历史和多模态内容 engine = UltraThinkEngine( client=client, model=model_config.model, - problem_statement=problem_statement_raw, # 传递多模态内容 - conversation_history=conversation_history, # 传递结构化的消息历史 + problem_statement=problem_statement_raw, + conversation_history=conversation_history, max_iterations=model_config.max_iterations, required_successful_verifications=model_config.required_verifications, num_agents=model_config.num_agent, parallel_run_agent=model_config.parallel_run_agent, model_stages=model_config.models, - on_progress=on_progress, - on_agent_update=on_agent_update, enable_parallel_check=model_config.parallel_check, llm_params=llm_params, ) - - # 在后台运行引擎 - engine_task = asyncio.create_task(engine.run()) - - try: - # 流式发送进度 - while not engine_task.done(): - # 处理队列中的进度事件 - while progress_queue: - event = progress_queue.pop(0) - async for chunk in stream_progress(event): - yield chunk - await asyncio.sleep(0.1) # 短暂等待避免busy loop - - # 获取最终结果 - result = await engine_task - - # 处理剩余的进度事件 - while progress_queue: - event = progress_queue.pop(0) - async for chunk in stream_progress(event): - yield chunk - - # 流式发送最终答案 - final_text = result.summary or result.final_solution - for i in range(0, len(final_text), 50): - chunk = final_text[i:i+50] - delta = {"content": chunk} - chunk_data = { - "id": request_id, - "object": "chat.completion.chunk", - "created": created, - "model": request.model, - "choices": [{ - "index": 0, - "delta": delta, - "finish_reason": None - }] - } - yield f"data: {json.dumps(chunk_data)}\n\n" - except GeneratorExit: - # 客户端断开连接,取消引擎任务 - logger.info(f"Client disconnected for request {request_id}, cancelling engine task") - if engine_task and not engine_task.done(): - engine_task.cancel() - try: - await engine_task - except asyncio.CancelledError: - pass # 预期的取消异常 - # 不重新抛出 GeneratorExit,让生成器正常结束 - except (asyncio.CancelledError, Exception) as e: - # 其他异常情况,记录日志并取消任务 - logger.error(f"Error during streaming for request {request_id}: {e}") - if engine_task and not engine_task.done(): - engine_task.cancel() - try: - await engine_task - except asyncio.CancelledError: - pass - raise # 重新抛出异常 - else: # deepthink # DeepThink 模式 - progress_queue = [] - - def on_progress(event): - """捕获进度事件""" - progress_queue.append(event) - - # 运行引擎 - 传递结构化的对话历史和多模态内容 engine = DeepThinkEngine( client=client, model=model_config.model, - problem_statement=problem_statement_raw, # 传递多模态内容 - conversation_history=conversation_history, # 传递结构化的消息历史 + problem_statement=problem_statement_raw, + conversation_history=conversation_history, max_iterations=model_config.max_iterations, required_successful_verifications=model_config.required_verifications, model_stages=model_config.models, - on_progress=on_progress, enable_planning=model_config.has_plan_mode, enable_parallel_check=model_config.parallel_check, llm_params=llm_params, ) - - # 在后台运行引擎 - engine_task = asyncio.create_task(engine.run()) - - try: - # 流式发送进度 - while not engine_task.done(): - # 处理队列中的进度事件 - while progress_queue: - event = progress_queue.pop(0) - async for chunk in stream_progress(event): - yield chunk - await asyncio.sleep(0.1) # 短暂等待避免busy loop - - # 获取最终结果 - result = await engine_task - - # 处理剩余的进度事件 - while progress_queue: - event = progress_queue.pop(0) - async for chunk in stream_progress(event): - yield chunk - - # 流式发送最终答案 - final_text = result.summary or result.final_solution - for i in range(0, len(final_text), 50): - chunk = final_text[i:i+50] - delta = {"content": chunk} - chunk_data = { - "id": request_id, - "object": "chat.completion.chunk", - "created": created, - "model": request.model, - "choices": [{ - "index": 0, - "delta": delta, - "finish_reason": None - }] - } - yield f"data: {json.dumps(chunk_data)}\n\n" - except GeneratorExit: - # 客户端断开连接,取消引擎任务 - logger.info(f"Client disconnected for request {request_id}, cancelling engine task") - if engine_task and not engine_task.done(): - engine_task.cancel() - try: - await engine_task - except asyncio.CancelledError: - pass # 预期的取消异常 - # 不重新抛出 GeneratorExit,让生成器正常结束 - except (asyncio.CancelledError, Exception) as e: - # 其他异常情况,记录日志并取消任务 - logger.error(f"Error during streaming for request {request_id}: {e}") - if engine_task and not engine_task.done(): - engine_task.cancel() - try: - await engine_task - except asyncio.CancelledError: - pass - raise # 重新抛出异常 + + # 使用统一的流式处理函数 + async for chunk in _run_engine_with_streaming( + engine, request_id, created, request.model, thinking_generator + ): + yield chunk # 发送结束标记 chunk_data = { diff --git a/api/v1/models.py b/api/v1/models.py index 6a1300e..b4e8fbb 100644 --- a/api/v1/models.py +++ b/api/v1/models.py @@ -6,26 +6,11 @@ from typing import List, Dict, Any from config import config +from .chat import verify_auth # 复用,别重复造轮子 router = APIRouter() -def verify_auth(authorization: str = Header(None)) -> bool: - """验证 API 密钥""" - if not config.api_key: - return True - - if not authorization: - raise HTTPException(status_code=401, detail="Missing authorization header") - - token = authorization.replace("Bearer ", "").strip() - - if not config.validate_api_key(token): - raise HTTPException(status_code=401, detail="Invalid API key") - - return True - - @router.get("/v1/models") async def list_models(authorization: str = Header(None)): """ diff --git a/config.py b/config.py index dcf8b86..2a13590 100644 --- a/config.py +++ b/config.py @@ -6,71 +6,88 @@ from typing import Dict, Any, Optional, List import yaml from pathlib import Path +from dataclasses import dataclass, field +@dataclass class ModelConfig: """单个模型的配置""" - - def __init__(self, model_id: str, config: Dict[str, Any]): - self.model_id = model_id - self.name = config.get("name", model_id) - self.provider = config.get("provider") - self.model = config.get("model") - self.level = config.get("level", "deepthink") # deepthink, ultrathink - self.rpm = config.get("rpm") # 每分钟请求限制 - self.max_iterations = config.get("max_iterations", 30) - self.required_verifications = config.get("required_verifications", 3) - self.max_errors = config.get("max_errors_before_give_up", 10) - self.parallel_check = config.get("parallel_check", False) # 并行验证模式 - self.max_retry = config.get("max_retry") # 最大重试次数(可选,不设置则使用系统默认值) - - # UltraThink 配置 - self.num_agent = config.get("num_agent") - self.parallel_run_agent = config.get("parallel_run_agent", 3) - - # 特性配置 - self.feature = config.get("feature", {}) - - # 分阶段模型配置 - self.models = config.get("models", {}) - - @property - def has_vision(self) -> bool: - """是否支持视觉""" - return self.feature.get("vision", False) - - @property - def has_summary_think(self) -> bool: - """是否生成思维链摘要""" - return self.feature.get("summary_think", False) - - @property - def has_plan_mode(self) -> bool: - """是否启用计划模式""" - return self.feature.get("plan_mode", False) - - @property - def has_web_search(self) -> bool: - """是否启用网页搜索""" - return self.feature.get("web_search", False) + model_id: str + name: str + provider: str + model: str + level: str = "deepthink" # deepthink, ultrathink + rpm: Optional[int] = None # 每分钟请求限制 + max_iterations: int = 30 + required_verifications: int = 3 + max_errors: int = 10 + parallel_check: bool = False # 并行验证模式 + max_retry: Optional[int] = None # 最大重试次数 + + # UltraThink 配置 + num_agent: Optional[int] = None + parallel_run_agent: int = 3 + + # 特性配置 - 直接用布尔值,别搞那些花里胡哨的 + has_vision: bool = False + has_summary_think: bool = False + has_plan_mode: bool = False + has_web_search: bool = False + + # 分阶段模型配置 + models: Dict[str, str] = field(default_factory=dict) + + @classmethod + def from_dict(cls, model_id: str, config: Dict[str, Any]) -> 'ModelConfig': + """从字典创建配置""" + feature = config.get("feature", {}) + return cls( + model_id=model_id, + name=config.get("name", model_id), + provider=config.get("provider"), + model=config.get("model"), + level=config.get("level", "deepthink"), + rpm=config.get("rpm"), + max_iterations=config.get("max_iterations", 30), + required_verifications=config.get("required_verifications", 3), + max_errors=config.get("max_errors_before_give_up", 10), + parallel_check=config.get("parallel_check", False), + max_retry=config.get("max_retry"), + num_agent=config.get("num_agent"), + parallel_run_agent=config.get("parallel_run_agent", 3), + has_vision=feature.get("vision", False), + has_summary_think=feature.get("summary_think", False), + has_plan_mode=feature.get("plan_mode", False), + has_web_search=feature.get("web_search", False), + models=config.get("models", {}) + ) def get_stage_model(self, stage: str) -> str: - """获取特定阶段的模型,如果未配置则返回主模型""" + """获取特定阶段的模型""" return self.models.get(stage, self.model) def get_max_retry(self, default: int = 3) -> int: - """获取最大重试次数,如果未配置则使用提供的默认值""" + """获取最大重试次数""" return self.max_retry if self.max_retry is not None else default +@dataclass class ProviderConfig: """提供商配置""" - - def __init__(self, provider_id: str, config: Dict[str, Any]): - self.provider_id = provider_id - self.base_url = config.get("base_url", "") - self.key = config.get("key", "") - self.response_api = config.get("response_api", True) + provider_id: str + base_url: str = "" + key: str = "" + response_api: bool = True + + @classmethod + def from_dict(cls, provider_id: str, config: Dict[str, Any]) -> 'ProviderConfig': + """从字典创建配置""" + return cls( + provider_id=provider_id, + base_url=config.get("base_url", ""), + key=config.get("key", ""), + response_api=config.get("response_api", True) + ) class Config: @@ -109,12 +126,12 @@ def load(self, config_path: Optional[Path] = None): # 加载提供商配置 providers = self._config.get("provider", {}) for provider_id, provider_config in providers.items(): - self._providers[provider_id] = ProviderConfig(provider_id, provider_config) + self._providers[provider_id] = ProviderConfig.from_dict(provider_id, provider_config) # 加载模型配置 models = self._config.get("model", {}) for model_id, model_config in models.items(): - self._models[model_id] = ModelConfig(model_id, model_config) + self._models[model_id] = ModelConfig.from_dict(model_id, model_config) @property def api_key(self) -> str: diff --git a/engine/deep_think.py b/engine/deep_think.py index 7cd7357..0c433f9 100644 --- a/engine/deep_think.py +++ b/engine/deep_think.py @@ -27,9 +27,10 @@ build_final_summary_prompt, build_thinking_plan_prompt, ) +from .base import ThinkEngine, ThinkResult -class DeepThinkEngine: +class DeepThinkEngine(ThinkEngine): """Deep Think 引擎 - 单 Agent 深度推理""" def __init__( @@ -49,32 +50,26 @@ def __init__( enable_parallel_check: bool = False, llm_params: Optional[Dict[str, Any]] = None, ): - self.client = client - self.model = model - self.problem_statement = problem_statement # 可能是字符串或多模态内容 - self.problem_statement_text = extract_text_from_content(problem_statement) # 提取纯文本版本 - self.conversation_history = conversation_history or [] # 结构化的消息历史 + # 调用基类构造函数 + super().__init__( + client=client, + model=model, + problem_statement=problem_statement, + conversation_history=conversation_history, + max_iterations=max_iterations, + required_successful_verifications=required_successful_verifications, + model_stages=model_stages, + enable_parallel_check=enable_parallel_check, + llm_params=llm_params, + on_progress=on_progress, + ) + + # DeepThink 特有的属性 self.other_prompts = other_prompts or [] # 向后兼容 self.knowledge_context = knowledge_context - self.max_iterations = max_iterations - self.required_verifications = required_successful_verifications self.max_errors = max_errors_before_give_up - self.model_stages = model_stages or {} - self.on_progress = on_progress self.sources: List[Source] = [] self.enable_planning = enable_planning - self.enable_parallel_check = enable_parallel_check - self.llm_params = llm_params or {} - self._task = None # 用于存储当前任务,以便取消 - - def _get_model_for_stage(self, stage: str) -> str: - """获取特定阶段的模型""" - return self.model_stages.get(stage, self.model) - - def _emit(self, event_type: str, data: Dict[str, Any]): - """发送进度事件""" - if self.on_progress: - self.on_progress(ProgressEvent(type=event_type, data=data)) def _extract_detailed_solution( self, @@ -100,42 +95,36 @@ async def _generate_thinking_plan(self, problem_statement: MessageContent) -> st prompt = build_thinking_plan_prompt(problem_text) # 直接使用 prompt 参数传递多模态内容 - plan = await self.client.generate_text( - model=self.model, + plan = await self._call_llm( prompt=problem_statement, # 保留多模态内容 - **self.llm_params + stage="planning" ) self._emit("planning", {"plan": plan}) return plan - async def _verify_solution( - self, - problem_statement: MessageContent, - solution: str - ) -> Dict[str, str]: - """验证解决方案""" + async def _verify_solution(self, solution: str, index: int) -> Dict[str, Any]: + """验证解决方案 - 实现基类接口""" detailed_solution = self._extract_detailed_solution(solution) # 提取文本用于构建提示词 - problem_text = extract_text_from_content(problem_statement) + problem_text = extract_text_from_content(self.problem_statement) verification_prompt = build_verification_prompt( problem_text, detailed_solution, self.conversation_history # 传入对话历史 ) - self._emit("progress", {"message": "Verifying solution..."}) + self._emit("progress", {"message": f"Verifying solution (attempt {index + 1})..."}) # 使用验证阶段的模型 verification_model = self._get_model_for_stage("verification") # 获取验证结果 - verification_output = await self.client.generate_text( - model=verification_model, - system=VERIFICATION_SYSTEM_PROMPT, + verification_output = await self._call_llm( prompt=verification_prompt, - **self.llm_params + system=VERIFICATION_SYSTEM_PROMPT, + stage="verification" ) # 检查验证是否通过 @@ -145,10 +134,9 @@ async def _verify_solution( f'justification gap?\n\n{verification_output}' ) - good_verify = await self.client.generate_text( - model=verification_model, + good_verify = await self._call_llm( prompt=check_prompt, - **self.llm_params + stage="verification" ) bug_report = "" @@ -163,13 +151,12 @@ async def _verify_solution( async def _verify_solution_parallel( self, - problem_statement: MessageContent, solution: str ) -> Dict[str, str]: """并行验证解决方案 - 同时启动required_verifications个验证LLM调用,全部通过才算成功""" detailed_solution = self._extract_detailed_solution(solution) # 提取文本用于构建提示词 - problem_text = extract_text_from_content(problem_statement) + problem_text = extract_text_from_content(self.problem_statement) verification_prompt = build_verification_prompt( problem_text, detailed_solution, @@ -335,15 +322,9 @@ async def _initial_exploration( # 验证 - 根据配置选择串行或并行 if self.enable_parallel_check: - verification = await self._verify_solution_parallel( - problem_statement, - improved_solution - ) + verification = await self._verify_solution_parallel(improved_solution) else: - verification = await self._verify_solution( - problem_statement, - improved_solution - ) + verification = await self._verify_solution(improved_solution, 0) self._emit("verification", { "passed": "yes" in verification["good_verify"].lower(), @@ -499,9 +480,9 @@ async def run(self) -> DeepThinkResult: # 再次验证 - 根据配置选择串行或并行 if self.enable_parallel_check: - verification = await self._verify_solution_parallel(self.problem_statement, solution) + verification = await self._verify_solution_parallel(solution) else: - verification = await self._verify_solution(self.problem_statement, solution) + verification = await self._verify_solution(solution, i + 1) self._emit("verification", { "passed": "yes" in verification["good_verify"].lower(), "iteration": i + 1, diff --git a/engine/ultra_think.py b/engine/ultra_think.py index e373277..a9c988e 100644 --- a/engine/ultra_think.py +++ b/engine/ultra_think.py @@ -22,9 +22,10 @@ build_final_summary_prompt, ) from engine.deep_think import DeepThinkEngine +from .base import ThinkEngine, ThinkResult -class UltraThinkEngine: +class UltraThinkEngine(ThinkEngine): """Ultra Think 引擎 - 多 Agent 并行探索""" def __init__( diff --git a/utils/openai_client.py b/utils/openai_client.py index 5d7c014..b62af25 100644 --- a/utils/openai_client.py +++ b/utils/openai_client.py @@ -15,16 +15,10 @@ class OpenAIClient: """OpenAI 客户端包装器""" def __init__(self, base_url: str, api_key: str, rpm: Optional[int] = None, max_retry: int = 3): - # 创建带有超时设置的 HTTP 客户端 - # 这样可以确保在客户端断开时,HTTP 请求能更快响应取消 - http_client = httpx.AsyncClient( - timeout=httpx.Timeout(3000.0, connect=10.0, read=1200.0), # 总超时300秒,读取120秒 - ) - + # OpenAI客户端自己会管理连接,不需要我们操心 self.client = AsyncOpenAI( base_url=base_url, api_key=api_key, - http_client=http_client, max_retries=max_retry, ) self.rpm = rpm diff --git a/utils/summary_think.py b/utils/summary_think.py index 2fa2ccf..01bb29b 100644 --- a/utils/summary_think.py +++ b/utils/summary_think.py @@ -1,279 +1,59 @@ """ -Summary Think 功能 -根据实际推理进度动态生成思维链 +简化版思维链生成器 +把280行垃圾代码简化成50行 """ from typing import Dict, Any from models import ProgressEvent -class ThinkingSummaryGenerator: - """ - 思维链生成器 - 根据引擎的实际进度事件动态生成思维链文本 - """ +def process_thinking_event(event: ProgressEvent, mode: str = "deepthink") -> str: + """处理进度事件,生成思维链文本""" + event_type = event.type + data = event.data + + # 简单的事件映射 + event_messages = { + "init": "Analyzing the problem...\n\n", + "thinking": f"Iteration {data.get('iteration', 0) + 1}\n", + "verification": "Verified\n" if data.get('passed') else "Refining approach\n", + "summarizing": "\nGenerating final answer...\n", + "planning": f"Planning approach\n", + "agent-start": f"Agent {data.get('agentId', '')} starting: {data.get('approach', '')}\n", + "agent-complete": f"Agent {data.get('agentId', '')} completed\n", + "synthesis": "Synthesizing results...\n", + } + + # 成功/失败特殊处理 + if event_type == "success": + iterations = data.get('iterations', 0) + return f"\nAnalysis complete ({iterations} iterations)\n\n---\n\n" + elif event_type == "failure": + return f"\nAnalysis failed: {data.get('reason', 'Unknown')}\n" + return event_messages.get(event_type, "") + + +class ThinkingSummaryGenerator: + """保持接口兼容的简化版生成器""" def __init__(self, mode: str = "deepthink"): self.mode = mode - self.started = False - self.current_iteration = -1 - self.current_phase = None - self.agent_states: Dict[str, Dict[str, Any]] = {} def process_event(self, event: ProgressEvent) -> str: - """ - 处理进度事件,生成对应的思维链文本 (o1 专业风格) - - Args: - event: 进度事件 - - Returns: - 思维链文本块 (如果有) - """ - output = "" - event_type = event.type - data = event.data - - # 初始化 - if event_type == "init": - if not self.started: - self.started = True - output += "Analyzing the problem...\n\n" - - # 思考阶段 - elif event_type == "thinking": - iteration = data.get("iteration", 0) - phase = data.get("phase", "") - - # 新的迭代轮次 - if iteration != self.current_iteration: - self.current_iteration = iteration - if iteration> 0: - output += f"\nIteration {iteration + 1}\n" - - # 新的阶段 - if phase != self.current_phase: - self.current_phase = phase - phase_name = self._format_phase_name(phase) - output += f"{phase_name}\n" - - # 生成解决方案 - elif event_type == "solution": - # o1 风格:不显示每个解决方案生成 - pass - - # 验证 - elif event_type == "verification": - passed = data.get("passed", False) - if passed: - output += "Verified\n" - else: - output += "Refining approach\n" - - # 修正 - elif event_type == "correction": - # o1 风格:合并到验证流程 - pass - - # 总结 - elif event_type == "summarizing": - output += "\nGenerating final answer...\n" - - # 成功 - elif event_type == "success": - iterations = data.get("iterations", 0) - stats = data.get("statistics", {}) - output += f"\nAnalysis complete ({iterations} iterations)\n" - - # 显示统计信息 - if stats: - api_calls = stats.get("api_calls", 0) - total_tokens = stats.get("total_tokens", 0) - output += f"API calls: {api_calls}" - if total_tokens> 0: - output += f" | Tokens: {total_tokens:,}" - output += "\n" - - output += "\n---\n\n" - - # 失败 - elif event_type == "failure": - reason = data.get("reason", "Unknown") - stats = data.get("statistics", {}) - output += f"\nNote: {reason}\n" - - # 显示统计信息 - if stats: - api_calls = stats.get("api_calls", 0) - total_tokens = stats.get("total_tokens", 0) - output += f"API calls: {api_calls}" - if total_tokens> 0: - output += f" | Tokens: {total_tokens:,}" - output += "\n" - - output += "\n---\n\n" - - # 进度消息 - elif event_type == "progress": - # o1 风格:隐藏详细进度消息 - pass - - # 计划阶段 - elif event_type == "planning": - output += "Planning approach\n" - - return output - - def _format_phase_name(self, phase: str) -> str: - """ - 格式化阶段名称 (o1 专业风格) - - Args: - phase: 阶段标识 - - Returns: - 格式化的阶段名称 - """ - phase_map = { - "initial-exploration": "Exploring approaches", - "self-improvement": "Refining solution", - "initializing": "Initializing", - "correcting": "Applying corrections", - "summarizing": "Summarizing", - "verifying": "Verifying solution", - } - - default_name = phase.replace("_", " ").replace("-", " ").title() - return phase_map.get(phase, default_name) + return process_thinking_event(event, self.mode) class UltraThinkSummaryGenerator(ThinkingSummaryGenerator): - """ - UltraThink 模式的思维链生成器 - 额外处理 Agent 状态更新 - """ - + """UltraThink版本""" def __init__(self): - super().__init__(mode="ultrathink") - self.agent_count = 0 - self.completed_agents = 0 - - def process_event(self, event: ProgressEvent) -> str: - """处理 UltraThink 事件 (o1 专业风格)""" - output = "" - event_type = event.type - data = event.data - - # 重写初始化消息 - if event_type == "init": - if not self.started: - self.started = True - output += "Analyzing with multiple approaches...\n\n" - return output - - # 处理 Agent 更新 - if event_type == "agent-update": - agent_id = data.get("agentId", "") - status = data.get("status", "") - approach = data.get("approach", "") - - # 新 Agent 启动 - if agent_id and agent_id not in self.agent_states: - self.agent_count += 1 - self.agent_states[agent_id] = {"approach": approach} - if approach: - output += f"Approach {self.agent_count}: {approach}\n" - - # 状态变化 - if agent_id and status: - old_status = self.agent_states[agent_id].get("status") - if status != old_status: - self.agent_states[agent_id]["status"] = status - - if status == "completed": - self.completed_agents += 1 - # 只显示完成状态 - output += f" Completed ({self.completed_agents}/{self.agent_count})\n" - - elif status == "failed": - # 只显示失败,不显示详细错误 - output += f" Approach encountered difficulties\n" - - # 综合阶段 - elif event_type == "progress": - message = data.get("message", "") - if "Synthesizing" in message: - if "deep thinking" in message.lower(): - output += "\nSynthesizing results with deep analysis...\n" - else: - output += "\nSynthesizing results from all approaches...\n" - elif "Deep thinking on synthesis" in message: - output += " Analyzing synthesis approach\n" - elif "Synthesis verified" in message: - output += " Synthesis verified\n" - elif "Refining synthesis" in message: - output += " Refining synthesis\n" - - # 生成摘要 - elif event_type == "summarizing": - output += "Generating final comprehensive answer...\n" - - # 成功完成 - elif event_type == "success": - stats = data.get("statistics", {}) - output += f"\nAnalysis complete\n" - - # 显示统计信息 - if stats: - api_calls = stats.get("api_calls", 0) - total_tokens = stats.get("total_tokens", 0) - output += f"API calls: {api_calls}" - if total_tokens> 0: - output += f" | Tokens: {total_tokens:,}" - output += "\n" - - output += "\n---\n\n" - - else: - # 其他事件用父类处理 - output = super().process_event(event) - - return output + super().__init__("ultrathink") def generate_simple_thinking_tag(mode: str = "deepthink") -> str: - """ - 生成简单的思考标签(非流式场景) - o1 专业风格 - - Args: - mode: 模式 (deepthink 或 ultrathink) - - Returns: - 简单的思考提示 - """ - if mode == "ultrathink": - return """Analyzing with multiple approaches... - -Exploring different solution paths -Verifying feasibility -Synthesizing results from all approaches -Generating final comprehensive answer + """生成简单的思考标签""" + return f"""Analyzing with {mode}... Analysis complete --- -""" - else: - return """Analyzing the problem... - -Understanding requirements -Exploring approaches -Refining solution -Verifying correctness -Generating final answer - -Analysis complete - ---- - -""" - +""" \ No newline at end of file From bbc547f00493bfd35a413863e9bcfd0e25bd668b Mon Sep 17 00:00:00 2001 From: lilingfengdev Date: 2025年10月29日 11:43:07 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E7=AE=80=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- engine/deep_think.py | 57 +++++++++++++++++++++++++------------------ engine/ultra_think.py | 3 +-- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/engine/deep_think.py b/engine/deep_think.py index 0c433f9..b15c55c 100644 --- a/engine/deep_think.py +++ b/engine/deep_think.py @@ -27,10 +27,9 @@ build_final_summary_prompt, build_thinking_plan_prompt, ) -from .base import ThinkEngine, ThinkResult -class DeepThinkEngine(ThinkEngine): +class DeepThinkEngine: """Deep Think 引擎 - 单 Agent 深度推理""" def __init__( @@ -50,26 +49,32 @@ def __init__( enable_parallel_check: bool = False, llm_params: Optional[Dict[str, Any]] = None, ): - # 调用基类构造函数 - super().__init__( - client=client, - model=model, - problem_statement=problem_statement, - conversation_history=conversation_history, - max_iterations=max_iterations, - required_successful_verifications=required_successful_verifications, - model_stages=model_stages, - enable_parallel_check=enable_parallel_check, - llm_params=llm_params, - on_progress=on_progress, - ) - - # DeepThink 特有的属性 + self.client = client + self.model = model + self.problem_statement = problem_statement # 可能是字符串或多模态内容 + self.problem_statement_text = extract_text_from_content(problem_statement) # 提取纯文本版本 + self.conversation_history = conversation_history or [] # 结构化的消息历史 self.other_prompts = other_prompts or [] # 向后兼容 self.knowledge_context = knowledge_context + self.max_iterations = max_iterations + self.required_verifications = required_successful_verifications self.max_errors = max_errors_before_give_up + self.model_stages = model_stages or {} + self.on_progress = on_progress self.sources: List[Source] = [] self.enable_planning = enable_planning + self.enable_parallel_check = enable_parallel_check + self.llm_params = llm_params or {} + self._task = None # 用于存储当前任务,以便取消 + + def _get_model_for_stage(self, stage: str) -> str: + """获取特定阶段的模型""" + return self.model_stages.get(stage, self.model) + + def _emit(self, event_type: str, data: Dict[str, Any]): + """发送进度事件""" + if self.on_progress: + self.on_progress(ProgressEvent(type=event_type, data=data)) def _extract_detailed_solution( self, @@ -95,9 +100,11 @@ async def _generate_thinking_plan(self, problem_statement: MessageContent) -> st prompt = build_thinking_plan_prompt(problem_text) # 直接使用 prompt 参数传递多模态内容 - plan = await self._call_llm( + planning_model = self._get_model_for_stage("planning") + plan = await self.client.generate_text( + model=planning_model, prompt=problem_statement, # 保留多模态内容 - stage="planning" + **self.llm_params ) self._emit("planning", {"plan": plan}) @@ -121,10 +128,11 @@ async def _verify_solution(self, solution: str, index: int) -> Dict[str, Any]: verification_model = self._get_model_for_stage("verification") # 获取验证结果 - verification_output = await self._call_llm( - prompt=verification_prompt, + verification_output = await self.client.generate_text( + model=verification_model, system=VERIFICATION_SYSTEM_PROMPT, - stage="verification" + prompt=verification_prompt, + **self.llm_params ) # 检查验证是否通过 @@ -134,9 +142,10 @@ async def _verify_solution(self, solution: str, index: int) -> Dict[str, Any]: f'justification gap?\n\n{verification_output}' ) - good_verify = await self._call_llm( + good_verify = await self.client.generate_text( + model=verification_model, prompt=check_prompt, - stage="verification" + **self.llm_params ) bug_report = "" diff --git a/engine/ultra_think.py b/engine/ultra_think.py index a9c988e..e373277 100644 --- a/engine/ultra_think.py +++ b/engine/ultra_think.py @@ -22,10 +22,9 @@ build_final_summary_prompt, ) from engine.deep_think import DeepThinkEngine -from .base import ThinkEngine, ThinkResult -class UltraThinkEngine(ThinkEngine): +class UltraThinkEngine: """Ultra Think 引擎 - 多 Agent 并行探索""" def __init__( From 41145c30cbd0f3723174d085344bd10b9d254950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=AE?= Date: 2025年10月31日 10:29:56 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81ARM64=E6=9E=B6?= =?UTF-8?q?=E6=9E=84=E7=9A=84Docker=E9=95=9C=E5=83=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加QEMU模拟器支持,实现多架构构建 - Dockerfile支持linux/amd64,linux/arm64双平台 - 更新README文档说明架构自动选择机制 --- .github/workflows/docker.yml | 7 ++++++- Dockerfile | 4 ++-- README.md | 8 +++++++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 0218677..2e7f23f 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -21,6 +21,11 @@ jobs: - name: Checkout code uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + with: + platforms: arm64 + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 @@ -52,4 +57,4 @@ jobs: labels: ${{ steps.meta.outputs.labels }} cache-from: type=gha cache-to: type=gha,mode=max - platforms: linux/amd64 # 只构建amd64镜像节省成本 + platforms: linux/amd64,linux/arm64 diff --git a/Dockerfile b/Dockerfile index f79c553..c7486eb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -# 使用官方Python运行时作为基础镜像 -FROM python:3.11-slim +# 使用官方Python运行时作为基础镜像,支持多架构 +FROM --platform=linux/amd64,linux/arm64 python:3.11-slim # 设置工作目录 WORKDIR /app diff --git a/README.md b/README.md index 18d6d47..4c89f26 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ docker-compose down ##### 使用 Docker 直接运行 ```bash -# 使用GitHub镜像直接运行 +# 使用GitHub镜像直接运行(支持多架构,Docker会自动选择合适的架构) docker run -d \ --name deepapi \ -p 8000:8000 \ @@ -95,6 +95,12 @@ docker run -d \ deepapi ``` +**架构支持说明** +- 镜像支持 `linux/amd64` 和 `linux/arm64` 双架构 +- Docker 会根据你的服务器架构自动拉取对应镜像 +- 在 ARM64 服务器(如 AWS Graviton、Apple Silicon)上会自动使用 arm64 版本 +- 在传统 x86_64 服务器上会自动使用 amd64 版本 + ##### 环境变量说明 - `PYTHONUNBUFFERED=1`: 启用Python日志输出 - 自定义DNS服务器(解决网络问题):`8.8.8.8`, `8.8.4.4`

AltStyle によって変換されたページ (->オリジナル) /