
    ϑiG                        S SK r S SKrS SKJr  S SKJrJr  S SKrS SK	r	S SK
JrJr  S SKJrJrJrJr  / r " S S5      rg)    N)defaultdict)
cmp_to_keyreduce)coreunique_name)	ParameterProgramdefault_startup_programin_dygraph_modec                   .   \ rS rSrSrS/S jrS rS rS rS r	S r
S	 rS
 rS rS rS rS rS rS rS rS0S jrS rS rS rS rS rS rS rS rS rS r S1S jrS r  S0S  jr!S! r"S" r#S# r$S$ r%S% r&S& r'S' r(S( r)S) r*S* r+S+ r,S, r- S2S- jr.S.r/g)3PipelineOptimizer"   a	  
    :api_attr: Static Graph

Pipeline Optimizer: Make a program to run as pipeline, that is splitting a
program into multiple sections (sub-programs) and each section run on a
device to enable the training of large scale models and the use of
heterogeneous devices. Meanwhile, all sections run in the stype of pipeline.

Args:
    optimizer (Optimizer): The optimizer to use, such as SGD.
    num_microbatches (int): Number of microbatches. [Optional. Default:1].
    start_cpu_core_id (int): The first cpu core id to use. [Optional. Default:0].

Examples:
    .. code-block:: python

        >>> import paddle
        >>> import paddle.base as base
        >>> import paddle.base.layers as layers
        >>> import numpy as np

        >>> paddle.enable_static()
        >>> with base.device_guard("gpu:0"):
        ...     x = paddle.static.data(name='x', shape=[-1, 1], dtype='int64')
        ...     y = paddle.static.data(name='y', shape=[-1, 1], dtype='int64')
        ...     data_loader = base.io.DataLoader.from_generator(
        ...         feed_list=[x, y],
        ...         capacity=64,
        ...         use_double_buffer=True,
        ...         iterable=False)

        ...     emb_x = layers.embedding(input=x, param_attr=base.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
        ...     emb_y = layers.embedding(input=y, param_attr=base.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)

        >>> with base.device_guard("gpu:1"):
        ...     concat = layers.concat([emb_x, emb_y], axis=1)
        ...     fc = paddle.static.nn.fc(x=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
        ...     loss = paddle.mean(fc)
        >>> optimizer = paddle.optimizer.SGD(learning_rate=0.5)
        >>> optimizer = paddle.incubate.optimizer.PipelineOptimizer(optimizer)
        >>> optimizer.minimize(loss)

        >>> def train_reader():
        ...     for _ in range(4):
        ...         x = np.random.random(size=[1]).astype('int64')
        ...         y = np.random.random(size=[1]).astype('int64')
        ...         yield x, y
        >>> data_loader.set_sample_generator(train_reader, batch_size=1)

        >>> place = paddle.CUDAPlace(0)
        >>> exe = paddle.static.Executor(place)
        >>> exe.run(paddle.static.default_startup_program())
        >>> batch_size = 1
        >>> data_loader.start()
        >>> exe.train_from_dataset(
        ...         paddle.static.default_main_program())
        >>> data_loader.reset()
c                    SU l         [        R                  " 5       (       a  SU l         [        5       (       a  [	        S5      e[
        R                  R                  [
        R                  R                  R                  R                  4n[        X5      (       d  [        SU S[        U5       S35      eXl        U R                  U l        [#        U R                   S5      (       a8  U R                   R$                  U l        [#        U R                   S5      (       a  M8  US:  d   S	5       eX l        US
:  d   S5       eX0l        S U l        [        R,                  nUR.                  U l        UR3                  5       U l        UR7                  5       U l        UR;                  5       U l        S U l        / U l         0 U l!        S U l"        S U l#        g )Ncpugpuz,In dygraph, don't support PipelineOptimizer.zGThe 'optimizer' parameter for PipelineOptimizer must be an instance of z, but the given type is .	inner_opt   z*num_microbatches must be a positive value.r   z1start_cpu_core_id must be a non-negative integer.)$_devicer   is_compiled_with_cudar   	Exceptionpaddle	optimizer	Optimizerstaticamp	decoratorOptimizerWithMixedPrecision
isinstance
ValueErrortype
_optimizer_origin_optimizerhasattrr   _num_microbatches_start_cpu_core_id_place_listop_proto_and_checker_makerOpRole_op_rolekOpRoleAttrName_op_role_keykOpRoleVarAttrName_op_role_var_keykOpDeviceAttrName_op_device_key_param_device_map_pipeline_pair_pp_ring_mapoutput_var_to_opinput_var_to_op)selfr   num_microbatchesstart_cpu_core_idvalid_optimizersop_makers         b/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/incubate/optimizer/pipeline.py__init__PipelineOptimizer.__init__^   s   %%'' DLJKK&&MM''CC
 )66<#$$<T)_<MQP 
 $ "&d,,k::%)%;%;%E%ED" d,,k::  1$ 	
8	
$ "2 A% 	
?	
% #422 $446 ( ; ; =&88:!%  $#    c                    UR                   U   nUR                  R                  5       S   nUR                  U5      nSnUR                  S:X  a  [
        R                  " US-   5      nUR                  US/SS9nUR                  US-   U-   SSU0S	U0S
UR                  SUR                  U R                  U R                  R                  0S9  US-  nUR                  US-   U-   SSUR                  S:X  a  WOU0SUR                  S:X  a  WOU0SU R                  U R                  U R                  R                  SUR                  S:X  a$  [        R                  R                   R"                  O#[        R                  R                   R$                  0S9  US-  nUR                  S:X  aZ  UR                  US-   U-   SSW0S	U0S
UR                  SUR                  U R                  U R                  R                  0S9  US-  nU$ )zR
Insert allreduce op to sync global information for global
gradient clip and amp.
r   
reduce_any_cast_int32r   int32)nameshapedtypecastXOutin_dtype	out_dtyper!   inputsoutputsattrs
all_reducexoutring_idreduce_type)opsdescoutput_arg_namesvarr!   r   generate
create_var
_insert_oprE   r,   r*   Optimizeglobal_ring_idr   distributedReduceOpMAXSUM)	r6   op_idxblockopout_nameout_varoffsettemp_var_nametemp_vars	            r;   _insert_allreduce_op&PipelineOptimizer._insert_allreduce_op   s   
 YYv77++-a0))H%77l"'00M1IJM''"1#W ( H 
V#W~)%%t}}'='=  
 aKFQJRWW%<'J<(?HWM4..!!4==#9#9ww,. &&//33++4488 	 	
 	!77l"
V#X(%%t}}'='=  
 aKFr>   c                 	   [        5       nSnSnUR                  R                  5       nXVU-   :  Ga  SnUR                  U   n/ n	UR                  S:X  a  U R                  U5      (       a  SnGOoUR                  S:X  a  U R                  U5      (       ai  UR                  R                  S5       H,  n
UR                  U
5      (       d  M  U	R                  U
5        M.     UR                  R                  SU	5        GOUR                  S:X  a  UR                  R                  S5       H,  n
UR                  U
5      (       d  M  U	R                  U
5        M.     UR                  R                  SU	5        UR                  R                  SU	5        GOKUR                  S	:X  a  UR                  R                  S5       H,  n
UR                  U
5      (       d  M  U	R                  U
5        M.     UR                  R                  SU	5        UR                  R                  SU	5        [        U	5      S:X  a  UR                  U5        US
-  nGM%  OUR                  S:X  a  U R                  U5      (       ai  UR                  R                  S5       H,  n
UR                  U
5      (       d  M  U	R                  U
5        M.     UR                  R                  SU	5        SnUR                  R                  5       UR                  R                  5       -   nU GH  nX;   d  SU;   a  M  UR!                  U5        UR                  [#        U5      5      (       a  MC  UR%                  [#        U5      5      nUR                  [&        R(                  R*                  R,                  :X  a>  UR/                  U[&        R(                  R*                  R,                  UR0                  S9nO[3        U[4        5      (       a}  UR7                  UR8                  UR:                  UR<                  UR                  UR>                  UR@                  URB                  URD                  URF                  URH                  S9
nOURK                  US5      nU RM                  X5        GM     US
-  nU RN                  (       d  U(       d  GM  U RQ                  US
-
  U5      nXO-  nX_-  nXVU-   :  a  GM  URS                  5         g )Nr   Fr@   TconcatrG   update_loss_scalingrH   check_finite_and_unscaler   sum_blocking_queue)rC   r!   persistable)
rC   rD   rE   r!   	lod_levelstop_gradient	trainableoptimize_attrregularizer
error_clip)*setrU   op_sizerT   r!   _is_optimize_opinput_find_var_recursiveappend	set_input
set_outputlen
_remove_op_is_gradient_clip_opinput_arg_namesrV   addstr_var_recursiver   VarDescVarTypeREADERrY   rq   r   r   create_parameterrC   rD   rE   rr   rs   rt   ru   rv   rw   _clone_variable_clone_var_attruse_shardingri   _sync_with_cpp)r6   rb   	ori_blockused_var_setadded_op_numra   ry   should_insertrc   
reserved_x
input_namevarsrW   
source_vardest_varinserted_opss                   r;   _create_varsPipelineOptimizer._create_vars   s   u**$$&-- "M6"B Jww,&4+?+?+C+C $H$)=)=b)A)A"$''--"4J00<<"))*5 #5 !!#z211"$''--"4J00<<"))*5 #5 !!#z2""5*566"$''--"4J00<<"))*5 #5 !!#z2""5*5z?a'$$V,qLG ( E!d&?&?&C&C"$''--"4J00<<"))*5 #5 !!#z2 $77**,rww/G/G/IID &*;s*B  %,,SX66&55c#h?
??dll&:&:&A&AA$// !\\1188$.$:$:  0  H
  
I66$55'__(..(..'__","6"6&0&>&>","6"6&0&>&>$.$:$:#-#8#8  6  H  %44ZGH$$X:= B aKF  44VaZGL(L"F] --^ 	r>   c                    U R                   UR                  ;   d   e[        UR                  U R                   5      5      nU[        U R                  R
                  5      -  =(       a"    U[        U R                  R                  5      -  $ N)r,   
attr_namesintattrr*   BackwardLoss)r6   rc   op_roles      r;   _is_loss_grad_op"PipelineOptimizer._is_loss_grad_op  sp      BMM111bggd//01T]]3344 
3MMD
 :
 	
r>   c                     U R                   UR                  ;   =(       aD    [        UR                  U R                   5      5      [        U R                  R
                  5      :H  $ r   )r,   r   r   r   r*   Forwardr6   rc   s     r;   _is_forward_op PipelineOptimizer._is_forward_op#  sI      BMM1 
))*+s4==3H3H/II	
r>   c                     U R                   UR                  ;   =(       aD    [        UR                  U R                   5      5      [        U R                  R
                  5      -  $ r   )r,   r   r   r   r*   r   r   s     r;   _is_backward_op!PipelineOptimizer._is_backward_op(  I      BMM1 
))*+c$--2H2H.II	
r>   c                     U R                   UR                  ;   d   e[        UR                  U R                   5      5      [        U R                  R
                  5      :H  $ r   )r,   r   r   r   r*   r   r   s     r;   _is_loss_opPipelineOptimizer._is_loss_op-  sH      BMM1112774,,-.#dmm6H6H2IIIr>   c                     U R                   UR                  ;   =(       aD    [        UR                  U R                   5      5      [        U R                  R
                  5      -  $ r   )r,   r   r   r   r*   r[   r   s     r;   rz   !PipelineOptimizer._is_optimize_op1  r   r>   c                 x    SUR                   ;   =(       a%    SUR                   ;   =(       a    SUR                   ;   $ )NParamGradLearningRate)input_namesr   s     r;   _is_update_opPipelineOptimizer._is_update_op6  s5    r~~% 3"..(32>>1	
r>   c                    [        [        5      nUR                  S5      nUR                   GH  nUR	                  U R
                  5      nX`R                   S3:X  ap  U Hh  nX6   nUR                  nUR                  5       R                  R                  5       n	U	R                  U5        U	R                  U R
                  S5        Mj     M  X6   nUR                  nUR                  5       R                  R                  5       n	U	R                  U5        U	R                  U R
                  S5        GM	     / n
U H(  nX;   nUR                  5         U
R                  U5        M*     U
$ )z
Split a program into sections according to devices that ops run on.
The op whose op_device attr is "gpu:all" is copied to all sections.

Args:
    main_program (Program): the main program
    devices: all used devices
r   :all )r   r	   rb   rT   r   r0   r   rU   global_block	append_op	copy_from	_set_attrr   r}   )r6   main_programdevicesdevice_program_maprb   rc   deviceprogramop_descap_opprogram_listkeys               r;   _split_program PipelineOptimizer._split_program=  s5    )1""1%))BWWT001FLL>..%F08G ggG#00277AACEOOG,OOD$7$7< & -4'',,.33==?( 3 3R8 " C(-G""$( 
 r>   c                 r    SU;   d  SU;   d   S5       eUSUR                  S5       nU R                  U   nU$ )z
For adam optimizer, it will add accumulators and initialize them
with fill_constant, and force the op device to cpu. Hence, we should
get the real op_device attribute of the fill_constant as the device
where the corresponding parameters on.
beta1_pow_accbeta2_pow_acczPFor accumulators for Adam, the name must contain beta1_pow_acc or beta2_pow_acc.r   _beta)indexr1   )r6   var_name
param_namer   s       r;   "_get_op_device_for_startup_program4PipelineOptimizer._get_op_device_for_startup_programc  sQ     (*o.I 	
 	
I a(.."9:
''
3r>   c                    UR                  5       n[        5       nUR                   H  nUR                  U R                  5      nUS:X  a7  UR
                  S:X  d   S5       eUR                  S   nU R                  U5      nU(       a  [        UR                  S5      S   5      nOS nU(       a  WU:w  a  M  UR                  n	UR                  5       R                  R                  5       n
U
R                  U	5        U
R                  U R                  S5        M     UR                  5         U R                  UR                  5       U5        U$ )Nr   fill_constantzcFor ops in startup program with the op_device attribute of cpu, they must be of type fill_constant.r   :r   r   )r   r	   rT   r   r0   r!   rV   r   r   splitrU   r   r   r   r   r   )r6   startup_program	device_idrb   new_startup_programrc   r   
output_vardevice_indexr   r   s              r;   _split_startup_program(PipelineOptimizer._split_startup_programr  s%   ,,.%i))BWWT001Fww/1 B1  003
@@L"6<<#4Q#78 ,)3ggG'446;;EEGEOOG$OOD//4) * 	**,-::<eD""r>   c                     SU;   a  UR                  SS5      nSU;   a  UR                  SS5      nU R                  U   nUc  gSn[        U5       H  u  pVXa:  d  M  Un  U$    U$ )z=
Find the post op that has variable named var_name as input.
z
.cast_fp32r   
.cast_fp16N)replacer5   reversed)r6   r   r   post_ops	result_oppost_oppost_idxs          r;   _find_post_opPipelineOptimizer._find_post_op  s    
 8#''b9H8#''b9H''1	!)(!3G#		 "4 r>   c                 l    U R                   U   nUc  gSn[        U5       H  u  pVXa:  d  M  Un  U$    U$ )zM
Find the previous op of op with index that outputs
variable named var_name.
N)r4   r   )r6   r   r   prev_opsr   prev_opprev_idxs          r;   _find_prev_opPipelineOptimizer._find_prev_op  sP    
 ((2	!)(!3G#		 "4 r>   c                 H    UR                  X#5        UR                  X#5        g r   )_rename_input_rename_output)r6   rc   old_namenew_names       r;   _rename_argPipelineOptimizer._rename_arg  s    
,
(-r>   Nc                    UR                  UUR                  Uc  UR                  OUUR                  UR                  UR
                  UR                  UR                  R                  5       S9nU R                  XR5        U$ )zw
Create a new var for block, which has the same type,
shape and dtype as ref_var, then rename it with the
name `name`.
)rC   rD   rE   r!   rr   rq   is_dataneed_check_feed)
rY   rD   rE   r!   rr   rq   r   rU   r   r   )r6   rb   ref_varrC   rE   new_vars         r;   _create_varPipelineOptimizer._create_var  su     ""--#(='--e''++OO#LL88: # 	
 	W.r>   c                 l    UR                   Ul         [        US5      (       a  UR                  Ul        g g )Nis_distributed)rs   r$   r   )r6   destsrcs      r;   r   !PipelineOptimizer._clone_var_attr  s1     ..3())"%"4"4D *r>   c                 d    UR                  [        R                  " 5       5      nUS:w  a  USU $ U$ )z4
Strip the grad suffix from the given variable name
N)findr   grad_var_suffix)r6   rC   poss      r;   _strip_grad_suffix$PipelineOptimizer._strip_grad_suffix  s2     ii,,./ BYtDSz0D0r>   c                 2    U[         R                  " 5       -   $ )z/
Append grad suffix to the given variable name
)r   r  )r6   rC   s     r;   _append_grad_suffix%PipelineOptimizer._append_grad_suffix  s     d**,,,r>   c                     UR                  U R                  5      (       a  UR                  U R                  5      OSnU(       a  USS S:X  d   S5       eU$ )z&
Get the op_device attribute of a op.
Nr      r   z<Now, only gpu devices are supported in pipeline parallelism.)has_attrr0   r   )r6   rc   r   s      r;   _get_op_device_attr%PipelineOptimizer._get_op_device_attr  s_     {{4..// GGD''( 	
 !A;%' N' r>   c                    [        U R                  R                  5      nUR                  U R                  5      U:X  a*  UR                  U R                  U R                   S35        gUR                  S:X  Ga  U R                  U5      (       a  UR                  R                  5        H  nSU;   a  M   S5       e   [        UR                  R                  5       5      S:X  d   eUR                  R                  5       S   nU R                  X&5      nUR                  S5      (       d   UR                   SU 35       eUR                  U R                  5      nU(       d   S	5       eUR                  U R                  U5        gUR                  S
:X  d  UR                  S:X  a  U R                  U5      (       d  U R!                  U5      (       ac  U R#                  X!R                  R%                  S5      S   5      n	UR                  U R                  U	R                  U R                  5      5        gUR                  S:X  Ga  U R'                  U5      (       Gd  [        UR                  5      S:X  a  [        UR                  5      S:X  d   eUR                  S   n
UR                  S   nSU;   aG  U R                  X+5      nUR                  U R                  UR                  U R                  5      5        gU R#                  X!R                  R%                  S5      S   5      n	UR                  U R                  U	R                  U R                  5      5        gU R)                  U5      (       Ga=  SnUR*                  X,-      R                  U R                  5      (       a/  UR*                  X,-      R                  U R                  5      (       dg  US-  nUR*                  X,-      R                  U R                  5      (       d  M6  UR*                  X,-      R                  U R                  5      (       d  Mg  UR*                  X,-      R                  U R                  5      nU(       d   S5       e[-        U5       H.  nUR*                  X--      R                  U R                  U5        M0     gU R'                  U5      (       ar  UR                  S
:X  ab  UR/                  S5      n[        U5      S:X  d   eU R1                  US   5      nU R2                  U   nUR                  U R                  U5        gU R5                  U5      (       d  U R7                  U5      (       a  U R8                  UR:                  ;   d   S5       eUR                  U R8                  5      n[        U5      S:X  d   S5       eUS   nU R2                  U   nUR                  S:X  d@  UR                  S:X  d0  UR                  S:X  d   UR                  S:X  d  UR                  S:X  a  U R                   S3nUR                  U R                  U5        gUR                  S:X  d  UR                  S:X  a  UR                  U R                  U R                   S35        UR                  U R                  U R                  R                  5        UR                  S   nUR=                  U5      nSUl        g/ SQnUR                  U;   d   SU SUR                   35       eU R'                  U5      (       d   eUR                  U R                  U R                   S35        g)z
Add op_device attribute for ops that have not that attribute set.
We use "gpu:all" to represent the op should be put on all
sub-programs, such as lr-related ops. Note that: "gpu:all"
is only used by pipeline as an indicator.
r   ro   z@RENAME@z3The op must be sum used to accumulate renamed vars.r   r   	op_devicez has no op_device attr for var z$The post op must have op_device set.rF   scalerG   memcpyz@Fetchz1Please put you program within device_guard scope.rH   zEgradient_clip and regularization ops must have op_role_var attribute.   zHop_role_var for gradient_clip regularization ops must have two elements.sqrtr   elementwise_maxelementwise_divalloc_float_statusclear_float_statusT)rm   r@   rl   ro   rn   r  z9For other ops without op_device set, they must be one of z, but it is N) r   r*   LRSchedr   r,   r   r0   r   r!   r   rU   r   r   rV   r   r  r   r   r{   rz   r   rT   rangeoutputr  r1   r   _is_regularization_opr.   r   rW   rq   )r6   rc   idxrb   lrsched_rolerC   rd   r   r   r   r   output_namerf   i	grad_namer   op_role_varfloat_status_namefloat_status_varother_known_opss                       r;   _add_op_device_attr_for_op,PipelineOptimizer._add_op_device_attr_for_op  s=    4==001774$$%5 LL,,d.CDWW$"6"6r":"://1!T) I) 2 rww//12a777ww//1!4H((7G##K00 <<. ?zJ0 \\$"5"56FAAA6LL,,f5gg277g#5  $$(;(;B(?(?((ggmmC.@.CDGLL,,gll4;N;N.OPWW )=)=b)A)A B&&'1,R5H5H1IQ1NN++A.J--a0K;&,,S>''d6I6I)J ,,S''--2DQ2GH''d6I6I)J b!!Fii-66## YYs|,11$2E2EFF! ii-66## YYs|,11$2E2EFFYYs|,11$2E2EFFNNN66]		#'",,T-@-@&I #!!"%%"''V*;		%(Iy>Q&&&001>J++J7FLL,,f5&&r**d.H.H.L.L ((BMM9 J9 ''$"7"78K{#q( =( %QJ++J7F 5 77f$77o-77//77// LL>.LL,,f5WW,,;O0OLL,,d.CD LL**DMM,A,AB " 3 3A 6$yy):; ,0(O 77o- 66E5F GggY -
 ''++++LL,,d.CDr>   c                 \   [        [        UR                  5      5       H  u  p#UR                  S:X  d   UR                  S:X  d  UR                  S:X  a+  UR	                  U R
                  U R                   S35        M`  U R                  U5      (       a  Mx  U R                  X2U5        M     g)zL
Add op_device attribute for ops in block that have
not that attribute set.
create_py_readerreadcreate_double_buffer_readerr   N)		enumeratelistrT   r!   r   r0   r   r  r)  )r6   rb   r   rc   s       r;   _add_op_device_attr%PipelineOptimizer._add_op_device_attrc  s    
 !eii1GC--77f$77;; T00T\\N$2GH''++++BU;! 2r>   c           	      b   / n[        U R                  R                  5      [        U R                  R                  5      [        U R                  R                  5      [        U R                  R
                  5      [        U R                  R                  5      [        U R                  R                  5      [        U R                  R
                  5      -  /nUR                   GH  nUR                  UR                  5      (       dS  UR                  S:X  a<  UR                  U R                  5      [        U R                  R                  5      :X  d   S5       eUR                  U R                  5      (       d"   SUR                   SU R                   S35       eUR                  U R                  5      n[        U5      U;   d   SU SUR                   SU 35       eUR                  U R                  5      (       d"   SUR                   SU R                   S35       eUR                  U R                  5      nU(       d   S	UR                   S
35       eX`R                   S3:X  a  GM  UR                  S5      S   nUS:X  d   S5       eXb;  d  GM  UR!                  U5        GM     U$ )zy
Check whether ops in a block have both the op_device and the
op_role attributes set.
Then, return all devices in order.
conditional_blockz`Now, the only supported op without kernel is conditional_block, and its op role must be LRSched.zop (z	) has no z attribute.zop_role z for op z must be one of zop_device attribute for op z has not been set.r   r   r   r   z<Now only gpu devices are supported for pipeline parallelism.)r   r*   r  r   r   r   r[   rT   _has_kernelr!   r   r,   r  r0   r   r   r}   )r6   rb   device_listvalid_op_role_valuerc   r   r   dev_types           r;   _check_validation#PipelineOptimizer._check_validationz  sd     %%&%%&&&'""#&&'&&'#dmm.@.@*AA
 ))B>>"''**ww"55GGD--.#dmm6K6K2LLJ  ;;t0011 rwwiy):):(;;G1 ggd//0Gw<#66 7)8BGG94DEXDYZ6 ;;t2233 rwwiy)<)<(=[I3 WWT001F -bggY6HI6 LL>..||C(+Hu$ N$ (""6*C F r>   c                   ^ ^^^^	^
^^^^ 0 mSn[        [        TR                  5      5       H   u  mmT R                  T5      (       d  M  Tn  O   SUS.m
[        [        TR                  5      5       GHu  u  mmTR	                  T R
                  5      nUT R                   S3:X  a  M7  TR                   GH-  mTR                  T5      nUR                  (       a  M(  SnT R                  TT5      nUc!  TT R                  ;  a  MQ  T R                  T   nU(       d$  U(       a  UR	                  T R
                  5      OSnUb  UT R                   S3:X  a  M  XS:X  a  M  TT;  a  / TT'   X54TT   ;   a  M  UR                  S5      S   S-   m	UU 4S jmUUUU	U
UUUU U4
S jmT" [        UR                  S5      S   5      [        UR                  S5      S   5      5        GM0     GMx     TR                  5         g)	zX
Insert a pair of send and recv ops for every two
consecutive ops on different devices.
Nr   )r   first_optimize_indexr   r   c                    > TR                  T5      nTR                  T5      nU(       d  U(       d
   ST 35       eU(       a  X:  d   SU SU  ST 35       eg U(       a  X:  d   SU SU  ST 35       eg g )Nzdsend/recv in pipeline should only be inserted in forward or backward,please check the op_role of op=zEIn forward, send/recv can only be passed forward, but now prev_stage=z great than cur_stage=z, please check op_device of op=zGIn backward, send/recv can only be passed backward, but now prev_stage=z less than cur_stage=)r   r   )cur_idprev_id
is_forwardis_backwardrc   r6   s       r;   _check_stageKPipelineOptimizer._insert_sendrecv_ops_for_boundaries.<locals>._check_stage  s    !%!4!4R!8J"&"6"6r":K% ::<?4
 "&/ **12HPoprosu/ %&/ **12GxOnoqnrt/ %r>   c                   >
 T[        U 5      -   nT[        U5      -   nX#4TT   ;   a  g X-
  S:  a-  T" U S-
  U5        T" X S-
  5        TT   R                  X#45        g X-
  S:  a-  T" U S-   U5        T" X S-   5        TT   R                  X#45        g [        X-
  5      S:X  d   eTT   R                  X#45        TR                  TR                  5      nTR
                  T   nX4nUS-  U -   nUTR                  ;  aV  TR                  R                  U5        TR                  TR                  U'   TR                  nT=R                  S-  sl        OTR                  U   nTR                  S:X  a  TR                  TTS   -   SSU0TR                  UTR                  USS	S
SSU0S9  TS==   S-  ss'   [        UR                  5      n	U	S   S:  a  TR                  OU	S   U	S'   TR                  TTS   -   SSU/0SU	SUR                  TR                  UTR                  USS	S
SSU0S9  TS==   S-  ss'   g TR                  S:X  Ga  [        UR                  5      n	U	S   S:  a  TR                  OU	S   U	S'   [         R"                  " U	5      n
TR$                  S:  =(       a    U
TR$                  -  S:H  nSUR&                  ;   a  UR&                  R)                  S5      S   SS nTR+                  U5      nTR                  TTS   -   SSU/0SU/0SU	SUR                  TR                  UTR                  USS	0S9  TS==   S-  ss'   g T" X5        TR                  TTS   -   SSU/0SU/0TR                  UTR                  U0S9  TS==   S-  ss'   UR&                  R)                  S5      S   nTR+                  U5      n[-        U[.        5      (       a  S	OSnTR                  TTS   -   U(       a  U(       a  SOSSU0TR                  UTR                  USSSUS
SSTR$                  STR0                  0S9  TS==   S-  ss'   S n[3        U5      [3        TR4                  R6                  5      :X  a  TS   nTR4                  R8                  nOTnTR4                  R6                  nTR                  UTS   -   SSU/0SU/0TR                  UTR                  USU0S9n[3        U5      [3        TR4                  R:                  5      :X  a  UR=                  SS 5        TS==   S-  ss'   TR                  TTS   -   U(       a  U(       a  SOS!SU/0SU	SUR                  TR                  UTR                  USS	S
SSUSTR$                  STR0                  0	S9  TS==   S-  ss'   U(       ai  U(       da  TR                  TTS   -   S"SU/0SU/0TR                  UTR                  USS	SSS#TR$                  S$TR0                  0S9  TS==   S-  ss'   g g g [?        S%TR                   S&35      e)'Nr   r    zF-then-Br   send_v2rG   use_calc_streamTpeerrR   r   r!   rL   rN   r   recv_v2rH   	out_shaperE   r   r!   rM   rN   1F1Bsubprogassignr   r!   rL   rM   rN   c_sync_calc_stream@Fpartial_sendnumidr<  c_sync_comm_streampipeline_flagr   partial_recvpartial_allgathernranksrankz@Now only 'F-then-B' and '1F1B' are supported.The given value is r   ) r   r}   absr   r,   r   r2   rR   r3   schedule_mode_insert_op_without_syncr0   r0  rD   micro_batch_sizerE   npprod	mp_degreerC   r   rW   r   r   mp_rankr   r*   r   r[   r   r   r    )r>  r?  cur_devprev_devr   rW   pairpair_keyrR   	var_shapenumeluse_mporigin_nameassociate_varprefix_name
prefix_varis_paraminsert_indexnew_op_rolesync_comm_oprB  _insert_send_recvrb   device_typeextra_index_infor   input_var_to_devicerc   r6   r   s                       r;   rs  PPipelineOptimizer._insert_sendrecv_ops_for_boundaries.<locals>._insert_send_recv  s,   )CK7G*S\9H*.A(.KK'!+)&1*g>)&1*=+H5<<$/ )B.)&1*g>)&1*=+H5<<$/ v/0A555'188'9LM ggd&7&78G**X.C#,D&~6H4#6#66++22486:ll))(3"&,,)"&"3"3H"=))Z755"'*:7*C"C!*$': $ 3 3X $ 1 17 14 & )7#	 6  )1Q61$(O	  )|a/ !11!*1 "!
 55"'*:7*C"C!*%*SEN +Y ' $ 3 3W $ 1 17 14 & )7#	 6  )1Q61++v5$(O	  )|a/ !11!*1 "! !#	 2"&..1"4 "!DNN2a7  %0 +.((..*CA*Fq*LK,1IIk,BM!99&+.>w.G&G%-(+m_'=).$/$+SYY$($7$7$($5$5w$5t'" :  -W5:5"$V555"'*:7*C"C!5$'#<%*SEN $ 3 3X $ 1 17# 6 	 )1Q61&)hhnnS&9!&<%*YY{%;
$.z9$E$ED5 ! 55"'*:7*C"C (. !*%3$': $ 3 3X $ 1 17 15 )7 & %t~~ $dll	# 6 & )1Q61'+w<3t}}/E/E+FF+; 6,L +/--*@*@K+0L*.--*@*@K','D'D".1A'1J"J!5$'#<%*SEN $ 3 3X $ 1 1; )7# (E 
( w<3t}}/D/D+EE(22?BG,W5:555"'*:7*C"C (. !*%3%*SEN +Y ' $ 3 3W $ 1 17 14 & )7 %t~~ $dll# 6 * )1Q61!(!99&+.>w.G&G%8(+cU|).$($7$7$($5$5w$5t$-q$,dnn$*DLL'" :  -W5:5! +36$ )2262D2D1EQH r>   r   )r/  r0  rT   rz   r   r0   r   r   rW   r   r   r1   r   r   r   )r6   rb   r<  
cur_devicerW   prev_devicer   rB  rs  rt  ru  r   rv  rc   r   s   ``     @@@@@@@@r;   #_insert_sendrecv_ops_for_boundaries5PipelineOptimizer._insert_sendrecv_ops_for_boundaries  s    !#"4		?3IE2##B''',$ 4
 $8

 #4		?3IE2!4!45J~T22..ii);;",,UH=?t'='== "&"8"8"BK"=DT%8%89$   &+DLL>9N*N,#6646'1,0CH0MM(..s3A6<(W Wr "
((-a01))#.q12Y /	 4h 	r>   c           	         U R                   S:X  a  g[        [        [        [	        UR
                  5      5      5      5       H  u  p#U R                  U5      (       d  M  UR                  S:X  d   SUR                   35       eUR                  S5      (       d   e[        UR                  S5      5      nX@R                   -  nUR                  SU5          g   g)z:
Scale the loss corresponding to number of micro-batches.
r   Nr   z6loss_grad_op must be fill_constant op, but this op is value)r%   r   tupler/  r0  rT   r   r!   r  floatr   r   )r6   rb   r   rc   
loss_scales        r;   _insert_loss_scale$PipelineOptimizer._insert_loss_scale  s     !!Q&!%	$uyy/(B"CDIE$$R((ww/1 &&(ggY01 {{7++++"2777#34
'*@*@@
Wj1 Er>   c                    [        UR                  5       H  u  p#U R                  U5      (       d  M  UR                  nUR                  nXE-   nUR
                  S:X  d  UR
                  S:X  a  M[  U HY  n[        R                  " 5       U;  a  M  UR                  [        R                  " 5       5      nUS-   n	U R                  X7U	5        M[     M     g )NrF   rV  @MERGED)
r/  rT   rz   r   rV   r!   r   r  stripr   )
r6   rb   r   rc   r   output_namesin_out_namesrC   r   new_grad_names
             r;   _rename_gradient_var_name+PipelineOptimizer._rename_gradient_var_name  s    "599-IE''++,,K..L&5Lww& BGG/C$C %'')5!ZZ(<(<(>?
 $y 0  =9 % .r>   c                 
   U(       a  UR                   OSnU(       a0  UR                  (       a  U R                  XUR                  U5      nU$ / nSnU(       a  SOSn	U(       a  [        R
                  OSn
[        [        [        [        UR                  5      5      5      5       GH_  u  pU R                  U5      (       ax  UR                  S:X  ah  UR                  S   nUR                  S   nUR                  S5      U R                   ;   a+  UR#                  SS	5      U:X  d   eUR%                  U5        M  U R'                  U5      (       a  Uc  US
-   nU R'                  U5      (       d  M  U R(                  UR*                  ;   d  M  UR-                  U R(                  5      n[/        U5      S:X  a  GM  [/        U5      S-  S:X  d   e[1        S[/        U5      S5       GH  nSnUU   nUR3                  U5      (       d  M#  SU;   a  M+  U[4        R6                  " 5       -   nUU	-   nUR3                  U5      (       d!  U R9                  UUR:                  U   UU
5        UR3                  U5      (       d   eUR=                  U5      nUR=                  U5      nSUl        URA                  UU-   S0 SU/0SURB                  SURD                  S[G        S5      U RH                  U RJ                  RL                  RN                  0S9  US
-  nUUS
-      nUR:                  U   nSU;   nUULnU(       ay  US-   nU R9                  UUUU
5      nSUl        URA                  UU-   SSU0SU0SURD                  SURD                  U RH                  U RJ                  RP                  0S9  US
-  nUnURA                  UU-   SSUU/0SU0U RH                  U RJ                  RP                  0S9  US
-  nURS                  U5        GM      GMb     U(       d  U$ Sn[        [        [        [        UR                  5      5      5      5       H'  u  pU R'                  U5      (       d  M  Ub  M"  US
-   n  O   Uc   eU H  nUR#                  SS	5      nUR#                  SS	5      nUR3                  U5      (       d  U R9                  XR:                  U   U5        UR3                  U5      (       d   eUR=                  U5      nUR=                  U5      nSUl        URA                  USSU0SU0SURD                  SURD                  U RH                  U RJ                  RL                  0S9  M     U$ )zb
Create a new merged gradient for each parameter and accumulate the
corresponding gradient to it.
FN@MERGED@FP16r  rF   r   @GRADr   r   r   r  
@BroadCastTr   rH   rD   rE   r}  rP  	cast_fp16@TMPrG   rI   rJ   ro   z@FP16z@GRAD@MERGED@FP16)*fp16_allreducefuse_grad_merge_accumulate_gradients_with_fusefuse_grad_size_in_MBr   float16r   r~  r/  r0  rT   rz   r!   r   rV   r  r1   r   r   r   r.   r   r   r   r  has_varr   r  r   r   rW   rq   rZ   rD   rE   r  r,   r*   r[   r  r   r}   )r6   rb   pp_allreduce_in_optimizestrategyshardr  fused_gradient_namesmerged_gradient_namesfirst_opt_op_idxmerged_suffixrE   r   rc   in_namerd   r%  r#  rf   r   param_grad_namemerged_param_grad_nameparam_grad_varmerged_param_grad_varr$  grad_varis_fp16_grad	need_castcast_grad_var_namecast_grad_varfp16_grad_namefp16_grad_vars                                  r;   _accumulate_gradients'PipelineOptimizer._accumulate_gradients  sg    5=00%00#'#G#Gx'D'De$  (' "*8i"0d!%	$uyy/(B"CDIE##B''BGGv,=,,Q/..q1>>'*d.D.DD"??<<HHH$$U+##B'',<,D#(19  ##B''%%6 ggd&;&;<{#q(;'!+q000q#k"2A6AF!,QJ ==44 #z1 &043G3G3I&IO-<}-L* ==)?@@((!!JJz22!	 !==)?@@@@%*YY%?N,1II6L,M)8<)5$$.7,!!&)>(? @#%:%@%@#%:%@%@#U1X --t}}/E/E/M/M %  aKF +AE 2I$zz)4H#.)#;L ,N BI 
 .=v-E*(,(8(8!>3Eu) 5:1(("2V";!'$'?%*M$: *HNN +]-@-@ $ 1 14==3I3I# ) 
 !#0$$.7" #&;X%FG!&(= > --t}}/E/E %  aKF)001GHY 7- EH ((!%	$uyy/(B"CDIE##B'',<,D#(19  E  +++
 4N&..w;I'//0CRHJ==++  

:(>	J==++++!IIn5Myy+H#(H &]+) 3 3%%t}}'='=  
 40 %$r>   c                 
   U R                  X5      n/ nU(       a  SOSnU(       a  [        R                  O[        R                  nSn	S n
U GH*  u  pUR	                  U5      nUR                  U[        R                  " 5       -   U-   UUR                  SSS9nUR	                  U5      n[        US5      (       a  UR                  Ul
        U R                  U5      n[        U5      S:X  d  U	U-   U:  d  UR                  U
:w  a'  UR                  U/U/U/45        UR                  n
Sn	M  US	   S   R                  U5        US	   S
   R                  U5        US	   S   R                  U5        U	U-  n	GM-     / n/ nU H  nUS   nUS   nUR                  SUS   R                   3US   R                  SSS9nUS   R                  [        R                  :X  a  SOSnUSUS   R                   3-   nUR                  UUS   R                  SSS9nUR                  U5        UR                  U5        M     [        U5      [        U5      :X  d   e[        U5      [        U5      :X  d   eS n[!        UR"                  5       H%  u  nnU R%                  U5      (       d  M  Ub  M#  Un  O   Uc   eSn['        [        U5      5       H  nUU   nUU   nUU   S   nUU   S
   n UU   S   n!UR)                  UU-   SSU 0UUS.SSSSSSSUS   R                  U R*                  U R,                  R.                  S[        R0                  " S5      SS0S9  US
-  nUR)                  UU-   SSU 0U!US.SSSSSSSSSSSU!S   R                  U R*                  U R,                  R2                  R4                  0S9  US
-  nM     UU-  nSn['        [        U5      5       H  nUU   nUU   nSUR                  ;   n"U"ULn#U#(       az  UR                  S-   n$UR                  U$USSS9n%UR7                  UU-   SSU0S U%0S!UR                  S"U%R                  U R*                  U R,                  R.                  0S#9  US
-  nU%nUR7                  UU-   S$SUU/0S U0U R*                  U R,                  R.                  0S#9  US
-  nM     U(       Ga  U GH   u  pUR	                  U5      nU[        R                  " 5       -   S-   n&UR9                  U&5      (       d   eUR	                  U&5      n'U[        R                  " 5       -   S-   n(UR                  U([        R                  UR                  SSS9n)UR7                  UU-   SSU'0S U)0S![        R                  S"[        R                  U R*                  U R,                  R2                  0S#9  US
-  nGM     ['        [        U5      5       H  nUU   R                  UU'   M     UU4$ )%Nr  r  g        TF)rC   rE   rD   rq   rs   r   r   r  r   r  
FusedGrad_)rC   rE   rq   rs   zFusedMergedGrad.cast_fp16.FusedMergedGrad_coalesce_tensorInput)OutputFusedOutputuser_defined_size_of_dtype	copy_data	use_alignrE   set_constantnpuconstantrK   r  r  rF   rG   rH   rI   rJ   rP  ro   )_sort_grad_param_by_dtyper   r  float32rW   rY   r   r  rD   r$   r   _get_var_sizer   rE   r}   rC   r/  rT   r   r  r^  r,   r*   r   is_compiled_with_custom_devicer[   r  rZ   r  )*r6   
main_blockfp16
fused_sizegrad_param_pairsr  grad_param_segmentsr  rE   cur_size
last_dtypegradparam	real_gradmerged_grad_var
real_paramtmp_sizefused_gradientsfused_merged_gradientsgrad_param_segmentgrad_segmentmerged_grad_segment
fused_gradfused_merged_grad_name_prefixfused_merged_grad_namefused_merged_gradfirst_back_op_idxr   rc   rf   r#  gradsparamsmerged_gradsr  r  r  r  r  	fp16_gradfp32_grad_name	fp32_grads*                                             r;   &_insert_accumulate_gradients_with_fuse8PipelineOptimizer._insert_accumulate_gradients_with_fuse  s     99
 !*.I"&FNN
+KD"t,I(33T1133mCoo # 4 O $.Jz#3441;1J1J.)))4H
 '(A-h&3??j0#**[:,0AB '__
#B'*11)<#B'*11*=#B'*11/BH$? ,B !#"5-a0L"4Q"7#..!,q/"6"6!78"1o++!#	 / J 'q)//6>>A -& * .)!,11234 # !+ 5 5+)!,22 #	 !6 ! "":."))*;<5 #68 ?#s+>'????)*c2E.FFFF !":>>2IE2##B'',=,E$)! 3 !,,,s./0A(+J 6q 9'*1-E(+A.F.q1!4L..!F*&(#(D 1!U1X^^%%t}}'='=
 #D$G$G$N- / < aKF ..!F*&(*#4
 1!"D\!_22%%t}}'='='E'E / $ aKFu 1z 	F"s?+,A(+J 6q 9&*//9L$D0I &0__v%=" * 5 5+ %"'	 !6 ! %%*V3,"M2"J$4$4#]%8%8))4==+A+A & 
 !*
!!&//<= 12(($--*@*@A "  aKFG -J /&NN40	!&)=)=)?!?.!P!)).9999&NN>:	!&)=)=)?!?)!K&11' ..#// %"' 2 	 %%*V3+"I."FNN#V^^))4==+A+A & 
 !1  06 s123A(>q(A(F(F"1% 4 &'777r>   c           	      D   S n/ n[        [        [        [        UR                  5      5      5      5       GH  u  pxU R                  U5      (       ax  UR                  S:X  ah  UR                  S   n	UR                  S   n
U
R                  S5      U R                  ;   a+  U	R                  SS5      U
:X  d   eUR                  U5        M  U R                  U5      (       a#  Uc   US-   nU[        UR                  5      :X  a    g U R                  U5      (       d  M  U R                  UR                   ;   d  GM  UR#                  U R                  5      n[        U5      S:X  a  GM/  [        U5      S-  S:X  d   e[%        S[        U5      S5       HA  nX   nUR'                  U5      (       d  M  SU;   a  M'  UR)                  XS-      X   45        MC     GM     [        U5      S:X  a  g U(       a  UR*                  OSn[%        U5       Vs/ s H  n/ PM     nnU HF  nU(       a  UR-                  US   5      OSnSUs=::  a  U:  d   e   eUU   R)                  U5        MH     / nU H  nU R/                  XUUU5      u  nnUU-  nM!     UR1                  5         U$ s  snf )	NrF   r   r  r   r   r   r  r  )r   r~  r/  r0  rT   rz   r!   r   rV   r  r1   r   r   r   r   r.   r   r   r  r  r}   
worker_numr   r  r   )r6   r  r  r  r  r  r  r   rc   r  rd   r%  r#  r   rZ  r  device_to_pairsrf  root_idall_fused_merged_gradientspairsr  s                         r;   r  1PipelineOptimizer._accumulate_gradients_with_fusez  s     !%	$z~~2F(G"HIIE##B''BGGv,=,,Q/..q1>>'*d.D.DD"??<<HHH))%0##B'',<,D#(19 #s:>>'::##B''%%6 ggd&;&;<{#q(;'!+q000q#k"2A6A!,J%--j99 #z1 $++$U+[^< 7/ JB  A%%*!!',V}5}!2}5$D/4ell47+!G(&(((((G$++D1 %
 &("$E ;;*e5E&  '*@@& % 	!!#))# 6s   
Jc                 f   / n/ n/ nU H  nUR                  US   5      R                  nU[        R                  :X  a  UR	                  U5        MH  U[        R
                  :X  a  UR	                  U5        Mo  UR	                  U5        M     UnUR                  U5        UR                  U5        U$ )Nr   )rW   rE   r   r  r}   r  extend)	r6   r  r  
fp16_pairs
fp32_pairsother_pairsr  rE   sorted_pairss	            r;   r  +PipelineOptimizer._sort_grad_param_by_dtype  s    

%ENN58,22E&!!%(&..(!!%(""5) & "J'K(r>   c                    [         R                  R                  R                  S[         R                  R                  R                  S[         R                  R                  R
                  S[         R                  R                  R                  S[         R                  R                  R                  S[         R                  R                  R                  S[         R                  R                  R                  S[         R                  R                  R                  S[         R                  R                  R                  S0	nSUR                  ;  d   e[        S UR                  S5      X!R                     -  S-  S-  $ )Nr        r   r  c                 
    X-  $ r    )rP   ys     r;   <lambda>1PipelineOptimizer._get_var_size.<locals>.<lambda>  s    r>   g      @)r   r   r   FP16BF16FP32FP64INT16INT32INT64BOOLUINT8rD   r   rE   )r6   rW   dtype_to_sizes      r;   r  PipelineOptimizer._get_var_size  s   LL  %%qLL  %%qLL  %%qLL  %%qLL  &&LL  &&LL  &&LL  %%qLL  &&

 """%syy!4II&' 	
r>   c                    UR                   nU H  nUR                  S5      R                   H  nUR                  S5      (       d  M  UR	                  S5      R
                  nUR                  U5      nUR                  SS9nUR                   H:  n	U	R                  n
UR                  R                  5       nUR                  U
5        M<     UR                  5         U R                  X5        UR                  SU5        M     M     g )Nr   	sub_block)
parent_idx)r   rb   rT   r  r   rU  _create_blockrU   r   r   r   r   r   )r6   r  r   r   progrc   origin_sub_block_idorigin_sub_blocknew_sub_blocksub_opr   r   s               r;   _add_sub_blocks!PipelineOptimizer._add_sub_blocks  s    !)) Djjm''{{;//&(ggk&:&=&=##/#5#56I#J  $ 2 2a 2 @.22F$kkG)..88:EOOG, 3 ,,.!!-B[-8 ( !r>   c                     UR                    HB  nUR                  UR                  5      (       d  M%  UR                  U R                  5      nUs  $    g r   )rT   r5  r!   r   r0   )r6   rb   rc   r  s       r;   _get_device_info"PipelineOptimizer._get_device_info  s@    ))B>>"''** 3 34I	 r>   c                    0 nU Hy  nUR                  S5      nUR                   HU  nUS:X  a  M  UR                  U5      nUR                  (       d  M/  Xt;  a  / XG'   XTU   ;  d  MB  XG   R	                  U5        MW     M{     [        UR                  5       5       H'  n[        XG   5      S:X  d  M  UR                  U5        M)     0 n	UR                  5        H  nXG    H  nUR                  S5      nUR                   H  n
U
R                  S:X  d0  U
R                  S:X  d   U
R                  S:X  d  U
R                  S:X  a  ME  U
R                  U R                  5      [        U R                  R                  R                   5      :X  a  M  XzR"                  R%                  5       ;   d  M  Xy;  d   SU S	U
 S
35       eXYU'     M     M     M     UR                  5        GHj  nXy;  a  M  X   nUR                  S5      nU R'                  U5      n[        UR)                  S5      S   5      nXG   nU GH  nX[:X  a  M  UR                  S5      nU R'                  U5      n[        UR)                  S5      S   5      nUU4nUS-  U-   nUU R*                  ;  aV  U R*                  R	                  U5        U R,                  U R.                  U'   U R,                  nU =R,                  S-  sl        OU R.                  U   nUR1                  SSSUR                  U5      0U R2                  USSU R                  U R                  R                   SUSU0S9  UR1                  SSSUR                  U5      /0SUR                  U5      R4                  SUR                  U5      R6                  U R2                  USSU R                  U R                  R                   SUSU0S9  UR1                  SSSUR                  U5      /0SUR                  U5      /0U R2                  UU R                  U R                  R                   SU0S9  GM     GMm     g)z]
Special Case: process persistable vars that exist in
multiple sections, e.g., shared weight
r   double_buffer_0r   rJ  r,  r-  rm   z two sections write the same var(z): second op r   r   rE  rF  rG   rG  FrH  rR   rI  rH   rK  rE   rL  rV  rP  N)rb   r   rW   rq   r}   r0  keysr   poprT   r!   r   r,   r   r*   r[   r  rU   rV   r
  r   r2   rR   r3   rZ   r0   rD   rE   )r6   r   startup_progr   var_infor  rb   r   rW   
write_inforc   
write_progwrite_blockwrite_devicewrite_dev_index	all_progs
read_blockread_deviceread_dev_indexrf  rg  rR   s                         r;   +_process_persistable_vars_in_multi_sections=PipelineOptimizer._process_persistable_vars_in_multi_sections  s-     DJJqME!JJ00ii)+)+H&11&--d3 ' ! X]]_-H8%&!+X& . 
 H *

1))B9,77&8877f,77&;; wwt001S..666  !77#;#;#=='9 >xj I""$Q(9 048,' $ + (0 !H) $-J$**1-K00=L!,"4"4S"9!"<=O *I!%!ZZ]
"33J?!$[%6%6s%;A%>!?'8*T1NBt222''..t426,,D%%h/"llGLLA%L"//9G&&"[__X6 ++\)5 ))4==+@+@!7 '   %%""Z^^H%=$>?#Z^^H%=%C%C!9!?!?++[)5 ))4==+@+@!7
	 &   %%-*..":!;<"Z^^H%=$>?++[ ))4==+@+@!7 & a " (r>   c                     UR                   R                  S5      =(       a*    UR                   R                  S5      R                  S5      $ )Nop_namescopez/gradient_cliprU   r  r   
startswithr   s     r;   r   &PipelineOptimizer._is_gradient_clip_opt  s;    ww/ 'BGGLL5

*%
&	'r>   c                     UR                   R                  S5      =(       a*    UR                   R                  S5      R                  S5      $ )Nr  z/regularizationr  r   s     r;   r  'PipelineOptimizer._is_regularization_opy  s;    ww/ (BGGLL5

*&
'	(r>   c                     UR                   R                  S5      =(       a    SUR                   R                  S5      ;   $ )Nr  zweight decay)rU   r  r   r   s     r;   _is_weight_decay_op%PipelineOptimizer._is_weight_decay_op~  s6    ww
 =^ <<	=r>   c                    [        [        5      n[        [        5      n[        UR                  5       HS  u  pEUR                   H  nX6   R                  XT/5        M     UR                   H  nX&   R                  XT/5        M     MU     X#4$ )z"
Get info of op input and output.
)r   r0  r/  rT   r   r}   rV   )r6   rb   r4   r5   r   rc   r   s          r;   _get_input_output_info(PipelineOptimizer._get_input_output_info  s    
 't,%d+"599-IE..)00"= /// *112+> 0 .  00r>   c           	         U R                   S:w  a  gUR                  S5      nU R                  S:X  a  SOSnSn[        UR                  5       H1  u  pVUR
                  U:X  d  M  U R                  U5      (       d  M/  Un  O   Uc  gSn[        [        UR                  5      5       H  u  pVXT:  a    OUR
                  S:X  d  M  UR                  S5      (       d  M6  UR                  S   nUR                  U5      n	UR                  XW-   S	S
9  US-  nUR                  USSU	/0SU	/0U R                  U R                  R                  0S9  M     UR!                  5         g)z3
optimize forward send's sync_comm_stream schedule
rM  Nr   r   rJ  rX  rV  rW  F)syncnoprG   rH   rP  )r]  rb   rb  r/  rT   r!   r   r0  r  r   rW   r   r^  r,   r*   r   r   )
r6   r   rb   	recv_typebackward_recv_indexr   rc   rf   r   rW   s
             r;   _optimize_forward_send_sync-PipelineOptimizer._optimize_forward_send_sync  sQ    'a !%1!4I.	""599-IEww)#(<(<R(@(@&+# . &"4		?3IE+ww..2;;3O3O--a0ii)  e <! ---#<"SEN,,dmm.D.DE .  4* 	r>   c           	         SnSnUR                  5       n[        UR                  5       R                  5      n[        U5       GH	  nSnUR                  5       R                  U   n[	        UR                  U R                  5      5      n	U	[	        U R                  R                  5      :X  a  Uc  UnUR                  S:w  a2  UR                  S:w  a"  UR                  S:w  a  UR                  S:w  a  M  U	[	        U R                  R                  5      :X  a  Xb:X  a  US-  nM  UnOAU	[	        U R                  R                  5      :X  a  Xc:X  a  US-  nGM  UnO[        SU	 35      e0 n
UR                   H  nUR                  U5      X'   M     0 nUR                   H  nUR                  U5      X'   M     UR!                  UUR                  U
UUR#                  5       S	9  UR%                  US-   5        U	[	        U R                  R                  5      :X  a  US-  nGM  U	[	        U R                  R                  5      :X  d  GM  US-  nGM     UR'                  5         g)
zK
A pass to move the recv op to the beginning of
the forward/backward phase
r   NrX  rY  r,  rJ  r   zUnknown op_role: rP  )r   r   rT   r  r   r   r,   r*   r   r!   r   r    r   r{   r  r  r^  	all_attrsr   r   )r6   r   forward_insert_indexbackward_insert_indexrb   num_opsr#  rp  rc   r   	op_inputsrC   
op_outputss                r;   _mv_head_recvPipelineOptimizer._mv_head_recv  s   
  ! $$$&g**,001wAL%%'++A.B"''$"3"345G3t}}5566)1()%>)GG22GGu$GGy(#dmm3344,(A-(3C 6 677-)Q.)4 #4WI!>??I"$((4.	 'J#%99T?
  ())"WW "lln *  QU##dmm3344$)$C 6 677%*%[  \ 	r>   c                    UR                  5       n[        5       n[        5       nUR                   H  nU R                  U5      (       aH  UR                   H6  nUR
                  U   nUR                  (       d  M%  UR                  U5        M8     Ma  U R                  U5      (       d  My  UR                   H  nXc;   d  M
  UR                  U5        M     M     [        U5      S:X  a  g[        R                  " SU 35        g)z+
Pipeline may need multiple forward before
r   Na  The pipeline requires multiple forward calculations before backward, so when the persistable var is changed in the forward, it may cause errors in the backward calculation who using this persistable var. However, some backward op don't need this var(NoNeedBufferVars), there will be no error at this time.
So please check these persistable vars which changed in forward and used in backward:
)r   rx   rT   r   rV   r   rq   r   r   r   r   warningswarn)r6   r   rb   persist_outputused_in_backwardrc   r   rW   s           r;   _check_pipeline_persist_var-PipelineOptimizer._check_pipeline_persist_var  s     $$&5))B""2&& " 3 3H**X.C&**84 !4 %%b)) " 2 2H1(,,X6 !3   A%. /?-?A	
r>   c                 p	   UR                   nXPl        UR                  nUc
  [        5       nUR                  nU(       d   S5       e/ SQnU H  n	X;   a  M
   SU	 S35       e   US   U l        US   U l        US   U l        US   U l        US	   U l	        US
   U l
        US   U l        US   U l        UR                  SS5      U l        U R                  S:  d   eSU R                  s=::  a  U R                  :  d   e   eU R                  R!                  XX45      u  pU R"                  R$                  U l        U R'                  U5      u  U l        U l        U R-                  U5        U R/                  U5      nS n[1        U[3        U5      S9nX:X  d   S5       eU R5                  U5        UR                  nU R7                  Xl5      nU H#  nU R9                  UR;                  5       U5        M%     [<        R>                  " SS 5      (       aE  [A        [<        R>                  " S5      5      U l        U R
                  [C        U5      :  d   S5       eOU =R
                  [C        U5      -  sl        U RE                  XR
                     5        U RG                  X_5        / nU Hd  n[A        URI                  S5      S   5      n[J        RL                  " 5       (       d  M<  URO                  [J        RP                  " US-  5      5        Mf     U RS                  X R
                  5      nSU0Ul        XR
                     R;                  5       nU R                  (       d  U RU                  U5        U R                  (       dB  U RW                  U5        URY                  5         U R[                  U5        URY                  5         [J        RL                  " 5       (       a   [A        [<        R>                  " SS5      5      nU R]                  XR
                     5        U R_                  XR
                     5        SSU R
                  [C        U5      U R                  [C        U5      XR
                     UU R
                     WSU R`                  U Rb                  S.Ul        U
UUU Rd                  U Rf                  4$ )NzPlease use pipeline with fleet.)
local_rankr]  r_  rR   r\   r   rb  rc  z&Please use pipeline with fleet to use r   rB  r]  r_  r   rR   r\   rb  rc  scale_gradientFr   r   c                     [        U R                  S5      S   5      n[        UR                  S5      S   5      nX#:  a  gX#:  a  gg)Nr   r   r  r   )r   r   )device1device2dev1_iddev2_ids       r;   
device_cmp.PipelineOptimizer.minimize.<locals>.device_cmpJ  sG    '--,Q/0G'--,Q/0G "r>   )r   z`With pipeline parallelism, you must use gpu devices one after another in the order of their ids.PADDLE_MANUAL_PIPELINE_STAGEzTManually specified pipeline stage must be less than total number of pipeline stages.r   r   FLAGS_selected_gpus0PipelineTrainerSectionr  )trainerdevice_workerpipeline_stagenum_pipeline_stagesr]  inner_parallelismsection_programplaceplace_id
sync_stepsr7   r8   )4rb   origin_main_blockr   r
   _pipeline_optrB  r]  r_  r   rR   r\   rb  rc  getrC  r"   minimizer#   r1   r(  r4   r5   r1  r9  sortedr   rz  r   r   r   osgetenvr   r   r/  r  r   r   r   r}   	CUDAPlacer   r  r  r   r  r8  r?  r%   r&   r2   r3   )r6   lossr   parameter_listno_grad_setr  r   pipeline_optrequired_keysr   optimize_opsparams_gradsr6  rI  sorted_device_listr   p
place_listdev	dev_indexr   
real_blockrW  s                          r;   r\  PipelineOptimizer.minimize  sd    ZZ
!+!))"57O#11>>>|	
 !C& 8Q?& ! '|4)/: ,-? @(8#I.*+;<%k2#I.*../?G~~"""DLL14>>11111%)__%=%=>&
" "&!7!7!I!I
 ''
3	
!  	  ,,,Z8	 $KZ
5KL!0 	
1	
0
 	00< "))**<EAann.
;  993T::!")),J"KLDO??S%55 5 OOs;//O((oo)FG 	Z6
CCIIcN1-.I))++!!$..Q"?@  #99__

 2)
% "//2??A
""##J/   **:6%%'&&z2%%'%%''299%:C@AH 	<89 	((oo)FG )&"oo#&{#3!//!$[!1+OO<0  $ 6 6!%!8!8&
" 
 	
r>   )r   r%   r0   r*   r,   r.   r"   r#   r1   r2   r'   r3   r&   r\   r5   rB  r_  rb  rc  rY  r4   rR   rC  r]  r   )r   r   r   )FNN)NNN)0__name__
__module____qualname____firstlineno____doc__r<   ri   r   r   r   r   r   rz   r   r   r   r   r   r   r   r   r   r  r  r  r)  r1  r9  rz  r  r  r  r  r  r  r  r  r
  r  r   r  r%  r(  r/  r8  r?  r\  __static_attributes__r  r>   r;   r   r   "   s   9v)$Z8tUn




J


$L#8(.&5
1-xEt<.3jgR	$:& KO]%~]8@ 37<*|$
(9"||'
(
=1"*X7r
> LPW
r>   r   )r^  r;  collectionsr   	functoolsr   r   numpyr`  r   paddle.baser   r   paddle.base.frameworkr   r	   r
   r   __all__r   r  r>   r;   <module>r{     s9    
  # (   )  M
 M
r>   