Understanding PyTorch's Distributed Checkpointing: lookup_object() in DefaultSavePlanner


Distributed Checkpoints in PyTorch

Distributed Checkpoint (DCP) saves and loads models from multiple processes (ranks) in parallel during distributed training. Compared with a plain torch.save call, it offers several advantages:

  • Flexibility
    DCP seamlessly handles various distributed tensor types like ShardedTensor and DTensor.
  • Load-Time Resharding
    Models can be saved in one cluster configuration and loaded into another with different settings.
  • Parallelism
    Each rank writes only its own local shards, so no single process has to gather the full model before saving.

DefaultSavePlanner and lookup_object()

DefaultSavePlanner is the planner DCP uses when none is supplied. It builds the save plan: each rank creates a local plan listing the items it will write, the coordinator rank merges these into a global plan, and during the write the planner resolves each planned item to the data that is actually stored.

The lookup_object(index) method of DefaultSavePlanner takes a MetadataIndex and is provided as an extension point that makes the default planner easier to customize. Its role is narrow:

  1. Object Identification
    Given a MetadataIndex (the entry's fully qualified name, index.fqn, plus an optional shard offset), it returns the matching object from the state dictionary the planner was set up with. This object could be a tensor, the local shard of a distributed tensor, or any non-tensor value stored in the state dictionary.
  2. No Planner Selection
    lookup_object() does not pick or delegate to another planner. The planner is chosen by the caller through the planner argument of the save call, and the decision of how each entry is written is made earlier, when the local plan is created.
  3. Serialization Happens Elsewhere
    During the write, resolve_data() calls lookup_object() and passes the result to transform_object(), which converts non-tensor objects to bytes. The storage writer then writes the result.
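
To make the lookup concrete, the following sketch calls lookup_object() directly on a planner. It assumes a recent PyTorch release in which the method takes a single MetadataIndex argument and set_up_planner() accepts is_coordinator as a keyword. The state dictionary contents are made up for illustration.

```python
import torch
from torch.distributed.checkpoint import DefaultSavePlanner
from torch.distributed.checkpoint.metadata import MetadataIndex

# Illustrative state dict with one tensor and one non-tensor entry.
state_dict = {"weight": torch.arange(6.0).reshape(2, 3), "step": 7}

planner = DefaultSavePlanner()
# The planner must be given a state dict before any lookup.
planner.set_up_planner(state_dict, is_coordinator=True)

# Each lookup is keyed by the entry's fully qualified name (fqn).
weight = planner.lookup_object(MetadataIndex(fqn="weight"))
step = planner.lookup_object(MetadataIndex(fqn="step"))
```

Normally these calls are made by the planner itself while a checkpoint is being written. Calling them by hand is mainly useful for debugging or for understanding what a subclass would receive.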

In Summary

  • torch.distributed.checkpoint.DefaultSavePlanner.lookup_object() is a method of the default save planner in PyTorch's Distributed Checkpoint framework. It is normally called by the planner itself rather than by user code.
  • It maps a MetadataIndex to the corresponding object in the state dictionary being saved.
  • It does not choose a planner or serialize anything; planning and serialization happen in other planner methods and in the storage writer.
  • If you need to customize the saving behavior for specific objects, you can subclass DefaultSavePlanner and override methods such as lookup_object() or transform_object(). This is an advanced technique and requires a deeper understanding of the Distributed Checkpoint API.


import torch
import torch.distributed as dist
import torch.distributed.checkpoint as dcp
from torch.distributed.checkpoint import DefaultSavePlanner

CHECKPOINT_DIR = "checkpoint"  # Example path; adjust as needed


def train_model(model, optimizer):
    # ... (Your training loop here)

    # Every rank calls dcp.save(). DCP coordinates the ranks itself,
    # so there is no "if rank == 0" guard and no explicit barrier.
    state_dict = {
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
    }

    # Passing the planner explicitly is optional: DefaultSavePlanner is
    # what DCP uses when no planner is given.
    save_planner = DefaultSavePlanner()

    # During the save, the planner's resolve_data() calls lookup_object()
    # for each planned item; user code does not call it here.
    # dcp.save() requires PyTorch 2.2 or newer.
    dcp.save(state_dict, checkpoint_id=CHECKPOINT_DIR, planner=save_planner)


if __name__ == "__main__":
    # ... (Distributed initialization code here)

    model = MyModel()  # MyModel stands in for your own nn.Module
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    train_model(model, optimizer)

  1. The train_model function handles training and checkpointing.
  2. Every rank calls dcp.save(). Each rank writes its own data and DCP coordinates the ranks internally, so the call must not be guarded by a rank check.
  3. We create a DefaultSavePlanner instance and pass it through the planner argument. This is optional, since it is the default.
  4. While the checkpoint is being written, the planner's resolve_data() calls lookup_object() for each planned item (parameter, optimizer state, etc.) to fetch the object to write.
  5. The actual saving is handled by dcp.save(), which drives the planner and the storage writer to save the model and optimizer states across ranks.
  • lookup_object() is normally called by the planner itself; user code rarely needs to call it directly.
  • This is a simplified example. For sharded models the state dictionary contains ShardedTensor or DTensor entries, and lookup_object() returns the local data for each of them.


Customizing Save Behavior

  1. Preprocessing the State Dictionary

    • DCP does not accept a per-object save callback. The simplest way to control what is saved is to transform the state dictionary before passing it to dcp.save().
    • Within your own function, you can iterate through the state dictionary and convert or filter each object based on its type.
    • This approach needs no DCP-specific code, but it only changes what is saved, not how DCP writes it.
    import torch
    import torch.distributed.checkpoint as dcp
    
    def prepare_state_dict(state_dict):
        # Helper defined here for illustration; it is not part of PyTorch
        prepared = {}
        for name, value in state_dict.items():
            if isinstance(value, torch.Tensor):
                # Tensors (including sharded ones) are handled natively by DCP
                prepared[name] = value
            else:
                # Custom logic for non-tensor objects (e.g., convert to plain Python types)
                prepared[name] = value
        return prepared
    
    # ... (Training code)
    
    dcp.save(prepare_state_dict(state_dict), checkpoint_id="checkpoint")
    
  2. Subclassing DefaultSavePlanner

    • Create a subclass of DefaultSavePlanner.
    • Override extension methods such as lookup_object() (which object is written for a given index) and transform_object() (how that object is converted before it is written) to customize saving behavior.
    • This approach allows you to modify existing planner logic while leveraging its core functionality.

    Example (Simplified)

    import torch.distributed.checkpoint as dcp
    from torch.distributed.checkpoint import DefaultSavePlanner
    
    class CustomSavePlanner(DefaultSavePlanner):
        def lookup_object(self, index):
            # index.fqn is the flattened state dict key of the item being written
            obj = super().lookup_object(index)
            # Implement custom logic here (e.g., inspect or replace the object);
            # a replacement must keep the shape and dtype recorded in the plan
            return obj
    
    # ... (Training code)
    
    save_planner = CustomSavePlanner()
    dcp.save(state_dict, checkpoint_id="checkpoint", planner=save_planner)