
    x-j                       d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZmZmZmZ d dlmZ ddlmZ er9d dlmZ  ed          Z ed	          Z ed
eed          Z G d de          Zg Z G d de          Z G d de          Z G d de          Z G d de          Z  G d de          Z! G d d          Z" G d de"          Z#	 d%d&d Z$ G d! d"e"          Z% G d# d$e"          Z&dS )'    )annotationsN)TYPE_CHECKINGCallableLiteral	TypedDictTypeVar)core   )logger)	ParamSpec_InputT_RetT_HDFSClientConfig)zfs.default.namezhadoop.job.ugic                  $    e Zd ZU ded<   ded<   dS )	_FileInfostrpathintsizeN)__name__
__module____qualname____annotations__     a/var/www/html/banglarbhumi/venv/lib/python3.11/site-packages/paddle/distributed/fleet/utils/fs.pyr   r   ,   s"         								r   r   c                      e Zd ZdS )ExecuteErrorNr   r   r   r   r   r   r   r   4           Dr   r   c                      e Zd ZdS )FSFileExistsErrorNr   r   r   r   r"   r"   8   r    r   r"   c                      e Zd ZdS )FSFileNotExistsErrorNr   r   r   r   r$   r$   <   r    r   r$   c                      e Zd ZdS )	FSTimeOutNr   r   r   r   r&   r&   @   r    r   r&   c                      e Zd ZdS )FSShellCmdAbortedNr   r   r   r   r(   r(   D   r    r   r(   c                     e Zd Zej        d             Zej        d             Zej        d             Zej        d             Zej        d             Z	ej        d             Z
ej        d             Zej        d             Zej        d	             Zej        d
             Zej        dd            Zej        d             Zej        d             Zej        dd            Zej        dd            ZdS )FSc                    t           NNotImplementedErrorselffs_paths     r   ls_dirz	FS.ls_dirI       !!r   c                    t           r,   r-   r/   s     r   is_filez
FS.is_fileM   r3   r   c                    t           r,   r-   r/   s     r   is_dirz	FS.is_dirQ   r3   r   c                    t           r,   r-   r/   s     r   is_existzFS.is_existU   r3   r   c                    t           r,   r-   )r0   
local_pathr1   s      r   uploadz	FS.uploadY   r3   r   c                    t           r,   r-   )r0   r1   r;   s      r   downloadzFS.download]   r3   r   c                    t           r,   r-   r/   s     r   mkdirsz	FS.mkdirsa   r3   r   c                    t           r,   r-   r/   s     r   deletez	FS.deletee   r3   r   c                    t           r,   r-   r0   s    r   need_upload_downloadzFS.need_upload_downloadi   r3   r   c                    t           r,   r-   r0   fs_src_pathfs_dst_paths      r   renamez	FS.renamem   r3   r   Fc                    t           r,   r-   r0   rH   rI   	overwritetest_existss        r   mvzFS.mvq   r3   r   c                    t           r,   r-   )r0   	local_dirdest_dirs      r   
upload_dirzFS.upload_diru   r3   r   c                    t           r,   r-   r/   s     r   	list_dirszFS.list_dirsy   r3   r   Tc                    t           r,   r-   r0   r1   exist_oks      r   touchzFS.touch}   r3   r   Nc                    t           r,   r-   r/   s     r   catzFS.cat   r3   r   FFTr,   )r   r   r   abcabstractmethodr2   r5   r7   r9   r<   r>   r@   rB   rE   rJ   rO   rS   rU   rY   r[   r   r   r   r*   r*   H   s       " " " 	" " " 	" " " 	" " " 	" " " 	" " " 	" " " 	" " " 	" " " 	" " " 	" " " " 	" " " 	" " " 	" " " " 	" " " " " "r   r*   c                  ~    e Zd ZdZd!dZd"dZd#dZd Zd Zd"dZ	d$dZ
d%dZd%dZd%dZd&d'dZ	 	 d(d)dZd*dZd S )+LocalFSa(  
    A tool of local file system.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import LocalFS

            >>> client = LocalFS()
            >>> subdirs, files = client.ls_dir("./")

    r1   r   returntuple[list[str], list[str]]c                   |                      |          sg g fS g }g }t          j        |          D ]R}t          j                            |dz   |z             r|                    |           =|                    |           S||fS )a{  
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            Tuple: Return a 2-tuple, the first is a list of all its subdirectories,
            and the second is a list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs, files = client.ls_dir("./")

        /)r9   oslistdirr   isdirappend)r0   r1   dirsfilesfs        r   r2   zLocalFS.ls_dir   s    * }}W%% 	r6MG$$ 	  	 Aw}}Ws]Q.//  AQU{r   Nonec                    t           j                            |          rJ | d            t          j        |d           dS )a  
        Create a local directory.

        Args:
            fs_path(str): The local directory path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_mkdirs")
                >>> client.delete("test_mkdirs")

        z is already a fileT)rX   N)rf   r   isfilemakedirsr/   s     r   r@   zLocalFS.mkdirs   sJ    $ 7>>'**JJw,J,J,JJJ*
Gd++++++r   rH   rI   c                0    t          j        ||           dS )aI  
        Rename the file.

        Args:
            fs_src_path(str): The actual name of the file or directory
            fs_dst_path(str): The new name of the file or directory.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_rename_src")
                >>> print(client.is_exist("test_rename_src"))
                True
                >>> client.rename("test_rename_src", "test_rename_dst")
                >>> print(client.is_exist("test_rename_src"))
                False
                >>> print(client.is_exist("test_rename_dst"))
                True
                >>> client.delete("test_rename_dst")

        N)rf   rJ   rG   s      r   rJ   zLocalFS.rename   s    4 		+{+++++r   c                .    t          j        |           d S r,   )shutilrmtreer/   s     r   _rmrzLocalFS._rmr   s    gr   c                .    t          j        |           d S r,   )rf   remover/   s     r   _rmzLocalFS._rm   s    
	'r   c                    |                      |          sdS t          j                            |          r|                     |          S |                     |          S )a  
        Delete the local file path, whether it's a file or directory.

        Args:
            fs_path(str): The local file path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_localFS_mkdirs")
                >>> client.delete("test_localFS_mkdirs")

        N)r9   rf   r   ro   rx   ru   r/   s     r   rB   zLocalFS.delete   sW    $ }}W%% 	F7>>'"" 	%88G$$$yy!!!r   Literal[False]c                    dS )NFr   rD   s    r   rE   zLocalFS.need_upload_download  s    ur   boolc                @    t           j                            |          S )au  
        Whether the local file path is a file.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_is_file")
                >>> print(client.is_file("test_is_file"))
                True
                >>> client.delete("test_is_file")

        )rf   r   ro   r/   s     r   r5   zLocalFS.is_file  s    . w~~g&&&r   c                @    t           j                            |          S )a|  
        Whether the local file path is a directory.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_is_dir")
                >>> print(client.is_dir("test_is_dir"))
                True
                >>> client.delete("test_is_dir")

        rf   r   rh   r/   s     r   r7   zLocalFS.is_dir$  s    . w}}W%%%r   c                @    t           j                            |          S )a  
        Whether the local file path exists.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Whether it's a file or directory, return true if the path exists,
            otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> local_fs = LocalFS()
                >>> ret = local_fs.is_exist("test_is_exist")

        )rf   r   existsr/   s     r   r9   zLocalFS.is_exist=  s    * w~~g&&&r   TrX   c                    |                      |          r|rdS t          t          |d          5  	 ddd           dS # 1 swxY w Y   dS )a1  
        Create a local file.

        Args:
            fs_path(str): The local file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_touch")
                >>> client.delete("test_touch")

        Na)r9   r"   openrW   s      r   rY   zLocalFS.touchT  s    ( ==!! 	$ ##'3 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   A  AAFsrc_pathdst_pathrM   rN   c                    |                      |          st          |r*|                      |          r|                     |           |                      |          rt          |                     ||          S )a  
        Move a local file or directory from `src_path` to `dst_path` .

        Args:
            src_path(str):  Name of the file or directory, that's needed to be moved.
            dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `dst_path` if that exists. Default is False.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_mv_src")
                >>> client.mv("test_mv_src", "test_mv_dst")
                >>> client.delete("test_mv_dst")

        )r9   r$   rB   r"   rJ   )r0   r   r   rM   rN   s        r   rO   z
LocalFS.mvo  sy    6 }}X&& 	'&& 	"x00 	"KK!!!=="" 	$##{{8X...r   	list[str]c                v    |                                sg S fdt          j                  D             }|S )a  
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs = client.list_dirs("./")

        c                ^    g | ])}t           j                            d z   |z             '|*S re   r   ).0rl   r1   s     r   
<listcomp>z%LocalFS.list_dirs.<locals>.<listcomp>  sE     
 
 
bgmmGcMA<M.N.N

 
 
r   )r9   rf   rg   )r0   r1   rj   s    ` r   rU   zLocalFS.list_dirs  sW    ( }}W%% 	I
 
 
 
z'**
 
 
 r   Nr1   r   rb   rc   r1   r   rb   rm   )rH   r   rI   r   rb   rm   )rb   rz   r1   r   rb   r|   r]   r1   r   rX   r|   rb   rm   r\   )
r   r   r   r   rM   r|   rN   r|   rb   rm   r1   r   rb   r   )r   r   r   __doc__r2   r@   rJ   ru   rx   rB   rE   r5   r7   r9   rY   rO   rU   r   r   r   ra   ra      s,               D, , , ,*, , , ,8    " " " "4   ' ' ' '2& & & &2' ' ' '.    >  !$/ $/ $/ $/ $/L     r   ra   max_time_outfloat | Nonerb   >Callable[[Callable[_InputT, _RetT]], Callable[_InputT, _RetT]]c                     d fd}|S )Nrl   Callable[_InputT, _RetT]rb   c                L     t          j                   d fd            }|S )	Nargs_InputT.argskwargs_InputT.kwargsrb   r   c                 L   | d         }	}|t          |j                  dz  }n|dz  }t          |j                  dz  }t          j                    }|}	 	  | i |S # t          $ ra}t          j                    |z
  |k    r)t          d|  dt          j                    |z
             t          j        |           Y d }~nd }~ww xY wt          j                    |z
  dk    r<t          d|  dt          j                    |z
              t          j                    })Nr   g     @@Tzargs:z	 timeout:   zhadoop operator timeout:args:)float	_time_out_sleep_intertimer   r&   sleepprint)
r   r   otime_outinterstartlast_print_timeerl   r   s
           r   handlerz2_handle_errors.<locals>.decorator.<locals>.handler  s[   QA#H --6F"!.))F2EIKKE#O2	&1d-f---# & & &y{{U*h66'HDHH49;;3FHH   Ju%%%%%%%%& 9;;0255\\\ty{{UZGZ\\   '+ikkO!2s   A! !
C+ACC)r   r   r   r   rb   r   )	functoolswraps)rl   r   r   s   ` r   	decoratorz!_handle_errors.<locals>.decorator  sC    				2 	2 	2 	2 	2 	2 
		2: r   )rl   r   rb   r   r   )r   r   s   ` r   _handle_errorsr     s*         B r   c                  p   e Zd ZU dZded<   	 	 d@dAdZdBdZdBdZ e            dCd            Z	 e            dDd            Z
d Zd Z e            dEd            Zd ZdEdZ e            dEd            Z	 dFdGd"Z	 	 dHdId%Z e            d&             Z	 	 dHdJd'Z e            d(             Z e            dKd)            Z	 	 dLdMd.Z e            d/             Zd0 Zd1 Z e            dKd2            ZdNdOd4Z e            d5             ZdPd7ZdQdRd:Z e            d;             Z d< Z!dSd?Z"d8S )T
HDFSClienta  
    A tool of HDFS.

    Args:
        hadoop_home(str): Hadoop home.
        configs(dict): Hadoop config. It is a dictionary and needs to contain the
            keys: "fs.default.name" and "hadoop.job.ugi".

    Examples:

        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import HDFSClient
            >>> hadoop_home = "/home/client/hadoop-client/hadoop/"

            >>> configs = {
            ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
            ...     "hadoop.job.ugi": "hello,hello123"
            ... }

            >>> client = HDFSClient(hadoop_home, configs)
            >>> client.ls_dir("hdfs:/test_hdfs_client")
            ([], [])

    r   pre_commands   hadoop_homer   configsr   r   r   sleep_interrb   rm   c                   g | _         | d}| j                             |           d}| j                             |           |r<|                                D ]'\  }}d| d| }	| j                             |	           (|| _        || _        d                    | j                   | _        t          j        d          | _	        d S )Nz/bin/hadoopfsz-D= z8\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:)
r   ri   itemsr   r   join	_base_cmdrecompile
_bd_err_re)
r0   r   r   r   r   
hadoop_bindfskvconfig_commands
             r   __init__zHDFSClient.__init__  s     #000
  ,,,  %%% 	9 9 91!-a!!((8888!'$"344*G
 
r   F   c                0   | j          d| }d}d }d}t          |dz             D ]G}t          j        |dd|          \  }}t	          |          }|dk    r nt          j        |           H|dk    rt          |          ||                                fS )Nz -r      r
      )	r   ranger	   shell_execute_cmdr   r   r   r(   
splitlines)	r0   cmdredirect_stderrretry_timesexe_cmdretoutputretry_sleep_secondxs	            r   _run_cmdzHDFSClient._run_cmd  s    ^,,s,,{Q'' 	+ 	+A0!QPPKCc((CaxxJ)****#::#C(((F%%''''r   c                   | j         g|                                }d}d}d}t          |dz             D ]}	 t          j        |dt          j        |rt          j        nt          j        d          }	|	j        } nQ# t          j        $ r,}
|
j	        }|
j
        }t          j        |           Y d }
~
}d }
~
wt          $ r}
Y d }
~
 nd }
~
ww xY w|dk    rt          |          d S )Nr    r   r
   T)checkstdoutstderrtextr   )r   splitr   
subprocessrunPIPESTDOUTr   CalledProcessError
returncoder   r   r   	Exceptionr(   )r0   r   r   r   r   r   r   r   r   processr   s              r   _run_safe_cmdzHDFSClient._run_safe_cmd!  s%   >0CIIKK0{Q'' 	 	A$.%? +-
))'_
 
 
 !0 / / /l
-........    #::#C((( :s   AA<<C
"B22C
C
r1   c                d    |                      |          sg S |                     |          \  }}|S )a*  
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        r9   _ls_dirr0   r1   rj   rk   s       r   rU   zHDFSClient.list_dirs?  s7    8 }}W%% 	Ill7++er   rc   c                ^    |                      |          sg g fS |                     |          S )a  
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        r   r/   s     r   r2   zHDFSClient.ls_dira  s3    : }}W%% 	r6M||G$$$r   c                   d| }|                      |          \  }}|dk    rt          |          g }g }|D ]}|                                }t          |          dk    r*t          j                            |d                   }	|d         d         dk    r|                    |	           w|                    |	           ||fS )Nls r         d)r   r   r   lenrf   r   basenameri   )
r0   r1   r   r   linesrj   rk   linearrps
             r   r   zHDFSClient._ls_dir  s    Goo]]3''
U!88s### 		  		 D**,,C3xx1}}  Q((A1vayCAQU{r   c                P    |D ]"}| j                             |          }||c S #d S r,   )r   match)r0   r   lms       r   _test_matchzHDFSClient._test_match  s@     	 	A%%a((A}  tr   r|   c                Z    |                      |          sdS |                     |          S )a.  
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr9   _is_dirr/   s     r   r7   zHDFSClient.is_dir  s/    8 }}W%% 	5||G$$$r   c                    d| }|                      |dd          \  }}|rW|                     |          r@t          d           t          d                    |                     t	          |          dS dS )Nztest -d Tr
   r   r   zraise exception: 
F)r   r   r   r   r   )r0   r1   r   r   r   s        r   r   zHDFSClient._is_dir  s    """]]3!]LL
U 	&& ()***dii&&'''"3'''5tr   c                \    |                      |          sdS |                     |           S )a$  
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   r/   s     r   r5   zHDFSClient.is_file  s2    6 }}W%% 	5<<((((r   c                X    d| d}|                      |dd          \  }}|dk    rdS dS )aA  
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The hdfs file path.

        Returns:
            Bool: Whether it's is file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        ztest -e r   Tr
   r  r   F)r   )r0   r1   r   r   outs        r   r9   zHDFSClient.is_exist  sB    : $###==d=JJS!885tr   rQ   rR   rM   c                   |                     d          }|                     d          }t          j                            |          }|                     |dz   |z             r|r|                     |dz   |z              |                     |          s|                     |           |                     ||           dS z
        upload dir to hdfs
        Args:
            local_dir(str): local dir
            dest_dir(str): hdfs dest dir
            overwrite(bool): is overwrite
        Returns:
            return code
        re   N)rstriprf   r   r   r9   rB   r@   _try_uploadr0   rQ   rR   rM   local_basenames        r   rS   zHDFSClient.upload_dir  s     $$S))	??3'')))44==C.899 	9i 	9KK37888}}X&& 	"KK!!!H-----r   r;   multi_processesc                J     fd}d }t                      }|                    |          st          | d           ||          }|st          d           dS                      |          r,|r*                     |                                |           g }	t          |          D ]Z}
                     ||
|          }t          j	        |||f          }|	
                    |           |                                 [|	D ]}|                                 dS )a  
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int|1): the upload data process at the same time, default=5
            overwrite(bool|False): will overwrite file on HDFS or not

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        c                >    |D ]}                     ||            d S r,   )r
  )hdfs_path_singledatasdatar0   s      r   __subprocess_uploadz.HDFSClient.upload.<locals>.__subprocess_uploadL  s6     9 9  '788889 9r   c                N   g }t           j                            |           s|S t           j                            |           rMt          j        |           D ]7}t           j                            | |          }|                    |           8n|                    |            |S )z
            get local files
            Args:
                path(str): local path
            Returns:
                list of local files
            )rf   r   r   rh   rg   r   ri   )r   rlistfilets       r   get_local_filesz*HDFSClient.upload.<locals>.get_local_filesP  s     E7>>$'' w}}T"" #Jt,, $ $DT400ALLOOOO$ T"""Lr    not existsz/there are nothing need to upload, function exitNtargetr   )ra   r9   r$   r   rB   r@   r   _split_filesmultiprocessingProcessri   r   r   )r0   r;   r1   r  rM   _HDFSClient__subprocess_uploadr  local	all_filesprocsiprocess_datasr   procs   `             r   r<   zHDFSClient.upload*  sk   D	9 	9 	9 	9 	9	 	 	* 		~~j)) 	C&*'A'A'ABBB#OJ//	 	CDDDF==!! 	!i 	!KK   KK   '' 	 	A --iOLLM'*'=1I  A LLOOOGGIIII  	 	DIIKKKK	 	r   c                    d| d| }d}	 |                      |          \  }}|dk    rt          |          d S # t          $ r}|                     |           |d }~ww xY w)Nzput r   r   )r   r   r   rB   )r0   r;   r1   r   r   _r   s          r   r
  zHDFSClient._try_upload  s    +Z++'++	]]3''FCaxx"3''' x 	 	 	KK   G	s   -; 
A!AA!c                P     fd}                                st           d                                         r                     |          S                                \  }}fd|D             }|                    fd|D                        g }	t          |          D ]Z}
                     ||
|          }t          j	        |||f          }|	
                    |           |                                 [|	D ]}|                                 dS )al  
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int|1): the download data process at the same time, default=1
            overwrite(bool): is overwrite

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.download("hdfs:/test_hdfs_client", "./")

        c                >    |D ]}                     ||            dS )z
            download file from HDFS
            Args:
                local_path(str): the local file path
                datas(str): the hdfs file path list
            N)_try_download)r;   r  r  r0   s      r   __subprocess_downloadz2HDFSClient.download.<locals>.__subprocess_download  s7      5 5""444445 5r   
 not exitsc                     g | ]
}d z   |z   S r   r   r   r#  r1   s     r   r   z'HDFSClient.download.<locals>.<listcomp>  s"    >>>1Ws]Q&>>>r   c                     g | ]
}d z   |z   S r   r   r.  s     r   r   z'HDFSClient.download.<locals>.<listcomp>  s"    :::'C-!+:::r   r  N)r9   r$   r5   r*  r2   extendr   r  r  r  ri   r   r   )r0   r1   r;   r  rM    _HDFSClient__subprocess_downloadrj   all_filenamesr!  r"  r#  r$  r   r%  s   ``            r   r>   zHDFSClient.download  sk   D	5 	5 	5 	5 	5 }}W%% 	?&''='='=>>><<   	;%%gz:::"kk'22m>>>>>>>	::::T:::;;;'' 	 	A --iOLLM',J3N  A LLOOOGGIIII  	 	DIIKKKK	 	r   c                    d| d| }d}	 |                      |          \  }}|dk    rt          |          d S # t          $ r*}t                      }|                    |           |d }~ww xY w)Nzget r   r   )r   r   r   ra   rB   )r0   r1   r;   r   r   r'  r   local_fss           r   r*  zHDFSClient._try_download  s    +W++z++	]]3''FCaxx"3''' x 	 	 	yyHOOJ'''G	s   -; 
A/%A**A/c                Z   |                      |          rdS d}d| d}|                     |d          \  }}|dk    r|D ]
}d|v rd} n|st          |          |rG|                      |          s4d	| }|                     |          \  }}|dk    rt          |          dS dS dS )
a  
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        NFzmkdir r   T)r   r   zNo such file or directoryz	mkdir -p )r9   r   r   )r0   r1   out_hdfsr   r   r  r   r'  s           r   r@   zHDFSClient.mkdirs  s   2 ==!! 	F!w!!!==d=;;S!88  .!33#HE 4  ("3''' 	(DMM'22 	('g''C]]3''FCaxx"3'''		( 	( 	( 	( xr   TrH   rI   rN   c                &   |r*|                      |          r|                     |           |rN|                      |          st          | d          |                      |          rt          | d          |                     ||          S )a  
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str):  Name of the file or directory, that's needed to be moved.
            fs_dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Exception.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

         is not exists exists already)r9   rB   r$   r"   _try_mvrL   s        r   rO   zHDFSClient.mv	  s    B  	%{33 	%KK$$$ 	I==-- K*k+I+I+IJJJ}}[)) I';(G(G(GHHH||K555r   c                   d| d| }d}	 |                      |d          \  }}|dk    rt          |          d S # t          $ r7}|                     |          s|                     |          rY d }~d S |d }~ww xY w)Nzmv r   r   r
   r   )r   r   r   r9   )r0   rH   rI   r   r   r'  r   s          r   r:  zHDFSClient._try_mv6  s    /K//+//	]]3A]66FCaxx"3''' x 	 	 	==-- $--2L2L G	s   /= 
A>*A97A99A>c                j    d| }|                      |          \  }}|dk    rt          |          d S )Nzrmr r   r   r   r0   r1   r   r   r'  s        r   ru   zHDFSClient._rmrC  sB    Ws##Q!88s### 8r   c                j    d| }|                      |          \  }}|dk    rt          |          d S )Nzrm r   r>  r?  s        r   rx   zHDFSClient._rmI  s@    Goos##Q!88s### 8r   c                    |                      |          sdS |                     |          }|r|                     |          S |                     |          S )a  
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.delete("hdfs:/test_hdfs_client")

        N)r9   r   ru   rx   )r0   r1   r7   s      r   rB   zHDFSClient.deleteO  sZ    2 }}W%% 	Fg&& 	&99W%%%xx   r   rX   c                l    |                      |          r|rdS t          |                     |          S )a6  
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.touch("hdfs:/test_hdfs_client")

        N)r9   r"   _touchzrW   s      r   rY   zHDFSClient.touchq  s=    4 ==!! 	$ ##||G$$$r   c                j    d| }|                      |          \  }}|dk    rt          |          d S )Nztouchz r   r>  r?  s        r   rC  zHDFSClient._touchz  sB    !!!s##Q!88s### 8r   Literal[True]c                    dS NTr   rD   s    r   rE   zHDFSClient.need_upload_download      tr   N
str | Nonec                    |                      |          r*|                     |          }d                    |          S dS )a  
        Cat a remote HDFS file.

        Args:
            fs_path(str|None): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.cat("hdfs:/test_hdfs_client")
                ''

        r  r   )r5   _try_catr   )r0   r1   r   s      r   r[   zHDFSClient.cat  s@    8 <<   	]]7++F99V$$$2r   c                n    d| }|                      |d          \  }}|dk    rt          |          |S )Nzcat r
   r<  r   r>  )r0   r1   r   r   r   s        r   rK  zHDFSClient._try_cat  sD    WmmCQm77V!88s###r   c                    t          |          |z  }t          |          |z  }|g|z  }t          |          D ]}||xx         dz  cc<   g g|z  }d}	t          |          D ]#}||	|	||         z            ||<   |	||         z  }	$||         S z
        split file list
        Args:
            files(list): file list
            trainer_id(int): trainer mpi rank id
            trainers(int): all trainers num
        Returns:
            filelist(list): file list of current trainer
        r
   r   r   r   
r0   rk   
trainer_idtrainers	remainder	blocksizeblocksr#  trainer_filesbegins
             r   r  zHDFSClient._split_files       JJ)	JJ(*	x'y!! 	 	A1IIINIIIIxx 	 	A$UUVAY->%>?M!VAYEEZ((r   	path_listlist[_FileInfo]c                   t          |          dk    rg S g }d}|D ]
}||dz   z  }d|z   dz   }|                     |          \  }}t          |          dk    rt          j        d| d           g S |D ]`}|                    d          }	t          |	          dk     r+|	d	         }
t          |	d                   }|                    |
|d
           a|S )z
        list_files return file path and size
        Args:
            path_list(list): file list
        Returns:
            filelist(list): file list with file path and size
        r   r   r   r   z) | awk '{if ($8 != "") {print $5" "$8 }}'zlist_files empty, path[]   r
   )r   r   )r   r   r   warningr   r   ri   )r0   rY  	file_list
str_concatr   r   r   r   r   r   	file_path	file_sizes               r   list_files_infozHDFSClient.list_files_info  s    y>>QI	 
 	% 	%D$*$JJJ!PP 	 ]]3''
Uu::??NAYAAABBBI 	E 	ED**S//C3xx!||AICFIiCCDDDDr   r   r   )
r   r   r   r   r   r   r   r   rb   rm   )Fr   r   r   r   F)rQ   r   rR   r   rM   r|   rb   rm   )r   F)
r;   r   r1   r   r  r   rM   r|   rb   rm   )
r1   r   r;   r   r  r   rM   r|   rb   rm   r   FT)
rH   r   rI   r   rM   r|   rN   r|   rb   rm   r]   r   )rb   rE  r,   )r1   rI  rb   r   )rY  r   rb   rZ  )#r   r   r   r   r   r   r   r   r   rU   r2   r   r   r7   r   r5   r9   rS   r<   r
  r>   r*  r@   rO   r:  ru   rx   rB   rY   rC  rE   r[   rK  r  rc  r   r   r   r   r     s         6  &
 
 
 
 
2( ( ( ( ) ) ) )< ^   B ^% % % %B  ,   ^% % % %@  ) ) ) )@ ^! ! ! !H @E. . . . .4  !S S S S Sj ^	 	 	   !@ @ @ @ @D ^
 
 
 ^+( +( +( +(b   +6 +6 +6 +6 +6Z ^
 
 
$ $ $$ $ $ ^! ! ! !B% % % % %B ^$ $ $            D ^  ) ) )2           r   r   c                      e Zd ZdZddZd Zd Zd Zd Zd	 Z	d
 Z
d Zd ZddZddZddZd ZddZd ZddZd Zd dZd ZdS )!	AFSClienta:  
    A tool of AFS. Use AfsWrapper.
    When WITH_PSLIB=ON, you can use this class directly.
    When WITH_PSCORE=ON, you should export LD_LIBRARY_PATH='YOUR_AFSAPISO_PATH' before using this class.

    Examples:

        .. code-block:: python

            >>> # doctest: +SKIP('depend on external file')
            >>> from paddle.distributed.fleet.utils.fs import AFSClient

            >>> client = AFSClient()
            >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
            >>> client.ls_dir("hdfs:/test_hdfs_client")

    r   r   c                D    t          j                    | _        || _        d S r,   )r	   
AfsWrapper_fsr   )r0   r   r   s      r   r   zAFSClient.__init__  s    ?$$!r   c                @    | j                             ||||           d S r,   )rk  init)r0   fs_namefs_user	fs_passwdfs_confs        r   rm  zAFSClient.init  s"    gw	7;;;;;r   c                d    |                      |          sg S |                     |          \  }}|S )a{  
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        r   r   s       r   rU   zAFSClient.list_dirs  s7    , }}W%% 	Ill7++er   c                ^    |                      |          sg g fS |                     |          S )a  
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        r   r/   s     r   r2   zAFSClient.ls_dir8  s3    . }}W%% 	r6M||G$$$r   c                D    | j                             |          }|g}||fS r,   )rk  list)r0   r1   rk   rj   s       r   r   zAFSClient._ls_dirT  s&    g&&yU{r   c                Z    |                      |          sdS |                     |          S )a~  
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_dir("hdfs:/test_hdfs_client")

        Fr   r/   s     r   r7   zAFSClient.is_dirY  s/    , }}W%% 	5||G$$$r   c                d    | j                             |          }t          |          dk    rdS dS )Nr   TF)rk  ru  r   )r0   r1   	list_paths      r   r   zAFSClient._is_dirt  s0    HMM'**		NNa45r   c                \    |                      |          sdS |                     |           S )au  
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   r/   s     r   r5   zAFSClient.is_file{  s2    , }}W%% 	5<<((((r   c                6    | j                             |          S )a  
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The hdfs file path.

        Returns:
            Bool: Whether it's is file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        )rk  existr/   s     r   r9   zAFSClient.is_exist  s    . x~~g&&&r   Fc                   |                     d          }|                     d          }t          j                            |          }|                     |dz   |z             r|r|                     |dz   |z              |                     |          s|                     |           | j                            ||           dS r  )	r	  rf   r   r   r9   rB   r@   rk  r<   r  s        r   rS   zAFSClient.upload_dir  s     $$S))	??3'')))44==C.899 	9i 	9KK37888}}X&& 	"KK!!!	8,,,,,r   r
   c                    t                      }|                    |          st          | d          | j                            ||           dS )a  
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int|1): the upload data process at the same time, default=5
            overwrite(bool|False): will overwrite file on HDFS or not

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        r  N)ra   r9   r$   rk  r<   )r0   r;   r1   r  rM   r   s         r   r<   zAFSClient.upload  sU    . 		~~j)) 	C&*'A'A'ABBB
G,,,,,r   c                   |                      |          st          | d          |                     |          r| j                            ||          S |                     |          \  }}|D ]`}t          j                            |t          j        	                    |          d                   }| j                            ||           adS )a  
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int|1): the download data process at the same time, default=1
            overwrite(bool): is overwrite

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.download("hdfs:/test_hdfs_client", "./")

        r,  r
   N)
r9   r$   r5   rk  r>   r2   rf   r   r   r   )	r0   r1   r;   r  rM   r'  r2  	file_namelocal_file_names	            r   r>   zAFSClient.download  s    . }}W%% 	?&''='='=>>><<   	:8$$Z999  ;;w//=& 	: 	:I gllBGMM)44Q7 O Hoy9999		: 	:r   c                h    |                      |          rdS | j                            |           dS )a  
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        N)r9   rk  mkdirr/   s     r   r@   zAFSClient.mkdirs  s7    & ==!! 	Fwr   Tc                4   |r*|                      |          r|                     |           |rN|                      |          st          | d          |                      |          rt          | d          | j                            ||           dS )a  
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str):  Name of the file or directory, that's needed to be moved.
            fs_dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Exception.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

        r8  r9  N)r9   rB   r$   r"   rk  rO   rL   s        r   rO   zAFSClient.mv  s    ,  	%{33 	%KK$$$ 	I==-- K*k+I+I+IJJJ}}[)) I';(G(G(GHHHK-----r   c                h    |                      |          sdS | j                            |           dS )a  
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python


                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.delete("hdfs:/test_hdfs_client")

        N)r9   rk  rw   r/   s     r   rB   zAFSClient.delete>  s7    ( }}W%% 	F     r   c                v    |                      |          r|rdS t          | j                            |          S )a  
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.touch("hdfs:/test_hdfs_client")

        N)r9   r"   rk  touchzrW   s      r   rY   zAFSClient.touchV  s?    * ==!! 	$ ##xw'''r   c                    dS rG  r   rD   s    r   rE   zAFSClient.need_upload_downloadr  rH  r   Nc                d    |                      |          r| j                            |          S dS )a  
        Cat a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.cat("hdfs:/test_hdfs_client")

        r   )r5   rk  r[   r/   s     r   r[   zAFSClient.catu  s1    , <<   	8<<(((2r   c                    t          |          |z  }t          |          |z  }|g|z  }t          |          D ]}||xx         dz  cc<   g g|z  }d}	t          |          D ]#}||	|	||         z            ||<   |	||         z  }	$||         S rN  rO  rP  s
             r   r  zAFSClient._split_files  rX  r   rd  re  )r
   Frf  r]   r,   )r   r   r   r   r   rm  rU   r2   r   r7   r   r5   r9   rS   r<   r>   r@   rO   rB   rY   rE   r[   r  r   r   r   rh  rh    s`        $" " " "< < <  8% % %8  
% % %6  ) ) )6' ' '2- - - -(- - - -:#: #: #: #:J     . .  .  .  .D! ! !0( ( ( (8     6) ) ) ) )r   rh  r,   )r   r   rb   r   )'
__future__r   r^   r   r  rf   r   rs   r   r   typingr   r   r   r   r   paddle.baser	   log_utilr   typing_extensionsr   r   r   r   r   r   __all__r   r   r"   r$   r&   r(   r*   ra   r   r   rh  r   r   r   <module>r     s   " # " " " " " 



         				 				       G G G G G G G G G G G G G G             ++++++i	""GGGE!	LL     I   
 	 	 	 	 	9 	 	 		 	 	 	 		 	 	 		 	 	 	 	9 	 	 		 	 	 	 		 	 	 		 	 	 	 	 	 	 	;" ;" ;" ;" ;" ;" ;" ;"|j j j j jb j j j\	 "&$ $ $ $ $Ne e e e e e e ePe) e) e) e) e) e) e) e) e) e)r   