#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Fleet Utils."""

import collections
import json
import logging
import math
import os
import re
import sys
import time

import numpy as np

import paddle
from paddle import base
from paddle.base.log_helper import get_logger
from paddle.distributed.fleet.utils.fs import HDFSClient

from . import utils

__all__ = ["FleetUtil", "GPUPSUtil"]

_logger = get_logger(
    __name__, logging.INFO, fmt='%(asctime)s %(levelname)s: %(message)s'
)

fleet = None
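# module-level fleet handle; bound by FleetUtil.__init__ according to its mode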


class FleetUtil:
    """
    FleetUtil provides some common functions for users' convenience.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
            >>> fleet_util = FleetUtil()
            >>> fleet_util.rank0_print("my log")

    """

    def __init__(self, mode="pslib"):
        global fleet
        self.mode = mode
        if mode == "pslib":
            from paddle.incubate.distributed.fleet.parameter_server.pslib import (
                fleet as fleet_pslib,
            )

            fleet = fleet_pslib
        elif mode == "transpiler":
            from paddle.incubate.distributed.fleet.parameter_server.distribute_transpiler import (
                fleet as fleet_transpiler,
            )

            fleet = fleet_transpiler

        elif mode == "pscore":
            from paddle.distributed import fleet
        else:
            raise ValueError(
                'Please choose one mode from ["pslib", "transpiler", "pscore"]'
            )

    def rank0_print(self, s):
        """
        Print a log string on the worker of rank 0; other workers do nothing.

        Args:
            s(str): string to print

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.rank0_print("my log")

        """
        if fleet.worker_index() != 0:
            return
        print(s)
        sys.stdout.flush()

    def rank0_info(self, s):
        """
        Log an info message on the worker of rank 0; other workers do nothing.

        Args:
            s(str): string to log

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.rank0_info("my log info")

        """
        if fleet.worker_index() != 0:
            return
        _logger.info(s)

    def rank0_error(self, s):
        """
        Log an error message on the worker of rank 0; other workers do nothing.

        Args:
            s(str): string to log

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.rank0_error("my log error")

        """
        if fleet.worker_index() != 0:
            return
        _logger.error(s)

    def set_zero(
        self,
        var_name,
        scope=base.global_scope(),
        place=base.CPUPlace(),
        param_type="int64",
    ):
        """
        Set tensor of a Variable to zero.

        Args:
            var_name(str): name of Variable
            scope(Scope): Scope object, default is base.global_scope()
            place(Place): Place object, default is base.CPUPlace()
            param_type(str): param data type, default is int64

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.set_zero(myvar.name, myscope)

        """
        param = scope.var(var_name).get_tensor()
        param_array = np.zeros(param._get_dims()).astype(param_type)
        param.set(param_array, place)

    def print_global_auc(
        self,
        scope=base.global_scope(),
        stat_pos="_generated_var_2",
        stat_neg="_generated_var_3",
        print_prefix="",
    ):
        r"""
        Print global auc of all distributed workers.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos(str): name of auc pos bucket Variable
            stat_neg(str): name of auc neg bucket Variable
            print_prefix(str): prefix of print auc

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.print_global_auc(myscope, stat_pos=stat_pos.name,
                ...                           stat_neg=stat_neg.name)

                >>> # below is part of model
                >>> emb = my_slot_net(slots, label) # emb can be fc layer of size 1
                >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(
                ...     emb, min=-15.0, max=15.0), name="similarity_norm")
                >>> binary_predict = paddle.concat(input=[
                ...     paddle.subtract(
                ...         paddle.ceil(similarity_norm),
                ...         similarity_norm),
                ...     similarity_norm],
                ...     axis=1)
                >>> auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos,
                ...     stat_neg] = paddle.static.auc(input=binary_predict,
                ...                                   label=label, curve='ROC',
                ...                                   num_thresholds=4096)

        """
        auc_value = self.get_global_auc(scope, stat_pos, stat_neg)
        self.rank0_print(f"{print_prefix} global auc = {auc_value}")

    def get_global_auc(
        self,
        scope=base.global_scope(),
        stat_pos="_generated_var_2",
        stat_neg="_generated_var_3",
    ):
        """
        Get global auc of all distributed workers.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos(str): name of auc pos bucket Variable
            stat_neg(str): name of auc neg bucket Variable

        Returns:
            auc_value(float): global auc, or None if the auc buckets are not
            found in the scope

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> auc_value = fleet_util.get_global_auc(myscope,
                ...                                       stat_pos=stat_pos,
                ...                                       stat_neg=stat_neg)
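
        Note:
            The auc is computed from the aggregated pos/neg bucket counts
            with the trapezoidal rule. Below is a minimal single-process
            sketch of that computation, assuming toy bucket counts and no
            distributed allreduce:

            .. code-block:: python

                >>> import numpy as np
                >>> global_pos = np.array([[1.0, 2.0, 3.0]])  # toy pos buckets
                >>> global_neg = np.array([[3.0, 2.0, 1.0]])  # toy neg buckets
                >>> area, pos, neg = 0.0, 0.0, 0.0
                >>> # walk buckets from the highest score down
                >>> for index in reversed(range(global_pos.shape[1])):
                ...     new_pos = pos + global_pos[0][index]
                ...     new_neg = neg + global_neg[0][index]
                ...     area += (new_neg - neg) * (pos + new_pos) / 2
                ...     pos, neg = new_pos, new_neg
                >>> auc = area / (pos * neg) if pos * neg != 0 else 0.5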

        """
        if scope.find_var(stat_pos) is None or scope.find_var(stat_neg) is None:
            self.rank0_print("not found auc bucket")
            return None
        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()
        # auc pos bucket
        pos = np.array(scope.find_var(stat_pos).get_tensor())
        # auc pos bucket shape
        old_pos_shape = np.array(pos.shape)
        # reshape to one dim
        pos = pos.reshape(-1)
        global_pos = np.copy(pos) * 0
        # mpi allreduce
        fleet._role_maker._all_reduce(pos, global_pos)
        # reshape to its original shape
        global_pos = global_pos.reshape(old_pos_shape)

        # auc neg bucket
        neg = np.array(scope.find_var(stat_neg).get_tensor())
        old_neg_shape = np.array(neg.shape)
        neg = neg.reshape(-1)
        global_neg = np.copy(neg) * 0
        fleet._role_maker._all_reduce(neg, global_neg)
        global_neg = global_neg.reshape(old_neg_shape)

        # calculate auc
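        # walk the buckets from the highest score down, accumulating pos/neg
        # counts, and sum the area under the ROC curve with the trapezoidal
        # rule; the area is finally normalized by pos * neg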
        num_bucket = len(global_pos[0])
        area = 0.0
        pos = 0.0
        neg = 0.0
        new_pos = 0.0
        new_neg = 0.0
        total_ins_num = 0
        for i in range(num_bucket):
            index = num_bucket - 1 - i
            new_pos = pos + global_pos[0][index]
            total_ins_num += global_pos[0][index]
            new_neg = neg + global_neg[0][index]
            total_ins_num += global_neg[0][index]
            area += (new_neg - neg) * (pos + new_pos) / 2
            pos = new_pos
            neg = new_neg

        auc_value = None
        if pos * neg == 0 or total_ins_num == 0:
            auc_value = 0.5
        else:
            auc_value = area / (pos * neg)

        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()
        return auc_value

    def load_fleet_model_one_table(self, table_id, path):
        """
        load pslib model to one table

        Args:
            table_id(int): id of the table to load the model into
            path(str): model path

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.load_fleet_model_one_table(1, path="hdfs:/my/model/path")
        """
        fleet.load_one_table(table_id, path)

    def load_fleet_model(self, path, mode=0):
        """
        load pslib model

        Args:
            path(str): model path
            mode(int): load mode, 0 for checkpoint model and 1 for delta
                       model, default is 0

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()

                >>> fleet_util.load_fleet_model("hdfs:/my/model/path")

                >>> fleet_util.load_fleet_model("hdfs:/my/model/path", mode=0)

        """
        fleet.init_server(path, mode=mode)

    def save_fleet_model(self, path, mode=0):
        """
        save pslib model

        Args:
            path(str): model path
            mode(int): save mode, 0 for checkpoint model and 1 for delta
                       model, default is 0

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_fleet_model("hdfs:/my/model/path")

        """
        fleet.save_persistables(None, path, mode=mode)

    def _get_xbox_str(
        self,
        output_path,
        day,
        model_path,
        xbox_base_key,
        data_path,
        hadoop_fs_name,
        monitor_data={},
        mode="patch",
    ):
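        # build one xbox donefile record as a JSON string; "base" records use
        # xbox_base_key as their id, "patch" (delta) records use the current
        # timestamp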
        xbox_dict = collections.OrderedDict()
        if mode == "base":
            xbox_dict["id"] = str(xbox_base_key)
        elif mode == "patch":
            xbox_dict["id"] = str(int(time.time()))
        else:
            print(f"warning: unknown mode {mode}, set it to patch")
            mode = "patch"
            xbox_dict["id"] = str(int(time.time()))
        xbox_dict["key"] = str(xbox_base_key)
        if model_path.startswith("hdfs:") or model_path.startswith("afs:"):
            model_path = model_path[model_path.find(":") + 1 :]
        xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000"
        xbox_dict["record_count"] = "111111"
        xbox_dict["partition_type"] = "2"
        xbox_dict["job_name"] = "default_job_name"
        xbox_dict["ins_tag"] = "feasign"
        xbox_dict["ins_path"] = data_path
        job_id_with_host = os.popen("echo -n ${JOB_ID}").read().strip()
        instance_id = os.popen("echo -n ${INSTANCE_ID}").read().strip()
        start_pos = instance_id.find(job_id_with_host)
        end_pos = instance_id.find("--")
        if start_pos != -1 and end_pos != -1:
            job_id_with_host = instance_id[start_pos:end_pos]
        xbox_dict["job_id"] = job_id_with_host
        # monitor_data is currently hard-coded to an empty string
        xbox_dict["monitor_data"] = ""
        xbox_dict["monitor_path"] = (
            output_path.rstrip("/") + "/monitor/" + day + ".txt"
        )
        xbox_dict["mpi_size"] = str(fleet.worker_num())
        return json.dumps(xbox_dict)

    def write_model_donefile(
        self,
        output_path,
        day,
        pass_id,
        xbox_base_key,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
        donefile_name="donefile.txt",
    ):
        """
        write donefile when save model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            xbox_base_key(str|int): xbox base key
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is "donefile.txt"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.write_model_donefile(output_path="hdfs:/my/output",
                ...                                 day=20190723,
                ...                                 pass_id=66,
                ...                                 xbox_base_key=int(time.time()),
                ...                                 hadoop_fs_name="hdfs://xxx",
                ...                                 hadoop_fs_ugi="user,passwd")

        """
        day = str(day)
        pass_id = str(pass_id)
        xbox_base_key = int(xbox_base_key)

        if pass_id != "-1":
            suffix_name = f"/{day}/{pass_id}/"
            model_path = output_path.rstrip("/") + suffix_name
        else:
            suffix_name = f"/{day}/0/"
            model_path = output_path.rstrip("/") + suffix_name

        if fleet.worker_index() == 0:
            donefile_path = output_path + "/" + donefile_name
            content = f"{day}\t{xbox_base_key}\t{model_path}\t{pass_id}\t{0}"
            configs = {
                "fs.default.name": hadoop_fs_name,
                "hadoop.job.ugi": hadoop_fs_ugi,
            }
            client = HDFSClient(hadoop_home, configs)
            if client.is_file(donefile_path):
                pre_content = client.cat(donefile_path)
                pre_content_list = pre_content.split("\n")
                day_list = [i.split("\t")[0] for i in pre_content_list]
                pass_list = [i.split("\t")[3] for i in pre_content_list]
                exist = False
                for i in range(len(day_list)):
                    if int(day) == int(day_list[i]) and int(pass_id) == int(
                        pass_list[i]
                    ):
                        exist = True
                        break
                if not exist:
                    with open(donefile_name, "w") as f:
                        f.write(pre_content + "\n")
                        f.write(content + "\n")
                    client.delete(donefile_path)
                    client.upload(donefile_name, output_path)
                    self.rank0_error(
                        f"write {day}/{pass_id} {donefile_name} succeed"
                    )
                else:
                    self.rank0_error(
                        f"not write {donefile_name} because {day}/{pass_id} already "
                        "exists"
                    )
            else:
                with open(donefile_name, "w") as f:
                    f.write(content + "\n")
                client.upload(donefile_name, output_path)
                self.rank0_error(
                    f"write {day}/{pass_id} {donefile_name} succeed"
                )
        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()

    def write_xbox_donefile(
        self,
        output_path,
        day,
        pass_id,
        xbox_base_key,
        data_path,
        hadoop_fs_name,
        hadoop_fs_ugi,
        monitor_data={},
        hadoop_home="$HADOOP_HOME",
        donefile_name=None,
    ):
        """
        write delta donefile or xbox base donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            xbox_base_key(str|int): xbox base key
            data_path(str|list): training data path
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            monitor_data(dict): metrics
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is None"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.write_xbox_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     xbox_base_key=int(time.time()),
                ...     data_path="hdfs:/my/data/",
                ...     hadoop_fs_name="hdfs://xxx",
                ...     hadoop_fs_ugi="user,passwd",
                ...     monitor_data={})

        """
        day = str(day)
        pass_id = str(pass_id)
        xbox_base_key = int(xbox_base_key)
        mode = None

        if pass_id != "-1":
            mode = "patch"
            suffix_name = f"/{day}/delta-{pass_id}/"
            model_path = output_path.rstrip("/") + suffix_name
            if donefile_name is None:
                donefile_name = "xbox_patch_done.txt"
        else:
            mode = "base"
            suffix_name = f"/{day}/base/"
            model_path = output_path.rstrip("/") + suffix_name
            if donefile_name is None:
                donefile_name = "xbox_base_done.txt"

        if isinstance(data_path, list):
            data_path = ",".join(data_path)

        if fleet.worker_index() == 0:
            donefile_path = output_path + "/" + donefile_name
            xbox_str = self._get_xbox_str(
                output_path,
                day,
                model_path,
                xbox_base_key,
                data_path,
                hadoop_fs_name,
                monitor_data=monitor_data,
                mode=mode,
            )
            configs = {
                "fs.default.name": hadoop_fs_name,
                "hadoop.job.ugi": hadoop_fs_ugi,
            }
            client = HDFSClient(hadoop_home, configs)
            if client.is_file(donefile_path):
                pre_content = client.cat(donefile_path)
                last_dict = json.loads(pre_content.split("\n")[-1])
                last_day = last_dict["input"].split("/")[-3]
                last_pass = last_dict["input"].split("/")[-2].split("-")[-1]
                exist = False
                if (
                    int(day) < int(last_day)
                    or int(day) == int(last_day)
                    and int(pass_id) <= int(last_pass)
                ):
                    exist = True
                if not exist:
                    with open(donefile_name, "w") as f:
                        f.write(pre_content + "\n")
                        f.write(xbox_str + "\n")
                    client.delete(donefile_path)
                    client.upload(donefile_name, output_path)
                    self.rank0_error(
                        f"write {day}/{pass_id} {donefile_name} succeed"
                    )
                else:
                    self.rank0_error(
                        f"not write {donefile_name} because {day}/{pass_id} already "
                        "exists"
                    )
            else:
                with open(donefile_name, "w") as f:
                    f.write(xbox_str + "\n")
                client.upload(donefile_name, output_path)
                self.rank0_error(
                    f"write {day}/{pass_id} {donefile_name} succeed"
                )
        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()

    def write_cache_donefile(
        self,
        output_path,
        day,
        pass_id,
        key_num,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
        donefile_name="sparse_cache.meta",
        **kwargs,
    ):
        """
        write cache donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            key_num(str|int): cache key num returned by save_cache_model
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is "sparse_cache.meta"
            kwargs(dict): user defined properties
                          file_num(int): cache file num
                          table_id(int): cache table id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.write_cache_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     key_num=123456,
                ...     hadoop_fs_name="hdfs://xxx",
                ...     hadoop_fs_ugi="user,passwd")

        """
        day = str(day)
        pass_id = str(pass_id)
        key_num = int(key_num)
        file_num = kwargs.get("file_num", 16)
        table_id = kwargs.get("table_id", 0)

        if pass_id != "-1":
            suffix_name = f"/{day}/delta-{pass_id}/{table_id:03}_cache"
            model_path = output_path.rstrip("/") + suffix_name
        else:
            suffix_name = f"/{day}/base/{table_id:03}_cache"
            model_path = output_path.rstrip("/") + suffix_name

        if fleet.worker_index() == 0:
            donefile_path = model_path + "/" + donefile_name
            configs = {
                "fs.default.name": hadoop_fs_name,
                "hadoop.job.ugi": hadoop_fs_ugi,
            }
            client = HDFSClient(hadoop_home, configs)
            if client.is_file(donefile_path):
                self.rank0_error(
                    f"not write because {donefile_path} already exists"
                )
            else:
                meta_str = f"file_prefix:part\npart_num:{file_num}\nkey_num:{key_num}\n"
                with open(donefile_name, "w") as f:
                    f.write(meta_str)
                client.upload(donefile_name, model_path)
                self.rank0_error(f"write {donefile_path} succeed")
        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()

    def load_model(self, output_path, day, pass_id):
        """
        load pslib model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.load_model("hdfs:/my/path", 20190722, 88)

        """
        day = str(day)
        pass_id = str(pass_id)
        suffix_name = f"/{day}/{pass_id}/"
        load_path = output_path + suffix_name
        self.rank0_error(f"going to load_model {load_path}")
        self.load_fleet_model(load_path)
        self.rank0_error("load_model done")

    def save_model(self, output_path, day, pass_id):
        """
        save pslib model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_model("hdfs:/my/path", 20190722, 88)

        """
        day = str(day)
        pass_id = str(pass_id)
        suffix_name = f"/{day}/{pass_id}/"
        model_path = output_path + suffix_name
        self.rank0_print(f"going to save_model {model_path}")
        self.save_fleet_model(model_path)
        self.rank0_print("save_model done")

    def save_batch_model(self, output_path, day):
        """
        save batch model

        Args:
            output_path(str): output path
            day(str|int): training day

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_batch_model("hdfs:/my/path", 20190722)

        """
        day = str(day)
        suffix_name = f"/{day}/0/"
        model_path = output_path + suffix_name
        self.rank0_print(f"going to save_model {model_path}")
        fleet.save_persistables(None, model_path, mode=3)
        self.rank0_print("save_batch_model done")

    def save_delta_model(self, output_path, day, pass_id):
        """
        save delta model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_delta_model("hdfs:/my/path", 20190722, 88)

        """
        day = str(day)
        pass_id = str(pass_id)
        suffix_name = f"/{day}/delta-{pass_id}/"
        model_path = output_path + suffix_name
        self.rank0_print(f"going to save_delta_model {model_path}")
        fleet.save_persistables(None, model_path, mode=1)
        self.rank0_print("save_delta_model done")

    def save_xbox_base_model(self, output_path, day):
        """
        save xbox base model

        Args:
            output_path(str): output path
            day(str|int): training day

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722)

        """
        day = str(day)
        suffix_name = f"/{day}/base/"
        model_path = output_path + suffix_name
        self.rank0_print("going to save_xbox_base_model " + model_path)
        fleet.save_persistables(None, model_path, mode=2)
        self.rank0_print("save_xbox_base_model done")

    def save_cache_model(self, output_path, day, pass_id, mode=1, **kwargs):
        """
        save cache model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            mode(str|int): save mode
            kwargs(dict): user defined properties
                          table_id(int): table id to save cache

        Returns:
            key_num(int): cache key num

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_cache_model("hdfs:/my/path", 20190722, 88)

        """
        day = str(day)
        pass_id = str(pass_id)
        mode = int(mode)
        table_id = kwargs.get("table_id", 0)
        suffix_name = f"/{day}/delta-{pass_id}"
        model_path = output_path.rstrip("/") + suffix_name
        self.rank0_print(f"going to save_cache_model {model_path}")
        key_num = fleet.save_cache_model(
            None, model_path, mode=mode, table_id=table_id
        )
        self.rank0_print("save_cache_model done")
        return key_num

    def save_cache_base_model(self, output_path, day, **kwargs):
        """
        save cache base model

        Args:
            output_path(str): output path
            day(str|int): training day
            kwargs(dict): user defined properties
                          table_id(int): table id to save cache

        Returns:
            key_num(int): cache key num

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_cache_base_model("hdfs:/my/path", 20190722)

        """
        day = str(day)
        table_id = kwargs.get("table_id", 0)
        suffix_name = f"/{day}/base"
        model_path = output_path.rstrip("/") + suffix_name
        self.rank0_print(f"going to save_cache_base_model {model_path}")
        key_num = fleet.save_cache_model(
            None, model_path, mode=2, table_id=table_id
        )
        self.rank0_print("save_cache_base_model done")
        return key_num

    def pull_all_dense_params(self, scope, program):
        """
        pull all dense params in trainer of rank 0

        Args:
            scope(Scope): base Scope
            program(Program): base Program

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.pull_all_dense_params(my_scope, my_program)

        """
        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()
        if fleet._role_maker.is_first_worker():
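            # look up the dense tables registered for this program and pull
            # each table's variables from the server into the given scope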
            prog_id = str(id(program))
            tables = (
                fleet._opt_info["program_id_to_worker"][prog_id]
                .get_desc()
                .dense_table
            )
            prog_conf = fleet._opt_info['program_configs'][prog_id]
            prog_tables = {}
            for key in prog_conf:
                if "dense" not in key:
                    continue
                for table_id in prog_conf[key]:
                    prog_tables[int(table_id)] = 0
            for table in tables:
                if int(table.table_id) not in prog_tables:
                    continue
                var_name_list = []
                for i in range(0, len(table.dense_variable_name)):
                    var_name = table.dense_variable_name[i]
                    if scope.find_var(var_name) is None:
                        raise ValueError(
                            "var "
                            + var_name
                            + " not found in scope "
                            + "when pull dense"
                        )
                    var_name_list.append(var_name)
                fleet._fleet_ptr.pull_dense(
                    scope, int(table.table_id), var_name_list
                )
        fleet._role_maker._barrier_worker()

    def save_paddle_inference_model(
        self,
        executor,
        scope,
        program,
        feeded_vars,
        target_vars,
        output_path,
        day,
        pass_id,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
        save_combine=True,
    ):
        """
        save paddle inference model, and upload to hdfs dnn_plugin path

        Args:
            executor(Executor): base Executor
            scope(Scope): base Scope
            program(Program): base Program
            feeded_vars(list[Variable]): feed vars
            target_vars(list[Variable]): fetch vars
            output_path(str): hdfs/afs output path
            day(str|int): training day
            pass_id(str|int): training pass
            hadoop_fs_name(str): hadoop fs name
            hadoop_fs_ugi(str): hadoop fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            save_combine(bool): whether to save all variables in one combined
                                file or in separate files, default is True

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_paddle_inference_model(exe,
                ...                                        join_scope,
                ...                                        join_program,
                ...                                        feeded_vars,
                ...                                        target_vars,
                ...                                        "hdfs:/my/output/path/",
                ...                                        day=20190727,
                ...                                        pass_id=6,
                ...                                        hadoop_fs_name="xxx",
                ...                                        hadoop_fs_ugi="xxx,xxx")
        """
        day = str(day)
        pass_id = str(pass_id)
        model_name = "inference_model"
        # pull dense before save
        self.pull_all_dense_params(scope, program)
        if fleet.worker_index() == 0:
            with base.scope_guard(scope):
                paddle.static.io.save_inference_model(
                    model_name,
                    feeded_vars,
                    target_vars,
                    executor,
                    program=program.clone(),
                )

            configs = {
                "fs.default.name": hadoop_fs_name,
                "hadoop.job.ugi": hadoop_fs_ugi,
            }
            client = HDFSClient(hadoop_home, configs)

            if pass_id == "-1":
                dest = f"{output_path}/{day}/base/dnn_plugin/"
            else:
                dest = f"{output_path}/{day}/delta-{pass_id}/dnn_plugin/"
            if not client.is_exist(dest):
                client.mkdirs(dest)

            client.upload(model_name, dest, multi_processes=5, overwrite=True)

        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()

    def save_paddle_params(
        self,
        executor,
        scope,
        program,
        model_name,
        output_path,
        day,
        pass_id,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
        var_names=None,
        save_combine=True,
    ):
        """
        save paddle model, and upload to hdfs dnn_plugin path

        Args:
            executor(Executor): base Executor
            scope(Scope): base Scope
            program(Program): base Program
            model_name(str): save model local dir or filename
            output_path(str): hdfs/afs output path
            day(str|int): training day
            pass_id(str|int): training pass
            hadoop_fs_name(str): hadoop fs name
            hadoop_fs_ugi(str): hadoop fs ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            var_names(list): save persistable var names, default is None
            save_combine(bool): whether to save all variables in one combined
                                file or in separate files, default is True

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.save_paddle_params(exe,
                ...                               join_scope,
                ...                               join_program,
                ...                               "paddle_dense.model.0",
                ...                               "hdfs:/my/output/path/",
                ...                               day=20190727,
                ...                               pass_id=6,
                ...                               hadoop_fs_name="xxx",
                ...                               hadoop_fs_ugi="xxx,xxx",
                ...                               var_names=join_all_var_names)
                >>> fleet_util.save_paddle_params(exe,
                ...                               join_scope,
                ...                               join_program,
                ...                               "paddle_dense.model.usr.0",
                ...                               "hdfs:/my/output/path/",
                ...                               day=20190727,
                ...                               pass_id=6,
                ...                               hadoop_fs_name="xxx",
                ...                               hadoop_fs_ugi="xxx,xxx",
                ...                               var_names=join_user_var_names)
                >>> fleet_util.save_paddle_params(exe,
                ...                               join_scope,
                ...                               join_program,
                ...                               "paddle_dense.model.item.0",
                ...                               "hdfs:/my/output/path/",
                ...                               day=20190727,
                ...                               pass_id=6,
                ...                               hadoop_fs_name="xxx",
                ...                               hadoop_fs_ugi="xxx,xxx",
                ...                               var_names=join_user_item_names)

        """
        day = str(day)
        pass_id = str(pass_id)
        # pull dense before save
        self.pull_all_dense_params(scope, program)
        if fleet.worker_index() == 0:
            vars = [program.global_block().var(i) for i in var_names]
            with base.scope_guard(scope):
                if save_combine:
                    paddle.static.io.save_vars(
                        executor, "./", program, vars=vars, filename=model_name
                    )
                else:
                    paddle.static.io.save_vars(
                        executor, model_name, program, vars=vars
                    )

            configs = {
                "fs.default.name": hadoop_fs_name,
                "hadoop.job.ugi": hadoop_fs_ugi,
            }
            client = HDFSClient(hadoop_home, configs)

            if pass_id == "-1":
                dest = f"{output_path}/{day}/base/dnn_plugin/"
            else:
                dest = f"{output_path}/{day}/delta-{pass_id}/dnn_plugin/"
            if not client.is_exist(dest):
                client.mkdirs(dest)
            client.upload(model_name, dest, multi_processes=5, overwrite=True)

        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()

    def get_last_save_xbox_base(
        self,
        output_path,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
    ):
        r"""
        get last saved base xbox info from xbox_base_done.txt

        Args:
            output_path(str): output path
            hadoop_fs_name(str): hdfs/afs fs_name
            hadoop_fs_ugi(str): hdfs/afs fs_ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

        Returns:
            [last_save_day, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> last_save_day, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox_base("hdfs:/my/path",
                ...                                        hadoop_fs_name="hdfs://xxx",
                ...                                        hadoop_fs_ugi="user,passwd")

        """
        donefile_path = output_path + "/xbox_base_done.txt"
        configs = {
            "fs.default.name": hadoop_fs_name,
            "hadoop.job.ugi": hadoop_fs_ugi,
        }
        client = HDFSClient(hadoop_home, configs)
        if not client.is_file(donefile_path):
            return [-1, -1, int(time.time())]
        pre_content = client.cat(donefile_path)
        last_dict = json.loads(pre_content.split("\n")[-1])
        last_day = int(last_dict["input"].split("/")[-3])
        last_path = "/".join(last_dict["input"].split("/")[:-1])
        xbox_base_key = int(last_dict["key"])
        return [last_day, last_path, xbox_base_key]

    def get_last_save_xbox(
        self,
        output_path,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
    ):
        r"""
        get last saved xbox info from xbox_patch_done.txt

        Args:
            output_path(str): output path
            hadoop_fs_name(str): hdfs/afs fs_name
            hadoop_fs_ugi(str): hdfs/afs fs_ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved model
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox("hdfs:/my/path",
                ...                                   hadoop_fs_name="hdfs://xxx",
                ...                                   hadoop_fs_ugi="user,passwd")

        """
        donefile_path = output_path + "/xbox_patch_done.txt"
        configs = {
            "fs.default.name": hadoop_fs_name,
            "hadoop.job.ugi": hadoop_fs_ugi,
        }
        client = HDFSClient(hadoop_home, configs)
        if not client.is_file(donefile_path):
            return [-1, -1, "", int(time.time())]
        pre_content = client.cat(donefile_path)
        last_dict = json.loads(pre_content.split("\n")[-1])
        last_day = int(last_dict["input"].split("/")[-3])
        last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1])
        last_path = "/".join(last_dict["input"].split("/")[:-1])
        xbox_base_key = int(last_dict["key"])
        return [last_day, last_pass, last_path, xbox_base_key]

    def get_last_save_model(
        self,
        output_path,
        hadoop_fs_name,
        hadoop_fs_ugi,
        hadoop_home="$HADOOP_HOME",
    ):
        r"""
        get last saved model info from donefile.txt

        Args:
            output_path(str): output path
            hadoop_fs_name(str): hdfs/afs fs_name
            hadoop_fs_ugi(str): hdfs/afs fs_ugi
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved model
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_model("hdfs:/my/path",
                ...                                    hadoop_fs_name="hdfs://xxx",
                ...                                    hadoop_fs_ugi="user,passwd")

        """
        donefile_path = output_path + "/donefile.txt"
        configs = {
            "fs.default.name": hadoop_fs_name,
            "hadoop.job.ugi": hadoop_fs_ugi,
        }
        client = HDFSClient(hadoop_home, configs)
        if not client.is_file(donefile_path):
            return [-1, -1, "", int(time.time())]
        content = client.cat(donefile_path)
        content = content.split("\n")[-1].split("\t")
        last_save_day = int(content[0])
        last_save_pass = int(content[3])
        last_path = content[2]
        xbox_base_key = int(content[1])
        return [last_save_day, last_save_pass, last_path, xbox_base_key]

    def get_online_pass_interval(
        self, days, hours, split_interval, split_per_pass, is_data_hourly_placed
    ):
        """
        get online pass interval

        Args:
            days(str): days to train
            hours(str): hours to train
            split_interval(int|str): split interval
            split_per_pass(int|str): split per pass
            is_data_hourly_placed(bool): is data hourly placed

        Returns:
            online_pass_interval(list)

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> online_pass_interval = fleet_util.get_online_pass_interval(
                ...     days="{20190720..20190729}",
                ...     hours="{0..23}",
                ...     split_interval=5,
                ...     split_per_pass=2,
                ...     is_data_hourly_placed=False)
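                >>> # with the arguments above the day is cut into 5-minute
                >>> # splits and each pass consumes two of them, so the
                >>> # first pass of the day is:
                >>> online_pass_interval[0]
                ['0000', '0005']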

        """
        pattern = r'^\d+|{[0-9]+}|{[0-9]+\.\.[0-9]+}$'
        if not re.fullmatch(pattern, str(days)):
            raise Exception("days format is not right")
        days = os.popen("echo -n " + days).read().split(" ")
        if not re.fullmatch(pattern, str(hours)):
            raise Exception("hours format is not right")
        hours = os.popen("echo -n " + hours).read().split(" ")
        split_interval = int(split_interval)
        split_per_pass = int(split_per_pass)
        splits_per_day = (
            (int(hours[-1]) - int(hours[0]) + 1) * 60 // split_interval
        )
        pass_per_day = splits_per_day // split_per_pass
        left_train_hour = int(hours[0])
        right_train_hour = int(hours[-1])

        start = 0
        split_path = []
        for i in range(splits_per_day):
            h = start // 60
            m = start % 60
            if h < left_train_hour or h > right_train_hour:
                start += split_interval
                continue
            if is_data_hourly_placed:
                split_path.append(f"{h:02}")
            else:
                split_path.append(f"{h:02}{m:02}")
            start += split_interval

        start = 0
        online_pass_interval = []
        for i in range(pass_per_day):
            online_pass_interval.append([])
            for j in range(start, start + split_per_pass):
                online_pass_interval[i].append(split_path[j])
            start += split_per_pass

        return online_pass_interval

    def get_global_metrics(
        self,
        scope=base.global_scope(),
        stat_pos_name="_generated_var_2",
        stat_neg_name="_generated_var_3",
        sqrerr_name="sqrerr",
        abserr_name="abserr",
        prob_name="prob",
        q_name="q",
        pos_ins_num_name="pos",
        total_ins_num_name="total",
    ):
        r"""
        get global metrics, including auc, bucket_error, mae, rmse,
        actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos_name(str): name of auc pos bucket Variable
            stat_neg_name(str): name of auc neg bucket Variable
            sqrerr_name(str): name of sqrerr Variable
            abserr_name(str): name of abserr Variable
            prob_name(str): name of prob Variable
            q_name(str): name of q Variable
            pos_ins_num_name(str): name of pos ins num Variable
            total_ins_num_name(str): name of total ins num Variable

        Returns:
            [auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,
             mean_predict_qvalue, total_ins_num]

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> metric_list = fleet_util.get_global_metrics(myscope,
                ...                                             stat_pos.name,
                ...                                             stat_neg.name,
                ...                                             local_sqrerr.name,
                ...                                             local_abserr.name,
                ...                                             local_prob.name,
                ...                                             local_q.name,
                ...                                             local_pos_ins.name,
                ...                                             local_total_ins.name)

                >>> # below is part of example model
                >>> label = paddle.static.data(name="click", shape=[-1, 1],\
                ...     dtype="int64")
                >>> emb = my_slot_net(slots, label) # emb can be fc layer of size 1
                >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(\
                ...     emb, min=-15.0, max=15.0), name="similarity_norm")\
                >>> binary_predict = paddle.concat(input=[\
                ...     paddle.subtract(\
                ...         paddle.ceil(similarity_norm), similarity_norm),\
                ...     similarity_norm], axis=1)
                >>> auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
                ...     stat_neg] = paddle.static.auc(input=binary_predict,\
                ...                                  label=label, curve='ROC',\
                ...                                  num_thresholds=4096)
                >>> local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins,\
                ...     local_total_ins = paddle.static.ctr_metric_bundle(\
                ...         similarity_norm, label)

        """
        if (
            scope.find_var(stat_pos_name) is None
            or scope.find_var(stat_neg_name) is None
        ):
            self.rank0_print("not found auc bucket")
            return [None] * 9
        elif scope.find_var(sqrerr_name) is None:
            self.rank0_print(f"not found sqrerr_name={sqrerr_name}")
            return [None] * 9
        elif scope.find_var(abserr_name) is None:
            self.rank0_print(f"not found abserr_name={abserr_name}")
            return [None] * 9
        elif scope.find_var(prob_name) is None:
            self.rank0_print(f"not found prob_name={prob_name}")
            return [None] * 9
        elif scope.find_var(q_name) is None:
            self.rank0_print(f"not found q_name={q_name}")
            return [None] * 9
        elif scope.find_var(pos_ins_num_name) is None:
            self.rank0_print(f"not found pos_ins_num_name={pos_ins_num_name}")
            return [None] * 9
        elif scope.find_var(total_ins_num_name) is None:
            self.rank0_print(
                f"not found total_ins_num_name={total_ins_num_name}"
            )
            return [None] * 9

        # barrier worker to ensure all workers finished training
        if self.mode == "pscore":
            fleet.barrier_worker()
        else:
            fleet._role_maker._barrier_worker()

        # get auc
        auc = self.get_global_auc(scope, stat_pos_name, stat_neg_name)
        pos = np.array(scope.find_var(stat_pos_name).get_tensor())
        # auc pos bucket shape
        old_pos_shape = np.array(pos.shape)
        # reshape to one dim
        pos = pos.reshape(-1)
        global_pos = np.copy(pos) * 0
        # mpi allreduce
        fleet._role_maker._all_reduce(pos, global_pos)
        # reshape to its original shape
        global_pos = global_pos.reshape(old_pos_shape)
        # auc neg bucket
        neg = np.array(scope.find_var(stat_neg_name).get_tensor())
        old_neg_shape = np.array(neg.shape)
        neg = neg.reshape(-1)
        global_neg = np.copy(neg) * 0
        fleet._role_maker._all_reduce(neg, global_neg)
        global_neg = global_neg.reshape(old_neg_shape)

        num_bucket = len(global_pos[0])

        def get_metric(name):
            metric = np.array(scope.find_var(name).get_tensor())
            old_metric_shape = np.array(metric.shape)
            metric = metric.reshape(-1)
            global_metric = np.copy(metric) * 0
            fleet._role_maker._all_reduce(metric, global_metric)
            global_metric = global_metric.reshape(old_metric_shape)
            return global_metric[0]

        global_sqrerr = get_metric(sqrerr_name)
        global_abserr = get_metric(abserr_name)
        global_prob = get_metric(prob_name)
        global_q_value = get_metric(q_name)
        # note: ins_num read back from the auc buckets is not the exact
        # value, so take it from the metric ops instead
        pos_ins_num = get_metric(pos_ins_num_name)
        total_ins_num = get_metric(total_ins_num_name)
        neg_ins_num = total_ins_num - pos_ins_num

        mae = global_abserr / total_ins_num
        rmse = math.sqrt(global_sqrerr / total_ins_num)
        return_actual_ctr = pos_ins_num / total_ins_num
        predicted_ctr = global_prob / total_ins_num
        mean_predict_qvalue = global_q_value / total_ins_num
        copc = 0.0
        if abs(predicted_ctr) > 1e-6:
            copc = return_actual_ctr / predicted_ctr

        # calculate bucket error
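        # buckets whose predicted ctr lies within k_max_span of each other are
        # merged until the merged impressions yield a statistically reliable
        # estimate (relative error below k_relative_error_bound); the final
        # bucket error is the impression-weighted mean of
        # |actual_ctr / adjusted_ctr - 1| over those merged buckets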
        last_ctr = -1.0
        impression_sum = 0.0
        ctr_sum = 0.0
        click_sum = 0.0
        error_sum = 0.0
        error_count = 0.0
        click = 0.0
        show = 0.0
        ctr = 0.0
        adjust_ctr = 0.0
        relative_error = 0.0
        actual_ctr = 0.0
        relative_ctr_error = 0.0
        k_max_span = 0.01
        k_relative_error_bound = 0.05
        for i in range(num_bucket):
            click = global_pos[0][i]
            show = global_pos[0][i] + global_neg[0][i]
            ctr = float(i) / num_bucket
            if abs(ctr - last_ctr) > k_max_span:
                last_ctr = ctr
                impression_sum = 0.0
                ctr_sum = 0.0
                click_sum = 0.0
            impression_sum += show
            ctr_sum += ctr * show
            click_sum += click
            if impression_sum == 0:
                continue
            adjust_ctr = ctr_sum / impression_sum
            if adjust_ctr == 0:
                continue
            relative_error = math.sqrt(
                (1 - adjust_ctr) / (adjust_ctr * impression_sum)
            )
            if relative_error < k_relative_error_bound:
                actual_ctr = click_sum / impression_sum
                relative_ctr_error = abs(actual_ctr / adjust_ctr - 1)
                error_sum += relative_ctr_error * impression_sum
                error_count += impression_sum
                last_ctr = -1

        bucket_error = error_sum / error_count if error_count > 0 else 0.0

        return [
            auc,
            bucket_error,
            mae,
            rmse,
            return_actual_ctr,
            predicted_ctr,
            copc,
            mean_predict_qvalue,
            int(total_ins_num),
        ]

    def print_global_metrics(
        self,
        scope=base.global_scope(),
        stat_pos_name="_generated_var_2",
        stat_neg_name="_generated_var_3",
        sqrerr_name="sqrerr",
        abserr_name="abserr",
        prob_name="prob",
        q_name="q",
        pos_ins_num_name="pos",
        total_ins_num_name="total",
        print_prefix="",
    ):
        r"""
        print global metrics, including auc, bucket_error, mae, rmse,
        actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.

        Args:
            scope(Scope): Scope object, default is base.global_scope()
            stat_pos_name(str): name of auc pos bucket Variable
            stat_neg_name(str): name of auc neg bucket Variable
            sqrerr_name(str): name of sqrerr Variable
            abserr_name(str): name of abserr Variable
            prob_name(str): name of prob Variable
            q_name(str): name of q Variable
            pos_ins_num_name(str): name of pos ins num Variable
            total_ins_num_name(str): name of total ins num Variable
            print_prefix(str): print prefix

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> # doctest: +SKIP('dependency on custom variables')
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> fleet_util.print_global_metrics(myscope,
                ...                                 stat_pos.name,
                ...                                 stat_neg.name,
                ...                                 local_sqrerr.name,
                ...                                 local_abserr.name,
                ...                                 local_prob.name,
                ...                                 local_q.name,
                ...                                 local_pos_ins.name,
                ...                                 local_total_ins.name)

                >>> # below is part of model
                >>> label = paddle.static.data(name="click", shape=[-1, 1],\
                ...     dtype="int64")
                >>> emb = my_slot_net(slots, label) # emb can be fc layer of size 1
                >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(\
                ...     emb, min=-15.0, max=15.0), name="similarity_norm")
                >>> binary_predict = paddle.concat(x=[\
                ...     paddle.subtract(\
                ...         paddle.ceil(similarity_norm), similarity_norm),\
                ...     similarity_norm], axis=1)
                >>> auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
                ...     stat_neg] = paddle.static.auc(input=binary_predict,\
                ...                                  label=label, curve='ROC',\
                ...                                  num_thresholds=4096)
                >>> local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins, \
                ...     local_total_ins = paddle.static.ctr_metric_bundle(\
                ...         similarity_norm, label)

        """
        if (
            scope.find_var(stat_pos_name) is None
            or scope.find_var(stat_neg_name) is None
        ):
            self.rank0_print("not found auc bucket")
            return
        elif scope.find_var(sqrerr_name) is None:
            self.rank0_print(f"not found sqrerr_name={sqrerr_name}")
            return
        elif scope.find_var(abserr_name) is None:
            self.rank0_print(f"not found abserr_name={abserr_name}")
            return
        elif scope.find_var(prob_name) is None:
            self.rank0_print(f"not found prob_name={prob_name}")
            return
        elif scope.find_var(q_name) is None:
            self.rank0_print(f"not found q_name={q_name}")
            return
        elif scope.find_var(pos_ins_num_name) is None:
            self.rank0_print(f"not found pos_ins_num_name={pos_ins_num_name}")
            return
        elif scope.find_var(total_ins_num_name) is None:
            self.rank0_print(
                f"not found total_ins_num_name={total_ins_num_name}"
            )
            return

        (
            auc,
            bucket_error,
            mae,
            rmse,
            actual_ctr,
            predicted_ctr,
            copc,
            mean_predict_qvalue,
            total_ins_num,
        ) = self.get_global_metrics(
            scope,
            stat_pos_name,
            stat_neg_name,
            sqrerr_name,
            abserr_name,
            prob_name,
            q_name,
            pos_ins_num_name,
            total_ins_num_name,
        )
        self.rank0_print(
            f"{print_prefix} global AUC={auc:.6f} BUCKET_ERROR={bucket_error:.6f} MAE={mae:.6f} "
            f"RMSE={rmse:.6f} Actual_CTR={actual_ctr:.6f} Predicted_CTR={predicted_ctr:.6f} "
            f"COPC={copc:.6f} MEAN Q_VALUE={mean_predict_qvalue:.6f} Ins number={total_ins_num}"
        )

    def program_type_trans(self, prog_dir, prog_fn, is_text):
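        """
        Convert a saved program file between binary and human-readable
        text format (a thin wrapper around ``utils.program_type_trans``).

        Args:
            prog_dir(str): directory that contains the program file
            prog_fn(str): program file name
            is_text(bool): whether the input program file is in text format
        """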
        return utils.program_type_trans(prog_dir, prog_fn, is_text)

    def load_program(self, model_filename, is_text):
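        """
        Load a Program from a saved program file (a thin wrapper around
        ``utils.load_program``).

        Args:
            model_filename(str): path of the saved program file
            is_text(bool): whether the file is in human-readable text
                format (otherwise binary)

        Returns:
            Program: the loaded program

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> program = fleet_util.load_program("./program.pbtxt", is_text=True)
        """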
        return utils.load_program(model_filename, is_text)

    def draw_from_program_file(
        self, model_filename, is_text, output_dir, output_filename
    ):
        """draw program from file"""
        program = self.load_program(model_filename, is_text)
        utils.graphviz(program.global_block(), output_dir, output_filename)

    def draw_from_program(self, program, output_dir, output_name):
        """draw Program"""
        utils.graphviz(program.global_block(), output_dir, output_name)

    def check_two_programs(self, config):
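        """
        Check whether a pruned program matches the train program it was
        pruned from, optionally drawing the pruned program first.

        Args:
            config: object providing ``train_prog_path``,
                ``is_text_train_program``, ``pruned_prog_path``,
                ``is_text_pruned_program``, ``draw`` and ``draw_out_name``

        Returns:
            bool: True if the pruned program's vars match the train program
        """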
        train_prog = self.load_program(
            config.train_prog_path, config.is_text_train_program
        )
        pruned_prog = self.load_program(
            config.pruned_prog_path, config.is_text_pruned_program
        )
        if config.draw:
            pruned_dir = os.path.dirname(config.pruned_prog_path)
            self.draw_from_program(
                pruned_prog, pruned_dir, config.draw_out_name
            )
        res = utils.check_pruned_program_vars(train_prog, pruned_prog)
        if res:
            _logger.info("check_programs succeed.")
        else:
            _logger.info(
                "check_programs failed. pruned program and train program not match!"
            )
        return res

    def check_vars_and_dump(self, config):
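        """
        Check saved vars of a dumped model and try running it once with the
        given feed/fetch config (a thin wrapper around
        ``utils.check_saved_vars_try_dump``).

        Args:
            config: object providing ``dump_model_dir``,
                ``dump_program_filename``, ``is_text_dump_program``,
                ``feed_config``, ``fetch_config``, ``batch_size`` and
                ``save_params_filename``

        Returns:
            results of ``utils.check_saved_vars_try_dump``
        """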
        _logger.info("start check_vars_and_dump.")
        results = utils.check_saved_vars_try_dump(
            config.dump_model_dir,
            config.dump_program_filename,
            config.is_text_dump_program,
            config.feed_config,
            config.fetch_config,
            config.batch_size,
            config.save_params_filename,
        )
        _logger.info("check_vars_and_dump succeed.")
        return results

    def parse_program_proto(self, prog_path, is_text, output_dir):
        """
        Parse program.proto into a more readable format.
        This function will generate three files:
        output_dir/vars_all.log,
        output_dir/vars_persistable.log,
        output_dir/ops.log.

        Args:
            prog_path(str): proto file path to be parsed.
            is_text(bool): whether the proto file is in human-readable text
                format (otherwise binary).
            output_dir(str): output dir.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import FleetUtil
                >>> fleet_util = FleetUtil()
                >>> program_path = "./program.pbtxt"
                >>> is_text = True
                >>> output_dir = "/tmp/"
                >>> fleet_util.parse_program_proto(program_path, is_text, output_dir)
        """
        program = self.load_program(prog_path, is_text)
        utils.parse_program(program, output_dir)


class GPUPSUtil(FleetUtil):
    """
    GPUPSUtil provides some common functions for users' convenience.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
            >>> fleet_util = GPUPSUtil()
            >>> fleet_util.rank0_print("my log")
    """

    def __init__(self, fs_client=None, mode="pslib"):
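        """
        Args:
            fs_client: file system client (e.g. AFSClient) used to read and
                write donefiles; can also be set later via ``set_fsclient``
            mode(str): same as FleetUtil, default is "pslib"
        """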
        super().__init__(mode)
        self._afs = fs_client

    def init(self, fs_name, fs_user, fs_passwd, fs_conf):
        r"""
        init for fs config

        Args:
            fs_name(str): fs name
            fs_user(str): fs user
            fs_passwd(str): fs password
            fs_conf(str): fs and afs conf path

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> fleet_util = GPUPSUtil(fs_client=AFSClient())
                >>> fleet_util.init("my_fs_name", "my_fs_user", "my_fs_passwd", "./afs.conf")
        """
        self._afs.init(fs_name, fs_user, fs_passwd, fs_conf)

    def set_fsclient(self, fs_client):
        r"""
        set fs_client for fs config

        Args:
            fs_client(AFSClient): fs_client object

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
        """
        self._afs = fs_client

    def get_last_save_xbox_base(self, output_path):
        r"""
        get last saved base xbox info from xbox_base_done.txt

        Args:
            output_path(str): output path

        Returns:
            [last_save_day, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> last_save_day, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox_base("hdfs:/my/path")

        """
        donefile_path = output_path + "/xbox_base_done.txt"

        if not self._afs.is_file(donefile_path):
            return [-1, -1, int(time.time())]
        self._afs.download(donefile_path, "./xbox_base_done.txt")
        # pre_content = self._afs.cat(donefile_path)
        pre_content = ""
        with open("xbox_base_done.txt", "r") as f:
            pre_content = f.read()
        pre_content = pre_content.strip()
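        # each donefile line is a JSON record; the "input" field looks like
        # <fs_name><output_path>/<day>/base/000 (see _get_xbox_str), so the
        # day is the third path component from the end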
        last_dict = json.loads(pre_content.split("\n")[-1])
        last_day = int(last_dict["input"].split("/")[-3])
        last_path = "/".join(last_dict["input"].split("/")[:-1])
        xbox_base_key = int(last_dict["key"])
        return [last_day, last_path, xbox_base_key]

    def get_last_save_xbox(self, output_path):
        r"""
        get last saved xbox info from xbox_patch_done.txt

        Args:
            output_path(str): output path

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_xbox("hdfs:/my/path")

        """
        donefile_path = output_path + "/xbox_patch_done.txt"

        if not self._afs.is_file(donefile_path):
            return [-1, -1, "", int(time.time())]
        self._afs.download(donefile_path, "xbox_patch_done.txt")
        pre_content = ""
        with open("xbox_patch_done.txt", "r") as f:
            pre_content = f.read()
        pre_content = pre_content.strip()
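        # the "input" field looks like
        # <fs_name><output_path>/<day>/delta-<pass_id>/000 (see
        # _get_xbox_str), so day and pass id come from the trailing
        # path components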
        last_dict = json.loads(pre_content.split("\n")[-1])
        last_day = int(last_dict["input"].split("/")[-3])
        last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1])
        last_path = "/".join(last_dict["input"].split("/")[:-1])
        xbox_base_key = int(last_dict["key"])
        os.remove("xbox_patch_done.txt")
        return [last_day, last_pass, last_path, xbox_base_key]

    def get_last_save_model(self, output_path):
        r"""
        get last saved model info from donefile.txt

        Args:
            output_path(str): output path

        Returns:
            [last_save_day, last_save_pass, last_path, xbox_base_key]
            last_save_day(int): day of saved model
            last_save_pass(int): pass id of saved
            last_path(str): model path
            xbox_base_key(int): xbox key

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> last_save_day, last_save_pass, last_path, xbox_base_key = \
                ...     fleet_util.get_last_save_model("hdfs:/my/path")

        """
        last_save_day = -1
        last_save_pass = -1
        last_path = ""
        donefile_path = output_path + "/donefile.txt"
        if not self._afs.is_file(donefile_path):
            return [-1, -1, "", int(time.time())]
        self._afs.download(donefile_path, "./donefile.txt")
        content = ""
        with open("donefile.txt", "r") as f:
            content = f.read()
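        # donefile lines are tab-separated:
        # day \t xbox_base_key \t model_path \t pass_id \t ...
        # (see write_model_donefile); take the last (latest) line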
        content = content.strip().split("\n")[-1].split("\t")
        last_save_day = int(content[0])
        last_save_pass = int(content[3])
        last_path = content[2]
        xbox_base_key = int(content[1])
        os.remove("donefile.txt")
        return [last_save_day, last_save_pass, last_path, xbox_base_key]

    def write_model_donefile(
        self,
        output_path,
        day,
        pass_id,
        xbox_base_key,
        donefile_name="donefile.txt",
    ):
        """
        write donefile when save model

        Args:
            output_path(str): output path
            day(str|int): training day
            pass_id(str|int): training pass id
            xbox_base_key(str|int): xbox base key
            donefile_name(str): donefile name, default is "donefile.txt"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> fleet_util.write_model_donefile(output_path="hdfs:/my/output",
                ...                                 day=20190723,
                ...                                 pass_id=66,
                ...                                 xbox_base_key=int(time.time()))

        """
        day = str(day)
        pass_id = str(pass_id)
        xbox_base_key = int(xbox_base_key)

        if pass_id != "-1":
            suffix_name = f"/{day}/{pass_id}/"
            model_path = output_path.rstrip("/") + suffix_name
        else:
            suffix_name = f"/{day}/0/"
            model_path = output_path.rstrip("/") + suffix_name

        if fleet.worker_index() == 0:
            donefile_path = output_path + "/" + donefile_name
            content = f"{day}\t{xbox_base_key}\t{model_path}\t{pass_id}\t{0}"
            if self._afs.is_file(donefile_path):
                self._afs.download(donefile_path, donefile_name)
                pre_content = ""
                with open(donefile_name, "r") as f:
                    pre_content = f.read()
                pre_content_list = pre_content.strip().split("\n")
                day_list = [i.split("\t")[0] for i in pre_content_list]
                pass_list = [i.split("\t")[3] for i in pre_content_list]
                os.remove(donefile_name)
                exist = False
                for i in range(len(day_list)):
                    if int(day) == int(day_list[i]) and int(pass_id) == int(
                        pass_list[i]
                    ):
                        exist = True
                        break
                if not exist:
                    with open(donefile_name, "w") as f:
                        f.write(pre_content.strip() + "\n")
                        f.write(content + "\n")
                    self._afs.delete(donefile_path)
                    self._afs.upload(donefile_name, donefile_path)
                    self.rank0_error(
                        f"write {day}/{pass_id} {donefile_name} succeed"
                    )
                else:
                    self.rank0_error(
                        f"not write {donefile_name} because {day}/{pass_id} already "
                        "exists"
                    )
            else:
                with open(donefile_name, "w") as f:
                    f.write(content + "\n")
                self._afs.upload(donefile_name, donefile_path)
                self.rank0_error(
                    f"write {day}/{pass_id} {donefile_name} succeed"
                )

    def write_xbox_donefile(
        self,
        output_path,
        day,
        pass_id,
        xbox_base_key,
        data_path,
        hadoop_fs_name,
        hadoop_fs_ugi,
        monitor_data={},
        hadoop_home="$HADOOP_HOME",
        donefile_name=None,
    ):
        """
        write delta donefile or xbox base donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            xbox_base_key(str|int): xbox base key
            data_path(str|list): training data path
            hadoop_fs_name(str): hdfs/afs fs name
            hadoop_fs_ugi(str): hdfs/afs fs ugi
            monitor_data(dict): metrics
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is None

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> fleet_util.write_xbox_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     xbox_base_key=int(time.time()),
                ...     data_path="hdfs:/my/data/",
                ...     hadoop_fs_name="hdfs://my-cluster",
                ...     hadoop_fs_ugi="user,passwd",
                ...     monitor_data={})

        """
        day = str(day)
        pass_id = str(pass_id)
        xbox_base_key = int(xbox_base_key)
        mode = None
        if pass_id != "-1":
            mode = "patch"
            suffix_name = f"/{day}/delta-{pass_id}/"
            model_path = output_path.rstrip("/") + suffix_name
            if donefile_name is None:
                donefile_name = "xbox_patch_done.txt"
        else:
            mode = "base"
            suffix_name = f"/{day}/base/"
            model_path = output_path.rstrip("/") + suffix_name
            if donefile_name is None:
                donefile_name = "xbox_base_done.txt"

        if isinstance(data_path, list):
            data_path = ",".join(data_path)
        if fleet.worker_index() == 0:
            donefile_path = output_path + "/" + donefile_name
            xbox_str = self._get_xbox_str(
                output_path,
                day,
                model_path,
                xbox_base_key,
                data_path,
                hadoop_fs_name,
                monitor_data=monitor_data,
                mode=mode,
            )

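            # append to the existing donefile only if this day/pass is newer
            # than the last recorded entry; otherwise skip the write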
            if self._afs.is_exist(donefile_path):
                self.rank0_info(f"exist {donefile_path} succeed")
                self._afs.download(donefile_path, donefile_name)
                pre_content = ""
                with open(donefile_name, "r") as f:
                    pre_content = f.read()
                last_dict = json.loads(pre_content.strip().split("\n")[-1])
                last_day = last_dict["input"].split("/")[-3]
                last_pass = last_dict["input"].split("/")[-2].split("-")[-1]

                os.remove(donefile_name)
                self.rank0_info(f"remove {donefile_name} succeed")
                exist = False
                if int(day) < int(last_day) or (
                    int(day) == int(last_day)
                    and int(pass_id) <= int(last_pass)
                ):
                    exist = True
                if not exist:
                    with open(donefile_name, "w") as f:
                        f.write(pre_content.strip() + "\n")
                        f.write(xbox_str + "\n")
                    self._afs.delete(donefile_path)
                    self._afs.upload(donefile_name, donefile_path)
                    self.rank0_info(
                        f"write {day}/{pass_id} {donefile_name} succeed"
                    )
                else:
                    self.rank0_info(
                        f"not write {donefile_name} because {day}/{pass_id} already "
                        "exists"
                    )
            else:
                with open(donefile_name, "w") as f:
                    f.write(xbox_str + "\n")
                self._afs.upload(donefile_name, donefile_path)
                self.rank0_error(
                    f"write {day}/{pass_id} {donefile_name} succeed"
                )

    def write_cache_donefile(
        self,
        output_path,
        day,
        pass_id,
        key_num,
        donefile_name="sparse_cache.meta",
        **kwargs,
    ):
        """
        write cache donefile

        Args:
            output_path(str): output path
            day(str|int): training day of model
            pass_id(str|int): training pass id of model
            key_num(str|int): save cache return value
            donefile_name(str): donefile name, default is "sparse_cache.meta"
            kwargs(dict): user defined properties
                          file_num(int): cache file num
                          table_id(int): cache table id

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.incubate.distributed.fleet.fleet_util import GPUPSUtil
                >>> from paddle.distributed.fleet.utils.fs import AFSClient
                >>> hdfs_client = AFSClient()
                >>> fleet_util = GPUPSUtil()
                >>> fleet_util.set_fsclient(hdfs_client)
                >>> fleet_util.write_cache_donefile(
                ...     output_path="hdfs:/my/output/",
                ...     day=20190722,
                ...     pass_id=1,
                ...     key_num=123456)

        """
        day = str(day)
        pass_id = str(pass_id)
        key_num = int(key_num)
        file_num = kwargs.get("file_num", 16)
        table_id = kwargs.get("table_id", 0)

        if pass_id != "-1":
            suffix_name = f"/{day}/delta-{pass_id}/{table_id:03}_cache"
            model_path = output_path.rstrip("/") + suffix_name
        else:
            suffix_name = f"/{day}/base/{table_id:03}_cache"
            model_path = output_path.rstrip("/") + suffix_name

        if fleet.worker_index() == 0:
            donefile_path = model_path + "/" + donefile_name

            if self._afs.is_file(donefile_path):
                self.rank0_error(
                    f"not write because {donefile_path} already exists"
                )
            else:
                meta_str = f"file_prefix:part\npart_num:{file_num}\nkey_num:{key_num}\n"
                with open(donefile_name, "w") as f:
                    f.write(meta_str)
                self._afs.upload(donefile_name, donefile_path)
                self.rank0_error(f"write {donefile_path} succeed")

    def _get_xbox_str(
        self,
        output_path,
        day,
        model_path,
        xbox_base_key,
        data_path,
        hadoop_fs_name,
        monitor_data={},
        mode="patch",
    ):
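        """
        Build one xbox donefile record as a JSON string. In "base" mode the
        record id is the xbox base key; in "patch" mode it is the current
        timestamp. "input" is the model path on the cluster (fs prefix
        stripped and re-prefixed with hadoop_fs_name) suffixed with /000.
        """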
        xbox_dict = collections.OrderedDict()
        if mode == "base":
            xbox_dict["id"] = str(xbox_base_key)
        elif mode == "patch":
            xbox_dict["id"] = str(int(time.time()))
        else:
            print(f"warning: unknown mode {mode}, set it to patch")
            mode = "patch"
            xbox_dict["id"] = str(int(time.time()))
        xbox_dict["key"] = str(xbox_base_key)
        if model_path.startswith("hdfs:") or model_path.startswith("afs:"):
            model_path = model_path[model_path.find(":") + 1 :]
        xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000"
        xbox_dict["record_count"] = "111111"
        xbox_dict["partition_type"] = "2"
        xbox_dict["job_name"] = "default_job_name"
        xbox_dict["ins_tag"] = "feasign"
        xbox_dict["ins_path"] = data_path
        xbox_dict["job_id"] = os.environ.get("PADDLE_JOB_ID", "")
        # currently hard code here, set monitor_data empty string
        xbox_dict["monitor_data"] = ""
        xbox_dict["monitor_path"] = (
            output_path.rstrip("/") + "/monitor/" + day + ".txt"
        )
        xbox_dict["mpi_size"] = str(fleet.worker_num())
        return json.dumps(xbox_dict)
