
    Αi                    &   S SK Jr  S SKrS SKrS SKJrJr  S SKJrJ	r	  S SK
Jr  S SKJrJrJrJr  S SKJr  S SKJr  \(       a  S	S
KJr  S SKrS SKJr  S SKJr  S	SKJrJrJrJr  \R@                  " \!5      r" " S S\5      r#\#RH                  r$\#RJ                  r%\#RL                  r&\#RN                  r'\#RP                  r(\#RR                  r)\#RT                  r*\#RV                  r+\#RX                  r,\#RZ                  r-\$r.\%r/\&r0\-r1\Rd                  " S5      r3 " S S\5      r4 " S S\5      r5 " S S\55      r6S%S&S jjr7S%S&S jjr8 " S S\65      r9 " S S\Rt                  5      r;S r<S r= " S S\65      r> " S  S!\55      r?  S'S" jr@ " S# S$\?5      rAg)(    )annotationsN)ABCabstractmethod)Counterdefaultdict)Enum)TYPE_CHECKINGAnyCallable
NamedTuple)nn)PipelineStage   )_PipelineStageBase)profiler)TensorChunkSpec_split_tensormerge_chunkssplit_args_kwargs_into_chunksc                  R    \ rS rSrSrSrSrSrSrSr	Sr
S	rS
rSrS r\S 5       rSrg)_ActType2   r                        	   
   c                N   [         R                  S[         R                  S[         R                  S[         R                  S[         R
                  S[         R                  S[         R                  S[         R                  S[         R                  S	[         R                  S
0
nX   $ )NFIWUNSHARDRESHARDSEND_FRECV_FSEND_BRECV_BB)r   FORWARDBACKWARD_INPUTBACKWARD_WEIGHTr&   r'   r(   r)   r*   r+   FULL_BACKWARD)selfstr_maps     u/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/auto_parallel/pipelining/schedules.py__str___ActType.__str__>   sy    c##S$$ciiOOXOOXOOXOOX""C
 }    c                   U S:X  a  [         R                  $ U S:X  a  [         R                  $ U S:X  a  [         R                  $ U S:X  a  [         R                  $ U S:X  a  [         R
                  $ U S:X  a  [         R                  $ U S:X  a  [         R                  $ U S:X  a  [         R                  $ U S	:X  a  [         R                  $ U S
:X  a  [         R                  $ [        SU  35      e)Nr#   r$   r%   r&   r'   r(   r)   r*   r+   r,   zInvalid computation type )r   r-   r.   r/   r&   r'   r(   r)   r*   r+   r0   RuntimeError)actions    r3   from_str_ActType.from_strM   s    S=###s]***s]+++y ###y ###x??"x??"x??"x??"s])))!:6(CDDr6    N)__name__
__module____qualname____firstlineno__r-   r.   r/   r&   r'   r(   r)   r*   r+   r0   r4   staticmethodr:   __static_attributes__r<   r6   r3   r   r   2   sO    GNOGGFFFFM E Er6   r   z?(\d+)(F|I|B|W|UNSHARD|RESHARD|SEND_F|RECV_F|SEND_B|RECV_B)(\d*)c                  >    \ rS rSr% S\S'   S\S'   SrS\S'   S	 rS
rg)_Action~   intstage_indexr   computation_typeNz
int | Nonemicrobatch_indexc                    [        U R                  5      nU[        U R                  5      -  nU R                  b  U[        U R                  5      -  nU$ N)strrG   rH   rI   )r1   reprs     r3   __repr___Action.__repr__   sM    4##$D))**  ,C--..Dr6   r<   )r=   r>   r?   r@   __annotations__rI   rN   rB   r<   r6   r3   rD   rD   ~   s    #'j'r6   rD   c                      \ rS rSr    S         SS jjrS rS rS r\    S       SS jj5       r	\SSSS	.   SS
 jj5       r
    S       SS jjrS r S   SS jjrSS jrSrg)_PipelineSchedule   Nc                    Xl         X l        X0l        X@l        XPl         U R                  S LU l        / U l        [        R                  SU R                  R                  5        g )NzUsing %s)_n_microbatches_loss_fn_args_chunk_spec_kwargs_chunk_spec_output_merge_spec_has_backward_internal_lossesloggerinfo	__class__r=   )r1   n_microbatchesloss_fnargs_chunk_speckwargs_chunk_specoutput_merge_specs         r3   __init___PipelineSchedule.__init__   s[      . /"3"3	 "]]$6 68J 7 78r6   c                    UR                   (       aB  U R                  (       a0  U R                  X#U   5      nU R                  R	                  U5        g g g rK   )is_lastrZ   _compute_lossr[   append)r1   stageoutput
target_mbsmb_indexlosss         r3   _maybe_compute_loss%_PipelineSchedule._maybe_compute_loss   sA    ==T//%%f.BCD!!((. 0=r6   c                :   SUs=:*  =(       a    [        U R                  5      :  Os  nUR                  (       a'  U R                  (       a  U(       a  U R                  U   $ [        U R                  5      S:w  a"  U(       d  [	        SU SU R                   35      eg )Nr   zLoss for microbatch z6 is not available. Available losses for microbatches: )lenr[   rg   rZ   r8   )r1   rj   rm   valid_indexs       r3   _maybe_get_loss!_PipelineSchedule._maybe_get_loss   s    8@@c$*?*?&@@==T//K((22&&'1,[&xj 166:6K6K5LN 
 r6   c                   [        U[        5      (       d  U/n[        S U 5       5      nU(       a  Ub|  [        U R                  5      U R
                  :w  a.  [        SU R
                   S[        U R                  5       35      eUR                  5         UR                  U R                  5        U R                  R                  5         g)z2
Update the losses to those in the internal state
c              3  8   #    U  H  oR                   v   M     g 7frK   rg   ).0rj   s     r3   	<genexpr>3_PipelineSchedule._update_losses.<locals>.<genexpr>   s     !DVE--Vs   N
Expecting z losses but got )	
isinstancelistanyrr   r[   rU   r8   clearextend)r1   stageslossescontains_last_stages       r3   _update_losses _PipelineSchedule._update_losses   s    
 &$''XF!!DV!DD 6#54(()T-A-AA" !5!5 66Fs4K`K`GaFbc 
 LLNMM$//0##%r6   c                    [         e)z
Run one iteration of the pipeline schedule with list of microbatches.
Will go through all the microbatches according to the schedule
implementation.

Args:
    microbatches: list of microbatch args.
NotImplementedError)r1   arg_mbs	kwarg_mbsrl   r   s        r3   _step_microbatches$_PipelineSchedule._step_microbatches   s
      "!r6   Ftargetr   return_outputc                   [         e)  
Run one iteration of the pipeline schedule with *whole-batch* input.
Will chunk the input into microbatches automatically, and go through the
microbatches according to the schedule implementation.

args: positional arguments to the model (as in non-pipeline case).
kwargs: keyword arguments to the model (as in non-pipeline case).
target: target for the loss function.
losses: a list to store the losses for each microbatch.
r   )r1   r   r   r   argskwargss         r3   step_PipelineSchedule.step   s
    & "!r6   c                  ^  SU 4S jjnUb
  U" US5        OS/T R                   -  nUb
  U" US5        O0 /T R                   -  nUb	  U" US5        Ub,  [        U[        5      (       d  [        S[	        U5       35      eX4$ )z
Pre-process/check inputs
c           
        > [        U [        5      (       d  [        U S[        U 5       35      e[	        U 5      TR
                  :w  a'  [        STR
                   SU S[	        U 5       35      eg )Nz must be a list but got a r|    z	 but got )r}   r~   	TypeErrortyperr   rU   
ValueError)mbsnamer1   s     r3   check_type_and_len;_PipelineSchedule._check_inputs.<locals>.check_type_and_len  sp    c4((4&(B49+ NOO3x4///  !5!5 6avYs3xjQ  0r6   r   r<   r   rl   z losses must be a list but got a )r   rL   )rU   r}   r~   r   r   )r1   r   r   rl   r   r   s   `     r3   _check_inputs_PipelineSchedule._check_inputs   s    	 w	2dT111G y+6t333I!z<8fd++6tF|nE  !!r6   c                $    U R                  X5      $ rK   )rV   )r1   rk   r   s      r3   rh   _PipelineSchedule._compute_loss   s    }}V,,r6   c                    U(       d  U(       a2  [        UUU R                  U R                  U R                  5      u  p4X44$ S/U R                  -  0 /U R                  -  4$ )zR
Splits a full-batch input into chunks (i.e. microbatches) and returns
the chunks
r<   )r   rU   rW   rX   )r1   r   r   
args_splitkwargs_splits        r3   _split_inputs_PipelineSchedule._split_inputs#  si     6'D$$%%''($J ++ 4$...t7K7K0KKKr6   c                .    [        UU R                  5      $ )z
Merge output chunks back to a batch state.
If output_merge_spec is None, the utility will merge output chunks by dimension 0 (batch dim).
)r   rY   )r1   output_chunkss     r3   _merge_outputs _PipelineSchedule._merge_outputs:  s    
 ##
 	
r6   )rW   rZ   r[   rX   rV   rU   rY   NNNN)
r_   rF   r`   z#Callable[..., paddle.Tensor] | Nonera   "tuple[TensorChunkSpec, ...] | Nonerb   !dict[str, TensorChunkSpec] | Nonerc   "dict[str, Any] | tuple[Any] | Noner   list | Noner   r   rl   r   r   r   r   r   r   boolrK   )r   tuple[Any, ...]r   zdict[str, Any] | None)r   z	list[Any]returnr
   )r=   r>   r?   r@   rd   ro   rt   r   r   r   r   r   rh   r   r   rB   r<   r6   r3   rR   rR      s2    8<>B?C@D99 59 <	9
 =9 >9:/

&.   $!%"&""" "  	"
 " ""  "#" 	"
 " ",  $!%"&"&"&" &"  	&"
 &"P- )-LL &L.
r6   rR   c                  t   ^  \ rS rSrSr    S
           SU 4S jjjrS rSSSS.   SS jjrS	rU =r	$ )PipelineScheduleSingleiE  z}
Base class for single-stage schedules.
Implements the `step` method.
Derived classes should implement `_step_microbatches`.
Nc                   > [         TU ]  UUUUUS9  Xl        UR                  U l        U R
                  U R                  l        SU l        g )Nr_   r`   ra   rb   rc   F)superrd   _stage
num_stages_num_stagesrZ   has_backward_stage_initialized)r1   rj   r_   r`   ra   rb   rc   r^   s          r3   rd   PipelineScheduleSingle.__init__L  sU     	)+// 	 	
  ++#'#5#5 "'r6   c                   U R                   R                  (       a'  U R                   R                  U R                  X5      nO'U R                   R                  U R                  SU5      nS nU R                   R                  (       a  U R                  US   U5      nU R                  (       a&  U R                   R                  U R                  U5        SU l        g )Nr<   r   T)	r   is_first_prepare_forward_infrarU   rg   rV   rZ   _prepare_backward_infrar   )r1   r   r   labelsnext_stage_argsrn   s         r3   _initialize_stage(PipelineScheduleSingle._initialize_staged  s    ;;"kk@@$$dO #kk@@$$b&O ;;==!3V<DKK//0D0DdK"&r6   Fr   c               X   U R                   R                  5         U R                  XE5      u  pgUb  [        [	        XR
                  5      5      nOSnU R                  XgX5        U(       a@  U R                   R                  (       a%  U R                  U R                   R                  5      $ gr   N)
r   clear_runtime_statesr   r~   r   rU   r   rg   r   r   )	r1   r   r   r   r   r   r   r   targets_splits	            r3   r   PipelineScheduleSingle.stept  s    ( 	((* $(#5#5d#C 
  v7K7K!LMM M 	
-P {{""**4;;+D+DEEr6   )r   r   r   r   )rj   r   r_   rF   r`   Callable | Nonera   r   rb   r   rc   r   r   )
r=   r>   r?   r@   __doc__rd   r   r   rB   __classcell__r^   s   @r3   r   r   E  s     $(>B?C@D(!( ( !	(
 <( =( >( (0'& "#& 	&
 & &r6   r   c                    [        U 5      S:X  a  gU(       a  U S3OSn[        R                  SX 5        [        R                  " U 5      R                  5       $ )zm
Simple wrapper over batch_isend_irecv from paddle.distributed, which just adds a descriptive logger on top.
r   Nz,  zbatch_p2p %s%s)rr   r\   r]   distbatch_isend_irecvpop)p2p_opsdescdesc_strs      r3   
_batch_p2pr     sK     7|q"$r{H
KK (4!!'*..00r6   c                    [        [        5      n0 n[        U 5      S:X  a  U$ U  H   nX$R                     R	                  U5        M"     [        UR                  5       5       H  u  pV[        XaS9X5'   M     U$ )z
Sorts the list of P2P ops by the peer rank, and then calls
batch_isend_irecv. Return a dictionary of works by peer rank. This function
helps us avoid hangs in case of skip connections.
r   r   )r   r~   rr   peerri   sorteditemsr   )r   r   ops_by_peerwork_by_peeropr   opss          r3   _sorted_batch_p2pr     sw     0;4/@K)+L
7|q GG##B'  K--/0	'7 1 r6   c                  <    \ rS rSrSr    S       SS jjrSrg)ScheduleFThenBi  zS
The FThenB schedule.
Will go through all the microbatches in a fill-drain manner.
Nc           	        U R                  XX45      u  pU R                  (       d9  Ub  U R                  US   US   US   5        OU R                  US   US   S5        / n[        U R                  5       GH  n[
        R                  " SU 35         U R                  R                  U5      n[        USS9nUR                  5        H  n	U	R                  5         M     U R                  R                  XaU   X&   5      n
U R                  R                  U5      n[        USS9nUR                  UR                  5       5        SSS5        [        R!                  SU R                  R"                  U5        U R%                  U R                  W
X65        GM"     U H  n	U	R                  5         M     U R&                  (       d  g/ n[        U R                  5       GH&  n[
        R                  " SU 35         U R                  R)                  U5      n[        US	S9nUR                  5        H  n	U	R                  5         M     U R+                  U R                  U5      nU R                  R-                  XlX`R                  S
-
  :H  S9  U R                  R/                  U5      n[        USS9nUR                  UR                  5       5        SSS5        [        R!                  SU R                  R"                  U5        GM)     U R1                  U R                  U5        U H  n	U	R                  5         M     U R                  R3                  5         g! , (       d  f       GN= f! , (       d  f       N= f)z
Run one iteration of the pipeline schedule with list of microbatches.
Will go through all the microbatches according to the FThenB schedule.

Args:
    microbatches: list of microbatch args.
Nr   zForward fwd_recvr   fwd_sendz[%s] Forwarded microbatch %sz	Backward bwd_recvr   rn   last_backwardbwd_sendz[%s] Backwarded microbatch %s)r   r   r   rangerU   r   RecordEventr   get_fwd_recv_opsr   valueswaitforward_one_chunkget_fwd_send_opsr   r\   debugrG   ro   rZ   get_bwd_recv_opsrt   backward_one_chunkget_bwd_send_opsr   _sync_shared_param_grads)r1   r   r   rl   r   fwd_sends_to_waitir   worksworkrk   bwd_sends_to_waitrn   s                r3   r   !ScheduleFThenB._step_microbatches  s    "//

 &&%&&wqz9Q<AO&&wqz9Q<F .0 t++,A%%n5kk2215)#J?!LLNDIIK + 66qz9< kk2215)#J?!((8 6 LL.0G0G $$T[[&*H' -0 &DIIK & !! .0t++,A%%	!o6kk2215)#J?!LLNDIIK + ++DKK;..5I5IA5M0M /  kk2215)#J?!((8 7 LL/1H1H!! -* 	DKK0 &DIIK & 	,,.{ 65D 76s   B2M9CM)
M&	)
M7	r<   r   r   r=   r>   r?   r@   r   r   rB   r<   r6   r3   r   r     sK      $!%"&"Z/Z/ Z/  	Z/
 Z/ Z/r6   r   c                  2   ^  \ rS rSrSU 4S jjrS rSrU =r$ )PipelineChunki$  c                p   > [         TU ]  5         U(       a  U(       a   S5       eXl        X l        X0l        g )Nz-Pipeline stage cannot be both first and last.)r   rd   layersr   rg   )r1   r  r   rg   r^   s       r3   rd   PipelineChunk.__init__%  s4     	
;	
)  r6   c                   U R                   (       a_  UR                  S5      nUR                  S5      nUR                  S5      nX4U4n[        U R                  5       H  u  pxU" U5      nM     U$ U R                  (       aD  Un[        U R                  5       H  u  pxU" U5      nM     [        U[        5      (       a  US   nU$ Un[        U R                  5       H  u  pxU" U5      nM     U$ )N	input_idsattention_maskposition_idsr   )r   get	enumerater  rg   r}   tuple)	r1   r   r   r	  r
  r  outputsidxdecoder_layers	            r3   forwardPipelineChunk.forward.  s    ==

;/I#ZZ(89N!::n5L ,?G(1$++(>$'0 )?N\\G(1$++(>$'0 )?'5))!!* 	 G(1$++(>$'0 )?r6   )r   rg   r  )NFF)r=   r>   r?   r@   rd   r  rB   r   r   s   @r3   r  r  $  s     r6   r  c                  ^^^ U R                   R                  nUS:X  a  U R                   R                  OSnXV-  U-  mXd-  mU R                  mUUU4S jn/ n[	        U5       H"  n	U" XX-  -   U5      n
UR                  U
5        M$     U$ )NVPPr   c                   > S nUS:X  a  [        TS T SSS9nO8UTS-
  :X  a  [        TUT-  US-   T-   SSS9nO[        TUT-  US-   T-   SSS9n[        X1TUS9nU$ )Nr   TF)r   rg   r   )group)r  r   )model	stage_idxr  	new_modelrj   	chunk_num
chunk_sizelayer_listss        r3   _build_stage)_manual_model_split.<locals>._build_stageN  s    	>%KZ(4I )a-'%
*i!mz-I I &
*i!mz-I I iIUKr6   )confignum_hidden_layersvirtual_pp_degreer  r   ri   )r  r  r  mode	pp_degreer!  r"  r  r   r   rj   r  r  r  s              @@@r3   _manual_model_splitr%  G  s    66:>%-66Q"79DJ!-I,,K2 F$%U$=uEe & Mr6   c                    US;   d   SU S35       e[        XR                  XSU5      nUS:X  a  [        XaUS9nU$ US:X  a  [        US   XS9nU$ [	        US   XS9nU$ )N)r  1F1BFThenBz Invalid pipeline schedule mode: z*, must be one of ['VPP', '1F1B', 'FThenB']r  )r_   r`   r'  r   )r%  rankScheduleVPPSchedule1F1Br   )r  	acc_stepsr`   r#  r$  r  r   schedules           r3   get_pipeline_scheduler.  n  s       
 +4&0Z[  !

EKFu}g
 O 
1Ii
 O "1Ii
 Or6   c                  <    \ rS rSrSr    S       SS jjrSrg)r+  i  zc
The 1F1B schedule.
Will perform one forward and one backward on the microbatches in steady state.
Nc                   U R                  XX45      u  pU R                  (       d9  Ub  U R                  US   US   US   5        OU R                  US   US   S5        [        U R                  U R
                  U R                  R                  -
  5      nSnSnSn/ n	[        U5       H  n
U R                  R                  U5      n[        USS9=n(       a  UR                  5         U R                  R                  XaU   X&   5      nU(       a  UR                  5         U R                  R                  U5      n	XeS-
  :w  a
  [        U	SS9nU R                  U R                  XU5        US-  nM      U R                  R                  U5      n[        X-   SS9=n(       a  UR                  5         U R!                  U R                  U5      nU R                  R#                  UUXpR                  S-
  :H  S9  U R                  R%                  U5      nUS-  nX`R                  :X  a  OU R                  R                  U5      n[        UU-   S	S9=n(       a  UR                  5         U R                  R                  XaU   X&   5      nU R                  U R                  XU5        U R                  R                  U5      n	US-  nGMT  [        US
S9nXpR                  :  a  U R                  R                  U5      n[        USS9=n(       a  UR                  5         U R!                  U R                  U5      nU R                  R#                  UUXpR                  S-
  :H  S9  U(       a  UR                  5         U R                  R%                  U5      n[        US
S9nUS-  nXpR                  :  a  M  U(       a  UR                  5         U R'                  U R                  U5        U R                  R)                  5         g)z
Run one iteration of the pipeline schedule with list of microbatches.
Will go through all the microbatches according to the 1F1B schedule.

Args:
    microbatches: list of microbatch args.
Nr   r   r   r   r   fwd_send_bwd_recvr   bwd_send_fwd_recvr   r   )r   r   r   minrU   r   r   rG   r   r   r   r   r   r   ro   r   rt   r   r   r   r   )r1   r   r   rl   r   warmup_chunksfwd_mb_indexbwd_mb_index	send_work	fwd_sends_	fwd_recvs	recv_workrk   	bwd_recvs	fuse_workrn   	bwd_sendss                     r3   r   Schedule1F1B._step_microbatches  s    "//

 &&%&&wqz9Q<AO&&wqz9Q<F   t{{666
  		}%A44\BI&yzBByB  [[22l3Y5LF   44\BIq00&yzB	
 $$V AL= &F 44\BI '%,? y    ''\BDKK***.B.BQ.FF +  44\BIAL333 44\BI 'I%,? y    [[22l3Y5LF
 $$V
 44\BIAL_ d yz:	 11144\BI&yzBByB  ''\BDKK***.B.BQ.FF +    44\BI"9:>IAL+ 1110 NN 	DKK0 	,,.r6   r<   r   r   r  r<   r6   r3   r+  r+    sK      $!%"&"]/]/ ]/  	]/
 ]/ ]/r6   r+  c                     ^  \ rS rSrSr      S               SU 4S jjjrSS jrSSSS.   SS jjr    S       SS	 jjrS
r	U =r
$ )PipelineScheduleMultii,  zE
Base class for multi-stage schedules.
Implements the `step` method.
Nc	                  >^
 [         TU ]  UUUUUS9  Xl        US   R                  U l        US   R
                  U l        US   R                  U l        Ub  U R                   H	  n	Xyl	        M     US   R                  U l	        U R                   H  n	U R                  U	l        M     SU l        U R                  S Lm
U
4S jU l        0 U l        Ub  [         R#                  S5        g g )Nr   r   Fc                .   > U R                   =(       a    T$ rK   rx   )rj   has_losss    r3   <lambda>0PipelineScheduleMulti.__init__.<locals>.<lambda>W  s    %--2LH2Lr6   zDeprecation warning: 'use_full_backward' is no longer supported. Simply stop passing it, and everything should still work fine.)r   rd   _stagesr   r   
group_sizepp_group_size
group_rankr)  stage_index_to_group_rankrZ   r   _stages_initializedrV   _should_compute_losspipeline_orderr\   warning)r1   r   r_   r`   ra   rb   rc   rK  use_full_backwardrj   rD  r^   s             @r3   rd   PipelineScheduleMulti.__init__2  s     	)+// 	 	
 !!9//#AY111I((	$02K/ &)/)L)L& \\E!%!3!3E "#(  d2$L! @B(NNQ )r6   c                0   SnU R                    HN  nUR                  (       a  UR                  U R                  X5      nM2  UR                  U R                  XB5      nMP     S nU R                   S   nUR                  (       a  U R                  US   U5      nU R                  (       ag  [        U R                   5       HN  nUR                  (       a  UR                  U R                  U5        M2  UR                  U R                  S 5        MP     SU l	        g )Nr<   r   T)
rG  r   r   rU   rg   rV   rZ   reversedr   rL  )	r1   r   r   r   r   rj   rn   
last_stagestage_reverses	            r3   _initialize_stages(PipelineScheduleMulti._initialize_stagesb  s     ,.\\E~~"'">">(($# #(">">((/# " \\"%
==!3V<D!)$,,!7 ((!99,,d "99,,d "8 $( r6   Fr   c               l   U R                    H  nUR                  5         M     U R                  XE5      u  pxUb  [        [	        XR
                  5      5      n	OSn	U R                  XxX5        U(       aA  U R                    H1  nUR                  (       d  M  U R                  UR                  5      s  $    gr   )
rG  r   r   r~   r   rU   r   rg   r   r   )
r1   r   r   r   r   r   rj   r   r   r   s
             r3   r   PipelineScheduleMulti.step  s    & \\E&&( " $(#5#5d#C 
 v7K7K!LMM M 	
-P ===..u/B/BCC & r6   c           	     
   U R                  XX45      u  pU R                  (       d9  Ub  U R                  US   US   US   5        OU R                  US   US   S5        U R                   Vs0 s H  oUR                  U_M     nn[        5       n[        5       nUR                  5        H_  n	U	S:  a!  UR                  U R                  U	S-
     5        XR                  S-
  :  d  M>  UR                  U R                  U	S-      5        Ma     [        5       n
[        U R                  U R                     5       GHv  u  p / nUGb  UR                  nUR                  nUR                  n	Uc   S5       eU[         R"                  :X  aP  Xi   nUR%                  XU   X/   5      nU R'                  UUX?5        UR)                  UR+                  U5      5        GO%U[         R,                  :X  ac  Xi   nU R/                  X_5      nX==   S-  ss'   UR1                  UUSX   U R2                  :H  S9  UR)                  UR5                  U5      5        OU[         R6                  :X  aH  Xi   nU R/                  X_5      nUR1                  UUSSS9  UR)                  UR5                  U5      5        ORU[         R8                  :X  a0  Xi   nX==   S-  ss'   UR;                  UX   U R2                  :H  S9  O[=        S	U 35      eU H  nU R                  U   nSnU[?        U5      :  a  UU   nUc  M-  UR                  nUR                  nUR                  n	Uc   S5       eU[         R"                  :X  a4  U	S-   U;   a)  XiS-      nUR)                  URA                  U5      5        M  M  U[,        [6        [8        4;   a  M  [=        S	U 35      e   U H  nU R                  U   nSnU[?        U5      :  a  UU   nUc  M-  UR                  nUR                  nUR                  n	Uc   S5       eU["        [8        4;   a  Mm  U[6        [,        4;   a4  U	S-
  U;   a)  XiS-
     nUR)                  URC                  U5      5        M  M  [=        S	U 35      e   U(       a  [E        U5      RG                  5         GMv  GMy     U RS                  U R                  U5        U R                   H  nURU                  5         M     gs  snf ! [H         a>  n[J        RM                  S
U R                  U RN                  RP                  UU5        UeSnAff = f)zR
Operate on the microbatches for looped schedules (multiple stages on each rank).
Nr   r   zCAll currently supported action types require valid microbatch_indexT)rn   full_backwardr   F)r   zUnknown computation type zy[Rank %s] pipeline schedule %s caught the following exception                      at time_step %s when running action %s)+r   rL  rW  rG  rG   setkeysaddrK  r   r   r  rN  r)  rH   rI   r   r-   r   ro   r   r   r0   rt   r   rU   r   r.   r/   backward_weight_one_chunkr   rr   r   r   r   r   	Exceptionr\   errorr^   r=   r   r   )r1   r   r   rl   r   rj   stage_index_to_stageall_prev_ranksall_next_ranksrG   backward_counter	time_stepr9   r   rH   rm   rk   rn   	prev_rankprev_rank_opsprev_rank_action	next_ranknext_rank_opsnext_rank_actiones                            r3   r   (PipelineScheduleMulti._step_microbatches  s    "//

 ''%''
IaL*Q-P''
IaL$G
 37,,?
2>u$, 	 ?
 $'5#&5/446KQ""22;?C --11""22;?C 7 *1!*4+>+>tyy+I!JIE(*%'-'>'>$%66H"("4"4K#/ ]/ (8+;+;; 4 A!&!8!8$h&79L" 00!6: 

5#9#9(#CD)X-C-CC 4 A#33ED(5:500$!%*.*:*G#33+4	 1  

5#9#9(#CD)X-D-DD 4 A#33ED00$!%*/*/	 1  

5#9#9(#CD)X-E-EE 4 A(5:577$*:*G#33+4 8  )78H7IJ  "0I$($7$7	$BM'+$ 3}#55+8+C('3+;+L+L(#3#D#D&6&B&B'3 a3 ,x/?/??*Q2FF )=1_(M #

5+A+A(+K L	  G
 .)*+2  !",";<L;M N# 9 "0> "0I$($7$7	$BM'+$ 3}#55+8+C('3+;+L+L(#3#D#D&6&B&B'3 a3 ,/II -*)2 
  +Q2FF )=1_(M #

5+A+A(+K L	  G #-";<L;M N# 7 "0@ sO((* w "KP 	DLL&1 \\E**, "?
b  	=IINN++ 	s-   -S3?G(S8+CS87B4S88
U 9T;;U )r   rM  rG  rL  rN  rI  r)  rK  )NNNNNN)r   list[_PipelineStageBase]r_   rF   r`   r   ra   r   rb   r   rc   r   rK  zdict[int, int] | NonerP  zbool | None)r   r   r   r   r   )r=   r>   r?   r@   r   rd   rW  r   r   rB   r   r   s   @r3   rA  rA  ,  s     $(>B?C@D;?)-.(. . !	.
 <. =. >. $9. '. .`(B "#' 	'
 'V  $!%"&"v-v- v-  	v-
 v- v-r6   rA  c
                V   [        [        5      n
[        [        5      n[        [        5      n[        U5       Vs/ s H  nS PM     nnX-  SUS-
  U-
  -  -   X%-   -
  nU	(       a  X-
  S-
  nX#-   U-   n/ nSnU	(       a  [        O[        n[        U5       GH  nUU:  aa  U" U5      nU
U   =nS-   U
U'   UR                  [        U[        R                  U5      5        UUS-
  :X  a  UR                  S /U-  5        Mi  Mk  UUs=::  a  X#-   :  a  O  OU" U5      nU
U   =nS-   U
U'   UR                  [        U[        R                  U5      5        U" U5      nUU   =nS-   UU'   UR                  [        UUU5      5        UR                  U5        U	(       aV  UU-
  U:  aJ  U" UU   5      nUU   =nS-   UU'   UR                  [        U[        R                  U5      5        US-  nGMX  GM[  GM^  U	(       d  UR                  S 5        U" U5      nUU   =nS-   UU'   UR                  [        UUU5      5        UR                  U5        U	(       d  GM  UU-
  U:  d  GM  U" UU   5      nUU   =nS-   UU'   UR                  [        U[        R                  U5      5        US-  nGM     U	(       an  U[        U5      :  a_  U" UU   5      nUU   =nS-   UU'   UR                  [        U[        R                  U5      5        US-  nU	(       a  U[        U5      :  a  M_  U$ s  snf )Nr   r   r   )r   rF   r   r.   r0   ri   rD   r   r-   r   r/   rr   )n_local_stagesrI  
warmup_opsfwd_bwd_opscooldown_opsr)  forward_stage_indexbackward_stage_indexnum_1f1b_microbatchesenable_zero_bubblefwd_stage_mb_indexbwd_stage_mb_indexweight_stage_mb_indexr9  rank_opspost_warmup_ops	total_opsbackward_op_idsweight_op_countFULL_BACKWARD_OR_BACKWARD_INPUTr   fwd_stage_indexrm   r5  bwd_stage_indexr6  weight_stage_indexweight_mb_indexs                               r3   _get_1f1b_rank_opsr  b  s    *5S)9)4S)9,7,< 5:$K%@KqdKH%@ 	&ma.?$.F)GG		O '.2(<7IOO -- $ I
?1"5O /??3/ OO)9)98D Z!^# 89 $ 28
 881"5O 2? CC3/ OO)9)9<H 326O 2? CC3/ OO#3  ""2&!b:o9N&N%9#O4&" (==O'PPO=%&89 * 00'  1$ 'O!& &%226O 2? CC3/ OO#3  ""2&!!b:o9N&N%9#O4&" (==O'PPO=%&89 * 00'  1$k n 33G!G1O,
  55GHHO501 	"H$<$<o	

 	1 33G!G O{ &As   L&c                  \   ^  \ rS rSrSr    S           SU 4S jjjrSS jrSrU =r$ )	r*  i  a7  
The VPP schedule.
See https://arxiv.org/pdf/2104.04473 for details.
Will perform one forward and one backward on the microbatches in steady
state and supports multiple stages per rank. When microbatches are ready for
multiple local stages, VPP prioritizes the earlier microbatch
(also called "depth first").

This schedule is mostly similar to the original paper.
It differs by being relaxing the requirement of num_microbatch % pp_size == 0.
Using the flex_pp schedule, we will have num_rounds = max(1, n_microbatches // pp_group_size) and
it works as long as n_microbatches % num_rounds is 0. As a few examples, support

1. pp_group_size = 4, n_microbatches = 10. We will have num_rounds = 2 and n_microbatches % 2 is 0.
2. pp_group_size = 4, n_microbatches = 3. We will have num_rounds = 1 and n_microbatches % 1 is 0.
c           	       > US   R                   U l        [        T	U ]  UUUUUUS9  [	        U5      U l        US   R                  U l        [        SX R                  -  5      U l	        X R                  -  U l
        X R                  -  S:w  a  [        SU R                   SU S35      e0 U l        [        U R                  5       H"  nU R                  U5      nXR                  U'   M$     g )Nr   )r   r_   r`   ra   rb   rc   r   zRVPP requires the number of microbatches to be a multiple of the number of rounds (z), but got .)rH  rI  r   rd   rr   rr  rJ  r)  maxnumber_of_roundsmicrobatches_per_roundr   rN  r   !_calculate_single_rank_operations)
r1   r   r_   r`   ra   rb   rc   r)  r}  r^   s
            r3   rd   ScheduleVPP.__init__  s    $AY11)+// 	 	
 "&k1I((	 #A~9K9K'K L&48M8M&M#111Q65595J5J4K L)*!-  @B$,,-D==dCH(0% .r6   c           
       ^ ^^	 U 4S jnU" T5      m	T R                   T R                  -  nUT	-
  nX4-
  nT	U-   U-   n[        R                  STT	UUU5        UU 4S jnUU U	4S jn[	        T R                   T R
                  T	UUTUU5      $ )Nc                   > TR                   S-
  TR                  -  nSnXTR                  S-
  U -
  -  -   n[        UTR                  TR                   -  5      $ )Nr   r   )rr  r  rI  r3  rU   )r)  warmups_ops_last_stagemultiply_factorrs  r1   s       r3   get_rank_warmup_opsJScheduleVPP._calculate_single_rank_operations.<locals>.get_rank_warmup_ops,  sm     ##a'++&,"  O/##a'4/3 J
 z4#7#7$:M:M#MNNr6   z=rank %s, warmup_ops %s, 1f1b %s, cooldown_ops %s total_ops %sc                `   > U TR                   -  TR                  -  nUTR                  -  T-   $ rK   )r  rr  rI  )r   local_indexr)  r1   s     r3   rv  JScheduleVPP._calculate_single_rank_operations.<locals>.forward_stage_indexM  s;     333##$K  $"4"44<<r6   c                   > TR                   S-
  U T-
  TR                  -  TR                   -  -
  nUTR                  -  T-   $ )Nr   )rr  r  rI  )r   r  r)  r1   rs  s     r3   rw  KScheduleVPP._calculate_single_rank_operations.<locals>.backward_stage_indexT  sW    ##:%$*E*EE%%&&   $"4"44<<r6   )rr  rU   r\   r   r  rI  )
r1   r)  r  microbatch_opsrt  ru  r  rv  rw  rs  s
   ``       @r3   r  -ScheduleVPP._calculate_single_rank_operations+  s    	O ).
,,t/C/CC$z1%3,|;	K	
	=	= " 	
 		
r6   )r  rr  r  rN  rI  r)  r   )r   rp  r_   rF   r`   r   ra   r   rb   r   rc   r   )r   zlist[_Action | None])	r=   r>   r?   r@   r   rd   r  rB   r   r   s   @r3   r*  r*    se    * $(>B?C@D"1("1 "1 !	"1
 <"1 ="1 >"1 "1H;
 ;
r6   r*  rK   )r   zlist[dist.P2POp]r   z
str | None)r   F)B
__future__r   loggingreabcr   r   collectionsr   r   enumr   typingr	   r
   r   r   paddler   1paddle.distributed.auto_parallel.pipelining.stager   rj   r   paddle.distributeddistributedr   r   
microbatchr   r   r   r   	getLoggerr=   r\   r   r-   r.   r/   r&   r'   r(   r)   r*   r+   r0   r#   r$   r%   r,   compile_action_regexrD   rR   r   r   r   r   Layerr  r%  r.  r+  rA  r  r*  r<   r6   r3   <module>r     s   #  	 # ,    K)  !   
		8	$2Et 2Ej 

((**





				&&  

F

j 
w
 w
tU. Up12`/+ `/F BHH  F$N0c/) c/Ls-- s-~	 Pfq
' q
r6   