
    Αi                       S SK Jr  S SKrS SKrS SKrS SKrS SKrS SKrS SKrS SK	r	S SK
JrJrJrJrJr  S SKJr  SSKJr  \(       a-  S SKJr  \" S5      r\" S	5      r\" S
\\S.5      r " S S\5      r/ r " S S\5      r " S S\5      r " S S\5      r " S S\5      r  " S S\5      r! " S S5      r" " S S\"5      r# S!   S"S jjr$ " S S\"5      r% " S S \"5      r&g)#    )annotationsN)TYPE_CHECKINGCallableLiteral	TypedDictTypeVar)core   )logger)	ParamSpec_InputT_RetT_HDFSClientConfig)zfs.default.namezhadoop.job.ugic                  *    \ rS rSr% S\S'   S\S'   Srg)	_FileInfo,   strpathintsize N)__name__
__module____qualname____firstlineno____annotations____static_attributes__r       a/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/fleet/utils/fs.pyr   r   ,   s    		r   r   c                      \ rS rSrSrg)ExecuteError4   r   Nr   r   r   r   r   r   r   r   r!   r!   4       r   r!   c                      \ rS rSrSrg)FSFileExistsError8   r   Nr#   r   r   r   r&   r&   8   r$   r   r&   c                      \ rS rSrSrg)FSFileNotExistsError<   r   Nr#   r   r   r   r)   r)   <   r$   r   r)   c                      \ rS rSrSrg)	FSTimeOut@   r   Nr#   r   r   r   r,   r,   @   r$   r   r,   c                      \ rS rSrSrg)FSShellCmdAbortedD   r   Nr#   r   r   r   r/   r/   D   r$   r   r/   c                  <   \ rS rSr\R
                  S 5       r\R
                  S 5       r\R
                  S 5       r\R
                  S 5       r	\R
                  S 5       r
\R
                  S 5       r\R
                  S 5       r\R
                  S	 5       r\R
                  S
 5       r\R
                  S 5       r\R
                  SS j5       r\R
                  S 5       r\R
                  S 5       r\R
                  SS j5       r\R
                  SS j5       rSrg)FSH   c                    [         eNNotImplementedErrorselffs_paths     r   ls_dir	FS.ls_dirI       !!r   c                    [         er5   r6   r8   s     r   is_file
FS.is_fileM   r=   r   c                    [         er5   r6   r8   s     r   is_dir	FS.is_dirQ   r=   r   c                    [         er5   r6   r8   s     r   is_existFS.is_existU   r=   r   c                    [         er5   r6   )r9   
local_pathr:   s      r   upload	FS.uploadY   r=   r   c                    [         er5   r6   )r9   r:   rH   s      r   downloadFS.download]   r=   r   c                    [         er5   r6   r8   s     r   mkdirs	FS.mkdirsa   r=   r   c                    [         er5   r6   r8   s     r   delete	FS.deletee   r=   r   c                    [         er5   r6   r9   s    r   need_upload_downloadFS.need_upload_downloadi   r=   r   c                    [         er5   r6   r9   fs_src_pathfs_dst_paths      r   rename	FS.renamem   r=   r   c                    [         er5   r6   r9   rZ   r[   	overwritetest_existss        r   mvFS.mvq   r=   r   c                    [         er5   r6   )r9   	local_dirdest_dirs      r   
upload_dirFS.upload_diru   r=   r   c                    [         er5   r6   r8   s     r   	list_dirsFS.list_dirsy   r=   r   c                    [         er5   r6   r9   r:   exist_oks      r   touchFS.touch}   r=   r   Nc                    [         er5   r6   r8   s     r   catFS.cat   r=   r   r   FFTr5   )r   r   r   r   abcabstractmethodr;   r?   rB   rE   rI   rL   rO   rR   rV   r\   rb   rg   rj   ro   rr   r   r   r   r   r2   r2   H   s   " " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" " 	" "r   r2   c                      \ rS rSrSrSS jrSS jrSS jrS rS r	SS jr
SS	 jrSS
 jrSS jrSS jrSSS jjr  S         SS jjrSS jrSrg)LocalFS   a  
A tool of local file system.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> subdirs, files = client.ls_dir("./")

c                   U R                  U5      (       d  / / 4$ / n/ n[        R                  " U5       HQ  n[        R                  R	                  US-   U-   5      (       a  UR                  U5        M@  UR                  U5        MS     X#4$ )a  
List directories and files under `fs_path` .

Args:
    fs_path(str): The local file path.

Returns:
    Tuple: Return a 2-tuple, the first is a list of all its subdirectories,
    and the second is a list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> subdirs, files = client.ls_dir("./")

/)rE   oslistdirr   isdirappend)r9   r:   dirsfilesfs        r   r;   LocalFS.ls_dir   su    * }}W%%r6MG$Aww}}Ws]Q.//AQ	 % {r   c                    [         R                  R                  U5      (       a
   U S35       e[         R                  " USS9  g)aY  
Create a local directory.

Args:
    fs_path(str): The local directory path.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.mkdirs("test_mkdirs")
        >>> client.delete("test_mkdirs")

z is already a fileT)rn   N)r}   r   isfilemakedirsr8   s     r   rO   LocalFS.mkdirs   s8    $ 77>>'**Jwi7I,JJ*
Gd+r   c                0    [         R                  " X5        g)a  
Rename the file.

Args:
    fs_src_path(str): The actual name of the file or directory
    fs_dst_path(str): The new name of the file or directory.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.touch("test_rename_src")
        >>> print(client.is_exist("test_rename_src"))
        True
        >>> client.rename("test_rename_src", "test_rename_dst")
        >>> print(client.is_exist("test_rename_src"))
        False
        >>> print(client.is_exist("test_rename_dst"))
        True
        >>> client.delete("test_rename_dst")

N)r}   r\   rY   s      r   r\   LocalFS.rename   s    4 			++r   c                0    [         R                  " U5        g r5   )shutilrmtreer8   s     r   _rmrLocalFS._rmr   s    gr   c                0    [         R                  " U5        g r5   )r}   remover8   s     r   _rmLocalFS._rm   s    
		'r   c                    U R                  U5      (       d  g[        R                  R                  U5      (       a  U R	                  U5      $ U R                  U5      $ )a  
Delete the local file path, whether it's a file or directory.

Args:
    fs_path(str): The local file path.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.mkdirs("test_localFS_mkdirs")
        >>> client.delete("test_localFS_mkdirs")

N)rE   r}   r   r   r   r   r8   s     r   rR   LocalFS.delete   sG    $ }}W%%77>>'""88G$$yy!!r   c                    g)NFr   rU   s    r   rV   LocalFS.need_upload_download  s    r   c                @    [         R                  R                  U5      $ )a  
Whether the local file path is a file.

Args:
    fs_path(str): The local file path.

Returns:
    Bool: Return true if the path exists and it's a file, otherwise return false.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.touch("test_is_file")
        >>> print(client.is_file("test_is_file"))
        True
        >>> client.delete("test_is_file")

)r}   r   r   r8   s     r   r?   LocalFS.is_file  s    . ww~~g&&r   c                @    [         R                  R                  U5      $ )a  
Whether the local file path is a directory.

Args:
    fs_path(str): The local file path.

Returns:
    Bool: Return true if the path exists and it's a directory, otherwise return false.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.mkdirs("test_is_dir")
        >>> print(client.is_dir("test_is_dir"))
        True
        >>> client.delete("test_is_dir")

)r}   r   r   r8   s     r   rB   LocalFS.is_dir$  s    . ww}}W%%r   c                @    [         R                  R                  U5      $ )a  
Whether the local file path exists.

Args:
    fs_path(str): The local file path.

Returns:
    Bool: Whether it's a file or directory, return true if the path exists,
    otherwise return false.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> local_fs = LocalFS()
        >>> ret = local_fs.is_exist("test_is_exist")

)r}   r   existsr8   s     r   rE   LocalFS.is_exist=  s    * ww~~g&&r   c                    U R                  U5      (       a  U(       a  g[        e[        US5          SSS5        g! , (       d  f       g= f)a  
Create a local file.

Args:
    fs_path(str): The local file path.
    exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
    program will throw an Exception. Default is true.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.touch("test_touch")
        >>> client.delete("test_touch")

Na)rE   r&   openrm   s      r   ro   LocalFS.touchT  s8    ( ==!!##'3  s	   <
A
c                    U R                  U5      (       d  [        eU(       a'  U R                  U5      (       a  U R                  U5        U R                  U5      (       a  [        eU R	                  X5      $ )ap  
Move a local file or directory from `src_path` to `dst_path` .

Args:
    src_path(str):  Name of the file or directory, that's needed to be moved.
    dst_path(str):  Name of the file or directory to which to move to.
    overwrite(bool): Whether to re-write `dst_path` if that exists. Default is False.

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> client.touch("test_mv_src")
        >>> client.mv("test_mv_src", "test_mv_dst")
        >>> client.delete("test_mv_dst")

)rE   r)   rR   r&   r\   )r9   src_pathdst_pathr`   ra   s        r   rb   
LocalFS.mvo  sZ    6 }}X&&&&x00KK!==""##{{8..r   c                    U R                  U5      (       d  / $ [        R                  " U5       Vs/ s H1  n[        R                  R	                  US-   U-   5      (       d  M/  UPM3     nnU$ s  snf )a  
Only list directories under `fs_path` .

Args:
    fs_path(str): The local file path.

Returns:
    List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

Examples:
    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import LocalFS

        >>> client = LocalFS()
        >>> subdirs = client.list_dirs("./")

r|   )rE   r}   r~   r   r   )r9   r:   r   r   s       r   rj   LocalFS.list_dirs  sd    ( }}W%%I zz'*
*!bggmmGcMA<M.NA* 	 
 	
s   .A-#A-r   Nr:   r   returnztuple[list[str], list[str]]r:   r   r   None)rZ   r   r[   r   r   r   )r   zLiteral[False]r:   r   r   boolru   r:   r   rn   r   r   r   rt   )
r   r   r   r   r`   r   ra   r   r   r   r:   r   r   	list[str])r   r   r   r   __doc__r;   rO   r\   r   r   rR   rV   r?   rB   rE   ro   rb   rj   r   r   r   r   ry   ry      s     D,*,8"4'2&2'.>  !$/$/ $/ 	$/
 $/ 
$/Lr   ry   c                   ^  SU 4S jjnU$ )Nc                N   >^  [         R                  " T 5      SU U4S jj5       nU$ )Nc                 Z  > U S   nT	nUc  [        UR                  5      S-  nOUS-  n[        UR                  5      S-  n[        R                  " 5       nUn  T" U 0 UD6$ ! [         ac  n[        R                  " 5       U-
  U:  a'  [        SU  S[        R                  " 5       U-
   35      e[        R                  " U5         S nAOS nAff = f[        R                  " 5       U-
  S:  a<  [        SU  S[        R                  " 5       U-
   35        [        R                  " 5       nM  )Nr   g     @@zargs:z	 timeout:   zhadoop operator timeout:args:)float	_time_out_sleep_intertimer!   r,   sleepprint)
argskwargsotime_outinterstartlast_print_timeer   max_time_outs
           r   handler2_handle_errors.<locals>.decorator.<locals>.handler  s   QA#H -6F"!..)F2EIIKE#O	&d-f--# &yy{U*h6'#D6499;3F2GH  JJu%%& 99;0257vYtyy{UZGZF[\ '+iikO! s   A# #
C-ACC)r   z_InputT.argsr   z_InputT.kwargsr   r   )	functoolswraps)r   r   r   s   ` r   	decorator!_handle_errors.<locals>.decorator  s%    			2 
	2: r   )r   Callable[_InputT, _RetT]r   r   r   )r   r   s   ` r   _handle_errorsr     s    B r   c                  z   \ rS rSr% SrS\S'     S$         S%S jjrS&S jrS&S jr\	" 5       S'S j5       r
\	" 5       S(S	 j5       rS
 rS r\	" 5       S)S j5       rS rS)S jr\	" 5       S)S j5       r S*       S+S jjr  S,         S-S jjr\	" 5       S 5       r  S,         S.S jjr\	" 5       S 5       r\	" 5       S/S j5       r  S0         S1S jjr\	" 5       S 5       rS rS r\	" 5       S/S j5       rS2S3S jjr\	" 5       S 5       rS4S jrS5S6S jjr \	" 5       S  5       r!S! r"S7S" jr#S#r$g)8
HDFSClienti  a  
A tool of HDFS.

Args:
    hadoop_home(str): Hadoop home.
    configs(dict): Hadoop config. It is a dictionary and needs to contain the
        keys: "fs.default.name" and "hadoop.job.ugi".

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient
        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"

        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.ls_dir("hdfs:/test_hdfs_client")
        ([], [])

r   pre_commandsc                   / U l         U S3nU R                   R                  U5        SnU R                   R                  U5        U(       a<  UR                  5        H(  u  pxSU SU 3n	U R                   R                  U	5        M*     X0l        X@l        SR                  U R                   5      U l        [        R                  " S5      U l	        g )Nz/bin/hadoopfsz-D= z8\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:)
r   r   itemsr   r   join	_base_cmdrecompile
_bd_err_re)
r9   hadoop_homeconfigsr   sleep_inter
hadoop_bindfskvconfig_commands
             r   __init__HDFSClient.__init__  s     #}K0
  ,  %#%aS!!!((8 ( "'$"3"34**G
r   c                &   U R                    SU 3nSnS nSn[        US-   5       HG  n[        R                  " USSU5      u  pV[	        U5      nUS:X  a    O[
        R                  " U5        MI     US:X  a  [        U5      eXVR                  5       4$ )Nz -r      r
   rz   )	r   ranger	   shell_execute_cmdr   r   r   r/   
splitlines)	r9   cmdredirect_stderrretry_timesexe_cmdretoutputretry_sleep_secondxs	            r   _run_cmdHDFSClient._run_cmd  s    ^^$Bse,{Q'A00!QPKCc(CaxJJ)* ( #:#C((%%'''r   c           	        U R                   /UR                  5       QnSnSnSn[        US-   5       H[  n [        R                  " US[        R
                  U(       a  [        R                  O[        R
                  SS9n	U	R                  n  O   US:X  a  [        U5      eg ! [        R                   a9  n
U
R                  nU
R                  n[        R                  " U5         S n
A
M  S n
A
f[         a  n
 S n
A
  Mn  S n
A
ff = f)Nr    r   r
   T)checkstdoutstderrtextrz   )r   splitr   
subprocessrunPIPESTDOUTr   CalledProcessError
returncoder   r   r   	Exceptionr/   )r9   r   r   r   r   r   r   r   r   processr   s              r   _run_safe_cmdHDFSClient._run_safe_cmd!  s    >>0CIIK0{Q'A$..%?? + #))'__
 ! (, #:#C((  00 /ll

-.. s   AB##D7.C++D?Dc                \    U R                  U5      (       d  / $ U R                  U5      u  p#U$ )a  
Only list directories under `fs_path` .

Args:
    fs_path(str): The HDFS file path.

Returns:
    List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

rE   _ls_dirr9   r:   r   r   s       r   rj   HDFSClient.list_dirs?  s-    8 }}W%%Ill7+r   c                X    U R                  U5      (       d  / / 4$ U R                  U5      $ )a,  
List directories and files under `fs_path` .

Args:
    fs_path(str): The HDFS file path.

Returns:
    Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
    and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

r  r8   s     r   r;   HDFSClient.ls_dira  s+    : }}W%%r6M||G$$r   c                Z   SU 3nU R                  U5      u  p4US:w  a  [        U5      e/ n/ nU Hv  nUR                  5       n[        U5      S:w  a  M$  [        R
                  R                  US   5      n	US   S   S:X  a  UR                  U	5        Me  UR                  U	5        Mx     XV4$ )Nls r         d)r   r!   r   lenr}   r   basenamer   )
r9   r:   r   r   linesr   r   linearrps
             r   r  HDFSClient._ls_dir  s    G9o]]3'
!8s##D**,C3x1}  Q(A1vayCAQ  {r   c                Z    U H%  nU R                   R                  U5      nUc  M#  Us  $    g r5   )r   match)r9   r  lms       r   _test_matchHDFSClient._test_match  s/    A%%a(A} 
 r   c                R    U R                  U5      (       d  gU R                  U5      $ )a  
Whether the remote HDFS path is a directory.

Args:
    fs_path(str): The HDFS file path.

Returns:
    Bool: Return true if the path exists and it's a directory, otherwise return false.

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> ret = client.is_file("hdfs:/test_hdfs_client")

FrE   _is_dirr8   s     r   rB   HDFSClient.is_dir  s%    8 }}W%%||G$$r   c                    SU 3nU R                  USSS9u  p4U(       aG  U R                  U5      (       a0  [        S5        [        SR                  U5      5        [	        U5      egg)Nztest -d Tr
   r   r   zraise exception: 
F)r   r  r   r   r!   )r9   r:   r   r   r  s        r   r   HDFSClient._is_dir  sc    	"]]3!]L
&&)*dii&'"3''r   c                \    U R                  U5      (       d  gU R                  U5      (       + $ )a  
Whether the remote HDFS path is a file.

Args:
    fs_path(str): The HDFS file path.

Returns:
    Bool: Return true if the path exists and it's a file, otherwise return false.

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> ret = client.is_file("hdfs:/test_hdfs_client")

Fr  r8   s     r   r?   HDFSClient.is_file  s(    6 }}W%%<<(((r   c                D    SU S3nU R                  USSS9u  p4US:w  a  gg)a  
Whether the remote HDFS path exists.

Args:
    fs_path(str): The hdfs file path.

Returns:
    Bool: Whether it's is file or directory, return true if the path exists,
    otherwise return false.

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> ret = client.is_exist("hdfs:/test_hdfs_client")

ztest -e r   Tr
   r#  r   F)r   )r9   r:   r   r   outs        r   rE   HDFSClient.is_exist  s5    : 	#==d=J!8r   c                j   UR                  S5      nUR                  S5      n[        R                  R                  U5      nU R	                  US-   U-   5      (       a  U(       a  U R                  US-   U-   5        U R	                  U5      (       d  U R                  U5        U R                  X5        gz
upload dir to hdfs
Args:
    local_dir(str): local dir
    dest_dir(str): hdfs dest dir
    overwrite(bool): is overwrite
Returns:
    return code
r|   N)rstripr}   r   r  rE   rR   rO   _try_uploadr9   re   rf   r`   local_basenames        r   rg   HDFSClient.upload_dir  s     $$S)	??3')))4==C.899iKK378}}X&&KK!-r   c                  ^  U 4S jnS n[        5       nUR                  U5      (       d  [        U S35      eU" U5      nU(       d  [        S5        gT R                  U5      (       a)  U(       a"  T R	                  U5        T R                  U5        / n	[        U5       HL  n
T R                  XU5      n[        R                  " XRU4S9nU	R                  U5        UR                  5         MN     U	 H  nUR                  5         M     g)a  
Upload the local path to remote HDFS.

Args:
    local_path(str): The local path.
    fs_path(str): The HDFS path.
    multi_processes(int|1): the upload data process at the same time, default=5
    overwrite(bool|False): will overwrite file on HDFS or not

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

c                :   > U H  nTR                  X 5        M     g r5   )r.  )hdfs_path_singledatasdatar9   s      r   __subprocess_upload.HDFSClient.upload.<locals>.__subprocess_uploadL  s      8 r   c                ^   / n[         R                  R                  U 5      (       d  U$ [         R                  R                  U 5      (       aO  [         R                  " U 5       H3  n[         R                  R                  X5      nUR                  U5        M5     U$ UR                  U 5        U$ )zR
get local files
Args:
    path(str): local path
Returns:
    list of local files
)r}   r   r   r   r~   r   r   )r   rlistfilets       r   get_local_files*HDFSClient.upload.<locals>.get_local_filesP  s~     E77>>$''ww}}T""JJt,DT0ALLO -
 L T"Lr    not existsz/there are nothing need to upload, function exitNtargetr   )ry   rE   r)   r   rR   rO   r   _split_filesmultiprocessingProcessr   r   r   )r9   rH   r:   multi_processesr`   _HDFSClient__subprocess_uploadr=  local	all_filesprocsiprocess_datasr  procs   `             r   rI   HDFSClient.upload*  s    D	9	* 	~~j))&*['ABB#J/	CD==!!iKK KK 'A --iOLM''*=1IA LLOGGI ( DIIK r   c                    SU SU 3nSn U R                  U5      u  pEUS:w  a  [        U5      eg ! [         a  nU R                  U5        UeS nAff = f)Nzput r   r   )r   r!   r  rR   )r9   rH   r:   r   r   _r   s          r   r.  HDFSClient._try_upload  sf    ZL'+	]]3'FCax"3''  	KK G	s   $1 
AAAc                J  ^  U 4S jnT R                  U5      (       d  [        U S35      eT R                  U5      (       a  T R                  X5      $ T R	                  U5      u  pgU Vs/ s H
  oS-   U-   PM     n	nU	R                  U Vs/ s H
  oS-   U-   PM     sn5        / n
[        U5       HL  nT R                  XU5      n[        R                  " XRU4S9nU
R                  U5        UR                  5         MN     U
 H  nUR                  5         M     gs  snf s  snf )a  
Download remote HDFS path to the local.

Args:
    fs_path(str):  The HDFS path.
    local_path(str): The local path.
    multi_processes(int|1): the download data process at the same time, default=1
    overwrite(bool): is overwrite

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.download("hdfs:/test_hdfs_client", "./")

c                :   > U H  nTR                  X 5        M     g)zp
download file from HDFS
Args:
    local_path(str): the local file path
    datas(str): the hdfs file path list
N)_try_download)rH   r5  r6  r9   s      r   __subprocess_download2HDFSClient.download.<locals>.__subprocess_download  s     ""44 r   
 not exitsr|   r@  N)rE   r)   r?   rS  r;   extendr   rB  rC  rD  r   r   r   )r9   r:   rH   rE  r`    _HDFSClient__subprocess_downloadr   all_filenamesrJ  rH  rI  rK  r  rL  s   `             r   rL   HDFSClient.download  s   D	5 }}W%%&'*'=>><<  %%g::"kk'20=>1s]Q&	>T:TC-!+T:;'A --iOLM'',3NA LLOGGI ( DIIK  ?:s   *DD c                    SU SU 3nSn U R                  U5      u  pEUS:w  a  [        U5      eg ! [         a"  n[        5       nUR	                  U5        UeS nAff = f)Nzget r   r   )r   r!   r  ry   rR   )r9   r:   rH   r   r   rO  r   local_fss           r   rS  HDFSClient._try_download  sm    WIQzl+	]]3'FCax"3''  	yHOOJ'G	s   $1 
AAAc                L   U R                  U5      (       a  gSnSU S3nU R                  USS9u  pEUS:w  a%  U H  nSU;   d  M  Sn  O   U(       d  [        U5      eU(       aA  U R                  U5      (       d*  S	U 3nU R                  U5      u  pGUS:w  a  [        U5      eggg)
a<  
Create a remote HDFS directory.

Args:
    fs_path(str): The HDFS directory path.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.mkdirs("hdfs:/test_hdfs_client")

NFzmkdir r   T)r   r   zNo such file or directoryz	mkdir -p )rE   r   r!   )r9   r:   out_hdfsr   r   r)  r  rO  s           r   rO   HDFSClient.mkdirs  s    2 ==!!wiq!==d=;!8.!3#H  "3''DMM'22gY'C]]3'FCax"3''  38r   c                   U(       a'  U R                  U5      (       a  U R                  U5        U(       aH  U R                  U5      (       d  [        U S35      eU R                  U5      (       a  [        U S35      eU R	                  X5      $ )a  
Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

Args:
    fs_src_path(str):  Name of the file or directory, that's needed to be moved.
    fs_dst_path(str):  Name of the file or directory to which to move to.
    overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
    test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Exception.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

 is not exists exists already)rE   rR   r)   r&   _try_mvr_   s        r   rb   HDFSClient.mv	  sv    B {33KK$==--*k].+IJJ}}[))';-(GHH||K55r   c                    SU SU 3nSn U R                  USS9u  pEUS:w  a  [        U5      eg ! [         a8  nU R                  U5      (       d  U R                  U5      (       a   S nAg UeS nAff = f)Nzmv r   r   r
   r   )r   r!   r  rE   )r9   rZ   r[   r   r   rO  r   s          r   rd  HDFSClient._try_mv6  s~    K=+/	]]3A]6FCax"3''  	==--$--2L2LG	s   #0 
A2,A-+A--A2c                V    SU 3nU R                  U5      u  p4US:w  a  [        U5      eg )Nzrmr r   r   r!   r9   r:   r   r   rO  s        r   r   HDFSClient._rmrC  s5    WIs#!8s## r   c                V    SU 3nU R                  U5      u  p4US:w  a  [        U5      eg )Nzrm r   rj  rk  s        r   r   HDFSClient._rmI  s4    G9os#!8s## r   c                    U R                  U5      (       d  gU R                  U5      nU(       a  U R                  U5      $ U R                  U5      $ )aN  
Delete a remote HDFS path, whether it's a file or directory.

Args:
    fs_path(str): The HDFS file path.

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.delete("hdfs:/test_hdfs_client")

N)rE   r   r   r   )r9   r:   rB   s      r   rR   HDFSClient.deleteO  sE    2 }}W%%g&99W%%xx  r   c                l    U R                  U5      (       a  U(       a  g[        eU R                  U5      $ )a  
Create a remote HDFS file.

Args:
    fs_path(str): The HDFS file path.
    exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
    program will throw an Exception. Default is true.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.touch("hdfs:/test_hdfs_client")

N)rE   r&   _touchzrm   s      r   ro   HDFSClient.touchq  s.    4 ==!!##||G$$r   c                V    SU 3nU R                  U5      u  p4US:w  a  [        U5      eg )Nztouchz r   rj  rk  s        r   rr  HDFSClient._touchz  s5    y!s#!8s## r   c                    gNTr   rU   s    r   rV   HDFSClient.need_upload_download      r   Nc                t    U R                  U5      (       a"  U R                  U5      nSR                  U5      $ g)aQ  
Cat a remote HDFS file.

Args:
    fs_path(str|None): The HDFS file path.

Returns:
    file content

Examples:

    .. code-block:: python

        >>> # doctest: +REQUIRES(env:DISTRIBUTED)
        >>> from paddle.distributed.fleet.utils import HDFSClient

        >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
        >>> configs = {
        ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
        ...     "hadoop.job.ugi": "hello,hello123"
        ... }

        >>> client = HDFSClient(hadoop_home, configs)
        >>> client.cat("hdfs:/test_hdfs_client")
        ''

r$  r   )r?   _try_catr   )r9   r:   r   s      r   rr   HDFSClient.cat  s3    8 <<  ]]7+F99V$$r   c                V    SU 3nU R                  USS9u  p4US:w  a  [        U5      eU$ )Nzcat r
   rg  r   rj  )r9   r:   r   r   r   s        r   r{  HDFSClient._try_cat  s9    WImmCQm7!8s##r   c                    [        U5      U-  n[        U5      U-  nU/U-  n[        U5       H  nXg==   S-  ss'   M     / /U-  nSn	[        U5       H  nXXU   -    X'   XU   -  n	M     X   $ z
split file list
Args:
    files(list): file list
    trainer_id(int): trainer mpi rank id
    trainers(int): all trainers num
Returns:
    filelist(list): file list of current trainer
r
   r   r  r   
r9   r   
trainer_idtrainers	remainder	blocksizeblocksrJ  trainer_filesbegins
             r   rB  HDFSClient._split_files       J)	J(*	x'y!AINI " xxA$UAY->?MAYE ! ((r   c                ~   [        U5      S::  a  / $ / nSnU H
  nX4S-   -  nM     SU-   S-   nU R                  U5      u  pg[        U5      S:X  a  [        R                  " SU S35        / $ U HK  nUR	                  S5      n	[        U	5      S:  a  M%  U	S	   n
[        U	S   5      nUR                  XS
.5        MM     U$ )z
list_files return file path and size
Args:
    path_list(list): file list
Returns:
    filelist(list): file list with file path and size
r   r   r   r  z) | awk '{if ($8 != "") {print $5" "$8 }}'zlist_files empty, path[]   r
   )r   r   )r  r   r   warningr   r   r   )r9   	path_list	file_list
str_concatr   r   r   r  r  r  	file_path	file_sizes               r   list_files_infoHDFSClient.list_files_info  s     y>QI	 
D*$J  J!PP 	 ]]3'
u:?NN4YKqABID**S/C3x!|AICFIiCD  r   )r   r   r   r   r   i i  )
r   r   r   r   r   r   r   r   r   r   )F   r   r   r   F)re   r   rf   r   r`   r   r   r   )r  F)
rH   r   r:   r   rE  r   r`   r   r   r   )
r:   r   rH   r   rE  r   r`   r   r   r   r   FT)
rZ   r   r[   r   r`   r   ra   r   r   r   ru   r   )r   zLiteral[True]r5   )r:   z
str | Noner   r   )r  r   r   zlist[_FileInfo])%r   r   r   r   r   r   r   r   r  r   rj   r;   r  r  rB   r   r?   rE   rg   rI   r.  rL   rS  rO   rb   rd  r   r   rR   ro   rr  rV   rr   r{  rB  r  r   r   r   r   r   r     s   6  &

 #
 	

 
 

2( )<  B % %B, % %@)@ ! !H @E..(+.8<.	.4  !SS S 	S
 S 
Sj 	 	   !@@ @ 	@
 @ 
@D 
 
 +( +(b   +6+6 +6 	+6
 +6 
+6Z 
 
$$ ! !B%B $ $ D  )2 r   r   c                      \ rS rSrSrSS jrS rS rS rS r	S r
S	 rS
 rS rSS jrSS jrSS jrS rSS jrS rSS jrS rSS jrS rSrg)	AFSClienti  a  
A tool of AFS. Use AfsWrapper.
When WITH_PSLIB=ON, you can use this class directly.
When WITH_PSCORE=ON, you should export LD_LIBRARY_PATH='YOUR_AFSAPISO_PATH' before using this class.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.ls_dir("hdfs:/test_hdfs_client")

c                D    [         R                  " 5       U l        Xl        g r5   )r	   
AfsWrapper_fsr   )r9   r   r   s      r   r   AFSClient.__init__  s    ??$!r   c                <    U R                   R                  XX45        g r5   )r  init)r9   fs_namefs_user	fs_passwdfs_confs        r   r  AFSClient.init  s    g	;r   c                \    U R                  U5      (       d  / $ U R                  U5      u  p#U$ )a  
Only list directories under `fs_path` .

Args:
    fs_path(str): The HDFS file path.

Returns:
    List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

r  r  s       r   rj   AFSClient.list_dirs  s-    , }}W%%Ill7+r   c                X    U R                  U5      (       d  / / 4$ U R                  U5      $ )a  
List directories and files under `fs_path` .

Args:
    fs_path(str): The HDFS file path.

Returns:
    Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
    and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

r  r8   s     r   r;   AFSClient.ls_dir8  s+    . }}W%%r6M||G$$r   c                D    U R                   R                  U5      nU/nX24$ r5   )r  list)r9   r:   r   r   s       r   r  AFSClient._ls_dirT  s#    g&y{r   c                R    U R                  U5      (       d  gU R                  U5      $ )a  
Whether the remote HDFS path is a directory.

Args:
    fs_path(str): The HDFS file path.

Returns:
    Bool: Return true if the path exists and it's a directory, otherwise return false.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> ret = client.is_dir("hdfs:/test_hdfs_client")

Fr  r8   s     r   rB   AFSClient.is_dirY  s%    , }}W%%||G$$r   c                Z    U R                   R                  U5      n[        U5      S:  a  gg)Nr   TF)r  r  r  )r9   r:   	list_paths      r   r   AFSClient._is_dirt  s&    HHMM'*		Nar   c                \    U R                  U5      (       d  gU R                  U5      (       + $ )a  
Whether the remote HDFS path is a file.

Args:
    fs_path(str): The HDFS file path.

Returns:
    Bool: Return true if the path exists and it's a file, otherwise return false.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> ret = client.is_file("hdfs:/test_hdfs_client")

Fr  r8   s     r   r?   AFSClient.is_file{  s(    , }}W%%<<(((r   c                8    U R                   R                  U5      $ )a"  
Whether the remote HDFS path exists.

Args:
    fs_path(str): The hdfs file path.

Returns:
    Bool: Whether it's is file or directory, return true if the path exists,
    otherwise return false.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> ret = client.is_exist("hdfs:/test_hdfs_client")

)r  existr8   s     r   rE   AFSClient.is_exist  s    . xx~~g&&r   c                ~   UR                  S5      nUR                  S5      n[        R                  R                  U5      nU R	                  US-   U-   5      (       a  U(       a  U R                  US-   U-   5        U R	                  U5      (       d  U R                  U5        U R                  R                  X5        gr,  )	r-  r}   r   r  rE   rR   rO   r  rI   r/  s        r   rg   AFSClient.upload_dir  s     $$S)	??3')))4==C.899iKK378}}X&&KK!	,r   c                    [        5       nUR                  U5      (       d  [        U S35      eU R                  R	                  X5        g)aj  
Upload the local path to remote HDFS.

Args:
    local_path(str): The local path.
    fs_path(str): The HDFS path.
    multi_processes(int|1): the upload data process at the same time, default=5
    overwrite(bool|False): will overwrite file on HDFS or not

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

r?  N)ry   rE   r)   r  rI   )r9   rH   r:   rE  r`   rG  s         r   rI   AFSClient.upload  s<    . 	~~j))&*['ABB
,r   c                   U R                  U5      (       d  [        U S35      eU R                  U5      (       a  U R                  R	                  X!5      $ U R                  U5      u  pVU H^  n[        R                  R                  U[        R                  R                  U5      S   5      nU R                  R	                  X5        M`     g)aG  
Download remote HDFS path to the local.

Args:
    fs_path(str):  The HDFS path.
    local_path(str): The local path.
    multi_processes(int|1): the download data process at the same time, default=1
    overwrite(bool): is overwrite

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.download("hdfs:/test_hdfs_client", "./")

rV  r
   N)
rE   r)   r?   r  rL   r;   r}   r   r   r   )	r9   r:   rH   rE  r`   rO  rY  	file_namelocal_file_names	            r   rL   AFSClient.download  s    . }}W%%&'*'=>><<  88$$Z99  ;;w/&I ggllBGGMM)4Q7O HHo9	 'r   c                h    U R                  U5      (       a  gU R                  R                  U5        g)a  
Create a remote HDFS directory.

Args:
    fs_path(str): The HDFS directory path.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.mkdirs("hdfs:/test_hdfs_client")

N)rE   r  mkdirr8   s     r   rO   AFSClient.mkdirs  s&    & ==!!wr   c                4   U(       a'  U R                  U5      (       a  U R                  U5        U(       aH  U R                  U5      (       d  [        U S35      eU R                  U5      (       a  [        U S35      eU R                  R                  X5        g)a{  
Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

Args:
    fs_src_path(str):  Name of the file or directory, that's needed to be moved.
    fs_dst_path(str):  Name of the file or directory to which to move to.
    overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
    test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Exception.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

rb  rc  N)rE   rR   r)   r&   r  rb   r_   s        r   rb   AFSClient.mv  sv    , {33KK$==--*k].+IJJ}}[))';-(GHHK-r   c                h    U R                  U5      (       d  gU R                  R                  U5        g)a  
Delete a remote HDFS path, whether it's a file or directory.

Args:
    fs_path(str): The HDFS file path.

Examples:

    .. code-block:: python


        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.delete("hdfs:/test_hdfs_client")

N)rE   r  r   r8   s     r   rR   AFSClient.delete>  s&    ( }}W%% r   c                    U R                  U5      (       a  U(       a  g[        eU R                  R                  U5      $ )a  
Create a remote HDFS file.

Args:
    fs_path(str): The HDFS file path.
    exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
    program will throw an Exception. Default is true.

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.touch("hdfs:/test_hdfs_client")

N)rE   r&   r  touchzrm   s      r   ro   AFSClient.touchV  s2    * ==!!##xxw''r   c                    grw  r   rU   s    r   rV   AFSClient.need_upload_downloadr  ry  r   Nc                f    U R                  U5      (       a  U R                  R                  U5      $ g)a  
Cat a remote HDFS file.

Args:
    fs_path(str): The HDFS file path.

Returns:
    file content

Examples:

    .. code-block:: python

        >>> # doctest: +SKIP('depend on external file')
        >>> from paddle.distributed.fleet.utils.fs import AFSClient

        >>> client = AFSClient()
        >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        >>> client.cat("hdfs:/test_hdfs_client")

r   )r?   r  rr   r8   s     r   rr   AFSClient.catu  s)    , <<  88<<((r   c                    [        U5      U-  n[        U5      U-  nU/U-  n[        U5       H  nXg==   S-  ss'   M     / /U-  nSn	[        U5       H  nXXU   -    X'   XU   -  n	M     X   $ r  r  r  s
             r   rB  AFSClient._split_files  r  r   )r  r   r  r  )r
   Fr  ru   r5   )r   r   r   r   r   r   r  rj   r;   r  rB   r   r?   rE   rg   rI   rL   rO   rb   rR   ro   rV   rr   rB  r   r   r   r   r  r    sh    $"<8%8
%6)6'2-(-:#:J . .D!0(86)r   r  r5   )r   zfloat | Noner   z>Callable[[Callable[_InputT, _RetT]], Callable[_InputT, _RetT]])'
__future__r   rv   r   rC  r}   r   r   r   r   typingr   r   r   r   r   paddle.baser	   log_utilr   typing_extensionsr   r   r   r   r   r   __all__r  r!   r&   r)   r,   r/   r2   ry   r   r   r  r   r   r   <module>r     s   " # 
   	 	    G G  +	"GGE!LI 
 	9 			 		9 			 		 	;" ;"|jb j\	 "&$$C$Ne ePe) e)r   