
    ͑io                        S r SSKJr  SSKJr  SSKrSSKJr  SSKJ	r	  SS	K
Jr  / r " S
 S5      r " S S5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      rg)zBThis is definition of dataset class, which is high performance IO.    )annotations)text_formatN)data_feed_pb2   )
deprecated   )corec                  ,    \ rS rSrSrS rSSS jjrSrg)	DatasetFactory   aF  
DatasetFactory is a factory which create dataset by its name,
you can create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
the default is "QueueDataset".

Example:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
c                    g)Init.N selfs    S/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/base/dataset.py__init__DatasetFactory.__init__*   s        c                T     [        5       U   " 5       nU$ !   [        SU S35      e= f)ax  
Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
the default is "QueueDataset".

Args:
    datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset.
                         Default is QueueDataset.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
zdatafeed class z does not exist)globals
ValueError)r   datafeed_classdatasets      r   create_datasetDatasetFactory.create_dataset.   s9    	Pi/1GN	P~.>oNOOs    'r   N)QueueDataset)returnDatasetBase)__name__
__module____qualname____firstlineno____doc__r   r   __static_attributes__r   r   r   r   r      s    
P Pr   r   c                      \ rS rSrSrS rS rS rS rSS jr	S r
S	 rS
 rS rS rS rS rS rS rS rS rS rS rS rS rSrg)r   D   zBase dataset class.c                    [         R                  " 5       U l        SU R                  l        [        R
                  " S5      U l        SU l        / U l        SU l	        SU l
        g)r   catMultiSlotDatasetr   FN)r   DataFeedDesc
proto_descpipe_commandr	   Datasetr   
thread_numfilelist
use_ps_gpupsgpur   s    r   r   DatasetBase.__init__G   sM     (446',$||$67
r   c                $    XR                   l        g)aX  
Set pipe command of current dataset
A pipe command is a UNIX pipeline command that can be used only

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_pipe_command("python my_script.py")

Args:
    pipe_command(str): pipe command

N)r,   r-   )r   r-   s     r   set_pipe_commandDatasetBase.set_pipe_commandS   s      (4$r   c                $    XR                   l        g)a  
Set so parser name of current dataset

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_so_parser_name("./abc.so")

Args:
    pipe_command(str): pipe command

N)r,   so_parser_name)r   r8   s     r   set_so_parser_nameDatasetBase.set_so_parser_namee   s     *8&r   c                $    XR                   l        g)a(  
Set rank_offset for merge_pv. It set the message of Pv.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_rank_offset("rank_offset")

Args:
    rank_offset(str): rank_offset's name

N)r,   rank_offset)r   r<   s     r   set_rank_offsetDatasetBase.set_rank_offsetv   s     '2#r   c                T    U(       a  U R                   R                  X!5        X l        g)a:  
set fea eval mode for slots shuffle to debug the importance level of
slots(features), fea_eval need to be set True for slots shuffle.

Args:
    record_candidate_size(int): size of instances candidate to shuffle
                                one slot
    fea_eval(bool): whether enable fea eval mode to enable slots shuffle.
                    default is True.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_fea_eval(1000000, True)

N)r   set_fea_evalfea_eval)r   record_candidate_sizerA   s      r   r@   DatasetBase.set_fea_eval   s    & LL%%hF r   c                t    U R                   (       a'  [        U5      nU R                  R                  U5        gg)a=  
Slots Shuffle
Slots Shuffle is a shuffle method in slots level, which is usually used
in sparse feature with large scale of instances. To compare the metric, i.e.
auc while doing slots shuffle on one or several slots with baseline to
evaluate the importance level of slots(features).

Args:
    slots(list[string]): the set of slots(string) to do slots shuffle.

Examples:
    import paddle.base as base
    dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
    dataset.set_merge_by_lineid()
    #suppose there is a slot 0
    dataset.slots_shuffle(['0'])
N)rA   setr   slots_shuffler   slots	slots_sets      r   rF   DatasetBase.slots_shuffle   s+    $ ==E
ILL&&y1 r   c                $    XR                   l        g)a  
Set batch size. Will be effective during training

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_batch_size(128)

Args:
    batch_size(int): batch size

N)r,   
batch_size)r   rL   s     r   set_batch_sizeDatasetBase.set_batch_size   s     &0"r   c                $    XR                   l        g)a#  
Set pv batch size. It will be effective during enable_pv_merge

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_pv_batch_size(128)
Args:
    pv_batch_size(int): pv batch size

N)r,   pv_batch_size)r   rP   s     r   set_pv_batch_sizeDatasetBase.set_pv_batch_size   s     )6%r   c                F    U R                   R                  U5        Xl        g)a   
Set thread num, it is the num of readers.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_thread(12)

Args:
    thread_num(int): thread num
N)r   set_thread_numr/   r   r/   s     r   
set_threadDatasetBase.set_thread   s     	##J/$r   c                F    U R                   R                  U5        Xl        g)a  
Set file list in current worker.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_filelist(['a.txt', 'b.txt'])

Args:
    filelist(list): file list
N)r   set_filelistr0   )r   r0   s     r   rY   DatasetBase.set_filelist   s     	!!(+ r   c                $    XR                   l        g N)r,   
input_type)r   r]   s     r   set_input_typeDatasetBase.set_input_type   s    %/"r   c                d   U R                   R                  nU GH  nUR                  R                  5       nSUl        UR
                  Ul        [        R                  R                  5       (       d<  UR                  S:X  a,  SUl
        UR                  R                  UR                  5        UR                  [        R                  :X  a	  SUl        M  UR                  [        R                   :X  a	  SUl        M  UR                  [        R"                  :X  a
  SUl        GM  [%        S5      e   g)a  
Set Variables which you will use.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> paddle.enable_static()
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> data = paddle.static.data(name="data", shape=[None, 10, 10], dtype="int64")
        >>> label = paddle.static.data(name="label", shape=[None, 1], dtype="int64", lod_level=1)
        >>> dataset.set_use_var([data, label])

Args:
    var_list(list): variable list
Tr   floatuint64uint32zPCurrently, base.dataset only supports dtype=float32, dtype=int32 and dtype=int64N)r,   multi_slot_descrH   addis_usednamepaddle	frameworkin_pir_mode	lod_levelis_denseshapeextenddtypefloat32typeint64int32r   )r   var_list
multi_slotvarslot_vars        r   set_use_varDatasetBase.set_use_var   s    " __44
C!''++-H#HHHHM##//11==A%(,H%NN))#))4yyFNN* 'fll* (fll* ( f  r   c                :    U R                   R                  X5        g)a$  
Set hdfs config: fs name ad ugi

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")

Args:
    fs_name(str): fs name
    fs_ugi(str): fs ugi
N)r   set_hdfs_config)r   fs_namefs_ugis      r   r{   DatasetBase.set_hdfs_config  s     	$$W5r   c                :    U R                   R                  U5        g)a(  
Set customized download cmd: download_cmd

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> dataset.set_download_cmd("./read_from_afs")

Args:
    download_cmd(str): customized download command
N)r   set_download_cmd)r   download_cmds     r   r   DatasetBase.set_download_cmd0  s     	%%l3r   c                N   U R                   [        U R                  5      :  a  [        U R                  5      U l         U R                  R	                  U R                   5        U R                  R                  U R                  5       5        U R                  R                  5         g)P
Set data_feed_desc before load or shuffle,
user no need to call this function.
N)r/   lenr0   r   rT   set_data_feed_descdesccreate_readersr   s    r   _prepare_to_runDatasetBase._prepare_to_run@  sg    
 ??S//!$--0DO##DOO4''		4##%r   c                    SU l         [        R                  " 5       (       d  SU l         gU R                   (       a  Xl        gg)z1
set use_ps_gpu flag

Args:
    use_ps_gpu: bool
TFN)r1   r	   _is_compiled_with_heterpsr2   )r   r2   s     r   _set_use_ps_gpuDatasetBase._set_use_ps_gpuK  s2     --//#DO__J r   c                8    U R                   R                  5         g r\   )r   destroy_readersr   s    r   _finish_to_runDatasetBase._finish_to_runY  s    $$&r   c                B    [         R                  " U R                  5      $ )z
Returns a protobuf message for this DataFeedDesc

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset()
        >>> print(dataset.desc())

Returns:
    A string message
)r   MessageToStringr,   r   s    r   r   DatasetBase.desc\  s     **4??;;r   c                    g r\   r   rU   s     r   _dynamic_adjust_before_train(DatasetBase._dynamic_adjust_before_trainl      r   c                    g r\   r   r   s    r   _dynamic_adjust_after_train'DatasetBase._dynamic_adjust_after_traino  r   r   )r   rA   r0   r,   r2   r/   r1   N)T)r    r!   r"   r#   r$   r   r5   r9   r=   r@   rF   rM   rQ   rV   rY   r^   rx   r{   r   r   r   r   r   r   r   r%   r   r   r   r   r   D   sj    
4$8"2"!.2,0"6 %"!"0#J6"4 	&'< r   r   c                    ^  \ rS rSrSr\" SSS9U 4S j5       r\" SSS9S 5       r\" SS	S9S
 5       r\" SSS9S 5       r	\" SSS9S 5       r
\" SSS9S 5       r\" SSS9S 5       r\" SSS9S 5       rS rS r\" SSS9S 5       rS rS rS rS r\" SSS9SBS j5       r\" SSS9SCS  j5       r\" SS!S9SDS" j5       r\" SS#S9S$ 5       r\" SS%S9S& 5       rS' r\" SS(S9SES) j5       r\" SS*S9SFS+ j5       r\" SS,S9S- 5       r\" SS.S9S/ 5       r\" SS0S9SGS1 j5       r\" SS2S9S3 5       r S4 r!S5 r"S6 r#\" SS7S9SFS8 j5       r$\" SS9S9SFS: j5       r%SES; jr&S< r'S= r(S> r)SHS? jr*S@ r+SAr,U =r-$ )IInMemoryDatasetis  z
InMemoryDataset, it will load data into memory
and shuffle data before training.
This class should be created by DatasetFactory

Example:
    dataset = paddle.base.DatasetFactory().create_dataset("InMemoryDataset")
2.0.0z"paddle.distributed.InMemoryDatasetsince	update_toc                   > [         TU ]  5         SU R                  l        SU l        SU l        SU l        SU l        SU l        SU l	        SU l
        SU l        SU l        SU l        SU l        SU l        g)r   MultiSlotInMemoryDataFeedNFTr   )superr   r,   rg   fleet_send_batch_sizeis_user_set_queue_num	queue_numparse_ins_idparse_contentparse_logkeymerge_by_sidenable_pv_mergemerge_by_lineidfleet_send_sleep_secondstrainer_numpass_idr   	__class__s    r   r   InMemoryDataset.__init__}  sz     	:%)"%*"!"! $$(,%r   z1paddle.distributed.InMemoryDataset._set_feed_typec                    XR                   l        U R                   R                  S:X  a  [        R                  " S5      U l        gg)z
Set data_feed_desc
SlotRecordInMemoryDataFeedSlotRecordDatasetN)r,   rg   r	   r.   r   )r   data_feed_types     r   set_feed_typeInMemoryDataset.set_feed_type  s8      .??#??<<(;<DL @r   z2paddle.distributed.InMemoryDataset._prepare_to_runc                .   U R                   S::  a  SU l         U R                  R                  U R                   5        U R                  c  U R                   U l        U R                  R	                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R!                  5       5        U R                  R#                  5         U R                  R%                  5         g)r   r   r   N)r/   r   rT   r   set_queue_numset_parse_ins_idr   set_parse_contentr   set_parse_logkeyr   set_merge_by_sidr   set_enable_pv_merger   r   r   create_channelr   r   s    r   r   InMemoryDataset._prepare_to_run  s    ??aDO##DOO4>>!!__DN""4>>2%%d&7&78&&t'9'9:%%d&7&78%%d&7&78(()=)=>''		4##%##%r   z?paddle.distributed.InMemoryDataset._dynamic_adjust_before_trainc                    U R                   (       dJ  U R                  (       a  U R                  R                  US5        OU R                  R                  US5        U R                  R	                  U5        g NTF)r   r1   r   dynamic_adjust_channel_numdynamic_adjust_readers_numrU   s     r   r   ,InMemoryDataset._dynamic_adjust_before_train  sN    
 ))77
DI77
EJ//
;r   z>paddle.distributed.InMemoryDataset._dynamic_adjust_after_trainc                ,   U R                   (       d^  U R                  (       a'  U R                  R                  U R                  S5        O&U R                  R                  U R                  S5        U R                  R                  U R                  5        g r   )r   r1   r   r   r/   r   r   s    r   r   +InMemoryDataset._dynamic_adjust_after_train  sZ    
 ))77N77O//@r   z1paddle.distributed.InMemoryDataset._set_queue_numc                    SU l         Xl        g)a<  
Set Dataset output queue num, training threads get data from queues

Args:
    queue_num(int): dataset output queue num

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_queue_num(12)

TN)r   r   )r   r   s     r   r   InMemoryDataset.set_queue_num  s    & &*""r   z4paddle.distributed.InMemoryDataset._set_parse_ins_idc                    Xl         g)a#  
Set id Dataset need to parse ins_id

Args:
    parse_ins_id(bool): if parse ins_id or not

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_parse_ins_id(True)

N)r   )r   r   s     r   r    InMemoryDataset.set_parse_ins_id  
    & )r   z5paddle.distributed.InMemoryDataset._set_parse_contentc                    Xl         g)a'  
Set if Dataset need to parse content

Args:
    parse_content(bool): if parse content or not

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_parse_content(True)

N)r   )r   r   s     r   r   !InMemoryDataset.set_parse_content  s
    & +r   c                    Xl         g)a$  
Set if Dataset need to parse logkey

Args:
    parse_content(bool): if parse logkey or not

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_parse_logkey(True)

N)r   )r   r   s     r   r    InMemoryDataset.set_parse_logkey  s
     )r   c                    Xl         g)z
Set trainer num

Args:
    trainer_num(int): trainer num

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset._set_trainer_num(1)

N)r   )r   r   s     r   _set_trainer_num InMemoryDataset._set_trainer_num  s
     'r   z4paddle.distributed.InMemoryDataset._set_merge_by_sidc                    Xl         g)a<  
Set if Dataset need to merge sid. If not, one ins means one Pv.

Args:
    merge_by_sid(bool): if merge sid or not

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_merge_by_sid(True)

N)r   )r   r   s     r   r    InMemoryDataset.set_merge_by_sid-  r   r   c                    Xl         g)a)  
Set if Dataset need to merge pv.

Args:
    enable_pv_merge(bool): if enable_pv_merge or not

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_enable_pv_merge(True)

N)r   )r   r   s     r   r   #InMemoryDataset.set_enable_pv_mergeB  s
      /r   c                8    U R                   R                  5         g)a  
Merge pv instance and convey it from input_channel to input_pv_channel.
It will be effective when enable_pv_merge_ is True.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.preprocess_instance()

N)r   preprocess_instancer   s    r   r   #InMemoryDataset.preprocess_instanceS  s    " 	((*r   c                :    U R                   R                  U5        g)a  
Set current phase in train. It is useful for unittest.
current_phase : 1 for join, 0 for update.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.set_current_phase(1)

N)r   set_current_phase)r   current_phases     r   r   !InMemoryDataset.set_current_phasef  s    " 	&&}5r   c                8    U R                   R                  5         g)a	  
Divide pv instance and convey it to input_channel.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.preprocess_instance()
        >>> exe.train_from_dataset(dataset)
        >>> dataset.postprocess_instance()

N)r   postprocess_instancer   s    r   r   $InMemoryDataset.postprocess_instancey  s    $ 	))+r   z=paddle.distributed.InMemoryDataset._set_fleet_send_batch_sizec                    Xl         g)a9  
Set fleet send batch size, default is 1024

Args:
    fleet_send_batch_size(int): fleet send batch size

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_fleet_send_batch_size(800)

N)r   )r   r   s     r   set_fleet_send_batch_size)InMemoryDataset.set_fleet_send_batch_size  s
    & &;"r   z@paddle.distributed.InMemoryDataset._set_fleet_send_sleep_secondsc                    Xl         g)a:  
Set fleet send sleep time, default is 0

Args:
    fleet_send_sleep_seconds(int): fleet send sleep time

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_fleet_send_sleep_seconds(2)

N)r   )r   r   s     r   set_fleet_send_sleep_seconds,InMemoryDataset.set_fleet_send_sleep_seconds  s    & )A%r   z7paddle.distributed.InMemoryDataset._set_merge_by_lineidc                V    U R                   R                  U5        SU l        SU l        g)a  
Set merge by line id, instances of same line id will be merged after
shuffle, you should parse line id in data generator.

Args:
    merge_size(int): ins size to merge. default is 2.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_merge_by_lineid()

TN)r   set_merge_by_lineidr   r   )r   
merge_sizes     r   r   #InMemoryDataset.set_merge_by_lineid  s&    ( 	((4# r   z@paddle.distributed.InMemoryDataset._set_generate_unique_feasignsc                R    U R                   R                  U5        Xl        X l        g r\   )r   set_generate_unique_feasignsgen_uni_feasignslocal_shard_num)r   generate_uni_feasigns	shard_nums      r   r   ,InMemoryDataset.set_generate_unique_feasigns  s#    
 	112GH 5(r   z@paddle.distributed.InMemoryDataset._generate_local_tables_unlockc                >    U R                   R                  XX4U5        g r\   )r   generate_local_tables_unlock)r   table_idfea_dimread_thread_numconsume_thread_numr   s         r   r   ,InMemoryDataset.generate_local_tables_unlock  s     	11I	
r   c                    [        USS 5      n[        USS 5      n[        USS 5      nU R                  (       a8  [        R                  " 5       (       a  U R                  R                  X#U5        ggg)a}  
:api_attr: Static Graph

Set training date for pull sparse parameters, saving and loading model. Only used in psgpu

Args:
    date(str): training date(format : YYMMDD). eg.20211111

Examples:
    .. code-block:: python

        >>> import paddle.base as base

        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_date("20211111")
N      )intr1   r	   r   r2   set_dater   dateyearmonthdays        r   r  InMemoryDataset.set_date  sb    " 48}D1I$qr(m??t==??JJS1  @?r   z3paddle.distributed.InMemoryDataset.load_into_memoryc                2   U R                  5         U R                  (       d  U R                  R                  5         g[        R
                  " 5       (       aA  U R                  R                  U R                  5        U R                  R                  U5        gg)a  
Load data into memory

 Args:
    is_shuffle(bool): whether to use local shuffle, default is False

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
N)r   r1   r   load_into_memoryr	   r   r2   set_dataset)r   
is_shuffles     r   r
   InMemoryDataset.load_into_memory  sc    * 	LL))+++--JJ""4<<0JJ''
3 .r   z6paddle.distributed.InMemoryDataset.preload_into_memoryc                    U R                  5         Uc  U R                  nU R                  R                  U5        U R                  R	                  5         U R                  R                  5         g)a  
Load data into memory in async mode

Args:
    thread_num(int): preload thread num

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
        >>> dataset.wait_preload_done()
N)r   r/   r   set_preload_thread_numcreate_preload_readerspreload_into_memoryrU   s     r   r  #InMemoryDataset.preload_into_memory  sQ    , 	J++J7++-((*r   z4paddle.distributed.InMemoryDataset.wait_preload_donec                l    U R                   R                  5         U R                   R                  5         g)a  
Wait preload_into_memory done

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
        >>> dataset.wait_preload_done()
N)r   wait_preload_donedestroy_preload_readersr   s    r   r  !InMemoryDataset.wait_preload_done3  s$    & 	&&(,,.r   z0paddle.distributed.InMemoryDataset.local_shufflec                8    U R                   R                  5         g)a  
Local shuffle

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.local_shuffle()
N)r   local_shuffler   s    r   r  InMemoryDataset.local_shuffleI  s    & 	""$r   z1paddle.distributed.InMemoryDataset.global_shufflec                L   Ubl  [        US5      (       a  [        S5        UR                  5         OUR                  R                  5         U R                  S:X  a  UR                  5       U l        U R                  c  SU l        U R                  c  SU l        U R                  R                  5         U R                  R                  U R                  5        U R                  R                  U R                  5        U R                  R                  U R                  5        Ub<  [        US5      (       a  UR                  5         OUR                  R                  5         U R                  R                  U5        Ub<  [        US5      (       a  UR                  5         OUR                  R                  5         U R                  (       a  U R                  R                  5         Ub=  [        US5      (       a  UR                  5         gUR                  R                  5         gg)a$  
Global shuffle.
Global shuffle can be used only in distributed mode. i.e. multiple
processes on single machine or multiple machines training together.
If you run in distributed mode, you should pass fleet instead of None.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> from paddle.incubate.distributed.fleet.parameter_server.pslib import fleet
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.global_shuffle(fleet)

Args:
    fleet(Fleet): fleet singleton. Default None.
    thread_num(int): shuffle thread num. Default is 12.

Nbarrier_workerzpscore fleetr      r   )hasattrprintr  _role_makerr   
worker_numr   r   r   "register_client2client_msg_handlerset_trainer_numr   r   global_shuffler   )r   fleetr/   s      r   r#  InMemoryDataset.global_shuffle^  s   8 u.//n%$$&!!0022%#(#3#3#5 %%-)-D&((0,-D)779$$T%5%56..t/I/IJ11$2O2OPu.//$$&!!002##J/u.//$$&!!002LL((*u.//$$&!!002	 r   z1paddle.distributed.InMemoryDataset.release_memoryc                8    U R                   R                  5         g)a	  
:api_attr: Static Graph

Release InMemoryDataset memory data, when data will not be used again.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> from paddle.incubate.distributed.fleet.parameter_server.pslib import fleet
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.global_shuffle(fleet)
        >>> exe = base.Executor(base.CPUPlace())
        >>> exe.run(base.default_startup_program())
        >>> exe.train_from_dataset(base.default_main_program(), dataset)
        >>> dataset.release_memory()

N)r   release_memoryr   s    r   r'  InMemoryDataset.release_memory  s    6 	##%r   c                6    U R                   R                  5       $ )aq  
Get memory data size of Pv, user can call this function to know the pv num
of ins in all workers after load into memory.

Note:
    This function may cause bad performance, because it has barrier

Returns:
    The size of memory pv data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> print(dataset.get_pv_data_size())

)r   get_pv_data_sizer   s    r   r*   InMemoryDataset.get_pv_data_size  s    . ||,,..r   c                6    U R                   R                  5       $ r\   )r   get_epoch_finishr   s    r   r-   InMemoryDataset.get_epoch_finish  s    ||,,..r   c                8    U R                   R                  5         g r\   )r   clear_sample_stater   s    r   r0  "InMemoryDataset.clear_sample_state  s    '')r   z7paddle.distributed.InMemoryDataset.get_memory_data_sizec                    SSK nU R                  R                  5       nUR                  U/5      nUb%  US-  nUR                  R                  X45        US   $ US   $ )a  
Get memory data size, user can call this function to know the num
of ins in all workers after load into memory.

Note:
    This function may cause bad performance, because it has barrier

Args:
    fleet(Fleet): Fleet Object.

Returns:
    The size of memory data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> from paddle.incubate.distributed.fleet.parameter_server.pslib import fleet
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> print(dataset.get_memory_data_size(fleet))

r   N)numpyr   get_memory_data_sizearrayr  all_reduce_workerr   r$  nplocal_data_sizeglobal_data_sizes        r   r4  $InMemoryDataset.get_memory_data_size  sj    > 	,,;;=((O#45.2// $A&&q!!r   z8paddle.distributed.InMemoryDataset.get_shuffle_data_sizec                6   SSK nU R                  R                  5       nUR                  U/5      n[	        SU5        UbV  US-  n[        US5      (       a   UR                  R                  U5      nUS   $ UR                  R                  X45        US   $ US   $ )aV  
Get shuffle data size, user can call this function to know the num
of ins in all workers after local/global shuffle.

Note:
    This function may cause bad performance to local shuffle,
    because it has barrier. It does not affect global shuffle.

Args:
    fleet(Fleet): Fleet Object.

Returns:
    The size of shuffle data.

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> from paddle.incubate.distributed.fleet.parameter_server.pslib import fleet
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
        >>> dataset.global_shuffle(fleet)
        >>> print(dataset.get_shuffle_data_size(fleet))

r   Nz global shuffle local_data_size: util)
r3  r   get_shuffle_data_sizer5  r  r  r=  
all_reducer  r6  r7  s        r   r>  %InMemoryDataset.get_shuffle_data_size  s    B 	,,<<>((O#450/B.2uf%%#(::#8#8#I 
 $A&& !!33# $A&&q!!r   c                :    U R                   R                  U5        g)z7
Set heter ps mode
user no need to call this function.
N)r   set_heter_ps)r   enable_heter_pss     r   _set_heter_psInMemoryDataset._set_heter_ps5  s    
 	!!/2r   c                H   UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  S	S
5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l	        UR                  SS5      U R                  R                  l
        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        UR                  SS5      U R                  R                  l        U R0                  R3                  S5        g )!aX  
Set graph config, user can set graph config in gpu graph mode.

Args:
    config(dict): config dict.

Returns:
    The size of shuffle data.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> from paddle.incubate.distributed.fleet.parameter_server.pslib import fleet
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> graph_config = {"walk_len": 24,
        ...     "walk_degree": 10,
        ...     "once_sample_startid_len": 80000,
        ...     "sample_times_one_chunk": 5,
        ...     "window": 3,
        ...     "debug_mode": 0,
        ...     "batch_size": 800,
        ...     "meta_path": "cuid2clk-clk2cuid;cuid2conv-conv2cuid;clk2cuid-cuid2clk;clk2cuid-cuid2conv",
        ...     "gpu_graph_training": 1}
        >>> dataset.set_graph_config(graph_config)

walk_degreer   walk_len   window   once_sample_startid_leni@  sample_times_one_chunk
   rL   
debug_moder   first_node_type 	meta_pathgpu_graph_trainingT	sage_modeFsamplestrain_table_capi 5 infer_table_capexcluded_train_pairinfer_node_type
get_degreeweighted_samplereturn_weight
pair_labelaccumulate_numN)getr,   graph_configrG  rH  rJ  rL  rM  rL   rO  rP  rR  rS  rT  rU  rV  rW  rX  rY  rZ  r[  r\  r]  r^  r   set_gpu_graph_mode)r   configs     r   set_graph_config InMemoryDataset.set_graph_config<  s   8 4:::mQ3O$$006

:r0J$$-.4jj1.E$$+?Ezz%t@
$$< ?Ejj$b?
$$; 39**\12M$$/28**\12M$$/7=zzr8
$$4 28K1L$$.:@** $;
$$7 28K1O$$./5zz)R/H$$,7=zzv8
$$4 8>zzv8
$$4 <B::!2<
$$8 8>zzr8
$$4 39**%3
$$/ 8>zzu8
$$4 6<ZZU6
$$2 39**\22N$$/6<jja7
$$3 	''-r   c                F    Xl         U R                  R                  U5        g)a0  
Set pass id, user can set pass id in gpu graph mode.

Args:
    pass_id: pass id.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> pass_id = 0
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> dataset.set_pass_id(pass_id)
N)r   r   set_pass_id)r   r   s     r   rf  InMemoryDataset.set_pass_id  s       )r   c                    U R                   $ )a  
Get pass id, user can set pass id in gpu graph mode.

Returns:
    The pass id.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("InMemoryDataset")
        >>> pass_id = dataset.get_pass_id()
)r   r   s    r   get_pass_idInMemoryDataset.get_pass_id  s     ||r   c                :    U R                   R                  X5        g)z
dump_walk_path
N)r   dump_walk_path)r   path	dump_rates      r   rl  InMemoryDataset.dump_walk_path  s     	##D4r   c                :    U R                   R                  U5        g)z
dump_sample_neighbors
N)r   dump_sample_neighbors)r   rm  s     r   rq  %InMemoryDataset.dump_sample_neighbors  s     	**40r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r/   r   )r  )r   )r   )Fr\   )N   )i  ).r    r!   r"   r#   r$   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r
  r  r  r  r#  r'  r*  r-  r0  r4  r>  rD  rc  rf  ri  rl  rq  r%   __classcell__r   s   @r   r   r   s  s    g)MN O" E=	= F&	&( S<	< RA	A E#	#$ H)	)" I+	+")"'" H)	)"/"+&6&,( Q;	;" TA	A" K!	!( T)	)
 T
	
2. G4	40 J+	+2 H/	/$ D%	%" E93	93v E&	&2/2/* K%"	%"N L+"	+"Z3I.V*$ 51 1r   r   c                  V   ^  \ rS rSrSrU 4S jr\" SSS9S 5       rS rSS	 jr	S
r
U =r$ )r   i  z
QueueDataset, it will process data streamly.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("QueueDataset")

c                D   > [         TU ]  5         SU R                  l        g)zH
Initialize QueueDataset
This class should be created by DatasetFactory
MultiSlotDataFeedNr   r   r,   rg   r   s    r   r   QueueDataset.__init__  s    
 	2r   r   z/paddle.distributed.QueueDataset._prepare_to_runr   c                   U R                   [        U R                  5      :  a  [        U R                  5      U l         U R                   S:X  a  SU l         U R                  R	                  U R                   5        U R                  R                  U R                  5        U R                  R                  U R                  5       5        U R                  R                  5         g)zX
Set data_feed_desc/thread num/filelist before run,
user no need to call this function.
r   r   N)	r/   r   r0   r   rT   rY   r   r   r   r   s    r   r   QueueDataset._prepare_to_run  s     ??S//!$--0DO??aDO##DOO4!!$--0''		4##%r   c                    [        S5      e)a  
Local shuffle data.

Local shuffle is not supported in QueueDataset
NotImplementedError will be raised

Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('NotImplementedError will be raised.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("QueueDataset")
        >>> dataset.local_shuffle()

Raises:
    NotImplementedError: QueueDataset does not support local shuffle

zYQueueDataset does not support local shuffle, please use InMemoryDataset for local_shuffleNotImplementedErrorr   s    r   r  QueueDataset.local_shuffle  s    & ";
 	
r   c                    [        S5      e)a  
Global shuffle data.

Global shuffle is not supported in QueueDataset
NotImplementedError will be raised

Args:
    fleet(Fleet): fleet singleton. Default None.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> from paddle.incubate.distributed.fleet.parameter_server.pslib import fleet
        >>> dataset = base.DatasetFactory().create_dataset("QueueDataset")
        >>> #dataset.global_shuffle(fleet)

Raises:
    NotImplementedError: QueueDataset does not support global shuffle

z[QueueDataset does not support global shuffle, please use InMemoryDataset for global_shuffler~  r   r$  s     r   r#  QueueDataset.global_shuffle  s    , "<
 	
r   )r/   r\   )r    r!   r"   r#   r$   r   r   r   r  r#  r%   rt  ru  s   @r   r   r     s;    	3 C&	&
0
 
r   r   c                  <   ^  \ rS rSrSrU 4S jrS rSS jrSrU =r	$ )FileInstantDataseti  z
FileInstantDataset, it will process data streamly.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory.create_dataset("FileInstantDataset")
c                D   > [         TU ]  5         SU R                  l        g)zN
Initialize FileInstantDataset
This class should be created by DatasetFactory
MultiSlotFileInstantDataFeedNry  r   s    r   r   FileInstantDataset.__init__  s    
 	=r   c                    [        S5      e)zA
Local shuffle
FileInstantDataset does not support local shuffle
z_FileInstantDataset does not support local shuffle, please use InMemoryDataset for local_shuffler~  r   s    r   r   FileInstantDataset.local_shuffle#  s    
 ";
 	
r   c                    [        S5      e)zC
Global shuffle
FileInstantDataset does not support global shuffle
zaFileInstantDataset does not support global shuffle, please use InMemoryDataset for global_shuffler~  r  s     r   r#  !FileInstantDataset.global_shuffle-  s    
 "<
 	
r   r   r\   )
r    r!   r"   r#   r$   r   r  r#  r%   rt  ru  s   @r   r  r    s    >

 
r   r  c                  b   ^  \ rS rSrSrU 4S jrS rS rS rS r	S r
S	 rS
 rS rS rSrU =r$ )BoxPSDataseti8  z
BoxPSDataset: derived from InMemoryDataset.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
c                   > [         TU ]  5         [        R                  " U R                  5      U l        SU R                  l        g)zH
Initialize BoxPSDataset
This class should be created by DatasetFactory
PaddleBoxDataFeedN)r   r   r	   BoxPSr   boxpsr,   rg   r   s    r   r   BoxPSDataset.__init__C  s0    
 	ZZ-
2r   c                    [        USS 5      n[        USS 5      n[        USS 5      nU R                  R                  X#U5        g)z
Workaround for date
Nr   r   )r  r  r  r  s        r   r  BoxPSDataset.set_dateL  sE     48}D1I$qr(m

D-r   c                8    U R                   R                  5         g)a  
Begin Pass
Notify BoxPS to load sparse parameters of next pass to GPU Memory

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
        >>> dataset.begin_pass()
N)r  
begin_passr   s    r   r  BoxPSDataset.begin_passU  s     	

r   c                :    U R                   R                  U5        g)z
End Pass
Notify BoxPS that current pass ended
Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
        >>> dataset.end_pass(True)
N)r  end_pass)r   need_save_deltas     r   r  BoxPSDataset.end_passc  s     	

O,r   c                8    U R                   R                  5         g)a  
Wait async preload done
Wait Until Feed Pass Done
Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
        >>> dataset.wait_preload_done()
N)r  wait_feed_pass_doner   s    r   r  BoxPSDataset.wait_preload_donep  s     	

&&(r   c                X    U R                  5         U R                  R                  5         g)a  
Load next pass into memory and notify boxps to fetch its emb from SSD
Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.load_into_memory()
N)r   r  r
  r   s    r   r
  BoxPSDataset.load_into_memory  s      	

##%r   c                X    U R                  5         U R                  R                  5         g)a  
Begin async preload next pass while current pass may be training
Examples:
    .. code-block:: python

        >>> # doctest: +SKIP('Depends on external files.')
        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
        >>> filelist = ["a.txt", "b.txt"]
        >>> dataset.set_filelist(filelist)
        >>> dataset.preload_into_memory()
N)r   r  r  r   s    r   r   BoxPSDataset.preload_into_memory  s      	

&&(r   c                    U R                   (       d  U R                  R                  US5        U R                  R                  U5        g )NT)r   r   r   r   rU   s     r   r   )BoxPSDataset._dynamic_adjust_before_train  s1    ))LL33JE//
;r   c                    g r\   r   r   s    r   r   (BoxPSDataset._dynamic_adjust_after_train  r   r   c                P    [        U5      nU R                  R                  U5        g)a~  
Slots Shuffle
Slots Shuffle is a shuffle method in slots level, which is usually used
in sparse feature with large scale of instances. To compare the metric, i.e.
auc while doing slots shuffle on one or several slots with baseline to
evaluate the importance level of slots(features).

Args:
    slots(list[string]): the set of slots(string) to do slots shuffle.

Examples:
    .. code-block:: python

        >>> import paddle.base as base
        >>> dataset = base.DatasetFactory().create_dataset("BoxPSDataset")
        >>> dataset.set_merge_by_lineid()
        >>> #suppose there is a slot 0
        >>> dataset.slots_shuffle(['0'])
N)rE   r  rF   rG   s      r   rF   BoxPSDataset.slots_shuffle  s    ( J	

  +r   )r  )r    r!   r"   r#   r$   r   r  r  r  r  r
  r  r   r   rF   r%   rt  ru  s   @r   r  r  8  s?    3. -)"& ) <
, ,r   r  )r$   
__future__r   google.protobufr   rh   paddle.base.protor   utilsr   rQ  r	   __all__r   r   r   r   r  r  r   r   r   <module>r     su    I " '  +  
$P $PNl l^	@1k @1FW
; W
t%
 %
PF,? F,r   