


The Store is used to record state and state changes.

The TaskState for each sample has a Store which can be used when solvers and/or tools need to coordinate changes to shared state. The Store can be accessed directly from the TaskState or by calling the store() global function.

Note that changes to the store are automatically recorded to the transcript as a StoreEvent. In order to be serialised to the transcript, values and objects must be JSON serialisable (you can make objects with several fields serialisable using the @dataclass decorator or by inheriting from Pydantic BaseModel).
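As a rough illustration of the serialisation requirement, the sketch below shows a dataclass value reducing to plain JSON types and surviving a round trip. The Progress class and its fields are hypothetical, and the use of the standard json module here is an assumption for illustration; this reference does not say how the transcript encodes values internally.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Progress:
    # Hypothetical multi-field value a solver might keep in the store.
    step: int
    notes: list


value = Progress(step=2, notes=["parsed input", "called tool"])

# A dataclass reduces to dicts, lists, and scalars, all of which JSON can encode.
encoded = json.dumps(asdict(value))

# Decoding and rebuilding yields an equal object, so nothing was lost.
decoded = Progress(**json.loads(encoded))
```

Values that hold open files, sockets, or other non-data objects would fail at the json.dumps step, which is the kind of value to keep out of the store.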

class Store



Get a value from the store.

Provide a default to automatically initialise a named store value with the default when it does not yet exist.

def get(self, key: str, default: VT | None = None) -> VT | Any
key str

Name of value to get

default VT | None

Default value (defaults to None)


Set a value into the store.

def set(self, key: str, value: Any) -> None
key str

Name of value to set

value Any

Value to set


Remove a value from the store.

def delete(self, key: str) -> None
key str

Name of value to remove


View of keys within the store.

def keys(self) -> KeysView[str]

View of values within the store.

def values(self) -> ValuesView[Any]

View of items within the store.

def items(self) -> ItemsView[str, Any]
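The default-initialising behaviour of get() can be sketched with a small stand-in class. MiniStore is hypothetical and is not the library's Store; it only mirrors the documented semantics (a supplied default is written into the store when the key does not yet exist). How delete() treats a missing key is not stated above, so ignoring it here is an assumption.

```python
from typing import Any


class MiniStore:
    """Illustrative stand-in only; not the library's Store."""

    def __init__(self) -> None:
        self._data: dict = {}

    def get(self, key: str, default: Any = None) -> Any:
        # A supplied default is stored when the key is missing, so later
        # reads return the same (possibly mutable) object.
        if default is not None and key not in self._data:
            self._data[key] = default
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        # Assumption: silently ignore keys that are not present.
        self._data.pop(key, None)

    def keys(self):
        return self._data.keys()


store = MiniStore()
visited = store.get("visited", [])  # initialises "visited" to []
visited.append("step-1")            # mutates the stored list in place
store.set("attempts", 1)
store.delete("attempts")
```

Because the default is stored rather than merely returned, the appended item is visible to any later store.get("visited") call.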


Get the currently active Store.

def store() -> Store


Get a Pydantic model interface to the store.

def store_as(model_cls: Type[SMT]) -> SMT
model_cls Type[SMT]

Pydantic model type (must derive from StoreModel)


Store backed Pydantic BaseModel.

The model is initialised from a Store, so that Store should either already satisfy the validation constraints of the model OR you should provide Field(default=) annotations for all of your model fields (the latter approach is recommended).

class StoreModel(BaseModel)



Obtain a concurrency context.

A concurrency context can be used to limit the number of coroutines executing a block of code (e.g. calling an API). For example, here we limit concurrent calls to an API (‘api-name’) to 10:

async with concurrency("api-name", 10):
    # call the api

Note that concurrency for model API access is handled internally via the max_connections generation config option. Concurrency for launching subprocesses is handled via the subprocess function.

def concurrency(
    name: str,
    concurrency: int,
    key: str | None = None,
) -> asyncio.Semaphore
name str

Name for concurrency context. This serves as the display name for the context, and also the unique context key (if the key parameter is omitted)

concurrency int

Maximum number of coroutines that can enter the context.

key str | None

Unique context key for this context. Optional. Used if the unique key isn’t human readable (e.g. it includes API tokens or account IDs), so that the more readable name can be presented to users (e.g. in the console UI).
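The idea of a named, shared limit can be sketched with the standard library alone. The limited() helper and its registry below are hypothetical stand-ins, not the library's implementation; they assume one asyncio.Semaphore per context key, with the name doubling as the key when no key is given.

```python
import asyncio
from typing import Optional

# Hypothetical registry: one semaphore per context key.
_semaphores: dict = {}


def limited(name: str, limit: int, key: Optional[str] = None) -> asyncio.Semaphore:
    # The name doubles as the unique key when no explicit key is given.
    k = key if key is not None else name
    if k not in _semaphores:
        _semaphores[k] = asyncio.Semaphore(limit)
    return _semaphores[k]


async def call_api(i: int, state: dict) -> int:
    async with limited("api-name", 3):
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # placeholder for the real API call
        state["active"] -= 1
    return i


async def main():
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(call_api(i, state) for i in range(10)))
    return results, state["peak"]


results, peak = asyncio.run(main())
```

All ten coroutines are started at once, but the recorded peak shows that no more than three are ever inside the guarded block together.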


Execute and wait for a subprocess.

Convenience method for solvers, scorers, and tools to launch subprocesses. Automatically enforces a limit on concurrent subprocesses (defaulting to os.cpu_count() but controllable via the max_subprocesses eval config option).

async def subprocess(
    args: str | list[str],
    text: bool = True,
    input: str | bytes | memoryview | None = None,
    cwd: str | Path | None = None,
    env: dict[str, str] = {},
    capture_output: bool = True,
    output_limit: int | None = None,
    timeout: int | None = None,
) -> Union[ExecResult[str], ExecResult[bytes]]
args str | list[str]

Command and arguments to execute.

text bool

Return stdout and stderr as text (defaults to True)

input str | bytes | memoryview | None

Optional stdin for subprocess.

cwd str | Path | None

Switch to directory for execution.

env dict[str, str]

Additional environment variables.

capture_output bool

Capture stderr and stdout into ExecResult (if False, then output is redirected to parent stderr/stdout)

output_limit int | None

Stop reading output if it exceeds the specified limit (in bytes).

timeout int | None

Timeout. If the timeout expires then a TimeoutError will be raised.


Execution result from call to subprocess().

class ExecResult(Generic[T])


success bool

Did the process exit with success.

returncode int

Return code from process exit.

stdout T

Contents of stdout.

stderr T

Contents of stderr.
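For readers unfamiliar with the pattern, the following is a minimal standard-library sketch of running a process asynchronously with a timeout and collecting a result shaped like the fields above. The Result class and run() function are hypothetical; the sketch does not enforce a subprocess concurrency limit or an output limit, which the real function is described as handling.

```python
import asyncio
import sys
from dataclasses import dataclass
from typing import Optional


@dataclass
class Result:
    # Hypothetical stand-in mirroring the ExecResult fields listed above.
    success: bool
    returncode: int
    stdout: str
    stderr: str


async def run(args: list, timeout: Optional[float] = None) -> Result:
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        out, err = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        # Stop the child before reporting the timeout to the caller.
        proc.kill()
        await proc.wait()
        raise TimeoutError(f"timed out after {timeout}s")
    code = proc.returncode
    return Result(code == 0, code, out.decode(), err.decode())


# Use the current interpreter so the example does not depend on a shell.
result = asyncio.run(run([sys.executable, "-c", "print('hello')"], timeout=30))
```

Checking result.success before reading result.stdout keeps error handling in one place, since a non-zero return code usually means the useful detail is in result.stderr.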



Get the current console display type.

def display_type() -> DisplayType


Console display type.

DisplayType = Literal["full", "conversation", "rich", "plain", "none"]


Input screen for receiving user input.

Context manager that clears the task display and provides a screen for receiving console input.

def input_screen(
    header: str | None = None,
    transient: bool | None = None,
    width: int | None = None,
) -> Iterator[Console]
header str | None

Header line to print above console content (defaults to printing no header)

transient bool | None

Return to task progress display after the user completes input (defaults to True for normal sessions and False when trace mode is enabled).

width int | None

Input screen width in characters (defaults to full width)



Decorator for subtasks.

def subtask(
    name: str | Subtask,
    store: Store | None = None,
    type: str | None = None,
    input: dict[str, Any] | None = None,
) -> Callable[..., Subtask] | Subtask
name str | Subtask

Name for subtask (defaults to function name)

store Store | None

Store to use for subtask

type str | None

Type to use for subtask

input dict[str, Any] | None

Input to log for subtask


Subtask with distinct Store and Transcript.

class Subtask(Protocol):
    async def __call__(
        *args: Any,
        **kwargs: Any,
    ) -> Any
*args Any

Arguments for the subtask.

**kwargs Any

Keyword arguments for the subtask.



Read and resolve a resource to a string.

Resources are often used for templates, configuration, etc. They are sometimes hard-coded strings, and sometimes paths to external resources (e.g. in the local filesystem or remote stores e.g. s3:// or https://).

The resource() function will resolve its argument to a resource string. If a protocol-prefixed file name (e.g. s3://) or the path to a local file that exists is passed then it will be read and its contents returned. Otherwise, it will return the passed str directly. This function is mostly intended as a helper for other functions that take either a string or a resource path as an argument, and want to easily resolve them to the underlying content.

If you want to ensure that only local or remote files are consumed, specify type="file". For example: resource("templates/prompt.txt", type="file")

def resource(
    resource: str,
    type: Literal["auto", "file"] = "auto",
    fs_options: dict[str, Any] = {},
) -> str
resource str

Path to local or remote (e.g. s3://) resource, or for type="auto" (the default), a string containing the literal resource value.

type Literal['auto', 'file']

For “auto” (the default), interpret the resource as a literal string if it’s not a valid path. For “file”, always interpret it as a file path.

fs_options dict[str, Any]

Optional. Additional arguments to pass through to the fsspec filesystem provider (e.g. S3FileSystem). Use {"anon": True } if you are accessing a public S3 bucket with no credentials.
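The resolution rule can be sketched for local files only. The resolve() function below is a hypothetical stand-in that ignores remote protocols and fs_options; raising FileNotFoundError for a missing path under type="file" is an assumption, since the error behaviour is not stated above.

```python
import os
import tempfile


def resolve(value: str, type: str = "auto") -> str:
    """Local-file-only stand-in for the rule described above."""
    if os.path.isfile(value):
        # An existing file is read and its contents returned.
        with open(value, encoding="utf-8") as f:
            return f.read()
    if type == "file":
        # Assumption: a missing file is an error when files are required.
        raise FileNotFoundError(value)
    # In "auto" mode anything else is treated as the literal resource value.
    return value


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "prompt.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write("Answer: {question}")
    from_file = resolve(path)

literal = resolve("Answer briefly.")
```

This is why a function that accepts "a template or a path to a template" can take a single str argument: the same call handles both cases.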


Throttle a function to ensure it is called no more than every n seconds.

def throttle(seconds: float) -> Callable[..., Any]
seconds float

Throttle time.
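One way such a decorator might work is sketched below. The throttle_sketch name is hypothetical, and returning the most recent result for calls made inside the window is an assumption; this reference does not say what a throttled call returns.

```python
import time
from functools import wraps
from typing import Any, Callable


def throttle_sketch(seconds: float) -> Callable[..., Any]:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        last_called = None
        last_result = None

        @wraps(func)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            nonlocal last_called, last_result
            now = time.monotonic()
            # Only invoke the function when the window has elapsed.
            if last_called is None or now - last_called >= seconds:
                last_result = func(*args, **kwargs)
                last_called = now
            # Assumption: throttled calls return the most recent result.
            return last_result

        return wrapped

    return decorator


calls = []


@throttle_sketch(60)
def refresh(tag: str) -> int:
    calls.append(tag)
    return len(calls)


first = refresh("a")   # runs the function
second = refresh("b")  # inside the 60 second window, so the call is skipped
```

This suits work such as refreshing a status display, where running more often than every few seconds adds cost without adding information.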


Trace a long running or potentially unreliable action.

Trace actions for which you want to collect data on both the resolution (e.g. succeeded, cancelled, failed, timed out, etc.) and the duration.

Traces are written to the TRACE log level (which is just below HTTP and INFO). List and read trace logs with inspect trace list and related commands (see inspect trace --help for details).

def trace_action(
    logger: Logger, action: str, message: str, *args: Any, **kwargs: Any
) -> Generator[None, None, None]
logger Logger

Logger to use for tracing (e.g. from getLogger(__name__))

action str

Name of action to trace (e.g. ‘Model’, ‘Subprocess’, etc.)

message str

Message describing action (can be a format string w/ args or kwargs)

*args Any

Positional arguments for message format string.

**kwargs Any

Named args for message format string.
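The shape of an action-tracing context manager can be sketched with the standard logging module. Everything here is an assumption for illustration: trace_sketch and ListHandler are hypothetical names, DEBUG stands in for the custom TRACE level, only positional format arguments are supported, and only the succeeded and failed outcomes are recorded.

```python
import logging
import time
from contextlib import contextmanager


@contextmanager
def trace_sketch(logger, action, message, *args):
    detail = message % args if args else message
    start = time.monotonic()
    logger.debug("%s: %s (enter)", action, detail)
    outcome = "succeeded"
    try:
        yield
    except Exception:
        outcome = "failed"
        raise
    finally:
        # Record how the action resolved and how long it took.
        elapsed = time.monotonic() - start
        logger.debug("%s: %s (%s in %.3fs)", action, detail, outcome, elapsed)


class ListHandler(logging.Handler):
    """Collects formatted messages so the trace can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("trace_sketch_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

with trace_sketch(logger, "Subprocess", "running %s", "ls"):
    pass

try:
    with trace_sketch(logger, "Model", "generate"):
        raise ValueError("boom")
except ValueError:
    pass
```

Each traced block produces an enter record and a resolution record, which is what makes it possible to spot actions that started but never finished.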


Log a message using the TRACE log level.

The TRACE log level is just below HTTP and INFO. List and read trace logs with inspect trace list and related commands (see inspect trace --help for details).

def trace_message(
    logger: Logger, category: str, message: str, *args: Any, **kwargs: Any
) -> None
logger Logger

Logger to use for tracing (e.g. from getLogger(__name__))

category str

Category of trace message.

message str

Trace message (can be a format string w/ args or kwargs)

*args Any

Positional arguments for message format string.

**kwargs Any

Named args for message format string.



Get the SandboxEnvironment for the current sample.

def sandbox(name: str | None = None) -> SandboxEnvironment
name str | None

Optional sandbox environment name.


Get the SandboxEnvironment for the current sample that has the specified file.

async def sandbox_with(file: str) -> SandboxEnvironment | None
file str

Path to file to check for.


Environment for executing arbitrary code from tools.

Sandbox environments provide both an execution environment and a per-sample filesystem context to copy sample files into and resolve relative paths to.

class SandboxEnvironment(abc.ABC)



Execute a command within a sandbox environment.

The current working directory for execution will be the per-sample filesystem context.

Each output stream (stdout and stderr) is limited to 10 MiB. If exceeded, an OutputLimitExceededError will be raised.

async def exec(
    cmd: list[str],
    input: str | bytes | None = None,
    cwd: str | None = None,
    env: dict[str, str] = {},
    user: str | None = None,
    timeout: int | None = None,
    timeout_retry: bool = True,
) -> ExecResult[str]
cmd list[str]

Command or command and arguments to execute.

input str | bytes | None

Standard input (optional).

cwd str | None

Current working dir (optional). If relative, will be relative to the per-sample filesystem context.

env dict[str, str]

Environment variables for execution.

user str | None

Optional username or UID to run the command as.

timeout int | None

Optional execution timeout (seconds).

timeout_retry bool

Retry the command in the case that it times out. Commands will be retried up to twice, with a timeout of no greater than 60 seconds for the first retry and 30 for the second.
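The retry schedule described for timeout_retry can be worked through as simple arithmetic. The attempt_timeouts() helper is hypothetical, and it assumes each retry uses the original timeout capped at the stated maximum (60 seconds, then 30); how a given sandbox provider applies the caps is not specified here.

```python
from typing import Optional


def attempt_timeouts(timeout: Optional[int], timeout_retry: bool = True) -> list:
    """Timeouts for the initial attempt and each retry, under the stated assumption."""
    if timeout is None or not timeout_retry:
        # No timeout means nothing can time out; no retry means one attempt.
        return [timeout]
    return [timeout, min(timeout, 60), min(timeout, 30)]


long_running = attempt_timeouts(120)      # [120, 60, 30]
medium = attempt_timeouts(45)             # [45, 45, 30]
short = attempt_timeouts(20)              # [20, 20, 20]
no_retry = attempt_timeouts(120, False)   # [120]
```

Under this reading, a command with a 120 second timeout could occupy up to 210 seconds in total before the timeout is finally reported, which is worth keeping in mind when budgeting time for a tool call.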


Write a file into the sandbox environment.

If the parent directories of the file path do not exist they should be automatically created.

async def write_file(self, file: str, contents: str | bytes) -> None
file str

Path to file (relative file paths will resolve to the per-sample working directory).

contents str | bytes

Text or binary file contents.


Read a file from the sandbox environment.

File size is limited to 100 MiB.

When reading text files, implementations should preserve newline constructs (e.g. crlf should be preserved not converted to lf). This is equivalent to specifying newline="" in a call to the Python open() function.

async def read_file(self, file: str, text: bool = True) -> Union[str | bytes]
file str

Path to file (relative file paths will resolve to the per-sample working directory).

text bool

Read as a utf-8 encoded text file.
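The newline-preservation requirement can be seen directly with Python's open(). This sketch uses a local temporary file rather than a sandbox; it only demonstrates the difference between newline="" and the default translation mode.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "crlf.txt")

    # Write raw bytes so the CRLF line endings are stored exactly as given.
    with open(path, "wb") as f:
        f.write(b"one\r\ntwo\r\n")

    # newline="" disables translation, so CRLF survives the read.
    with open(path, encoding="utf-8", newline="") as f:
        preserved = f.read()

    # The default mode translates CRLF to LF on input.
    with open(path, encoding="utf-8") as f:
        translated = f.read()
```

Preserving the original line endings matters when file contents are compared byte for byte or written back after editing.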


Information required to connect to sandbox environment.

async def connection(self) -> SandboxConnection

Standard config files for this provider (used for automatic discovery)

def config_files(cls) -> list[str]

Default max_sandboxes for this provider (None means no maximum)

def default_concurrency(cls) -> int | None

Called at task startup to initialize resources.

async def task_init(
    cls, task_name: str, config: SandboxEnvironmentConfigType | None
) -> None
task_name str

Name of task using the sandbox environment.

config SandboxEnvironmentConfigType | None

Implementation defined configuration (optional).


Initialize sandbox environments for a sample.

async def sample_init(
    task_name: str,
    config: SandboxEnvironmentConfigType | None,
    metadata: dict[str, str],
) -> dict[str, "SandboxEnvironment"]
task_name str

Name of task using the sandbox environment.

config SandboxEnvironmentConfigType | None

Implementation defined configuration (optional).

metadata dict[str, str]

Sample metadata field


Cleanup sandbox environments.

async def sample_cleanup(
    task_name: str,
    config: SandboxEnvironmentConfigType | None,
    environments: dict[str, "SandboxEnvironment"],
    interrupted: bool,
) -> None
task_name str

Name of task using the sandbox environment.

config SandboxEnvironmentConfigType | None

Implementation defined configuration (optional).

environments dict[str, 'SandboxEnvironment']

Sandbox environments created for this sample.

interrupted bool

Whether the task was interrupted by an error or cancellation.


Called at task exit as a last chance to cleanup resources.

async def task_cleanup(
    cls, task_name: str, config: SandboxEnvironmentConfigType | None, cleanup: bool
) -> None
task_name str

Name of task using the sandbox environment.

config SandboxEnvironmentConfigType | None

Implementation defined configuration (optional).

cleanup bool

Whether to actually cleanup environment resources (False if --no-sandbox-cleanup was specified)


Handle a cleanup invoked from the CLI (e.g. inspect sandbox cleanup).

async def cli_cleanup(cls, id: str | None) -> None
id str | None

Optional ID to limit scope of cleanup.


Information required to connect to sandbox.

class SandboxConnection(BaseModel)


type str

Sandbox type name (e.g. ‘docker’, ‘local’, etc.)

command str

Shell command to connect to sandbox.

vscode_command list[Any] | None

Optional vscode command (+args) to connect to sandbox.

ports list[PortMapping] | None

Optional list of port mappings into container

container str | None

Optional container name (does not apply to all sandboxes).


Decorator for registering sandbox environments.

def sandboxenv(name: str) -> Callable[..., Type[T]]
name str

Name of SandboxEnvironment type