
    x-jI"                    h    d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	 g Z
dad ZddZddZddZdS )    )annotationsN)ManagerProcess)core)wait_server_readyc                j   ddl m}  |t          |           |          }|                                 d}|                    dd          s|                                s>t          j        |           |                    dd          *|                                >|                                 d S )Nr   )KVServer)size   runningF)	*paddle.distributed.fleet.utils.http_serverr	   intstartgetshould_stoptimesleepstop)porthttp_server_dr
   r	   http_serverwait_secondss         e/var/www/html/banglarbhumi/venv/lib/python3.11/site-packages/paddle/distributed/parallel_with_gloo.py_start_kv_serverr      s    CCCCCC(3t994000KL


Iu
-
- ![5L5L5N5N !
<    

Iu
-
- ![5L5L5N5N !    rank_idr   rank_numserver_endpointstrreturnNonec                   |dk     du s
J d            t                      }|                                }d|d<   | dk    rcd|i}t          t          t	          |                    d          d                   ||f	          }d
|_        d
|d<   |                                 t          |g           t          j
                    }| |_        ||_        |                    d          d         |_        t	          |                    d          d                   |_        d|_        d|_        t          j        |          at$                                           | dk    rd|d<   |                                 dS dS )a  
    Initialize parallel environment with gloo for cpu only.

    Args:
        - rank_id (int, required) - the index of current rank;
        - rank_num (int, required) - the number of ranks in this parallel env;
        - server_endpoint (str, required) - endpoint of server to init gloo context in ip:port format;

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set() # type: ignore

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_init(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)

            >>> def test_gloo_init_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_init,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_init_with_multiprocess(2)
       FzSrank_num should greater than or equal to 2 for parallel environment initialization.r   r   _worker:   )targetargsTi  i N)r   dictr   r   r   splitdaemonr   r   r   GlooParallelStrategyrankr   
ip_addressip_portinit_secondsrun_secondsGlooParallelContext_global_gloo_ctxinitjoin)r   r   r   managerhttp_server_statusr
   http_server_procgloo_strategys           r   gloo_init_parallel_envr:   *   s   r qLU"""] #""
 iiG $)y!!||8$"#o++C003446H$O
 
 
 #'(,9%    '(((-//M M%M.44S99!<M 5 5c : :1 =>>M!%M 'M />>!||(-9% |r   c                 Z    t           
J d            t                                            dS )a  
    Call barrier function with initialized gloo context.

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set() # type: ignore

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_barrier(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)
            ...     paddle.distributed.gloo_barrier()

            >>> def test_gloo_barrier_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_barrier,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_barrier_with_multiprocess(2)
    Nz gloo context is not initialized.)r3   barrier r   r   gloo_barrierr>      s2    l '')K'''r   c                 J    t           t                                            dS dS )aN  
    Release the parallel environment initialized by gloo

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set() # type: ignore

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_release(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)
            ...     paddle.distributed.gloo_barrier()
            ...     paddle.distributed.gloo_release()

            >>> def test_gloo_release_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_release,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_release_with_multiprocess(2)
    N)r3   releaser=   r   r   gloo_releaserA      s+    n #  """"" $#r   )r   r   r   r   r   r   r    r!   )r    r!   )
__future__r   r   multiprocessingr   r   paddle.baser   5paddle.distributed.fleet.base.private_helper_functionr   __all__r3   r   r:   r>   rA   r=   r   r   <module>rG      s    # " " " " "  , , , , , , , ,               `  `  `  ` F7 7 7 7t8# 8# 8# 8# 8# 8#r   