This guide covers the RolloutAgentLoop base class and best practices for implementing custom agents.

Required vs Optional Features

Before diving in, here’s a quick summary of what’s required and what’s optional:

Required

| Feature | Description |
| --- | --- |
| `name` attribute | Unique identifier for your agent |
| `get_tools(request)` method | Returns the tools list when `/v1/rollout/init` is called; the training cluster needs this to know what tools are available |
| `run(ctx)` method | Executes your agent loop logic |

Optional

| Feature | Description |
| --- | --- |
| `ctx.log_event()` | Debug logging; only writes if the `--log` flag is enabled |
| `ctx.record_tool_call()` | Metrics tracking for analytics |
| `get_last_assistant_content()` | Helper function for logs/reward; not part of the SDK |
| `compute_reward_from_messages()` | Only required if the platform is configured to compute reward in the remote rollout |

RolloutAgentLoop Base Class

Every agent must inherit from RolloutAgentLoop and implement two required methods:
from osmosis_ai.rollout import (
    RolloutAgentLoop,
    RolloutContext,
    RolloutResult,
    RolloutRequest,
)

class MyAgent(RolloutAgentLoop):
    name = "my_agent"  # Required: unique identifier

    def get_tools(self, request: RolloutRequest) -> list:
        """Return tools available for this rollout."""
        pass

    async def run(self, ctx: RolloutContext) -> RolloutResult:
        """Execute the agent loop."""
        pass

Required Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| `name` | string | Unique identifier for your agent |

Required Methods

| Method | Description |
| --- | --- |
| `get_tools(request)` | Return a list of tools in OpenAI function format. Called when `/v1/rollout/init` is received; the returned tools are included in the response to the training cluster. |
| `run(ctx)` | Execute the agent loop and return the result |

When the training cluster sends a request to /v1/rollout/init, the SDK automatically calls your get_tools() method and returns the tools list in the InitResponse. This tells the training cluster which tools are available for this rollout.

RolloutContext

The ctx parameter provides everything needed to run your agent:

Properties

| Property | Type | Description |
| --- | --- | --- |
| `ctx.request` | RolloutRequest | Original request with messages and params |
| `ctx.tools` | list | Tools returned by `get_tools()` |

Methods

| Method | Description |
| --- | --- |
| `ctx.chat(messages, **kwargs)` | Call the LLM |
| `ctx.complete(messages, finish_reason, reward)` | Return a successful result |
| `ctx.error(message)` | Return an error result |
| `ctx.record_tool_call(latency_ms)` | Track tool execution metrics |
| `ctx.log_event(event_name, **data)` | Log debug events (when logging is enabled) |

RolloutRequest

The request contains everything from the training cluster:
| Field | Type | Description |
| --- | --- | --- |
| `messages` | list | Initial conversation messages |
| `max_turns` | int | Maximum agent turns allowed |
| `max_tokens_total` | int | Token limit for the entire rollout |
| `completion_params` | dict | LLM parameters (temperature, etc.) |
| `metadata` | dict | Custom metadata (e.g., `ground_truth`) |
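
For reference, a request carrying the fields above might look like the following. This is an illustrative payload only; the exact wire format is defined by the platform, and the field values here are made up:

```python
# Illustrative rollout request payload; field names mirror the table above,
# but values and exact structure are assumptions for this sketch.
request = {
    "messages": [{"role": "user", "content": "What is 2 + 3?"}],
    "max_turns": 8,
    "max_tokens_total": 4096,
    "completion_params": {"temperature": 0.7, "top_p": 0.95},
    "metadata": {"ground_truth": "5"},
}

# Agents typically read metadata later for reward computation
ground_truth = request["metadata"].get("ground_truth")
```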

Implementing Tools

Tool Schema Format

Tools use OpenAI’s function calling format:
from osmosis_ai.rollout.core.schemas import (
    OpenAIFunctionToolSchema,
    OpenAIFunctionSchema,
    OpenAIFunctionParametersSchema,
    OpenAIFunctionPropertySchema,
)

TOOLS = [
    OpenAIFunctionToolSchema(
        type="function",
        function=OpenAIFunctionSchema(
            name="search",
            description="Search for information",
            parameters=OpenAIFunctionParametersSchema(
                type="object",
                properties={
                    "query": OpenAIFunctionPropertySchema(
                        type="string",
                        description="Search query"
                    )
                },
                required=["query"]
            )
        )
    )
]
Or use plain dictionaries:
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "Search for information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"}
                },
                "required": ["query"]
            }
        }
    }
]

Tool Execution Helpers

The SDK provides utilities for executing tools:
from osmosis_ai.rollout.tools import (
    get_tool_call_info,
    create_tool_result,
    create_tool_error_result,
    execute_tool_calls,
    serialize_tool_result,
    ToolArgumentError,
)

async def execute_tool(tool_call):
    """Execute a single tool call."""
    try:
        tool_call_id, function_name, arguments = get_tool_call_info(tool_call)
    except ToolArgumentError as e:
        return create_tool_error_result(e.tool_call_id, str(e))

    if function_name == "search":
        result = await do_search(arguments["query"])
        return create_tool_result(tool_call_id, serialize_tool_result(result))
    else:
        return create_tool_error_result(tool_call_id, f"Unknown tool: {function_name}")

Parallel Tool Execution

Execute multiple tools concurrently:
from osmosis_ai.rollout.tools import execute_tool_calls

async def execute_my_tool(tool_call):
    # Your tool execution logic
    pass

# Execute all tool calls in parallel
results = await execute_tool_calls(tool_calls, execute_my_tool)
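
Conceptually, parallel execution behaves like `asyncio.gather` over one coroutine per tool call. A rough standalone sketch of that idea (not the SDK's actual implementation):

```python
import asyncio

async def run_tool_calls(tool_calls, execute_fn):
    # Schedule one coroutine per tool call and await them concurrently;
    # results come back in the same order as the input calls.
    return await asyncio.gather(*(execute_fn(tc) for tc in tool_calls))

async def fake_execute(tool_call):
    # Stand-in executor used only for this sketch
    return {"role": "tool", "tool_call_id": tool_call["id"], "content": "ok"}

results = asyncio.run(run_tool_calls([{"id": "a"}, {"id": "b"}], fake_execute))
```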

Complete Agent Example

Here’s a full agent implementation with multiple tools. Note the distinction between required and optional features:
import time
from typing import List, Dict, Any, Optional

from osmosis_ai.rollout import (
    RolloutAgentLoop,
    RolloutContext,
    RolloutResult,
    RolloutRequest,
    create_app,
)
from osmosis_ai.rollout.tools import (
    get_tool_call_info,
    create_tool_result,
    create_tool_error_result,
    execute_tool_calls,
)

# Tool implementations
async def add(a: float, b: float) -> float:
    return a + b

async def multiply(a: float, b: float) -> float:
    return round(a * b, 4)

TOOL_REGISTRY = {
    "add": add,
    "multiply": multiply,
}

# Tool schemas (returned via get_tools)
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "add",
            "description": "Add two numbers",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "number"},
                    "b": {"type": "number"}
                },
                "required": ["a", "b"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "multiply",
            "description": "Multiply two numbers",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "number"},
                    "b": {"type": "number"}
                },
                "required": ["a", "b"]
            }
        }
    }
]


async def execute_tool(tool_call: Dict[str, Any]) -> Dict[str, str]:
    """Execute a single tool call."""
    try:
        tool_call_id, name, args = get_tool_call_info(tool_call)
    except Exception as e:
        return create_tool_error_result(tool_call.get("id", "unknown"), str(e))

    tool_fn = TOOL_REGISTRY.get(name)
    if not tool_fn:
        return create_tool_error_result(tool_call_id, f"Unknown tool: {name}")

    try:
        result = await tool_fn(**args)
        return create_tool_result(tool_call_id, str(result))
    except Exception as e:
        return create_tool_error_result(tool_call_id, str(e))


class CalculatorAgent(RolloutAgentLoop):
    """Calculator agent with add and multiply tools."""

    name = "calculator"  # REQUIRED: unique agent identifier

    # REQUIRED: Called when /v1/rollout/init is received
    # The returned tools are sent back to the training cluster
    def get_tools(self, request: RolloutRequest) -> list:
        return TOOLS

    # REQUIRED: Main agent loop logic
    async def run(self, ctx: RolloutContext) -> RolloutResult:
        messages = list(ctx.request.messages)
        finish_reason = "stop"
        total_tokens = 0

        for turn in range(ctx.request.max_turns):
            # OPTIONAL: Debug logging (no-op if logging not enabled)
            ctx.log_event("pre_llm", turn=turn, num_messages=len(messages))

            # Call LLM
            result = await ctx.chat(messages, **ctx.request.completion_params)
            messages.append(result.message)

            # Track tokens
            usage = result.usage or {}
            total_tokens += usage.get("completion_tokens", 0)

            if total_tokens >= ctx.request.max_tokens_total:
                finish_reason = "max_tokens"
                break

            if not result.has_tool_calls:
                finish_reason = result.finish_reason or "stop"
                break

            # Execute tools
            tool_start = time.monotonic()
            tool_results = await execute_tool_calls(result.tool_calls, execute_tool)
            latency_ms = (time.monotonic() - tool_start) * 1000

            # OPTIONAL: Record metrics for analytics
            for _ in result.tool_calls:
                ctx.record_tool_call(latency_ms=latency_ms / len(result.tool_calls))

            messages.extend(tool_results)

            # OPTIONAL: Debug logging
            ctx.log_event("tools_executed", 
                         turn=turn, 
                         num_tools=len(result.tool_calls))
        else:
            finish_reason = "max_turns"

        # CONDITIONAL: Compute reward if configured on platform
        # See "Handling Rewards" section for details
        reward = self._compute_reward(messages, ctx.request.metadata)

        return ctx.complete(messages, finish_reason=finish_reason, reward=reward)

    def _compute_reward(self, messages: list, metadata: dict) -> Optional[float]:
        """Compute reward from final response.
        
        IMPORTANT: This is only required if the platform is configured to 
        compute rewards in the remote rollout server. If reward computation 
        is handled elsewhere (e.g., on the platform side), you can return None.
        """
        ground_truth = metadata.get("ground_truth")
        if not ground_truth:
            return None

        # Get last assistant message
        for msg in reversed(messages):
            if msg.get("role") == "assistant":
                content = msg.get("content", "")
                if ground_truth in content:
                    return 1.0
                return 0.0
        return 0.0


# Export for CLI
agent_loop = CalculatorAgent()
app = create_app(agent_loop)
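
The containment-based reward in `_compute_reward` can be exercised on its own. A minimal standalone version of the same logic (no SDK required; the function name here is ours, not the SDK's):

```python
def reward_from_messages(messages, ground_truth):
    """1.0 if the last assistant message contains the ground truth, else 0.0.

    Returns None when no ground truth is supplied (reward skipped).
    """
    if not ground_truth:
        return None
    for msg in reversed(messages):
        if msg.get("role") == "assistant":
            return 1.0 if ground_truth in msg.get("content", "") else 0.0
    return 0.0

transcript = [
    {"role": "user", "content": "What is 3 * 4?"},
    {"role": "assistant", "content": "3 * 4 = 12"},
]
```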

Server Configuration

create_app() Parameters

The create_app() function accepts several optional parameters for fine-tuning server behavior:
from osmosis_ai.rollout import create_app, RolloutSettings

app = create_app(
    agent_loop,
    max_concurrent=10,              # Max concurrent rollouts (default: unlimited)
    record_ttl_seconds=3600.0,      # How long to keep rollout records (default: 1 hour)
    settings=RolloutSettings(...),  # Custom settings (see below)
    debug_dir="./logs",             # Directory for debug logs
    on_startup=async_startup_fn,    # Async function called on server start
    on_shutdown=async_shutdown_fn,  # Async function called on server stop
)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `agent_loop` | RolloutAgentLoop | Required | Your agent loop implementation |
| `max_concurrent` | int \| None | None | Maximum concurrent rollouts (None = unlimited) |
| `record_ttl_seconds` | float \| None | 3600.0 | TTL for rollout records in seconds |
| `settings` | RolloutSettings \| None | None | Custom configuration settings |
| `debug_dir` | str \| None | None | Directory for debug logging |
| `on_startup` | Callable[[], Awaitable[None]] \| None | None | Async startup callback |
| `on_shutdown` | Callable[[], Awaitable[None]] \| None | None | Async shutdown callback |

RolloutSettings Configuration

For advanced configuration, use RolloutSettings with RolloutClientSettings:
from osmosis_ai.rollout.config import RolloutSettings, RolloutClientSettings

settings = RolloutSettings(
    client=RolloutClientSettings(
        timeout_seconds=300.0,          # HTTP client timeout
        max_retries=3,                  # Max retry attempts
        complete_rollout_retries=2,     # Retries for /completed endpoint
        retry_base_delay=1.0,           # Initial retry delay (seconds)
        retry_max_delay=30.0,           # Maximum retry delay (seconds)
        max_connections=100,            # HTTP connection pool size
        max_keepalive_connections=20,   # Keep-alive connections
    )
)

app = create_app(agent_loop, settings=settings)

Environment Variables

All client settings can be configured via environment variables:
| Environment Variable | Description | Default |
| --- | --- | --- |
| `OSMOSIS_ROLLOUT_CLIENT_TIMEOUT_SECONDS` | HTTP timeout | 300.0 |
| `OSMOSIS_ROLLOUT_CLIENT_MAX_RETRIES` | Max retries | 3 |
| `OSMOSIS_ROLLOUT_CLIENT_COMPLETE_ROLLOUT_RETRIES` | Retries for `/completed` | 2 |
| `OSMOSIS_ROLLOUT_CLIENT_RETRY_BASE_DELAY` | Initial retry delay | 1.0 |
| `OSMOSIS_ROLLOUT_CLIENT_RETRY_MAX_DELAY` | Max retry delay | 30.0 |
| `OSMOSIS_ROLLOUT_CLIENT_MAX_CONNECTIONS` | Connection pool size | 100 |
| `OSMOSIS_ROLLOUT_CLIENT_MAX_KEEPALIVE_CONNECTIONS` | Keep-alive connections | 20 |
Example:
export OSMOSIS_ROLLOUT_CLIENT_TIMEOUT_SECONDS=120
export OSMOSIS_ROLLOUT_CLIENT_MAX_RETRIES=5
osmosis serve -m server:agent_loop
The get_last_assistant_content helper function (seen in some examples) is purely optional; it is only useful for logging or reward computation and is not required by the SDK.

Handling Rewards

Reward computation in Remote Rollout is conditional: whether you need to return a reward depends on your platform configuration.

| Platform Configuration | Reward Requirement |
| --- | --- |
| Reward computed on platform side | Return `None`; no reward needed |
| Reward computed in remote rollout | Must return a float value |

When Reward is Required

If your platform is configured to compute rewards in the remote rollout server, your run() method must return a reward value via ctx.complete():
async def run(self, ctx: RolloutContext) -> RolloutResult:
    # ... agent loop logic ...
    
    # Compute reward (required if platform expects it)
    reward = compute_reward_from_messages(messages, ctx.request.metadata)
    
    # reward must be a float, not None
    return ctx.complete(messages, finish_reason=finish_reason, reward=reward)

When Reward is Optional

If reward computation is handled elsewhere (e.g., on the platform side), you can simply return None:
async def run(self, ctx: RolloutContext) -> RolloutResult:
    # ... agent loop logic ...
    
    # No reward computation needed
    return ctx.complete(messages, finish_reason=finish_reason, reward=None)

Using @osmosis_reward Decorator

For complex reward computation, use the @osmosis_reward decorator:
from osmosis_ai import osmosis_reward

@osmosis_reward
def compute_reward(
    solution_str: str,
    ground_truth: str,
    extra_info: dict = None,
    **kwargs  # Required for platform compatibility
) -> float:
    """Compare solution to ground truth."""
    if solution_str.strip() == ground_truth.strip():
        return 1.0
    return 0.0
Always include **kwargs in reward functions for platform compatibility.
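
To see why `**kwargs` matters: the platform may pass extra keyword arguments your function does not declare, and `**kwargs` absorbs them instead of raising a `TypeError`. The comparison itself is plain Python, sketched here without the decorator (the `run_id` keyword below is hypothetical):

```python
def compute_reward(solution_str, ground_truth, extra_info=None, **kwargs):
    """Exact-match reward; **kwargs absorbs any extra platform-supplied args."""
    return 1.0 if solution_str.strip() == ground_truth.strip() else 0.0

# An undeclared keyword argument is accepted without error
score = compute_reward("  42  ", "42", run_id="rollout-abc123")
```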

Helper Functions (Optional)

The following helper functions are purely optional; they are useful for logging and reward computation but not required by the SDK:
from typing import List, Dict, Any, Optional

def get_last_assistant_content(messages: List[Dict[str, Any]]) -> Optional[str]:
    """Get the content of the last assistant message.
    
    NOTE: This is a helper function for logging/debugging purposes only.
    It is NOT required by the SDK.
    """
    for message in reversed(messages):
        if message.get("role") == "assistant":
            return message.get("content", "")
    return None


def compute_reward_from_messages(
    messages: List[Dict[str, Any]], 
    ground_truth: Optional[str]
) -> Optional[float]:
    """Compute reward from messages if ground_truth is available.
    
    NOTE: This function is only needed if the platform is configured to 
    compute rewards in the remote rollout server. If not, you can skip 
    this entirely and return None for reward.
    """
    if not ground_truth:
        return None

    solution_str = get_last_assistant_content(messages)
    if not solution_str:
        return 0.0

    # Your reward logic here
    return compute_reward(solution_str, ground_truth)

Extracting Solutions

Common pattern for extracting answers:
import re

def extract_answer(text: str) -> str | None:
    """Extract answer after #### marker."""
    match = re.search(r"####\s*(.+)", text)
    return match.group(1).strip() if match else None
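
A quick usage check of the pattern above:

```python
import re

def extract_answer(text):
    """Extract the answer following a #### marker, if present."""
    match = re.search(r"####\s*(.+)", text)
    return match.group(1).strip() if match else None

completion = "First add 2 and 3, then double it.\n#### 10"
```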

Debug Logging

Use ctx.log_event() to trace execution:
async def run(self, ctx: RolloutContext) -> RolloutResult:
    ctx.log_event("agent_start", 
                  num_messages=len(ctx.request.messages))
    
    # ... agent logic ...
    
    ctx.log_event("llm_response",
                  has_tools=result.has_tool_calls,
                  tokens=result.usage.get("total_tokens"))
    
    ctx.log_event("agent_complete",
                  finish_reason=finish_reason,
                  reward=reward)
Enable logging with:
osmosis serve -m server:agent_loop --log ./logs
Output structure:
logs/
├── 1703270400/                    # Timestamp
│   ├── rollout-abc123.jsonl       # Per-rollout traces
│   └── rollout-def456.jsonl
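
Each `.jsonl` file contains one JSON object per line, so the traces can be consumed with any JSONL reader. A generic sketch (the exact event fields are SDK-internal; the keys below are illustrative, not the SDK's actual schema):

```python
import json

def read_trace(lines):
    """Parse JSONL trace content into a list of event dicts."""
    return [json.loads(line) for line in lines if line.strip()]

# Illustrative trace lines; real event keys may differ
sample = [
    '{"event": "agent_start", "num_messages": 1}',
    '{"event": "agent_complete", "finish_reason": "stop"}',
]
events = read_trace(sample)
```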

Error Handling

Return errors gracefully:
async def run(self, ctx: RolloutContext) -> RolloutResult:
    try:
        # Agent logic
        return ctx.complete(messages)
    except Exception as e:
        return ctx.error(f"Agent failed: {e}")

Best Practices

- Always run osmosis validate before deploying to catch issues early.
- Check ctx.request.max_tokens_total and break early if exceeded.
- Use ctx.record_tool_call() to track tool execution for analytics.
- Use osmosis test with --interactive to debug agent behavior.
- Use ctx.log_event() at key points (pre-LLM, post-tool, completion).

Next Steps