
    Αi                      ~    S r SSKrSSKJr  SSKJr  / r " S S5      r " S S\5      r " S	 S
5      r	 " S S5      r
g)z
Communicator is used for async distribute training in distribute_transpiler mode.
It's a wrapper of a cpp class Communicator and should be used inside fleet API.
    N)DistributedMode)corec                   t    \ rS rSrSS jr SS jr   SS jrS rS rS r	S	 r
S
 rS rS rS rSS jrSrg)Communicator)   Nc                    Uc  Uc  0 nOoU[         R                  :X  a  SR                  US   5      US'   [        US   5      US'   [        US   5      US'   [        US   5      US'   [        US   5      US'   SnU[         R                  :X  a  SnODU[         R                  :X  a  S	nO-U[         R
                  :X  a  S
nOU[         R                  :X  a  SnX@l        X0l        SU l	        SU l
        SU l        g)aD  
Communicator is used for async distribute training in distribute_transpiler mode.
It's a wrapper of a cpp class Communicator and should be used inside fleet API.

Args:
    program(Program): the trainers program after transpile of distribute_transpiler.
    It's used by communicator to extract the information to do communication.

Returns:
    None

Examples:
    .. code-block:: python

        >>> import paddle

        >>> prog = paddle.static.Program()
        >>> comm = paddle.distributed.communicator.Communicator(prog)
        >>> comm.start()
        >>> comm.stop()
N,pserver_endpointstrainers
trainer_idneed_global_stepbarrier_table_idSYNCASYNC
HALF_ASYNCGEO)r   r   joinstrr   r   r   modeenvscommunicator_	send_ctx_	recv_ctx_)selfr   kwargsr   mode_strs        _/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/communicator.py__init__Communicator.__init__*   s   0 >|+++,/HH./-()  #6*#56D!$VL%9!:D'*62D+E'FD#$'*62D+E'FD#$?'''H_***H_///#H_(((H		!    c           	          Uc  [         R                  R                  5       n[        R                  " U R
                  UUUUUU R                  5      U l        Xl        X l	        g N)
paddlestaticglobal_scoper   DistCommunicatorr   r   r   r   r   )r   send_ctxrecv_ctx	proto_txtunit64_hostsscopes         r   init_with_ctxCommunicator.init_with_ctxa   sW     =MM..0E!22IIII
 "!r    c                 <    U R                   R                  XU5        g r"   )r   "create_client_to_client_connection)r   pserver_timeout_mspserver_connect_timeout_ms	max_retrys       r   r/   /Communicator.create_client_to_client_connectionr   s     	==I	
r    c                 6    U R                   R                  5       $ r"   )r   get_client_infor   s    r   r5   Communicator.get_client_info|   s    !!1133r    c                 :    U R                   R                  U5        g r"   )r   set_clients)r   	host_lists     r   r9   Communicator.set_clients   s    &&y1r    c                 j    U R                   c  [        S5        gU R                   R                  5         g)a1  
Start communicator. Should call before training process.

Returns:
    None

Examples:
    .. code-block:: python

        >>> import paddle

        >>> prog = paddle.static.Program()
        >>> comm = paddle.distributed.communicator.Communicator(prog)
        >>> comm.start()
        >>> comm.stop()
Nz;you must call init_with_ctx first to init comm before start)r   printstartr6   s    r   r>   Communicator.start   s.    " %OP  "r    c                 j    U R                   c  [        S5        gU R                   R                  5         g)a/  
Stop communicator. Should call after training process.

Returns:
    None

Examples:
    .. code-block:: python

        >>> import paddle

        >>> prog = paddle.static.Program()
        >>> comm = paddle.distributed.communicator.Communicator(prog)
        >>> comm.start()
        >>> comm.stop()
N:you must call init_with_ctx first to init comm before stop)r   r=   stopr6   s    r   rB   Communicator.stop   s.    " %NO!r    c                 j    U R                   c  [        S5        gU R                   R                  5         g)a
  
Get communicator is running or stop.

Returns:
    bool

Examples:
    .. code-block:: python

        >>> import paddle

        >>> prog = paddle.static.Program()
        >>> comm = paddle.distributed.communicator.Communicator(prog)
        >>> comm.is_running()
NrA   )r   r=   
is_runningr6   s    r   rE   Communicator.is_running   s.      %NO%%'r    c                 8    U R                   R                  5         g r"   )r   recvr6   s    r   rH   Communicator.recv       !r    c                 :    U R                   R                  U5        g r"   )r   init_paramsr   contexts     r   rL   Communicator.init_params   s    &&w/r    c                 :    U R                   R                  U5        g r"   )r   
pull_denserM   s     r   rQ   Communicator.pull_dense   s    %%g.r    c                 `   Uc  [         R                  R                  5       nU R                  5       (       d  [	        S5      e[        U[        5      (       d   e[        U[        5      (       d   eUS:X  a  U R                  U   R                  5       nU R                  R                  XU5        g )NzTCommunicator should init first. Using fleet.init_worker() before push_sparse_param())r#   r$   r%   rE   
ValueError
isinstancer   intr   table_idr   push_sparse_param)r   var_namerX   r+   s       r   rY   Communicator.push_sparse_param   s    =MM..0E  f  (C(((((C((((r>~~h/88:H,,XGr    )r   r   r   r   r   )NNr"   )i  i'     )rT   N)__name__
__module____qualname____firstlineno__r   r,   r/   r5   r9   r>   rB   rE   rH   rL   rQ   rY   __static_attributes__ r    r   r   r   )   sR    5p BF"& "#(	
42#,",(*"0/Hr    r   c                   >   ^  \ rS rSrSU 4S jjrS rS rS rSrU =r	$ )FLCommunicator   c                 f   > S n[         TU ]  X25        0 n0 nSnSU l        U R                  XEXa5        g )N WITH_COORDINATOR)superr   r   r,   )r   ps_hostsr   r   r'   	dense_mapprototxt	__class__s          r   r   FLCommunicator.__init__   s;    &	&	8Cr    c                 V    U R                   b  U R                   R                  X5        g g r"   )r   start_coordinator)r   self_endpointtrainer_endpointss      r   rp    FLCommunicator.start_coordinator   s*    )00 *r    c                 j    U R                   b  U R                   R                  U5        g [        S5      e)Nzself.communicator_ is null)r   save_fl_strategyrU   )r   mps     r   ru   FLCommunicator.save_fl_strategy   s.    )//39::r    c                 X    0 nU R                   b  U R                   R                  5       nU$ r"   )r   query_fl_clients_info)r   info_mps     r   ry   $FLCommunicator.query_fl_clients_info   s,    )((>>@Gr    )r   r"   )
r]   r^   r_   r`   r   rp   ru   ry   ra   __classcell__)rm   s   @r   rd   rd      s    D; r    rd   c                   ,    \ rS rSrS rS rS rS rSrg)LargeScaleKV   c                 8    [         R                  " 5       U l        g r"   )r   r~   scale_kvr6   s    r   r   LargeScaleKV.__init__   s    ))+r    c                 :    U R                   R                  X5        g r"   )r   saver   varnamedirnames      r   r   LargeScaleKV.save       7,r    c                 :    U R                   R                  X5        g r"   )r   loadr   s      r   r   LargeScaleKV.load   r   r    c                 8    U R                   R                  U5      $ r"   )r   size)r   r   s     r   r   LargeScaleKV.size  s    }}!!'**r    )r   N)	r]   r^   r_   r`   r   r   r   r   ra   rb   r    r   r~   r~      s    ,--+r    r~   c                        \ rS rSrS rS rSrg)HeterClienti  c                 <    [         R                  " XU5      U l        g r"   )r   r   heter_client_)r   endpointprevious_endpointr   s       r   r   HeterClient.__init__  s    !--
r    c                 8    U R                   R                  5         g r"   )r   rB   r6   s    r   rB   HeterClient.stop  rJ   r    )r   N)r]   r^   r_   r`   r   rB   ra   rb   r    r   r   r     s    

"r    r   )__doc__r#   "paddle.distributed.ps.utils.publicr   paddle.frameworkr   __all__r   rd   r~   r   rb   r    r   <module>r      sI   :
  > !
nH nHb\ :+ +" "r    