DALI Dynamic Mode

DALI imperative dynamic mode (`nvidia.dali.experimental.dynamic`, ndd): use when working on ndd code or migrating pipelines; skip pipeline-only tasks.

Published by @NVIDIA

Purpose

Guide AI agents in writing, reviewing, and migrating code that uses DALI's imperative dynamic-mode API, nvidia.dali.experimental.dynamic (ndd).

Instructions

  • Import the module with import nvidia.dali.experimental.dynamic as ndd and write code as direct ndd calls in ordinary Python; do not use pipeline-mode APIs such as Pipeline, @pipeline_def, pipe.build(), or pipe.run().
  • Treat readers as stateful: create them once, reuse them across epochs, and pass batch_size to next_epoch(...).
  • Pass explicit batch_size to random ops; there is no pipeline-level batch size to inherit.
  • Use dynamic-mode API conventions: device="gpu" instead of pipeline-mode "mixed", Batch.select(...) for sample selection, and Batch.slice[...] for per-sample slicing.

Prerequisites

  • To run or validate code, NVIDIA DALI must be installed with dynamic mode importable as nvidia.dali.experimental.dynamic.
  • GPU decode or GPU operators require a CUDA-capable DALI build and an available NVIDIA GPU/driver.
  • Framework conversion examples require the target framework installed, such as PyTorch for .torch().

Introduction

Dynamic mode is DALI's imperative Python API. It lets code call DALI operators directly from normal Python control flow instead of building and running a pipeline graph.

Core Data Types

Tensor -- single sample

import numpy as np
import nvidia.dali.experimental.dynamic as ndd

t = ndd.tensor(data)           # copy
t = ndd.as_tensor(data)        # wrap, no copy if possible
t.cpu()                        # move to CPU
t.gpu()                        # move to GPU
t.torch(copy=False)            # conversion to PyTorch tensor with no copy (default)
t[1:3]                         # slicing supported
np.asarray(t)                  # NumPy via __array__ (CPU only)

Supports __dlpack__, __cuda_array_interface__, __array__, arithmetic operators.

Batch -- collection of samples (variable shapes OK)

b = ndd.batch([arr1, arr2])    # copy
b = ndd.as_batch(data)         # wrap, no copy if possible

Batch has no __getitem__ -- batch[i] raises TypeError because indexing is ambiguous (sample selection vs. per-sample slicing). Use the explicit APIs instead:

| Intent | Method | Returns |
| --- | --- | --- |
| Get sample i | batch.select(i) | Tensor |
| Get subset of samples | batch.select(slice_or_list) | Batch |
| Slice within each sample | batch.slice[...] | Batch (same batch_size) |

.select() picks which samples. .slice indexes inside each sample.

xy = ndd.random.uniform(batch_size=16, range=[0, 1], shape=2)
crop_x = xy.slice[0]       # Batch of 16 scalars, first element from each sample
crop_y = xy.slice[1]       # Batch of 16 scalars, second element from each sample
sample_0 = xy.select(0)    # Tensor, the entire first sample [x, y]
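To make the select/slice distinction concrete, here is a plain-Python model of the two access patterns. It is not DALI code: a batch is modelled as a list of samples, and the helper names select and slice_each are hypothetical stand-ins for batch.select(...) and batch.slice[...].

```python
# Plain-Python model of the two Batch access patterns; this is NOT DALI code.
# A "batch" here is a list of samples, and each sample is a list of numbers.

def select(batch, index):
    """Model of batch.select(...): choose WHICH samples to keep."""
    if isinstance(index, (int, slice)):
        return batch[index]                      # one sample, or a sub-batch
    return [batch[i] for i in index]             # explicit list of sample indices


def slice_each(batch, index):
    """Model of batch.slice[...]: apply the same index INSIDE every sample."""
    return [sample[index] for sample in batch]   # number of samples is unchanged


xy = [[0.1, 0.9], [0.4, 0.6], [0.7, 0.3]]        # 3 samples, each of shape (2,)

crop_x = slice_each(xy, 0)     # first element of every sample
crop_y = slice_each(xy, 1)     # second element of every sample
sample_0 = select(xy, 0)       # the entire first sample
subset = select(xy, [0, 2])    # a smaller batch holding samples 0 and 2
```

The model shows why a single batch[i] would be ambiguous: with the same index 0, one reading returns a whole sample and the other returns one value per sample.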

PyTorch conversion:

  • batch.torch() -- works for uniform shapes; raises for ragged batches
  • batch.torch(pad=True) -- zero-pads ragged batches to max shape (use for variable-length audio, detection boxes, etc.)
  • batch.torch(copy=None) is the default (avoids copy if possible)
  • Batch has no __dlpack__ -- use ndd.as_tensor(batch) first for DLPack consumers. ndd.as_tensor supports pad as well.
  • Tensor.torch(copy=False) is default (no copy)

Iteration: for sample in batch: yields Tensors.
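The padding behaviour described for batch.torch(pad=True) can be pictured with a small plain-Python sketch. It only models 1-D samples and assumes the padding is appended at the end of each sample; the helper names pad_ragged and is_uniform are hypothetical and are not part of DALI.

```python
# Plain-Python model of zero-padding a ragged batch; this is NOT DALI code.
# Assumption: padding is added at the end of each sample.

def is_uniform(samples):
    """True when every sample has the same length."""
    return len({len(s) for s in samples}) <= 1


def pad_ragged(samples, fill=0):
    """Pad variable-length 1-D samples with `fill` up to the longest length."""
    max_len = max((len(s) for s in samples), default=0)
    return [list(s) + [fill] * (max_len - len(s)) for s in samples]


ragged = [[1, 2, 3], [4], [5, 6]]   # e.g. variable-length audio or box lists
padded = pad_ragged(ragged)         # every sample now has length 3
```

After padding, the batch is uniform and could be stacked into one dense array, which is the precondition that a plain batch.torch() call expects.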

Readers

Readers are stateful objects -- create once, reuse across epochs. This matters because readers track internal state like shuffle order and shard position.

reader = ndd.readers.File(file_root=image_dir, random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        # jpegs, labels are Batch objects
        ...

Key points:

  • Reader outputs (jpegs, labels, etc.) are CPU tensors/batches. Labels typically stay on CPU until you convert them for your framework (e.g. labels.torch().to(device)).
  • Reader classes are PascalCase: ndd.readers.File(...), ndd.readers.COCO(...), ndd.readers.TFRecord(...)
  • batch_size goes to next_epoch(), not to the reader constructor
  • next_epoch(batch_size=N) yields tuples of Batch; next_epoch() without batch_size yields tuples of Tensor
  • The iterator from next_epoch() must be fully consumed before calling next_epoch() again
  • Once a reader is used with a given batch_size, it cannot be changed. Similarly, a reader used in batch mode cannot switch to sample mode or vice versa.
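Because the iterator returned by next_epoch() has to be fully consumed, stopping early needs some care. One possible pattern, sketched here with a hypothetical helper take_then_drain, processes a limited number of batches and then exhausts the rest instead of using break. The sketch uses range() as a stand-in for reader.next_epoch(batch_size=...) so it runs without DALI.

```python
from collections import deque
from itertools import islice


def take_then_drain(epoch_iter, limit, handle):
    """Handle at most `limit` items, then exhaust the iterator."""
    it = iter(epoch_iter)
    processed = 0
    for item in islice(it, limit):
        handle(item)
        processed += 1
    # Consume whatever is left without storing it.
    deque(it, maxlen=0)
    return processed


seen = []
epoch = iter(range(10))   # stand-in for reader.next_epoch(batch_size=...)
count = take_then_drain(epoch, 3, seen.append)
```

Draining still pays the cost of reading the remaining batches, so this suits short debugging runs more than production training loops.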

Sharded reading for distributed training:

reader = ndd.readers.File(
    file_root=image_dir,
    shard_id=rank, num_shards=world_size,
    stick_to_shard=True,
    pad_last_batch=True,
)
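The rank and world_size values above are not defined in the snippet. A minimal sketch of one way to obtain them, assuming the job is launched with a tool such as torchrun that sets the RANK and WORLD_SIZE environment variables, is shown below; the helper name shard_kwargs is hypothetical.

```python
import os


def shard_kwargs(env=None):
    """Build the sharding keyword arguments from launcher environment variables."""
    env = os.environ if env is None else env
    rank = int(env.get("RANK", "0"))              # falls back to single-process
    world_size = int(env.get("WORLD_SIZE", "1"))
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside [0, {world_size})")
    return {
        "shard_id": rank,
        "num_shards": world_size,
        "stick_to_shard": True,
        "pad_last_batch": True,
    }


kwargs = shard_kwargs({"RANK": "2", "WORLD_SIZE": "4"})
# With DALI installed, the result would be passed on as:
# reader = ndd.readers.File(file_root=image_dir, **kwargs)
```

Passing an explicit dictionary keeps the helper easy to test; calling shard_kwargs() with no argument reads the real process environment.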

Device Handling

  • Device is inferred from inputs -- GPU if any input is on GPU
  • For hybrid decode: use device="gpu" (NOT "mixed"). The "mixed" keyword is a pipeline-mode concept for implicit CPU-to-GPU transfer; in dynamic mode, passing device="gpu" triggers the same hardware-accelerated decode path.
  • Don't call .cpu() before passing data to a GPU model -- .torch() gives you a GPU tensor directly. .cpu() is only needed for consumers that require host memory (NumPy, __array__).
  • CUDA stream sync between DALI and PyTorch is automatic via DLPack -- no manual stream management needed.

Execution Model

The default mode is eager: operators execute asynchronously in a background thread, and each call returns immediately.

No .evaluate() needed in most cases. Any data consumption (.torch(), __dlpack__, __array__, .shape, property access, iteration) triggers evaluation automatically.

For debugging, switch to synchronous mode so errors surface at the exact call site rather than later in the async queue:

with ndd.EvalMode.sync_full:
    images = ndd.decoders.image(jpegs, device="gpu")
    images = ndd.resize(images, size=[224, 224])
    # Any error surfaces here, at the exact op that failed

Modes (increasing synchronicity): deferred < eager < sync_cpu < sync_full

Use EvalMode.sync_full for debugging instead of scattering .evaluate() calls -- it's cleaner and catches all issues at once. sync_cpu is often sufficient and lighter than sync_full.
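A debug switch can keep the synchronous mode out of the normal code path. The sketch below is one possible arrangement, not an official pattern: the helper eval_mode is hypothetical, it takes the ndd module as an argument, and a stand-in object replaces the real module so the example runs without DALI installed.

```python
import contextlib
import types


def eval_mode(ndd, debug):
    """Return sync_full when debugging, otherwise a no-op context manager."""
    return ndd.EvalMode.sync_full if debug else contextlib.nullcontext()


# Stand-in for the real module; with DALI installed, pass the imported ndd instead.
fake_ndd = types.SimpleNamespace(
    EvalMode=types.SimpleNamespace(sync_full=contextlib.nullcontext("sync_full"))
)

with eval_mode(fake_ndd, debug=True) as mode:
    debug_mode = mode      # the stand-in yields the string "sync_full"

with eval_mode(fake_ndd, debug=False) as mode:
    normal_mode = mode     # the no-op context yields None
```

With the real module, the body of the with block would hold the decode and resize calls, and the default eager mode stays in effect whenever debug is False.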

Thread Configuration

ndd.set_num_threads(4)  # Call once at startup, only if necessary to override the defaults

Controls DALI's internal worker threads for CPU operators. Defaults to CPU affinity count or DALI_NUM_THREADS env var. Unrelated to Python-level threading.

RNG

Two approaches (use one, not both):

# Approach 1: set the thread-local default seed (simple, good enough for most cases)
ndd.random.set_seed(42)
angles = ndd.random.uniform(batch_size=64, range=(-30, 30))

# Approach 2: explicit RNG object (finer control, pass rng= to each op)
rng = ndd.random.RNG(seed=42)
values = ndd.random.uniform(batch_size=64, range=[0, 1], shape=2, rng=rng)

When rng= is passed to a random op, the explicit RNG overrides the default seed. The default seed is thread-local: each thread has independent random state.

Random ops need an explicit batch_size when working with batches -- there is no pipeline-level batch size to inherit.
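The difference between a shared default seed and an explicit generator object can be illustrated with Python's standard random module. This is only an analogy for the two approaches; it says nothing about DALI's actual generator or the values it produces.

```python
import random

# Analogue of approach 1: reseeding the shared default generator replays its stream.
random.seed(42)
default_first = [random.random() for _ in range(3)]
random.seed(42)
default_again = [random.random() for _ in range(3)]

# Analogue of approach 2: explicit generator objects carry their own state.
rng_a = random.Random(42)
rng_b = random.Random(42)
explicit_first = [rng_a.random() for _ in range(3)]
random.random()   # drawing from the default generator does not touch rng_b
explicit_again = [rng_b.random() for _ in range(3)]
```

The explicit objects stay reproducible no matter what else draws from the default generator, which is the kind of control the rng= argument is meant to provide.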

Checkpointing

Dynamic mode has no pipeline-level checkpoint. Checkpoints aggregate the state of individual stateful objects: readers and RNG instances. Stateless ops (decoders, resize, rotate, normalize, ...) are not part of a checkpoint.

ckpt = ndd.checkpoint.Checkpoint()
ckpt.register(reader, "my_reader")
ckpt.register(rng, "rng")

# ... iterate for a while ...

ckpt.collect()                       # snapshot the registered objects
ckpt.save("ckpt_{seq:04d}.json")     # writes ckpt_0000.json, ckpt_0001.json, ...

Restoring is the symmetric operation -- build a fresh reader and RNG, then load + register. The loaded state is applied to each object at register time:

reader = ndd.readers.File(file_root=..., enable_checkpointing=True, name="my_reader")
rng = ndd.random.RNG()

ckpt = ndd.checkpoint.Checkpoint()
ckpt.load("ckpt_{seq:04d}.json")     # picks the highest sequence number
ckpt.register(reader, "my_reader")   # state applied here
ckpt.register(rng, "rng")            # ditto

for batch in reader.next_epoch(batch_size=N):
    ...  # produces the next batch after the checkpointed iteration

Key rules:

  • Readers must opt in. Construct with enable_checkpointing=True. Registering an already-iterated reader without it raises RuntimeError; if the reader has not been iterated yet, register enables it retroactively.
  • Reader state must be applied before the first next_epoch call. The prefetch thread starts on first iteration and the snapshot queue is locked after that. set_state (or a register from a loaded checkpoint) on an already-iterated reader raises RuntimeError.
  • enable_checkpointing=True is incompatible with compile=True. Calling reader.next_epoch(..., compile=True) on a checkpointing-enabled reader raises NotImplementedError.
  • Named registration is safer. Anonymous register(op) uses sequential keys (__op_0, __op_1, ...) so the registration order must match between save and restore. Type tags catch cross-type swaps but not reorders of compatible types. Prefer register(op, name).
  • ndd.checkpoint.current() returns the Checkpoint bound to the current thread-local EvalContext. It's shared across calls -- call ckpt.clear() if reusing the default context for unrelated runs.
  • Filename pattern: save/load take a Python format string with a single {seq} placeholder (e.g. "ckpt_{seq:04d}.json"). save picks the next free sequence; load picks the highest matching one on disk.
  • Format version is strict. deserialize rejects payloads from a different checkpoint format version -- no automatic upgrade.
  • Not thread-safe. One Checkpoint per thread.

Manual get_state / set_state is also available directly on each Reader and RNG -- the Checkpoint aggregator is built on top of it. Use the manual API only when integrating with an external checkpoint system.
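The {seq} filename rule can be pictured with ordinary string formatting. The sketch below models the described behaviour over a set of existing file names; it is not DALI's implementation, the helper names are hypothetical, and it assumes that "next free" means one past the highest existing sequence number.

```python
# Plain-Python model of the {seq} filename pattern; this is NOT DALI code.

def existing_sequences(pattern, names, limit=10_000):
    """Sequence numbers whose formatted file name is present in `names`."""
    return [seq for seq in range(limit) if pattern.format(seq=seq) in names]


def path_to_load(pattern, names):
    """Model of load: the highest matching sequence, or None if there is none."""
    seqs = existing_sequences(pattern, names)
    return pattern.format(seq=seqs[-1]) if seqs else None


def path_to_save(pattern, names):
    """Model of save: one past the highest existing sequence (an assumption)."""
    seqs = existing_sequences(pattern, names)
    return pattern.format(seq=seqs[-1] + 1 if seqs else 0)


pattern = "ckpt_{seq:04d}.json"
on_disk = {"ckpt_0000.json", "ckpt_0001.json", "notes.txt"}

load_name = path_to_load(pattern, on_disk)
save_name = path_to_save(pattern, on_disk)
first_name = path_to_save(pattern, set())
```

The :04d format spec zero-pads the sequence number to four digits, so file names also sort correctly in a directory listing.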

Examples

Image Classification Pipeline

import nvidia.dali.experimental.dynamic as ndd

reader = ndd.readers.File(file_root="/data/imagenet/train", random_shuffle=True)

# num_epochs and train_step are placeholders supplied by the training script
for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        images = ndd.decoders.image(jpegs, device="gpu")
        images = ndd.resize(images, size=[224, 224])
        images = ndd.crop_mirror_normalize(
            images,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        )
        train_step(images.torch(), labels.torch())
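The mean and std values in the example are multiplied by 255 because decoded images hold 8-bit pixel values in the range 0 to 255, while the familiar ImageNet statistics are defined for pixels scaled to the range 0 to 1. A short arithmetic check, with hypothetical helper names, shows that both forms give the same normalized value.

```python
MEAN = [0.485, 0.456, 0.406]   # ImageNet channel means on the 0..1 scale
STD = [0.229, 0.224, 0.225]    # ImageNet channel std devs on the 0..1 scale


def normalize_unit(pixel, channel):
    """Normalize a pixel that was already scaled to the range 0..1."""
    return (pixel - MEAN[channel]) / STD[channel]


def normalize_uint8(pixel, channel):
    """Normalize a raw 0..255 pixel using statistics scaled by 255."""
    return (pixel - MEAN[channel] * 255) / (STD[channel] * 255)


pixel = 200   # an arbitrary raw pixel value
pairs = [
    (normalize_uint8(pixel, c), normalize_unit(pixel / 255, c))
    for c in range(3)
]
max_difference = max(abs(a - b) for a, b in pairs)
```

Scaling the statistics once avoids a separate division of every pixel by 255 before normalization.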

Common Mistakes

| Wrong | Right | Why |
| --- | --- | --- |
| device="mixed" | device="gpu" | "mixed" is pipeline mode only |
| batch[i] | batch.select(i) | Batch has no __getitem__ |
| batch.select(0) for per-sample slicing | batch.slice[0] | .select() picks samples; .slice slices within each sample |
| .evaluate() after every op | Let consumption trigger eval | .torch(), .shape, etc. trigger it automatically |
| .cpu() before GPU model | .torch() directly | Avoids wasteful D2H + H2D round-trip |
| Recreate reader each epoch | reader.next_epoch() | Readers are stateful -- create once, reuse |
| ndd.readers.file(...) | ndd.readers.File(...) | Reader classes are PascalCase |
| break from next_epoch() loop | Exhaust iterator or create new reader | Iterator must be fully consumed before next next_epoch() |
| No batch_size to random ops | ndd.random.uniform(batch_size=N, ...) | No pipeline-level batch size to inherit |
| register(reader) after first next_epoch to restore | Register the freshly built reader before the first iteration | Reader state can only be applied before the prefetch thread starts |
| Restoring into a reader built without enable_checkpointing=True after iteration | Pass enable_checkpointing=True at construction (or register before first iteration) | Backend doesn't keep snapshots otherwise |

Pipeline Mode Migration

| Pipeline Mode | Dynamic Mode |
| --- | --- |
| @pipeline_def / pipe.build() / pipe.run() | Direct function calls in a loop |
| fn.readers.file(...) | ndd.readers.File(...) (PascalCase, stateful) |
| fn.decoders.image(jpegs, device="mixed") | ndd.decoders.image(jpegs, device="gpu") |
| fn.op_name(...) | ndd.op_name(...) |
| Pipeline-level batch_size=64 | reader.next_epoch(batch_size=64) + random ops batch_size=64 |
| Pipeline-level seed=42 | ndd.random.set_seed(42) or ndd.random.RNG(seed=42) |
| Pipeline-level num_threads=4 | ndd.set_num_threads(4) at startup |
| output.at(i) | batch.select(i) |
| output.as_cpu() | batch.cpu() |
| pipe.run() returns tuple of TensorList | reader.next_epoch(batch_size=N) yields tuples of Batch |
| Pipeline(..., enable_checkpointing=True) + pipe.checkpoint() / pipeline(checkpoint=...) | ndd.checkpoint.Checkpoint + per-object register / collect / save / load; readers opt in with enable_checkpointing=True |

Limitations

Dynamic mode is more flexible than pipeline mode, but can have slightly worse performance. For maximum throughput, prefer pipeline mode.

Troubleshooting

  • If errors surface later than the failing call, rerun the block under "with ndd.EvalMode.sync_full:" so each error is raised at the op that caused it.
  • If a reader behaves unexpectedly across epochs, check that it is created once and each next_epoch() iterator is fully consumed.
