
    Αi.                        S SK Jr  S SKrS SKJrJrJr  S SKrS SKJ	r
  S SKJrJr  \(       a  S SKJr     S           SS jjrg)	    )annotationsN)TYPE_CHECKINGAnyCallable)flattenpack_sequence_as)ProcessMeshc                L   ^ ^^^ SU UUU4S jjn[         R                  " XS5      $ )a  
The `local_map` API allows users to pass dist_tensors to a function that is written
to be applied on ``paddle.Tensor`` s. It works by extracting the local components
of dist_tensors, calling the function, and wrapping the outputs as dist_tensors
according to the ``out_placements``.

Args:
    func (Callable): The function to be applied on each local shard of dist_tensors.

    out_placements (list[list[dist.Placement]]):
        The desired placements for each output tensor. Must be a list where each element
        is a list of Placement objects specifying the distribution strategy for that
        output tensor. The length of the outer list must match the number of outputs
        from ``func``. For non-tensor outputs, the corresponding placement must be None.
        When there are no dist_tensor inputs, process_mesh must be specified to use
        non-None placements.

    in_placements (Optional[list[list[dist.Placement]]], optional):
        The required placements for each input tensor. If specified, must be a list
        where each element is a list of Placement objects defining the distribution
        strategy for that input tensor. The length of the outer list must match the
        number of input tensors.
        Default: None

    process_mesh (ProcessMesh, optional):
        The process mesh that all dist_tensors are placed on. If not specified,
        this will be inferred from the input dist_tensors' process mesh.
        local_map requires all dist_tensors to be placed on the same process mesh.
        Must be specified when there are no dist_tensor inputs but out_placements
        contains non-None values.
        Default: None

    reshard_inputs (bool, optional):
        the bool value indicating whether to reshard the input :dist_tensors when
        their placements are different from the required input placements. If this
        value is ``False`` and some :dist_tensor input has a different placement,
        an exception will be raised. Default: False.

Returns:
    Callable: A function that applies func to local shards of input dist_tensors and returns dist_tensors or original values.

Example:
    .. code-block:: python

        >>> from __future__ import annotations
        >>> import paddle
        >>> import paddle.distributed as dist
        >>> from paddle import Tensor
        >>> from paddle.distributed import ProcessMesh

        >>> def custom_function(x):
        ...     mask = paddle.zeros_like(x)
        ...     if dist.get_rank() == 0:
        ...         mask[1:3] = 1
        ...     else:
        ...         mask[4:7] = 1
        ...     x = x * mask
        ...     mask_sum = paddle.sum(x)
        ...     mask_sum = mask_sum / mask.sum()
        ...     return mask_sum

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> dist.init_parallel_env()
        >>> mesh = ProcessMesh([0, 1], dim_names=["x"])
        >>> local_input = paddle.arange(0, 10, dtype="float32")
        >>> local_input = local_input + dist.get_rank()
        >>> input_dist = dist.auto_parallel.api.dtensor_from_local(
        ...     local_input, mesh, [dist.Shard(0)]
        ... )
        >>> wrapped_func = dist.local_map(
        ...     custom_function,
        ...     out_placements=[[dist.Partial(dist.ReduceType.kRedSum)]],
        ...     in_placements=[[dist.Shard(0)]],
        ...     process_mesh=mesh
        ... )
        >>> output_dist = wrapped_func(input_dist)

        >>> local_value = output_dist._local_value()
        >>> gathered_values: list[Tensor] = []
        >>> dist.all_gather(gathered_values, local_value)

        >>> print(f"[Rank 0] local_value={gathered_values[0].item()}")
        [Rank 0] local_value=1.5
        >>> print(f"[Rank 1] local_value={gathered_values[1].item()}")
        [Rank 1] local_value=6.0
        >>> print(f"global_value (distributed)={output_dist.item()}")
        global_value (distributed)=7.5

        >>> # This case needs to be executed in a multi-card environment
        >>> # export CUDA_VISIBLE_DEVICES=0,1
        >>> # python -m paddle.distributed.launch {test_case}.py
c                
  > [        U5      nTb8  [        T5      [        U5      :X  d    S[        T5       S[        U5       S35       e/ nSn[        U5       GH  u  pg[        R                  R
                  R                  U5      (       Ga  UnU cA  [        R                  " 5       (       a  UR                  n OUR                  5       R                  n SnTGb  TU   n	U	cB  [        R                  " 5       (       a  UR                  n	OUR                  5       R                  n	O[        R                  " 5       (       aJ  XR                  :w  a:  T(       a  [        R                  " XU	5      nO[        SU	 SUR                   35      eOgU	UR                  5       R                  :w  aI  T(       a  [        R                  " XU	5      nO*[        SU	 SUR                  5       R                   S	35      e[        R                  R
                  R                  XW	5      n
UR                  U
5        GM  UR                  U5        GM     [!        X5      nT" U0 UD6nUnU(       Ga#  [        U5      n[        U5      [        T5      :X  d    S
[        T5       S[        U5       S35       e/ n[#        UT5       GH  u  nn[        R                  " 5       (       a  [%        U[        R&                  5      (       a  [        R                  R
                  R                  U5      (       a   S[)        U5       SU 35       eUR                  [        R                  R
                  R+                  XU5      5        M  Ub   SU SU S35       eUR                  U5        M  [%        U[        R,                  R.                  R0                  R2                  5      (       a  [        R                  R
                  R                  U5      (       a   S[)        U5       SU 35       eUR                  [        R                  R
                  R+                  XU5      5        GM  Ub   SU SU S35       eUR                  U5        GM     [!        X5      $ [        U5      n/ n[#        UT5       H_  u  nnUbE  U c   S5       eUR                  [        R                  R
                  R+                  XU5      5        MN  UR                  U5        Ma     [!        X5      $ )Nzin_placements length z% does not match number of input args !FTzin_placement z' does not match dist_tensor.placements z3 does not match dist_tensor.dist_attr().placements zBIf reshard_inputs is wanted, set reshard_inputs=True to local_map.z@local_map requires one PlacementType for each output value, got z placements but expected z%Expected dense tensor output but got z: z/Expected None placements for non-tensor output z	 but got z>process_mesh must be specified when out_placements is not None)r   len	enumeratedistauto_parallelapiis_dist_tensorpaddlein_dynamic_modeprocess_mesh	dist_attr
placementsreshard
ValueErrordtensor_to_localappendr   zip
isinstanceTensortypedtensor_from_localbase	libpaddlepirValue)r   argskwargsflat_dist_argsflat_local_argsseen_dist_tensoridxargdist_tensorin_placementlocal_tensor
local_argsoutoriginal_outflat_outflat_dist_and_arg_outout_placementfuncin_placementsout_placementsreshard_inputss                    j/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/auto_parallel/local_map.pywrappedlocal_map.<locals>.wrapped   s    $}%^)<< 'M(:'; <((+N(;'<A?<
  !.1HC!!%%44S99!'--//'2'?'?'2'<'<'>'K'K#'  ,#0#5L#+!1133+6+A+AL+6+@+@+B+M+ML!1133+/E/EE#126,,(3<3&K +5*7~Elmx  nD  nD  mE  )F+& %&  F !-#.#8#8#:#E#E!F $226,,(3<3&K +5*7~Ex  zE  zO  zO  zQ  z\  z\  y])L)L+& %&
  $1155FF|   &&|4&&s+e 2h &d<
J)&)s|Hx=C$77 >*++Dx=/$7 %'!&)(N&C"]))++!#v}}55#'#5#5#9#9#H#H#M#M CDI;bQTPUVM .44 ..22EE #=  -4 McU S''4oQ84 .44S9!#v{{'<'<'@'@'F'FGG#'#5#5#9#9#H#H#M#M CDI;bQTPUVM .44 ..22EE #=  -4 McU S''4oQ84 .44S9E 'DF $LHHs|H$&!&)(N&C"] ,'3 X3 *00**..AA} *005 'D $LHH    )r   ProcessMesh | None)	functoolspartial)r5   r7   r6   r   r8   r:   s   ``` ` r9   	local_mapr@      s&    HI IB W33r<   )NNF)r5   Callable[..., Any]r7   zlist[list[dist.Placement]]r6   z!list[list[dist.Placement]] | Noner   r=   r8   boolreturnrA   )
__future__r   r>   typingr   r   r   r   paddle.distributeddistributedr   paddle.utilsr   r   r	   r@    r<   r9   <module>rJ      sq    #  / /  ! 2. 8<'+ e4
e4.e4 5e4 %	e4
 e4 e4r<   