
    ϑi                         S SK r SSKJr  SSKJrJr   " S S\ R                  R                  R                  R                  5      r
 " S S5      rSS	 jrSS
 jrg)    N   )Variable)LayerHelpercorec                   8   ^  \ rS rSrSrU 4S jrU 4S jrSrU =r$ )BlockGuardServ   z`
BlockGuardServ class.

BlockGuardServ class is used to create an op with a block in a program.
c                    > [        U[        5      (       d  [        S5      e[        TU ]  UR
                  R                  5        Xl        g )Nz$BlockGuardServ takes a ListenAndServ)
isinstanceListenAndServ	TypeErrorsuper__init__helpermain_programserver)selfr   	__class__s     [/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/incubate/nn/layer/io.pyr   BlockGuardServ.__init__   s8    6=11BCC334    c                 `   > Ub  gU R                   R                  5         [        TU ]  XU5      $ )NF)r   complete_opr   __exit__)r   exc_typeexc_valexc_tbr   s       r   r   BlockGuardServ.__exit__!   s.    !w6::r   )r   )	__name__
__module____qualname____firstlineno____doc__r   r   __static_attributes____classcell__)r   s   @r   r   r      s    ; ;r   r   c                   :    \ rS rSrSrS
S jrS rS rS rS r	Sr
g	)r   )   aV  
**ListenAndServ Layer**

ListenAndServ is used to create a rpc server bind and listen
on specific TCP port, this server will run the sub-block when
received variables from clients.

Args:
    endpoint(string): IP:port string which the server will listen on.
    inputs(list): a list of variables that the server will get from clients.
    fan_in(int): how many client are expected to report to this server, default: 1.
    optimizer_mode(bool): whether to run the server as a parameter server, default: True.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.incubate.nn.layer.io import ListenAndServ
        >>> import paddle
        >>> paddle.enable_static()
        >>> place = paddle.CPUPlace()
        >>> main = paddle.static.Program()
        >>> with paddle.static.program_guard(main):
        ...     serv = ListenAndServ(
        ...         "127.0.0.1:6170", ["X"], optimizer_mode=False)
        ...     with serv.do():
        ...         x = paddle.static.data(
        ...             shape=[32, 32],
        ...             dtype='float32',
        ...             name="X")
        ...         paddle.nn.initializer.Constant(value=1.0)(x, main.global_block())
        ...         paddle.scale(x=x, scale=10.0)

        >>> exe = paddle.static.Executor(place)
        >>> exe.run(main)
c                 b    [        S5      U l        X l        / U l        Xl        X0l        X@l        g )Nlisten_and_serv)r   r   inputsoutputsendpointfan_inoptimizer_mode)r   r,   r*   r-   r.   s        r   r   ListenAndServ.__init__O   s-    !"34  -r   c                     [        U 5      $ )N)r   )r   s    r   doListenAndServ.doY   s    d##r   c                    U R                   R                  nUR                  5       nU R                  5       n/ n/ nUR                   H  nU R
                  (       av  SUR                  ;   ad  SUR                  ;   aR  UR                  UR                  S   R                  5        UR                  UR                  S   R                  5        M  M  M  UR                   H[  nUR                  U5       HC  nUR                  UR                  U5      5        UR                  UR                  U5      5        ME     M]     M     XE4$ )NGradParam)r   r   current_blockparent_blockopsr.   r*   appendnameinput_namesinputvar)	r   r   r6   r7   paramsgradsopinamein_var_names	            r   get_params_and_grads"ListenAndServ.get_params_and_grads\   s    {{//$224((*##B""RYY&7bii+?MM"))G"4"9"9:LL6!2!7!78 ,@&
  ^^E')xxl&6&6{&CD\%5%5k%BC (7 , $ }r   c                     U R                   R                  nUR                  5       R                  nUS:  d   eUR	                  U5      nU$ )Nr   )r   r   r6   
parent_idxblock)r   progrF   r7   s       r   r7   ListenAndServ.parent_blockr   sE    {{'''')44
Qzz*-r   c                    SSK Jn  U R                  R                  nUR	                  5       nU R                  5       nUR                  SSU R                  00 U R                  U R                  U/UR                  S/S.S9  g )Nr   )DistributedModer)   X )r,   Faninoptimize_blocksdistributed_modegrad_to_block_idtyper*   r+   attrs)7paddle.incubate.distributed.fleet.parameter_server.moderK   r   r   r6   r7   	append_opr*   r,   r-   SYNC)r   rK   r   r6   r7   s        r   r   ListenAndServ.complete_opy   s    	
 {{//$224((*"% MM!$ %4$8$8%'D	 	 	
r   )r,   r-   r   r*   r.   r+   N)   T)r   r    r!   r"   r#   r   r1   rC   r7   r   r$    r   r   r   r   )   s!    #J-$,
r   r   c                    [        U5      [        :X  d   eUc  / nO[        U[        5      (       a  U/n[        U5      [        :X  d   eU R	                  S5      n[        [        U5      5      n [        S
0 [        5       D6n[        R                  R                  5       nUR                  SSU0SU0SU SUU[        R                  R                  R                  0S9  U(       a  UR                  S	SU0S/ 0SU 0S9  gg)aK  
Send variables to the server side, and get vars from server
side when server have finished running server side program.

Args:
    endpoints (str): comma separated IP:PORT pairs in the order
               of send_vars to send
    send_vars (list): variables to send to server
    sync (bool): whether to wait the request finish

N,sendrL   Out	endpointsepmaprR   send_barrier)Send)rS   listr   r   splitsetr   localsr   op_proto_and_checker_makerkOpRoleAttrNamerV   OpRoleRPC)r_   	send_varsdummy_outputsyncr`   r   rpc_op_role_names          r   rb   rb      s    	?d"""	L(	+	+$~%%%OOC ESZ I,68,F66FFH
Y%Ud==DDHH
	  	 &BK	*	 	 	
 r   c                 n   [        U5      [        :X  d   eUc  / nO[        U[        5      (       a  U/n[        U5      [        :X  d   eU R	                  S5      n[        [        U5      5      n [        S
0 [        5       D6nUR                  SSU0SU0XS.S9  U(       a  UR                  SSU0SU 0S	9  U$ )a5  
Receive variables from server side

Args:
    endpoints (str): comma separated IP:PORT pairs in the order
               of send_vars to send
    get_vars (list): vars to get from server after send completes.
    sync (bool): whether to wait the request finish

Returns:
    list: list of received variables
r\   recvrL   r^   )r_   r`   rR   fetch_barrierr_   )rS   r+   rT   )Recv)	rS   rc   r   r   rd   re   r   rf   rV   )r_   get_varsdummy_inputrm   r`   r   s         r   rr   rr      s     >T!!!	K	*	*"m$$$OOC ESZ I,68,F
[!!%6	    H%	* 	 	

 Or   )NT)paddlebase.frameworkr   	frameworkr   r   staticnncontrol_flow
BlockGuardr   r   rb   rr   rZ   r   r   <module>r|      sJ     ' +;V]]%%22== ;*f
 f
R+
\&r   