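The RolloutAgentLoop base class defines the interface contract between your agent and the Osmosis training cluster. This guide covers that interface, its required methods, and recommended patterns for production-grade implementations.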

Required vs. Optional Features

Before going further, here is a quick overview of what is required and what is optional:

Required

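| Feature | Description |
|---|---|
| name attribute | Unique identifier for the agent |
| get_tools(request) method | Returns the tool list when /v1/rollout/init is called. The training cluster needs this to know which tools are available. |
| run(ctx) method | Executes your agent loop logic |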

Optional

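| Feature | Description |
|---|---|
| ctx.log_event() | Debug logging; writes output only when the --log flag is enabled |
| ctx.record_tool_call() | Metrics tracking for analytics |
| get_last_assistant_content() | Helper for logging and rewards; not part of the SDK |
| compute_reward_from_messages() | Needed only if the platform is configured to compute rewards in the Remote Rollout |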

The RolloutAgentLoop Base Class

Every agent must subclass RolloutAgentLoop, set the name attribute, and implement two required methods:
from osmosis_ai.rollout import (
    RolloutAgentLoop,
    RolloutContext,
    RolloutResult,
    RolloutRequest,
)

class MyAgent(RolloutAgentLoop):
    name = "my_agent"  # Required: unique identifier

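    def get_tools(self, request: RolloutRequest) -> list:
        """Return tools available for this rollout."""
        return []  # Replace with your tool schemas

    async def run(self, ctx: RolloutContext) -> RolloutResult:
        """Execute the agent loop."""
        messages = list(ctx.request.messages)
        # ... call ctx.chat(), execute tools, append results to messages ...
        return ctx.complete(messages)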

Required Attributes

| Attribute | Type | Description |
|---|---|---|
| name | string | Unique identifier for the agent |

Required Methods

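| Method | Description |
|---|---|
| get_tools(request) | Returns the tool list in OpenAI function format. Called when /v1/rollout/init is received; the returned tools are included in the response sent to the training cluster. |
| run(ctx) | Executes the agent loop and returns the result |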
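When the training cluster sends a request to /v1/rollout/init, the SDK automatically calls your get_tools() method and returns the tool list in the InitResponse. This tells the training cluster which tools are available for this rollout.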

RolloutContext

The ctx argument provides everything needed to run your agent:

Attributes

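| Attribute | Type | Description |
|---|---|---|
| ctx.request | RolloutRequest | The original request, including messages and parameters |
| ctx.tools | list | The tools returned by get_tools() |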

Methods

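| Method | Description |
|---|---|
| ctx.chat(messages, **kwargs) | Call the LLM |
| ctx.complete(messages, finish_reason, reward) | Return a successful result |
| ctx.error(message) | Return an error result |
| ctx.record_tool_call(latency_ms) | Track tool execution metrics |
| ctx.log_event(event_name, **data) | Record a debug event (when logging is enabled) |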

RolloutRequest

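The request carries everything sent by the training cluster:

| Field | Type | Description |
|---|---|---|
| messages | list | Initial conversation messages |
| max_turns | int | Maximum number of agent turns allowed |
| max_tokens_total | int | Token limit for the entire rollout |
| completion_params | dict | LLM parameters (temperature, etc.) |
| metadata | dict | Custom metadata (e.g., ground_truth) |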
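To exercise run() logic without a training cluster, local stand-ins for the context and request can help. The sketch below is hypothetical and not part of the SDK: FakeRequest, FakeChatResult, and FakeContext only mirror the fields and methods listed in the tables above, plus the result attributes (message, usage, finish_reason, tool_calls, has_tool_calls) that the complete example later in this guide reads. The dicts returned by complete() and error() are placeholders, not the real RolloutResult.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeRequest:
    """Hypothetical stand-in with the RolloutRequest fields listed above."""
    messages: List[Dict[str, Any]]
    max_turns: int = 5
    max_tokens_total: int = 4096
    completion_params: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeChatResult:
    """Hypothetical chat result exposing the attributes this guide's examples read."""
    message: Dict[str, Any]
    usage: Optional[Dict[str, int]] = None
    finish_reason: Optional[str] = "stop"
    tool_calls: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def has_tool_calls(self) -> bool:
        return bool(self.tool_calls)


class FakeContext:
    """Replays scripted LLM replies so run() logic can be exercised offline."""

    def __init__(self, request: FakeRequest, replies: List[FakeChatResult]):
        self.request = request
        self.tools: List[Dict[str, Any]] = []
        self._replies = list(replies)
        self.events: List[tuple] = []
        self.tool_latencies: List[float] = []

    async def chat(self, messages: List[Dict[str, Any]], **kwargs: Any) -> FakeChatResult:
        # Ignores messages and kwargs; returns the next scripted reply
        return self._replies.pop(0)

    def complete(self, messages, finish_reason: str = "stop",
                 reward: Optional[float] = None) -> Dict[str, Any]:
        return {"messages": messages, "finish_reason": finish_reason,
                "reward": reward, "error": None}

    def error(self, message: str) -> Dict[str, Any]:
        return {"messages": [], "finish_reason": "error", "reward": None, "error": message}

    def log_event(self, event_name: str, **data: Any) -> None:
        self.events.append((event_name, data))

    def record_tool_call(self, latency_ms: float) -> None:
        self.tool_latencies.append(latency_ms)


async def demo() -> Dict[str, Any]:
    req = FakeRequest(messages=[{"role": "user", "content": "What is 2 + 2?"}],
                      metadata={"ground_truth": "4"})
    ctx = FakeContext(req, [FakeChatResult(message={"role": "assistant", "content": "4"})])
    messages = list(ctx.request.messages)
    ctx.log_event("pre_llm", num_messages=len(messages))
    result = await ctx.chat(messages, **ctx.request.completion_params)
    messages.append(result.message)
    reward = 1.0 if ctx.request.metadata["ground_truth"] in result.message["content"] else 0.0
    return ctx.complete(messages, finish_reason=result.finish_reason, reward=reward)


outcome = asyncio.run(demo())
```

The same pattern scales to multi-turn tests: script several FakeChatResult objects, some with tool_calls, and assert on the final messages and finish_reason.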

Implementing Tools

Tool Schema Format

Tools use OpenAI's function-calling format:
from osmosis_ai.rollout.core.schemas import (
    OpenAIFunctionToolSchema,
    OpenAIFunctionSchema,
    OpenAIFunctionParametersSchema,
    OpenAIFunctionPropertySchema,
)

TOOLS = [
    OpenAIFunctionToolSchema(
        type="function",
        function=OpenAIFunctionSchema(
            name="search",
            description="Search for information",
            parameters=OpenAIFunctionParametersSchema(
                type="object",
                properties={
                    "query": OpenAIFunctionPropertySchema(
                        type="string",
                        description="Search query"
                    )
                },
                required=["query"]
            )
        )
    )
]
Or use plain dictionaries:
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "Search for information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"}
                },
                "required": ["query"]
            }
        }
    }
]
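A quick structural check before returning schemas from get_tools() can catch typos early. The following is a minimal, hypothetical validator for the plain-dict form. It is not an SDK function and not necessarily what osmosis validate checks. It verifies that type is "function", that the function has a name, that parameters is an object schema, and that every required field appears in properties.

```python
from typing import Any, Dict, List


def validate_tool_schema(tool: Dict[str, Any]) -> List[str]:
    """Return a list of structural problems in a plain-dict tool schema (empty if none)."""
    problems: List[str] = []
    if tool.get("type") != "function":
        problems.append("type must be 'function'")
    fn = tool.get("function")
    if not isinstance(fn, dict):
        return problems + ["missing 'function' object"]
    if not fn.get("name"):
        problems.append("function.name is required")
    params = fn.get("parameters", {})
    if params.get("type") != "object":
        problems.append("parameters.type should be 'object'")
    properties = params.get("properties", {})
    for field_name in params.get("required", []):
        if field_name not in properties:
            problems.append(f"required field '{field_name}' is not in properties")
    return problems


search_tool = {
    "type": "function",
    "function": {
        "name": "search",
        "description": "Search for information",
        "parameters": {
            "type": "object",
            "properties": {"query": {"type": "string", "description": "Search query"}},
            "required": ["query"],
        },
    },
}

# A typo in "properties" leaves the required field undeclared
broken_tool = {
    "type": "function",
    "function": {
        "name": "search",
        "parameters": {"type": "object", "propertes": {}, "required": ["query"]},
    },
}
```

validate_tool_schema(search_tool) returns an empty list, while broken_tool is reported as having a required field that is missing from properties.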

Tool Execution Helpers

The SDK provides helpers for executing tools:
from osmosis_ai.rollout.tools import (
    get_tool_call_info,
    create_tool_result,
    create_tool_error_result,
    execute_tool_calls,
    serialize_tool_result,
)

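async def do_search(query: str) -> dict:
    """Placeholder: replace with your own search implementation."""
    return {"query": query, "results": []}


async def execute_tool(tool_call):
    """Execute a single tool call."""
    try:
        tool_call_id, function_name, arguments = get_tool_call_info(tool_call)
    except Exception as e:
        # Malformed tool call (e.g. unparseable arguments): report it to the model
        return create_tool_error_result(tool_call.get("id", "unknown"), str(e))

    if function_name == "search":
        result = await do_search(arguments["query"])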
        return create_tool_result(tool_call_id, serialize_tool_result(result))
    else:
        return create_tool_error_result(tool_call_id, f"Unknown tool: {function_name}")
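For orientation, here is roughly what a dispatcher does with a raw OpenAI-format tool call, written with only the standard library. This is a sketch under assumptions. dispatch() and tool_message() are hypothetical stand-ins, not the implementations of get_tool_call_info(), create_tool_result(), or create_tool_error_result(), whose exact output and error formatting are not documented here. It relies on two OpenAI conventions: function.arguments is a JSON string, and tool results go back as role "tool" messages keyed by tool_call_id.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict

ToolFn = Callable[..., Awaitable[Any]]


def tool_message(tool_call_id: str, content: str) -> Dict[str, str]:
    # OpenAI chat format for returning a tool result to the model
    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}


async def dispatch(tool_call: Dict[str, Any], registry: Dict[str, ToolFn]) -> Dict[str, str]:
    tool_call_id = tool_call.get("id", "unknown")
    function = tool_call.get("function", {})
    name = function.get("name", "")
    try:
        # OpenAI encodes function arguments as a JSON string
        args = json.loads(function.get("arguments") or "{}")
    except json.JSONDecodeError as e:
        return tool_message(tool_call_id, f"Error: invalid arguments: {e}")
    handler = registry.get(name)
    if handler is None:
        return tool_message(tool_call_id, f"Error: unknown tool: {name}")
    try:
        return tool_message(tool_call_id, json.dumps(await handler(**args)))
    except Exception as e:  # report tool failures to the model instead of crashing the rollout
        return tool_message(tool_call_id, f"Error: {e}")


async def search(query: str) -> dict:  # hypothetical tool
    return {"query": query, "hits": 0}


call = {
    "id": "call_1",
    "type": "function",
    "function": {"name": "search", "arguments": '{"query": "osmosis"}'},
}
reply = asyncio.run(dispatch(call, {"search": search}))
```

Returning errors as tool messages, rather than raising, lets the model see what went wrong and try again on the next turn.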

Parallel Tool Execution

Run multiple tool calls concurrently:
from osmosis_ai.rollout.tools import execute_tool_calls

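async def execute_my_tool(tool_call):
    # Your tool execution logic: return a tool result message
    ...

async def run_tools(tool_calls):
    # Execute all tool calls in parallel (await must run inside an async function)
    return await execute_tool_calls(tool_calls, execute_my_tool)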
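This guide does not specify how execute_tool_calls schedules work. If it behaves like asyncio.gather, the sketch below shows the effect: three tools that each wait 100 ms finish in about 100 ms total, and results come back in the same order as the input calls. run_in_parallel and slow_echo are hypothetical names.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List


async def run_in_parallel(
    tool_calls: List[Dict[str, Any]],
    execute: Callable[[Dict[str, Any]], Awaitable[Dict[str, str]]],
) -> List[Dict[str, str]]:
    # gather() runs the coroutines concurrently and returns results in input order
    return list(await asyncio.gather(*(execute(tc) for tc in tool_calls)))


async def slow_echo(tool_call: Dict[str, Any]) -> Dict[str, str]:
    await asyncio.sleep(0.1)  # simulate an I/O-bound tool
    return {"role": "tool", "tool_call_id": tool_call["id"], "content": "ok"}


calls = [{"id": f"call_{i}"} for i in range(3)]
start = time.monotonic()
results = asyncio.run(run_in_parallel(calls, slow_echo))
elapsed = time.monotonic() - start
```

In this sketch, gather() propagates the first exception by default, so an executor that catches its own errors and returns an error result (as execute_tool does in the complete example) keeps one failing tool from aborting the whole batch.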

Complete Agent Example

Below is a complete agent implementation with multiple tools. Note which features are required and which are optional:
import time
from typing import List, Dict, Any, Optional

from osmosis_ai.rollout import (
    RolloutAgentLoop,
    RolloutContext,
    RolloutResult,
    RolloutRequest,
    create_app,
)
from osmosis_ai.rollout.tools import (
    get_tool_call_info,
    create_tool_result,
    create_tool_error_result,
    execute_tool_calls,
)

# Tool implementations
async def add(a: float, b: float) -> float:
    return a + b

async def multiply(a: float, b: float) -> float:
    return round(a * b, 4)

TOOL_REGISTRY = {
    "add": add,
    "multiply": multiply,
}

# Tool schemas (returned via get_tools)
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "add",
            "description": "Add two numbers",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "number"},
                    "b": {"type": "number"}
                },
                "required": ["a", "b"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "multiply",
            "description": "Multiply two numbers",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "number"},
                    "b": {"type": "number"}
                },
                "required": ["a", "b"]
            }
        }
    }
]


async def execute_tool(tool_call: Dict[str, Any]) -> Dict[str, str]:
    """Execute a single tool call."""
    try:
        tool_call_id, name, args = get_tool_call_info(tool_call)
    except Exception as e:
        return create_tool_error_result(tool_call.get("id", "unknown"), str(e))

    tool_fn = TOOL_REGISTRY.get(name)
    if not tool_fn:
        return create_tool_error_result(tool_call_id, f"Unknown tool: {name}")

    try:
        result = await tool_fn(**args)
        return create_tool_result(tool_call_id, str(result))
    except Exception as e:
        return create_tool_error_result(tool_call_id, str(e))


class CalculatorAgent(RolloutAgentLoop):
    """Calculator agent with add and multiply tools."""

    name = "calculator"  # REQUIRED: unique agent identifier

    # REQUIRED: Called when /v1/rollout/init is received
    # The returned tools are sent back to the training cluster
    def get_tools(self, request: RolloutRequest) -> list:
        return TOOLS

    # REQUIRED: Main agent loop logic
    async def run(self, ctx: RolloutContext) -> RolloutResult:
        messages = list(ctx.request.messages)
        finish_reason = "stop"
        total_tokens = 0

        for turn in range(ctx.request.max_turns):
            # OPTIONAL: Debug logging (no-op if logging not enabled)
            ctx.log_event("pre_llm", turn=turn, num_messages=len(messages))

            # Call LLM
            result = await ctx.chat(messages, **ctx.request.completion_params)
            messages.append(result.message)

            # Track tokens
            usage = result.usage or {}
            total_tokens += usage.get("completion_tokens", 0)

            if total_tokens >= ctx.request.max_tokens_total:
                finish_reason = "max_tokens"
                break

            if not result.has_tool_calls:
                finish_reason = result.finish_reason or "stop"
                break

            # Execute tools
            tool_start = time.monotonic()
            tool_results = await execute_tool_calls(result.tool_calls, execute_tool)
            latency_ms = (time.monotonic() - tool_start) * 1000

            # OPTIONAL: Record metrics for analytics
            for _ in result.tool_calls:
                ctx.record_tool_call(latency_ms=latency_ms / len(result.tool_calls))

            messages.extend(tool_results)

            # OPTIONAL: Debug logging
            ctx.log_event("tools_executed",
                         turn=turn,
                         num_tools=len(result.tool_calls))
        else:
            finish_reason = "max_turns"

        # CONDITIONAL: Compute reward if configured on platform
        # See "Handling Rewards" section for details
        reward = self._compute_reward(messages, ctx.request.metadata)

        return ctx.complete(messages, finish_reason=finish_reason, reward=reward)

    def _compute_reward(self, messages: list, metadata: dict) -> Optional[float]:
        """Compute reward from final response.

        IMPORTANT: This is only required if the platform is configured to
        compute rewards in the remote rollout server. If reward computation
        is handled elsewhere (e.g., on the platform side), you can return None.
        """
        ground_truth = metadata.get("ground_truth")
        if not ground_truth:
            return None

        # Get last assistant message
        for msg in reversed(messages):
            if msg.get("role") == "assistant":
                content = msg.get("content") or ""  # content can be None on tool-call turns
                if str(ground_truth) in content:
                    return 1.0
                return 0.0
        return 0.0


# Export for CLI
agent_loop = CalculatorAgent()
app = create_app(agent_loop)
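The termination rules in CalculatorAgent.run() can be checked without an LLM. The sketch below is a hypothetical pure-Python replay of the same logic. Each scripted turn is a tuple of (completion_tokens, has_tool_calls, finish_reason reported by the LLM), and the for/else branch yields "max_turns" exactly as in the agent.

```python
from typing import List, Optional, Tuple

# One scripted turn: (completion_tokens, has_tool_calls, llm_finish_reason)
Turn = Tuple[int, bool, Optional[str]]


def finish_reason_for(turns: List[Turn], max_turns: int, max_tokens_total: int) -> str:
    """Replay CalculatorAgent's termination rules over scripted turns."""
    total_tokens = 0
    for turn in range(max_turns):
        if turn >= len(turns):
            raise ValueError("script ran out of turns")
        tokens, has_tool_calls, llm_reason = turns[turn]
        total_tokens += tokens
        # Same order as run(): token budget first, then the tool-call check
        if total_tokens >= max_tokens_total:
            return "max_tokens"
        if not has_tool_calls:
            return llm_reason or "stop"
    # for/else: the loop finished without a break
    return "max_turns"
```

Because the token check runs before the tool-call check, a final answer that pushes the running total past max_tokens_total is reported as "max_tokens", not "stop". For example, finish_reason_for([(60, True, None), (60, False, None)], 5, 100) returns "max_tokens" even though the second turn is a plain answer.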

Server Configuration

create_app() Parameters

The create_app() function accepts several optional parameters for fine-tuning server behavior:
from osmosis_ai.rollout import create_app, RolloutSettings

app = create_app(
    agent_loop,
    max_concurrent=10,              # Max concurrent rollouts (default: unlimited)
    record_ttl_seconds=3600.0,      # How long to keep rollout records (default: 1 hour)
    settings=RolloutSettings(...),  # Custom settings (see below)
    debug_dir="./logs",             # Directory for debug logs
    on_startup=async_startup_fn,    # Async function called on server start
    on_shutdown=async_shutdown_fn,  # Async function called on server stop
)
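| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_loop | RolloutAgentLoop | Required | Your agent loop implementation |
| max_concurrent | int \| None | None | Maximum number of concurrent rollouts (None = unlimited) |
| record_ttl_seconds | float \| None | 3600.0 | TTL for rollout records, in seconds |
| settings | RolloutSettings \| None | None | Custom configuration settings |
| debug_dir | str \| None | None | Directory for debug logs |
| on_startup | Callable[[], Awaitable[None]] \| None | None | Async startup callback |
| on_shutdown | Callable[[], Awaitable[None]] \| None | None | Async shutdown callback |
| credentials | WorkspaceCredentials \| None | None | Platform credentials (loaded automatically after osmosis login) |
| server_host | str \| None | None | Host address used for platform registration |
| server_port | int \| None | None | Port used for platform registration |
| api_key | str \| None | None | API key for TrainGate authentication (auto-generated if omitted) |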
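The table says max_concurrent caps concurrent rollouts, with None meaning unlimited, but it does not say how the cap is enforced. A common asyncio approach is a semaphore. The hypothetical ConcurrencyLimiter below illustrates that behavior and records the peak number of in-flight rollouts. It is not the SDK's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple


class ConcurrencyLimiter:
    """Caps in-flight rollouts; max_concurrent=None means unlimited."""

    def __init__(self, max_concurrent: Optional[int]):
        self._sem = asyncio.Semaphore(max_concurrent) if max_concurrent is not None else None
        self.in_flight = 0
        self.peak = 0

    async def run(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        if self._sem is None:
            return await self._track(fn, *args)
        async with self._sem:  # waits here while max_concurrent rollouts are running
            return await self._track(fn, *args)

    async def _track(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        try:
            return await fn(*args)
        finally:
            self.in_flight -= 1


async def fake_rollout(i: int) -> int:
    await asyncio.sleep(0.01)  # stands in for a real rollout
    return i


async def simulate(max_concurrent: Optional[int]) -> Tuple[List[int], int]:
    limiter = ConcurrencyLimiter(max_concurrent)
    results = await asyncio.gather(*(limiter.run(fake_rollout, i) for i in range(10)))
    return list(results), limiter.peak


limited_results, limited_peak = asyncio.run(simulate(3))
_, unlimited_peak = asyncio.run(simulate(None))
```

With a limit of 3, ten rollouts never exceed three at a time. Without a limit, all ten run at once.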

RolloutSettings Configuration

For advanced configuration, use RolloutSettings and RolloutClientSettings:
from osmosis_ai.rollout.config import RolloutSettings, RolloutClientSettings

settings = RolloutSettings(
    client=RolloutClientSettings(
        timeout_seconds=300.0,          # HTTP client timeout
        max_retries=3,                  # Max retry attempts
        complete_rollout_retries=2,     # Retries for /completed endpoint
        retry_base_delay=1.0,           # Initial retry delay (seconds)
        retry_max_delay=30.0,           # Maximum retry delay (seconds)
        max_connections=100,            # HTTP connection pool size
        max_keepalive_connections=20,   # Keep-alive connections
    )
)

app = create_app(agent_loop, settings=settings)
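The retry_base_delay and retry_max_delay parameters suggest a backoff that starts at the base delay and is capped at the maximum. The exact formula is not documented here. The sketch below assumes capped exponential backoff (doubling per attempt, no jitter) and assumes max_retries counts retries after the first attempt. retry_delays and with_retries are hypothetical helpers, not SDK functions.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


def retry_delays(max_retries: int, base_delay: float = 1.0, max_delay: float = 30.0) -> List[float]:
    """Delay before each retry under capped exponential backoff (assumed policy)."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]


async def with_retries(fn: Callable[[], Awaitable[T]], max_retries: int = 3,
                       base_delay: float = 1.0, max_delay: float = 30.0) -> T:
    """Call fn, retrying on any exception; re-raise once retries are exhausted."""
    delays = retry_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):  # first attempt + max_retries retries
        try:
            return await fn()
        except Exception:
            if attempt == max_retries:
                raise
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")


attempts = {"count": 0}


async def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")  # fails twice, then succeeds
    return "ok"


outcome = asyncio.run(with_retries(flaky_call, max_retries=3, base_delay=0.001))
```

Under these assumptions, the default settings give retry_delays(3) == [1.0, 2.0, 4.0], and the delay reaches the 30-second cap on the sixth retry. A production client would usually retry only transient errors (timeouts, 5xx responses) rather than every exception.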

Environment Variables

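All client settings can also be configured through environment variables:

| Environment variable | Description | Default |
|---|---|---|
| OSMOSIS_ROLLOUT_CLIENT_TIMEOUT_SECONDS | HTTP timeout (seconds) | 300.0 |
| OSMOSIS_ROLLOUT_CLIENT_MAX_RETRIES | Maximum retry attempts | 3 |
| OSMOSIS_ROLLOUT_CLIENT_COMPLETE_ROLLOUT_RETRIES | Retries for the /completed endpoint | 2 |
| OSMOSIS_ROLLOUT_CLIENT_RETRY_BASE_DELAY | Initial retry delay (seconds) | 1.0 |
| OSMOSIS_ROLLOUT_CLIENT_RETRY_MAX_DELAY | Maximum retry delay (seconds) | 30.0 |
| OSMOSIS_ROLLOUT_CLIENT_MAX_CONNECTIONS | Connection pool size | 100 |
| OSMOSIS_ROLLOUT_CLIENT_MAX_KEEPALIVE_CONNECTIONS | Keep-alive connections | 20 |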
Example:
export OSMOSIS_ROLLOUT_CLIENT_TIMEOUT_SECONDS=120
export OSMOSIS_ROLLOUT_CLIENT_MAX_RETRIES=5
osmosis serve -m server:agent_loop
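To preview which values a deployment will use, a hypothetical helper can resolve each variable against the defaults in the table. It only reimplements the naming rule visible in the table (the OSMOSIS_ROLLOUT_CLIENT_ prefix followed by the upper-cased RolloutClientSettings field name). It is not how the SDK itself reads its configuration.

```python
import os
from typing import Dict, Mapping, Optional, Union

Number = Union[int, float]

# Defaults copied from the environment-variable table above
DEFAULTS: Dict[str, Number] = {
    "TIMEOUT_SECONDS": 300.0,
    "MAX_RETRIES": 3,
    "COMPLETE_ROLLOUT_RETRIES": 2,
    "RETRY_BASE_DELAY": 1.0,
    "RETRY_MAX_DELAY": 30.0,
    "MAX_CONNECTIONS": 100,
    "MAX_KEEPALIVE_CONNECTIONS": 20,
}
PREFIX = "OSMOSIS_ROLLOUT_CLIENT_"


def effective_client_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Number]:
    """Resolve each setting from env, falling back to the documented default."""
    env = os.environ if env is None else env
    resolved: Dict[str, Number] = {}
    for key, default in DEFAULTS.items():
        raw = env.get(PREFIX + key)
        # Cast with the default's type so "5" -> 5 and "120" -> 120.0
        resolved[key.lower()] = type(default)(raw) if raw is not None else default
    return resolved


resolved = effective_client_settings({
    "OSMOSIS_ROLLOUT_CLIENT_TIMEOUT_SECONDS": "120",
    "OSMOSIS_ROLLOUT_CLIENT_MAX_RETRIES": "5",
})
```

With the two exports from the example above, timeout_seconds resolves to 120.0 and max_retries to 5, while every other setting keeps its default.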
The get_last_assistant_content helper (used in some examples) is entirely optional. It is useful only for logging or reward computation, and the SDK does not require it.

Handling Rewards

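Reward computation in Remote Rollout is conditional. Whether you must return a reward depends on how your platform is configured:

| Platform configuration | Reward requirement |
|---|---|
| Rewards computed on the platform side | Return None; no reward needed |
| Rewards computed in Remote Rollout | Must return a float |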

When a Reward Is Required

If your platform is configured to compute rewards in the Remote Rollout server, your run() method must return the reward value through ctx.complete():
async def run(self, ctx: RolloutContext) -> RolloutResult:
    # ... agent loop logic ...

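    # Compute reward (required if platform expects it).
    # compute_reward_from_messages (an optional helper defined later in this
    # guide) takes the ground truth string, not the whole metadata dict.
    reward = compute_reward_from_messages(
        messages, ctx.request.metadata.get("ground_truth")
    )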

    # reward must be a float, not None
    return ctx.complete(messages, finish_reason=finish_reason, reward=reward)

When a Reward Is Optional

If reward computation is handled elsewhere (e.g., on the platform side), you can simply return None:
async def run(self, ctx: RolloutContext) -> RolloutResult:
    # ... agent loop logic ...

    # No reward computation needed
    return ctx.complete(messages, finish_reason=finish_reason, reward=None)

Helper Functions (Optional)

The following helpers are entirely optional. They are useful for logging and reward computation, but the SDK does not require them:
from typing import List, Dict, Any, Optional

def get_last_assistant_content(messages: List[Dict[str, Any]]) -> Optional[str]:
    """Get the content of the last assistant message.

    NOTE: This is a helper function for logging/debugging purposes only.
    It is NOT required by the SDK.
    """
    for message in reversed(messages):
        if message.get("role") == "assistant":
            return message.get("content", "")
    return None


def compute_reward_from_messages(
    messages: List[Dict[str, Any]],
    ground_truth: Optional[str]
) -> Optional[float]:
    """Compute reward from messages if ground_truth is available.

    NOTE: This function is only needed if the platform is configured to
    compute rewards in the remote rollout server. If not, you can skip
    this entirely and return None for reward.
    """
    if not ground_truth:
        return None

    solution_str = get_last_assistant_content(messages)
    if not solution_str:
        return 0.0

    # Your reward logic here: compute_reward is your own scoring function
    # (it is not provided by the SDK)
    return compute_reward(solution_str, ground_truth)
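compute_reward above is left to you. As one hypothetical example (not an SDK function), the scorer below compares the last non-empty line of the response with the ground truth after light normalization: lowercasing, removing thousands separators, and stripping a trailing period. It is stricter than the substring check in the complete example, where "14" in "The answer is 140" evaluates to True.

```python
import re


def _normalize(text: str) -> str:
    # Drop thousands separators ("1,234" -> "1234"), surrounding whitespace,
    # and leading/trailing periods, then lowercase
    return re.sub(r"(?<=\d),(?=\d)", "", text).strip().strip(".").lower()


def compute_reward(solution_str: str, ground_truth: str) -> float:
    """Hypothetical scorer: 1.0 if the normalized last line equals the ground truth."""
    lines = [ln for ln in solution_str.strip().splitlines() if ln.strip()]
    if not lines:
        return 0.0
    return 1.0 if _normalize(lines[-1]) == _normalize(str(ground_truth)) else 0.0
```

For example, compute_reward("Let me add.\n1,234", "1234") scores 1.0, while compute_reward("The answer is 140", "14") scores 0.0.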

Extracting the Answer

A common pattern for extracting the final answer:
import re

def extract_answer(text: str) -> str | None:
    """Extract answer after #### marker."""
    match = re.search(r"####\s*(.+)", text)
    return match.group(1).strip() if match else None
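Extracted answers often differ from the ground truth only in formatting, such as "1,200" versus "1200". A hypothetical answers_match() helper (not part of the SDK) can compare numerically when both sides parse as numbers and fall back to exact string comparison otherwise. extract_answer is repeated here so the block runs on its own.

```python
import math
import re
from typing import Optional


def extract_answer(text: str) -> Optional[str]:
    """Extract answer after #### marker (same pattern as above)."""
    match = re.search(r"####\s*(.+)", text)
    return match.group(1).strip() if match else None


def answers_match(predicted: Optional[str], ground_truth: str, rel_tol: float = 1e-6) -> bool:
    """Compare numerically when both sides parse as numbers, else as stripped strings."""
    if predicted is None:
        return False  # no "####" marker found
    p, g = predicted.replace(",", ""), ground_truth.replace(",", "")
    try:
        return math.isclose(float(p), float(g), rel_tol=rel_tol)
    except ValueError:
        return p.strip() == g.strip()
```

Answers with trailing units, such as "#### 12 apples", fail the numeric parse and therefore do not match "12". Strip units first if your data contains them.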

Debug Logging

Use ctx.log_event() to trace execution:
async def run(self, ctx: RolloutContext) -> RolloutResult:
    ctx.log_event("agent_start",
                  num_messages=len(ctx.request.messages))

    # ... agent logic ...

    ctx.log_event("llm_response",
                  has_tools=result.has_tool_calls,
                  tokens=(result.usage or {}).get("total_tokens"))  # usage may be None

    ctx.log_event("agent_complete",
                  finish_reason=finish_reason,
                  reward=reward)
Enable logging with:
osmosis serve -m server:agent_loop --log ./logs
Output structure:
logs/
├── 1703270400/                    # Timestamp
│   ├── rollout-abc123.jsonl       # Per-rollout traces
│   └── rollout-def456.jsonl
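This guide does not document the record format inside each trace file. Assuming each line is one JSON object, the hypothetical helpers below count records per rollout file and tally the top-level keys. That tally is a quick way to discover the actual schema before writing more specific analysis. The demo builds a throwaway directory in the same timestamp/rollout-id.jsonl layout; the record contents ("a", "b") are made up.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path
from typing import Dict


def summarize_traces(log_dir: str) -> Dict[str, int]:
    """Count non-empty JSON lines per rollout trace file under log_dir/<timestamp>/."""
    counts: Dict[str, int] = {}
    for path in sorted(Path(log_dir).glob("*/rollout-*.jsonl")):
        with path.open(encoding="utf-8") as f:
            counts[path.name] = sum(1 for line in f if line.strip())
    return counts


def top_level_keys(log_dir: str) -> Counter:
    """Tally top-level keys across all records (assumes each line is a JSON object)."""
    keys: Counter = Counter()
    for path in Path(log_dir).glob("*/rollout-*.jsonl"):
        for line in path.read_text(encoding="utf-8").splitlines():
            if line.strip():
                keys.update(json.loads(line).keys())
    return keys


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp) / "1703270400"
    run_dir.mkdir()
    (run_dir / "rollout-abc123.jsonl").write_text('{"a": 1}\n{"a": 2, "b": 3}\n', encoding="utf-8")
    counts = summarize_traces(tmp)
    keys = top_level_keys(tmp)
```

Point both helpers at the directory passed to --log (./logs in the example above) to inspect real traces.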

Error Handling

Return errors gracefully:
async def run(self, ctx: RolloutContext) -> RolloutResult:
    messages = list(ctx.request.messages)
    try:
        # Agent logic
        return ctx.complete(messages)
    except Exception as e:
        return ctx.error(f"Agent failed: {e}")
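A long-running LLM call or tool can also stall a rollout. One option, sketched here under the assumption that you want a per-step time limit, is to wrap the step in asyncio.wait_for and convert both timeouts and exceptions into an error result. This limit is not an SDK setting; timeout_seconds in RolloutClientSettings governs the HTTP client. run_guarded is hypothetical, and in a real agent its two branches would return ctx.complete(...) and ctx.error(...).

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_guarded(step: Callable[[], Awaitable[List[Dict[str, Any]]]],
                      timeout_s: float) -> Dict[str, Any]:
    """Run one agent step; turn timeouts and exceptions into an error result."""
    try:
        messages = await asyncio.wait_for(step(), timeout=timeout_s)
        return {"ok": True, "messages": messages}
    except asyncio.TimeoutError:
        return {"ok": False, "error": f"Agent timed out after {timeout_s}s"}
    except Exception as e:
        return {"ok": False, "error": f"Agent failed: {e}"}


async def ok_step() -> List[Dict[str, Any]]:
    return [{"role": "assistant", "content": "done"}]


async def slow_step() -> List[Dict[str, Any]]:
    await asyncio.sleep(1)  # exceeds the limit used below
    return []


ok_result = asyncio.run(run_guarded(ok_step, timeout_s=0.5))
timeout_result = asyncio.run(run_guarded(slow_step, timeout_s=0.05))
```

Since Python 3.8, asyncio.CancelledError is not a subclass of Exception, so a cancellation coming from the server still propagates instead of being turned into an error result.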

Best Practices

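Always run osmosis validate before deploying to catch problems early.
Check ctx.request.max_tokens_total and stop early once the budget is exceeded.
Use ctx.record_tool_call() to track tool execution for analytics.
Use osmosis test with --interactive to debug agent behavior.
Call ctx.log_event() at key points (before LLM calls, after tool execution, and on completion).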

Next Steps