AgentWorkflow 是 rollout 行为的 SDK contract。您需要继承它、实现一个 async run() 方法,并通过 Osmosis 支持的 agent integration 调用当前策略来创建 samples。
Workflow 应该回答一个问题:给定这个数据集 prompt,agent 在 grader 打分之前应该做什么?
from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext
class MyWorkflow(AgentWorkflow):
async def run(self, ctx: AgentWorkflowContext) -> None:
# Build and run your agent here.
pass
SDK 形状如下:
class AgentWorkflow(Generic[TConfig], ABC):
def __init__(self, config: TConfig | None = None):
self.config = config
@abstractmethod
async def run(self, ctx: AgentWorkflowContext[TConfig]) -> Any:
raise NotImplementedError
run() 会为每次 workflow execution 调用一次。它应该在方法内部构造任何 per-execution agent/session objects,运行 agent,并让 integration 把生成的 conversation 注册到活跃的 RolloutContext。
AgentWorkflowContext
ctx object 为 workflow 提供输入和 config:
| Field | Type | Description |
|---|
ctx.prompt | list[dict[str, Any]] | 当前数据集行的输入 messages |
ctx.config | TConfig | None | 如果提供了,则为自定义 workflow config object |
ctx.metadata | dict[str, Any] | None | 来自数据集可选 metadata 列的每行 metadata。该行无 metadata 时为 None。 |
如果您的数据集行包含 system_prompt、user_prompt 和 ground_truth,prompt 字段会被组装进 ctx.prompt。参考答案不会传给 workflow;它会作为 ctx.label 暴露给 grader。AgentWorkflowContext 和 GraderContext 上的 metadata 对象相同,因此 workflow 和 grader 可以读取同一份按行的上下文。
不要把 task answers 放进 AgentWorkflow.run()。Workflow 应该产出行为;Grader 应该决定该行为是否值得 reward。
模型路由要求
run() 内的 LLM 调用必须通过执行后端安装的 RolloutContext 路由。训练集群依赖这个 context 服务当前策略、附加 x-sample-id / x-rollout-id headers、收集 traces,并把 rewards 连接到 samples。
请使用以下受支持 integrations 之一:
| Framework | Use | Integration objects |
|---|
| Strands Agents | Strands tools、Strands message history、从 strands.Agent 迁移 | OsmosisStrandsAgent、OsmosisRolloutModel |
| OpenAI Agents SDK | Runner.run、sessions、handoffs、OpenAI-style tools | OsmosisAgent、OsmosisRolloutModel、OsmosisMemorySession |
不要在 run() 中用硬编码 policy model 直接调用 litellm、OpenAI SDK 或其他 provider SDK。直接调用会绕过 rollout context,无法用于训练。
Strands 模式
对于 Strands,直接把 ctx.prompt 作为 messages 传入,并调用 invoke_async():
from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext
from osmosis_ai.rollout.integrations.agents.strands import (
OsmosisRolloutModel,
OsmosisStrandsAgent,
)
class SimpleStrandsWorkflow(AgentWorkflow):
async def run(self, ctx: AgentWorkflowContext) -> None:
agent = OsmosisStrandsAgent(
name="simple-strands-agent",
model=OsmosisRolloutModel(params={"temperature": 1.0}),
messages=ctx.prompt,
callback_handler=None,
)
await agent.invoke_async()
在 run() 内构造 OsmosisStrandsAgent 会将其绑定到活跃的 rollout context,并把 agent 注册为 sample source。
工具 examples、迁移步骤和 OsmosisRolloutModel 细节请参见 Strands 集成。
OpenAI Agents 模式
对于 OpenAI Agents,请构造 OsmosisAgent,附加 OpenAI Agents ModelSettings,创建一个 OsmosisMemorySession,并把该 session 传给 Runner.run():
from agents import ModelSettings, Runner
from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext
from osmosis_ai.rollout.integrations.agents.openai_agents import (
OsmosisAgent,
OsmosisMemorySession,
OsmosisRolloutModel,
)
class SimpleOpenAIWorkflow(AgentWorkflow):
async def run(self, ctx: AgentWorkflowContext) -> None:
agent = OsmosisAgent(
name="simple-openai-agent",
instructions="Answer the user's request clearly.",
model=OsmosisRolloutModel(),
model_settings=ModelSettings(temperature=1.0, max_tokens=4096),
)
session = OsmosisMemorySession(name="simple-openai-agent")
await Runner.run(
agent,
ctx.prompt,
session=session,
)
Session 会记录 OpenAI Agents SDK conversation 以供 grader 使用。请在 run() 内创建它,使其注册到当前 RolloutContext。
Session 行为、tracing 说明和迁移步骤请参见 OpenAI Agents 集成。
自定义配置
自定义 configs 继承 AgentWorkflowConfig。请在 rollout entrypoint 中定义 module-level config instance,并把它传给 backend:
from osmosis_ai.rollout import (
AgentWorkflow,
AgentWorkflowConfig,
AgentWorkflowContext,
ConcurrencyConfig,
)
class SearchWorkflowConfig(AgentWorkflowConfig):
name: str = "search-workflow"
max_iterations: int = 8
temperature: float = 1.0
concurrency: ConcurrencyConfig = ConcurrencyConfig(max_concurrent=4)
class SearchWorkflow(AgentWorkflow[SearchWorkflowConfig]):
async def run(self, ctx: AgentWorkflowContext[SearchWorkflowConfig]) -> None:
config = ctx.config or SearchWorkflowConfig()
max_iterations = config.max_iterations
temperature = config.temperature
# Use these values when constructing your agent.
search_workflow_config = SearchWorkflowConfig()
osmosis train submit preflight 会从 entrypoint module 中自动发现至多一个 module-level AgentWorkflowConfig instance。Eval 和 training TOML 文件目前不会直接设置 workflow config fields。
BaseConfig 允许额外字段,因此简单 rollout configs 通常不需要额外的 Pydantic 样板代码。
| Field | Type | Default | Description |
|---|
name | str | required | workflow 标识符 |
description | str | None | None | 可选描述 |
concurrency | ConcurrencyConfig | unlimited | 最大并发 workflow executions |
使用工具的 Workflows
工具使用属于 agent framework,而不是 backend。请按您的 framework 期望的方式定义 tools,把它们传给 Osmosis-wrapped agent,并让 integration 记录生成的 messages。
例如,Strands workflow 可以把 tool list 放在 config 中:
from typing import Any
from strands import tool
from osmosis_ai.rollout import (
AgentWorkflow,
AgentWorkflowConfig,
AgentWorkflowContext,
)
from osmosis_ai.rollout.integrations.agents.strands import (
OsmosisRolloutModel,
OsmosisStrandsAgent,
)
@tool(name="search")
def search_tool(query: str) -> str:
"""Search for information."""
return f"results for {query}"
class ToolWorkflowConfig(AgentWorkflowConfig):
name: str = "tool-workflow"
model: Any = OsmosisRolloutModel(params={"temperature": 1.0})
tools: Any = [search_tool]
max_iterations: int = 8
tool_workflow_config = ToolWorkflowConfig()
class ToolWorkflow(AgentWorkflow[ToolWorkflowConfig]):
async def run(self, ctx: AgentWorkflowContext[ToolWorkflowConfig]) -> None:
config = ctx.config or ToolWorkflowConfig()
agent = OsmosisStrandsAgent(
name="search-agent",
model=config.model,
tools=config.tools,
messages=ctx.prompt,
callback_handler=None,
)
for _ in range(config.max_iterations):
result = await agent.invoke_async()
content = result.message.get("content", [])
if not any("toolUse" in block for block in content):
break
自动发现
osmosis train submit 和 osmosis eval submit 都会扫描 rollout entrypoint module,查找具体的 AgentWorkflow subclasses。无需 decorators 或 registration functions,但 entrypoint 仍需要构造 backend,并通过 create_rollout_server() 暴露,以便 rollout server 在平台上运行。
对于训练 preflight,您的 entrypoint 文件必须恰好包含一个具体 AgentWorkflow subclass。如果 SDK 找到零个或多个,osmosis train submit 会在 discovery 阶段失败。
Helpers、base classes、tools 和 configs 可以自由使用,但请确保 entrypoint 中只有您想运行的 workflow class 是具体的 AgentWorkflow。
下一步
Strands 集成
使用 tools 和 OsmosisStrandsAgent 构建基于 Strands 的 rollout。
OpenAI Agents 集成
使用 OsmosisAgent 和 OsmosisMemorySession 构建 OpenAI Agents SDK rollout。
构建 Graders
为 workflow 生成的 samples 定义 reward 逻辑。
评估
在训练前提交 evaluation run 测试 workflow 和 grader。