
import logging
import os

import paddle
import paddle.distributed.transpiler.distribute_transpiler as dist_transpiler
from paddle import base
from paddle.base.compiler import CompiledProgram
from paddle.base.executor import Executor
from paddle.base.framework import Program
from paddle.base.incubate.checkpoint.checkpoint_saver import (
    CheckpointSaver,
    PaddleModel,
)
from paddle.distributed.fleet.meta_optimizers import RawProgramOptimizer
from paddle.incubate.distributed.fleet.base import (
    DistributedOptimizer,
    Fleet,
    Mode,
)
from paddle.static import io


class Collective(Fleet):
    def __init__(self):
        super().__init__(Mode.COLLECTIVE)
        self._local_ip = 0

        self.startup_program = None
        self._origin_program = None
        self._transpiled_program = None
        self.main_program = None
        self._checkpoint_prefix = "__paddle_fleet_checkpoint__"
        self._param_file_name = "_paddle_fleet_param__"

    def init_worker(self):
        logging.warning(
            "You should not call 'init_worker' method for collective mode."
        )

    def run_worker(self, main_programs=None, scopes=None):
        logging.warning(
            "You should not call 'run_worker' method for collective mode."
        )

    def init_server(self, model_dir=None):
        logging.warning(
            "You should not call 'init_server' method for collective mode."
        )

    def run_server(self):
        logging.warning(
            "You should not call 'run_server' method for collective mode."
        )

    def stop_worker(self):
        logging.warning(
            "You should not call 'stop_worker' method for collective mode."
        )

    def distributed_optimizer(self, optimizer, strategy=None):
        self._optimizer = CollectiveOptimizer(optimizer, strategy)
        return self._optimizer
    def save_inference_model(
        self,
        executor,
        path_prefix,
        feeded_vars,
        fetch_vars,
        main_program=None,
        legacy_format=False,
    ):
        """
        Prune the given `main_program` to build a new program especially for
        inference, and then save it and all related parameters to the given
        `path_prefix` by the `executor`.
        """
        assert isinstance(executor, Executor), (
            "In fleet.save_inference_model() function, executor must be as"
            " Executor type."
        )

        if main_program is None:
            main_program = self._origin_program
        assert isinstance(main_program, Program), (
            "In fleet.save_inference_model() function, main_program must be"
            " as Program type."
        )

        io.save_inference_model(
            path_prefix,
            feeded_vars,
            fetch_vars,
            executor,
            program=main_program,
            legacy_format=legacy_format,
        )
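    # Hedged usage sketch (added illustration, not from the original source):
    # exporting a pruned inference program after training. `exe`, `image`,
    # and `pred` are hypothetical stand-ins for the user's executor and
    # graph variables.
    #
    #     exe = paddle.static.Executor(paddle.CUDAPlace(0))
    #     fleet.save_inference_model(exe, "./inference_model", [image], [pred])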
    def save_persistables(
        self, executor, dirname, main_program=None, filename=None
    ):
        """
        This function filters out all variables with `persistable==True` from
        the given `main_program` and then saves these variables to the folder
        `dirname` or file `filename`.

        The `dirname` is used to specify the folder where persistable
        variables are going to be saved. If you would like to save variables
        in separate files, set `filename` to None; if you would like to save
        all variables in a single file, use `filename` to specify the file
        name.
        """
        assert isinstance(executor, Executor), (
            "In fleet.save_persistables() function, executor must be as"
            " Executor type."
        )

        if main_program is None:
            main_program = self._origin_program
        assert isinstance(main_program, Program), (
            "In fleet.save_persistables() function, main_program must be as"
            " Program type."
        )

        paddle.distributed.io.save_persistables(
            executor, dirname, main_program, filename=filename
        )
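    # Hedged sketch (added illustration): `filename` switches between the
    # one-file-per-variable and single-file layouts. `exe` is hypothetical.
    #
    #     fleet.save_persistables(exe, "./params")                  # one file per variable
    #     fleet.save_persistables(exe, "./params", filename="all")  # everything in "./params/all"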
    def save_checkpoint(
        self,
        executor,
        path,
        trainer_id,
        train_status,
        fs,
        main_program=None,
        local_cache_path=".cache",
        remain_all_checkpoint=True,
    ):
        """
        This function saves persistables and the current epoch number to `path`.
        """
        if main_program is None:
            main_program = self._transpiled_program

        m = PaddleModel(executor, main_program)
        t = train_status
        c = CheckpointSaver(fs)
        real_path, checkpoint_no = c.save_checkpoint(
            path=path,
            slists=[m, t],
            trainer_id=trainer_id,
            local_cache_path=local_cache_path,
        )

        if not remain_all_checkpoint:
            c.clean_redundant_checkpoints(path)

        return real_path, checkpoint_no
    def load_checkpoint(
        self,
        executor,
        path,
        trainer_id,
        train_status,
        fs,
        main_program=None,
        local_cache_path=".cache",
        ignore_empty=True,
    ):
        """
        This function loads persistables and the current epoch number from `path`.
        """
        if main_program is None:
            main_program = self._transpiled_program

        m = PaddleModel(executor, main_program)
        c = CheckpointSaver(fs)
        return c.load_checkpoint(
            path,
            [m, train_status],
            trainer_id=trainer_id,
            ignore_empty=ignore_empty,
            local_cache_path=local_cache_path,
        )


fleet = Collective()
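# Hedged round-trip sketch (added illustration, not from the original
# source). `CheckpointSaver` only needs an fs client with the expected
# interface; the `LocalFS` import path and the `status`/`exe` objects are
# assumptions here.
#
#     from paddle.distributed.fleet.utils import LocalFS
#
#     fs = LocalFS()
#     real_path, ckpt_no = fleet.save_checkpoint(
#         exe, "./fleet_ckpt", trainer_id=0, train_status=status, fs=fs
#     )
#     fleet.load_checkpoint(
#         exe, "./fleet_ckpt", trainer_id=0, train_status=status, fs=fs
#     )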
class DistributedStrategy(base.BuildStrategy):
    """
    Init function of DistributedStrategy
    """

    def __init__(self):
        super().__init__()
        self.use_local_sgd = False
        self.use_dist_fc = False

        self.dist_fc_config = None  # DistFCConfig
        self.mode = "nccl2"  # or collective
        self.collective_mode = None  # local_sgd or grad_allreduce
        self.nccl_comm_num = 1
        self.forward_recompute = False  # use RecomputeOptimizer
        self.recompute_checkpoints = []
        self.use_amp = False  # use mixed precision optimizer
        self.amp_loss_scaling = 2**15

        # the configuration below is used for unit tests
        self._ut4grad_allreduce = False
class CollectiveOpBasedOptimizer(DistributedOptimizer):
    """
    Collective Operator Base Class For Distributed Optimizer
    The class is invisible to a user
    """

    def __init__(self, optimizer, strategy=None):
        assert isinstance(
            strategy, DistributedStrategy
        ), "strategy must be DistributedStrategy"
        super().__init__(optimizer, strategy)

    def backward(
        self,
        loss,
        startup_program=None,
        parameter_list=None,
        no_grad_set=None,
        callbacks=None,
    ):
        return self._optimizer.backward(
            loss, startup_program, parameter_list, no_grad_set, callbacks
        )

    def apply_gradients(self, params_grads):
        return self._optimizer.apply_gradients(params_grads)
class CollectiveOptimizer(DistributedOptimizer):
    """
    DistributedOptimizer is a wrapper for paddle.base.optimizer
    A user should pass a paddle.base.optimizer to DistributedOptimizer
    minimize() function is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimized information will be stored in
    Fleet() instance who holds the global information about current distributed
    training.
    """

    def __init__(self, optimizer, strategy=DistributedStrategy()):
        if strategy is None:
            strategy = DistributedStrategy()
        super().__init__(optimizer, strategy)
        self._forward_recompute = strategy.forward_recompute
        if not isinstance(strategy.recompute_checkpoints, list):
            raise ValueError(
                "DistStrategy.recompute_checkpoints should be a List"
            )
        self._recompute_checkpoints = strategy.recompute_checkpoints
        self._use_amp = strategy.use_amp
        self._amp_loss_scaling = strategy.amp_loss_scaling
        self.print_config = False

    def backward(
        self,
        loss,
        startup_program=None,
        parameter_list=None,
        no_grad_set=None,
        callbacks=None,
    ):
        return self._optimizer.backward(
            loss, startup_program, parameter_list, no_grad_set, callbacks
        )

    def apply_gradients(self, params_grads):
        return self._optimizer.apply_gradients(params_grads)

    def _check_condition(self, name, **kwargs):
        for k, v in kwargs.items():
            if v is True:
                raise AssertionError(
                    f"you can't use {k} and {name} together"
                )
    def _check_collective_mode(self, main_program, optimizer, strategy):
        """
        Check the conflict conditions.
        """
        if strategy.use_local_sgd:
            strategy.mode = "collective"
            strategy.collective_mode = "local_sgd"
            self._check_condition(
                "use_local_sgd",
                use_dgc=main_program._enable_dgc,
                use_dist_fc=strategy.use_dist_fc,
                use_lamb=main_program._use_lamb,
            )

        if strategy.use_dist_fc:
            self._check_condition(
                "use_dist_fc",
                use_dgc=main_program._enable_dgc,
                use_local_sgd=strategy.use_local_sgd,
                use_lamb=main_program._use_lamb,
            )
            assert (
                strategy.dist_fc_config is not None
            ), "DistributedStrategy.dist_fc_config should be set"

        if strategy._ut4grad_allreduce:
            strategy.mode = "collective"
            strategy.collective_mode = "grad_allreduce"
            self._check_condition(
                "_ut4grad_allreduce",
                use_dgc=main_program._enable_dgc,
                use_lamb=main_program._use_lamb,
            )

        if (
            self._strategy.collective_mode == "local_sgd"
            or self._strategy.collective_mode == "grad_allreduce"
        ):
            assert (
                self._strategy.mode == "collective"
            ), "local_sgd and grad_allreduce can be used under collective mode"
    def _transpile(self, startup_program, main_program):
        """
        Transpile the programs to distributed programs. And add the variables.
        """
        worker_endpoints = fleet.worker_endpoints()
        trainer_id = fleet.worker_index()
        current_endpoint = fleet.worker_endpoints()[trainer_id]
        worker_endpoints_env = ','.join(worker_endpoints)
        trainers_num = fleet.worker_num()

        if self.print_config:
            print(
                f"worker_endpoints:{worker_endpoints} "
                f"trainers_num:{trainers_num} "
                f"current_endpoint:{current_endpoint} trainer_id:{trainer_id}"
            )

        config = dist_transpiler.DistributeTranspilerConfig()
        config.mode = self._strategy.mode
        config.collective_mode = self._strategy.collective_mode

        config.nccl_comm_num = self._strategy.nccl_comm_num
        config.use_hierarchical_allreduce = (
            self._strategy.use_hierarchical_allreduce
        )
        config.hierarchical_allreduce_inter_nranks = (
            self._strategy.hierarchical_allreduce_inter_nranks
        )

        t = dist_transpiler.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            trainers=worker_endpoints_env,
            startup_program=startup_program,
            program=main_program,
            current_endpoint=current_endpoint,
        )

    def _get_node_ips_from_endpoints(self, endpoints):
        # collect the distinct node IPs, preserving first-seen order
        ss = set()
        ips = []
        for ep in endpoints:
            ip = ep.split(":")[0].strip()
            if ip not in ss:
                ss.add(ip)
                ips.append(ip)
        return ips

    def _node_num(self):
        worker_endpoints = fleet.worker_endpoints()
        current_endpoint = fleet.worker_endpoints()[fleet.worker_index()]
        worker_endpoints_env = ','.join(worker_endpoints)

        node_ips = self._get_node_ips_from_endpoints(worker_endpoints)
        node_ip = current_endpoint.split(":")[0].strip()
        node_num = len(node_ips)

        return node_num

    def _try_to_compile(self, startup_program, main_program):
        node_num = self._node_num()
        assert node_num >= 1, f"nccl2 node_num must >= 1, now:{node_num}"

        if node_num <= 1:
            if self._strategy.nccl_comm_num > 1:
                logging.warning(
                    "set nccl_comm_num=1 since you only have 1 node."
                )
                self._strategy.nccl_comm_num = 1

            if self._strategy.use_hierarchical_allreduce:
                logging.warning(
                    "set use_hierarchical_allreduce=False since you only"
                    " have 1 node."
                )
                self._strategy.use_hierarchical_allreduce = False

        sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")

        sync_batch_norm = self._strategy.sync_batch_norm
        if sync_batch_norm is not None and sync_batch_norm is True:
            self._strategy.nccl_comm_num = 1
            self._strategy.use_hierarchical_allreduce = False
            logging.warning(
                "use sync_batch_norm will hang when set num_threads > 1, so "
                "set num_threads=1, nccl_comm_num=1, "
                "use_hierarchical_allreduce=False."
            )

        if self.print_config:
            print(
                "node_num:",
                node_num,
                "use_hierarchical_allreduce:",
                self._strategy.use_hierarchical_allreduce,
                "nccl_comm_num:",
                self._strategy.nccl_comm_num,
                "FLAGS_sync_nccl_allreduce:",
                sync_allreduce,
            )

        self._transpile(startup_program, main_program)

        if self._strategy.mode == "collective":
            return main_program

        self._strategy.num_trainers = fleet.worker_num()
        self._strategy.trainer_id = fleet.worker_index()
        self._strategy.trainers_endpoints = fleet.worker_endpoints()
        self._strategy.enable_backward_optimizer_op_deps = True

        comm_opt = RawProgramOptimizer(self._optimizer)
        comm_opt.fuse_all_reduce_ops = True
        comm_opt.fuse_grad_size_in_num = 1
        comm_opt.endpoints = self._strategy.trainers_endpoints
        comm_opt.current_endpoint = comm_opt.endpoints[fleet.worker_index()]
        comm_opt.rank = fleet.worker_index()
        comm_opt.nranks = fleet.worker_num()
        comm_opt.main_program = main_program
        if comm_opt.nranks > 1:
            comm_opt._transpile_main_program(self._loss)
        self._compiled_program = CompiledProgram(
            comm_opt.main_program, build_strategy=self._strategy
        )

        return self._compiled_program

    def raiseOptimizeError(self, strategy_name, optimize_name):
        raise ValueError(
            f"can not use {strategy_name} when you set"
            f" DistStrategy.{optimize_name} as True"
        )
    def minimize(
        self, loss, startup_program=None, parameter_list=None, no_grad_set=None
    ):
        """
        minimize a program through loss
        Args:
            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables should be ignored.
        Returns:
            tuple: (optimize_ops, params_grads) which are, list of operators appended;
            and list of (param, grad) Variables pair for optimization.
        Note that in parameter server mode, a worker will not get anything about optimize_ops
        Because optimizer algorithms run on pserver side. We will make this usable in pserver
        process, but currently the optimization part is written into Fleet(). A user does not
zTplease set strategy.recompute_checkpointswhen set strategy.forward_recompute as True)RecomputeOptimizerOptimizerWithMixedPrecisionr~   )r  DGCMomentumOptimizermixed_precisionT)init_loss_scalinguse_dynamic_loss_scaling)r   F)for_test) r   r   r   r?   r    rk   r  rR   incubater@   r  _set_checkpointsr   staticampdecorater   blockrG   r   default_startup_programr   r   r   r   r   minimizecloner   r   r   r   )r   r   r   r   r   r   optimize_opsparam_gradss           r!   r  CollectiveOptimizer.minimize  s   * ""**b0 B  ((11 6  ''')B)B)K)K %oo77JJDO OO,,T-H-HI==((11 6  ''%t'@'@'I'I %mm//88"&"8"8)- 9 DO zz))""::<O /
##//4>>	
 %)OO$<$<> %= %
! !- 2 2E 2 B$0!!11/P((r#   )r   r   r   r   r?   r   r   r   r   )NNN)rk   rl   rm   rn   r   rs   r   r   r   r   r   r   r   r   r   r  r  ro   rp   rq   s   @r!   r>   r>     sh     ,?+@ "" 

=O
(T$
L
B&H
 LPI) I)r#   r>   ) r&   r   rR   3paddle.distributed.transpiler.distribute_transpilerrS   
transpilerdistribute_transpilerr   r   paddle.base.compilerr   paddle.base.executorr   paddle.base.frameworkr   0paddle.base.incubate.checkpoint.checkpoint_saverr   r   (paddle.distributed.fleet.meta_optimizersr	   &paddle.incubate.distributed.fleet.baser
   r   r   paddle.staticr   r   r   BuildStrategyrs   r   r>   r   r#   r!   <module>r"     s     	  M M  0 ) ) I 
 d
 d
N 	($,, (.=!5 =8i). i)r#   