DALI Dynamic Mode
Purpose
Guide AI agents in writing, reviewing, and migrating code that uses DALI's imperative dynamic-mode API, nvidia.dali.experimental.dynamic (ndd).
Instructions
- Import dynamic mode with import nvidia.dali.experimental.dynamic as ndd and write code as direct ndd calls in ordinary Python; do not use pipeline-mode APIs such as Pipeline, @pipeline_def, pipe.build(), or pipe.run().
- Treat readers as stateful: create them once, reuse them across epochs, and pass batch_size to next_epoch(...).
- Pass an explicit batch_size to random ops; there is no pipeline-level batch size to inherit.
- Use dynamic-mode API conventions: device="gpu" instead of pipeline-mode "mixed", Batch.select(...) for sample selection, and Batch.slice[...] for per-sample slicing.
Prerequisites
- To run or validate code, NVIDIA DALI must be installed with dynamic mode importable as nvidia.dali.experimental.dynamic.
- GPU decode or GPU operators require a CUDA-capable DALI build and an available NVIDIA GPU/driver.
- Framework conversion examples require the target framework installed, such as PyTorch for .torch().
Introduction
Dynamic mode is DALI's imperative Python API. It lets code call DALI operators directly from normal Python control flow instead of building and running a pipeline graph.
Core Data Types
Tensor -- single sample
t = ndd.tensor(data) # copy
t = ndd.as_tensor(data) # wrap, no copy if possible
t.cpu() # move to CPU
t.gpu() # move to GPU
t.torch(copy=False) # conversion to PyTorch tensor with no copy (default)
t[1:3] # slicing supported
np.asarray(t) # NumPy via __array__ (CPU only)
Supports __dlpack__, __cuda_array_interface__, __array__, arithmetic operators.
Batch -- collection of samples (variable shapes OK)
b = ndd.batch([arr1, arr2]) # copy
b = ndd.as_batch(data) # wrap, no copy if possible
Batch has no __getitem__ -- batch[i] raises TypeError because indexing is ambiguous (sample selection vs. per-sample slicing). Use the explicit APIs instead:
| Intent | Method | Returns |
|---|---|---|
| Get sample i | batch.select(i) | Tensor |
| Get subset of samples | batch.select(slice_or_list) | Batch |
| Slice within each sample | batch.slice[...] | Batch (same batch_size) |
.select() picks which samples. .slice indexes inside each sample.
xy = ndd.random.uniform(batch_size=16, range=[0, 1], shape=2)
crop_x = xy.slice[0] # Batch of 16 scalars, first element from each sample
crop_y = xy.slice[1] # Batch of 16 scalars, second element from each sample
sample_0 = xy.select(0) # Tensor, the entire first sample [x, y]
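The difference between selecting samples and slicing inside samples can be illustrated without DALI installed. The sketch below uses plain Python lists as a stand-in for a Batch; the helper names select and slice_each are hypothetical and only mirror the semantics described above, not the DALI implementation.

```python
# Stand-in for a Batch: a plain list of samples, each sample a list [x, y].
# This mirrors the semantics only; it does not call DALI.
samples = [[0.1, 0.9], [0.4, 0.6], [0.7, 0.3]]


def select(batch, i):
    """Analog of batch.select(i): pick whole samples."""
    if isinstance(i, (int, slice)):
        return batch[i]
    # A list of indices picks a subset of samples.
    return [batch[j] for j in i]


def slice_each(batch, index):
    """Analog of batch.slice[index]: apply the index inside every sample."""
    return [sample[index] for sample in batch]


sample_0 = select(samples, 0)      # one whole sample: [0.1, 0.9]
subset = select(samples, [0, 2])   # two whole samples
crop_x = slice_each(samples, 0)    # first element of every sample: [0.1, 0.4, 0.7]
```

Note that slice_each always returns as many items as there are samples, which matches the "same batch_size" entry in the table.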
PyTorch conversion:
- batch.torch() -- works for uniform shapes; raises for ragged batches
- batch.torch(pad=True) -- zero-pads ragged batches to max shape (use for variable-length audio, detection boxes, etc.)
- batch.torch(copy=None) is the default (avoids copy if possible)
- Batch has no __dlpack__ -- use ndd.as_tensor(batch) first for DLPack consumers. ndd.as_tensor supports pad as well.
- Tensor.torch(copy=False) is the default (no copy)
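To show what zero-padding a ragged batch to its max shape means, here is a conceptual sketch in pure Python. It handles only 1-D samples, and pad_to_max is a hypothetical helper, not a DALI function; the real pad=True path works on N-dimensional tensors and may differ in detail.

```python
def pad_to_max(samples, fill=0):
    """Right-pad every 1-D sample with `fill` up to the longest sample's length."""
    max_len = max((len(s) for s in samples), default=0)
    return [list(s) + [fill] * (max_len - len(s)) for s in samples]


ragged = [[1, 2, 3], [4], [5, 6]]
padded = pad_to_max(ragged)
# padded == [[1, 2, 3], [4, 0, 0], [5, 6, 0]]
```

After padding, every sample has the same length, which is the precondition for stacking into a single dense tensor.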
Iteration: for sample in batch: yields Tensors.
Readers
Readers are stateful objects -- create once, reuse across epochs. This matters because readers track internal state like shuffle order and shard position.
reader = ndd.readers.File(file_root=image_dir, random_shuffle=True)
for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        # jpegs, labels are Batch objects
        ...
Key points:
- Reader outputs (jpegs, labels, etc.) are CPU tensors/batches. Labels typically stay on CPU until you convert them for your framework (e.g. labels.torch().to(device)).
- Reader classes are PascalCase: ndd.readers.File(...), ndd.readers.COCO(...), ndd.readers.TFRecord(...)
- batch_size goes to next_epoch(), not to the reader constructor
- next_epoch(batch_size=N) yields tuples of Batch; next_epoch() without batch_size yields tuples of Tensor
- The iterator from next_epoch() must be fully consumed before calling next_epoch() again
- Once a reader has been used with a given batch_size, that batch_size cannot be changed. Similarly, a reader used in batch mode cannot switch to sample mode or vice versa.
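The key points above can be folded into one small driver. This is a minimal sketch that relies only on the next_epoch(batch_size=...) interface described here; run_epochs and step are hypothetical names, and the reader is passed in so that it is created once by the caller.

```python
def run_epochs(reader, num_epochs, batch_size, step):
    """Drive an already-constructed reader for several epochs.

    Each epoch iterator is consumed to the end before the next next_epoch()
    call, and the same batch_size is used every time.
    """
    batches_seen = 0
    for _ in range(num_epochs):
        for outputs in reader.next_epoch(batch_size=batch_size):
            # outputs is a tuple, e.g. (jpegs, labels) for a file reader.
            step(*outputs)
            batches_seen += 1
    return batches_seen
```

A call such as run_epochs(reader, num_epochs, 64, train_on_batch) keeps the reader's shuffle and shard state alive across epochs, where train_on_batch stands for your own per-batch function.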
Sharded reading for distributed training:
reader = ndd.readers.File(
    file_root=image_dir,
    shard_id=rank,
    num_shards=world_size,
    stick_to_shard=True,
    pad_last_batch=True,
)
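The rank and world_size values have to come from somewhere. One common option, sketched below, is to read the RANK and WORLD_SIZE environment variables that torchrun-style launchers set. The helper shard_kwargs is hypothetical, and the single-process fallback (rank 0 of 1) is an assumption, not DALI behavior.

```python
import os


def shard_kwargs(env=None):
    """Build the sharding keyword arguments shown above from launcher variables."""
    env = os.environ if env is None else env
    rank = int(env.get("RANK", "0"))
    world_size = int(env.get("WORLD_SIZE", "1"))
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside [0, {world_size})")
    return {
        "shard_id": rank,
        "num_shards": world_size,
        "stick_to_shard": True,
        "pad_last_batch": True,
    }


# Intended use (requires DALI):
# reader = ndd.readers.File(file_root=image_dir, **shard_kwargs())
```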
Device Handling
- Device is inferred from inputs -- GPU if any input is on GPU
- For hybrid decode: use device="gpu" (NOT "mixed"). The "mixed" keyword is a pipeline-mode concept for implicit CPU-to-GPU transfer; in dynamic mode, passing device="gpu" triggers the same hardware-accelerated decode path.
- Don't call .cpu() before passing to a GPU model -- .torch() gives you a GPU tensor directly. .cpu() is only needed for consumers requiring host memory (numpy, __array__).
- CUDA stream sync between DALI and PyTorch is automatic via DLPack -- no manual stream management needed.
Execution Model
Default mode is eager -- async execution in a background thread, returns immediately.
No .evaluate() needed in most cases. Any data consumption (.torch(), __dlpack__, __array__, .shape, property access, iteration) triggers evaluation automatically.
For debugging, switch to synchronous mode so errors surface at the exact call site rather than later in the async queue:
with ndd.EvalMode.sync_full:
    images = ndd.decoders.image(jpegs, device="gpu")
    images = ndd.resize(images, size=[224, 224])
    # Any error surfaces here, at the exact op that failed
Modes (increasing synchronicity): deferred < eager < sync_cpu < sync_full
Use EvalMode.sync_full for debugging instead of scattering .evaluate() calls -- it's cleaner and catches all issues at once. sync_cpu is often sufficient and lighter than sync_full.
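If you want to toggle synchronous debugging without editing the processing code, a small wrapper can take the mode as an argument. This is a sketch only: run_with_mode is a hypothetical helper, and it assumes the EvalMode members can be used as context managers in the way the with statement above suggests.

```python
from contextlib import nullcontext


def run_with_mode(fn, *args, mode=None, **kwargs):
    """Call fn inside the given context manager, or unchanged if mode is None."""
    ctx = mode if mode is not None else nullcontext()
    with ctx:
        return fn(*args, **kwargs)


# Intended use (requires DALI; preprocess is your own function):
# images = run_with_mode(preprocess, jpegs, mode=ndd.EvalMode.sync_full)
```

Passing mode=None leaves the default eager mode in effect, so the same call site serves both normal runs and debugging runs.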
Thread Configuration
ndd.set_num_threads(4) # Call once at startup, only if necessary to override the defaults
Controls DALI's internal worker threads for CPU operators. Defaults to CPU affinity count or DALI_NUM_THREADS env var. Unrelated to Python-level threading.
RNG
Two approaches (use one, not both):
# Approach 1: set the thread-local default seed (simple, good enough for most cases)
ndd.random.set_seed(42)
angles = ndd.random.uniform(batch_size=64, range=(-30, 30))
# Approach 2: explicit RNG object (finer control, pass rng= to each op)
rng = ndd.random.RNG(seed=42)
values = ndd.random.uniform(batch_size=64, range=[0, 1], shape=2, rng=rng)
When rng= is passed to a random op, the explicit RNG overrides the default seed. Thread-local: each thread has independent random state.
Random ops need an explicit batch_size when working with batches -- there is no pipeline-level batch size to inherit.
Checkpointing
Dynamic mode has no pipeline-level checkpoint. Checkpoints aggregate the state of individual stateful objects: readers and RNG instances. Stateless ops (decoders, resize, rotate, normalize, ...) are not part of a checkpoint.
ckpt = ndd.checkpoint.Checkpoint()
ckpt.register(reader, "my_reader")
ckpt.register(rng, "rng")
# ... iterate for a while ...
ckpt.collect() # snapshot the registered objects
ckpt.save("ckpt_{seq:04d}.json") # writes ckpt_0000.json, ckpt_0001.json, ...
Restoring is the symmetric operation -- build a fresh reader and RNG, then load + register. The loaded state is applied to each object at register time:
reader = ndd.readers.File(file_root=..., enable_checkpointing=True, name="my_reader")
rng = ndd.random.RNG()
ckpt = ndd.checkpoint.Checkpoint()
ckpt.load("ckpt_{seq:04d}.json") # picks the highest sequence number
ckpt.register(reader, "my_reader") # state applied here
ckpt.register(rng, "rng") # ditto
for batch in reader.next_epoch(batch_size=N):
    ... # produces the next batch after the checkpointed iteration
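Because the same names must be used when saving and when restoring, it can help to route both paths through one function. The sketch below assumes only the register(obj, name) call shown above; register_named and the dictionary layout are hypothetical.

```python
def register_named(ckpt, objects):
    """Register every stateful object under its explicit name.

    `objects` maps names to readers or RNG instances. Sorting the names makes
    the call order deterministic, although named registration should not
    depend on order.
    """
    for name in sorted(objects):
        ckpt.register(objects[name], name)


# Intended use (requires DALI):
# register_named(ckpt, {"my_reader": reader, "rng": rng})
```

Calling the same helper with the same dictionary keys on both the save side and the restore side avoids the name mismatches that separate register calls can introduce.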
Key rules:
- Readers must opt in. Construct with enable_checkpointing=True. Registering an already-iterated reader without it raises RuntimeError; if the reader has not been iterated yet, register enables it retroactively.
- Reader state must be applied before the first next_epoch call. The prefetch thread starts on first iteration and the snapshot queue is locked after that. set_state (or a register from a loaded checkpoint) on an already-iterated reader raises RuntimeError.
- enable_checkpointing=True is incompatible with compile=True. Calling reader.next_epoch(..., compile=True) on a checkpointing-enabled reader raises NotImplementedError.
- Named registration is safer. Anonymous register(op) uses sequential keys (__op_0, __op_1, ...) so the registration order must match between save and restore. Type tags catch cross-type swaps but not reorders of compatible types. Prefer register(op, name).
- ndd.checkpoint.current() returns the Checkpoint bound to the current thread-local EvalContext. It's shared across calls -- call ckpt.clear() if reusing the default context for unrelated runs.
- Filename pattern: save/load take a Python format string with a single {seq} placeholder (e.g. "ckpt_{seq:04d}.json"). save picks the next free sequence; load picks the highest matching one on disk.
- Format version is strict. deserialize rejects payloads from a different checkpoint format version -- no automatic upgrade.
- Not thread-safe. One Checkpoint per thread.
Manual get_state / set_state is also available directly on each Reader and RNG -- the Checkpoint aggregator is built on top of it. Use the manual API only when integrating with an external checkpoint system.
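The filename-pattern rule above can be made concrete with a pure-Python sketch of how a {seq} pattern might resolve to paths on disk. This is not DALI's code: the helper names are hypothetical, and "next free sequence" is assumed here to mean the highest existing number plus one.

```python
import os
import re


def existing_sequences(pattern):
    """Return the sorted sequence numbers of files on disk that match pattern."""
    directory, name = os.path.split(pattern)
    head, _, rest = name.partition("{seq")
    _, _, tail = rest.partition("}")
    rx = re.compile(re.escape(head) + r"(\d+)" + re.escape(tail))
    found = []
    for entry in os.listdir(directory or "."):
        match = rx.fullmatch(entry)
        if match:
            found.append(int(match.group(1)))
    return sorted(found)


def next_save_path(pattern):
    """Path a save would use: one past the highest existing sequence (assumed)."""
    seqs = existing_sequences(pattern)
    return pattern.format(seq=seqs[-1] + 1 if seqs else 0)


def latest_load_path(pattern):
    """Path a load would use: the highest sequence number found on disk."""
    seqs = existing_sequences(pattern)
    if not seqs:
        raise FileNotFoundError(f"no checkpoint matches {pattern!r}")
    return pattern.format(seq=seqs[-1])
```

With ckpt_0000.json and ckpt_0001.json present, this sketch resolves the next save to ckpt_0002.json and the load to ckpt_0001.json.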
Examples
Image Classification Pipeline
import nvidia.dali.experimental.dynamic as ndd
reader = ndd.readers.File(file_root="/data/imagenet/train", random_shuffle=True)
for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        images = ndd.decoders.image(jpegs, device="gpu")
        images = ndd.resize(images, size=[224, 224])
        images = ndd.crop_mirror_normalize(
            images,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        )
        train_step(images.torch(), labels.torch())
Common Mistakes
| Wrong | Right | Why |
|---|---|---|
| device="mixed" | device="gpu" | "mixed" is pipeline mode only |
| batch[i] | batch.select(i) | Batch has no __getitem__ |
| batch.select(0) for per-sample slicing | batch.slice[0] | .select() picks samples; .slice slices within each sample |
| .evaluate() after every op | Let consumption trigger eval | .torch(), .shape, etc. trigger it automatically |
| .cpu() before GPU model | .torch() directly | Avoids wasteful D2H + H2D round-trip |
| Recreate reader each epoch | reader.next_epoch() | Readers are stateful -- create once, reuse |
| ndd.readers.file(...) | ndd.readers.File(...) | Reader classes are PascalCase |
| break from next_epoch() loop | Exhaust iterator or create new reader | Iterator must be fully consumed before next next_epoch() |
| No batch_size to random ops | ndd.random.uniform(batch_size=N, ...) | No pipeline-level batch size to inherit |
| register(reader) after first next_epoch to restore | Register the freshly built reader before the first iteration | Reader state can only be applied before the prefetch thread starts |
| Restoring into a reader built without enable_checkpointing=True after iteration | Pass enable_checkpointing=True at construction (or register before first iteration) | Backend doesn't keep snapshots otherwise |
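For the "break from next_epoch() loop" row, one way to stop processing early while still exhausting the iterator is to keep pulling items and ignore them. A minimal sketch follows; consume_first is a hypothetical helper that works on any Python iterator.

```python
def consume_first(epoch_iter, limit, step):
    """Call step on the first `limit` items, then drain the rest unprocessed."""
    processed = 0
    for outputs in epoch_iter:
        if processed < limit:
            step(*outputs)
            processed += 1
        # Items past the limit are still pulled from the iterator, just ignored.
    return processed


# Intended use (requires DALI; train_on_batch is your own function):
# consume_first(reader.next_epoch(batch_size=64), limit=100, step=train_on_batch)
```

Draining still reads and discards the remaining batches, so for long epochs creating a new reader may be the cheaper option.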
Pipeline Mode Migration
| Pipeline Mode | Dynamic Mode |
|---|---|
| @pipeline_def / pipe.build() / pipe.run() | Direct function calls in a loop |
| fn.readers.file(...) | ndd.readers.File(...) (PascalCase, stateful) |
| fn.decoders.image(jpegs, device="mixed") | ndd.decoders.image(jpegs, device="gpu") |
| fn.op_name(...) | ndd.op_name(...) |
| Pipeline-level batch_size=64 | reader.next_epoch(batch_size=64) + random ops batch_size=64 |
| Pipeline-level seed=42 | ndd.random.set_seed(42) or ndd.random.RNG(seed=42) |
| Pipeline-level num_threads=4 | ndd.set_num_threads(4) at startup |
| output.at(i) | batch.select(i) |
| output.as_cpu() | batch.cpu() |
| pipe.run() returns tuple of TensorList | reader.next_epoch(batch_size=N) yields tuples of Batch |
| Pipeline(..., enable_checkpointing=True) + pipe.checkpoint() / pipeline(checkpoint=...) | ndd.checkpoint.Checkpoint + per-object register / collect / save / load; readers opt in with enable_checkpointing=True |
Limitations
Dynamic mode is more flexible than pipeline mode, but can have slightly worse performance. For maximum throughput, prefer pipeline mode.
Troubleshooting
- If errors surface later than the failing call, rerun the block inside a with ndd.EvalMode.sync_full: context.
- If a reader behaves unexpectedly across epochs, check that it is created once and that each next_epoch() iterator is fully consumed.