DeviceMesh/DeviceGroup

这两个类用于表达硬件资源分配和网络拓扑,Twinkle 的数据分发和收集也依赖它们。

DeviceGroup

@dataclass
class DeviceGroup:
    name: str
    ranks: Union[List[int], int]
    device_type: str
    visible_devices: Optional[str] = None  # Optional: explicitly set visible devices (e.g., "8,9")
    gpus_per_worker: int = 1
  • name: 资源组名

  • ranks: 占用硬件列表,如果是CPU资源仅支持int类型

  • device_type: 硬件类型,例如 GPU/CPU/NPU 等

  • visible_devices: 可见资源列表,用于希望仅使用部分 rank 的硬件的情况

  • gpus_per_worker: 每个 worker 占用多少硬件

如果训练 RL,开发者可以构造多个这样的组,并将对应的模型、采样器分配进入其中。

DeviceMesh

DeviceMesh 承载了组件拓扑、分布式并行信息,这个类会在组件内传递,用于数据分发和数据收集。

@dataclass
class DeviceMesh:
    ...

    @staticmethod
    def from_sizes(*, world_size: int = 1, dp_size: int = 1, fsdp_size: int = None, tp_size: int = None,
                   pp_size: int = None, ulysses_size: int = None, cp_size: int = None, ep_size: int = None,
                   etp_size: int = None,vpp_size: int = None, device_type: str = 'cuda', sequence_parallel: bool = False) -> "DeviceMesh":
        ...

推荐使用 from_sizes 来构造它。

我们举一个例子:

sampler_device_mesh = DeviceMesh.from_sizes(dp_size=4)
actor_device_mesh = DeviceMesh.from_sizes(dp_size=2, pp_size=2, tp_size=2)

dataloader = DataLoader(...)
sampler = vLLMSampler(..., device_mesh=sampler_device_mesh, remote_group=...)
actor = MegatronModel(..., device_mesh=actor_device_mesh, remote_group=...)

for data in dataloader:
    sampler_output = sampler.sample(data)
    input_data = [seq.new_input_feature for response in sampler_output for seq in response.sequences]
    ...
    model_output = actor.forward(input_data)

我们以上面的伪代码来分析数据传递情况。

dataloader 取出数据 -> 按照 dp_size=4 分发给 sampler -> 按照 dp_size=4 收集数据 -> 按照 dp_size=2 分发给模型 -> 按照 dp_size=2 收集输出

通过 DeviceMesh,可以将数据流平顺地在各个 group 和组件之间流转起来。

数据的分发判断由 DeviceMesh 的 get_slice 方法执行:

batch[device_mesh.get_slice(len(batch))]

get_slice 会根据当前 rank,计算出当前 worker 属于哪个 dp 组,并获取对应的数据。该过程发生在 DataLoader 的 DeviceMeshSampler 中,同样发生在 remote_class 的 dispatch 和 collect 中。