
    Αi                       % S r SSKJr  SSKJrJrJr  SSKJr  SSK	r	SSK
Jr  SSKJr  \(       ab  SSKJrJrJr  SS	K	Jr  SS
KJr  \S   rS\S'   \S   rS\S'    " S S\5      r " S S\5      r " S S\5      r " S S\\5      r " S S\5      r/ r " S S5      r " S S\5      r " S S\5      r  " S S \5      r! " S! S"\5      r"g)#zBThis is definition of dataset class, which is high performance IO.    )annotations)TYPE_CHECKINGLiteral	TypedDict)text_formatN)core)data_feed_pb2)NotRequired	TypeAliasUnpack)Tensor)Fleet)r      r   
_InputType_CurrentPhasec                  f    \ rS rSr% S\S'   S\S'   S\S'   S\S'   S	\S
'   S\S'   S\S'   S\S'   Srg)_DatasetBaseSettings#   NotRequired[int]
batch_size
thread_numzNotRequired[list[Tensor]]use_varNotRequired[str]pipe_commandzNotRequired[_InputType]
input_typefs_namefs_ugidownload_cmd N__name__
__module____qualname____firstlineno____annotations____static_attributes__r       h/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/fleet/dataset/dataset.pyr   r   #   s2    $$$$**&&++!!  &&r'   r   c                  \    \ rS rSr% S\S'   S\S'   S\S'   S\S'   S\S'   S\S	'   S\S
'   Srg)#_InMemoryDatasetDistributedSettings-   r   
merge_sizeNotRequired[bool]parse_ins_idparse_contentfleet_send_batch_sizefleet_send_sleep_secondsfea_evalcandidate_sizer   Nr    r   r'   r(   r*   r*   -   s,    $$''((//"22##((r'   r*   c                  *    \ rS rSr% S\S'   S\S'   Srg)_InMemoryDatasetSettings6   r   data_feed_typer   	queue_numr   Nr    r   r'   r(   r5   r5   6   s    ((##r'   r5   c                      \ rS rSrSrg)_InMemoryDatasetFullSettings:   r   N)r!   r"   r#   r$   r&   r   r'   r(   r:   r:   :   s     	r'   r:   c                  H    \ rS rSr% S\S'   S\S'   S\S'   S\S'   S\S	'   S
rg)_BoxPSDatasetSettings?   r   rank_offsetr   pv_batch_sizer-   parse_logkeymerge_by_sidenable_pv_merger   Nr    r   r'   r(   r=   r=   ?   s     %%''''''**r'   r=   c                     \ rS rSr% SrS\S'   S\S'   S\S'   S	\S
'   S\S'   S\S'   S'S jrSS/ SSSSS4                 S(S jjrS rS r	S r
S)S jrS rS rS rS rS rS rS rS  rS! rS" rS# rS$ rS%rg&)*DatasetBaseJ   zBase dataset class.data_feed_pb2.DataFeedDesc
proto_desccore.Datasetdatasetintr   	list[str]filelistbool
use_ps_gpuzcore.PSGPU | Nonepsgpuc                    [         R                  " 5       U l        SU R                  l        [        R
                  " S5      U l        SU l        / U l        SU l	        SU l
        g)Init.catMultiSlotDatasetr   FN)r	   DataFeedDescrH   r   r   DatasetrJ   r   rM   rO   rP   selfs    r(   __init__DatasetBase.__init__T   sM     (446',$||$67
r'   r   rS   r    c	                    U R                  U5        U R                  U5        U R                  U5        U R                  U5        U R	                  U5        U R                  Xg5        U R                  U5        g)a&  
should be called only once in user's python scripts to initialize settings of dataset instance.
Normally, it is called by InMemoryDataset or QueueDataset.

Args:
    batch_size(int): batch size. It will be effective during training. default is 1.
    thread_num(int): thread num, it is the num of readers. default is 1.
    use_var(list): list of variables. Variables which you will use. default is [].
    pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat"
    input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
    fs_name(str): fs name. default is "".
    fs_ugi(str): fs ugi. default is "".
    download_cmd(str): customized download command. default is "cat"


N)_set_batch_size_set_thread_set_use_var_set_pipe_command_set_input_type_set_hdfs_config_set_download_cmd)	rX   r   r   r   r   r   r   r   r   s	            r(   initDatasetBase.init`   sf    6 	Z($'"|,Z(g.|,r'   c                $    XR                   l        g)aT  
Set pipe command of current dataset
A pipe command is a UNIX pipeline command that can be used only

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.dataset.DatasetBase()
        >>> dataset._set_pipe_command("python my_script.py")

Args:
    pipe_command(str): pipe command

N)rH   r   )rX   r   s     r(   r`   DatasetBase._set_pipe_command   s      (4$r'   c                $    XR                   l        g)a  
Set batch size. Will be effective during training

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset._set_batch_size(128)

Args:
    batch_size(int): batch size

N)rH   r   )rX   r   s     r(   r]   DatasetBase._set_batch_size   s     &0"r'   c                F    U R                   R                  U5        Xl        g)z
Set thread num, it is the num of readers.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset._set_thread(12)

Args:
    thread_num(int): thread num
N)rJ   set_thread_numr   rX   r   s     r(   r^   DatasetBase._set_thread   s     	##J/$r'   c                F    U R                   R                  U5        Xl        g)aO  
Set file list in current worker. The filelist is indicated by a list of file names (string).

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset.set_filelist(['a.txt', 'b.txt'])

Args:
    filelist(list[str]): list of file names of inputs.
N)rJ   set_filelistrM   )rX   rM   s     r(   ro   DatasetBase.set_filelist   s     	!!(+ r'   c                $    XR                   l        g N)rH   r   )rX   r   s     r(   ra   DatasetBase._set_input_type   s    %/"r'   c                <    U R                   R                  nXl        g)z
Set user slot name.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset._set_uid_slot('6048')

Args:
    set_uid_slot(string): user slot name
N)rH   multi_slot_descuid_slot)rX   rv   
multi_slots      r(   _set_uid_slotDatasetBase._set_uid_slot   s     __44
&r'   c                p   U R                   R                  nU GH  nUR                  R                  5       nSUl        UR
                  Ul        [        R                  R                  5       (       a-  SUl	        UR                  R                  UR                  5        O<UR                  S:X  a,  SUl	        UR                  R                  UR                  5        UR                  [        R                  :X  a	  SUl        M  UR                  [        R                   :X  a
  SUl        GM  [#        S5      e   g)z
Set Variables which you will use.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset._set_use_var([data, label])

Args:
    var_list(list): variable list
Tr   floatuint64zWCurrently, paddle.distributed.fleet.dataset only supports dtype=float32 and dtype=int64N)rH   ru   slotsaddis_usednamepaddle	frameworkuse_pir_apiis_denseshapeextend	lod_leveldtypefloat32typeint64
ValueError)rX   var_listrw   varslot_vars        r(   r_   DatasetBase._set_use_var   s     __44
C!''++-H#HHHHM++--$(!%%cii0==A%(,H%NN))#))4yyFNN* 'fll* ( m ! r'   c                :    U R                   R                  X5        g)a  
Set hdfs config: fs name ad ugi

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset._set_hdfs_config("my_fs_name", "my_fs_ugi")

Args:
    fs_name(str): fs name
    fs_ugi(str): fs ugi
N)rJ   set_hdfs_config)rX   r   r   s      r(   rb   DatasetBase._set_hdfs_config   s     	$$W5r'   c                :    U R                   R                  U5        g)a  
Set customized download cmd: download_cmd

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> dataset._set_download_cmd("./read_from_afs")

Args:
    download_cmd(str): customized download command
N)rJ   set_download_cmd)rX   r   s     r(   rc   DatasetBase._set_download_cmd  s     	%%l3r'   c                N   U R                   [        U R                  5      :  a  [        U R                  5      U l         U R                  R	                  U R                   5        U R                  R                  U R                  5       5        U R                  R                  5         g)P
Set data_feed_desc before load or shuffle,
user no need to call this function.
N)r   lenrM   rJ   rk   set_data_feed_desc_desccreate_readersrW   s    r(   _prepare_to_runDatasetBase._prepare_to_run   sg    
 ??S//!$--0DO##DOO4''

5##%r'   c                    Xl         [        R                  " 5       (       d  SU l         gU R                   (       a  [        R                  " 5       U l        gg)z1
set use_ps_gpu flag

Args:
    use_ps_gpu: bool
r   N)rO   r   _is_compiled_with_heterpsPSGPUrP   )rX   rO   s     r(   _set_use_ps_gpuDatasetBase._set_use_ps_gpu+  s8     %--//DO__DJ r'   c                8    U R                   R                  5         g rr   )rJ   destroy_readersrW   s    r(   _finish_to_runDatasetBase._finish_to_run9  s    $$&r'   c                B    [         R                  " U R                  5      $ )a  
Returns a protobuf message for this DataFeedDesc

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> print(dataset._desc())
        pipe_command: "cat"

Returns:
    A string message
)r   MessageToStringrH   rW   s    r(   r   DatasetBase._desc<  s     **4??;;r'   c                    g rr   r   rl   s     r(   _dynamic_adjust_before_train(DatasetBase._dynamic_adjust_before_trainM      r'   c                    g rr   r   rW   s    r(   _dynamic_adjust_after_train'DatasetBase._dynamic_adjust_after_trainP  r   r'   c           
     8   [        US5      n[        U5      n UR                  5       nU(       GaV  UR                  U5      nU" 5        GH8  n[        U5      n	XY:w  a  [	        SU SU	 35      e[        U5       GH  u  p[        US   5      S:X  a  [	        SUS    S35      eX   R                  [        R                  :X  a<  [        S US    5       5      (       d"  [        S	R                  US   S
US   5      5      eX   R                  [        R                  :X  d"  X   R                  [        R                  :X  d  M  [        S US    5       5      (       a  M  [        SR                  US   SUS   5      5      e   GM;     OOGMr  UR                  5         g)aU  
 Var consistency inspection of use_var_list and data_generator data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('need to work with real dataset')
        >>> import paddle
        >>> from dataset_generator import CTRDataset
        >>> dataset = paddle.distributed.fleet.DatasetBase()
        >>> generator_class = CTRDataset()
        >>> dataset._check_use_var_with_data_generator([data, label], generator_class, "data/part-00000")

Args:
    var_list(list): variable list
    data_generator_class(class): data_generator class
    test_file(str): local test file path
rz&var length mismatch error: var_list = z vs data_generator = r   r   zvar length error: var z 's length in data_generator is 0c              3  B   #    U  H  n[        U[        5      v   M     g 7frr   )
isinstancer{   .0eles     r(   	<genexpr>ADatasetBase._check_use_var_with_data_generator.<locals>.<genexpr>}  s      K>DsJsE22f   a  var dtype mismatch error: var name = {}, var type in var_list = {}, while var in data_generator contains non-float value, which is {} 
Please check if order of var_list and data_generator are aligned. 
Please check if var's type in data_generator is correct.r{   c              3  B   #    U  H  n[        U[        5      v   M     g 7frr   )r   rK   r   s     r(   r   r     s     %Mfsjc&:&:fr   a   var dtype mismatch error: var name = {}, var type in var_list = {}, while var in data_generator contains non-int value, which is {} 
Please check if order of var_list and data_generator are aligned. 
Please check if var's type in data_generator is correct.rK   N)openr   readlinegenerate_sampler   	enumerater   r   r   all	TypeErrorformatr   int32close)rX   r   data_generator_class	test_filefvar_lenline	line_iteruser_parsed_linedata_gen_lenir   s               r(   "_check_use_var_with_data_generator.DatasetBase._check_use_var_with_data_generatorS  s   , C h-::<D0@@F	(1$#&'7#8L.(DWIMbcobpq  #,,<"=s1v;!+","8Q@` a#  $;,,>s K>A!fK H H #,![[a[a$'FGSV\"#  %K--='{00FLL@"%%Mc!f%M"M"M"+![[a[a$'FE3q6\"# + #> )4J S V 	
	r'   )rJ   rM   rH   rP   r   rO   NreturnNone)r   rK   r   rK   r   list[Tensor]r   strr   r   r   r   r   r   r   r   r   r   )rM   rL   r   r   )r!   r"   r#   r$   __doc__r%   rY   rd   r`   r]   r^   ro   ra   rx   r_   rb   rc   r   r   r   r   r   r   r   r&   r   r'   r(   rE   rE   J   s    **O
  "!!"!!-!- !- 	!-
 !- !- !- !- !- 
!-F4$0"%"!"0'"!F6"4 	&&'<"Dr'   rE   c                    ^  \ rS rSr% SrS\S'   S\S'   S\S'   S	\S
'   S\S'   S	\S'   S	\S'   S	\S'   S	\S'   S	\S'   S	\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S'   S\S '   S	\S'   S	\S'   S\S'   S\S'   S	\S!'   S\S"'   SAU 4S# jjr    SBS$ jr    SCS% jrSDU 4S& jjr	S' r
S( rS) rS* rS+ rS, rS- rSES. jrSFS/ jrSGS0 jrS1 rS2 rS3 rSHS4 jr                SIS5 jrSJSKS6 jjrSLSMS7 jjrSAS8 jrSAS9 jr SN     SOS: jjrSAS; jrSLSPS< jjrSLSPS= jjr SQS> jr!SRS? jr"S@r#U =r$$ )SInMemoryDataseti  z
:api_attr: Static Graph

It will load data into memory and shuffle data before training.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()

rI   rJ   rG   rH   
int | Noner0   rN   is_user_set_queue_numr8   r.   r/   rA   rB   rC   merge_by_lineidr1   rK   r   r   r   r   r   r   r   r   r   r   r   r7   r,   r2   r3   c                   > [         TU ]  5         SU R                  l        SU l        SU l        SU l        SU l        SU l        SU l	        SU l
        SU l        SU l        SU l        g)rR   MultiSlotInMemoryDataFeedNFT)superrY   rH   r   r0   r   r8   r.   r/   rA   rB   rC   r   r1   rX   	__class__s    r(   rY   InMemoryDataset.__init__  si    :%)"%*"!"! $$(,%r'   c                   UR                  SS5      nUS:  a  U R                  U5        UR                  SS5      nU R                  U5        UR                  SS5      nU R                  U5        UR                  SS5      nU(       a  U R	                  U5        UR                  S	S5      nU(       a  U R                  U5        UR                  S
S5      nU(       a%  UR                  SS5      nU R                  US5        gg)a  
:api_attr: Static Graph

should be called only once in user's python scripts to initialize distributed-related settings of dataset instance
Args:
    kwargs: Keyword arguments. Currently, we support following keys in **kwargs:

    merge_size(int): ins size to merge, if merge_size > 0, set merge by line id,
                     instances of same line id will be merged after shuffle,
                     you should parse line id in data generator. default is -1.
    parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False.
    parse_content(bool): Set if Dataset need to parse content. default is False.
    fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024
    fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0
    fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle.
                    default is False.
    candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=[])

        >>> dataset._init_distributed_settings(
        ...     parse_ins_id=True,
        ...     parse_content=True,
        ...     fea_eval=True,
        ...     candidate_size=10000)

r,   r   r.   Fr/   r0   Nr1   r2   r3   '  T)get_set_merge_by_lineid_set_parse_ins_id_set_parse_content_set_fleet_send_batch_size_set_fleet_send_sleep_seconds_set_fea_eval)	rX   kwargsr,   r.   r/   r0   r1   r2   r3   s	            r(   _init_distributed_settings*InMemoryDataset._init_distributed_settings  s    R ZZb1
>%%j1zz.%8|,

?E:. &

+BD I ++,AB#)::.H$#O #../GH::j%0#ZZ(8%@N~t4 r'   c                \   U GH  nUS:X  a  U R                  X   5        M  US:X  a  U R                  X   5        M:  US:X  a  U R                  X   5        MU  US:X  a  U R                  X   5        Mp  US:X  a  U R	                  X   5        M  US:X  a  SU;   a  U R                  X   US   5        M  US:X  a  U R                  X   5        M  US	:X  a+  UR                  S	S
5      S:  a  U R                  X   5        M  US:X  a  U R                  X   5        GM  US:X  a  U R                  X   5        GM4  US:X  a  U R                  X   5        GMP  US:X  a  U R                  X   5        GMl  US:X  d  GMu  X   (       d  GM  UR                  SS5      nU R                  US5        GM     g)a	  
:api_attr: Static Graph

should be called in user's python scripts to update settings of dataset instance.

Args:
    kwargs: Keyword arguments. Currently, we support following keys in **kwargs,
            including single node settings and advanced distributed related settings:
    batch_size(int): batch size. It will be effective during training. default is 1.
    thread_num(int): thread num, it is the num of readers. default is 1.
    use_var(list): list of variables. Variables which you will use. default is [].
    input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
    fs_name(str): fs name. default is "".
    fs_ugi(str): fs ugi. default is "".
    pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat"
    download_cmd(str): customized download command. default is "cat"
    data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed".
    queue_num(int): Dataset output queue num, training threads get data from queues. default is-1, which is set same as thread number in c++.

    merge_size(int): ins size to merge, if merge_size > 0, set merge by line id,
                     instances of same line id will be merged after shuffle,
                     you should parse line id in data generator. default is -1.
    parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False.
    parse_content(bool): Set if Dataset need to parse content. default is False.
    fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024
    fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0
    fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle.
                    default is False.
    candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=[])
        >>> dataset._init_distributed_settings(
        ...     parse_ins_id=True,
        ...     parse_content=True,
        ...     fea_eval=True,
        ...     candidate_size=10000)
        >>> dataset.update_settings(batch_size=2)

r   r   r   r   r   r   r   r   r,   r   r   r.   r/   r0   r1   r2   r3   r   TN)r`   r]   r^   r_   ra   rb   rc   r   r   r   r   r   r   r   )rX   r   keyr3   s       r(   update_settingsInMemoryDataset.update_settings  s   l Cn$&&v{3$$$V[1$  -	!!!&+.$$$V[1	!h&&8%%fk6(3CD&&&v{3$L")E)I))&+6&&&v{3'''4////<22226;?
"v{{!',<e!D"">487 r'   c                  > UR                  SS5      nUR                  SS5      nUR                  S/ 5      nUR                  SS5      nUR                  SS5      nUR                  S	S5      nUR                  S
S5      nUR                  SS5      n	U R                  (       a  Sn
OSn
U R                  U
5        [        TU ]  UUUUUUUU	S9  UR                  SS5      S:  a$  UR                  SS5      nU R                  U5        gg)aE
  
:api_attr: Static Graph

should be called only once in user's python scripts to initialize settings of dataset instance

Args:
    kwargs: Keyword arguments. Currently, we support following keys in **kwargs:

    batch_size(int): batch size. It will be effective during training. default is 1.
    thread_num(int): thread num, it is the num of readers. default is 1.
    use_var(list): list of variables. Variables which you will use. default is [].
    input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
    fs_name(str): fs name. default is "".
    fs_ugi(str): fs ugi. default is "".
    pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat"
    download_cmd(str): customized download command. default is "cat"
    data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed".
    queue_num(int): Dataset output queue num, training threads get data from queues. default is -1, which is set same as thread number in c++.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> import os
        >>> paddle.enable_static()

        >>> with open("test_queue_dataset_run_a.txt", "w") as f:
        ...     data = "2 1 2 2 5 4 2 2 7 2 1 3"
        ...     f.write(data)
        >>> with open("test_queue_dataset_run_b.txt", "w") as f:
        ...     data = "2 1 2 2 5 4 2 2 7 2 1 3"
        ...     f.write(data)
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> dataset.set_filelist(
        ...     ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
        >>> dataset.load_into_memory()

        >>> place = paddle.CPUPlace()
        >>> exe = paddle.static.Executor(place)
        >>> startup_program = paddle.static.Program()
        >>> main_program = paddle.static.Program()
        >>> exe.run(startup_program)

        >>> exe.train_from_dataset(main_program, dataset)

        >>> os.remove("./test_queue_dataset_run_a.txt")
        >>> os.remove("./test_queue_dataset_run_b.txt")

r   r   r   r   r   r   r   r[   r   r   rS   r   SlotRecordInMemoryDataFeedr   )r   r   r   r   r   r   r   r   r8   r   N)r   rO   _set_feed_typer   rd   _set_queue_num)rX   r   r   r   r   r   r   r   r   r   r7   r8   r   s               r(   rd   InMemoryDataset.initi  s   ~ ZZa0
ZZa0
**Y+ZZa0
**Y+Hb)zz.%8zz.%8??9N8NN+!!%!% 	 		
 ::k2&*

;3I	* +r'   c                    XR                   l        U R                   R                  S:X  a  [        R                  " S5      U l        gg)z
Set data_feed_desc
r   SlotRecordDatasetN)rH   r   r   rV   rJ   )rX   r7   s     r(   r   InMemoryDataset._set_feed_type  s8      .??#??<<(;<DL @r'   c                .   U R                   S::  a  SU l         U R                  R                  U R                   5        U R                  c  U R                   U l        U R                  R	                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R!                  5       5        U R                  R#                  5         U R                  R%                  5         g)r   r   r   N)r   rJ   rk   r8   set_queue_numset_parse_ins_idr.   set_parse_contentr/   set_parse_logkeyrA   set_merge_by_sidrB   set_enable_pv_mergerC   r   r   create_channelr   rW   s    r(   r   InMemoryDataset._prepare_to_run  s   
 ??aDO##DOO4>>!!__DN""4>>2%%d&7&78&&t'9'9:%%d&7&78%%d&7&78(()=)=>''

5##%##%r'   c                    U R                   (       dJ  U R                  (       a  U R                  R                  US5        OU R                  R                  US5        U R                  R	                  U5        g NTF)r   rO   rJ   dynamic_adjust_channel_numdynamic_adjust_readers_numrl   s     r(   r   ,InMemoryDataset._dynamic_adjust_before_train  sL    ))77
DI77
EJ//
;r'   c                ,   U R                   (       d^  U R                  (       a'  U R                  R                  U R                  S5        O&U R                  R                  U R                  S5        U R                  R                  U R                  5        g r  )r   rO   rJ   r  r   r  rW   s    r(   r   +InMemoryDataset._dynamic_adjust_after_train  sX    ))77N77O//@r'   c                    SU l         Xl        g)a@  
Set Dataset output queue num, training threads get data from queues

Args:
    queue_num(int): dataset output queue num

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_queue_num(12)

TN)r   r8   )rX   r8   s     r(   r   InMemoryDataset._set_queue_num  s      &*""r'   c                    Xl         g)a'  
Set if Dataset need to parse ins id

Args:
    parse_ins_id(bool): if parse ins_id or not

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_parse_ins_id(True)

N)r.   )rX   r.   s     r(   r   !InMemoryDataset._set_parse_ins_id  s
      )r'   c                    Xl         g)a+  
Set if Dataset need to parse content

Args:
    parse_content(bool): if parse content or not

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_parse_content(True)

N)r/   )rX   r/   s     r(   r   "InMemoryDataset._set_parse_content  s
      +r'   c                    Xl         g)a=  
Set fleet send batch size, default is 1024

Args:
    fleet_send_batch_size(int): fleet send batch size

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_fleet_send_batch_size(800)

N)r0   )rX   r0   s     r(   r   *InMemoryDataset._set_fleet_send_batch_size)  s
      &;"r'   c                    Xl         g)a>  
Set fleet send sleep time, default is 0

Args:
    fleet_send_sleep_seconds(int): fleet send sleep time

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_fleet_send_sleep_seconds(2)

N)r1   )rX   r1   s     r(   r   -InMemoryDataset._set_fleet_send_sleep_seconds;  s      )A%r'   c                V    U R                   R                  U5        SU l        SU l        g)a  
Set merge by line id, instances of same line id will be merged after
shuffle, you should parse line id in data generator.

Args:
    merge_size(int): ins size to merge. default is 2.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_merge_by_lineid()

TN)rJ   set_merge_by_lineidr   r.   )rX   r,   s     r(   r   $InMemoryDataset._set_merge_by_lineidM  s&    " 	((4# r'   c                :    U R                   R                  U5        g)a>  
Set if Dataset need to shuffle by uid.

Args:
    set_shuffle_by_uid(bool): if shuffle according to uid or not

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_shuffle_by_uid(True)

N)rJ   set_shuffle_by_uid)rX   enable_shuffle_uids     r(   _set_shuffle_by_uid#InMemoryDataset._set_shuffle_by_uidb  s      	''(:;r'   c                R    U R                   R                  U5        Xl        X l        g rr   )rJ   set_generate_unique_feasignsgen_uni_feasignslocal_shard_num)rX   generate_uni_feasigns	shard_nums      r(   _set_generate_unique_feasigns-InMemoryDataset._set_generate_unique_feasignst  s!    112GH 5(r'   c                >    U R                   R                  XX4U5        g rr   )rJ   generate_local_tables_unlock)rX   table_idfea_dimread_thread_numconsume_thread_numr"  s         r(   _generate_local_tables_unlock-InMemoryDataset._generate_local_tables_unlocky  s     	11I	
r'   c                    [        USS 5      n[        USS 5      n[        USS 5      nU R                  (       a8  [        R                  " 5       (       a  U R                  R                  X#U5        ggg)aN  
:api_attr: Static Graph

Set training date for pull sparse parameters, saving and loading model. Only used in psgpu

Args:
    date(str): training date(format : YYMMDD). eg.20211111

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> dataset.set_date("20211111")

N      )rK   rO   r   r   rP   set_daterX   dateyearmonthdays        r(   r0  InMemoryDataset.set_date  sb    > 48}D1I$qr(m??t==??JJS1  @?r'   c           	     F    U R                   R                  UUUUUUU5        g rr   )rJ   
tdm_sample)rX   	tree_name	tree_pathtdm_layer_countsstart_sample_layerwith_hierarchyseedid_slots           r(   r8  InMemoryDataset.tdm_sample  s+     		
r'   c                2   U R                  5         U R                  (       d  U R                  R                  5         g[        R
                  " 5       (       aA  U R                  R                  U R                  5        U R                  R                  U5        gg)a  
:api_attr: Static Graph

Load data into memory

Args:
    is_shuffle(bool): whether to use local shuffle, default is False

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()

N)r   rO   rJ   load_into_memoryr   r   rP   set_dataset)rX   
is_shuffles     r(   rB   InMemoryDataset.load_into_memory  sd    D 	LL))+++--JJ""4<<0JJ''
3 .r'   c                    U R                  5         Uc  U R                  nU R                  R                  U5        U R                  R	                  5         U R                  R                  5         g)a  
:api_attr: Static Graph

Load data into memory in async mode

Args:
    thread_num(int): preload thread num

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
        >>> dataset.wait_preload_done()

N)r   r   rJ   set_preload_thread_numcreate_preload_readerspreload_into_memoryrl   s     r(   rI  #InMemoryDataset.preload_into_memory  sR    F 	J++J7++-((*r'   c                l    U R                   R                  5         U R                   R                  5         g)a~  
:api_attr: Static Graph

Wait preload_into_memory done

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
        >>> dataset.wait_preload_done()

N)rJ   wait_preload_donedestroy_preload_readersrW   s    r(   rL  !InMemoryDataset.wait_preload_done  s%    @ 	&&(,,.r'   c                8    U R                   R                  5         g)ag  
:api_attr: Static Graph

Local shuffle

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.local_shuffle()

N)rJ   local_shufflerW   s    r(   rP  InMemoryDataset.local_shuffle/  s    @ 	""$r'   c                   SnUb*  UR                   R                  5         UR                  5       nU R                  c  SU l        U R                  c  SU l        U R
                  R                  5         U R
                  R                  U5        U R
                  R                  U R                  5        U R
                  R                  U R                  5        Ub  UR                   R                  5         U R
                  R                  U5        Ub  UR                   R                  5         U R                  (       a  U R
                  R                  5         Ub  UR                   R                  5         gg)a  
:api_attr: Static Graph

Global shuffle.
Global shuffle can be used only in distributed mode. i.e. multiple
processes on single machine or multiple machines training together.
If you run in distributed mode, you should pass fleet instead of None.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.global_shuffle()

Args:
    fleet(Fleet): fleet singleton. Default None.
    thread_num(int): shuffle thread num. Default is 12.

r   N   r   )_role_makerbarrier_worker
worker_numr0   r1   rJ   "register_client2client_msg_handlerset_trainer_numset_fleet_send_batch_sizeset_fleet_send_sleep_secondsglobal_shuffler   )rX   fleetr   trainer_nums       r(   r[  InMemoryDataset.global_shuffleQ  s"   R ,,.**,K%%-)-D&((0,-D)779$$[1..t/I/IJ11$2O2OP,,.##J/,,.LL((*,,. r'   c                8    U R                   R                  5         g)a  
:api_attr: Static Graph

Release InMemoryDataset memory data, when data will not be used again.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.global_shuffle()
        >>> exe = paddle.static.Executor(paddle.CPUPlace())
        >>> startup_program = paddle.static.Program()
        >>> main_program = paddle.static.Program()
        >>> exe.run(startup_program)
        >>> exe.train_from_dataset(main_program, dataset)
        >>> dataset.release_memory()

N)rJ   release_memoryrW   s    r(   r`  InMemoryDataset.release_memory  s    L 	##%r'   c                    SSK nU R                  R                  5       nUR                  U/5      nUb%  US-  nUR                  R                  X45        US   $ US   $ )aw  
:api_attr: Static Graph

Get memory data size, user can call this function to know the num
of ins in all workers after load into memory.

Note:
    This function may cause bad performance, because it has barrier

Args:
    fleet(Fleet|None): Fleet Object.

Returns:
    The size of memory data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)

        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)

        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> print(dataset.get_memory_data_size())

r   N)numpyrJ   get_memory_data_sizearrayrT  all_reduce_workerrX   r\  nplocal_data_sizeglobal_data_sizes        r(   rd  $InMemoryDataset.get_memory_data_size  sk    X 	,,;;=((O#45.2// $A&&q!!r'   c                    SSK nU R                  R                  5       nUR                  U/5      nUb%  US-  nUR                  R                  X45        US   $ US   $ )a  
:api_attr: Static Graph

Get shuffle data size, user can call this function to know the num
of ins in all workers after local/global shuffle.

Note:
    This function may cause bad performance to local shuffle,
    because it has barrier. It does not affect global shuffle.

Args:
    fleet(Fleet|None): Fleet Object.

Returns:
    The size of shuffle data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)

        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)

        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.global_shuffle()
        >>> print(dataset.get_shuffle_data_size())

r   N)rc  rJ   get_shuffle_data_sizere  rT  rf  rg  s        r(   rm  %InMemoryDataset.get_shuffle_data_size  sk    ^ 	,,<<>((O#45.2// $A&&q!!r'   c                T    U(       a  U R                   R                  X!5        X l        g)a>  
set fea eval mode for slots shuffle to debug the importance level of
slots(features), fea_eval need to be set True for slots shuffle.

Args:
    record_candidate_size(int): size of instances candidate to shuffle
                                one slot
    fea_eval(bool): whether enable fea eval mode to enable slots shuffle.
                    default is True.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> paddle.enable_static()
        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._set_fea_eval(1000000, True)

N)rJ   set_fea_evalr2   )rX   record_candidate_sizer2   s      r(   r   InMemoryDataset._set_fea_eval+  s    ( LL%%hF r'   c                t    U R                   (       a'  [        U5      nU R                  R                  U5        gg)a  
Slots Shuffle
Slots Shuffle is a shuffle method in slots level, which is usually used
in sparse feature with large scale of instances. To compare the metric, i.e.
auc while doing slots shuffle on one or several slots with baseline to
evaluate the importance level of slots(features).

Args:
    slots(list[string]): the set of slots(string) to do slots shuffle.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> paddle.enable_static()

        >>> dataset = paddle.distributed.InMemoryDataset()
        >>> dataset._init_distributed_settings(fea_eval=True)
        >>> slots = ["slot1", "slot2", "slot3", "slot4"]
        >>> slots_vars = []
        >>> for slot in slots:
        ...     var = paddle.static.data(
        ...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
        ...     slots_vars.append(var)
        >>> dataset.init(
        ...     batch_size=1,
        ...     thread_num=2,
        ...     input_type=1,
        ...     pipe_command="cat",
        ...     use_var=slots_vars)
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.slots_shuffle(['slot1'])
N)r2   setrJ   slots_shufflerX   r}   	slots_sets      r(   ru  InMemoryDataset.slots_shuffleC  s,    J ==E
ILL&&y1 r'   )rJ   rC   r2   r0   r1   r  r   r   r   rB   r/   r.   rA   r8   r   r   )r   z+Unpack[_InMemoryDatasetDistributedSettings]r   r   )r   z$Unpack[_InMemoryDatasetFullSettings]r   r   )r   z Unpack[_InMemoryDatasetSettings]r   r   )rS  )r   )   r2  r   r   r   )r9  r   r:  r   r;  z	list[int]r<  rK   r=  rN   r>  rK   r?  rK   r   r   )F)rD  rN   r   r   rr   )r   r   r   r   )N   )r\  Fleet | Noner   rK   r   r   )r\  r|  r   rK   )Tr}   rL   r   r   )%r!   r"   r#   r$   r   r%   rY   r   r   rd   r   r   r   r   r   r   r   r   r   r   r  r#  r+  r0  r8  rB  rI  rL  rP  r[  r`  rd  rm  r   ru  r&   __classcell__r   s   @r(   r   r     s    **%%((OOLKNO!!N->5C>5	>5@Q9<Q9	Q9f[+z=&(<A#&)$+$;$A$!*<$)

#2J

 
 $	

  
 
 
 
 

('4R(+T!/F %F =?=/!=/69=/	=/~&&P6"p9"v!0'2 '2r'   r   c                  F   ^  \ rS rSrSrSU 4S jjrSU 4S jjrS rSrU =r	$ )	QueueDatasetim  z
:api_attr: Static Graph

QueueDataset, it will process data streamly.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.QueueDataset()

c                D   > [         TU ]  5         SU R                  l        g)z
Initialize QueueDataset
MultiSlotDataFeedNr   rY   rH   r   r   s    r(   rY   QueueDataset.__init__{  s     	2r'   c                &   > [         TU ]  " S0 UD6  g)zz
:api_attr: Static Graph

should be called only once in user's python scripts to initialize settings of dataset instance

Nr   r   rd   rX   r   r   s     r(   rd   QueueDataset.init  s     	vr'   c                   U R                   [        U R                  5      :  a  [        U R                  5      U l         U R                   S:X  a  SU l         U R                  R	                  U R                   5        U R                  R                  U R                  5        U R                  R                  U R                  5       5        U R                  R                  5         g)zX
Set data_feed_desc/thread num/filelist before run,
user no need to call this function.
r   r   N)	r   r   rM   rJ   rk   ro   r   r   r   rW   s    r(   r   QueueDataset._prepare_to_run  s    
 ??S//!$--0DO??aDO##DOO4!!$--0''

5##%r'   )r   r   r   zUnpack[_DatasetBaseSettings]r   r   )
r!   r"   r#   r$   r   rY   rd   r   r&   r~  r  s   @r(   r  r  m  s    3& &r'   r  c                  @   ^  \ rS rSrSrSU 4S jjrSU 4S jjrSrU =r$ )FileInstantDataseti  z
FileInstantDataset, it will process data streamly.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.FileInstantDataset()
c                D   > [         TU ]  5         SU R                  l        g)z
Initialize FileInstantDataset
MultiSlotFileInstantDataFeedNr  r   s    r(   rY   FileInstantDataset.__init__  s     	=r'   c                &   > [         TU ]  " S0 UD6  g)`
should be called only once in user's python scripts to initialize settings of dataset instance
Nr   r  r  s     r(   rd   FileInstantDataset.init  s     	vr'   r   r   r  )	r!   r"   r#   r$   r   rY   rd   r&   r~  r  s   @r(   r  r    s    > r'   r  c                     ^  \ rS rSr% SrS\S'   SU 4S jjrSU 4S jjrS rS r	S	 r
S
 rS rSS jrSS jrSS jrSS jrSS jrSS jrS rS rSS jrSS jrS S jrSS jrSS jrSrU =r$ )!BoxPSDataseti  z
BoxPSDataset: derived from InMemoryDataset.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
z
core.BoxPSboxpsc                   > [         TU ]  5         [        R                  " U R                  5      U l        SU R                  l        g)z
Initialize BoxPSDataset
PaddleBoxDataFeedN)r   rY   r   BoxPSrJ   r  rH   r   r   s    r(   rY   BoxPSDataset.__init__  s0     	ZZ-
2r'   c                  > [         TU ]  " S
0 UD6  UR                  SS5      nU R                  U5        UR                  SS5      nU R	                  U5        UR                  SS5      nU R                  U5        UR                  SS5      nU R                  U5        UR                  SS5      nU R                  U5        g	)r  r?   r[   r@   r   rA   FrB   rC   Nr   )r   rd   r   _set_rank_offset_set_pv_batch_size_set_parse_logkey_set_merge_by_sid_set_enable_pv_merge)rX   r   r?   r@   rA   rB   rC   r   s          r(   rd   BoxPSDataset.init  s     	vjj3k*

?A6.zz.%8|,zz.%8|, **%6>!!/2r'   c                $    XR                   l        g)a  
Set rank_offset for merge_pv. It set the message of Pv.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset._set_rank_offset("rank_offset")

Args:
    rank_offset(str): rank_offset's name

N)rH   r?   )rX   r?   s     r(   r  BoxPSDataset._set_rank_offset  s     '2#r'   c                $    XR                   l        g)a  
Set pv batch size. It will be effective during enable_pv_merge

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset._set_pv_batch_size(128)
Args:
    pv_batch_size(int): pv batch size

N)rH   r@   )rX   r@   s     r(   r  BoxPSDataset._set_pv_batch_size  s     )6%r'   c                    Xl         g)a  
Set if Dataset need to parse logkey

Args:
    parse_content(bool): if parse logkey or not

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset._set_parse_logkey(True)

N)rA   )rX   rA   s     r(   r  BoxPSDataset._set_parse_logkey  
     )r'   c                    Xl         g)a   
Set if Dataset need to merge sid. If not, one ins means one Pv.

Args:
    merge_by_sid(bool): if merge sid or not

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset._set_merge_by_sid(True)

N)rB   )rX   rB   s     r(   r  BoxPSDataset._set_merge_by_sid  r  r'   c                    Xl         g)a  
Set if Dataset need to merge pv.

Args:
    enable_pv_merge(bool): if enable_pv_merge or not

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset._set_enable_pv_merge(True)

N)rC   )rX   rC   s     r(   r  !BoxPSDataset._set_enable_pv_merge  s
      /r'   c                    [        USS 5      n[        USS 5      n[        USS 5      nU R                  R                  X#U5        g)z
Workaround for date
Nr.  r/  )rK   r  r0  r1  s        r(   r0  BoxPSDataset.set_date-  sE     48}D1I$qr(m

D-r'   c                8    U R                   R                  5         g)z
Begin Pass
Notify BoxPS to load sparse parameters of next pass to GPU Memory

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset.begin_pass()
N)r  
begin_passrW   s    r(   r  BoxPSDataset.begin_pass6  s     	

r'   c                :    U R                   R                  U5        g)z
End Pass
Notify BoxPS that current pass ended

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset.end_pass(True)
N)r  end_pass)rX   need_save_deltas     r(   r  BoxPSDataset.end_passD  s     	

O,r'   c                8    U R                   R                  5         g)a  
Wait async preload done
Wait Until Feed Pass Done

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
        >>> dataset.wait_preload_done()
N)r  wait_feed_pass_donerW   s    r(   rL  BoxPSDataset.wait_preload_doneR  s      	

&&(r'   c                X    U R                  5         U R                  R                  5         g)as  
Load next pass into memory and notify boxps to fetch its emb from SSD

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
N)r   r  rB  rW   s    r(   rB  BoxPSDataset.load_into_memoryd  s      	

##%r'   c                X    U R                  5         U R                  R                  5         g)aq  
Begin async preload next pass while current pass may be training

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
N)r   r  rI  rW   s    r(   rI   BoxPSDataset.preload_into_memoryu  s      	

&&(r'   c                    U R                   (       d  U R                  R                  US5        U R                  R                  U5        g )NT)r   rJ   r  r  rl   s     r(   r   )BoxPSDataset._dynamic_adjust_before_train  s1    ))LL33JE//
;r'   c                    g rr   r   rW   s    r(   r   (BoxPSDataset._dynamic_adjust_after_train  r   r'   c                P    [        U5      nU R                  R                  U5        g)ae  
Slots Shuffle
Slots Shuffle is a shuffle method in slots level, which is usually used
in sparse feature with large scale of instances. To compare the metric, i.e.
auc while doing slots shuffle on one or several slots with baseline to
evaluate the importance level of slots(features).

Args:
    slots(list[string]): the set of slots(string) to do slots shuffle.

Examples:
    .. code-block:: python

        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> dataset._set_merge_by_lineid()
        >>> #suppose there is a slot 0
        >>> dataset.slots_shuffle(['0'])
N)rt  r  ru  rv  s      r(   ru  BoxPSDataset.slots_shuffle  s    ( J	

  +r'   c                :    U R                   R                  U5        g)a  
Set current phase in train. It is useful for untest.
current_phase : 1 for join, 0 for update.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.set_current_phase(1)

N)rJ   set_current_phase)rX   current_phases     r(   r  BoxPSDataset.set_current_phase  s    " 	&&}5r'   c                6    U R                   R                  5       $ )aJ  
Get memory data size of Pv, user can call this function to know the pv num
of ins in all workers after load into memory.

Note:
    This function may cause bad performance, because it has barrier

Returns:
    The size of memory pv data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> print(dataset.get_pv_data_size())

)rJ   get_pv_data_sizerW   s    r(   r  BoxPSDataset.get_pv_data_size  s    . ||,,..r'   c                8    U R                   R                  5         g)a  
Merge pv instance and convey it from input_channel to input_pv_channel.
It will be effective when enable_pv_merge_ is True.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.preprocess_instance()

N)rJ   preprocess_instancerW   s    r(   r   BoxPSDataset.preprocess_instance  s    " 	((*r'   c                8    U R                   R                  5         g)a  
Divide pv instance and convey it to input_channel.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('No files to read')
        >>> import paddle
        >>> dataset = paddle.distributed.fleet.BoxPSDataset()
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.preprocess_instance()
        >>> exe.train_from_dataset(dataset)
        >>> dataset.postprocess_instance()

N)rJ   postprocess_instancerW   s    r(   r  !BoxPSDataset.postprocess_instance  s    $ 	))+r'   )r  rC   rB   rA   r   )r   zUnpack[_BoxPSDatasetSettings]r   r   rz  )r  rN   r   r   r}  )r  r   r   r   )r   rK   )r!   r"   r#   r$   r   r%   rY   rd   r  r  r  r  r  r0  r  r  rL  rB  rI  r   r   ru  r  r  r  r  r&   r~  r  s   @r(   r  r    sy     33"2"6 )")"/". -)$&")"<
,.6&/2+&, ,r'   r  )#r   
__future__r   typingr   r   r   google.protobufr   r   paddle.baser   paddle.base.protor	   typing_extensionsr
   r   r   r   paddle.distributed.fleetr   r   r%   r   r   r*   r5   r:   r=   __all__rE   r   r  r  r  r   r'   r(   <module>r     s    I " 4 4 '   +@@.#DMJ	)&t}M9,'y ')i )$#7 $+-E
+ 4 + M M`
P2k P2f*&; *&Z 2C,? C,r'   