
    ΑiI"                    |    S SK Jr  S SKrS SKJrJr  S SKJr  S SKJ	r	  / r
SqS r        S
S jrSS jrSS	 jrg)    )annotationsN)ManagerProcess)core)wait_server_readyc                X   SSK Jn  U" [        U 5      US9nUR                  5         SnUR	                  SS5      (       d  UR                  5       (       dF  [        R                  " U5        UR	                  SS5      (       a  M/  UR                  5       (       d  MF  UR                  5         g )Nr   )KVServer)size   runningF)	*paddle.distributed.fleet.utils.http_serverr	   intstartgetshould_stoptimesleepstop)porthttp_server_dr
   r	   http_serverwait_secondss         e/var/www/html/banglarbhumi/venv/lib/python3.13/site-packages/paddle/distributed/parallel_with_gloo.py_start_kv_serverr      s    C3t940KL


Iu
-
-[5L5L5N5N

<  

Iu
-
-[5L5L5N5N    c                z   US:  SL d   S5       e[        5       nUR                  5       nSUS'   U S:X  aK  SU0n[        [        [	        UR                  S5      S   5      XE4S	9nS
Ul        S
US'   UR                  5         [        U/5        [        R                  " 5       nXl        Xl        UR                  S5      S   Ul        [	        UR                  S5      S   5      Ul        SUl        SUl        [        R"                  " U5      q[$        R'                  5         U S:X  a  SUS'   WR)                  5         gg)a  
Initialize parallel environment with gloo for cpu only.

Args:
    - rank_id (int, required) - the index of current rank;
    - rank_num (int, required) - the number of ranks in this parallel env;
    - server_endpoint (str, required) - endpoint of server to init gloo context in ip:port format;

Returns:
    None

Examples:
    .. code-block:: python

        >>> import paddle
        >>> import multiprocessing
        >>> from contextlib import closing
        >>> import socket

        >>> port_set = set() # type: ignore

        >>> def find_free_port():
        ...     def _free_port():
        ...         with closing(socket.socket(socket.AF_INET,
        ...             socket.SOCK_STREAM)) as s:
        ...             s.bind(('', 0))
        ...             return s.getsockname()[1]
        ...     while True:
        ...         port = _free_port()
        ...         if port not in port_set:
        ...             port_set.add(port)
        ...             return port

        >>> def test_gloo_init(id, rank_num, server_endpoint):
        ...     paddle.distributed.gloo_init_parallel_env(
        ...         id, rank_num, server_endpoint)

        >>> def test_gloo_init_with_multiprocess(num_of_ranks):
        ...     jobs = []
        ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
        ...     for id in range(num_of_ranks):
        ...         p = multiprocessing.Process(
        ...             target=test_gloo_init,
        ...             args=(id, num_of_ranks, server_endpoint))
        ...         jobs.append(p)
        ...         p.start()
        ...     for proc in jobs:
        ...         proc.join()

        >>> if __name__ == '__main__':
        ...     # Arg: number of ranks (processes)
        ...     test_gloo_init_with_multiprocess(2)
   FzSrank_num should greater than or equal to 2 for parallel environment initialization.r   r   _worker:   )targetargsTi  i N)r   dictr   r   r   splitdaemonr   r   r   GlooParallelStrategyrankrank_num
ip_addressip_portinit_secondsrun_secondsGlooParallelContext_global_gloo_ctxinitjoin)rank_idr(   server_endpointmanagerhttp_server_statusr
   http_server_procgloo_strategys           r   gloo_init_parallel_envr7   *   sN   r qLU" ]"
 iG $)y!!|8$"#o++C0346HO
 #'(,9%  '(--/M %.44S9!<M 5 5c :1 =>M!%M 'M //>!|(-9% r   c                 H    [         c   S5       e[         R                  5         g)ap  
Call barrier function with initialized gloo context.

Args:
    None

Returns:
    None

Examples:
    .. code-block:: python

        >>> import paddle
        >>> import multiprocessing
        >>> from contextlib import closing
        >>> import socket

        >>> port_set = set() # type: ignore

        >>> def find_free_port():
        ...     def _free_port():
        ...         with closing(socket.socket(socket.AF_INET,
        ...             socket.SOCK_STREAM)) as s:
        ...             s.bind(('', 0))
        ...             return s.getsockname()[1]
        ...     while True:
        ...         port = _free_port()
        ...         if port not in port_set:
        ...             port_set.add(port)
        ...             return port

        >>> def test_gloo_barrier(id, rank_num, server_endpoint):
        ...     paddle.distributed.gloo_init_parallel_env(
        ...         id, rank_num, server_endpoint)
        ...     paddle.distributed.gloo_barrier()

        >>> def test_gloo_barrier_with_multiprocess(num_of_ranks):
        ...     jobs = []
        ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
        ...     for id in range(num_of_ranks):
        ...         p = multiprocessing.Process(
        ...             target=test_gloo_barrier,
        ...             args=(id, num_of_ranks, server_endpoint))
        ...         jobs.append(p)
        ...         p.start()
        ...     for proc in jobs:
        ...         proc.join()

        >>> if __name__ == '__main__':
        ...     # Arg: number of ranks (processes)
        ...     test_gloo_barrier_with_multiprocess(2)
Nz gloo context is not initialized.)r.   barrier r   r   gloo_barrierr;      s#    l 'K)KK'r   c                 <    [         b  [         R                  5         gg)a  
Release the parallel environment initialized by gloo

Args:
    None

Returns:
    None

Examples:
    .. code-block:: python

        >>> import paddle
        >>> import multiprocessing
        >>> from contextlib import closing
        >>> import socket

        >>> port_set = set() # type: ignore

        >>> def find_free_port():
        ...     def _free_port():
        ...         with closing(socket.socket(socket.AF_INET,
        ...             socket.SOCK_STREAM)) as s:
        ...             s.bind(('', 0))
        ...             return s.getsockname()[1]
        ...     while True:
        ...         port = _free_port()
        ...         if port not in port_set:
        ...             port_set.add(port)
        ...             return port

        >>> def test_gloo_release(id, rank_num, server_endpoint):
        ...     paddle.distributed.gloo_init_parallel_env(
        ...         id, rank_num, server_endpoint)
        ...     paddle.distributed.gloo_barrier()
        ...     paddle.distributed.gloo_release()

        >>> def test_gloo_release_with_multiprocess(num_of_ranks):
        ...     jobs = []
        ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
        ...     for id in range(num_of_ranks):
        ...         p = multiprocessing.Process(
        ...             target=test_gloo_release,
        ...             args=(id, num_of_ranks, server_endpoint))
        ...         jobs.append(p)
        ...         p.start()
        ...     for proc in jobs:
        ...         proc.join()

        >>> if __name__ == '__main__':
        ...     # Arg: number of ranks (processes)
        ...     test_gloo_release_with_multiprocess(2)
N)r.   releaser:   r   r   gloo_releaser>      s    n #  " $r   )r1   r   r(   r   r2   strreturnNone)r@   rA   )
__future__r   r   multiprocessingr   r   paddle.baser   5paddle.distributed.fleet.base.private_helper_functionr   __all__r.   r   r7   r;   r>   r:   r   r   <module>rG      sb    #  ,   ` ` ` 25` 	` F7t8#r   