Dream Engines

Bulk inference

You have a dataset of starting frames and action sequences — on Hugging Face, on S3, or on local disk — and you want a predicted rollout for every row. dream.io Layer 1 covers this with client.predict_many: one call, async concurrency pool, per-row retry, optional progress bar, results written to whichever sink you choose.

This page walks through the full flow against the public kingJulio/dream-engine-example-frames fixture (16 synthetic GR-1 rows).

When to use this vs client.jobs.create

client.predict_many runs in your process. Your machine stays in the data path — frames upload through your connection, mp4s download back to you. It is the right choice when:

  • Your dataset is small enough that your machine's bandwidth is not the bottleneck (typically up to a few thousand rows).
  • You want to inspect or filter results in the same script.
  • You want output on local disk or a private S3 bucket rather than a public HF repo.

client.jobs.create (Phase 5.2, shipping soon) submits the job to the engine server-side. The engine pulls from HF, runs Modal-parallel, and pushes to your target HF repo — your machine can go offline. Use it for large datasets (tens of thousands of rows) or nightly cron jobs where you don't want to babysit a long-running process.

The Layer 1 SDK contract is compatible: the same IterableSource types that feed predict_many will feed client.jobs.create when it ships.

Install

BASH
pip install "dream-engine[io,decode]"
export DREAM_API_KEY="dre_..."

The [io] extra adds the HF / S3 / Parquet deps. The [decode] extra lets you access rollout.frames as a numpy array — useful for inspecting output but not required for predict_many itself.


Walk-through

1. Load your source

PYTHON
import dream
from dream.io import frames_from_hf, RolloutSink
client = dream.Client()
src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",  # public; no token needed
    frame_field="start_frame",
    actions_field="action_sequence",
)
print(f"rows_hint: {src.rows_hint}") # 16

For your own private dataset, pass token=os.environ["HF_TOKEN"]. For a local directory, use frames_from_dir("./episodes") instead.

2. Estimate cost

PYTHON
estimate = client.estimate_cost(src, spec="dreamdojo-2b-gr1")
print(f"rows: {estimate.rows}")
print(f"frames_per_row: {estimate.frames_per_row}")
print(f"total_frames: {estimate.total_frames}")
print(f"total_usd: ${estimate.total_usd:.4f}")

Expected output for the 16-row fixture:

rows:           16
frames_per_row: 49
total_frames:   784
total_usd:      $0.3920

total_usd is None when the source is unbounded streaming and rows_hint is not set. In that case, either set src.rows_hint = N directly on the loader (it's a regular attribute) when you know the count, or skip the estimate.
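The estimate is plain arithmetic over the source's row count. Assuming a flat per-frame rate (the $0.0005/frame implied by the fixture numbers above, inferred here rather than an official price), it reproduces as:

```python
# Reproduce the estimate for the 16-row fixture.
# PER_FRAME_USD is inferred from the sample output above, not a published rate.
PER_FRAME_USD = 0.0005

rows = 16
frames_per_row = 49  # rollout length for dreamdojo-2b-gr1
total_frames = rows * frames_per_row
total_usd = total_frames * PER_FRAME_USD

print(f"total_frames: {total_frames}")    # 784
print(f"total_usd:    ${total_usd:.4f}")  # $0.3920
```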

3. Run the predictions

PYTHON
sink = RolloutSink.dir("./out")
result = client.predict_many(
    src,
    sink,
    spec="dreamdojo-2b-gr1",
    concurrency=4,    # up to 4 in-flight requests at once
    on_error="skip",  # skip failed rows; use "halt" to stop on first error
    progress=True,    # tqdm progress bar
)
print(f"ok: {result.ok}")
print(f"failed: {result.failed}")
print(f"output_uri: {result.output_uri}")

predict_many blocks until every row has been processed (or, with on_error="halt", until the run stops on the first non-retryable failure). Each row retries up to 3 times on 429 / 5xx / network errors before being counted as failed. Auth errors (401/403) and credit errors (402) fail fast; they are never retried.
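The retry policy boils down to a predicate over status codes. This is a sketch of the classification as described on this page, not code from the SDK:

```python
# Which failures predict_many retries, per the policy described above.
RETRY_LIMIT = 3
FAIL_FAST = {401, 402, 403}  # auth / credit errors: never retried

def is_retryable(status: int) -> bool:
    """429 (rate limit) and 5xx (transient server errors) are retried;
    fail-fast codes and all other 4xx are not."""
    if status in FAIL_FAST:
        return False
    return status == 429 or 500 <= status < 600

print(is_retryable(429))  # True
print(is_retryable(503))  # True
print(is_retryable(402))  # False: insufficient credits, fails fast
```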

4. Inspect the output

PYTHON
import pyarrow.parquet as pq

table = pq.read_table("./out/metadata.parquet")
df = table.to_pandas()
print(table.schema)
print(df.head())

# Load one rollout
import mediapy

row = df.iloc[0]
video = mediapy.read_video(f"./out/videos/{row['row_id']}.mp4")
mediapy.show_video(video)  # Jupyter; or video.shape → (49, 480, 640, 3)

Tuning

Concurrency

The default is concurrency=4. Each concurrent request holds one Modal H100 slot. Raising it to 8–16 is fine if you're not hitting rate limits: the engine autoscales (max_containers=8) and Modal dispatches across available slots. Watch for RateLimitError; if you see one, drop back to 4.
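predict_many already backs off on 429s for you, but if you wrap your own calls, exponential backoff is the standard response to rate limits. This is a generic sketch of such a schedule, not the SDK's actual retry timing:

```python
def backoff_delays(attempts: int, base: float = 1.0, cap: float = 8.0) -> list[float]:
    """Exponential backoff schedule: base, 2*base, 4*base, ... capped at `cap` seconds."""
    return [min(cap, base * (2 ** i)) for i in range(attempts)]

print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 8.0]
```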

Error handling

on_error="skip" (default) records the failure in result.failures (a list[FailedRow] with row_id, error, attempts) and increments result.failed. Failed rows are not written to the sink — only successful rollouts land in videos/<row_id>.mp4 and the metadata Parquet. To audit which rows failed, iterate result.failures after the run.
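A quick audit might group failures by error type. The rows below are made-up stand-ins (via SimpleNamespace) so the snippet runs without a live job; real FailedRow objects carry the same row_id / error / attempts fields:

```python
from collections import Counter
from types import SimpleNamespace

# Stand-ins for result.failures entries; substitute your real run's list.
failures = [
    SimpleNamespace(row_id="row-03", error="HTTP 503", attempts=3),
    SimpleNamespace(row_id="row-07", error="HTTP 503", attempts=3),
    SimpleNamespace(row_id="row-11", error="timeout", attempts=3),
]

by_error = Counter(f.error for f in failures)
for error, count in by_error.most_common():
    print(f"{error}: {count} row(s)")
# HTTP 503: 2 row(s)
# timeout: 1 row(s)
```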

Auth errors (401 / 403) and insufficient-credit errors (402) always halt regardless of on_error — they would fail every row, so retrying them is wasted work. In-flight rows complete first; the sink finalizes best-effort so partial results aren't lost.

on_error="halt" raises dream.DreamError on the first non-retryable failure (any 4xx other than 429). In-flight rows complete first; the sink receives whatever completed before the halt.

Choosing a sink

Sink                          When to use
RolloutSink.dir("./out")      Local debugging, small runs
RolloutSink.s3("s3://…")      Your training pipeline already reads S3
RolloutSink.hf("my-org/…")    Shareable output; HF-native streaming reads
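If your scripts take the output target as a string, choosing among the three constructors usually reduces to the target's shape. The helper below is hypothetical (sink_kind is not part of dream.io); the comments map each branch to the documented constructor:

```python
def sink_kind(target: str) -> str:
    """Classify an output target string by the RolloutSink constructor it suggests."""
    if target.startswith("s3://"):
        return "s3"   # RolloutSink.s3(target)
    if target.startswith((".", "/", "~")):
        return "dir"  # RolloutSink.dir(target)
    return "hf"       # org/name repo id -> RolloutSink.hf(target)

print(sink_kind("s3://my-bucket/rollouts"))  # s3
print(sink_kind("./out"))                    # dir
print(sink_kind("my-org/rollouts"))          # hf
```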

What's next

POST /v1/jobs (Phase 5.2 — shipping soon) lets you submit a bulk job server-side, removing your machine from the data path entirely. The SDK contract stays compatible: the same IterableSource types that feed predict_many will feed client.jobs.create.