跳转到主要内容
AgentWorkflow 是 rollout 行为的 SDK contract。您需要继承它、实现一个 async run() 方法,并通过 Osmosis 支持的 agent integration 调用当前策略来创建 samples。 Workflow 应该回答一个问题:给定这个数据集 prompt,agent 在 grader 打分之前应该做什么?

基类

from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext


class MyWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        # Build and run your agent here.
        pass
SDK 形状如下:
class AgentWorkflow(Generic[TConfig], ABC):
    def __init__(self, config: TConfig | None = None):
        self.config = config

    @abstractmethod
    async def run(self, ctx: AgentWorkflowContext[TConfig]) -> Any:
        raise NotImplementedError
run() 会为每次 workflow execution 调用一次。它应该在方法内部构造任何 per-execution agent/session objects,运行 agent,并让 integration 把生成的 conversation 注册到活跃的 RolloutContext

AgentWorkflowContext

ctx object 为 workflow 提供输入和 config:
FieldTypeDescription
ctx.promptlist[dict[str, Any]]当前数据集行的输入 messages
ctx.configTConfig | None如果提供了,则为自定义 workflow config object
ctx.metadatadict[str, Any] | None来自数据集可选 metadata 列的每行 metadata。该行无 metadata 时为 None
如果您的数据集行包含 system_promptuser_promptground_truth,prompt 字段会被组装进 ctx.prompt。参考答案不会传给 workflow;它会作为 ctx.label 暴露给 grader。AgentWorkflowContextGraderContext 上的 metadata 对象相同,因此 workflow 和 grader 可以读取同一份按行的上下文。
不要把 task answers 放进 AgentWorkflow.run()。Workflow 应该产出行为;Grader 应该决定该行为是否值得 reward。

模型路由要求

run() 内的 LLM 调用必须通过执行后端安装的 RolloutContext 路由。训练集群依赖这个 context 服务当前策略、附加 x-sample-id / x-rollout-id headers、收集 traces,并把 rewards 连接到 samples。
请使用以下受支持 integrations 之一:
FrameworkUseIntegration objects
Strands AgentsStrands tools、Strands message history、从 strands.Agent 迁移OsmosisStrandsAgentOsmosisRolloutModel
OpenAI Agents SDKRunner.run、sessions、handoffs、OpenAI-style toolsOsmosisAgentOsmosisRolloutModelOsmosisMemorySession
不要在 run() 中用硬编码 policy model 直接调用 litellm、OpenAI SDK 或其他 provider SDK。直接调用会绕过 rollout context,无法用于训练。

Strands 模式

对于 Strands,直接把 ctx.prompt 作为 messages 传入,并调用 invoke_async()
from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext
from osmosis_ai.rollout.integrations.agents.strands import (
    OsmosisRolloutModel,
    OsmosisStrandsAgent,
)


class SimpleStrandsWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        agent = OsmosisStrandsAgent(
            name="simple-strands-agent",
            model=OsmosisRolloutModel(params={"temperature": 1.0}),
            messages=ctx.prompt,
            callback_handler=None,
        )
        await agent.invoke_async()
run() 内构造 OsmosisStrandsAgent 会将其绑定到活跃的 rollout context,并把 agent 注册为 sample source。 工具 examples、迁移步骤和 OsmosisRolloutModel 细节请参见 Strands 集成

OpenAI Agents 模式

对于 OpenAI Agents,请构造 OsmosisAgent,附加 OpenAI Agents ModelSettings,创建一个 OsmosisMemorySession,并把该 session 传给 Runner.run()
from agents import ModelSettings, Runner
from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext
from osmosis_ai.rollout.integrations.agents.openai_agents import (
    OsmosisAgent,
    OsmosisMemorySession,
    OsmosisRolloutModel,
)


class SimpleOpenAIWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        agent = OsmosisAgent(
            name="simple-openai-agent",
            instructions="Answer the user's request clearly.",
            model=OsmosisRolloutModel(),
            model_settings=ModelSettings(temperature=1.0, max_tokens=4096),
        )
        session = OsmosisMemorySession(name="simple-openai-agent")
        await Runner.run(
            agent,
            ctx.prompt,
            session=session,
        )
Session 会记录 OpenAI Agents SDK conversation 以供 grader 使用。请在 run() 内创建它,使其注册到当前 RolloutContext Session 行为、tracing 说明和迁移步骤请参见 OpenAI Agents 集成

自定义配置

自定义 configs 继承 AgentWorkflowConfig。请在 rollout entrypoint 中定义 module-level config instance,并把它传给 backend:
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowConfig,
    AgentWorkflowContext,
    ConcurrencyConfig,
)


class SearchWorkflowConfig(AgentWorkflowConfig):
    name: str = "search-workflow"
    max_iterations: int = 8
    temperature: float = 1.0
    concurrency: ConcurrencyConfig = ConcurrencyConfig(max_concurrent=4)


class SearchWorkflow(AgentWorkflow[SearchWorkflowConfig]):
    async def run(self, ctx: AgentWorkflowContext[SearchWorkflowConfig]) -> None:
        config = ctx.config or SearchWorkflowConfig()
        max_iterations = config.max_iterations
        temperature = config.temperature
        # Use these values when constructing your agent.


search_workflow_config = SearchWorkflowConfig()
osmosis train submit preflight 会从 entrypoint module 中自动发现至多一个 module-level AgentWorkflowConfig instance。Eval 和 training TOML 文件目前不会直接设置 workflow config fields。 BaseConfig 允许额外字段,因此简单 rollout configs 通常不需要额外的 Pydantic 样板代码。
FieldTypeDefaultDescription
namestrrequiredworkflow 标识符
descriptionstr | NoneNone可选描述
concurrencyConcurrencyConfigunlimited最大并发 workflow executions

使用工具的 Workflows

工具使用属于 agent framework,而不是 backend。请按您的 framework 期望的方式定义 tools,把它们传给 Osmosis-wrapped agent,并让 integration 记录生成的 messages。 例如,Strands workflow 可以把 tool list 放在 config 中:
from typing import Any

from strands import tool
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowConfig,
    AgentWorkflowContext,
)
from osmosis_ai.rollout.integrations.agents.strands import (
    OsmosisRolloutModel,
    OsmosisStrandsAgent,
)


@tool(name="search")
def search_tool(query: str) -> str:
    """Search for information."""
    return f"results for {query}"


class ToolWorkflowConfig(AgentWorkflowConfig):
    name: str = "tool-workflow"
    model: Any = OsmosisRolloutModel(params={"temperature": 1.0})
    tools: Any = [search_tool]
    max_iterations: int = 8


tool_workflow_config = ToolWorkflowConfig()


class ToolWorkflow(AgentWorkflow[ToolWorkflowConfig]):
    async def run(self, ctx: AgentWorkflowContext[ToolWorkflowConfig]) -> None:
        config = ctx.config or ToolWorkflowConfig()
        agent = OsmosisStrandsAgent(
            name="search-agent",
            model=config.model,
            tools=config.tools,
            messages=ctx.prompt,
            callback_handler=None,
        )

        for _ in range(config.max_iterations):
            result = await agent.invoke_async()
            content = result.message.get("content", [])
            if not any("toolUse" in block for block in content):
                break

自动发现

osmosis train submitosmosis eval submit 都会扫描 rollout entrypoint module,查找具体的 AgentWorkflow subclasses。无需 decorators 或 registration functions,但 entrypoint 仍需要构造 backend,并通过 create_rollout_server() 暴露,以便 rollout server 在平台上运行。
对于训练 preflight,您的 entrypoint 文件必须恰好包含一个具体 AgentWorkflow subclass。如果 SDK 找到零个或多个,osmosis train submit 会在 discovery 阶段失败。
Helpers、base classes、tools 和 configs 可以自由使用,但请确保 entrypoint 中只有您想运行的 workflow class 是具体的 AgentWorkflow

下一步

Strands 集成

使用 tools 和 OsmosisStrandsAgent 构建基于 Strands 的 rollout。

OpenAI Agents 集成

使用 OsmosisAgentOsmosisMemorySession 构建 OpenAI Agents SDK rollout。

构建 Graders

为 workflow 生成的 samples 定义 reward 逻辑。

评估

在训练前提交 evaluation run 测试 workflow 和 grader。