

The AgentWorkflow class is the core abstraction for defining agent behavior in Osmosis. You subclass it and implement a single async method — run() — that processes a prompt and produces output.

AgentWorkflow Base Class

```python
from osmosis_ai.rollout import AgentWorkflow, AgentWorkflowContext

class MyWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        # Your agent logic here
        pass
```
The base class signature from the SDK:
```python
class AgentWorkflow(Generic[TConfig], ABC):
    def __init__(self, config: TConfig | None = None):
        self.config = config

    @abstractmethod
    async def run(self, ctx: AgentWorkflowContext[TConfig]) -> Any:
        raise NotImplementedError
```
AgentWorkflow is generic over a config type, letting you define custom configuration fields. It has one abstract method — run() — which receives an AgentWorkflowContext and should execute your agent logic.

AgentWorkflowContext

The ctx parameter passed to run() provides two fields:
| Field | Type | Description |
|---|---|---|
| `ctx.prompt` | `list[dict[str, Any]]` | Input messages for the current dataset row |
| `ctx.config` | `TConfig \| None` | Your custom config object (if provided) |
For most workflows, you do not need to know how the backend materializes those messages. If your dataset row contains system_prompt, user_prompt, and ground_truth, the input side shows up in ctx.prompt, while the reference answer is exposed to your grader as ctx.label. If you are using OsmosisStrandsAgent, you can usually pass messages=ctx.prompt directly with no extra reshaping.
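As a rough illustration, consider a hypothetical dataset row with system_prompt "You are a helpful assistant." and user_prompt "What is 6 x 7?". Inside run(), ctx.prompt would then hold a message list along these lines (the exact content-block shape depends on the backend, so treat this as a sketch rather than a guaranteed schema):

```python
# Hypothetical example of what ctx.prompt might contain for a dataset
# row with a system_prompt and a user_prompt. The content-block shape
# shown here is an assumption, not a documented contract.
prompt = [
    {"role": "system", "content": [{"text": "You are a helpful assistant."}]},
    {"role": "user", "content": [{"text": "What is 6 x 7?"}]},
]
```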

AgentWorkflowConfig

Custom configs extend AgentWorkflowConfig, which itself extends BaseConfig:
```python
from osmosis_ai.rollout import AgentWorkflowConfig

class MyConfig(AgentWorkflowConfig):
    name: str = "my-workflow"
    description: str | None = "A custom workflow"
    max_iterations: int = 10
    temperature: float = 0.7
```
BaseConfig provides the following base fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | (required) | Identifier for the workflow |
| `description` | `str \| None` | `None` | Optional description |
AgentWorkflowConfig adds concurrency control:
| Field | Type | Default | Description |
|---|---|---|---|
| `concurrency` | `ConcurrencyConfig` | unlimited | Controls max concurrent rollouts |
```python
from osmosis_ai.rollout import AgentWorkflowConfig, ConcurrencyConfig

class MyConfig(AgentWorkflowConfig):
    name: str = "rate-limited-workflow"
    concurrency: ConcurrencyConfig = ConcurrencyConfig(max_concurrent=5)
```
BaseConfig is configured with extra="allow", so you can add any custom fields you need without additional Pydantic boilerplate.

Implementation Patterns

LLM calls inside run() must route through the RolloutContext that the execution backend sets up for each rollout. The training cluster uses this routing to serve the current policy, attach x-sample-id / x-rollout-id headers, and collect rollout traces and token usage. The supported path is OsmosisStrandsAgent + OsmosisRolloutModel — both read from the active RolloutContext automatically. Calling litellm (or any provider SDK) directly with a hard-coded model like openai/gpt-5.2 bypasses this wiring and produces a rollout server that is not compatible with training. See Strands Integration for the full mechanism.

Simple LLM Call

The simplest workflow wraps the prompt in an OsmosisStrandsAgent and invokes it once. OsmosisRolloutModel acts as a placeholder that the SDK resolves to the training cluster’s endpoint at runtime:
```python
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowContext,
    OsmosisRolloutModel,
    OsmosisStrandsAgent,
)

class SimpleLLMWorkflow(AgentWorkflow):
    async def run(self, ctx: AgentWorkflowContext) -> None:
        agent = OsmosisStrandsAgent(
            name="simple",
            model=OsmosisRolloutModel(params={"temperature": 1.0}),
            messages=ctx.prompt,
            callback_handler=None,
        )
        await agent.invoke_async()
```
Constructing the agent inside run() registers it with the current RolloutContext, so the resulting message trace is collected as a rollout sample automatically — no manual bookkeeping required.

Multi-Step Agent Loop with Tools

For agents that use tools, put the model and tool list on a custom config, then iterate until the agent produces no more tool calls. Strands handles the per-invocation tool-use loop internally; the outer loop lets you apply additional stopping criteria and cap the number of rounds:
```python
from typing import Any

from strands import tool
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowConfig,
    AgentWorkflowContext,
    OsmosisRolloutModel,
    OsmosisStrandsAgent,
)


@tool(name="search")
def search_tool(query: str) -> str:
    """Search for information."""
    return f"results for {query}"


class SearchWorkflowConfig(AgentWorkflowConfig):
    name: str = "search-workflow"
    model: Any
    tools: Any


search_workflow_config = SearchWorkflowConfig(
    model=OsmosisRolloutModel(params={"temperature": 1.0}),
    tools=[search_tool],
)


class SearchWorkflow(AgentWorkflow[SearchWorkflowConfig]):
    async def run(self, ctx: AgentWorkflowContext[SearchWorkflowConfig]) -> None:
        config = ctx.config or search_workflow_config
        agent = OsmosisStrandsAgent(
            name="search-agent",
            model=config.model,
            tools=config.tools,
            messages=ctx.prompt,
            callback_handler=None,
        )

        for _ in range(8):  # max iterations
            result = await agent.invoke_async()
            content = result.message.get("content", [])
            if not any("toolUse" in cb for cb in content):
                break
```
See examples/rollout/multiply_rollout/workflow.py in the osmosis-sdk-python repository for a complete version of this pattern.
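The stopping condition in the loop above inspects the content blocks of the agent's last message: if none of them is a toolUse block, the agent is done calling tools. The same check can be illustrated as a self-contained sketch on plain dicts (the message shapes here are illustrative, not pulled from the SDK):

```python
# Sketch of the stop condition: a message whose content blocks contain
# no "toolUse" key means the agent produced no more tool calls.
def has_tool_use(message: dict) -> bool:
    return any("toolUse" in block for block in message.get("content", []))

# Illustrative messages, not actual SDK output.
final = {"content": [{"text": "Done."}]}
in_progress = {"content": [{"toolUse": {"name": "search", "input": {"query": "x"}}}]}

print(has_tool_use(final))        # → False, loop would break
print(has_tool_use(in_progress))  # → True, loop would continue
```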

Custom Configuration

Use a custom config to make your workflow parameterizable:
```python
from osmosis_ai.rollout import (
    AgentWorkflow,
    AgentWorkflowConfig,
    AgentWorkflowContext,
)

class MyConfig(AgentWorkflowConfig):
    name: str = "my-workflow"
    max_iterations: int = 10
    temperature: float = 0.7

class ConfigurableWorkflow(AgentWorkflow[MyConfig]):
    async def run(self, ctx: AgentWorkflowContext[MyConfig]) -> None:
        max_iter = ctx.config.max_iterations if ctx.config else 10
        # ... use config values in your agent logic ...
```
Config values can be set in your eval or training TOML configuration files, making it easy to experiment with different parameters without changing code.
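For instance, a TOML fragment overriding the defaults above might look like the following. The table name and key layout here are assumptions for illustration; consult your project's actual eval/training config schema for the exact structure:

```toml
# Hypothetical config fragment; table name is illustrative.
[workflow.config]
name = "my-workflow"
max_iterations = 5
temperature = 0.3
```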

Auto-Discovery

The SDK automatically scans your rollout entrypoint module for subclasses of AgentWorkflow. No registration, decorators, or factory functions are needed — just define your class and it will be discovered.
Your entrypoint file must contain exactly one AgentWorkflow subclass. If the SDK finds zero or more than one, it will raise an error.
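Conceptually, this kind of discovery amounts to scanning the entrypoint module for classes that subclass the workflow base and insisting on exactly one match. The sketch below is an illustrative reimplementation of that idea, not the SDK's actual discovery code:

```python
import inspect

# Illustrative sketch of auto-discovery (not the SDK's implementation):
# find the unique subclass of base_cls defined in a module.
def discover_workflow(module, base_cls):
    found = [
        obj for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, base_cls) and obj is not base_cls
    ]
    if len(found) != 1:
        raise RuntimeError(f"expected exactly one workflow, found {len(found)}")
    return found[0]
```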

Using Strands Agents

The Osmosis SDK includes built-in integration with the AWS Strands Agents framework, letting you use Strands agents directly inside your AgentWorkflow. See Strands Integration for setup and usage details.

Next Steps

Building Graders

Define evaluation logic to score your agent’s outputs.

Local Evaluation

Test your workflow locally with eval mode.