cpu.config_parser

class cpu.config_parser.ConfigArgumentParser(*args, **kwargs)[source]

Argument parser that supports loading a YAML configuration file.

A small issue: config file values are processed using ArgumentParser.set_defaults(), which means required and choices are not handled as expected. For example, if you specify a required value in a config file, you still have to specify it again on the command line.

If this issue matters, the ConfigArgParse library can be used as a substitute.

add_argument(*args, **kwargs)[source]

Same as ArgumentParser.add_argument().

parse_args(args=None)[source]

Same as ArgumentParser.parse_args().

cpu.config_parser.save_args(args: Namespace, filepath: str, rank: int = 0) None[source]

If in the master process, save args to a YAML file; otherwise, do nothing.

Parameters:
  • args (Namespace) – The parsed arguments to be saved.

  • filepath (str) – A file path ending with .yaml.

  • rank (int) – Process rank in the distributed training. Defaults to 0.
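
A minimal usage sketch of this module. The --config flag used to pass the YAML file is an assumption for illustration; the exact option name is not documented in this section:

from cpu.config_parser import ConfigArgumentParser, save_args

parser = ConfigArgumentParser()
parser.add_argument("--lr", type=float, default=0.1, help="learning rate")
parser.add_argument("--epochs", type=int, default=10, help="training epochs")

# YAML values are applied via set_defaults(), so explicit command-line
# arguments still override the config file.
args = parser.parse_args()

# Only the master process (rank 0) writes the file; other ranks do nothing.
save_args(args, "work_dir/args.yaml", rank=0)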

cpu.history_buffer

class cpu.history_buffer.HistoryBuffer(window_size: int = 20)[source]

This class tracks a series of values and provides access to the smoothed value over a window, as well as the global average / sum of the whole series.

Parameters:

window_size (int) – The maximum number of values that can be stored in the buffer. Defaults to 20.

Example:

>>> his_buf = HistoryBuffer()
>>> his_buf.update(0.1)
>>> his_buf.update(0.2)
>>> his_buf.avg
0.15
property avg: float

The average over the window.

property global_avg: float

The global average of all values added so far.

property global_sum: float

The global sum of all values added so far.

property latest: float

The most recently added value.

update(value: float) None[source]

Add a new scalar value. If the length of the queue exceeds window_size, the oldest element is removed from the queue.
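
A short sketch contrasting the windowed and global statistics described above (values are illustrative):

his_buf = HistoryBuffer(window_size=2)
for value in (1.0, 2.0, 3.0):
    his_buf.update(value)

his_buf.avg         # 2.5, average over the window (2.0 and 3.0)
his_buf.global_avg  # 2.0, average over all values added so far
his_buf.latest      # 3.0, the most recently added value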

cpu.logger

cpu.logger.setup_logger(name: str | None = None, output_dir: str | None = None, rank: int = 0, log_level: int = 20, color: bool = True) Logger[source]

Initialize the logger.

If the logger has not been initialized, this method initializes it by adding one or two handlers; otherwise, the already-initialized logger is returned directly. During initialization, only the logger of the master process gets a console handler. If output_dir is specified, the loggers of all processes also get a file handler.

Parameters:
  • name (str) – Logger name. Defaults to None, which sets up the root logger.

  • output_dir (str) – The directory to save the log file.

  • rank (int) – Process rank in the distributed training. Defaults to 0.

  • log_level (int) – Verbosity level of the logger. Defaults to logging.INFO.

  • color (bool) – If True, color the output. Defaults to True.

Returns:

An initialized logger.

Return type:

logging.Logger
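
A brief usage sketch (the logger name and output directory are illustrative):

import logging

from cpu.logger import setup_logger

# The master process (rank 0) gets a console handler; a file handler is
# also added because output_dir is specified.
logger = setup_logger(name="train", output_dir="work_dir/log", rank=0, log_level=logging.INFO)
logger.info("logger is ready")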

cpu.lr_scheduler

class cpu.lr_scheduler.LRWarmupScheduler(torch_scheduler: _LRScheduler, by_epoch: bool = True, epoch_len: int | None = None, warmup_t: int = 0, warmup_by_epoch: bool = False, warmup_mode: str = 'fix', warmup_init_lr: float | None = None, warmup_factor: float | None = None)[source]

This class wraps the standard PyTorch LR scheduler to support warmup.

The usage is demonstrated in the following snippet:

# optimizer, max_epochs, epoch_len and train_one_iter() are defined elsewhere
torch_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3)
warmup_scheduler = LRWarmupScheduler(torch_scheduler)
for epoch in range(max_epochs):
    for _ in range(epoch_len):
        train_one_iter()
        # call iter_update() after each iteration
        warmup_scheduler.iter_update()
    # call epoch_update() after each epoch
    warmup_scheduler.epoch_update()
Parameters:
  • torch_scheduler (_LRScheduler) – The wrapped PyTorch LR scheduler.

  • by_epoch (bool) – If True, the torch_scheduler is epoch-based, else iteration-based. Defaults to True.

  • epoch_len (int) – The number of iterations in an epoch. Required only when by_epoch=True and warmup_by_epoch=False.

  • warmup_t (int) – The number of iterations / epochs in the warmup stage. If warmup_by_epoch=True, “t” means epochs, else iterations. Defaults to 0, which disables warmup.

  • warmup_by_epoch (bool) – If True, perform warmup at each epoch end, else iteration end. Defaults to False.

  • warmup_mode (str) – “fix”, “auto”, or “factor”. Defaults to “fix”.

  • warmup_init_lr (float) – The initial warmup lr. Required in “fix” mode. Defaults to None.

  • warmup_factor (float) – The factor of the initial warmup lr relative to the base lr. Required in “auto” and “factor” modes. Defaults to None.

epoch_update(metric: float | None = None) None[source]

Prepare the learning rate for the next epoch. The method should be called after finishing each epoch.

Parameters:

metric (float) – Metric value used in ReduceLROnPlateau. Defaults to None.

iter_update() None[source]

Prepare the learning rate for the next iteration. The method should be called after finishing each iteration.

load_state_dict(state_dict: Dict[str, Any]) None[source]

Loads the scheduler state.

Parameters:

state_dict (dict) – scheduler state. Should be an object returned from a call to state_dict().

state_dict() Dict[str, Any][source]

Returns the state of the scheduler as a dict.
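
A sketch of configuring iteration-based warmup for an epoch-based scheduler; all values are illustrative:

import torch

from cpu.lr_scheduler import LRWarmupScheduler

model = torch.nn.Linear(10, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
torch_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3)

# Warm up for the first 500 iterations, starting from lr=0.001 ("fix" mode),
# then follow the epoch-based StepLR schedule.
warmup_scheduler = LRWarmupScheduler(
    torch_scheduler,
    by_epoch=True,
    epoch_len=1000,        # required here because warmup is iteration-based
    warmup_t=500,
    warmup_mode="fix",
    warmup_init_lr=0.001,
)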

cpu.trainer

class cpu.trainer.Trainer(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler, data_loader: DataLoader, unpack_batch_dict: bool = False, max_epochs: int = 0, max_iters: int = 0, work_dir: str = 'work_dir', max_num_checkpoints: int | None = None, checkpoint_period: int = 1, log_period: int = 50, clip_grad_norm: float = 0.0, enable_amp: bool = False, by_epoch: bool = True, warmup_t: int = 0, warmup_by_epoch: bool = False, warmup_mode: str = 'fix', warmup_init_lr: float = 0.0, warmup_factor: float = 0.0)[source]

An epoch-based trainer.

The class implements a simple trainer for the most common type of task: single-cost single-optimizer single-data-source epoch-based optimization. It assumes that every step, you:

  1. Load a batch from the data_loader.

  2. Compute the loss with the batch.

  3. Compute the gradients with the above loss.

  4. Update the model with the optimizer.

All other tasks during training (e.g., lr updating, checkpointing, logging, evaluation) are maintained by hooks, which can be registered by register_hooks().

If you want to do anything fancier than this, subclass this class and implement your own train_one_iter().

Parameters:
  • model (torch.nn.Module) – The model to be trained.

  • optimizer (torch.optim.Optimizer) – The optimizer used for training.

  • lr_scheduler (optim.lr_scheduler._LRScheduler) – The learning rate scheduler.

  • data_loader (torch.utils.data.DataLoader) – Training data loader.

  • unpack_batch_dict (bool) – Whether to unpack the batch dict returned by the data_loader, i.e., use model(**batch) instead of model(batch). Defaults to False.

  • max_epochs (int) – Total training epochs. If > 0, train by epoch.

  • max_iters (int) – Total training iterations. If > 0, train by iteration.

  • work_dir (str) – The working directory to save checkpoints and logs. Defaults to “work_dir”.

  • max_num_checkpoints (int) – The maximum number of checkpoints to save. If None, save all checkpoints. Defaults to None.

  • checkpoint_period (int) – The period to save checkpoint. Defaults to 1.

  • log_period (int) – The period (iter-based) to log. Defaults to 50.

  • clip_grad_norm (float) – Max norm of the gradients. If <= 0, will not clip gradients. Defaults to 0.

  • enable_amp (bool) – Enable the Automatic Mixed Precision (AMP) training. Defaults to False.

  • by_epoch – Refer to the documentation of cpu.lr_scheduler.LRWarmupScheduler.

  • warmup_t – Refer to the documentation of cpu.lr_scheduler.LRWarmupScheduler.

  • warmup_by_epoch – Refer to the documentation of cpu.lr_scheduler.LRWarmupScheduler.

  • warmup_mode – Refer to the documentation of cpu.lr_scheduler.LRWarmupScheduler.

  • warmup_init_lr – Refer to the documentation of cpu.lr_scheduler.LRWarmupScheduler.

  • warmup_factor – Refer to the documentation of cpu.lr_scheduler.LRWarmupScheduler.

Example:

# create your model / optimizer / lr_scheduler / data_loader before using the trainer
model = ...
optimizer = ...
lr_scheduler = ...
data_loader = ...
# train 100 epochs
trainer = Trainer(model, optimizer, lr_scheduler, data_loader, max_epochs=100)
trainer.train()
property ckpt_dir: str

The directory to save checkpoints. Override this property to change the path.

property cur_epoch: int

The current epoch, in the range [0, max_epochs - 1].

property hook_info: List[str]

The names of all registered hooks.

property inner_iter: int

The iteration within the epoch, in the range [0, epoch_len - 1].

load_checkpoint(path: str | None = None, auto_resume: bool = False)[source]

Load the given checkpoint or resume from the latest checkpoint.

Parameters:
  • path (str) – Path to the checkpoint to load.

  • auto_resume (bool) – If True, automatically resume from the latest checkpoint.

log(*args, **kwargs) None[source]

Update the metrics stored in self.metric_storage.

property lr: float

The learning rate of the first parameter group.

property model_or_module: Module

The model not wrapped by DistributedDataParallel.

register_hook(hook: HookBase) None[source]

Register a hook to the trainer.

For hooks with the same priority, they are executed in the order they are registered.

Parameters:

hook (HookBase) – The hook to be registered.

register_hooks(hooks: List[HookBase]) None[source]

Register hooks to the trainer.

Parameters:

hooks (list[HookBase]) – List of hooks to be registered.
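
A sketch of a custom hook registered on an existing trainer instance; the hook class and the printed message are illustrative (see cpu.hooks.HookBase for the available hook methods):

from cpu.hooks import HookBase

class PrintLRHook(HookBase):
    def after_epoch(self) -> None:
        # self.trainer exposes the training context (see cpu.hooks.HookBase).
        print(f"epoch {self.trainer.cur_epoch}: lr = {self.trainer.lr}")

trainer.register_hooks([PrintLRHook()])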

save_checkpoint(file_name: str) None[source]

Save training state: epoch, num_gpus, model, optimizer, lr_scheduler, metric_storage, hooks (optional), grad_scaler (optional).

Parameters:

file_name (str) – The checkpoint will be saved as ckpt_dir/file_name.

property tb_log_dir: str

The directory to save tensorboard files. Override this property to change the path.

train(resume_from_checkpoint: str | None = None, auto_resume: bool = True) None[source]

Start training.

If resume_from_checkpoint is specified, resume from the given checkpoint. Otherwise, if auto_resume is True, resume from the latest checkpoint.

Parameters:
  • resume_from_checkpoint (str) – Path to the checkpoint. Defaults to None.

  • auto_resume (bool) – If True and resume_from_checkpoint is None, automatically resume from the latest checkpoint. Defaults to True.

train_one_iter() None[source]

Train one iteration.

Subclass cpu.trainer.Trainer and implement your own train_one_iter() to do something fancier.
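
A rough sketch of a custom train_one_iter(). The batch-fetching attribute (self._data_iter) and the dict-of-losses convention are assumptions made for illustration only; check the Trainer source for the actual members:

from cpu.trainer import Trainer

class MyTrainer(Trainer):
    def train_one_iter(self) -> None:
        # NOTE: self._data_iter and the loss-dict convention below are
        # assumptions, not part of the documented API.
        batch = next(self._data_iter)
        loss_dict = self.model(batch)
        loss = sum(loss_dict.values())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()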

cpu.hooks

class cpu.hooks.CheckpointHook(period: int, max_to_keep: int | None = None)[source]

Save checkpoint periodically.

A checkpoint is saved when the current epoch / iteration is a multiple of period, or when max_epochs / max_iters is reached.

Parameters:
  • period (int) – Save a checkpoint every period epochs / iterations.

  • max_to_keep (int) – Maximum number of the most recent checkpoints to keep; older checkpoints are deleted. If None, keep all checkpoints.

after_epoch() None[source]

Called after each epoch.

after_iter() None[source]

Called after each iteration.

class cpu.hooks.DistributedHook[source]

Call DistributedSampler.set_epoch() before each epoch.

before_epoch() None[source]

Called before each epoch.

class cpu.hooks.EvalHook(period: int, eval_func: Callable)[source]

Run an evaluation function periodically.

It is executed every period epochs / iterations and after the last epoch / iteration.

Parameters:
  • period (int) – The period to run eval_func. Set to 0 to disable periodic evaluation; eval_func still runs after the last epoch / iteration.

  • eval_func (callable) – A function which takes no arguments, and returns a dict of evaluation metrics.

after_epoch() None[source]

Called after each epoch.

after_iter() None[source]

Called after each iteration.
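
A usage sketch; the evaluation function and the returned metrics are illustrative (the ‘Eval Metric’ key only matters when combining EvalHook with LRUpdateHook and ReduceLROnPlateau, described later in this section):

from cpu.hooks import EvalHook

def eval_func():
    # Run validation and return a dict of evaluation metrics.
    return {"Eval Metric": 0.87}

# Evaluate after every epoch, and after the last epoch / iteration.
trainer.register_hooks([EvalHook(period=1, eval_func=eval_func)])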

class cpu.hooks.HookBase[source]

Base class for hooks.

Hooks can be registered in cpu.trainer.Trainer. Each hook can implement 6 methods (before_train(), after_train(), before_epoch(), after_epoch(), before_iter(), after_iter()). The way they are called is demonstrated in the following snippet:

hook.before_train()
for epoch in range(start_epoch, max_epochs):
    hook.before_epoch()
    for iter in range(epoch_len):
        hook.before_iter()
        train_one_iter()
        hook.after_iter()
    hook.after_epoch()
hook.after_train()

Inside a hook method, users can access self.trainer to reach the training context (e.g., model, optimizer, current epoch).

Each hook has a priority, which is an integer from 1 to 10. The smaller the number, the higher the priority. Hooks are executed in order of priority from high to low. If two hooks have the same priority, they are executed in the order they are registered.

after_epoch() None[source]

Called after each epoch.

after_iter() None[source]

Called after each iteration.

after_train() None[source]

Called after the last epoch.

before_epoch() None[source]

Called before each epoch.

before_iter() None[source]

Called before each iteration.

before_train() None[source]

Called before the first epoch.

property checkpointable: bool

A hook is checkpointable when it implements the state_dict() method; its state will be saved into the checkpoint.

property class_name: str

The class name of the hook.

class cpu.hooks.LRUpdateHook[source]

Adjust learning rate after each epoch and iteration.

To use the ReduceLROnPlateau scheduler, the user should register an EvalHook whose eval_func returns a dict containing an ‘Eval Metric’ field. The EvalHook should run after each epoch (i.e., set period=1) and before the LRUpdateHook.

after_epoch() None[source]

Called after each epoch.

after_iter() None[source]

Called after each iteration.

class cpu.hooks.LoggerHook(period: int = 50, tb_log_dir: str = 'log_dir', **kwargs)[source]

Write metrics to console and tensorboard files. The hook has the lowest priority (level 10).

Parameters:
  • period (int) – The period to write metrics. Defaults to 50.

  • tb_log_dir (str) – The directory to save the tensorboard files. Defaults to “log_dir”.

  • kwargs – Other arguments passed to torch.utils.tensorboard.SummaryWriter.

after_epoch() None[source]

Called after each epoch.

after_iter() None[source]

Called after each iteration.

after_train() None[source]

Called after the last epoch.

before_train() None[source]

Called before the first epoch.

cpu.distributed

cpu.distributed.all_gather(data: Any, group: ProcessGroup | None = None) List[Any][source]

Run all_gather() on arbitrary picklable data (not necessarily tensors).

Parameters:
  • data – Any picklable object.

  • group (ProcessGroup) – A torch process group. Defaults to a group containing all ranks, using the gloo backend.

Returns:

List of data gathered from each rank.

Return type:

list[data]

cpu.distributed.gather(data: Any, dst: int = 0, group: ProcessGroup | None = None) List[Any][source]

Run gather() on arbitrary picklable data (not necessarily tensors).

Parameters:
  • data – Any picklable object.

  • dst (int) – Destination rank.

  • group (ProcessGroup) – A torch process group. Defaults to a group containing all ranks, using the gloo backend.

Returns:

On dst, a list of data gathered from each rank. Otherwise, an empty list.

Return type:

list[data]

cpu.distributed.get_rank() int[source]

Return the rank of the current process in the current process group.

cpu.distributed.get_world_size() int[source]

Return the number of processes in the current process group.

cpu.distributed.init_distributed(auto: bool = False) Tuple[int, int, int][source]

Initialize the distributed mode as follows:

  • Initialize the process group, with backend="nccl" and init_method="env://".

  • Set the correct CUDA device.

  • Disable printing when not in the master process.

Parameters:

auto (bool) – If True, automatically find a free port when MASTER_PORT is not free. Defaults to False.

Returns:

(rank, local_rank, world_size)

Return type:

tuple
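
A sketch of a typical setup at the start of a distributed training script (the gathered payload is illustrative):

from cpu.distributed import all_gather, init_distributed, is_main_process

# Initialize the process group, set the CUDA device, and silence printing
# on non-master processes.
rank, local_rank, world_size = init_distributed(auto=True)

# Gather arbitrary picklable data (not necessarily tensors) from every rank.
results = all_gather({"rank": rank})
if is_main_process():
    print(results)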

cpu.distributed.is_main_process() bool[source]

Return True if the current process is the master process, False otherwise.

cpu.distributed.reduce_dict(input_dict: Dict[str, Tensor], average: bool = True) Dict[str, Tensor][source]

Reduce the values in the dictionary from all processes so that all processes have the averaged results.

Parameters:
  • input_dict (dict) – A dict whose values will be reduced.

  • average (bool) – If True, average the reduced values; otherwise, sum them.

Returns:

A dict with the same fields as input_dict, after reduction.

Return type:

dict
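
A sketch of averaging per-process loss values for logging (the tensor values are illustrative):

import torch

from cpu.distributed import reduce_dict

loss_dict = {"loss_cls": torch.tensor(0.5), "loss_reg": torch.tensor(0.2)}
# After reduction, every process holds the values averaged over all ranks.
reduced = reduce_dict(loss_dict, average=True)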

cpu.distributed.setup_print_for_distributed(is_master: bool) None[source]

Disable printing when not in the master process.

Parameters:

is_master (bool) – Whether the current process is the master process.

cpu.misc

cpu.misc.collect_env() str[source]

Collect information about the running environment.

The following information is included:

  • sys.platform: The value of sys.platform.

  • Python: Python version.

  • Numpy: Numpy version.

  • CUDA available: Bool, indicating if CUDA is available.

  • GPU devices: Device type of each GPU.

  • PyTorch: PyTorch version.

  • TorchVision (optional): TorchVision version.

  • OpenCV (optional): OpenCV version.

Returns:

A string describing the running environment.

Return type:

str

cpu.misc.set_random_seed(seed: int | None = None, deterministic: bool = False) None[source]

Set random seed.

Parameters:
  • seed (int) – If None or negative, use a generated seed.

  • deterministic (bool) – If True, set the deterministic option for the cuDNN backend.
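
A brief sketch combining the two helpers above (the seed value is illustrative):

from cpu.misc import collect_env, set_random_seed

print(collect_env())                      # log the environment information
set_random_seed(42, deterministic=True)   # make runs reproducible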

cpu.misc.symlink(src: str, dst: str, overwrite: bool = True) None[source]

Create a symlink, dst -> src.

Parameters:
  • src (str) – Path to source.

  • dst (str) – Path to target.

  • overwrite (bool) – If True, remove the existing target. Defaults to True.