
    x-j.                    r    d dl mZ d dlZd dlmZmZmZ d dlZd dlm	Z
 d dlmZmZ erd dlmZ 	 	 	 dddZdS )    )annotationsN)TYPE_CHECKINGAnyCallable)flattenpack_sequence_as)ProcessMeshFfuncCallable[..., Any]out_placementslist[list[dist.Placement]]in_placements!list[list[dist.Placement]] | Noneprocess_meshProcessMesh | Nonereshard_inputsboolreturnc                F     d fd}t          j        ||          S )aT  
    The `local_map` API allows users to pass dist_tensors to a function that is written
    to be applied on ``paddle.Tensor`` s. It works by extracting the local components
    of dist_tensors, calling the function, and wrapping the outputs as dist_tensors
    according to the ``out_placements``.

    Args:
        func (Callable): The function to be applied on each local shard of dist_tensors.

        out_placements (list[list[dist.Placement]]):
            The desired placements for each output tensor. Must be a list where each element
            is a list of Placement objects specifying the distribution strategy for that
            output tensor. The length of the outer list must match the number of outputs
            from ``func``. For non-tensor outputs, the corresponding placement must be None.
            When there are no dist_tensor inputs, process_mesh must be specified to use
            non-None placements.

        in_placements (Optional[list[list[dist.Placement]]], optional):
            The required placements for each input tensor. If specified, must be a list
            where each element is a list of Placement objects defining the distribution
            strategy for that input tensor. The length of the outer list must match the
            number of input tensors.
            Default: None

        process_mesh (ProcessMesh, optional):
            The process mesh that all dist_tensors are placed on. If not specified,
            this will be inferred from the input dist_tensors' process mesh.
            local_map requires all dist_tensors to be placed on the same process mesh.
            Must be specified when there are no dist_tensor inputs but out_placements
            contains non-None values.
            Default: None

        reshard_inputs (bool, optional):
            the bool value indicating whether to reshard the input :dist_tensors when
            their placements are different from the required input placements. If this
            value is ``False`` and some :dist_tensor input has a different placement,
            an exception will be raised. Default: False.

    Returns:
        Callable: A function that applies func to local shards of input dist_tensors and returns dist_tensors or original values.

    Example:
        .. code-block:: python

            >>> from __future__ import annotations
            >>> import paddle
            >>> import paddle.distributed as dist
            >>> from paddle import Tensor
            >>> from paddle.distributed import ProcessMesh

            >>> def custom_function(x):
            ...     mask = paddle.zeros_like(x)
            ...     if dist.get_rank() == 0:
            ...         mask[1:3] = 1
            ...     else:
            ...         mask[4:7] = 1
            ...     x = x * mask
            ...     mask_sum = paddle.sum(x)
            ...     mask_sum = mask_sum / mask.sum()
            ...     return mask_sum

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> dist.init_parallel_env()
            >>> mesh = ProcessMesh([0, 1], dim_names=["x"])
            >>> local_input = paddle.arange(0, 10, dtype="float32")
            >>> local_input = local_input + dist.get_rank()
            >>> input_dist = dist.auto_parallel.api.dtensor_from_local(
            ...     local_input, mesh, [dist.Shard(0)]
            ... )
            >>> wrapped_func = dist.local_map(
            ...     custom_function,
            ...     out_placements=[[dist.Partial(dist.ReduceType.kRedSum)]],
            ...     in_placements=[[dist.Shard(0)]],
            ...     process_mesh=mesh
            ... )
            >>> output_dist = wrapped_func(input_dist)

            >>> local_value = output_dist._local_value()
            >>> gathered_values: list[Tensor] = []
            >>> dist.all_gather(gathered_values, local_value)

            >>> print(f"[Rank 0] local_value={gathered_values[0].item()}")
            [Rank 0] local_value=1.5
            >>> print(f"[Rank 1] local_value={gathered_values[1].item()}")
            [Rank 1] local_value=6.0
            >>> print(f"global_value (distributed)={output_dist.item()}")
            global_value (distributed)=7.5

            >>> # This case needs to be executed in a multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1
            >>> # python -m paddle.distributed.launch {test_case}.py
    r   r   c                	   t          |          }Kt                    t          |          k    s+J dt                     dt          |           d            g }d}t          |          D ]\  }}t          j        j                            |          rm|}| 4t          j                    r|j	        } n|
                                j	        } d}|         }	|	5t          j                    r|j        }	n|
                                j        }	nt          j                    r?|	|j        k    r3rt          j        || |	          }n~t          d|	 d|j                   nc|	|
                                j        k    rFrt          j        || |	          }n-t          d|	 d|
                                j         d	          t          j        j                            || |	          }
|                    |
           |                    |           t!          ||          } |i |}|}|r"t          |          }t          |          t                    k    s+J d
t                     dt          |           d            g }t#          |          D ]\  }}t          j                    rt%          |t          j                  r{t          j        j                            |          rJ dt)          |           d|             |                    t          j        j                            || |                     |J d| d| d            |                    |           t%          |t          j        j        j        j                  r|t          j        j                            |          rJ dt)          |           d|             |                    t          j        j                            || |                     ||J d| d| d            |                    |           t!          ||          S t          |          }g }t#          |          D ]b\  }}|F| 
J d            |                    t          j        j                            || |                     M|                    |           ct!          ||          S )Nzin_placements length z% does not match number of input args !FTzin_placement z' does not match dist_tensor.placements z3 does not match dist_tensor.dist_attr().placements zBIf reshard_inputs is wanted, set reshard_inputs=True to local_map.z@local_map requires one PlacementType for each output value, got z placements but expected z%Expected dense tensor output but got z: z/Expected None placements for non-tensor output z	 but got z>process_mesh must be specified when out_placements is not None)r   len	enumeratedistauto_parallelapiis_dist_tensorpaddlein_dynamic_moder   	dist_attr
placementsreshard
ValueErrordtensor_to_localappendr   zip
isinstanceTensortypedtensor_from_localbase	libpaddlepirValue)r   argskwargsflat_dist_argsflat_local_argsseen_dist_tensoridxargdist_tensorin_placementlocal_tensor
local_argsoutoriginal_outflat_outflat_dist_and_arg_outout_placementr
   r   r   r   s                    j/var/www/html/banglarbhumi/venv/lib/python3.11/site-packages/paddle/distributed/auto_parallel/local_map.pywrappedzlocal_map.<locals>.wrapped   s6    $}%%^)<)<<<<?M(:(: ? ?(+N(;(;? ? ? =<<
  !.11 2	, 2	,HC!%44S99 1,!'-// L'2'?'2'<'<'>'>'K#'  ,#0#5L#+!133 N+6+ALL+6+@+@+B+B+MLL!133 &+{/EEE#1 !&26,(3\<3& 3&KK +5 )F  )F  )Fmx  nD  )F  )F+& +& %&  F !-#.#8#8#:#:#E!F !F $2 	!&26,(3\<3& 3&KK +5)L )L )L  zE  zO  zO  zQ  zQ  z\ )L )L )L+& +& %&
  $15FF|     &&|4444&&s++++%dO<<
dJ)&)) <	Is||Hx==C$7$7777$>**$ $x==$ $ $ 877 %'!&)(N&C&C ": ":"])++ !:!#v}55 :#'#5#9#H#H#M#M  VDIIVVQTVV M .44 .2EE #\=      -448c 8 8'48 8 8  544 .44S9999!#v{'<'@'FGG :#'#5#9#H#H#M#M  VDIIVVQTVV M .44 .2EE #\=      -448c 8 8'48 8 8  544 .44S9999#L2GHHHs||H$&!&)(N&C&C 6 6"] ,'33X 433 *00*.AA}     *005555#L2GHHH    )r   r   )	functoolspartial)r
   r   r   r   r   r@   s   ``` ` r?   	local_maprD      s\    HI I I I I I I I IB Wl333rA   )NNF)r
   r   r   r   r   r   r   r   r   r   r   r   )
__future__r   rB   typingr   r   r   r   paddle.distributeddistributedr   paddle.utilsr   r   r	   rD    rA   r?   <module>rK      s    # " " " " "     / / / / / / / / / /  ! ! ! ! ! ! 2 2 2 2 2 2 2 2 /...... 8<'+ e4 e4 e4 e4 e4 e4 e4rA   