Bulk inference
You have a dataset of starting frames and action sequences — on
Hugging Face, on S3, or on local disk — and you want a predicted
rollout for every row. dream.io Layer 1 covers this with
client.predict_many: one call, async concurrency pool, per-row
retry, optional progress bar, results written to whichever sink you
choose.
This page walks through the full flow against the public
kingJulio/dream-engine-example-frames fixture (16 synthetic GR-1 rows).
When to use this vs client.jobs.create
client.predict_many runs in your process. Your machine stays in
the data path — frames upload through your connection, mp4s download
back to you. It is the right choice when:
- Your dataset is small enough that your machine's bandwidth is not the bottleneck (typically up to a few thousand rows).
- You want to inspect or filter results in the same script.
- You want output on local disk or a private S3 bucket rather than a public HF repo.
client.jobs.create (Phase 5.2, shipping soon) submits the job to
the engine server-side. The engine pulls from HF, runs Modal-parallel,
and pushes to your target HF repo — your machine can go offline. Use
it for large datasets (tens of thousands of rows) or nightly cron
jobs where you don't want to babysit a long-running process.
The Layer 1 SDK contract is compatible: the same IterableSource
types that feed predict_many will feed client.jobs.create when
it ships.
Install
```bash
pip install "dream-engine[io,decode]"
export DREAM_API_KEY="dre_..."
```

The [io] extra adds the HF / S3 / Parquet deps. The [decode]
extra lets you access rollout.frames as a numpy array — useful for
inspecting output but not required for predict_many itself.
Walk-through
1. Load your source
```python
import dream
from dream.io import frames_from_hf, RolloutSink

client = dream.Client()

src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",  # public; no token needed
    frame_field="start_frame",
    actions_field="action_sequence",
)
print(f"rows_hint: {src.rows_hint}")  # 16
```

For your own private dataset, pass token=os.environ["HF_TOKEN"].
For a local directory, use frames_from_dir("./episodes") instead.
2. Estimate cost
```python
estimate = client.estimate_cost(src, spec="dreamdojo-2b-gr1")

print(f"rows: {estimate.rows}")
print(f"frames_per_row: {estimate.frames_per_row}")
print(f"total_frames: {estimate.total_frames}")
print(f"total_usd: ${estimate.total_usd:.4f}")
```

Expected output for the 16-row fixture:

```
rows: 16
frames_per_row: 49
total_frames: 784
total_usd: $0.3920
```

total_usd is None when the source is unbounded streaming and
rows_hint is not set. In that case, either set
src.rows_hint = N directly on the loader (it's a regular
attribute) when you know the count, or skip the estimate.
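The estimate is simple arithmetic: rows times frames_per_row times a per-frame price. As a sanity check, here is a minimal sketch. The $0.0005-per-frame rate is inferred from the fixture numbers above ($0.3920 / 784 frames); it is an assumption for illustration, not a published rate card.

```python
# Back-of-envelope check of the estimate above. The per-frame rate is
# inferred from the fixture ($0.3920 / 784 frames), not a documented price.
PRICE_PER_FRAME_USD = 0.0005  # assumed

def estimate_usd(rows: int, frames_per_row: int,
                 price: float = PRICE_PER_FRAME_USD) -> float:
    total_frames = rows * frames_per_row
    return total_frames * price

print(round(estimate_usd(16, 49), 4))  # 0.392, matching the fixture output
```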
3. Run the predictions
```python
sink = RolloutSink.dir("./out")

result = client.predict_many(
    src,
    sink,
    spec="dreamdojo-2b-gr1",
    concurrency=4,    # up to 4 in-flight requests at once
    on_error="skip",  # skip failed rows; use "halt" to stop on first error
    progress=True,    # tqdm progress bar
)

print(f"ok: {result.ok}")
print(f"failed: {result.failed}")
print(f"output_uri: {result.output_uri}")
```

predict_many blocks until all rows are processed (or the pool is
exhausted on on_error="halt"). Each row retries up to 3 times on
429 / 5xx / network errors before being counted as failed.
Auth errors (401/403) and credit errors (402) fail fast —
they aren't retried.
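The retry policy just described can be sketched in a few lines. This is an illustrative stand-in, not the SDK's internals: the function name and the `.status` attribute on exceptions are assumptions made so the sketch is self-contained.

```python
import time

FAIL_FAST = {401, 402, 403}  # auth / credit errors: never retried

def call_with_retry(call, max_attempts: int = 3, base_delay: float = 0.5):
    """Sketch of the per-row retry loop described above (illustrative,
    not SDK code). `call` raises exceptions carrying a `.status` attribute
    for HTTP errors; network errors carry no status and are retryable."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception as exc:
            status = getattr(exc, "status", None)
            if status in FAIL_FAST:
                raise  # 401/403/402 would fail every row, so fail fast
            if attempt == max_attempts:
                raise  # out of attempts: the row is counted as failed
            # retryable (429 / 5xx / network): back off and try again
            time.sleep(base_delay * 2 ** (attempt - 1))
```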
4. Inspect the output
```python
import pyarrow.parquet as pq

table = pq.read_table("./out/metadata.parquet")
print(table.schema)
print(table.to_pandas().head())

# Load one rollout
import mediapy

row = table.to_pandas().iloc[0]
video = mediapy.read_video(f"./out/videos/{row['row_id']}.mp4")
mediapy.show_video(video)  # Jupyter; or video.shape → (49, 480, 640, 3)
```

Tuning
Concurrency
The default is concurrency=4. Each concurrent request holds one
Modal H100 slot. Raising to 8–16 is fine if you're not hitting rate
limits — the engine autoscales (max_containers=8) and Modal
dispatches across available slots. Monitor for RateLimitError; if
you see them, back off to 4.
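The "up to N in-flight requests" behavior is the standard semaphore-bounded pool pattern. Here is a minimal self-contained sketch of the idea; run_pool and fake_predict are illustrative names, not SDK code.

```python
import asyncio

async def run_pool(rows, worker, concurrency: int = 4):
    """Bound in-flight work the way predict_many's pool does conceptually:
    at most `concurrency` rows await a response at any moment.
    (Illustrative sketch, not the SDK's implementation.)"""
    sem = asyncio.Semaphore(concurrency)

    async def one(row):
        async with sem:            # blocks while `concurrency` rows are in flight
            return await worker(row)

    # gather preserves input order, so results line up with rows
    return await asyncio.gather(*(one(r) for r in rows))

async def fake_predict(row):
    await asyncio.sleep(0.01)      # stand-in for one inference request
    return row * 2

results = asyncio.run(run_pool(range(8), fake_predict, concurrency=4))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
```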
Error handling
on_error="skip" (default) records the failure in
result.failures (a list[FailedRow] with row_id, error,
attempts) and increments result.failed. Failed rows are
not written to the sink — only successful rollouts land in
videos/<row_id>.mp4 and the metadata Parquet. To audit which
rows failed, iterate result.failures after the run.
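A failure audit might look like the sketch below. The FailedRow dataclass here is a local stand-in carrying the fields named above (row_id, error, attempts) so the snippet runs without the SDK; in a real run you would iterate result.failures instead.

```python
from collections import Counter
from dataclasses import dataclass

@dataclass
class FailedRow:  # stand-in mirroring the fields described above
    row_id: str
    error: str
    attempts: int

def audit(failures: list[FailedRow]) -> Counter:
    """Group failures by error type to decide what is worth re-running."""
    for f in failures:
        print(f"{f.row_id}: {f.error} after {f.attempts} attempt(s)")
    return Counter(f.error for f in failures)

# Hypothetical failures from a skip-mode run
failures = [
    FailedRow("row-03", "HTTP 503", 3),
    FailedRow("row-11", "HTTP 503", 3),
    FailedRow("row-14", "timeout", 3),
]
print(audit(failures))  # Counter({'HTTP 503': 2, 'timeout': 1})
```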
Auth errors (401 / 403) and insufficient-credit errors (402)
always halt regardless of on_error — they would fail every row,
so retrying them is wasted work. In-flight rows complete first;
the sink finalizes best-effort so partial results aren't lost.
on_error="halt" raises dream.DreamError on the first
non-retryable failure (any 4xx other than 429). In-flight rows
complete first; the sink receives whatever completed before the
halt.
Choosing a sink
| Sink | When to use |
|---|---|
| RolloutSink.dir("./out") | Local debugging, small runs |
| RolloutSink.s3("s3://…") | Your training pipeline already reads S3 |
| RolloutSink.hf("my-org/…") | Shareable output; HF-native streaming reads |
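If your scripts take the destination as a plain string from config, a tiny dispatcher can decide which constructor applies. The scheme rules below are an assumption for illustration (only the three constructor names come from the table above), not SDK behavior.

```python
def sink_kind(dest: str) -> tuple[str, str]:
    """Hypothetical helper: classify a destination string against the
    three RolloutSink constructors in the table above. The heuristics
    here are assumptions, not SDK behavior."""
    if dest.startswith("s3://"):
        return ("s3", dest)           # -> RolloutSink.s3(dest)
    if "/" in dest and not dest.startswith((".", "/")):
        return ("hf", dest)           # looks like an org/repo id
    return ("dir", dest)              # anything else: local path

print(sink_kind("s3://bucket/rollouts"))  # ('s3', 's3://bucket/rollouts')
print(sink_kind("my-org/rollouts"))       # ('hf', 'my-org/rollouts')
print(sink_kind("./out"))                 # ('dir', './out')
```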
What's next
POST /v1/jobs (Phase 5.2 — shipping soon) lets you submit a bulk
job server-side, removing your machine from the data path entirely.
The SDK contract stays compatible: the same IterableSource types
that feed predict_many will feed client.jobs.create.
- dream.io reference — full API reference for all loaders and sinks.
- Bulk augmentation example — end-to-end pasteable script.
- Client.predict_many — full argument reference.