
    x-j                        d dl mZ d dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZmZ er'd dlmZmZ d dl	mZ d dlmZ d d	lmZ eeeegef         Z G d
 d          Zej        	 ddd            ZddZddZdS )    )annotationsN)TYPE_CHECKINGCallable)	framework)_get_global_group_warn_cur_rank_not_in_group)	GeneratorSequence)Tensor)task)Groupc                  H    e Zd ZU dZded<   ded<   ded<   ded	<   	 dddZd
S )P2POpa  
    A class that makes point-to-point operations for "batch_isend_irecv".

    This class creates the type of P2P operation, communication buffer, peer rank,
    Group. Instances of this class will be passed to
    ``paddle.distributed.batch_isend_irecv`` for point-to-point communication.

    Args:
        op (callable): A function to send data to or receive data from a peer process.
            The type of ``op`` is either ``paddle.distributed.isend`` or ``paddle.distributed.irecv``.
        tensor (Tensor): Tensor to send or receive.
        peer (int): The destination or source rank.
        group (Group, optional): The group instance return by new_group or None for global
            default group. Default: None.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env: DISTRIBUTED)

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> dist.init_parallel_env()
            >>> rank = dist.get_rank()
            >>> world_size = dist.get_world_size()

            >>> send_t = paddle.arange(2) + rank
            >>> # paddle.tensor([0, 1])  # Rank-0
            >>> # paddle.tensor([1, 2])  # Rank-1

            >>> recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)

            >>> send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
            >>> recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)

    
_P2POpTypeopr   tensorintpeerGroup | NonegroupNreturnNonec                    |t           j        t           j        fvrt          d          || _        || _        || _        |t                      n|| _        d S )NztInvalid ``op`` function. Expected ``op`` to be of type ``paddle.distributed.isend`` or ``paddle.distributed.irecv``.)	distisendirecvRuntimeErrorr   r   r   r   r   )selfr   r   r   r   s        r/var/www/html/banglarbhumi/venv/lib/python3.11/site-packages/paddle/distributed/communication/batch_isend_irecv.py__init__zP2POp.__init__P   sa     dj$*---0   	,1M&(((u


    N)
r   r   r   r   r   r   r   r   r   r   )__name__
__module____qualname____doc____annotations__r     r!   r   r   r   $   so         $ $L NNNNNNIII #E E E E E E Er!   r   r   r   taskstask | Noner   Generator[None, None, None]c              #  v  K   | t                      n| } | j        }|                                 	 d V  |t          |          dk    r|                                 d S |                    |           d S # |t          |          dk    r|                                 w |                    |           w xY w)Nr   )r   process_group_start_coalescinglen_end_coalescing)r   r)   pgs      r   _coalescing_managerr2   d   s       $)=eE		B&=CJJ!OO     u%%%%% =CJJ!OO    u%%%%s   A7 7AB8p2p_op_listSequence[P2POp]r   c                    t          | t                    rt          d | D                       st          d          | d         j        j        t          fd| D                       st          d          dS )zu
    Helper to check that the ``p2p_op_list`` is a list of P2POp instances and
    all ops use the same backend.
    c              3  @   K   | ]}t          |t                    V  d S r"   )
isinstancer   ).0p2p_ops     r   	<genexpr>z%_check_p2p_op_list.<locals>.<genexpr>y   s=       4 4&,
65!!4 4 4 4 4 4r!   z[Invalid ``p2p_op_list``. Each op is expected to to be of type ``paddle.distributed.P2POp``.r   c              3  8   K   | ]}|j         j        k    V  d S r"   )r   backend)r8   r9   r<   s     r   r:   z%_check_p2p_op_list.<locals>.<genexpr>   s-      II6w&,..IIIIIIr!   z(All groups need to use the same backend.N)r7   listallr   r   r<   )r3   r<   s    @r   _check_p2p_op_listr?   t   s    
 k4(( 
 4 40;4 4 4 1 1 
 :
 
 	

 !n"*GIIII[IIIII GEFFFG Gr!   list[P2POp]
list[task]c                   t          |            | d         j        }t          |          rdS t          j                    r|t                      n|}|j        }g }t          ||          5  | D ]B}|j        }|j	        }|j
        }|j        } ||||          }	|	|                    |	           C	 ddd           n# 1 swxY w Y   |S t          d          )a  
    Send or Receive a batch of tensors asynchronously and return a list of requests.

    Process each of the point-to-point operations in ``p2p_op_list`` and return the
    corresponding tasks. NCCL are currently supported.

    Args:
        p2p_op_list (List[P2POp]): A list of point-to-point operations(type of each operator is
            ``paddle.distributed.P2POp``). The order of the isend/irecv in the list
            matters and it needs to match with corresponding isend/irecv on the
            remote end.

    Returns:
        A list of distributed tasks returned by calling the corresponding
        op in the op_list.

    Warning:
        This API only supports the dygraph mode.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env: DISTRIBUTED)

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> dist.init_parallel_env()
            >>> rank = dist.get_rank()
            >>> world_size = dist.get_world_size()

            >>> send_t = paddle.arange(2) + rank
            >>> # paddle.tensor([0, 1])  # Rank-0
            >>> # paddle.tensor([1, 2])  # Rank-1

            >>> recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)

            >>> send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
            >>> recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)

            >>> tasks = dist.batch_isend_irecv([send_op, recv_op])

            >>> for task in tasks:
            ...     task.wait()

            >>> print(recv_t)
            >>> # paddle.tensor([1, 2])     # Rank-0
            >>> # paddle.tensor([0, 1])     # Rank-1
    r   Nz*Don't support static graph mode currently.)r?   r   r   r   in_dynamic_moder   r<   r2   r   r   r   appendr   )
r3   r   r<   r)   r9   r   r   r   
comm_groupr   s
             r   batch_isend_irecvrF      s>   d {###N E"5))  "" I',}!###%- .. 	' 	'% ' 'Y{#\
r&$
33#LL&&&'	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' GHHHs   ,AB??CCr"   )r   r   r)   r*   r   r+   )r3   r4   r   r   )r3   r@   r   rA   )
__future__r   
contextlibtypingr   r   paddle.distributeddistributedr   paddler   &paddle.distributed.communication.groupr   r   collections.abcr	   r
   r   paddle.base.corer   r   r   r   r   contextmanagerr2   r?   rF   r(   r!   r   <module>rQ      s   # " " " " "     * * * * * * * * ! ! ! ! ! !             
  633333333%%%%%%((((((63.45J=E =E =E =E =E =E =E =E@ '+& & & & &G G G G$FI FI FI FI FI FIr!   