
    x-jD                    :   d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	 d dl
mZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ erHd dlZddlmZ ddlmZ  G d de          Z G d de          Z G d de          Z G d de          Z	 	 	 d$d%d Zd!ad&d"Zd&d#ZdS )'    )annotationsN)TYPE_CHECKING	TypedDict)NotRequired)fleet)core   )ParallelOptimizerparallelize_model_and_optimizer)pipeline_parallel)sharded_data_parallel)tensor_parallel)
SplitPoint)PlanBasec                      e Zd ZU ded<   dS )	_DPConfigz	str | intsharding_levelN__name__
__module____qualname____annotations__     y/var/www/html/banglarbhumi/venv/lib/python3.11/site-packages/paddle/distributed/auto_parallel/intermediate/parallelize.pyr   r   #   s         !!!!!!r   r   c                      e Zd ZU ded<   dS )	_MPConfigz$dict[str, PlanBase | list[PlanBase]]parallelize_planNr   r   r   r   r   r   &   s         >>>>>>r   r   c                  $    e Zd ZU ded<   ded<   dS )	_PPConfigzstr | dict[str, SplitPoint]
split_speczNotRequired[str]global_specNr   r   r   r   r    r    )   s*         ////%%%%%%r   r    c                  .    e Zd ZU ded<   ded<   ded<   dS )_ParallelizeConfigzNotRequired[_DPConfig]	dp_configzNotRequired[_MPConfig]	mp_configzNotRequired[_PPConfig]	pp_configNr   r   r   r   r$   r$   -   s6         ))))))))))))))r   r$   modelpaddle.nn.Layer	optimizer!paddle.optimizer.Optimizer | Nonemesh%paddle.distributed.ProcessMesh | Noneconfig_ParallelizeConfig | Nonereturn2tuple[paddle.nn.Layer, paddle.optimizer.Optimizer]c                   |t          j        d           | |fS t          |t                    sJ |}t          |t          j                  s
J d            t          j                                        }|||k    rt          j        d           t          j        	                    |           |
                    d          }|
                    d          }|
                    d          }|
                    d          }|+t          |t                    sJ t          | ||          \  } }|t          |t                    sJ |ht          |t                    sJ d	|                                v sJ d	|                                v sJ |d	                             |d	                    t          | ||          \  } }n-|+t          |t                    sJ t          | ||          \  } }|Vt          |t                    sJ d
|                                vrt          j        d           t          | ||          \  } }t!          | |          \  } }| |fS )a)  

    Parallelize the model and optimizer from a single card version to a distributed version.

    Args:
        model (paddle.nn.Layer): the model to be parallelized.
        optimizer (paddle.optimizer.Optimizer, optional): the optimizer to be parallelized.
            Could be `None` if no optimizer to be parallelized.
        mesh (paddle.distributed.ProcessMesh, optional): the process mesh for parallelize the model and the optimizer.
            Best practice: calling `dist.auto_parallel.set_mesh` to set the global mesh ahead of calling `parallelize`
            and keep the `mesh` parameter as `None.
            If the `mesh` is not None, the mesh passed to `parallelize` will overwrite the mesh set by `set_mesh`.
        config (dict, optional): a dict contains the parallel config.
            The keys of the dict can be chosen from `dp_config`, `mp_config` and `pp_config` which will be used to
            determine the parallel method for data parallel, tensor parallel and pipeline parallel separately.
            A valid config can be like this: {"dp_config": for more information refer the `dp_config` section of
            this doc, "mp_config": for more information refer the `mp_config` section of this doc, "pp_config":
            for more information refer the `pp_config` section of this doc}.

            dp_config (dict): a dict specifying the data parallel config. The keys of `dp_config` is `sharding_level`.
                The value of `sharding_level` can be chosen from 0/1/2/3, which means pure data parallel, sharding
                parallel stage 1, sharding parallel stage 2 and sharding parallel stage 3  separately. A valid
                dp_config can be like this: {"sharding_level": 2}.

            mp_config (dict): a dict specifying the tensor parallel config. The keys of `mp_config` is
                `parallelize_plan`. The value of `parallelize_plan` is another dict, mapping a layer name or a param
                name to a specific parallel plan. Note that the layer name could be written in regular format. If
                mapping a param name to a specific plan, the name of the param must be ended with `weight` or `bias`.
                And all valid parallel plan is `ColWiseParallel`, `RowWiseParallel`, `SequenceParallelBegin,
                `SequenceParallelDisable`, `SequenceParallelEnable`, `SequenceParallelEnd`, `PrepareLayerInput` and
                `PrepareLayerOutput`. A valid mp_config can be like this: {"llama.embed_tokens": dist.ColWiseParallel(),
                "llama.norm": dist.SequenceParallelEnable(), "lm_head.weight": dist.ColWiseParallel()}.

            pp_config (dict): a dict specifying the pipeline parallel config. The keys of `pp_config` is `split_spec`
                and `global_spec`. The `split_spec` can be a dict or a string. If the `split_spec` is a dict, it maps
                a layer name to a `SplitPoint`, note that the layer name could be written in regular format. The
                pipeline parallel will exactly split the model at the point indicated by the map. If the `split_spec`
                is a string, it contains the prefix of a set of layers. The pipeline parallel will automatically split
                the model evenly at target layer. The `global_spec` is a string indicating a layer that contains global
                tensors, which will be duplicated through all stages of the pipeline parallel. Some valid pp_config
                can be list these:  {"split_spec": "llama.layers", "global_spec": "llama.global_layer"}
                or {"split_spec": {"llama.layers.1": SplitPoint.END}}.

            cp_config (dict): a dict specifying the context parallel config. The keys of `cp_config` is
                `parallelize_plan`. The value of `parallelize_plan` is another dict, mapping a layer name or a param
                name to a specific parallel plan. All valid parallel plan is `ContextParallel` and `PrepareContextParallel`.
                A valid cp_config can be like this: {"llama": dist.PrepareContextParallel('p2p'),
                "llama.sdpa": dist.ContextParallel('p2p')}.

    Note:
        If the mesh is `None` or neither of `dp_config`, `mp_config`, `pp_config` and `cp_config` is in the config, this
        api will do nothing but return the model and optimizer passed in.

    Returns:
        model, optimizer: the model and the optimizer after parallelize

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> class ModelConfig:
            ...     def __init__(self):
            ...         self.vocab_size = 10
            ...         self.hidden_size = 20
            ...         self.intermediate_size = 20
            ...         self.num_layers = 2

            >>> model_config = ModelConfig()

            >>> class LlamaRMSNorm(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.weight = paddle.create_parameter(
            ...             shape=[model_config.hidden_size],
            ...             dtype=paddle.get_default_dtype(),
            ...         )
            ...
            ...     def forward(self, input):
            ...         pass

            >>> class LlamaAttention(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...
            ...         self.qkv_proj = paddle.nn.Linear(
            ...             model_config.hidden_size,
            ...             model_config.hidden_size * 3,
            ...             bias_attr=False,
            ...         )
            ...
            ...         self.o_proj = paddle.nn.Linear(
            ...             model_config.hidden_size,
            ...             model_config.hidden_size,
            ...             bias_attr=False,
            ...         )
            ...
            ...     def forward(self, input):
            ...         pass

            >>> class LlamaMLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.gate_up_proj = paddle.nn.Linear(
            ...             model_config.hidden_size,
            ...             model_config.intermediate_size * 2,
            ...             bias_attr=False
            ...         )
            ...
            ...         self.down_proj = paddle.nn.Linear(
            ...             model_config.intermediate_size, model_config.hidden_size, bias_attr=False
            ...         )
            ...
            ...     def forward(self, input):
            ...         pass

            >>> class LlamaDecoderLayer(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.self_attn = LlamaAttention()
            ...         self.mlp = LlamaMLP()
            ...         self.input_layernorm = LlamaRMSNorm()
            ...         self.post_attention_layernorm = LlamaRMSNorm()
            ...
            ...     def forward(self, input):
            ...         pass

            >>> class LlamaModel(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.embedding = paddle.nn.Embedding(model_config.vocab_size, model_config.hidden_size)
            ...         decoder_layers = []
            ...         for _ in range(model_config.num_layers):
            ...             decoder_layers.append(LlamaDecoderLayer())
            ...
            ...         self.layers = paddle.nn.LayerList(decoder_layers)
            ...         self.norm = LlamaRMSNorm()
            ...
            ...     def forward(self, input):
            ...         pass

            >>> class LlamaLMHead(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.weight = self.create_parameter(
            ...             shape=[model_config.hidden_size, model_config.vocab_size],
            ...             dtype=paddle.get_default_dtype(),
            ...         )
            ...
            ...     def forward(self, input):
            ...         pass

            >>> class LlamaForCausalLM(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.llama = LlamaModel()
            ...         self.lm_head = LlamaLMHead()
            ...
            ...     def forward(self, input):
            ...         pass

            >>> mesh = dist.ProcessMesh([[[0, 1], [2, 3]], [[4, 5], [6, 7]]], dim_names=["dp", "mp", "pp"])
            >>> dist.auto_parallel.set_mesh(mesh)
            >>> parallel_config = {
            ...     "dp_config": {'sharding_level': 1},
            ...     "mp_config": {
            ...         "parallelize_plan": {
            ...             "llama.embed_tokens": [
            ...                 dist.ColWiseParallel(),
            ...                 dist.SequenceParallelBegin(),
            ...             ],
            ...             "llama.position_embedding": [
            ...                 dist.ColWiseParallel(),
            ...                 dist.SequenceParallelBegin(),
            ...             ],
            ...             "llama.layers.*.self_attn.qkv_proj": dist.ColWiseParallel(),
            ...             "llama.layers.*.self_attn.o_proj": dist.RowWiseParallel(),
            ...             "llama.layers.*.self_attn": dist.SequenceParallelDisable(),
            ...             "llama.layers.*.mlp.gate_up_proj": dist.ColWiseParallel(),
            ...             "llama.layers.*.mlp.down_proj": dist.RowWiseParallel(),
            ...             "llama.layers.*.mlp": dist.SequenceParallelDisable(
            ...                 need_transpose=False
            ...             ),
            ...             "lm_head.weight": dist.ColWiseParallel(),
            ...             "lm_head": dist.SequenceParallelEnd(),
            ...         }
            ...     },
            ...     "pp_config": {'split_spec': "llama.layers"}
            ... }

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> model = LlamaForCausalLM()
            >>> optimizer = paddle.optimizer.AdamW(parameters=model.parameters())
            >>> dist_model, dist_optimizer = dist.parallelize(model, optimizer, config=parallel_config) # type: ignore[arg-type]
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1,2,3,4,5,6,7 {test_case}.py

    Nz<The `parallelize will do nothing since the config is `None`.?The mesh must be an instance of paddle.distributed.ProcessMesh.zxThe mesh set by `fleet.auto.set_mesh` is different with the mesh pass to `parallelize`. Will overwrite the previous meshr'   r&   r%   	cp_configr   r   @The dp_config doesn't contain sharding_level, will run under dp.)r.   )warningswarn
isinstancedictr   ProcessMeshr   autoget_meshset_meshgetr   keysupdater   r   r   )	r(   r*   r,   r.   g_meshr'   r&   r%   r4   s	            r   parallelizerB   3   s   Z ~J	
 	
 	
 ifd#####$ 011 	
 	
M	
 	
1 $$&&&D..MB   	
D!!!

;''I

;''I

;''I

;''I)T*****,
 
y
 )T***** i.....%)9)99999%)9)99999()00;M1NOOO*5)YGGyy		)T******
 
y
 )T*****9>>#3#333MR   1
 
 
y
 7uiHHE9)r   Fc                   |t          j        d           | S t          |t                    sJ |}t          |t          j                  s
J d            t          j                                        }|||k    rt          j        d           t          j        	                    |           da
t          | d ||          \  } }| S )NzBThe `parallelize_model will do nothing since the config is `None`.r3   z~The mesh set by `fleet.auto.set_mesh` is different with the mesh pass to `parallelize_model`. Will overwrite the previous meshT)r6   r7   r8   r9   r   r:   r   r;   r<   r=   has_parallelized_modelrB   )r(   r,   r.   rA   _s        r   parallelize_modelrF   =  s    ~P	
 	
 	
 fd#####$ 011 	
 	
M	
 	
1 $$&&&D..MH   	
D!!!!5$f55HE1Lr   c                   |t          j        d           | S t          |t                    sJ |}t          |t          j                  s
J d            t          j                                        }|||k    rt          j        d           t          j        	                    |           t          s
J d            | j        }t          |d         t                    r/|D ]+}|d         D ] }|                                s
J d            !,n#|D ] }|                                s
J d            !|                    d          }d }d }	|Ud	|                                vrt          j        d
           |                    d	          }|                    dd          }	t          | ||	          } |                                 } | S )NzFThe `parallelize_optimizer will do nothing since the config is `None`.r3   zThe mesh set by `fleet.auto.set_mesh` is different with the mesh pass to `parallelize_optimizer`. Will overwrite the previous meshz:Please parallelize the model before parallelize optimizer.r   paramsz7Please use model after parallelize to create optimizer.r%   r   r5   sharding_mesh_dimdp)r6   r7   r8   r9   r   r:   r   r;   r<   r=   rD   _parameter_listis_distr>   r?   r
   rB   )
r*   r,   r.   rA   
param_listparam_groupparamr%   levelrI   s
             r   parallelize_optimizerrQ   U  s2   ~T	
 	
 	
 fd#####$ 011 	
 	
M	
 	
1 $$&&&D..ML   	
D!!! "  D ! *J*Q-&& 
% 	 	K$X.  }}  M  	   	 	E==??  I ?  

;''IE9>>#3#333MR   .//%MM*=tDD!)U4EFFI%%''Ir   )NNN)
r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   )NN)
__future__r   r6   typingr   r   typing_extensionsr   paddle.distributedr   paddle.frameworkr   parallel_baser
   r   r   r   r   paddler   r   r   r   r    r$   rB   rD   rF   rQ   r   r   r   <module>rY      s   # " " " " "  + + + + + + + + ) ) ) ) ) ) $ $ $ $ $ $ ! ! ! ! ! ! M M M M M M M M 0 0 0 0 0 0 8 8 8 8 8 8 , , , , , , *MMM------))))))" " " " "I " " "? ? ? ? ?I ? ? ?& & & & &I & & &* * * * *Y * * * 4826(,	D D D D DN     00 0 0 0 0 0r   