跳到主要内容

管道

管道

Pipe 是一种独立函数,用来处理输入并生成响应。它可以在返回结果给用户之前,调用一个或多个 LLM,或者访问外部服务。

典型用途包括:

  • Retrieval-Augmented Generation(RAG)
  • 把请求发给非 OpenAI 提供方,例如 Anthropic、Azure OpenAI、Google
  • 直接在 Web UI 中执行某些函数式逻辑

参考示例见 Pipelines repo

Pipe 可以作为 Function 运行,也可以托管在独立的 Pipelines server 上。整体流程如下图:

Pipe Workflow

在 WebUI 中定义的 Pipe,会以一个带 External 标记的新模型形式出现在模型选择器里。

Pipe Models in WebUI

流式返回格式

Pipe 可以返回:

  • 单个 str
  • 一个 iterator / generator

在流式场景中,每次 yield 的内容可以是:

  • 纯字符串:会被视为对用户可见的文本,并持续追加到消息中
  • OpenAI 兼容的 SSE chunk dict:格式类似 /v1/chat/completions 的流式返回

例如:

{"choices": [{"delta": {"content": "..."}, "finish_reason": None}]}

如果你要自己闭合一个完整的流,请在最后发出一个终止 chunk:

yield {"choices": [{"delta": {}, "finish_reason": "stop"}]}

finish_reason 只能在最后出现一次。对于自行处理完工具调用的 pipeline,它应始终是 "stop",而不是 "tool_calls"

自包含 Agent 与 delta.tool_calls

这是构建 agent pipeline 时最容易踩的坑。

delta.tool_calls 的语义是:

“请客户端替我执行这个工具调用。”

当 OPL 数据空间的中间层看到它时,会:

  1. 取走这个 tool call
  2. 执行工具
  3. 追加 role: "tool" 消息
  4. 再次把请求送回同一个 pipeline

如果你的 pipeline 自己内部已经执行过工具,但又把 delta.tool_calls 发了出去, OPL 数据空间就会重复执行这次工具调用,并可能循环重试,直到达到 CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES 上限。

经验法则

  • 如果工具应该由 OPL 数据空间 执行:发 delta.tool_calls,并以 finish_reason: "tool_calls" 结束
  • 如果工具由 你的 pipeline 内部 agent 自己执行完全不要发 delta.tool_calls

如何把工具执行渲染成内容

OPL 数据空间自己会把已完成的工具执行渲染成 <details type="tool_calls"> 内容块。你也可以在自定义 pipeline 中发出同样结构:

import html
import json

call_id = "call_123"
name = "get_weather_test"
arguments = {"location": "SF"}
result = {"temp_c": 22}

details_block = (
    f'<details type="tool_calls" done="true" '
    f'id="{call_id}" name="{name}" '
    f'arguments="{html.escape(json.dumps(arguments))}">\n'
    f'<summary>Tool Executed</summary>\n'
    f'{html.escape(json.dumps(result, ensure_ascii=False))}\n'
    f'</details>\n'
)

然后把它作为内容 yield 出去:

yield details_block

或者:

yield {"choices": [{"delta": {"content": details_block}, "finish_reason": None}]}

一个完整的自包含 Agent 流示例

def pipe(self, user_message, model_id, messages, body):
    yield {"choices": [{"delta": {"role": "assistant", "content": "Looking up the weather… "}, "finish_reason": None}]}

    call_id = "call_123"
    name = "get_weather_test"
    arguments = {"location": "SF"}
    result = {"temp_c": 22}

    details_block = (
        f'<details type="tool_calls" done="true" '
        f'id="{call_id}" name="{name}" '
        f'arguments="{html.escape(json.dumps(arguments))}">\n'
        f'<summary>Tool Executed</summary>\n'
        f'{html.escape(json.dumps(result, ensure_ascii=False))}\n'
        f'</details>\n'
    )
    yield details_block

    yield "The weather is 22°C. Done."
    yield {"choices": [{"delta": {}, "finish_reason": "stop"}]}

LangChain Agent 模式

如果你用的是 LangChain,一般做法是:

  • AIMessageChunk 只流出普通文本内容
  • 完全丢弃 tool_calls
  • ToolMessage 渲染成 <details type="tool_calls">

这样才能避免 OPL 数据空间再次帮你执行已经由 Agent 内部跑完的工具。

参考讨论见: open-webui #23957