跳转到主要内容

Documentation Index

Fetch the complete documentation index at: https://docs.osmosis.ai/llms.txt

Use this file to discover all available pages before exploring further.

AgentWorkflow 类是在 Osmosis 中定义 Agent 行为的核心抽象。您需要对其进行子类化,并实现一个异步方法 —— run() —— 该方法处理 prompt 并生成输出。

AgentWorkflow 基类

from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext

class MyWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        # 您的 Agent 逻辑
        pass
SDK 中的基类签名:
class AgentWorkflow(Generic[TConfig], ABC):
    def __init__(self, config: TConfig | None = None):
        self.config = config

    @abstractmethod
    async def run(self, ctx: AgentWorkflowContext[TConfig]) -> Any:
        raise NotImplementedError
AgentWorkflow 在配置类型上是泛型的,允许您定义自定义配置字段。它有一个抽象方法 —— run() —— 接收一个 AgentWorkflowContext 并执行您的 Agent 逻辑。

AgentWorkflowContext

传递给 run()ctx 参数提供两个字段:
字段类型描述
ctx.promptlist[dict[str, Any]]当前数据集行对应的输入消息
ctx.configTConfig | None您的自定义配置对象(如果提供)
对于大多数 workflow,您不需要关心执行后端如何把这些消息整理成运行时格式。如果数据集中的这一行包含 system_promptuser_promptground_truth,那么输入部分会出现在 ctx.prompt 中,而参考答案会在 grader 中作为 ctx.label 暴露。 如果您使用的是 OsmosisStrandsAgent,通常可以直接把 messages=ctx.prompt 传进去,而不需要额外转换。

AgentWorkflowConfig

自定义配置扩展 AgentWorkflowConfig,它本身扩展自 BaseConfig
from osmosis_ai.rollout import AgentWorkflowConfig

class MyConfig(AgentWorkflowConfig):
    name: str = "my-workflow"
    description: str | None = "A custom workflow"
    max_iterations: int = 10
    temperature: float = 0.7
BaseConfig 提供以下基础字段:
字段类型默认值描述
namestr(必填)工作流的标识符
descriptionstr | NoneNone可选描述
AgentWorkflowConfig 增加了并发控制:
字段类型默认值描述
concurrencyConcurrencyConfig无限制控制最大并发 rollout 数
from osmosis_ai.rollout import AgentWorkflowConfig, ConcurrencyConfig

class MyConfig(AgentWorkflowConfig):
    name: str = "rate-limited-workflow"
    concurrency: ConcurrencyConfig = ConcurrencyConfig(max_concurrent=5)
BaseConfig 配置了 extra="allow",因此您可以添加任何自定义字段,无需额外的 Pydantic 样板代码。

实现模式

run() 中的 LLM 调用必须通过执行后端为每次 rollout 建立的 RolloutContext 进行路由。训练集群依赖这一路由来提供当前策略、附加 x-sample-id / x-rollout-id header,并收集 rollout 轨迹与 token 用量。受支持的方式是 OsmosisStrandsAgent + OsmosisRolloutModel —— 两者都会在运行时自动从活跃的 RolloutContext 中读取所需信息。直接调用 litellm(或任何厂商 SDK)并传入像 openai/gpt-5.2 这样的硬编码模型名会绕过这套机制,产出的 rollout 服务器无法与平台训练兼容。完整机制详见 Strands 集成

简单 LLM 调用

最简单的工作流把 prompt 包在 OsmosisStrandsAgent 里并调用一次。OsmosisRolloutModel 只是一个占位模型,SDK 会在运行时把它解析为训练集群的 endpoint:
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowContext,
    OsmosisRolloutModel,
    OsmosisStrandsAgent,
)

class SimpleLLMWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        agent = OsmosisStrandsAgent(
            name="simple",
            model=OsmosisRolloutModel(params={"temperature": 1.0}),
            messages=ctx.prompt,
            callback_handler=None,
        )
        await agent.invoke_async()
run() 内部构造 agent 会把它注册到当前的 RolloutContext,产生的消息轨迹会自动作为 rollout sample 被采集 —— 无需手动簿记。

带工具的多步 Agent 循环

对使用工具的 Agent,把模型和工具列表放在自定义配置上,然后迭代直到 agent 不再产生工具调用为止。Strands 内部会处理单次调用内的 tool-use 循环;外层循环则用于附加额外的停止条件并限制轮数:
from typing import Any

from strands import tool
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowConfig,
    AgentWorkflowContext,
    OsmosisRolloutModel,
    OsmosisStrandsAgent,
)


@tool(name="search")
def search_tool(query: str) -> str:
    """Search for information."""
    return f"results for {query}"


class SearchWorkflowConfig(AgentWorkflowConfig):
    name: str = "search-workflow"
    model: Any
    tools: Any


search_workflow_config = SearchWorkflowConfig(
    model=OsmosisRolloutModel(params={"temperature": 1.0}),
    tools=[search_tool],
)


class SearchWorkflow(AgentWorkflow[SearchWorkflowConfig]):
    async def run(self, ctx: AgentWorkflowContext[SearchWorkflowConfig]) -> None:
        config = ctx.config or search_workflow_config
        agent = OsmosisStrandsAgent(
            name="search-agent",
            model=config.model,
            tools=config.tools,
            messages=ctx.prompt,
            callback_handler=None,
        )

        for _ in range(8):  # max iterations
            result = await agent.invoke_async()
            content = result.message.get("content", [])
            if not any("toolUse" in cb for cb in content):
                break
完整可运行的版本可参考 osmosis-sdk-python 仓库中的 examples/rollout/multiply_rollout/workflow.py

自定义配置

使用自定义配置使您的工作流可参数化:
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowConfig,
    AgentWorkflowContext,
)

class MyConfig(AgentWorkflowConfig):
    name: str = "my-workflow"
    max_iterations: int = 10
    temperature: float = 0.7

class ConfigurableWorkflow(AgentWorkflow[MyConfig]):
    async def run(self, ctx: AgentWorkflowContext[MyConfig]) -> None:
        max_iter = ctx.config.max_iterations if ctx.config else 10
        # ... 在 Agent 逻辑中使用配置值 ...
配置值可以在 eval训练 TOML 配置文件中设置,方便您在不修改代码的情况下尝试不同的参数。

自动发现

SDK 会自动扫描您的 rollout 入口模块,查找 AgentWorkflow 的子类。无需注册、装饰器或工厂函数 —— 只需定义您的类即可被发现。
您的入口文件必须恰好包含 一个 AgentWorkflow 子类。如果 SDK 发现零个或多于一个,将会抛出错误。

使用 Strands Agents

Osmosis SDK 内置了与 AWS Strands Agents 框架的集成,允许您在 AgentWorkflow 中直接使用 Strands Agent。 详见 Strands 集成 了解设置和使用详情。

下一步

构建评分器

定义评估逻辑以对 Agent 的输出进行评分。

本地评估

使用 eval 模式在本地测试您的工作流。