
    Αia                    .   S SK Jr  S SKrS SKrS SKrS SKrS SKrS SKJrJ	r	  S SK
Jr  S SKJrJr  S SKJrJr  S SKrS SKrS SKJrJr  S SKJr  S S	KJrJrJrJrJrJrJ r J!r!J"r"J#r#  S S
K$J%r%J&r&J'r'  S SK(J)r)  S SK*J+r+  S SK,J-r-J.r/J0r0J1r1  S SK2J3r3  S SK4J5r5  SSK6J7r7  SSK8J9r9  \(       a  S SK:J;r;  S SKJ<r<  S SK=J>r>  S SK?J@r@  / rA\0R                  rBS rCS rD\R                  S 5       rF\R                  S 5       rG\/R                  \R                        S*S j5       5       rI\/R                  \R                       S+             S,S jj5       5       rJ " S S\35      rKSqL " S S 5      rMS! rNS" rOS# rPS$ rQS% rRS& rSS-S.S' jjrTS-S/S( jjrUS-S/S) jjrVg)0    )annotationsN)OrderedDict
namedtuple)contextmanager)ManagerProcess)TYPE_CHECKINGAny)_legacy_C_ops	framework)get_all_custom_device_type)
Group_default_group_name_get_group_map_by_name_new_process_group_impl_set_default_backend_set_default_store_set_group_map_set_group_map_backend_set_group_map_by_name_valid_backend_list)_add_new_group_get_global_groupis_initialized)wait_server_ready)check_backend)_set_expected_placebasecorein_dynamic_mode)Layer)
deprecated   )parallel_helper)getenv_or_backup)	Generator)Tensor)
NCCLConfig)
_StateDictc                    [        5       n [        R                  R                  5       R                  U l        [        R                  R                  5       R
                  U l        [        R                  R                  5       R                  U l        [        R                  R                  5       R                  U l        U $ N)ParallelStrategypaddledistributedParallelEnvnranks
local_ranktrainer_endpointscurrent_endpoint)strategys    [/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/parallel.py _build_default_parallel_strategyr6   P   s    !H((446==HO ,,88:EEH&&(::  	&&(99  O    c                b   / nU R                  5        H  u  p#/ n/ nU H`  nUR                  UR                  5        UR                  [        R                  " U[
        R                  " UR                  SS9/S95        Mb     [        R                  " U5      nUR                  XsU/5        M     U$ )Nint64dtype)xshape)itemsappendr=   r-   reshapenpprodconcat)
var_groupscoalesced_grads_and_grad_varsgroup_id	grad_varsflattened_varsg_var_shapesg_varcoalesced_grads           r5   _coalesce_tensorsrL   ]   s    $&!)//1E,!!BGGEKKw$G#H   ~6%,,5	
  2 )(r7   c                    [         R                  " U R                  S9n[         R                  " 5       R	                  SSU 0XS.SU0S9  g )Nr:   reshape2X)OutXShaper=   typeinputsoutputsattrs)r   _create_tensorr;   _dygraph_tracertrace_op)r<   r=   x_shapes      r5   _reshape_inplacer[   p   sJ    &&QWW5G((Qx-	 ) r7   c           	     F   [        5       (       a  U  H  u  nnnU Vs/ s H  n[        R                  " USS9PM     nnSnUSU4-  nUS-  n[        R                  " X/UQ76   [        X#5       H&  u  ptUR                  US9  UR                  U:X  a  M&   e   M     g g s  snf )Nr9   r:    sections)axisr   )r=   )r    rA   rB   r   splitzipreshape_r=   )rE   rK   origin_grad_varsgrad_shapesg_shapegrad_var_lenrV   rJ   s           r5   _split_tensorsrg   {   s    
 +	
 @K?JGw/{   Ej,//E[ EI5I"%&6"DW-{{g--- #E + s   Bc                   SnSn[        5       nU S   R                  nU  H  nUR                  n[        U[        R                  5      (       a'  [
        R                  R                  R                  U   n[        R                  " UR                  SS9[        R                  " U5      -  nX1:  a  XVR                  :X  a  X8-  nOUnUR                  nUS-  nUR                  U/ 5      R                  U5        M     [        U5      $ )Nr   r9   r:   r#   )r   r;   
isinstancer   DataTyper-   pirdatatype_to_vartyperA   rB   r=   size_of_dtype
setdefaultr?   rL   )	vars
group_size	group_idxmemory_counterrD   r;   var	var_dtypebytess	            r5   build_groupsrv      s    
 INJGMMEII	i//

;;IFI		1D4F4F5
 
 &5II+=#N"NIIENIi,33C8  Z((r7   c           	     X   / nU R                  5       R                  5        GH  u  px[        U[        R                  R
                  5      (       d  [        SUR                   S35      eU(       a$  [        US5      (       a  UR                  (       a  Ms  U(       d  [        USS5      (       a  M  O&[        USS5      (       a  [        USS5      (       d  M  UR                  [        R                  R                  R                  :X  a  M  UR                  UR!                  5       5        GM     [#        U5      S:X  a  g U(       a  [%        US5      n	U	 H&  u  n
  n[&        R(                  R+                  XUS	S
9  M(     U	 Hj  u  pnU Vs/ s H  n[,        R.                  " USS9PM     nn[&        R0                  R2                  R5                  5       R7                  SSU
0SU0USS.S9  Ml     g U H2  nUR9                  5       n[&        R(                  R+                  XUS	S
9  M4     g s  snf )NThe data type of 'z' must be core.eager.Tensoris_distributedno_syncFexpertr   i   T)srcgroupsync_opr9   r:   r`   rO   rP   )r^   r_   rR   )_obtain_parameters_buffersr>   ri   r   eagerr'   	TypeErrornamehasattrry   getattrrS   VarDescVarTypeVOCABr?   detachlenrv   r-   r.   	broadcastrA   rB   r   r   rX   rY   
contiguous)model
comm_groupsrc_rankis_model_parallelfuse_paramsis_moe_sharding_parallel
model_vars_paramcoalesced_varscoalesced_varorigin_vars
var_shapesv_shapevar_lenrs   s                   r5   sync_params_buffersr      s    J446<<>%!2!233$UZZL0KL  u.//E4H4H' ui// 0
 ui//x9 9 ::--333%,,.)5 ?6 :!%j2CD#1M1a((:t )  $2 7E2M
?I?IGw/z   KK!!113<<]+,#*A6	 = 	 7E C.."C((T )  s   	H'c                  Z  ^  \ rS rSr% SrS\S'   S\S'   S\S'   S\S	'   S
\S'   S
\S'        S             SU 4S jjjrSS jrS r\	SS j5       r
SS jr\" SSS9S 5       r\" SSS9S 5       r   S       S S jjr\R"                   S!     S"S jj5       r\r\rSrU =r$ )#DataParallel   aX  
Run the dygraph module with data parallelism.

Currently, DataParallel class only supports to run the dynamic graph
with multi-process.

Now supports two ways to start training:

1. start by ``paddle.distributed.spawn`` method, for example:

    ``python demo.py`` (spawn need to be called in ``__main__`` method)

2. start by ``paddle.distributed.launch`` module, for example:

    ``python -m paddle.distributed.launch --gpus=0,1 demo.py`` .

And the content of `demo.py` is the code of examples.

Args:
    layers(Layer): The module that should be executed by data parallel.
    strategy(ParallelStrategy, optional): (deprecated) The strategy of data parallelism,
        contains environment configuration related to parallel execution. Default: None.
    comm_buffer_size(int, optional):  It limits the memory size(MB) of one buffer
                                      parameters' gradient which is the input of communication
                                      calling(e.g NCCLAllReduce). Default: 25.
    last_comm_buffer_size(float, optional): It limits memory size(MB) of last buffer in communication
                                     calling. Making the last communication buffer size small is useful to
                                     improve performance. Default: 1.
    find_unused_parameters(bool, optional): Whether to traverse the entire backward graph from the
                                            all tensors in the return value of the wrapped model's
                                            forward function. For parameters not involved in loss
                                            calculation, their gradients will be marked as ready in
                                            advance to prepare reduce. Please note that all forward
                                            outputs derived from the wrapped model parameters must
                                            participate in the calculation of loss and subsequent
                                            gradient calculations. If not, serious error will occur.
                                            Note that setting the find_unused_parameters to True
                                            will affect computing performance. Therefore, if all parameters
                                            are sure to participate in the loss calculation and the
                                            autograd graph construction, please set it False. Default: False.

Returns:
    Layer: The data paralleled module.

Examples:

    .. code-block:: python
        :name: dp-example

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> import paddle.nn as nn
        >>> import paddle.optimizer as opt
        >>> import paddle.distributed as dist

        >>> class LinearNet(nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self._linear1 = nn.Linear(10, 10)
        ...         self._linear2 = nn.Linear(10, 1)
        ...     def forward(self, x):
        ...         return self._linear2(self._linear1(x))

        >>> def train():
        ...     # 1. initialize parallel environment
        ...     dist.init_parallel_env()
        ...     # 2. create data parallel layer & optimizer
        ...     layer = LinearNet()
        ...     dp_layer = paddle.DataParallel(layer)
        ...     loss_fn = nn.MSELoss()
        ...     adam = opt.Adam(
        ...         learning_rate=0.001, parameters=dp_layer.parameters())
        ...     # 3. run layer
        ...     inputs = paddle.randn([10, 10], 'float32')
        ...     outputs = dp_layer(inputs)
        ...     labels = paddle.randn([10, 1], 'float32')
        ...     loss = loss_fn(outputs, labels)
        ...     loss.backward()
        ...     adam.step()
        ...     adam.clear_grad()

        >>> if __name__ == '__main__':
        ...     # 1. start by ``paddle.distributed.spawn`` (default)
        ...     dist.spawn(train, nprocs=2)
        ...     # 2. start by ``paddle.distributed.launch``
        ...     # train()

.. note::
    ``PyLayer`` is not supported in DataParallel. To solve problems of this kind,
    it's recommended to skip gradient synchronization among multiple cards by 'no_sync',
    and manually implement 'all_reduce' before model optimization. There is an example
    showing specific implementation processing.

Examples:

    .. code-block:: python
        :name: dp-pylayer-example

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import numpy
        >>> import paddle
        >>> import paddle.distributed as dist
        >>> from paddle.autograd import PyLayer
        >>> from paddle.distributed.fleet.utils.hybrid_parallel_util import fused_allreduce_gradients

        >>> class cus_tanh(PyLayer):
        ...     @staticmethod
        ...     def forward(ctx, x):
        ...         y = paddle.tanh(x)
        ...         ctx.save_for_backward(y)
        ...         return y
        ...     @staticmethod
        ...     def backward(ctx, dy):
        ...         y, = ctx.saved_tensor()
        ...         grad = dy * (1 - paddle.square(y))
        ...         return grad

        >>> class SimpleNet(paddle.nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self.linear = paddle.nn.Linear(2, 2)
        ...     def forward(self, inputs):
        ...         inputs = cus_tanh.apply(inputs)
        ...         return self.linear(inputs)

        >>> if __name__ == '__main__':
        ...     dist.init_parallel_env()
        ...     model = SimpleNet()
        ...     model = paddle.DataParallel(model)
        ...     opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())
        ...     for step in range(10):
        ...         x_data = numpy.random.randn(2, 2).astype(numpy.float32)
        ...         x = paddle.to_tensor(x_data)
        ...         x.stop_gradient = False
        ...         # step 1 : skip gradient synchronization by 'no_sync'
        ...         with model.no_sync():
        ...             y_pred = model(x)
        ...             loss = y_pred.mean()
        ...             loss.backward()
        ...         # step 2 : fuse + allreduce manually before optimization
        ...         fused_allreduce_gradients(list(model.parameters()), None)
        ...         opt.step()
        ...         opt.clear_grad()

boolfind_unused_parametersgrad_need_syncGroup | Noner}   r'   rt   intcomm_buffer_sizelast_comm_buffer_sizec           	     @  > [         T	U ]  UR                  5       S-   5        [        5       (       d   S5       eXl        XPl        SU l        X`l        [        R                  R                  U l        Ub  X l        O[        5       U l        U R                  R                  S:  Ga_  [        R                   c   S5       e[        5       (       a  U R                  c(  ["        R$                  R&                  R)                  5       OU R                  U l        [+        U R                  ["        R$                  R&                  R,                  5      (       d   S5       eU R                  R/                  5        VVs/ s H6  u  pxUR1                  5       (       a  M  [2        R4                  " SU S35      PM8       nn[7        U R                  S	S
9  [9        US-  S-  5      U l        [9        US-  S-  5      U l        U R?                  5         g [2        R4                  " S5        g s  snnf )N_data_parallelzBIt's not supported to construct DataParallel in static graph mode.Tr#   zsParallelContext must be initialized before. You should use init_parallel_env() beforeconstructing the DataParallel.z:ProcessGroup must be an instance of Group in DataParallel.zparam [z<] is not contiguous, please check it and make it contiguous.F)r   i   zThe program will return to single-card operation. Please check 1, whether you use spawn or fleetrun to start the program. 2, Whether it is a multi-card program. 3, Is the current environment multi-card.) super__init__	full_namer    _layersr   r   r}   r   r   r'   rt   	_strategyr6   r0   r$   __parallel_ctx__clz__r-   r.   
collective_get_default_groupri   r   named_parametersis_contiguouswarningswarnr   r   r   r   init_reducer)
selflayersr4   r   r   r   r}   r   r   	__class__s
            r5   r   DataParallel.__init__  s    	))+.>>?   	
P	
  &<#"
** %N=?DN>>  1$"88D 1D
    zz) &&11DDF 
 "JJ 2 2 = = C C  POP  (,||'D'D'F	 (G ..0	HMM!$'cd (G	  %@$'(84(?$(F$GD!
 *-%,t3*D& MME)s   2HHc                   / n[        5       nU R                  5        H  nUR                  SS9 H  u  pEUb  XR;   a  M  UR                  U5        [	        XPR
                  5      (       d&  [        SUR                   SU R
                   S35      eUR                  (       d  Ms  UR                  X545        M     M     [        [        S U VVs/ s H  u  pEUPM	     snn5      5      n[        U5      S:  d   S5       eS	 nU VVs/ s H!  u  p5[        US
S5      (       a  M  U" U5      PM#     nnn[        5       (       a  [        R                   " UUU R"                  U R$                  /5      U l        [        R(                  " U[        [+        U R&                  5      5      UU R,                  R.                  U R"                  U R$                  /U R0                  5      U l        g g s  snnf s  snnf )NF)include_sublayersrx   z' must be ''c                &    [        U SS5      (       + $ )Nrz   F)r   )r<   s    r5   <lambda>+DataParallel.init_reducer.<locals>.<lambda>  s    gaE::r7   r   zWThis model does not have any parameters to train, and does not need to use DataParallelc                    [        U [        R                  R                  R                  R
                  5      (       a  U R                  $ g)NF)ri   r-   nnlayercommon	Embedding_sparse)sublayers    r5   check_layer_sparse5DataParallel.init_reducer.<locals>.check_layer_sparse  s2    (FIIOO$:$:$D$DEE'''r7   rz   )set	sublayersr   addri   rt   r   r   	trainabler?   listfilterr   r   r    r   eager_assign_group_by_sizer   r   group_indicesEagerReducerreversedr}   process_groupr   _reducer)	r   layers_param
params_setr   r   r   trainable_parametersr   is_sparse_gradients	            r5   r   DataParallel.init_reducer  s   U
(H$555N=E$7u%!%88#,UZZLDNNCSSTU  ??? ''(9: O )  $:'34|81|4 
 '(1, 	
0	
,	 $0
#/5)U3 )x(#/ 	 
 !%!@!@$"++T-B-BC"D !--$Xd0012"

((++T-B-BC++DM 1 5$
s   G$G*!G*c                x   [         R                  R                  n[        X5      (       a  U/$ [        U[        [
        45      (       a'  [        R                  " [        U R                  U5      6 $ [        U[        5      (       a5  [        R                  " [        U R                  UR                  5       5      6 $ / $ r+   )r   r   r'   ri   r   tuple	itertoolschainmap_find_tensordictvalues)r   objvar_types      r5   r   DataParallel._find_tensor  s    ::$$c$$5LcD%=))??C(9(93$?@@c4  ??C(9(93::<$HII	r7   c              #  ^   #    U R                   nSU l          Sv   Xl         g! Xl         f = f7f)aQ  
A context manager to stop gradient synchronization. Within no_sync(),
gradients of parameters will only be accumulated on model and not
synchronized util the first forward-backward out of this context.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> import paddle.nn as nn
        >>> import paddle.distributed as dist

        >>> class SimpleNet(nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self._linear = nn.Linear(10, 1)
        ...     def forward(self, x):
        ...         return self._linear(x)

        >>> dist.init_parallel_env()
        >>> model = SimpleNet()
        >>> dp_model = paddle.DataParallel(model)

        >>> inputs_1 = paddle.randn([10, 10], 'float32')
        >>> inputs_2 = paddle.ones([10, 10], 'float32')

        >>> with dp_model.no_sync():
        ...     # gradients will not be synchronized
        ...     dp_model(inputs_1).backward()

        >>> # synchronization happens here
        >>> dp_model(inputs_2).backward()

FN)r   )r   tmp_grad_need_syncs     r5   rz   DataParallel.no_sync  s1     J "00#	5"4"4s   -" -*-c                .   U R                   " U0 UD6nU R                  R                  S:  ah  [        R                  " 5       R
                  (       aD  U R                  (       a3  U R                  R                  [        U R                  U5      5      5        U$ )Nr#   )r   r   r0   r   rX   	_has_gradr   r   prepare_for_backwardr   r   )r   rT   kwargsrU   s       r5   forwardDataParallel.forwardC  si    ,,1&1NN!!A%))+55##MM..tD4E4Eg4N/OPr7   z2.0.0z/This method does not need to be called anymore.)sincereasonc                    U$ )zd
Deprecated method, now ``scale_loss`` is an empty method,
keep this method just for compatibility.
r]   )r   losss     r5   
scale_lossDataParallel.scale_lossM  s	     r7   c                    g)zp
Deprecated method, now ``apply_collective_grads`` is an empty method,
keep this method just for compatibility.
Nr]   r   s    r5   apply_collective_grads#DataParallel.apply_collective_gradsW  s     	r7   c                8    U R                   R                  UUUS9$ )aS  
Get all parameters and persistable buffers of current layer and its sub-layers. And set them into a dict

Parameters:
    destination(dict, optional) : If provide, all the parameters and persistable buffers will be set to this dict . Default: None
    include_sublayers(bool, optional) : If true, also include the parameters and persistable buffers from sublayers. Default: True

Returns:
    dict: a dict contains all the parameters and persistable buffers.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> import paddle.distributed as dist

        >>> dist.init_parallel_env()

        >>> emb = paddle.nn.Embedding(10, 10)
        >>> emb = paddle.DataParallel(emb)

        >>> state_dict = emb.state_dict()
        >>> paddle.save(state_dict, "paddle_dy.pdparams")

)destinationr   structured_name_prefix)r   
state_dict)r   r   r   r   s       r5   r   DataParallel.state_dicta  s*    B ||&&#/#9 ' 
 	
r7   c                6    U R                   R                  XS9  g)a  
Set parameters and persistable buffers from state_dict. All the parameters and buffers will be reset by the tensor in the state_dict

Parameters:
    state_dict(dict) : Dict contains all the parameters and persistable buffers.
    use_structured_name(bool, optional) : If true, use structured name as key, otherwise, use parameter or buffer name as key.
                                          Default: True
Returns:
    None

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> import paddle.distributed as dist

        >>> dist.init_parallel_env()

        >>> emb = paddle.nn.Embedding(10, 10)
        >>> emb = paddle.DataParallel(emb)

        >>> state_dict = emb.state_dict()
        >>> paddle.save(state_dict, "paddle_dy.pdparams")

        >>> para_state_dict = paddle.load("paddle_dy.pdparams")
        >>> emb.set_state_dict(para_state_dict)

)use_structured_nameN)r   set_state_dict)r   r   r   s      r5   r   DataParallel.set_state_dict  s    D 	## 	$ 	
r7   )
r   r   r   r   r   r   r}   r   r   rt   )N   r#   FN)r   r!   r4   zParallelStrategy | Noner   r   r   floatr   r   r}   r   returnNone)r  r  )r  zGenerator[None, None, None])rT   r
   r   r
   r  r'   )NT )r   z_StateDict | Noner   r   r   strr  r)   )T)r   r)   r   r   r  r  )__name__
__module____qualname____firstlineno____doc____annotations__r   r   r   r   rz   r   r"   r   r   r   r   deprecate_stat_dictr   set_dict	load_dict__static_attributes____classcell__)r   s   @r5   r   r      su   Pd ! 
 -1 "'(',"II *I 	I
  %I !%I I 
I IV8t )5 )5V O O *."&&(	%
&%
  %
 !$	%

 
%
N ""BF#
$#
;?#
	#
 ##
L HIr7   r   c                      \ rS rSrSrS r\SS j5       r\SS j5       r\SS j5       r	\SS j5       r
\SS j5       r\SS	 j5       r\SS
 j5       r\SS j5       r\r\r\	rSrg)r/   i  a;  
.. note::
    This API is not recommended, if you need to get rank and world_size,
    it is recommended to use ``paddle.distributed.get_rank()`` and
    ``paddle.distributed.get_world_size()`` .

This class is used to obtain the environment variables required for
the parallel execution of ``paddle.nn.Layer`` in dynamic mode.

The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch``
or ``paddle.distributed.spawn`` .

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> import paddle
        >>> import paddle.distributed as dist

        >>> def train():
        ...     # 1. initialize parallel environment
        ...     dist.init_parallel_env()
        ...     # 2. get current ParallelEnv
        ...     parallel_env = dist.ParallelEnv()
        ...     print("rank: ", parallel_env.rank)
        ...     print("world_size: ", parallel_env.world_size)

        >>> if __name__ == '__main__':
        ...     # 1. start by ``paddle.distributed.spawn`` (default)
        ...     dist.spawn(train, nprocs=2)
        ...     # 2. start by ``paddle.distributed.launch``
        ...     train()

        # Print result in process 1:
        rank: 1
        world_size: 2

        # Print result in process 2:
        rank: 2
        world_size: 2

c                N   [        [        R                  " SS5      5      U l        [        [        R                  " SS5      5      U l        [        5       nU(       a  [        US   5      OSU l        [        [        R                  " SS5      5      U l        U R                  S:w  aJ  S	U R                   S
3n[        R                  " US5      R                  S5      n[        US   5      U l
        O[        R                  " 5       (       a:  [        R                  " SS5      R                  S5      n[        US   5      U l
        OS[        R                  " 5       (       a9  [        R                  " SS5      R                  S5      n[        US   5      U l
        [        SS5      R                  S5      U l        [        R                  " SS5      U l        [        [        R                  " SS5      5      U l        U R"                  S:  d   S5       eU R"                  S:  d   S5       eg )NPADDLE_TRAINER_ID0PADDLE_TRAINERS_NUM1r   r  PADDLE_PG_TIMEOUT1800000FLAGS_selected_s,FLAGS_selected_gpusFLAGS_selected_xpusPADDLE_TRAINER_ENDPOINTSPADDLE_CURRENT_ENDPOINTFLAGS_nccl_nringsz.nccl_nrings must be an integer greater than 0.	   zEnccl_nrings should be less than 9, which is enough in most scenarios.)r   osgetenv_rank_world_sizer   r  _device_type_pg_timeoutr`   
_device_idr   is_compiled_with_cudais_compiled_with_xpur%   _trainer_endpoints_current_endpoint_nrings)r   custom_device_typesFLAGS_selected_custom_devicesselected_custom_devicesselected_gpusselected_xpuss         r5   r   ParallelEnv.__init__  s   #6<=
ryy)>DE8:+>C#A&'B 	 ryy)<iHI "!$"3"3!4A6 * ')ii-s'eCj $ ""9!"<=DO))++ "		*? E K KC P"%mA&6"7**,, "		*? E K KC P"%mA&6"7"2&#

%* 	 "$+Db!I299%8#>?||a 	
<	
 ||a 	
S	
r7   c                    U R                   $ )a  
Rank of current trainer.

Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ID`` . The default value is 0.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # execute this command in terminal: export PADDLE_TRAINER_ID=0
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> print("The rank is %d" % env.rank)
        The rank is 0

)r$  r   s    r5   rankParallelEnv.rank
  s    & zzr7   c                    U R                   $ )a  
The number of trainers (number of processes participating in current job).

Its value is equal to the value of the environment variable ``PADDLE_TRAINERS_NUM`` . The default value is 1.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # execute this command in terminal: export PADDLE_TRAINERS_NUM=4
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> print("The world_size is %d" % env.world_size)
        The world_size is 4

)r%  r   s    r5   
world_sizeParallelEnv.world_size  s    & r7   c                    U R                   $ )a  
The ID of selected GPU card for parallel training.

Its value is equal to the value of the environment variable ``FLAGS_selected_gpus`` . The default value is 0.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # execute this command in terminal: export FLAGS_selected_gpus=1
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> print("The device id are %d" % env.device_id)
        The device id are 1
)r(  r   s    r5   	device_idParallelEnv.device_id4  s    $ r7   c                    U R                   $ )z
The type of custom device for parallel training.

Its value is equal to the value of paddle.device.get_all_custom_device_type() . The default value is None.

)r&  r   s    r5   device_typeParallelEnv.device_typeH  s        r7   c                    U R                   $ )a@  
The endpoint of current trainer, it is in the form of (node IP + port).

Its value is equal to the value of the environment variable ``PADDLE_CURRENT_ENDPOINT`` . The default value is "".

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # execute this command in terminal: export PADDLE_CURRENT_ENDPOINT=127.0.0.1:6170
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> print("The current endpoint are %s" % env.current_endpoint)
        The current endpoint are 127.0.0.1:6170
)r,  r   s    r5   r3   ParallelEnv.current_endpointR  s    $ %%%r7   c                    U R                   $ )a  
The endpoints of all trainer nodes in the task,
which are used to broadcast the NCCL ID when NCCL2 is initialized.

Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ENDPOINTS`` . The default value is "".

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # execute this command in terminal: export PADDLE_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> print("The trainer endpoints are %s" % env.trainer_endpoints)
        The trainer endpoints are ['127.0.0.1:6170', '127.0.0.1:6171']

)r+  r   s    r5   r2   ParallelEnv.trainer_endpointsf  s    ( &&&r7   c                    U R                   $ )a  
Nrings of current trainer.

Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings`` . The default value is 1.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # execute this command in terminal: export FLAGS_nccl_nrings=1
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> print("The nrings is %d" % env.nrings)
        The nrings is 1
)r-  r   s    r5   nringsParallelEnv.nrings|  s    $ ||r7   c                    U R                   $ )a  
timeout of process group.

Its value is equal to the value of the environment variable ``PADDLE_PG_TIMEOUT`` . The default value is 30 minutes.

Examples:
    .. code-block:: python

        >>> # execute this command in terminal: export PADDLE_PG_TIMEOUT=1800000
        >>> import paddle.distributed as dist

        >>> env = dist.ParallelEnv()
        >>> # the pg_timeout of process group 1800000
)r'  r   s    r5   
pg_timeoutParallelEnv.pg_timeout  s      r7   )r,  r(  r&  r-  r'  r$  r+  r%  N)r  r   )r  r  )r  z	list[str])r  r  r  r	  r
  r   propertyr5  r8  r;  r>  r3   r2   rE  rH  r1   r0   dev_idr  r]   r7   r5   r/   r/     s    )V$
L  (    (  & ! ! & && ' '*  &    $ JFFr7   r/   c                 0    [         c
  [        5       q [         $ r+   )_global_parallel_envr/   r]   r7   r5   _get_global_parallel_envrN    s    #*}r7   c                X   SSK Jn  U" [        U 5      US9nUR                  5         SnUR	                  SS5      (       d  UR                  5       (       dF  [        R                  " U5        UR	                  SS5      (       a  M/  UR                  5       (       d  MF  UR                  5         g )Nr   )KVServer)size   runningF)	*paddle.distributed.fleet.utils.http_serverrP  r   startgetshould_stoptimesleepstop)porthttp_server_drQ  rP  http_serverwait_secondss         r5   _start_kv_serverr_    s    C3t940KL


Iu
-
-[5L5L5N5N

<  

Iu
-
-[5L5L5N5Nr7   c                    [        U 5        U S;   a4  [        R                  " 5       (       d   [        R                  " 5       (       d  U S:X  a  gg)N)autoncclbkclheterflagcxxcclFT)r   r   r)  r*  )backends    r5   _is_cpuonlyrh    s>    '>>''))T-F-F-H-H	F	r7   c                @    [        U S 5      nUc  [        SU  S35      eg )Nz:paddle.distributed initialize error, environment variable z is needed, but not set.)r%   
ValueError)var_namers   s     r5   _check_var_existsrl    s8    
8T
*C
{$$,:-EG
 	
 r7   c                     / n [        S/ SQ5      n[        R                  " 5       nUR                  5        HD  nUR	                  U5      nUR                  U5      nXE:X  a  M,  U R                  U" X4U5      5        MF     U $ )NFLAGS)r   current_valuedefault_value)r   r   globalskeysrV  get_defaultr?   )retrn  global_flagskeyvaluerp  s         r5   _get_modified_flagsrx    sr    
Cw JKE<<>L  "  %$005%JJuS78	 #
 Jr7   c                @   [        U 5      S:  a  [        R                  R                  S5        U  HJ  n[        R                  R                  [	        U5      5        [        R                  R                  S5        ML     [        R                  R                  S5        g g )Nr   zH======================= Modified FLAGS detected =======================

zH=======================================================================
)r   sysstderrwriter  )modified_flagsflags     r5   _print_modified_flagsr    st    
>Q

W	
 #DJJSY'JJT" # 	

W	
 r7   c                   [        5       n[        U5        [        5       q[        nUR                  S:  a  [
        R                  " S5        g[        R                  R                  SS5      nUS:X  a/  [        R                  " 5       (       a  S[        R                  S'   Sn[        U5      nU(       dE  [        R                  " 5       (       d+  [        R                  " 5       (       d  US:X  d  [        S	5      eUS:X  a  S
UR                   S3n[!        U5        OmU(       d0  [        R                  " 5       (       a  [!        S5        US:X  a  SOUnO6U(       d/  [        R                  " 5       (       a  [!        S5        US:X  a  SOUn[!        S5        [!        S5        [!        S5        US:X  a,  [        R"                  " UR                  UR$                  5      nOU(       a  [        R&                  " 5       nOu[        R                  " 5       (       a!  [        R(                  " UR$                  5      nO:[        R                  " 5       (       a   [        R*                  " UR$                  5      n[-        W5        SnU[.        ;   GaM  [1        5       (       Ga=  [2        [5        5       ;   a  [5        5       [2           $ [7        U5        [9        [        R:                  " S5      5      n[9        [        R:                  " S5      5      n	US:  a  X:  a  U	S:  d   S5       e[        R:                  " SS5      n
[        R:                  " SS5      nU
(       a  U(       a  SR=                  X/5      OSnUc  [        R:                  " SS5      nUc  [?        S5      RA                  S5      S   nU(       d   S5       eURA                  S5      u  p[9        U5      nUS:H  n[9        [        R:                  " SS5      5      n[        RB                  " 5       n[E        U5        US;   a)  [        RF                  RI                  UR$                  5        SSK%J&n  [O        UUUU	[2        SU" U S 5      S!9n[Q        [S        U	5      5      n[U        USUU[2        S"9n[W        [2        U5        [Y        SU5        [[        Xs5        []        U5        [^        R`                  " S#5        U$ URb                   Vs1 s H  nURA                  S5      S   iM     nn[9        [        R:                  " S$S%5      5      nU(       d  U(       d  US&:X  a  URb                  S   RA                  S5      n[e        5       nURg                  5       nS'US('   URh                  S:X  aZ  S)UR                  0nUS&:X  a  S)[k        U5      0n[m        [n        [9        US   5      UU4S*9nS#Ul8        S#US('   URs                  5         [u        5       n[^        Rv                  " 5       (       a  [
        R                  " S+5        UR                  Ul<        URh                  Ul=        URb                  Ul1        UR|                  Ul>        UR~                  Ul?        U(       a,  [^        R`                  " [        R                  " UU5      5        OUS&:X  a6  [^        R`                  " [        R                  " UUR$                  5      5        O[        R                  " 5       (       a,  [^        R`                  " [        R                  " UU5      5        OE[        R                  " 5       (       a+  [^        R`                  " [        R                  " UU5      5        US&:w  aL  URb                  SS nUR                  UR|                  5        U(       d  URz                  S:X  a  [        U5        [^        R                  " 5         U(       d  US&:X  a'  URh                  S:X  a  S'WS('   WR=                  5         U$ U(       a  [        URb                  S   /5        [        R                  " 5       nURh                  Ul4        UR                  UlH        WS   UlI        [9        US   5      UlJ        S,nS-nUUlK        UUlL        [        R                  " U5      n U R                  5         URh                  S:X  a  S'WS('   WR=                  5         U$ s  snf ).a  

Initialize parallel training environment in dynamic graph mode.

Note:
    Now initialize both `NCCL` and `GLOO` contexts for communication.

Args:
    backend (string): A string represents the backend used by DataParallel,
        should be one of 'gloo'(for cpu), 'nccl'(for cuda), 'bkcl'(for xpu), 'auto'(auto detect).
        The auto detection prefer 'nccl', 'bkcl' than 'gloo'.

Returns:
    None

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:GPU, env:DISTRIBUTED)
        >>> import paddle
        >>> import paddle.nn as nn
        >>> import paddle.optimizer as opt
        >>> import paddle.distributed as dist

        >>> class LinearNet(nn.Layer):
        ...     def __init__(self):
        ...         super().__init__()
        ...         self._linear1 = nn.Linear(10, 10)
        ...         self._linear2 = nn.Linear(10, 1)
        ...     def forward(self, x):
        ...         return self._linear2(self._linear1(x))

        >>> def train():
        ...     # 1. initialize parallel environment
        ...     dist.init_parallel_env()
        ...     # 2. create data parallel layer & optimizer
        ...     layer = LinearNet()
        ...     dp_layer = paddle.DataParallel(layer)
        ...     loss_fn = nn.MSELoss()
        ...     adam = opt.Adam(
        ...         learning_rate=0.001, parameters=dp_layer.parameters())
        ...     # 3. run layer
        ...     inputs = paddle.randn([10, 10], 'float32')
        ...     outputs = dp_layer(inputs)
        ...     labels = paddle.randn([10, 1], 'float32')
        ...     loss = loss_fn(outputs, labels)
        ...     loss.backward()
        ...     adam.step()
        ...     adam.clear_grad()

        >>> if __name__ == '__main__':
        ...     dist.spawn(train)

   zlCurrently not a parallel execution environment, `paddle.distributed.init_parallel_env` will not do anything.NPADDLE_DISTRI_BACKENDra  re  rc  rf  zAIf you want to use CPU-only version, please use 'gloo' as backendr  r  r  rb  r  r  r  r  r   r#   zrank must be non-negative and world_size must be the maximum rank plus one. Moreover, at least two processes are required to create a process group.MASTER_ADDRMASTER_PORT:PADDLE_MASTERr  r  zThe environment variable 'MASTER_ADDR' and 'MASTER_PORT' must be specified, for example 'export MASTER_ADDR=127.0.0.1' and 'export MASTER_ADDR=54612'. Or you can start your trainingwith paddle.distributed.run module.FLAGS_stop_check_timeout900)rb  rf  rc  re  )message2nccl_configdefault)
pg_optionsnccl_config)pgr   TPADDLE_WITH_GLOOr  rd  FrS  _worker)targetargsz.The parallel environment has been initialized.i  i )Orx  r  r/   rM  r8  r   r   r"  environrV  r   r*  rh  r)  NotImplementedErrorr>  rl  CustomPlacer;  CPUPlace	CUDAPlaceXPUPlacer   r   r    r   r   r   r   r#  joinr%   r`   create_or_get_global_tcp_storer   CommContextManagerset_device_id&paddle.distributed.fleet.base.topologyr  r   r   ranger   r   r   r   r   r$   _set_parallel_ctxr2   r   r   r5  r   r   r_  daemonrU  r,   _is_parallel_ctx_initializedr0   r1   r3   rE  GLOOParallelContextHeterParallelContextNCCLParallelContextBKCLParallelContextremover   _init_parallel_ctxGlooParallelStrategyrank_num
ip_addressip_portinit_secondsrun_secondsGlooParallelContextinit)!r  r~  parallel_envrg  is_cpu_onlyr/  placer}   r5  r8  master_addrmaster_port	endpoints	is_masterstop_check_timeoutdefault_storer  r  ranksinode_num	init_gloo	ep_rank_0managerr\  rQ  r]  r4   other_endpointsgloo_strategydefault_init_timeout_secondsdefault_run_timeout_secondsgloos!                                    r5   init_parallel_envr    sj   p )*N.)
 '='L"z	
 	 jjnn4f=G (t88::.4

*+g&K 	%%''$$&&f!O
 	
 &l667q9 	& 	78t99;;34 '6 1fwG!:!:!<!<34 '6 1fwG)*/0+, &  $$l&<&<
 
		#	#	%	%|556		"	"	$	$l445E%%/*;*;"8"::)+,?@@W%299012#89:
qyZ.:> 	
2	
A
 iit4iit4 { HHk/0 	
 		/48I()CDJJ3OI  	
2	
y $-??3#7 +&AI	 +Eu!MN;;==)88##11,2H2HI	
 %+
 U:&'dAu2EF2E:q% u.u))$/)5)G)GH)GAQ)GHHBII0#67Ii7g#5 2215;;C@	)#(i !|667D'!!3x=1!')A,'=K "&K'+M)$  !H3355FG"--HO&++H!-!?!?H , = =H"))HO ))$$Xu5	
 
G	))%%h0F0FG	
 
	#	#	%	%))$$Xu5	
 
	"	"	$	$))$$Xu5	
 '"44Q7x889x22a7o.&&( 	w')|/@/@A/E $)i $ L! 
<99!<=>113)..!-!8!8#,Q<  #IaL 1'+$&-#%A"$?!''6		!',M)$Li Is    cc                    [        5       (       a  U (       a  U R                  $ U b   S5       e[        5       R                  $ )ao  
Returns the rank of current trainer in the given group, ranks are consecutive integers in [0, ``world_size``).
If none of the group is given, the global group will be used as default.

Args:
    group (Group, optional): The communication group you want to get rank of current trainer, use global group as default if group is None.

Returns:
    (int) The rank of current trainer in the given group. Return -1 if the process is not part of the given group.

Warning:
    Argument ``group`` only supports in dygraph mode.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # Execute this script using distributed launch with one card configs.
        >>> import paddle
        >>> import paddle.distributed as dist

        >>> dist.init_parallel_env()
        >>> print("The rank is %d" % dist.get_rank())
        The rank is 0

*Only support group argument in eager mode.)r    r5  rN  r}   s    r5   get_rankr     s8    6 Uzz=FFF=#%***r7   c                    [        5       (       a  U c  [        5       (       a
  [        5       n [        5       (       a  U (       a  U R                  $ U b   S5       e[	        5       R                  $ )ah  
Returns the number of trainers (number of processes participating in current job) in the given group.
If none of the group is given, the global group will be used as default.

Args:
    group (Group, optional): The communication group you want to check world size, use global group as default if group is None.

Returns:
    (int) The number of trainers in the given group. Return -1 if the process if not part of the given group.

Warning:
    Argument ``group`` only supports in dygraph mode.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> # Execute this script using distributed launch with one card configs.
        >>> import paddle
        >>> import paddle.distributed as dist

        >>> dist.init_parallel_env()
        >>> print("The world_size is %d" % dist.get_world_size())
        The world_size is 1

r  )r    r   r   r8  rN  r  s    r5   get_world_sizer  "  sX    6 em%'EU=FFF=#%000r7   )ro   zlist[Tensor]rp   r   r  z-list[list[Tensor | list[Tensor] | list[int]]])Nr   FTF)r   r!   r   r   r   r   r   r   r   r   r   r   r  r  r+   )r  zNCCLConfig | Noner  r   )r}   r   r  r   )W
__future__r   r   r"  r{  rX  r   collectionsr   r   
contextlibr   multiprocessingr   r   typingr	   r
   numpyrA   r-   r   r   paddle.base.corer   paddle.distributed.collectiver   r   r   r   r   r   r   r   r   r   &paddle.distributed.communication.groupr   r   r   5paddle.distributed.fleet.base.private_helper_functionr   %paddle.distributed.fleet.launch_utilsr   paddle.frameworkr   r   imperative_baser   r    paddle.nn.layerr!   paddle.utilsr"   r  r$   
backup_envr%   collections.abcr&   r'   paddle.base.libpaddler(   paddle.nn.layer.layersr)   __all__r,   r6   rL   dygraph_onlyr[   rg   no_gradrv   r   r   rM  r/   rN  r_  rh  rl  rx  r  r  r  r  r]   r7   r5   <module>r     s   #  	 
   / % ,
   + 7   
 @  " #  ()01
(( 
)&   . .& 
)
)$')2)  )2 
  $#%*??? ? 	?
 ? #? 
?  ?DB5 BN  m m` 	
	

Up+D#1r7   