# Datasets I/O — dream.io
dream.io is the Layer 1 dataset abstraction for bulk inference. It
gives you a uniform way to load starting frames + action sequences
from any source (Hugging Face, S3, or local disk), run a rollout for
every row, and write the results to any sink — in one call. No
iteration loop, no per-row boilerplate.
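The whole pipeline fits in a single call. A minimal sketch, assuming a `client` configured as in the Bulk inference quickstart:

```python
from dream.io import frames_from_hf, RolloutSink

src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",
    frame_field="start_frame",
    actions_field="action_sequence",
)
sink = RolloutSink.dir("./out")

# One call: stream rows, roll out each one, write mp4s + metadata to ./out
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
```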
## When to use dream.io
- Synthetic data at scale — you have 50K episodes on Hugging Face and want 50K predicted rollouts back on HF. `frames_from_hf` + `RolloutSink.hf` + `client.predict_many` covers the whole pipeline in ~10 lines.
- Training-loop augmentation — your dataloader needs a small batch of predictions each step. `frames_from_dir` + a quick `predict_many` call keeps the loop tight without per-row glue.
- Custom backends — you have a proprietary data store. Subclass `IterableSource` or `RolloutSink`; `predict_many` accepts any conformant implementation.
For a walkthrough that ties these pieces together, see the Bulk inference quickstart and the bulk-augment-gr1 example.
## Installing the [io] extra
pip install "dream-engine[io]"The [io] extra adds datasets>=3.0, huggingface_hub>=0.24,
pyarrow>=15, and boto3>=1.34. These are lazy-imported — only
the loader/sink you actually call pays its import cost.
frames_from_dir and RolloutSink.dir work without [io].
## Source loaders
All loaders return an `IterableSource`. Pass the source directly to `client.predict_many` or `client.estimate_cost`.
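`estimate_cost` takes the same source, so you can price a run before launching it. A sketch; `client` is assumed to be configured, and the `spec` argument mirroring `predict_many` is an assumption:

```python
from dream.io import frames_from_dir

src = frames_from_dir("/data/episodes")
# Uses src.rows_hint when the loader can populate it.
estimate = client.estimate_cost(src, spec="dreamdojo-2b-gr1")
print(estimate)
```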
### frames_from_hf
```python
from dream.io import frames_from_hf

src = frames_from_hf(
    repo,
    *,
    split="train",
    frame_field,
    actions_field,
    id_field=None,
    revision=None,
    token=None,
) -> IterableSource
```

Streams rows from a Hugging Face dataset via `datasets.load_dataset(..., streaming=True)`. Each row is encoded to PNG bytes (frame) + `.npy` bytes (actions) before being yielded as a `SourceRow`.
| Parameter | Type | Description |
|---|---|---|
| `repo` | `str` | HF dataset repo id, e.g. `"kingJulio/dream-engine-example-frames"` |
| `split` | `str` | Dataset split. Default `"train"`. |
| `frame_field` | `str` | Column name holding the starting frame. Accepts `PIL.Image`, `{"bytes": …, "path": …}` dicts (HF `Image` feature), `numpy.ndarray` (H × W × 3 `uint8`), or string paths. |
| `actions_field` | `str` | Column name holding the action sequence. Shape must be `(T, action_dim)` `float32`. |
| `id_field` | `str \| None` | Column to use as the stable row id. `None` defaults to `"row_{i:08d}"`. |
| `revision` | `str \| None` | Dataset revision (branch, tag, or commit sha). `None` uses the default branch. |
| `token` | `str \| None` | HF read token. Falls back to the `HF_TOKEN` env var. Public datasets work without a token. |
Returns: `IterableSource`. `rows_hint` is populated from Hub metadata when available; for large streaming datasets you may get `None` (see `estimate_cost`).
Example:
```python
from dream.io import frames_from_hf

src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",
    frame_field="start_frame",
    actions_field="action_sequence",
)

for row in src:
    print(row.row_id, len(row.frame_bytes), len(row.actions_bytes))
    break  # 16 rows total in the public fixture
```

Requires `[io]` extra.
### frames_from_s3
```python
from dream.io import frames_from_s3

src = frames_from_s3(
    uri,
    *,
    frame_glob="*.png",
    actions_glob="*.npy",
    id_pattern=r"(.*)\.png$",
    boto3_kwargs=None,
) -> IterableSource
```

Lists objects under an S3 prefix, pairs frames with same-stem action files, and yields `SourceRow` instances.
| Parameter | Type | Description |
|---|---|---|
| `uri` | `str` | S3 URI prefix, e.g. `"s3://my-bucket/frames/"` |
| `frame_glob` | `str` | Glob pattern to find frame objects under the prefix. |
| `actions_glob` | `str` | Glob pattern to find matching action objects. |
| `id_pattern` | `str` | Regex applied to the frame key to extract the row id. Group 1 is used. |
| `boto3_kwargs` | `dict \| None` | Forwarded to `boto3.client("s3", ...)`. Use for cross-account or non-default-profile setups. |
Example:
```python
from dream.io import frames_from_s3

src = frames_from_s3(
    "s3://my-robot-data/episodes/",
    frame_glob="*.png",
    actions_glob="*.npy",
)
```

Requires `[io]` extra (`boto3`).
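If your frames are not PNGs, adjust the globs and the id regex together. A sketch assuming JPEG frames under the same prefix:

```python
from dream.io import frames_from_s3

# Group 1 of id_pattern becomes the row id, mirroring the PNG default.
src = frames_from_s3(
    "s3://my-robot-data/episodes/",
    frame_glob="*.jpg",
    actions_glob="*.npy",
    id_pattern=r"(.*)\.jpg$",
)
```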
### frames_from_dir
```python
from dream.io import frames_from_dir

src = frames_from_dir(
    path,
    *,
    frame_glob="*.png",
    actions_glob="*.npy",
) -> IterableSource
```

Globs a local directory for frame/actions pairs. The row id is the file stem (the part before the extension).
| Parameter | Type | Description |
|---|---|---|
| `path` | `str \| Path` | Directory containing the frames and action files. |
| `frame_glob` | `str` | Glob pattern for frame files. |
| `actions_glob` | `str` | Glob pattern for action files. Must pair 1-to-1 with frame files by stem. |
Example:
```python
from dream.io import frames_from_dir

# Directory layout:
#   /data/episodes/ep_001.png   /data/episodes/ep_001.npy
#   /data/episodes/ep_002.png   /data/episodes/ep_002.npy
src = frames_from_dir("/data/episodes")
```

Does not require the `[io]` extra. This is the only loader that works with the base `pip install dream-engine`.
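If you just want to try the loader, you can generate a tiny fixture in the expected layout. A sketch; the `(16, 7)` action shape is illustrative, substitute your model's `(T, action_dim)`:

```python
from pathlib import Path

import numpy as np
from PIL import Image

out = Path("/tmp/episodes")
out.mkdir(parents=True, exist_ok=True)

# One frame/actions pair, matched by stem as frames_from_dir expects.
Image.fromarray(np.zeros((256, 256, 3), dtype=np.uint8)).save(out / "ep_001.png")
np.save(out / "ep_001.npy", np.zeros((16, 7), dtype=np.float32))
```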
## Sinks
Sinks receive completed rollouts from `client.predict_many` and write them to their backing store. Construct via the factory classmethods.
### RolloutSink.dir
```python
sink = RolloutSink.dir(
    path,
    *,
    videos_subdir="videos",
    metadata_filename="metadata.parquet",
) -> RolloutSink
```

Writes to a local directory. Videos land at `<path>/<videos_subdir>/<row_id>.mp4`; a single Parquet file is written at `<path>/<metadata_filename>` when `finalize()` is called.
| Parameter | Type | Description |
|---|---|---|
| `path` | `str \| Path` | Output directory. Created if it does not exist. |
| `videos_subdir` | `str` | Subdirectory for mp4 files. |
| `metadata_filename` | `str` | Filename for the Parquet metadata table. |
Example:
```python
from dream.io import RolloutSink

sink = RolloutSink.dir("./out")
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
# ./out/videos/row_00000000.mp4
# ./out/videos/row_00000001.mp4
# …
# ./out/metadata.parquet
print(result.output_uri)  # /absolute/path/to/out
```

Requires `[io]` extra (uses `pyarrow` to write the metadata Parquet on `finalize()`). The mp4 writes during `predict_many` work without it, but the run errors on the final flush — install `[io]` whenever you use `RolloutSink.dir`.
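Once `finalize()` has run, the metadata table is plain Parquet and can be inspected with `pyarrow` directly. A sketch; the column names depend on the actual schema, so print them rather than assuming:

```python
import pyarrow.parquet as pq

table = pq.read_table("./out/metadata.parquet")
print(table.num_rows, table.column_names)
```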
### RolloutSink.s3
```python
sink = RolloutSink.s3(
    uri,
    *,
    boto3_kwargs=None,
    videos_subdir="videos",
    metadata_filename="metadata.parquet",
) -> RolloutSink
```

Mirrors the dir layout on S3. Each mp4 is uploaded as a separate `put_object` call; the Parquet metadata file is uploaded on `finalize()`.
| Parameter | Type | Description |
|---|---|---|
| `uri` | `str` | S3 URI prefix, e.g. `"s3://my-bucket/rollouts/"` |
| `boto3_kwargs` | `dict \| None` | Forwarded to `boto3.client("s3", ...)`. |
| `videos_subdir` | `str` | Prefix subdirectory for mp4 objects. |
| `metadata_filename` | `str` | Object key (under `uri`) for the Parquet table. |
Example:
```python
sink = RolloutSink.s3("s3://my-bucket/rollouts/run-2026-05-06/")
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
print(result.output_uri)  # s3://my-bucket/rollouts/run-2026-05-06
```

Requires `[io]` extra (`boto3`).
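`boto3_kwargs` is the hook for non-default AWS setups. A sketch assuming a bucket in another region; any kwarg accepted by `boto3.client("s3", ...)` passes through the same way:

```python
from dream.io import RolloutSink

# Forwarded verbatim to boto3.client("s3", ...)
sink = RolloutSink.s3(
    "s3://my-bucket/rollouts/run-2026-05-06/",
    boto3_kwargs={"region_name": "eu-west-1"},
)
```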
### RolloutSink.hf
```python
sink = RolloutSink.hf(
    repo,
    *,
    token=None,
    private=False,
    shard_rows=500,
    license="cc-by-4.0",
    pretty_name=None,
) -> RolloutSink
```

Writes a Hugging Face dataset with path-based mp4 files + Parquet metadata shards, following the Hub's recommended layout:

```
<repo>/
  videos/<row_id>.mp4
  metadata/
    data-00000.parquet   # one shard per shard_rows rollouts (zero-indexed)
    data-00001.parquet
    …
  README.md              # auto-generated dataset card
```

Each shard is uploaded via `HfApi.upload_file` as rows accumulate, so your training pipeline can start reading via `streaming=True` before the run finishes. The `README.md` is written once on the first shard and is not rewritten on subsequent flushes — re-running `predict_many` against the same repo appends new shards without touching the existing README.
| Parameter | Type | Description |
|---|---|---|
| `repo` | `str` | HF dataset repo id, e.g. `"my-org/predicted-rollouts"`. Created if it does not exist. |
| `token` | `str \| None` | HF write token. Falls back to the `HF_TOKEN` env var or the `huggingface-cli login` cache. |
| `private` | `bool` | Create the repo as private on first commit. Has no effect on subsequent pushes. |
| `shard_rows` | `int` | Rows per Parquet shard. 500 ≈ 250 MB per shard for GR-1 rollouts. |
| `license` | `str` | Dataset card license identifier. Must be compatible with the source model's license. |
| `pretty_name` | `str \| None` | Human-readable name for the dataset card. |
Example:
```python
import os

from dream.io import frames_from_hf, RolloutSink

src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",
    frame_field="start_frame",
    actions_field="action_sequence",
)
sink = RolloutSink.hf(
    "my-org/gr1-predictions",
    token=os.environ["HF_WRITE_TOKEN"],
    shard_rows=500,
)
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
print(result.output_uri)  # hf://datasets/my-org/gr1-predictions
```

Requires `[io]` extra (`huggingface_hub`, `pyarrow`).
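Because shards upload as rows accumulate, a training pipeline can start reading mid-run. A sketch; the explicit `data_files` glob is an assumption, and the Hub's auto-loader may pick the shards up without it:

```python
from datasets import load_dataset

ds = load_dataset(
    "my-org/gr1-predictions",
    data_files="metadata/*.parquet",
    split="train",
    streaming=True,
)
for row in ds:
    print(row)
    break
```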
## Custom sources
Subclass `IterableSource` to load rows from any data origin. You only need to implement `__iter__`:
```python
from io import BytesIO

import numpy as np
from PIL import Image

from dream.io import IterableSource, SourceRow


class MyDatabaseSource(IterableSource):
    """Loads episodes from a custom database."""

    def __init__(self, connection_string: str):
        self._conn_str = connection_string
        self.rows_hint = self._count_rows()  # optional; enables estimate_cost

    def _count_rows(self) -> int:
        # query your database for the total row count
        ...

    def __iter__(self):
        for episode in self._fetch_episodes():
            # encode frame to PNG bytes
            buf = BytesIO()
            Image.fromarray(episode["start_frame"]).save(buf, format="PNG")
            frame_bytes = buf.getvalue()

            # encode actions to .npy bytes
            actions_buf = BytesIO()
            np.save(actions_buf, episode["actions"].astype(np.float32))
            actions_bytes = actions_buf.getvalue()

            yield SourceRow(
                row_id=str(episode["id"]),
                frame_bytes=frame_bytes,
                actions_bytes=actions_bytes,
                metadata={"source": "my-database"},
            )
```

Pass it directly to `client.predict_many`:
```python
src = MyDatabaseSource("postgresql://...")
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
```

## Custom sinks
Subclass `RolloutSink` to write completed rollouts to your own backend. Implement `write` (called per-row) and `finalize` (called once at the end):
```python
from dream.io import RolloutSink, RolloutRecord


class MyWarehouseSink(RolloutSink):
    """Writes rollouts to a proprietary video warehouse."""

    def __init__(self, warehouse_url: str):
        self._url = warehouse_url
        self._written = 0

    def write(self, record: RolloutRecord) -> None:
        # record.row_id     — matches source row id
        # record.mp4_bytes  — raw mp4 from the engine
        # record.metadata   — provenance dict from Client.predict_many
        self._upload(record.row_id, record.mp4_bytes)
        self._written += 1

    def finalize(self) -> str:
        return f"{self._url} ({self._written} rollouts)"

    def _upload(self, row_id: str, mp4_bytes: bytes) -> None:
        ...
```

`write` is called from `predict_many`'s thread pool, so keep it thread-safe or use locking.
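A minimal locking sketch (the subclass name is illustrative, and a real sink might lock only the shared counter rather than the whole write):

```python
import threading


class LockedWarehouseSink(MyWarehouseSink):
    """Serializes write() calls arriving from predict_many's thread pool."""

    def __init__(self, warehouse_url: str):
        super().__init__(warehouse_url)
        self._lock = threading.Lock()

    def write(self, record) -> None:
        with self._lock:
            super().write(record)
```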