# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Provides helper methods for talking to the Compute Engine metadata server.

See https://cloud.google.com/compute/docs/metadata for more details.
"""

import datetime
import http.client as http_client
import json
import logging
import os
from urllib.parse import urljoin

import requests

from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import metrics
from google.auth import transport
from google.auth._exponential_backoff import ExponentialBackoff
from google.auth.compute_engine import _mtls


_LOGGER = logging.getLogger(__name__)

_GCE_DEFAULT_MDS_IP = "169.254.169.254"
_GCE_DEFAULT_HOST = "metadata.google.internal"
_GCE_DEFAULT_MDS_HOSTS = [_GCE_DEFAULT_HOST, _GCE_DEFAULT_MDS_IP]

# Environment variable GCE_METADATA_HOST is originally named
# GCE_METADATA_ROOT. For compatibility reasons, here it checks
# the new variable first; if not set, the system falls back
# to the old variable.
_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST, None)
if not _GCE_METADATA_HOST:
    _GCE_METADATA_HOST = os.getenv(
        environment_vars.GCE_METADATA_ROOT, _GCE_DEFAULT_HOST
    )


def _validate_gce_mds_configured_environment():
    """Validates the GCE metadata server environment configuration for mTLS.

    mTLS is only supported when connecting to the default metadata server hosts.
    If we are in strict mode (which requires mTLS), ensure that the metadata host
    has not been overridden to a custom value (which means mTLS will fail).

    Raises:
        google.auth.exceptions.MutualTLSChannelError: if the environment
            configuration is invalid for mTLS.
    """
    mode = _mtls._parse_mds_mode()
    if mode == _mtls.MdsMtlsMode.STRICT:
        # mTLS is only supported when connecting to the default metadata host.
        # Raise an exception if we are in strict mode (which requires mTLS)
        # but the metadata host has been overridden to a custom MDS. (which means mTLS will fail)
        if _GCE_METADATA_HOST not in _GCE_DEFAULT_MDS_HOSTS:
            raise exceptions.MutualTLSChannelError(
                "Mutual TLS is required, but the metadata host has been overridden. "
                "mTLS is only supported when connecting to the default metadata host."
            )


def _get_metadata_root(use_mtls: bool):
    """Returns the metadata server root URL."""

    scheme = "https" if use_mtls else "http"
    return "{}://{}/computeMetadata/v1/".format(scheme, _GCE_METADATA_HOST)


def _get_metadata_ip_root(use_mtls: bool):
    """Returns the metadata server IP root URL."""
    scheme = "https" if use_mtls else "http"
    return "{}://{}".format(
        scheme, os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
    )


_METADATA_FLAVOR_HEADER = "metadata-flavor"
_METADATA_FLAVOR_VALUE = "Google"
_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE}

# Timeout in seconds to wait for the GCE metadata server when detecting the
# GCE environment.
try:
    _METADATA_DEFAULT_TIMEOUT = int(os.getenv("GCE_METADATA_TIMEOUT", 3))
except ValueError:  # pragma: NO COVER
    _METADATA_DEFAULT_TIMEOUT = 3

# Detect GCE Residency
_GOOGLE = "Google"
_GCE_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name"


def is_on_gce(request):
    """Checks to see if the code runs on Google Compute Engine

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        bool: True if the code runs on Google Compute Engine, False otherwise.
    """
    if ping(request):
        return True

    if os.name == "nt":
        # TODO: implement GCE residency detection on Windows
        return False

    # Detect GCE residency on Linux
    return detect_gce_residency_linux()


def detect_gce_residency_linux():
    """Detect Google Compute Engine residency by smbios check on Linux

    Returns:
        bool: True if the GCE product name file is detected, False otherwise.
    """
    try:
        with open(_GCE_PRODUCT_NAME_FILE, "r") as file_obj:
            content = file_obj.read().strip()

    except Exception:
        return False

    return content.startswith(_GOOGLE)


def _prepare_request_for_mds(request, use_mtls=False) -> None:
    """Prepares a request for the metadata server.

    This will check if mTLS should be used and mount the mTLS adapter if needed.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        use_mtls (bool): Whether to use mTLS for the request.

    Returns:
        google.auth.transport.Request: A request object to use.
            If mTLS is enabled, the request will have the mTLS adapter mounted.
            Otherwise, the original request will be returned unchanged.
    """
    # Only modify the request if mTLS is enabled.
    if use_mtls:
        # Ensure the request has a session to mount the adapter to.
        if not request.session:
            request.session = requests.Session()

        adapter = _mtls.MdsMtlsAdapter()
        # Mount the adapter for all default GCE metadata hosts.
        for host in _GCE_DEFAULT_MDS_HOSTS:
            request.session.mount(f"https://{host}/", adapter)


def ping(request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=3):
    """Checks to see if the metadata server is available.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        timeout (int): How long to wait for the metadata server to respond.
        retry_count (int): How many times to attempt connecting to metadata
            server using above timeout.

    Returns:
        bool: True if the metadata server is reachable, False otherwise.
    """
    use_mtls = _mtls.should_use_mds_mtls()
    _prepare_request_for_mds(request, use_mtls=use_mtls)
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    headers = _METADATA_HEADERS.copy()
    headers[metrics.API_CLIENT_HEADER] = metrics.mds_ping()

    backoff = ExponentialBackoff(total_attempts=retry_count)

    for attempt in backoff:
        try:
            response = request(
                url=_get_metadata_ip_root(use_mtls),
                method="GET",
                headers=headers,
                timeout=timeout,
            )

            metadata_flavor = response.headers.get(_METADATA_FLAVOR_HEADER)
            return (
                response.status == http_client.OK
                and metadata_flavor == _METADATA_FLAVOR_VALUE
            )

        except exceptions.TransportError as e:
            _LOGGER.warning(
                "Compute Engine Metadata server unavailable on "
                "attempt %s of %s. Reason: %s",
                attempt,
                retry_count,
                e,
            )

    return False


def get(
    request,
    path,
    root=None,
    params=None,
    recursive=False,
    retry_count=5,
    headers=None,
    return_none_for_not_found_error=False,
    timeout=_METADATA_DEFAULT_TIMEOUT,
):
    """Fetch a resource from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        path (str): The resource to retrieve. For example,
            ``'instance/service-accounts/default'``.
        root (Optional[str]): The full path to the metadata server root. If not
            provided, the default root will be used.
        params (Optional[Mapping[str, str]]): A mapping of query parameter
            keys to values.
        recursive (bool): Whether to do a recursive query of metadata. See
            https://cloud.google.com/compute/docs/metadata#aggcontents for more
            details.
        retry_count (int): How many times to attempt connecting to metadata
            server using above timeout.
        headers (Optional[Mapping[str, str]]): Headers for the request.
        return_none_for_not_found_error (Optional[bool]): If True, returns None
            for 404 error instead of throwing an exception.
        timeout (int): How long to wait, in seconds for the metadata server to respond.

    Returns:
        Union[Mapping, str]: If the metadata server returns JSON, a mapping of
            the decoded JSON is returned. Otherwise, the response content is
            returned as a string.

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
        google.auth.exceptions.MutualTLSChannelError: if using mtls and the environment
            configuration is invalid for mTLS (for example, the metadata host
            has been overridden in strict mTLS mode).

    """
    use_mtls = _mtls.should_use_mds_mtls()
    # Prepare the request object for mTLS if needed.
    # This will create a new request object with the mTLS session.
    _prepare_request_for_mds(request, use_mtls=use_mtls)

    if root is None:
        root = _get_metadata_root(use_mtls)

    # mTLS is only supported when connecting to the default metadata host.
    # If we are in strict mode (which requires mTLS), ensure that the metadata host
    # has not been overridden to a non-default host value (which means mTLS will fail).
    _validate_gce_mds_configured_environment()

    base_url = urljoin(root, path)
    query_params = {} if params is None else params

    headers_to_use = _METADATA_HEADERS.copy()
    if headers:
        headers_to_use.update(headers)

    if recursive:
        query_params["recursive"] = "true"

    url = _helpers.update_query(base_url, query_params)

    backoff = ExponentialBackoff(total_attempts=retry_count)
    failure_reason = None
    for attempt in backoff:
        try:
            response = request(
                url=url, method="GET", headers=headers_to_use, timeout=timeout
            )
            if response.status in transport.DEFAULT_RETRYABLE_STATUS_CODES:
                _LOGGER.warning(
                    "Compute Engine Metadata server unavailable on "
                    "attempt %s of %s. Response status: %s",
                    attempt,
                    retry_count,
                    response.status,
                )
                failure_reason = (
                    response.data.decode("utf-8")
                    if hasattr(response.data, "decode")
                    else response.data
                )
                continue
            else:
                break

        except exceptions.TransportError as e:
            _LOGGER.warning(
                "Compute Engine Metadata server unavailable on "
                "attempt %s of %s. Reason: %s",
                attempt,
                retry_count,
                e,
            )
            failure_reason = e
    else:
        raise exceptions.TransportError(
            "Failed to retrieve {} from the Google Compute Engine "
            "metadata service. Compute Engine Metadata server unavailable due to {}".format(
                url, failure_reason
            )
        )

    content = _helpers.from_bytes(response.data)

    if response.status == http_client.NOT_FOUND and return_none_for_not_found_error:
        return None

    if response.status == http_client.OK:
        if (
            _helpers.parse_content_type(response.headers["content-type"])
            == "application/json"
        ):
            try:
                return json.loads(content)
            except ValueError as caught_exc:
                new_exc = exceptions.TransportError(
                    "Received invalid JSON from the Google Compute Engine "
                    "metadata service: {:.20}".format(content)
                )
                raise new_exc from caught_exc
        else:
            return content

    raise exceptions.TransportError(
        "Failed to retrieve {} from the Google Compute Engine "
        "metadata service. Status: {} Response:\n{}".format(
            url, response.status, response.data
        ),
        response,
    )


def get_project_id(request):
    """Get the Google Cloud Project ID from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        str: The project ID

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    return get(request, "project/project-id")


def get_universe_domain(request):
    """Get the universe domain value from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.

    Returns:
        str: The universe domain value. If the universe domain endpoint is not
        not found, return the default value, which is googleapis.com

    Raises:
        google.auth.exceptions.TransportError: if an error other than
            404 occurs while retrieving metadata.
    """
    universe_domain = get(
        request, "universe/universe-domain", return_none_for_not_found_error=True
    )
    if not universe_domain:
        return "googleapis.com"
    return universe_domain


def get_service_account_info(request, service_account="default"):
    """Get information about a service account from the metadata server.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        service_account (str): The string 'default' or a service account email
            address. The determines which service account for which to acquire
            information.

    Returns:
        Mapping: The service account's information, for example::

            {
                'email': '...',
                'scopes': ['scope', ...],
                'aliases': ['default', '...']
            }

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    path = "instance/service-accounts/{0}/".format(service_account)
    # See https://cloud.google.com/compute/docs/metadata#aggcontents
    # for more on the use of 'recursive'.
    return get(request, path, params={"recursive": "true"})


def get_service_account_token(request, service_account="default", scopes=None):
    """Get the OAuth 2.0 access token for a service account.

    Args:
        request (google.auth.transport.Request): A callable used to make
            HTTP requests.
        service_account (str): The string 'default' or a service account email
            address. The determines which service account for which to acquire
            an access token.
        scopes (Optional[Union[str, List[str]]]): Optional string or list of
            strings with auth scopes.
    Returns:
        Tuple[str, datetime]: The access token and its expiration.

    Raises:
        google.auth.exceptions.TransportError: if an error occurred while
            retrieving metadata.
    """
    from google.auth import _agent_identity_utils

    params = {}
    if scopes:
        if not isinstance(scopes, str):
            scopes = ",".join(scopes)
        params["scopes"] = scopes

    cert = _agent_identity_utils.get_and_parse_agent_identity_certificate()
    if cert:
        if _agent_identity_utils.should_request_bound_token(cert):
            fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(cert)
            params["bindCertificateFingerprint"] = fingerprint

    metrics_header = {
        metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds()
    }

    path = "instance/service-accounts/{0}/token".format(service_account)
    token_json = get(request, path, params=params, headers=metrics_header)
    token_expiry = _helpers.utcnow() + datetime.timedelta(
        seconds=token_json["expires_in"]
    )
    return token_json["access_token"], token_expiry
