
    ϑiZ                        S r SSKrSSKrSSKJrJr  SSKJr  / r " S S5      r	 " S S5      r
 " S	 S
5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      rg)zDefinition of Role Makers.    N)ManagerProcess)basec                        \ rS rSrSrSrSrSrg)Role             N)__name__
__module____qualname____firstlineno__WORKERSERVERXPU__static_attributes__r       l/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/incubate/distributed/fleet/role_maker.pyr   r      s    FF
Cr   r   c                   0    \ rS rSrSrS rS rS rS rSr	g)	MockBarrier   zs
MockBarrier is a empty implementation for barrier
mock as a real barrier for never-barrier in a specific scenario
c                     g)z
dummy barrier, do nothing
Nr   selfs    r   barrierMockBarrier.barrier%        	r   c                     g)z
dummy all barrier, do nothing
Nr   r   s    r   barrier_allMockBarrier.barrier_all+   r   r   c                     U$ )zG
dummy all reduce, do nothing
Args:
    obj(any): obj to do all reduce
r   r   objs     r   
all_reduceMockBarrier.all_reduce1   s	     
r   c                     U/$ )zG
dummy all gather, do nothing
Args:
    obj(any): obj to do all gather
r   r$   s     r   
all_gatherMockBarrier.all_gather9   s     ur   r   N)
r   r   r   r   __doc__r   r!   r&   r)   r   r   r   r   r   r      s    
r   r   c                   v    \ rS rSrSrS rS rS rS rS r	S r
S	 rS
 rS rS rS rS rSS jrS rS rSrg)RoleMakerBaseB   z
RoleMakerBase is a base class for assigning a role to current process
in distributed training.
A paddle developer can implement RoleMakerBase to design a role maker
for worker or pserver assignment.
c                 J    / U l         / U l        SU l        S U l        SU l        g )NF)_worker_endpoints_server_endpoints_role_is_generated_role_current_idr   s    r   __init__RoleMakerBase.__init__J   s)    !#!#"'
r   c                     [        S5      e)z'
return is_worker() of current process
+Please implement this method in child classNotImplementedErrorr   s    r   	is_workerRoleMakerBase.is_workerQ        ""OPPr   c                     [        S5      e)z'
return is_server() of current process
r9   r:   r   s    r   	is_serverRoleMakerBase.is_serverW   r>   r   c                     [        S5      e)z
Check whether the node is the first instance of worker.
Returns:
    bool: True if this is the first node of worker,
          False if not.
r9   r:   r   s    r   is_first_workerRoleMakerBase.is_first_worker]        ""OPPr   c                     [        S5      e)zC
Get current total worker number.

Returns:
    int: worker number
r9   r:   r   s    r   
worker_numRoleMakerBase.worker_numf   rE   r   c                 l    U R                  5       (       a  U R                  5       $ U R                  5       $ N)r<   worker_indexserver_indexr   s    r   role_idRoleMakerBase.role_ido   s+    &*nn&6&6t  "OD<M<M<OOr   c                     [        S5      e)z3
Get current worker id.

Returns:
    int: node id
r9   r:   r   s    r   rK   RoleMakerBase.worker_indexr   rE   r   c                     [        S5      e)z3
Get current server id.

Returns:
    int: node id
r9   r:   r   s    r   rL   RoleMakerBase.server_index{   rE   r   c                     U R                   $ )z
return trainer endpoints
)r1   r   s    r   get_trainer_endpoints#RoleMakerBase.get_trainer_endpoints        %%%r   c                     U R                   $ )z
return pserver endpoints
)r2   r   s    r   get_pserver_endpoints#RoleMakerBase.get_pserver_endpoints   rV   r   c                 n    SU R                    SU R                   SU R                   SU R                   3$ )Nzrole: z, current_id: z, worker_endpoints: z, server_endpoints: )r4   r5   r1   r2   r   s    r   	to_stringRoleMakerBase.to_string   sX    

|>$2B2B1CCWX\XnXnWo  pD  EI  E[  E[  D\  ]  	]r   c                     [        S5        g)y
all gather between trainers and pservers

Args:
    input(int|float): input value

Returns:
    return a list of values
z0warning: RoleMakerBase does not have all gather.Nprintr   inputs     r   r)   RoleMakerBase.all_gather   s     	@Ar   c                     [        S5        g)
all reduce between trainers if current role is TRAINER,
only support array of one dim.

Args:
    input(list/numpy.array): array of one dim
    output(list/numpy.array): array of one dim
    mode(str): "sum" or "min" or "max"
z7warning: RoleMakerBase does not have all reduce worker.Nr_   r   rb   outputmodes       r   all_reduce_workerRoleMakerBase.all_reduce_worker   s     	GHr   c                     [        S5        g)5
barrier between trainers if current role is TRAINER
z4warning: RoleMakerBase does not have barrier worker.Nr_   r   s    r   barrier_workerRoleMakerBase.barrier_worker   s     	DEr   c                     [        S5        g)5
barrier between trainers if current role is PSERVER
z1warning: RoleMakerBase does not have barrier all.Nr_   r   s    r   r!   RoleMakerBase.barrier_all   s     	ABr   )r5   r4   r3   r2   r1   Nsum)r   r   r   r   r+   r6   r<   r@   rC   rG   rM   rK   rL   rT   rX   r[   r)   ri   rm   r!   r   r   r   r   r-   r-   B   s^    QQQQPQQ&&]
B
IFCr   r-   c                   b   ^  \ rS rSrSrU 4S jrS rS rS rS r	S r
S	 rS
 rS rS rSrU =r$ )MPIRoleMaker   z
MPIRoleMaker is a MPI-API based role maker which is a counter-part of K8SRoleMaker
mpi4py will be used if a developer inherits MPIRoleMaker
c                    > [         TU ]  5         SSKJn  Xl        UR                  U l        SU l        SU l        SU l        g)Init.r   )MPIN)	superr6   mpi4pyry   
COMM_WORLD_comm_node_type_comm_ips_ip)r   ry   	__class__s     r   r6   MPIRoleMaker.__init__   s7    ^^
#	r   c                 X    U R                   R                  5       U l        U R                  $ )zReturn rank.)r}   Get_rank_rankr   s    r   	_get_rankMPIRoleMaker._get_rank        ZZ((*
zzr   c                 X    U R                   R                  5       U l        U R                  $ )zReturn size.)r}   Get_size_sizer   s    r   	_get_sizeMPIRoleMaker._get_size   r   r   c                 X    U R                  5         U R                  R                  U5      $ )z4
all_gather(obj) will call MPI's allgather function
)_barrier_allr}   	allgatherr$   s     r   _all_gatherMPIRoleMaker._all_gather   s%     	zz##C((r   c                     U R                  5       (       a5  U R                  R                  5         U R                  R                  U5      $ g)z7
worker_gather(obj) will call MPI's allgather function
N)r<   r~   r   r   r$   s     r   _worker_gatherMPIRoleMaker._worker_gather   s=     >>  ((*''11#66r   c                 8    U R                   R                  5         g)z4
barrier_all() will call MPI's barrier_all function
N)r}   r   r   s    r   r   MPIRoleMaker._barrier_all   s     	

r   c                 8    U R                   R                  5         g)z$
finalize the current MPI instance.
N)ry   Finalizer   s    r   	_finalizeMPIRoleMaker._finalize   s     	r   c                     U R                   (       d.  U R                  R                  U R                  5       5      U l         U R                   $ )z+
collect current distributed job's ip list
)r   r}   r   get_local_ipr   s    r   _get_ipsMPIRoleMaker._get_ips   s4     yy

,,T->->-@ADIyyr   c                 j    SSK nUR                  UR                  5       5      U l        U R                  $ )zReturn get local ip.r   N)socketgethostbynamegethostnamer   )r   r   s     r   r   MPIRoleMaker.get_local_ip   s)    ''(:(:(<=xxr   c                     [        S5      e)zE
generate_role() should be called to identify current process's role
r9   r:   r   s    r   generate_roleMPIRoleMaker.generate_role   r>   r   )ry   r}   r   r   r~   r   r   )r   r   r   r   r+   r6   r   r   r   r   r   r   r   r   r   r   __classcell__r   s   @r   ru   ru      sA    
	

)Q Qr   ru   c                      ^  \ rS rSrSrU 4S jrS rS rSS jrS r	S r
S	 rS
 rS rS rS rS rS rS rS rSS jrS rS rS rSrU =r$ )MPISymmetricRoleMakeri  z
MPISymmetricRoleMaker is designed for worker and server assignment
under MPI. Typically, a worker and a server node will be appointed
on each physical node. This role maker can be only used under MPI.
c                 L   > [         TU ]  5         SU l        SU l        SU l        g)rx   Nr
   r   )rz   r6   
_node_type_proc_per_node_pserver_rand_port)r   r   s    r   r6   MPISymmetricRoleMaker.__init__  s%    "#r   c                 <    U R                   (       d  [        S5      eg)z&Check whether role has been generated.z&generate_role() should be called firstT)r3   	NameErrorr   s    r   _check_role_generation,MPISymmetricRoleMaker._check_role_generation  s    &&DEEr   c                 f    U R                   (       d  U R                  5         U R                  U5      $ r^   )r3   r   r   ra   s     r   r)    MPISymmetricRoleMaker.all_gather  s*     && &&r   c                     U R                   (       d  U R                  5         U R                  5       (       d  [        S5        gU R	                  XU5        g)re   z8warning: current role is not worker in all_reduce_workerN)r3   r   r<   r`   _all_reducerf   s       r   ri   'MPISymmetricRoleMaker.all_reduce_worker'  sB     && ~~LM-r   c                     U R                   (       d  U R                  5         U R                  5       (       a  U R                  R	                  5         g[        S5        g)rl   z5warning: current role is not worker in barrier_workerN)r3   r   r<   r~   r   r`   r   s    r   rm   $MPISymmetricRoleMaker.barrier_worker8  sA     && >>  ((*IJr   c                 z    U R                   (       d  U R                  5         U R                  R                  5         grp   N)r3   r   r}   r   r   s    r   r!   !MPISymmetricRoleMaker.barrier_allC  s)     && 

r   c                     U R                  5       (       a*  U R                  5       =(       a    SU R                  5       :H  $ g)zK
return whether current process is the first worker assigned by role maker
r   F)r   r<   rK   r   s    r   rC   %MPISymmetricRoleMaker.is_first_workerK  s6     &&((>>#@T->->-@(@@r   c                    U R                   S::  a:  SSKnUR                  U R                  5       5        UR	                  SS5      U l         U R
                   Vs/ s H  nUS-   [        U R                   5      -   PM      nnU$ s  snf )zG
get pserver endpoints
Returns:
    endpoints(list): pserver endpoints
r   Nia  i   :)r   randomseed_server_numrandintr2   str)r   r   x	endpointss       r   rX   +MPISymmetricRoleMaker.get_pserver_endpointsS  s     ""a'KK((*+ '-nnUE&BD# ++
+ Gc$1122+ 	 
 	
s   %Bc                 "    U R                  5       $ rJ   _worker_numr   s    r   rG    MPISymmetricRoleMaker.worker_numg  s    !!r   c                 L    U R                  5       (       a  U R                  S:H  $ g)zA
return whether current process is worker assigned by role maker
r	   Fr   r   r   s    r   r<   MPISymmetricRoleMaker.is_workerj  %     &&((??a''r   c                 L    U R                  5       (       a  U R                  S:H  $ g)zA
return whether current process is server assigned by role maker
r   Fr   r   s    r   r@   MPISymmetricRoleMaker.is_serverr  r   r   c                 z    U R                  5       (       a&  [        U R                  5       U R                  -  5      $ g)%
return the current number of worker
r   )r   intr   r   r   s    r   r   !MPISymmetricRoleMaker._worker_numz  s3     &&((t~~'$*=*==>>r   c                     U R                  5       (       a&  [        U R                  5       U R                  -  5      $ U R	                  5         [        U R                  5       U R                  -  5      $ z%
return the current number of server
)r   r   r   r   r   r   s    r   r   !MPISymmetricRoleMaker._server_num  sY     &&((t~~'$*=*==>> t~~'$*=*==>>r   c                     U R                  5       (       a"  [        U R                  U R                  -  5      $ U R	                  5         [        U R                  5       S-  5      $ )z
return the index of worker
r
   r   r   r   r   r   r   r   s    r   rK   "MPISymmetricRoleMaker.worker_index  sP     &&((tzzD$7$7788 t~~'!+,,r   c                     U R                  5       (       a"  [        U R                  U R                  -  5      $ U R	                  5         [        U R                  5       U R                  -  5      $ )z
return the index of server
r   r   s    r   rL   "MPISymmetricRoleMaker.server_index  sV     &&((tzzD$7$7788 t~~'$*=*==>>r   c                 D   U R                   (       d  U R                  5         US:X  a  U R                  R                  nOHUS:X  a  U R                  R                  nO+US:X  a  U R                  R
                  nO[        SU 35      eU R                  R                  XUS9  g)re   rs   maxminzunknown mode: )opN)	r3   r   ry   SUMMAXMIN
ValueErrorr~   	Allreducerf   s       r   r   !MPISymmetricRoleMaker._all_reduce  s     && 5=88<<DU]88<<DU]88<<D~dV455&&u&>r   c                     U R                  5       (       a1  U R                  5       (       a  U R                  R                  5         gg[	        S5      e)0
barrier all workers in current distributed job
&You should check role generation firstN)r   r<   r~   r   	Exceptionr   s    r   _barrier_worker%MPISymmetricRoleMaker._barrier_worker  E     &&((~~$$,,.   DEEr   c                     U R                  5       (       a1  U R                  5       (       a  U R                  R                  5         gg[	        S5      e)0
barrier all servers in current distributed job
r   N)r   r@   r~   r   r   r   s    r   _barrier_server%MPISymmetricRoleMaker._barrier_server  r   r   c                 p   U R                   (       d  U R                  5       SSS2   U l        U R                  5       SSS2   U l        SU R	                  5       U R
                  -  S-  :X  a  SU l        OSU l        U R                  R                  U R                  5      U l	        SU l         g[        S5      e)z#
generate currently process's role
r	   Nr
   r   Tr   )r3   r   r1   r2   r   r   r   r}   Splitr~   r   r   s    r   r   #MPISymmetricRoleMaker.generate_role  s     &&%)]]_QTT%:D"%)]]_SqS%9D"DNN$t':'::Q>>"#"##'::#3#3DOO#DD &*D#DEEr   )r   r~   r   r   r3   r2   r1   rr   )r   r   r   r   r+   r6   r   r)   ri   rm   r!   rC   rX   rG   r<   r@   r   r   rK   rL   r   r   r   r   r   r   r   s   @r   r   r     sq    $'."	K("?-??,FFF Fr   r   c                   `   ^  \ rS rSrSrSU 4S jjrS rS rS rS r	S r
S	 rS
 rS rSrU =r$ )PaddleCloudRoleMakeri  z:
role maker for paddle cloud,
base class is RoleMakerBase
c                 <   > [         TU ]  5         SU l        Xl        g )NF)rz   r6   r3   _is_collective)r   is_collectiver   s     r   r6   PaddleCloudRoleMaker.__init__  s    "'+r   c                 x   U R                   (       Gd  U R                  (       Gd   [        R                  S   R	                  S5      n[        [        R                  S   5      n[        R                  S   nUS;  a  [        S5      eUS:X  a-  [        R                  n[        [        R                  S   5      nOkUS	:X  aZ  [        R                  n[        R                  S
   n[        R                  S   nSR                  Xg/5      nUR                  U5      nO[        S5      eX l        Xl        X@l        XPl        O[        [        R                   " SS5      5      U l        [        R                   " SS5      U l        U R"                  S:X  d   e[        R                   " S5      U l        [        R                   " S5      U l        U R$                  c   S5       eU R$                  R	                  S5      U l        [)        U R$                  5      U l        SU l         gg! [         a  n	[        S5      eSn	A	ff = f)zGenerate role.PADDLE_PSERVERS_IP_PORT_LIST,PADDLE_TRAINERS_NUMTRAINING_ROLETRAINERPSERVER(TRAINING_ROLE must be PSERVER or TRAINERr  PADDLE_TRAINER_IDr  POD_IPPADDLE_PORTr   z:something wrong with PaddleCloud, please check environmentN0PADDLE_TRAINING_ROLEPADDLE_TRAINER_ENDPOINTSPADDLE_CURRENT_ENDPOINTz#can't find PADDLE_TRAINER_ENDPOINTST)r3   r   osenvironsplitr   r   r   r   r   joinindex_trainers_numr2   r4   r5   getenv_training_roler1   _current_endpointlen)
r   eplisttrainers_numtraining_rolerole
current_idcur_ip	curr_portcurr_endpointves
             r   r   "PaddleCloudRoleMaker.generate_role  s   &&&&&&"  ZZ(FGMMF $'rzz2G'H#IL$&JJ$?M$,BB(F  %	1#{{%(4G)H%I
&)3#{{!#H!5$&JJ}$=	(+&1D(E%+\\-%@
(F  &2")/&!
#- #&ryy1Dc'J#K &(ii*I'# **i777)+3M)N&)+3L)M&--9 99 *.)?)?)E)Ec)J&%()?)?%@"&*D#q 'B " $T s%   BH ;AH H 
H9)H44H9c                 \    U R                   (       d  U R                  5         U R                  $ rJ   r3   r   r2   r   s    r   rX   *PaddleCloudRoleMaker.get_pserver_endpoints$  s#    && %%%r   c                 ~    U R                   (       d  U R                  5         U R                  [        R                  :H  $ rJ   r3   r   r4   r   r   r   s    r   r<   PaddleCloudRoleMaker.is_worker)  *    && zzT[[((r   c                 ~    U R                   (       d  U R                  5         U R                  [        R                  :H  $ rJ   r3   r   r4   r   r   r   s    r   r@   PaddleCloudRoleMaker.is_server.  r*  r   c                     U R                   (       d  U R                  5         U R                  [        R                  :H  =(       a    U R
                  S:H  $ Nr   r3   r   r4   r   r   r5   r   s    r   rC   $PaddleCloudRoleMaker.is_first_worker3  s:    && zzT[[(BT-=-=-BBr   c                 \    U R                   (       d  U R                  5         U R                  $ rJ   r3   r   r5   r   s    r   rK   !PaddleCloudRoleMaker.worker_index8  #    && r   c                 \    U R                   (       d  U R                  5         U R                  $ rJ   r3  r   s    r   rL   !PaddleCloudRoleMaker.server_index=  r5  r   c                 \    U R                   (       d  U R                  5         U R                  $ rJ   r3   r   r  r   s    r   rG   PaddleCloudRoleMaker.worker_numB  s#    && !!!r   )	r  r5   r   r4   r3   r2   r  r  r1   )F)r   r   r   r   r+   r6   r   rX   r<   r@   rC   rK   rL   rG   r   r   r   s   @r   r   r     s<    
,
:+x&
)
)
C
 
 
" "r   r   c                      ^  \ rS rSrSrU 4S jrS rS rS#S jrS r	S r
S	 rS
 rS rS rS rS rS rS rS rS rS rS rS rS rS rS rS#S jrS rS rS rS rS r S r!S  r"S! r#S"r$U =r%$ )$GeneralRoleMakeriH  C  
This role maker is for general use, you can set os.environ to customize:
    PADDLE_PSERVERS_IP_PORT_LIST : all pservers' ip:port, separated by ','
    PADDLE_TRAINER_ENDPOINTS     : all trainers' ip:port, separated by ','
    TRAINING_ROLE                : TRAINER or PSERVER
    PADDLE_TRAINER_ID            : current trainer id (only for trainer),
                                   it is index in PADDLE_TRAINER_ENDPOINTS
    PADDLE_PSERVER_ID            : current pserver id (only for pserver)
                                   it is index in PADDLE_PSERVERS_IP_PORT_LIST
c                 *  > [         TU ]  5         SU l        UR                  SS5      U l        UR                  SS5      U l        UR                  SS5      R                  S5      U l        UR                  SS5      U l        UR                  S	S
5      U l	        UR                  SS5      U l
        UR                  SS5      nUR                  SS5      U l        / U l        S U l        US:w  aS  UR                  S5      U l        [        5       U l        U R                   R#                  5       U l        SU R$                  S'   U R'                  5       U l        U R(                  S:X  a  SOU R(                  U l        [*        R,                  " SS5      U l        g )NF	hdfs_name hdfs_ugipath/init_timeout_seconds  run_timeout_seconds 
use_metrichttp_ip_port
use_ps_gpur   runninglo
SYS_JOB_ID)rz   r6   r3   get
_hdfs_name	_hdfs_ugirstrip
_hdfs_path_init_timeout_seconds_run_timeout_seconds_use_metric_use_ps_gpu_http_ip_port_http_serverr  r   _managerdict_http_server_d$_GeneralRoleMaker__get_default_iface_ifacer  r  _prefix)r   kwargsip_portr   s      r   r6   GeneralRoleMaker.__init__T  sE   "' **["5J3 **VR077<%+ZZ0F%M"$*JJ/Dg$N!!::lE:**^R0!::lE: b=!(s!3D#IDM"&--"4"4"6D-2D	*..0 KK4/bT[[yyr2r   c                    U R                   (       Gdi  [        R                  S   R                  S5      n[        R                  S   n[        R                  S   R                  S5      n[	        U5      nUS;  a  [        S5      eSU l        S[        R                  ;   a!  [        [        R                  S   5      U l        US	:X  Ga  [        R                  n[        [        R                  S
   5      nUS:X  a  [	        U R                  5      S:w  a  [	        U5      [	        U5      [	        U5      [	        U5      -   S.n[        U R                  U R                  U4S9U l        SU R                  l        SU R                  S'   U R                  R!                  5         SU l        X6   U l        U R                  (       Ga  [&        R(                  R+                  5       nUR-                  U5        UR/                  [	        U5      5        UR1                  U R2                  5        UR5                  U R6                  5        UR9                  U R:                  U R<                  5        [	        U R                  5      S:w  a7  UR?                  U R                  S   [        U R                  S   5      S5        O4URA                  U RB                  S-   U RD                  U RF                  5        URI                  5         Xl%        U RL                  (       d  U RN                  (       a  [&        R(                  RQ                  5       n	Xil)        [	        U5      U	l*        U R                  S   U	l+        [        U R                  S   5      U	l,        Sn
SnU
U	l-        Xl.        [&        R(                  R_                  U	5      nURI                  5         GO[a        5       U l1        GOUS:X  Ga  [        Rd                  n[        R                  Rg                  S5      b!  [        [        R                  S   5      nX   nOI[        R                  S   n[        R                  S   nSRi                  X/5      nURk                  U5      nSU l        Xl        [&        R(                  R+                  5       nUR-                  U5        UR/                  [	        U5      5        UR1                  U R2                  5        UR5                  U R6                  5        UR9                  U R:                  U R<                  5        [	        U R                  5      S:w  a7  UR?                  U R                  S   [        U R                  S   5      S5        O4URA                  U RB                  S-   U RD                  U RF                  5        URI                  5         Xl%        [&        R(                  R+                  5       nX1-   nUR-                  URk                  U R$                  5      5        UR/                  [	        U5      5        UR1                  U R2                  5        UR5                  U R6                  5        UR9                  U R:                  U R<                  5        [	        U R                  5      S:w  a7  UR?                  U R                  S   [        U R                  S   5      S5        O4URA                  U RB                  S-   U RD                  U RF                  5        URI                  5         Xl1        X@l6        Xl7        WU l8        WU l9        URk                  U R$                  5      U l:        [	        U5      U l;        X0l<        SU l         gg)&
generate role for general role maker
r  r  r  r  r  r  r	   PADDLE_IS_BARRIER_ALL_ROLEr  r	  r   )trainerpserverall)targetargsTrK  re  /trainerrE  rG  r  PADDLE_PSERVER_IDNr
  r  r   rf  /pserverrg  /all)=r3   r  r  r  r  r   _is_barrier_allr   r   r   rW  r   "_GeneralRoleMaker__start_kv_serverr[  rX  daemonstartr   _cur_endpointr   coreGlooset_rankset_size
set_prefixr^  	set_ifacer]  set_timeout_secondsrS  rT  set_http_storeset_hdfs_storerR  rO  rP  initr~   rV  rU  GlooParallelStrategyrankrank_num
ip_addressr`  init_secondsrun_secondsGlooParallelContextr   	_all_commr   rN  r  r  r  r2   r4   r5   r   r   r1   )r   r  r  worker_endpointsr  r  r  size_dglooGloo_strategyDefault_init_timeout_secondsDefault_run_timeout_secondsrt  cur_endpointr  cur_portall_lists                    r   r   GeneralRoleMaker.generate_roleo  s    &&&ZZ >?EEcJFJJ7M!zz*DEKKCP/0L$:: !KLL#$D +rzz9'*JJ;<($ 	){{ ,?!@A
?s4+=+='>!'C#&'7#8#&v;"#34s6{BF )0#55"116:)D% 04D%%,59D''	2%%++-"#%5%A"'''99>>+DMM*-MM#&6"78OODLL1NN4;;/,,22D4M4M 4--.!3++ ..q1 2 21 56% ++ OOj8 OO NN
 IIK+/(''4+;+;(,		(F(F(H-7*145E1F.373E3Ea3H003D4F4Fq4I0J-7;46=38 &2 5P1#yy<<]K		%0]DN)+{{::>>"56B!$RZZ0C%D!EJ#)#5L  ZZ1F!zz-8H#&88V,>#?L!'l!;J"#%1"yy~~'j)c&k*-t{{+((..0I0I t))*a/''**1-D..q12! ''*4
 		'+$99>>#D'0HMM(..););<=MM#h-(OODLL)NN4;;'$$**D,E,E 4%%&!+##&&q)3t/A/A!/D+Eu ##OOf,doot~~ IIK!N!-%+"DJ)D!(:(:;DJXDJ%5"&*D#M 'r   c                 $    U R                  U5      $ r   )r   ra   s     r   r)   GeneralRoleMaker.all_gather  s     &&r   c                 T    U R                  5       (       d  gU R                  XU5        g)re   N)r<   r   rf   s       r   ri   "GeneralRoleMaker.all_reduce_worker  s$     ~~-r   c                 $    U R                  5         g)rl   N)r   r   s    r   rm   GeneralRoleMaker.barrier_worker  s     	r   c                 $    U R                  5         gr   )r   r   s    r   r!   GeneralRoleMaker.barrier_all  s     	r   c                 \    U R                   (       d  U R                  5         U R                  $ )z'
get local endpoint of current process
)r3   r   rr  r   s    r   get_local_endpoint#GeneralRoleMaker.get_local_endpoint!  %     && !!!r   c                 \    U R                   (       d  U R                  5         U R                  $ )z
get endpoint of all trainers
)r3   r   r1   r   s    r   rT   &GeneralRoleMaker.get_trainer_endpoints)  %     && %%%r   c                 \    U R                   (       d  U R                  5         U R                  $ )z
get endpoint of all pservers
r%  r   s    r   rX   &GeneralRoleMaker.get_pserver_endpoints1  r  r   c                 ~    U R                   (       d  U R                  5         U R                  [        R                  :H  $ )z#
whether current process is worker
r(  r   s    r   r<   GeneralRoleMaker.is_worker9  ,     && zzT[[((r   c                 ~    U R                   (       d  U R                  5         U R                  [        R                  :H  $ z#
whether current process is server
r,  r   s    r   r@   GeneralRoleMaker.is_serverA  r  r   c                     U R                   (       d  U R                  5         U R                  [        R                  :H  =(       a    U R
                  S:H  $ z-
whether current process is worker of rank 0
r   r0  r   s    r   rC    GeneralRoleMaker.is_first_workerI  s<     && zzT[[(BT-=-=-BBr   c                 \    U R                   (       d  U R                  5         U R                  $ )z
get index of current worker
r3  r   s    r   rK   GeneralRoleMaker.worker_indexQ  %     && r   c                 \    U R                   (       d  U R                  5         U R                  $ )z
get index of current server
r3  r   s    r   rL   GeneralRoleMaker.server_indexY  r  r   c                 d    U R                   (       d  U R                  5         U R                  5       $ r   )r3   r   r   r   s    r   rG   GeneralRoleMaker.worker_numa  (     && !!r   c                 d    U R                   (       d  U R                  5         U R                  5       $ r   )r3   r   r   r   s    r   
server_numGeneralRoleMaker.server_numi  r  r   c                     U R                   (       d  U R                  5         U R                  5       (       a  U R                  R	                  5         ggr   N)r3   r   r<   r~   r   r   s    r   r    GeneralRoleMaker._barrier_workerq  =     && >>  ((* r   c                 z    U R                   (       d  U R                  5         U R                  R                  5         g)z<
barrier all workers and servers in current distributed job
N)r3   r   r  r   r   s    r   r   GeneralRoleMaker._barrier_allz  s)     &&  r   c                     U R                   (       d  U R                  5         U R                  5       (       a  U R                  R	                  5         gg)r   N)r3   r   r@   r~   r   r   s    r   r    GeneralRoleMaker._barrier_server  r  r   c                 \    U R                   (       d  U R                  5         U R                  $ r  r9  r   s    r   r   GeneralRoleMaker._worker_num  r  r   c                 n    U R                   (       d  U R                  5         [        U R                  5      $ r   )r3   r   r  r2   r   s    r   r   GeneralRoleMaker._server_num  s*     && 4))**r   c                     g)zDefault do nothing.Nr   r   s    r   r   GeneralRoleMaker._finalize  s    r   c                     U R                   (       d  U R                  5         [        U5      nU R                  R	                  XC5      n[        [        U5      5       H	  nXV   X&'   M     g)z
all reduce between all workers

Args:
    input(list|numpy.array): array of one dim
    output(list|numpy.array): array of one dim
    mode(str): "sum" or "min" or "max"
N)r3   r   listr~   r&   ranger  )r   rb   rg   rh   
input_listansis          r   r   GeneralRoleMaker._all_reduce  sS     && %[
""--j?s3xAFI !r   c                     U R                   (       d  U R                  5         U R                  5         U R                  R	                  U5      $ )z)
gather between all workers and pservers
)r3   r   r   r  r)   r$   s     r   r   GeneralRoleMaker._all_gather  s:     && ~~((--r   c                     U R                   (       d  U R                  5         U R                  5       (       d  gU R                  5         U R                  R                  U5      $ )z
gather between all workers
N)r3   r   r<   r   r~   r)   r$   s     r   r   GeneralRoleMaker._worker_gather  sL     && ~~##..s33r   c                 \    U R                   (       d  U R                  5         U R                  $ )z.
get current rank in all workers and pservers
)r3   r   r   r   s    r   r   GeneralRoleMaker._get_rank  #     && zzr   c                 \    U R                   (       d  U R                  5         U R                  $ )z+
get total num of all workers and pservers
)r3   r   r   r   s    r   r   GeneralRoleMaker._get_size  r  r   c                 V    U R                  5       nU R                  5       nUS:X  a  U$ U$ ) 
get default physical interface
rL  )1_GeneralRoleMaker__get_default_iface_from_gateway4_GeneralRoleMaker__get_default_iface_from_interfaces)r   default1default2s      r   __get_default_iface$GeneralRoleMaker.__get_default_iface  s2     88:;;=#t+x99r   c                    [         R                  " S5      R                  5       R                  5       R	                  S5      nSnSnU H  nUR	                  5       nSU;   a*  SU;   a$  UR                  S5      nUR                  S5      nMC  Uc  MH  Uc  MM  Sn[        U5      U:  a  XB   nU(       d  Mk  US:w  d  Ms  US:w  d  M{  [        U5      U:  d  M  XC   s  $    g)	r  zroute -A inet
NGatewayIface*z0.0.0.0rL  )r  popenreadstripr  r  r  )r   resgateway_idx	iface_idxitemgateways         r    __get_default_iface_from_gateway1GeneralRoleMaker.__get_default_iface_from_gateway  s     hh',,.446<<TB	D::<DD W_"jj3 JJw/	(Y-Bt9{*"/GG39,D	I-?*   r   c                     [         R                  " S5      R                  5       R                  5       R	                  S5      nU H-  nSU;   d  M  UR	                  S5      S   R                  5       s  $    g)r  zip -f inet addr | awk NR%3==1r  	BROADCASTr   r	   rL  )r  r  r  r  r  )r   r  r  s      r   #__get_default_iface_from_interfaces4GeneralRoleMaker.__get_default_iface_from_interfaces  sh    
 HH45::<BBDJJ4P 	 Dd"zz#q)//11  r   c                    SSK Jn  U" [        U R                  S   5      U5      nUR	                  5         SnUR                  SS5      (       a/  [        R                  " U5        UR                  SS5      (       a  M/  UR                  5         g )Nr   )KVServerr	      rK  F)	*paddle.distributed.fleet.utils.http_serverr  r   rW  rq  rN  timesleepstop)r   http_server_dr  r  http_serverwait_secondss         r   __start_kv_server"GeneralRoleMaker.__start_kv_server  sr    Gs4#5#5a#896B	511JJ|$ 	511r   )r  rr  r5   rO  rR  rP  rW  rX  r[  r]  rS  rn  rY  r   r~   r^  r   r4   r3   rT  r2   r   r  rU  rV  r1   rr   )&r   r   r   r   r+   r6   r   r)   ri   rm   r!   r  rT   rX   r<   r@   rC   rK   rL   rG   r  r   r   r   r   r   r   r   r   r   r   r   r\  r  r  ro  r   r   r   s   @r   r<  r<  H  s    	36J+X
'."&&))C  ""+!+"+ .	4:4
 r   r<  c                   <    \ rS rSrSrS rS rS rS rS r	S r
S	rg
)HeterRoleMakeri  r=  c                    U R                   (       Gdv  [        R                  S   R                  S5      n[        R                  S   n[        R                  S   R                  S5      n[	        U5      n[        R                  S   R                  S5      n[	        U5      nUS;  a  [        S5      eUS:X  Ga<  [        R                  n[        [        R                  S	   5      nS
U l	        X8   U l
        [        R                  R                  5       n	U	R                  U5        U	R                  [	        U5      5        U	R!                  U R"                  5        U	R%                  U R&                  5        U	R)                  U R*                  U R,                  5        U	R/                  U R0                  R3                  S5      S-   U R4                  U R6                  5        U	R9                  5         Xl        GOUS:X  Ga<  [        R<                  n[        [        R                  S   5      nSU l	        XX   U l
        [        R                  R                  5       n	U	R                  U5        U	R                  [	        U5      5        U	R!                  U R"                  5        U	R%                  U R&                  5        U	R)                  U R*                  U R,                  5        U	R/                  U R0                  R3                  S5      S-   U R4                  U R6                  5        U	R9                  5         Xl        GOUS:X  Ga  [        R>                  n[        R                  RA                  S5      b!  [        [        R                  S   5      nX   n
OI[        R                  S   n[        R                  S   nSRC                  X/5      n
URE                  U
5      nSU l	        Xl
        [        R                  R                  5       n	U	R                  U5        U	R                  [	        U5      5        U	R!                  U R"                  5        U	R%                  U R&                  5        U	R)                  U R*                  U R,                  5        U	R/                  U R0                  R3                  S5      S-   U R4                  U R6                  5        U	R9                  5         Xl        US:X  d  US:X  Ga  [        R                  R                  5       n	X5-   nU	R                  URE                  U R                  5      5        U	R                  [	        U5      5        U	R!                  U R"                  5        U	R%                  U R&                  5        U	R)                  U R*                  U R,                  5        U	R/                  U R0                  R3                  S5      S-   U R4                  U R6                  5        U	R9                  5         Xl#        [        R                  R                  5       n	X1-   U-   nU	R                  URE                  U R                  5      5        U	R                  [	        U5      5        U	R!                  U R"                  5        U	R%                  U R&                  5        U	R)                  U R*                  U R,                  5        U	R/                  U R0                  R3                  S5      S-   U R4                  U R6                  5        U	R9                  5         Xl$        X@l%        Xl&        WU l'        WU l(        URE                  U R                  5      U l)        [	        U5      U l*        X0l+        XPl,        SU l         gg)rc  r  r  r  r  PADDLE_XPU_ENDPOINTS)r  r  r   z/TRAINING_ROLE must be PSERVER or TRAINER or XPUr  r	  r	   rC  rj  r   PADDLE_XPU_IDr
   z/xpur  rk  Nr
  r  r   r   rl  z/heterrm  T)-r3   r  r  r  r  r   r   r   r   r   rr  r   rs  rt  ru  rv  rw  r^  rx  r]  ry  rS  rT  r{  rR  rQ  rO  rP  r|  r~   r   r   rN  r  r  _heter_commr  r  r2   r4   r5   r   r   r1   _xpu_endpoints)r   r  r  r  r  xpu_endpointsxpu_numr  r  r  r  r  r  
heter_listr  s                  r   r   HeterRoleMaker.generate_role  s    &&&ZZ >?EEcJFJJ7M!zz*DEKKCP/0LJJ'=>DDSIM-(G$AA E  	){{ ,?!@A
"#%5%A"yy~~'j)c"234-t{{+((..0I0I ##OO**3/*<OONN
 		'+$%'xx O!<=
"#%2%>"yy~~'j)c-01-t{{+((..0I0I ##OO**3/&8OONN
 		'+$)+{{::>>"56B!$RZZ0C%D!EJ#)#5L  ZZ1F!zz-8H#&88V,>#?L!'l!;J"#%1"yy~~'j)c&k*-t{{+((..0I0I ##OO**3/*<OONN
 		'+$	)]e-Cyy~~'-=
j..t/A/ABCc*o.-t{{+((..0I0I ##OO**3/(:OONN
 		#' 99>>#D'0=@HMM(..););<=MM#h-(OODLL)NN4;;'$$**D,E,E &&s+f4
 IIK!N!-%+"DJ)D!(:(:;DJXDJ%5""/&*D# 'r   c                 ~    U R                   (       d  U R                  5         U R                  [        R                  :H  $ r  )r3   r   r4   r   r   r   s    r   is_xpuHeterRoleMaker.is_xpu  s,     && zzTXX%%r   c                     U R                   (       d  U R                  5         U R                  [        R                  :H  =(       a    U R
                  S:H  $ r  )r3   r   r4   r   r   r5   r   s    r   is_first_xpuHeterRoleMaker.is_first_xpu  s<     && zzTXX%?$*:*:a*??r   c                     U R                   (       d  U R                  5         U R                  5       (       a  U R                  R	                  5         ggr  )r3   r   r  r~   r   r   s    r   _barrier_xpuHeterRoleMaker._barrier_xpu  s;     && ;;==  ((* r   c                     U R                   (       d  U R                  5         U R                  5       (       d  U R                  (       a  U R                  R                  5         ggr  )r3   r   r  r<   r  r   r   s    r   _barrier_heterHeterRoleMaker._barrier_heter  sA     && ;;==DNN$$& +r   c                 n    U R                   (       d  U R                  5         [        U R                  5      $ )r@  )r3   r   r  r   r   s    r   r  HeterRoleMaker.xpu_num  s(    && 4&&''r   )r  rr  r5   r  r   r~   r   r4   r3   r2   r   r  r1   r   N)r   r   r   r   r+   r   r  r	  r  r  r  r   r   r   r   r  r    s(    	C+J&@+'(r   r  c                   v   ^  \ rS rSrSrS\R                  SS4U 4S jjrS rS r	S r
S	 rS
 rS rS rSrU =r$ )UserDefinedRoleMakeri  z
UserDefinedRoleMaker is designed for worker and server assignment
under manual. Typically, a worker and a server node will be appointed
on each physical node, It can be assign by user.
r   Nc                 
  > [         TU ]  5         [        U[        5      (       d  [	        S5      e[        U5      S::  a  [        S5      e[        U5      [        [        U5      5      :w  a  [        S5      eU H#  n[        U[        5      (       a  M  [	        S5      e   X@l	        U[        R                  :w  a  U[        R                  :w  a  [	        S5      eX l        [        U[        5      (       d  [	        S5      eUS:  a  [        S5      eU R                  [        R                  :X  a  U[        U5      :  a  [        S	5      eXl        [        U[        5      (       d  [	        S
5      eUS::  a  [        S5      eX0l        g )Nz'server_endpoints must be as string listr   z:the length of server_endpoints list must be greater than 0z.server_endpoints can't have duplicate elementsz8every element in server_endpoints list must be as stringzrole must be as Rolecurrent_id must be as int-current_id must be greater than or equal to 0zZif role is Role.SERVER, current_id must be less than or equal to len(server_endpoints) - 1zworker_num must be as intz!worker_num must be greater than 0)rz   r6   
isinstancer  	TypeErrorr  r   setr   r2   r   r   r   r4   r   r5   r   )r   r  r  rG   server_endpointsserver_endpointr   s         r   r6   UserDefinedRoleMaker.__init__  se    	*D11EFF!"a'L  !"c#.>*?&@@MNN#3!/377#R  $4
 &6"4;;44;;#6233J*c**788A~ C  t{{*zS > 0 !p   **c**788Q !DEE)r   c                     SU l         g NTr3   r   s    r   r   "UserDefinedRoleMaker.generate_role  
    "&r   c                 <    U R                   [        R                  :H  $ rJ   )r4   r   r   r   s    r   r<   UserDefinedRoleMaker.is_worker      zzT[[((r   c                 <    U R                   [        R                  :H  $ rJ   )r4   r   r   r   s    r   r@   UserDefinedRoleMaker.is_server  r%  r   c                 h    U R                   [        R                  :H  =(       a    U R                  S:H  $ r/  )r4   r   r   r5   r   s    r   rC   $UserDefinedRoleMaker.is_first_worker
  s%    zzT[[(BT-=-=-BBr   c                     U R                   $ rJ   r5   r   s    r   rK   !UserDefinedRoleMaker.worker_index      r   c                     U R                   $ rJ   r+  r   s    r   rL   !UserDefinedRoleMaker.server_index  r-  r   c                     U R                   $ rJ   r   r   s    r   rG   UserDefinedRoleMaker.worker_num  r-  r   )r5   r4   r3   r2   r   )r   r   r   r   r+   r   r   r6   r   r<   r@   rC   rK   rL   rG   r   r   r   s   @r   r  r    sI     [[2*h'))C     r   r  c                   N   ^  \ rS rSrSrS
U 4S jjrS rS rS rS r	S r
S	rU =r$ )UserDefinedCollectiveRoleMakeri  zd
UserDefinedCollectiveRoleMaker is designed for worker assignment
under manual for collective mode.
c                 "  > [         TU ]  5         [        U[        5      (       d  [	        S5      e[        U5      S::  a  [        S5      e[        U5      [        [        U5      5      :w  a  [        S5      eU H#  n[        U[        5      (       a  M  [	        S5      e   X l	        [        U[        5      (       d  [	        S5      eUS:  a  [        S5      eU[        U5      :  a  [        S5      eXl        [        U R                  5      U l        g )	Nz'worker_endpoints must be as string listr   z:the length of worker_endpoints list must be greater than 0z.worker_endpoints can't have duplicate elementsz8every element in worker_endpoints list must be as stringr  r  zBcurrent_id must be less than or equal to len(worker_endpoints) - 1)rz   r6   r  r  r  r  r   r  r   r1   r   r5   r   )r   r  r  worker_endpointr   s       r   r6   'UserDefinedCollectiveRoleMaker.__init__  s   *D11EFF!"a'L  !"c#.>*?&@@MNN#3!/377#R  $4
 &6"*c**788A~ C  s#344 X   *t556r   c                     SU l         g r  r   r   s    r   r   ,UserDefinedCollectiveRoleMaker.generate_role?  r"  r   c                     gr  r   r   s    r   r<   (UserDefinedCollectiveRoleMaker.is_workerB  s    r   c                      U R                   S:H  $ r/  r+  r   s    r   rC   .UserDefinedCollectiveRoleMaker.is_first_workerE  s    1$$r   c                     U R                   $ rJ   r+  r   s    r   rK   +UserDefinedCollectiveRoleMaker.worker_indexH  r-  r   c                     U R                   $ rJ   r   r   s    r   rG   )UserDefinedCollectiveRoleMaker.worker_numK  r-  r   )r5   r3   r1   r   )r   N)r   r   r   r   r+   r6   r   r<   rC   rK   rG   r   r   r   s   @r   r3  r3    s,    
 7D'%    r   r3  )r+   r  r  multiprocessingr   r   paddler   __all__r   r   r-   ru   r   r   r<  r  r  r3  r   r   r   <module>rD     s    ! 	  , 
    FsC sClJQ= JQZUFL UFph"= h"VA} AHw(% w(tN = N b5 ] 5 r   