from __future__ import annotations

import logging
import os
from typing import TYPE_CHECKING, Literal

import paddle
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import (
    GroupShardedOptimizerStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import (
    GroupShardedStage2,
)
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import (
    GroupShardedStage3,
)
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import (
    GroupShardedScaler,
)
from paddle.distributed.fleet.utils.mix_precision_utils import (
    MixPrecisionOptimizer,
)
from paddle.distributed.utils.log_utils import get_logger

if TYPE_CHECKING:
    from collections.abc import Sequence

    from paddle.amp import GradScaler
    from paddle.distributed.communication.group import Group
    from paddle.nn import Layer
    from paddle.optimizer import Optimizer

logger_ = get_logger(logging.WARNING)


def group_sharded_parallel(
    model: Layer,
    optimizer: Optimizer,
    level: Literal['os', 'os_g', 'p_g_os'],
    scaler: GradScaler | None = None,
    group: Group | None = None,
    offload: bool = False,
    sync_buffers: bool = False,
    buffer_max_size: int = 2**23,
    segment_size: int = 2**20,
    sync_comm: bool = False,
    dp_group: Group | None = None,
    exclude_layer: Sequence[str | int] | None = None,
) -> tuple[Layer, Optimizer, GradScaler]:
    """
Use group_sharded_parallel to apply group sharded configuration to the model, optimizer and GradScaler. `level` has three string options, 'os', 'os_g' and 'p_g_os', corresponding to three different usage scenarios: optimizer state segmentation, optimizer state + gradient segmentation, and parameter + gradient + optimizer state segmentation.
Usually, optimizer state + gradient segmentation is a further optimization of optimizer state segmentation, so optimizer state + gradient segmentation is also used to implement optimizer state segmentation.

Args:
    model (Layer): The layer to be wrapped with group_sharded_parallel.
    optimizer (Optimizer): The optimizer to be wrapped with group_sharded_parallel.
    level (str): The sharding level to use, one of `os`, `os_g` and `p_g_os`.
    scaler (GradScaler|None, optional): If AMP is used, you need to pass GradScaler. Defaults to None, indicating that GradScaler is not used.
    group (Group|None, optional): The group instance. Defaults to None, indicating that the default environment group is used.
    offload (bool, optional): Whether to use the offload function. Defaults to False, which means that the offload function is not used.
    sync_buffers (bool, optional): Whether to broadcast model buffers. It is generally used when there are registered model buffers. Defaults to False, indicating that model buffers are not broadcast.
    buffer_max_size (int, optional): The max size of the buffer used to integrate gradients in `os_g`. The larger the size, the more GPU memory will be used. Defaults to 2**23, which means the buffer size is 2**23.
    segment_size (int, optional): The smallest size of a parameter to be sharded in `p_g_os`. Defaults to 2**20, indicating that the minimum size of a segmented parameter is 2**20.
    sync_comm (bool, optional): Whether to use synchronous communication, only used in `p_g_os`. Defaults to False, indicating that asynchronous communication is used.
    dp_group (Group|None, optional): The data parallel communication group, which supports combining stage2 or stage3 sharding with data parallel hybrid communication.
    exclude_layer (list|None, optional): Layers excluded from slicing in sharding stage3, for example, exclude_layer=["GroupNorm", id(model.gpt.linear)]. Each entry must be either a layer's class name or a layer's id.

Returns:
    model: A wrapper for group sharded given model.
    optimizer: A wrapper for group sharded given optimizer.
    scaler: A wrapper for group sharded given scaler.

Examples:
    .. code-block:: python

        >>> # type: ignore
        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> from paddle.nn import Linear
        >>> from paddle.distributed import fleet
        >>> from paddle.distributed.sharding import group_sharded_parallel

        >>> fleet.init(is_collective=True)
        >>> group = paddle.distributed.new_group([0, 1])
        >>> model = Linear(1000, 1000)

        >>> clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
        >>> optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip)

        >>> # create a GradScaler and wrap sharding model, optimizer and scaler
        >>> scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
        >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "os_g", scaler=scaler)

        >>> # dummy input and label for demonstration
        >>> img = paddle.rand(shape=[2, 1000], dtype='float32')
        >>> label = paddle.randint(low=0, high=1000, shape=[2, 1], dtype='int64')
        >>> label.stop_gradient = True
        >>> img.stop_gradient = True

        >>> out = model(img)
        >>> loss = paddle.nn.functional.cross_entropy(input=out, label=label)

        >>> loss.backward()
        >>> optimizer.step()
        >>> optimizer.clear_grad()
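
    The following is an additional, illustrative sketch (not part of the original
    example) showing the `p_g_os` level combined with `exclude_layer` and an AMP loss
    scaler. It assumes a fresh, not yet wrapped `model` and `optimizer` and reuses the
    dummy `img`/`label` from above; the excluded layer name and the AMP settings are
    placeholder assumptions, and the returned scaler is expected to expose the usual
    `scale`/`minimize` interface of `paddle.amp.GradScaler`.

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # hypothetical stage3 setup: shard parameters, gradients and optimizer
        >>> # states, but keep GroupNorm layers unsliced
        >>> scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
        >>> model, optimizer, scaler = group_sharded_parallel(
        ...     model,
        ...     optimizer,
        ...     "p_g_os",
        ...     scaler=scaler,
        ...     exclude_layer=["GroupNorm"],
        ... )
        >>> with paddle.amp.auto_cast(enable=True, level='O1'):
        ...     loss = paddle.nn.functional.cross_entropy(input=model(img), label=label)
        >>> scaler.scale(loss).backward()
        >>> scaler.minimize(optimizer, loss)
        >>> optimizer.clear_grad()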

    """
    device = paddle.get_device().split(":")[0]
    assert (
        device in ["gpu", "xpu"]
        or device in paddle.device.get_all_custom_device_type()
    ), "group_sharded_parallel only support gpu, xpu and custom_device now"
    assert isinstance(
        model, paddle.nn.Layer
    ), "The model must be the instance of paddle.nn.Layer."
    assert isinstance(
        optimizer, (paddle.optimizer.Optimizer, MixPrecisionOptimizer)
    ), "The optimizer must be the instance of paddle.optimizer.Optimizer or MixPrecisionOptimizer for main grad."
    assert level in [
        'os',
        'os_g',
        'p_g_os',
    ], "The level must be os, os_g or p_g_os."

    def check_dtype(param):
        return param.dtype == paddle.float16

    params_fp16 = list(filter(check_dtype, model.parameters()))
    if scaler is None and len(params_fp16) > 0:
        logger_.warning(
            "the input of scaler is None, please ensure the logic of your scaler outside is same as GroupShardedScaler."
        )

    # convert model/optimizer/scaler according to the requested sharding level
    if level in ['os', 'os_g']:
        logger_.info("*" * 30)
        logger_.info("Sharded level os uses sharded level os_g achieved now.")
        logger_.info("*" * 30)
        optimizer = GroupShardedOptimizerStage2(
            params=optimizer._parameter_list,
            optim=optimizer,
            group=group,
            offload=offload,
            dp_group=dp_group,
            device=device,
        )
        model = GroupShardedStage2(
            model,
            optimizer,
            group=group,
            sync_buffers=sync_buffers,
            buffer_max_size=buffer_max_size,
            dp_group=dp_group,
            device=device,
        )
    elif level == 'p_g_os':
        model = GroupShardedStage3(
            model,
            optimizer=optimizer,
            group=group,
            sync_buffers=sync_buffers,
            segment_size=segment_size,
            offload=offload,
            sync_comm=sync_comm,
            dp_group=dp_group,
            device=device,
            exclude_layer=exclude_layer,
        )
    else:
        raise ValueError("Please enter the correct level.")

    if isinstance(scaler, paddle.amp.GradScaler):
        scaler = GroupShardedScaler(scaler)

    logger_.info("*" * 30)
    logger_.info(
        "If there is a communication hang using group sharded, please check whether the communication operations of each process are unified."
    )
    logger_.info("*" * 30)

    return model, optimizer, scaler


def save_group_sharded_model(
    model: Layer, output: str, optimizer: Optimizer | None = None
) -> None:
    """
Group sharded encapsulated model and optimizer state saving module.

Note:
    If save_group_sharded_model is used to save the model, then when loading it again you need to set the model or optimizer state before using group_sharded_parallel; see the loading sketch at the end of the examples below.

Args:
    model (Layer): A wrapper for group sharded given model.
    output (str): Save directory.
    optimizer (Optimizer, optional): Group sharded encapsulated optimizer. Defaults to None, indicating that the optimizer state is not saved.

Examples:
    .. code-block:: python

        >>> # type: ignore
        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> from paddle.nn import Linear
        >>> from paddle.distributed import fleet
        >>> from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model

        >>> fleet.init(is_collective=True)
        >>> group = paddle.distributed.new_group([0, 1])
        >>> model = Linear(1000, 1000)

        >>> clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
        >>> optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip)

        >>> # create a GradScaler and wrap sharding model, optimizer and scaler
        >>> scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
        >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "os_g", scaler=scaler)

        >>> # dummy input and label for demonstration
        >>> img = paddle.rand(shape=[2, 1000], dtype='float32')
        >>> label = paddle.randint(low=0, high=1000, shape=[2, 1], dtype='int64')
        >>> label.stop_gradient = True
        >>> img.stop_gradient = True

        >>> out = model(img)
        >>> loss = paddle.nn.functional.cross_entropy(input=out, label=label)

        >>> loss.backward()
        >>> optimizer.step()
        >>> optimizer.clear_grad()

        >>> # save model and optimizer state_dict to a directory (example path)
        >>> output_dir = "./group_sharded_checkpoint"
        >>> save_group_sharded_model(model, output=output_dir, optimizer=optimizer)
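
    As the note above says, the saved state has to be restored before the wrappers are
    applied again. The following is an illustrative loading sketch, assuming a fresh,
    not yet wrapped `model` and `optimizer` and the same example `output_dir` as above;
    the file names match the ones written by this function.

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import os
        >>> model_state = paddle.load(os.path.join(output_dir, "model.pdmodel"))
        >>> opt_state = paddle.load(os.path.join(output_dir, "model.pdopt"))
        >>> model.set_state_dict(model_state)
        >>> optimizer.set_state_dict(opt_state)
        >>> # only wrap with group_sharded_parallel after the states are restored
        >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "os_g", scaler=scaler)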

    """
    logger_.info(
        "==========Begin to save group sharded model and optimizer=========="
    )
    assert not os.path.isfile(
        output
    ), f"Saving directory ({output}) should be a directory, not a file"
    os.makedirs(output, exist_ok=True)
    output_model = os.path.join(output, "model.pdmodel")

    if isinstance(model, GroupShardedStage2):
        paddle.save(model._layer.state_dict(), output_model)
    elif isinstance(model, GroupShardedStage3):
        convert2cpu = True if model._offload else False
        model.get_all_parameters(convert2cpu=convert2cpu)
        paddle.save(model._layer.state_dict(), output_model)
    else:
        raise ValueError(
            "Please use the layer which is wrapped with group_sharded_parallel."
        )

    if optimizer is not None:
        assert hasattr(
            optimizer, "_optim"
        ), "Please use the optimizer which is wrapped with group_sharded_parallel."
        output_opt = os.path.join(output, "model.pdopt")
        paddle.save(optimizer._optim.state_dict(), output_opt)
    logger_.info(
        "==========End to save group sharded model and optimizer=========="
    )