
    ΑiD                    D   S SK Jr  S SKrS SKJrJr  S SKJr  S SKJ	r	  S SK
Jr  SSKJrJr  SS	KJr  SS
KJr  SSKJr  \(       a<  S SKrSSKJr  SSKJr   " S S\5      r " S S\5      r " S S\5      r " S S\5      r   S         SS jjrSqSS jrSS jrg)    )annotationsN)TYPE_CHECKING	TypedDict)NotRequired)fleet)core   )ParallelOptimizerparallelize_model_and_optimizer)pipeline_parallel)sharded_data_parallel)tensor_parallel)
SplitPoint)PlanBasec                       \ rS rSr% S\S'   Srg)	_DPConfig#   z	str | intsharding_level N__name__
__module____qualname____firstlineno____annotations____static_attributes__r       y/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/auto_parallel/intermediate/parallelize.pyr   r   #   s    !!r   r   c                       \ rS rSr% S\S'   Srg)	_MPConfig&   z$dict[str, PlanBase | list[PlanBase]]parallelize_planr   Nr   r   r   r   r    r    &   s    >>r   r    c                  *    \ rS rSr% S\S'   S\S'   Srg)	_PPConfig)   zstr | dict[str, SplitPoint]
split_speczNotRequired[str]global_specr   Nr   r   r   r   r$   r$   )   s    //%%r   r$   c                  4    \ rS rSr% S\S'   S\S'   S\S'   Srg	)
_ParallelizeConfig-   zNotRequired[_DPConfig]	dp_configzNotRequired[_MPConfig]	mp_configzNotRequired[_PPConfig]	pp_configr   Nr   r   r   r   r)   r)   -   s    ))))))r   r)   c                j   Uc  [         R                  " S5        X4$ [        U[        5      (       d   eUb  [        U[        R
                  5      (       d   S5       e[        R                  R                  5       nUb  XB:w  a  [         R                  " S5        [        R                  R                  U5        UR                  S5      nUR                  S5      nUR                  S5      nUR                  S5      nUb&  [        U[        5      (       d   e[        U UU5      u  pUb  [        U[        5      (       d   eUbZ  [        U[        5      (       d   eSUR                  5       ;   d   eSUR                  5       ;   d   eUS   R                  US   5        [        XU5      u  pO)Ub&  [        U[        5      (       d   e[        U UU5      u  pUbN  [        U[        5      (       d   eS	UR                  5       ;  a  [         R                  " S
5        [        U UUS9u  p[!        X5      u  pX4$ )a9'  

Parallelize the model and optimizer from a single card version to a distributed version.

Args:
    model (paddle.nn.Layer): the model to be parallelized.
    optimizer (paddle.optimizer.Optimizer, optional): the optimizer to be parallelized.
        Could be `None` if no optimizer to be parallelized.
    mesh (paddle.distributed.ProcessMesh, optional): the process mesh for parallelize the model and the optimizer.
        Best practice: calling `dist.auto_parallel.set_mesh` to set the global mesh ahead of calling `parallelize`
        and keep the `mesh` parameter as `None.
        If the `mesh` is not None, the mesh passed to `parallelize` will overwrite the mesh set by `set_mesh`.
    config (dict, optional): a dict contains the parallel config.
        The keys of the dict can be chosen from `dp_config`, `mp_config` and `pp_config` which will be used to
        determine the parallel method for data parallel, tensor parallel and pipeline parallel separately.
        A valid config can be like this: {"dp_config": for more information refer the `dp_config` section of
        this doc, "mp_config": for more information refer the `mp_config` section of this doc, "pp_config":
        for more information refer the `pp_config` section of this doc}.

        dp_config (dict): a dict specifying the data parallel config. The keys of `dp_config` is `sharding_level`.
            The value of `sharding_level` can be chosen from 0/1/2/3, which means pure data parallel, sharding
            parallel stage 1, sharding parallel stage 2 and sharding parallel stage 3  separately. A valid
            dp_config can be like this: {"sharding_level": 2}.

        mp_config (dict): a dict specifying the tensor parallel config. The keys of `mp_config` is
            `parallelize_plan`. The value of `parallelize_plan` is another dict, mapping a layer name or a param
            name to a specific parallel plan. Note that the layer name could be written in regular format. If
            mapping a param name to a specific plan, the name of the param must be ended with `weight` or `bias`.
            And all valid parallel plan is `ColWiseParallel`, `RowWiseParallel`, `SequenceParallelBegin,
            `SequenceParallelDisable`, `SequenceParallelEnable`, `SequenceParallelEnd`, `PrepareLayerInput` and
            `PrepareLayerOutput`. A valid mp_config can be like this: {"llama.embed_tokens": dist.ColWiseParallel(),
            "llama.norm": dist.SequenceParallelEnable(), "lm_head.weight": dist.ColWiseParallel()}.

        pp_config (dict): a dict specifying the pipeline parallel config. The keys of `pp_config` is `split_spec`
            and `global_spec`. The `split_spec` can be a dict or a string. If the `split_spec` is a dict, it maps
            a layer name to a `SplitPoint`, note that the layer name could be written in regular format. The
            pipeline parallel will exactly split the model at the point indicated by the map. If the `split_spec`
            is a string, it contains the prefix of a set of layers. The pipeline parallel will automatically split
            the model evenly at target layer. The `global_spec` is a string indicating a layer that contains global
            tensors, which will be duplicated through all stages of the pipeline parallel. Some valid pp_config
            can be list these:  {"split_spec": "llama.layers", "global_spec": "llama.global_layer"}
            or {"split_spec": {"llama.layers.1": SplitPoint.END}}.

        cp_config (dict): a dict specifying the context parallel config. The keys of `cp_config` is
            `parallelize_plan`. The value of `parallelize_plan` is another dict, mapping a layer name or a param
            name to a specific parallel plan. All valid parallel plan is `ContextParallel` and `PrepareContextParallel`.
            A valid cp_config can be like this: {"llama": dist.PrepareContextParallel('p2p'),
            "llama.sdpa": dist.ContextParallel('p2p')}.

Note:
    If the mesh is `None` or neither of `dp_config`, `mp_config`, `pp_config` and `cp_config` is in the config, this
    api will do nothing but return the model and optimizer passed in.

Returns:
    model, optimizer: the model and the optimizer after parallelize

Examples:
    .. code-block:: python

        >>> import paddle
        >>> import paddle.distributed as dist

        >>> class ModelConfig:
        ...     def __init__(self):
        ...         self.vocab_size = 10
        ...         self.hidden_size = 20
        ...         self.intermediate_size = 20
        ...         self.num_layers = 2

        >>> model_config = ModelConfig()

        >>> class LlamaRMSNorm(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.weight = paddle.create_parameter(
        ...             shape=[model_config.hidden_size],
        ...             dtype=paddle.get_default_dtype(),
        ...         )
        ...
        ...     def forward(self, input):
        ...         pass

        >>> class LlamaAttention(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...
        ...         self.qkv_proj = paddle.nn.Linear(
        ...             model_config.hidden_size,
        ...             model_config.hidden_size * 3,
        ...             bias_attr=False,
        ...         )
        ...
        ...         self.o_proj = paddle.nn.Linear(
        ...             model_config.hidden_size,
        ...             model_config.hidden_size,
        ...             bias_attr=False,
        ...         )
        ...
        ...     def forward(self, input):
        ...         pass

        >>> class LlamaMLP(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.gate_up_proj = paddle.nn.Linear(
        ...             model_config.hidden_size,
        ...             model_config.intermediate_size * 2,
        ...             bias_attr=False
        ...         )
        ...
        ...         self.down_proj = paddle.nn.Linear(
        ...             model_config.intermediate_size, model_config.hidden_size, bias_attr=False
        ...         )
        ...
        ...     def forward(self, input):
        ...         pass

        >>> class LlamaDecoderLayer(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.self_attn = LlamaAttention()
        ...         self.mlp = LlamaMLP()
        ...         self.input_layernorm = LlamaRMSNorm()
        ...         self.post_attention_layernorm = LlamaRMSNorm()
        ...
        ...     def forward(self, input):
        ...         pass

        >>> class LlamaModel(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.embedding = paddle.nn.Embedding(model_config.vocab_size, model_config.hidden_size)
        ...         decoder_layers = []
        ...         for _ in range(model_config.num_layers):
        ...             decoder_layers.append(LlamaDecoderLayer())
        ...
        ...         self.layers = paddle.nn.LayerList(decoder_layers)
        ...         self.norm = LlamaRMSNorm()
        ...
        ...     def forward(self, input):
        ...         pass

        >>> class LlamaLMHead(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.weight = self.create_parameter(
        ...             shape=[model_config.hidden_size, model_config.vocab_size],
        ...             dtype=paddle.get_default_dtype(),
        ...         )
        ...
        ...     def forward(self, input):
        ...         pass

        >>> class LlamaForCausalLM(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.llama = LlamaModel()
        ...         self.lm_head = LlamaLMHead()
        ...
        ...     def forward(self, input):
        ...         pass

        >>> mesh = dist.ProcessMesh([[[0, 1], [2, 3]], [[4, 5], [6, 7]]], dim_names=["dp", "mp", "pp"])
        >>> dist.auto_parallel.set_mesh(mesh)
        >>> parallel_config = {
        ...     "dp_config": {'sharding_level': 1},
        ...     "mp_config": {
        ...         "parallelize_plan": {
        ...             "llama.embed_tokens": [
        ...                 dist.ColWiseParallel(),
        ...                 dist.SequenceParallelBegin(),
        ...             ],
        ...             "llama.position_embedding": [
        ...                 dist.ColWiseParallel(),
        ...                 dist.SequenceParallelBegin(),
        ...             ],
        ...             "llama.layers.*.self_attn.qkv_proj": dist.ColWiseParallel(),
        ...             "llama.layers.*.self_attn.o_proj": dist.RowWiseParallel(),
        ...             "llama.layers.*.self_attn": dist.SequenceParallelDisable(),
        ...             "llama.layers.*.mlp.gate_up_proj": dist.ColWiseParallel(),
        ...             "llama.layers.*.mlp.down_proj": dist.RowWiseParallel(),
        ...             "llama.layers.*.mlp": dist.SequenceParallelDisable(
        ...                 need_transpose=False
        ...             ),
        ...             "lm_head.weight": dist.ColWiseParallel(),
        ...             "lm_head": dist.SequenceParallelEnd(),
        ...         }
        ...     },
        ...     "pp_config": {'split_spec': "llama.layers"}
        ... }

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> model = LlamaForCausalLM()
        >>> optimizer = paddle.optimizer.AdamW(parameters=model.parameters())
        >>> dist_model, dist_optimizer = dist.parallelize(model, optimizer, config=parallel_config) # type: ignore[arg-type]
        >>> # This case need to be executed in multi-card environment
        >>> # python -m paddle.distributed.launch --gpus=0,1,2,3,4,5,6,7 {test_case}.py

z<The `parallelize will do nothing since the config is `None`.?The mesh must be an instance of paddle.distributed.ProcessMesh.zxThe mesh set by `fleet.auto.set_mesh` is different with the mesh pass to `parallelize`. Will overwrite the previous meshr-   r,   r+   	cp_configr"   r   @The dp_config doesn't contain sharding_level, will run under dp.)config)warningswarn
isinstancedictr   ProcessMeshr   autoget_meshset_meshgetr   keysupdater   r   r   )	model	optimizermeshr2   g_meshr-   r,   r+   r0   s	            r   parallelizerB   3   s,   Z ~J	
 fd####$ 0 011 	
M	
1 $$&&.MMB 	

D!

;'I

;'I

;'I

;'I)T****,

 )T**** i....%)9999%)9999()00;M1NO*5YGy		)T*****

 )T****9>>#33MMR 1

 7uHEr   Fc                   Uc  [         R                  " S5        U $ [        U[        5      (       d   eUb  [        U[        R
                  5      (       d   S5       e[        R                  R                  5       nUb  X1:w  a  [         R                  " S5        [        R                  R                  U5        Sq
[        U S X5      u  pU $ )NzBThe `parallelize_model will do nothing since the config is `None`.r/   z~The mesh set by `fleet.auto.set_mesh` is different with the mesh pass to `parallelize_model`. Will overwrite the previous meshT)r3   r4   r5   r6   r   r7   r   r8   r9   r:   has_parallelized_modelrB   )r>   r@   r2   rA   _s        r   parallelize_modelrF   =  s    ~P	
 fd####$ 0 011 	
M	
1 $$&&.MMH 	

D!!5$5HELr   c                   Uc  [         R                  " S5        U $ [        U[        5      (       d   eUb  [        U[        R
                  5      (       d   S5       e[        R                  R                  5       nUb  X1:w  a  [         R                  " S5        [        R                  R                  U5        [        (       d   S5       eU R                  n[        US   [        5      (       a2  U H+  nUS    H  nUR                  5       (       a  M   S5       e   M-     O%U H  nUR                  5       (       a  M   S5       e   UR                  S5      nS nS n	UbM  S	UR                  5       ;  a  [         R                  " S
5        UR                  S	5      nUR                  SS5      n	[        XU	5      n U R!                  5       n U $ )NzFThe `parallelize_optimizer will do nothing since the config is `None`.r/   zThe mesh set by `fleet.auto.set_mesh` is different with the mesh pass to `parallelize_optimizer`. Will overwrite the previous meshz:Please parallelize the model before parallelize optimizer.r   paramsz7Please use model after parallelize to create optimizer.r+   r   r1   sharding_mesh_dimdp)r3   r4   r5   r6   r   r7   r   r8   r9   r:   rD   _parameter_listis_distr;   r<   r
   rB   )
r?   r@   r2   rA   
param_listparam_groupparamr+   levelrI   s
             r   parallelize_optimizerrQ   U  s   ~T	
 fd####$ 0 011 	
M	
1 $$&&.MML 	

D! "! D! **J*Q-&&%K$X.}} M / &  E==?? I?  
 

;'IE9>>#33MMR ./%MM*=tD!)4EFI%%'Ir   )NNN)
r>   zpaddle.nn.Layerr?   z!paddle.optimizer.Optimizer | Noner@   z%paddle.distributed.ProcessMesh | Noner2   z_ParallelizeConfig | Nonereturnz2tuple[paddle.nn.Layer, paddle.optimizer.Optimizer])NN)
__future__r   r3   typingr   r   typing_extensionsr   paddle.distributedr   paddle.frameworkr   parallel_baser
   r   r   r   r   paddler   r   r   r    r$   r)   rB   rD   rF   rQ   r   r   r   <module>rZ      s    #  + ) $ ! M 0 8 ,-)"I "?I ?&I &*Y * 4826(,	DD0D 0D &	D
 8DN  00r   