Lightweight VM sandboxes for Python — run AI agents and untrusted code with hardware-level isolation.
The microsandbox Python package provides native bindings to the microsandbox runtime via pyo3. It spins up real microVMs (not containers) in under 100ms, runs standard OCI (Docker) images, and gives you full control over execution, filesystem, networking, and secrets — all from a simple async API.
- Hardware isolation — Each sandbox is a real VM with its own Linux kernel
- Sub-100ms boot — No daemon, no server setup, embedded directly in your app
- OCI image support — Pull and run images from Docker Hub, GHCR, ECR, or any OCI registry
- Command execution — Run commands with collected or streaming output, interactive shells
- Guest filesystem access — Read, write, list, copy files inside a running sandbox
- Named volumes — Persistent storage across sandbox restarts, with quotas
- Network policies — Public-only (default), allow-all, or fully airgapped
- DNS filtering — Block specific domains or domain suffixes
- TLS interception — Transparent HTTPS inspection and secret substitution
- Secrets — Credentials that never enter the VM; placeholder substitution at the network layer
- Port publishing — Expose guest TCP/UDP services on host ports
- Rootfs patches — Modify the filesystem before the VM boots
- Detached mode — Sandboxes can outlive the Python process
- Metrics — CPU, memory, disk I/O, and network I/O per sandbox
- Typed — Frozen dataclasses, `StrEnum`s, event objects, `.pyi` stubs
- Python >= 3.10
- Linux with KVM enabled, or macOS with Apple Silicon (M-series)
| Platform | Architecture | Wheel tag |
|---|---|---|
| macOS | ARM64 (Apple Silicon) | macosx_11_0_arm64 |
| Linux | x86_64 | manylinux_2_28_x86_64 |
| Linux | ARM64 | manylinux_2_28_aarch64 |
Runtime binaries (msb + libkrunfw) are bundled in the wheel. One wheel per platform, all Python 3.10+ versions via abi3.
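
Before installing, it can help to confirm that the host matches the requirements listed above. The following is a minimal preflight sketch using only the standard library. `check_host` is a hypothetical helper, not part of the microsandbox package, and treating the presence of `/dev/kvm` as a sign that KVM is enabled is an assumption about a typical Linux setup.

```python
import os
import platform
import sys


def check_host(
    version=sys.version_info[:2],
    system=platform.system(),
    machine=platform.machine(),
    kvm_present=os.path.exists("/dev/kvm"),
):
    """Return a list of human-readable problems; empty means the host looks OK.

    Hypothetical helper, not part of the microsandbox package.
    """
    problems = []
    if tuple(version) < (3, 10):
        problems.append("Python >= 3.10 is required")
    if system == "Linux":
        if machine not in ("x86_64", "aarch64", "arm64"):
            problems.append(f"no wheel listed for Linux/{machine}")
        # Assumption: KVM is exposed as the /dev/kvm device node.
        if not kvm_present:
            problems.append("/dev/kvm not found; KVM may be disabled")
    elif system == "Darwin":
        if machine != "arm64":
            problems.append("macOS requires Apple Silicon (arm64)")
    else:
        problems.append(f"unsupported platform: {system}")
    return problems


if __name__ == "__main__":
    for problem in check_host():
        print("warning:", problem)
```

All parameters default to values probed from the current machine, so calling `check_host()` with no arguments checks the local host, while passing explicit values makes the function easy to test.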
pip install microsandbox

import asyncio

from microsandbox import Sandbox


async def main():
    # Create a sandbox from an OCI image.
    sandbox = await Sandbox.create(
        "my-sandbox",
        image="alpine",
        cpus=1,
        memory=512,
    )

    # Run a command.
    output = await sandbox.shell("echo 'Hello from microsandbox!'")
    print(output.stdout_text)

    # Stop the sandbox.
    await sandbox.stop()


asyncio.run(main())

import sys
from microsandbox import Sandbox
# Collected output.
output = await sandbox.exec("python3", ["-c", "print(1 + 1)"])
print(output.stdout_text) # "2\n"
print(output.exit_code) # 0
# Shell command (pipes, redirects, etc.).
output = await sandbox.shell("echo hello && pwd")
print(output.stdout_text)
# Full configuration via keyword-only options.
output = await sandbox.exec(
    "python3",
    ["script.py"],
    cwd="/app",
    env={"PYTHONPATH": "/app/lib"},
    timeout=30.0,
)

# Streaming output.
handle = await sandbox.exec_stream("tail", ["-f", "/var/log/app.log"])
async for event in handle:
    match event.event_type:
        case "stdout":
            sys.stdout.buffer.write(event.data)
        case "stderr":
            sys.stderr.buffer.write(event.data)
        case "exited":
            break

await sandbox.stop()

fs = sandbox.fs
# Write and read files.
await fs.write("/tmp/config.json", b'{"debug": true}')
content = await fs.read_text("/tmp/config.json")
# List a directory.
entries = await fs.list("/etc")
for entry in entries:
    print(f"{entry.path} ({entry.kind})")
# Copy between host and guest.
await fs.copy_from_host("./local-file.txt", "/tmp/file.txt")
await fs.copy_to_host("/tmp/output.txt", "./output.txt")
# Check existence and metadata.
if await fs.exists("/tmp/config.json"):
    meta = await fs.stat("/tmp/config.json")
    print(f"size: {meta.size}, kind: {meta.kind}")

# Streaming read.
async for chunk in await fs.read_stream("/tmp/large-file.bin"):
    process(chunk)  # process() stands in for your own chunk handler

from microsandbox import Sandbox, Volume
# Create a 100 MiB named volume.
data = await Volume.create("my-data", quota_mib=100)
# Mount it in a sandbox.
writer = await Sandbox.create(
    "writer",
    image="alpine",
    volumes={"/data": Volume.named(data.name)},
    replace=True,
)
await writer.shell("echo 'hello' > /data/message.txt")
await writer.stop()

# Mount the same volume in another sandbox (read-only).
reader = await Sandbox.create(
    "reader",
    image="alpine",
    volumes={"/data": Volume.named(data.name, readonly=True)},
    replace=True,
)
output = await reader.shell("cat /data/message.txt")
print(output.stdout_text)  # "hello\n"
await reader.stop()

# Cleanup.
await Sandbox.remove("writer")
await Sandbox.remove("reader")
await Volume.remove("my-data")

from microsandbox import Sandbox, Volume, DiskImageFormat
# Mount a host disk image at a guest path. Format is inferred from the
# extension; pass `format=` to override. `fstype` is the inner FS agentd
# will mount; omit to let agentd autodetect.
sb = await Sandbox.create(
    "worker",
    image="alpine",
    volumes={
        "/data": Volume.disk("./data.qcow2", format=DiskImageFormat.QCOW2, fstype="ext4"),
        "/seed": Volume.disk("./seed.raw", readonly=True),
        "/scratch": Volume.tmpfs(size_mib=128, readonly=True),
    },
    replace=True,
)

from microsandbox import Destination, Network, NetworkPolicy, Protocol, Rule, Sandbox
# Default: public internet only (blocks private ranges).
sandbox = await Sandbox.create("public", image="alpine")
# Fully airgapped.
sandbox = await Sandbox.create(
    "isolated",
    image="alpine",
    network=Network.none(),
)

# Unrestricted.
sandbox = await Sandbox.create(
    "open",
    image="alpine",
    network=Network.allow_all(),
)

# DNS filtering.
sandbox = await Sandbox.create(
    "filtered",
    image="alpine",
    network=Network(
        deny_domains=("blocked.example.com",),
        deny_domain_suffixes=(".evil.com",),
    ),
)
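
To make the difference between `deny_domains` and `deny_domain_suffixes` concrete, here is a toy matcher in plain Python. It is only a sketch of one plausible reading of the two options, not microsandbox's filtering code: `is_denied` is a hypothetical function, and whether a suffix such as `.evil.com` also blocks the bare domain `evil.com` is an assumption made for this example.

```python
def is_denied(hostname, deny_domains=(), deny_domain_suffixes=()):
    """Toy model of exact-domain and suffix-based blocking (hypothetical)."""
    # Normalise: drop a trailing root dot and compare case-insensitively.
    name = hostname.rstrip(".").lower()

    # Exact matches only.
    if name in {domain.lower() for domain in deny_domains}:
        return True

    for suffix in deny_domain_suffixes:
        bare = suffix.lower().lstrip(".")
        # Assumption: the suffix covers the domain itself and every
        # subdomain, but not lookalikes such as "notevil.com".
        if name == bare or name.endswith("." + bare):
            return True
    return False


deny = {
    "deny_domains": ("blocked.example.com",),
    "deny_domain_suffixes": (".evil.com",),
}

print(is_denied("blocked.example.com", **deny))  # True
print(is_denied("api.evil.com", **deny))         # True
print(is_denied("notevil.com", **deny))          # False
```

Matching on a label boundary (`"." + bare`) rather than a raw string suffix is the design choice that keeps unrelated domains with a similar ending from being blocked.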
# Explicit allowlist with typed destinations.
sandbox = await Sandbox.create(
    "allowlisted",
    image="alpine",
    network=Network(
        policy=NetworkPolicy(
            default_egress="deny",
            rules=(
                Rule.allow(
                    destination=Destination.ip("1.1.1.1"),
                    protocol=Protocol.TCP,
                    port=443,
                ),
                Rule.allow(
                    destination=Destination.domain("api.github.com"),
                    protocol=Protocol.TCP,
                    port=443,
                ),
            ),
        ),
    ),
)

sandbox = await Sandbox.create(
    "web",
    image="python",
    ports={8080: 80},  # host:8080 → guest:80
)

Secrets use placeholder substitution — the real value never enters the VM. It is only swapped in at the network layer for HTTPS requests to allowed hosts.
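
The substitution rule can be pictured with a small, self-contained model. This is an illustrative sketch only, not how microsandbox implements it: `SecretRule`, `rewrite_headers`, and `BlockedRequest` are hypothetical names, the key value is made up, and the placeholder follows the `$MSB_OPENAI_API_KEY` form used in this README's secrets example.

```python
from dataclasses import dataclass


class BlockedRequest(Exception):
    """Raised when a placeholder is headed to a host that is not allowed."""


@dataclass(frozen=True)
class SecretRule:
    # Hypothetical stand-in for one secret entry.
    placeholder: str   # what the guest sees, e.g. "$MSB_OPENAI_API_KEY"
    value: str         # real credential, kept on the host side
    allow_hosts: tuple


def rewrite_headers(host, headers, rules):
    """Return a copy of headers with placeholders swapped for real values.

    Substitution only happens for hosts on the rule's allowlist; a
    placeholder bound for any other host blocks the whole request.
    """
    rewritten = {}
    for name, text in headers.items():
        for rule in rules:
            if rule.placeholder in text:
                if host not in rule.allow_hosts:
                    raise BlockedRequest(f"{rule.placeholder} not allowed for {host}")
                text = text.replace(rule.placeholder, rule.value)
        rewritten[name] = text
    return rewritten


rule = SecretRule("$MSB_OPENAI_API_KEY", "sk-real-key", ("api.openai.com",))
guest_headers = {"Authorization": "Bearer $MSB_OPENAI_API_KEY"}

print(rewrite_headers("api.openai.com", guest_headers, [rule]))
# {'Authorization': 'Bearer sk-real-key'}
```

Calling `rewrite_headers("other.example.com", guest_headers, [rule])` raises `BlockedRequest`, which mirrors the "request is blocked" behaviour described for hosts outside the allowlist.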
import os

from microsandbox import Sandbox, Secret

sandbox = await Sandbox.create(
    "agent",
    image="python",
    secrets=[
        Secret.env(
            "OPENAI_API_KEY",
            value=os.environ["OPENAI_API_KEY"],
            allow_hosts=["api.openai.com"],
        ),
    ],
)
# Guest sees: OPENAI_API_KEY=$MSB_OPENAI_API_KEY (a placeholder)
# HTTPS to api.openai.com: placeholder is transparently replaced with the real key
# HTTPS to any other host with the placeholder: request is blocked

Modify the filesystem before the VM boots:
from microsandbox import Patch, Sandbox
sandbox = await Sandbox.create(
    "patched",
    image="alpine",
    patches=[
        Patch.text("/etc/greeting.txt", "Hello!\n"),
        Patch.mkdir("/app", mode=0o755),
        Patch.text("/app/config.json", '{"debug": true}', mode=0o644),
        Patch.copy_dir("./scripts", "/app/scripts"),
        Patch.append("/etc/hosts", "127.0.0.1 myapp.local\n"),
    ],
)

Sandboxes in detached mode survive the Python process. The returned sandbox can still issue operations by name, but it does not own the host process lifecycle:
# Create in detached mode.
sandbox = await Sandbox.create(
    "background",
    image="python",
    detached=True,
)
# Later, from another process:
handle = await Sandbox.get("background")
reconnected = await handle.connect()
output = await reconnected.shell("echo reconnected")

async with await Sandbox.create("temp", image="alpine", replace=True) as sb:
    output = await sb.shell("echo hello")
    print(output.stdout_text)
# Sandbox is automatically killed and removed on exit.

`replace=True` stops a sandbox with the same name (if any) and recreates it. By default the existing sandbox gets 10 seconds to exit cleanly after SIGTERM before it is sent SIGKILL; pass `replace_with_timeout` (in seconds, fractional values allowed) to override. Setting `replace_with_timeout` implies `replace=True`.
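
The way the two options combine can be summarised as a small decision function. This is a sketch of the rules stated in the paragraph above, not the library's code: `resolve_replace` is a hypothetical name, and rejecting negative timeouts is an assumption added for the example.

```python
DEFAULT_GRACE_SECONDS = 10.0  # default SIGTERM grace period described above


def resolve_replace(replace=False, replace_with_timeout=None):
    """Model how the two keyword arguments combine (hypothetical helper).

    Returns (should_replace, grace_seconds). grace_seconds is None when no
    replacement happens; 0.0 means "skip SIGTERM and SIGKILL immediately".
    """
    if replace_with_timeout is not None:
        # Assumption: a negative grace period is treated as a caller error.
        if replace_with_timeout < 0:
            raise ValueError("replace_with_timeout must be >= 0")
        # Setting a timeout implies replace=True.
        return True, float(replace_with_timeout)
    if replace:
        return True, DEFAULT_GRACE_SECONDS
    return False, None


print(resolve_replace())                         # (False, None)
print(resolve_replace(replace=True))             # (True, 10.0)
print(resolve_replace(replace_with_timeout=30))  # (True, 30.0)
print(resolve_replace(replace_with_timeout=0))   # (True, 0.0)
```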
# Default 10s SIGTERM timeout.
sb = await Sandbox.create("worker", image="alpine", replace=True)
# Wait longer for a workload that needs more time to shut down.
sb = await Sandbox.create("worker", image="alpine", replace_with_timeout=30)
# Skip SIGTERM entirely; SIGKILL immediately.
sb = await Sandbox.create("worker", image="alpine", replace_with_timeout=0)

If you'd rather handle the conflict yourself, catch the typed error:
from microsandbox import Sandbox, SandboxAlreadyExistsError

try:
    sb = await Sandbox.create("worker", image="alpine")
except SandboxAlreadyExistsError:
    print("already exists; resume or pass replace=True")

from microsandbox import Network, Sandbox, TlsConfig
sandbox = await Sandbox.create(
    "tls-inspect",
    image="python",
    network=Network(
        tls=TlsConfig(
            bypass=("*.googleapis.com",),
            verify_upstream=True,
            intercepted_ports=(443,),
        ),
    ),
)

from microsandbox import Sandbox, all_sandbox_metrics, MiB
sandbox = await Sandbox.create("metrics-demo", image="python")
# Per-sandbox metrics.
m = await sandbox.metrics()
print(f"CPU: {m.cpu_percent:.1f}%")
print(f"Memory: {m.memory_bytes // MiB} MiB")
print(f"Uptime: {m.uptime_ms / 1000:.1f}s")
# Streaming metrics.
async for m in sandbox.metrics_stream(interval=1.0):
    print(f"CPU: {m.cpu_percent:.1f}%")

# All sandboxes at once.
all_metrics = await all_sandbox_metrics()
for name, metrics in all_metrics.items():
    print(f"{name}: {metrics.cpu_percent:.1f}%")

from microsandbox import is_installed, install
if not is_installed():
    await install()  # Downloads msb + libkrunfw to ~/.microsandbox/

| Class | Description |
|---|---|
| `Sandbox` | Live handle to a running sandbox — lifecycle, execution, filesystem |
| `SandboxHandle` | Lightweight database handle — use `connect()` or `start()` to get a live `Sandbox` |
| `ExecOutput` | Captured stdout/stderr with exit status |
| `ExecHandle` | Streaming execution handle — async iterator over events |
| `ExecSink` | Writable stdin channel for streaming exec |
| `SandboxFsOps` | Guest filesystem operations (read, write, list, copy, stat) |
| `FsReadStream` | Async iterator over file data chunks |
| `FsWriteSink` | Async context manager for streaming writes |
| `Volume` | Persistent named volume |
| `VolumeHandle` | Lightweight volume handle from the database |
| `Image` | Image source factories and local OCI image-cache management |
| `ImageHandle` | Lightweight cached image handle from the database |
| `ImagePruneReport` | Summary returned by `Image.prune()` |
| `MetricsStream` | Async iterator over metrics snapshots |
| `PullSession` | Async context manager for creation with pull progress |

| Class | Description |
|---|---|
| `Volume.bind()` / `.named()` / `.tmpfs()` / `.disk()` | Volume mount configuration |
| `Network.none()` / `.public_only()` / `.allow_all()` | Network presets |
| `Secret.env()` | Secret entry with host allowlist |
| `Patch.text()` / `.mkdir()` / `.copy_file()` / `.append()` / ... | Pre-boot filesystem modifications |
| `Image.oci(..., upper_size_mib=...)` / `.bind()` / `.disk()` | Explicit rootfs source configuration |
| `Rlimit.nofile()` / `.cpu()` / `.as_()` / ... | POSIX resource limits |

| Enum | Values |
|---|---|
| `PullPolicy` | `ALWAYS`, `IF_MISSING`, `NEVER` |
| `LogLevel` | `TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR` |
| `SecurityProfile` | `DEFAULT`, `RESTRICTED` |
| `SandboxStatus` | `RUNNING`, `STOPPED`, `CRASHED`, `DRAINING`, `PAUSED` |
| `Action` | `ALLOW`, `DENY` |
| `Direction` | `EGRESS`, `INGRESS` |
| `Protocol` | `TCP`, `UDP`, `ICMPV4`, `ICMPV6` |
| `DestGroup` | `LOOPBACK`, `PRIVATE`, `LINK_LOCAL`, `METADATA`, `MULTICAST` |
| `ViolationAction` | `BLOCK`, `BLOCK_AND_LOG`, `BLOCK_AND_TERMINATE` |
| `FsEntryKind` | `FILE`, `DIRECTORY`, `SYMLINK`, `OTHER` |
| `RlimitResource` | `CPU`, `FSIZE`, `NOFILE`, `AS`, ... (16 variants) |

| Type | Description |
|---|---|
| `ExitStatus` | Exit code and success flag |
| `MountConfig` | Volume mount (bind, named, tmpfs, or disk) |
| `PatchConfig` | Pre-boot filesystem modification |
| `SecretEntry` | Secret binding to env var with host allowlist |
| `NetworkPolicy` | Custom network policy with rules |
| `TlsConfig` | TLS interception options |
| `Network` | Full network configuration |
| `Rule` | Network policy rule |
| `RegistryAuth` | Docker registry credentials |
| `Size` | Memory/storage size value type |
| `Rlimit` | POSIX resource limit |
| Type | Description |
|---|---|
| Streaming exec events | Event objects returned by `exec_stream()` / `shell_stream()`. Inspect `event_type`, `pid`, `data`, and `code`. |
| Pull progress events | Event objects yielded by `PullSession.progress`. Inspect `event_type` and the optional detail fields for that event. |
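
As a rough illustration of consuming streaming exec events, the sketch below drains an async iterator into stdout bytes, stderr bytes, and an exit code. `FakeEvent`, `fake_stream`, and `collect` are hypothetical stand-ins so the example runs without a sandbox; only the `event_type`, `data`, and `code` fields and the `"stdout"` / `"stderr"` / `"exited"` type names are taken from this README.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeEvent:
    # Hypothetical stand-in mirroring the documented event fields.
    event_type: str
    data: bytes = b""
    code: Optional[int] = None


async def fake_stream():
    # Pretend process output; a real stream would come from exec_stream().
    yield FakeEvent("stdout", data=b"hello\n")
    yield FakeEvent("stderr", data=b"warning\n")
    yield FakeEvent("exited", code=0)


async def collect(events):
    """Gather stdout/stderr bytes and the exit code from an event stream."""
    stdout, stderr, exit_code = bytearray(), bytearray(), None
    async for event in events:
        if event.event_type == "stdout":
            stdout += event.data
        elif event.event_type == "stderr":
            stderr += event.data
        elif event.event_type == "exited":
            exit_code = event.code
            break
    return bytes(stdout), bytes(stderr), exit_code


result = asyncio.run(collect(fake_stream()))
print(result)  # (b'hello\n', b'warning\n', 0)
```

Because `collect` only relies on async iteration and attribute access, the same loop shape should carry over to a real event stream, assuming its events expose the fields listed in the table.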

| Function | Description |
|---|---|
| `is_installed()` | Check if msb and libkrunfw are available |
| `install()` | Download and install runtime dependencies |
| `all_sandbox_metrics()` | Get metrics for all running sandboxes |
| `version()` | Return the SDK version string |

cd sdk/python
uv sync --group dev

maturin develop --release

uv run pytest tests/

uv run --project sdk/python python examples/python/root-oci/main.py

uv run ruff check .

License: Apache-2.0