/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.tez;

import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.StatusGetOpts;

public class TezTask
extends Task<TezWork> {
    /** Fully-qualified name of this class; passed as the first argument to the perf-logger begin/end calls in this class. */
    private static final String CLASS_NAME = TezTask.class.getName();
    /** Perf logger obtained from {@link SessionState} when this task instance is created. */
    private final PerfLogger perfLogger = SessionState.getPerfLogger();
    /** DAG counters assigned by {@link #execute(DriverContext)}; {@code null} if none have been fetched. */
    private TezCounters counters;
    /** Helper used for configuration, resource localization and vertex/edge creation; supplied through the constructor. */
    private final DagUtils utils;

    /**
     * Creates a task that uses the instance returned by {@link DagUtils#getInstance()}.
     */
    public TezTask() {
        this(DagUtils.getInstance());
    }

    /**
     * Creates a task that uses the given {@link DagUtils} for all DAG-related helper calls.
     *
     * @param utils helper stored in this task and used by the other methods of this class
     */
    public TezTask(DagUtils utils) {
        this.utils = utils;
    }

    /**
     * Returns the counters stored by {@link #execute(DriverContext)}.
     *
     * @return the stored counters, or {@code null} if none have been stored
     */
    public TezCounters getTezCounters() {
        return this.counters;
    }

    /**
     * Runs this task's {@link TezWork}: obtains a Tez session, localizes the extra
     * resources, builds and submits the DAG, monitors it, and collects its counters.
     *
     * <p>The session obtained from the pool manager is handed back in the
     * {@code finally} block, so it is returned even when building, submitting or
     * monitoring the DAG throws. A failure to fetch the counters is logged and does
     * not affect the result code.
     *
     * @param driverContext supplies the query {@link Context}; when it has none, a
     *                      temporary context is created here and cleared before returning
     * @return {@code 1} if an exception occurred before the monitor reported a result;
     *         otherwise the monitor's result code, possibly replaced by the value
     *         returned from {@link #close(TezWork, int)}
     */
    @Override
    public int execute(DriverContext driverContext) {
        int rc = 1;
        boolean cleanContext = false;
        Context ctx = null;
        DAGClient client = null;
        // Stays null until the pool manager has actually handed out a session,
        // so the finally block only returns what was really acquired.
        TezSessionState session = null;
        try {
            ctx = driverContext.getCtx();
            if (ctx == null) {
                // We created the context, so we are responsible for clearing it.
                ctx = new Context(this.conf);
                cleanContext = true;
            }

            SessionState ss = SessionState.get();
            TezSessionState current = ss.getTezSession();
            session = TezSessionPoolManager.getInstance().getSession(current, this.conf, false);
            ss.setTezSession(session);

            JobConf jobConf = this.utils.createConfiguration(this.conf);
            String[] inputOutputJars = ((TezWork) this.work).configureJobConfAndExtractJars(jobConf);
            Path scratchDir = this.utils.createTezDir(ctx.getMRScratchDir(), this.conf);
            Map<String, LocalResource> inputOutputLocalResources =
                    this.getExtraLocalResources(jobConf, scratchDir, inputOutputJars);

            this.updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
            List<LocalResource> additionalLr = session.getLocalizedResources();
            this.logLocalResources(additionalLr);

            LocalResource appJarLr = session.getAppJarLr();
            DAG dag = this.build(jobConf, (TezWork) this.work, scratchDir, appJarLr, additionalLr, ctx);
            this.addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
            client = this.submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr,
                    inputOutputJars, inputOutputLocalResources);

            TezJobMonitor monitor = new TezJobMonitor();
            rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), this.conf, dag);

            try {
                EnumSet<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
                this.counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
            } catch (Exception counterError) {
                // The DAG has already finished; missing counters only affect the summary.
                LOG.warn("Failed to get counters; summary info will be unavailable.", counterError);
                this.counters = null;
            }
            this.logCounterSummary();
        } catch (Exception e) {
            LOG.error("Failed to execute tez graph.", e);
        } finally {
            if (session != null) {
                try {
                    TezSessionPoolManager.getInstance().returnSession(session);
                } catch (Exception returnError) {
                    LOG.warn("Failed to return tez session to the pool", returnError);
                }
            }
            Utilities.clearWork(this.conf);
            if (cleanContext) {
                try {
                    ctx.clear();
                } catch (Exception cleanupError) {
                    // Best effort: a failed cleanup must not change the task result.
                    LOG.warn("Failed to clean up after tez job", cleanupError);
                }
            }
            if (client != null) {
                // Only reached when the DAG was submitted; close() may replace rc.
                rc = this.close((TezWork) this.work, rc);
            }
        }
        return rc;
    }

    /** Logs, at debug level, the resources the session reports as localized. */
    private void logLocalResources(List<LocalResource> additionalLr) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        if (additionalLr == null || additionalLr.size() == 0) {
            LOG.debug("No local resources to process (other than hive-exec)");
            return;
        }
        for (LocalResource lr : additionalLr) {
            LOG.debug("Adding local resource: " + lr.getResource());
        }
    }

    /** Logs every stored counter at info level when TEZ_EXEC_SUMMARY is enabled. */
    private void logCounterSummary() {
        if (!LOG.isInfoEnabled() || this.counters == null
                || !HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY)) {
            return;
        }
        for (CounterGroup group : this.counters) {
            LOG.info(group.getDisplayName() + ":");
            for (TezCounter counter : group) {
                LOG.info("   " + counter.getDisplayName() + ": " + counter.getValue());
            }
        }
    }

    /**
     * Localizes the given jars under the scratch directory and indexes the
     * resulting resources by the base name {@link DagUtils#getBaseName} reports.
     *
     * @param jobConf         configuration handed to the localization helper
     * @param scratchDir      directory whose string form is the localization target
     * @param inputOutputJars jars to localize
     * @return a mutable map from base name to resource; empty when the helper returns {@code null}
     * @throws Exception if localization fails
     */
    Map<String, LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir, String[] inputOutputJars) throws Exception {
        HashMap<String, LocalResource> byBaseName = new HashMap<String, LocalResource>();
        String targetDir = scratchDir.toString();
        List<LocalResource> localized = this.utils.localizeTempFiles(targetDir, (Configuration) jobConf, inputOutputJars);
        if (localized == null) {
            return byBaseName;
        }
        for (LocalResource resource : localized) {
            String baseName = this.utils.getBaseName(resource);
            byBaseName.put(baseName, resource);
        }
        return byBaseName;
    }

    /**
     * Makes sure the session is open and carries the given jars.
     *
     * <p>A session that is not open is opened with the jars. A session that is
     * already open gets the extra resources added to its app master files when it
     * reports them missing, and then refreshes its local resources from the conf.
     *
     * @param session         session to open or update
     * @param jobConf         not used by this method
     * @param scratchDir      not used by this method
     * @param inputOutputJars jars the session is expected to have
     * @param extraResources  resources added when the open session lacks the jars
     * @throws Exception if opening or updating the session fails
     */
    void updateSession(TezSessionState session, JobConf jobConf, Path scratchDir, String[] inputOutputJars, Map<String, LocalResource> extraResources) throws Exception {
        // Queried before the open check, exactly as the session would be asked otherwise.
        final boolean resourcesMissing = !session.hasResources(inputOutputJars);

        if (!session.isOpen()) {
            LOG.info("Tez session hasn't been created yet. Opening session");
            session.open(this.conf, inputOutputJars);
            return;
        }

        LOG.info("Session is already open");
        if (resourcesMissing) {
            LOG.info("Tez session missing resources, adding additional necessary resources");
            session.getSession().addAppMasterLocalFiles(extraResources);
        }
        session.refreshLocalResourcesFromConf(this.conf);
    }

    /**
     * Adds the given resources to the DAG as task-local files when the session
     * reports that it does not have the jars and a resource map was supplied.
     *
     * @param session                   session asked whether it already has the jars
     * @param dag                       DAG that receives the task-local files
     * @param inputOutputJars           jars checked against the session
     * @param inputOutputLocalResources resources to add; may be {@code null}, in which case nothing is added
     * @throws Exception if the session check or the DAG update fails
     */
    void addExtraResourcesToDag(TezSessionState session, DAG dag, String[] inputOutputJars, Map<String, LocalResource> inputOutputLocalResources) throws Exception {
        if (session.hasResources(inputOutputJars)) {
            return;
        }
        if (inputOutputLocalResources == null) {
            return;
        }
        dag.addTaskLocalFiles(inputOutputLocalResources);
    }

    /**
     * Translates the given {@link TezWork} into a Tez {@link DAG}.
     *
     * <p>Work items are processed in the reverse of the order returned by
     * {@code work.getAllWork()}; the code relies on every child of a work item
     * already having a vertex when that item is handled (see the assertion below).
     * A {@link UnionWork} becomes a vertex group over the children connected to it
     * by a {@code CONTAINS} edge, with group edges to its remaining children; any
     * other work item becomes a vertex with one edge per child.
     *
     * <p>Every {@code TezCreateVertex.<name>} perf-log begin is paired with an end,
     * including for union work items.
     *
     * @param conf         job configuration used for the file system, the DAG credentials and the per-vertex confs
     * @param work         work graph to translate
     * @param scratchDir   scratch directory passed on to vertex creation
     * @param appJarLr     resource passed on to vertex creation
     * @param additionalLr additional resources passed on to vertex creation
     * @param ctx          query context passed on to the helper calls
     * @return the assembled DAG
     * @throws Exception if any helper call fails
     */
    DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx) throws Exception {
        this.perfLogger.PerfLogBegin(CLASS_NAME, "TezBuildDag");
        Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
        Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();

        List<BaseWork> ws = work.getAllWork();
        Collections.reverse(ws);

        FileSystem fs = scratchDir.getFileSystem(conf);
        DAG dag = DAG.create(work.getName());
        dag.setCredentials(conf.getCredentials());

        for (BaseWork w : ws) {
            boolean isFinal = work.getLeaves().contains(w);
            String createVertexTag = "TezCreateVertex." + w.getName();
            this.perfLogger.PerfLogBegin(CLASS_NAME, createVertexTag);

            if (w instanceof UnionWork) {
                // Split the children into members of the union (CONTAINS edges)
                // and the children that consume the union's output.
                List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
                List<BaseWork> children = new LinkedList<BaseWork>();
                for (BaseWork v : work.getChildren(w)) {
                    TezEdgeProperty.EdgeType type = work.getEdgeProperty(w, v).getEdgeType();
                    if (type == TezEdgeProperty.EdgeType.CONTAINS) {
                        unionWorkItems.add(v);
                    } else {
                        children.add(v);
                    }
                }

                Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
                int i = 0;
                for (BaseWork v : unionWorkItems) {
                    vertexArray[i++] = workToVertex.get(v);
                }
                VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
                // Close the timer opened above; previously this branch left it dangling.
                this.perfLogger.PerfLogEnd(CLASS_NAME, createVertexTag);

                // The conf of the first union member is used for all group edges.
                JobConf parentConf = workToConf.get(unionWorkItems.get(0));
                for (BaseWork v : children) {
                    GroupInputEdge e = this.utils.createEdge(group, parentConf, workToVertex.get(v),
                            work.getEdgeProperty(w, v), work.getVertexType(v));
                    dag.addEdge(e);
                }
            } else {
                JobConf wxConf = this.utils.initializeVertexConf(conf, ctx, w);
                Vertex wx = this.utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx,
                        !isFinal, work, work.getVertexType(w));
                dag.addVertex(wx);
                this.utils.addCredentials(w, dag);
                this.perfLogger.PerfLogEnd(CLASS_NAME, createVertexTag);
                workToVertex.put(w, wx);
                workToConf.put(w, wxConf);

                for (BaseWork v : work.getChildren(w)) {
                    assert (workToVertex.containsKey(v));
                    TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
                    Edge e = this.utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
                    dag.addEdge(e);
                }
            }
        }

        this.perfLogger.PerfLogEnd(CLASS_NAME, "TezBuildDag");
        return dag;
    }

    /**
     * Submits the DAG through the given session and returns its client.
     *
     * <p>Resources of type {@code FILE} from {@code additionalLr} are added to the
     * session's app master files before submission. When the session reports
     * {@link SessionNotRunning}, it is closed and reopened through the pool manager
     * and the DAG is submitted once more.
     *
     * @param conf                      not used by this method
     * @param dag                       DAG to submit
     * @param scratchDir                not used by this method
     * @param appJarLr                  not used by this method
     * @param sessionState              session used for submission
     * @param additionalLr              candidate app master resources; may be {@code null}
     * @param inputOutputJars           jars passed along when the session is reopened
     * @param inputOutputLocalResources not used by this method
     * @return the client for the submitted DAG
     * @throws Exception if submission fails for any reason other than the first {@link SessionNotRunning}
     */
    DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr, TezSessionState sessionState, List<LocalResource> additionalLr, String[] inputOutputJars, Map<String, LocalResource> inputOutputLocalResources) throws Exception {
        this.perfLogger.PerfLogBegin(CLASS_NAME, "TezSubmitDag");

        HashMap<String, LocalResource> amFiles = new HashMap<String, LocalResource>();
        if (additionalLr != null) {
            for (LocalResource candidate : additionalLr) {
                if (candidate.getType() == LocalResourceType.FILE) {
                    amFiles.put(this.utils.getBaseName(candidate), candidate);
                }
            }
        }

        DAGClient submitted;
        try {
            sessionState.getSession().addAppMasterLocalFiles(amFiles);
            submitted = sessionState.getSession().submitDAG(dag);
        } catch (SessionNotRunning notRunning) {
            this.console.printInfo("Tez session was closed. Reopening...");
            TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars, true);
            this.console.printInfo("Session re-established.");
            submitted = sessionState.getSession().submitDAG(dag);
        }

        this.perfLogger.PerfLogEnd(CLASS_NAME, "TezSubmitDag");
        return submitted;
    }

    /**
     * Calls {@code jobClose} on every operator of every work item, telling each
     * operator whether the job succeeded ({@code rc == 0}).
     *
     * <p>For a {@link MergeJoinWork} the operators of its main work are used. The
     * first exception stops the iteration. If the job had succeeded, the failure is
     * reported on the console and the result becomes {@code 3}; if the job had
     * already failed, the exception is logged and the original code is kept.
     *
     * @param work work graph whose operators are closed
     * @param rc   result code of the job so far
     * @return {@code rc}, or {@code 3} when {@code rc} was {@code 0} and closing failed
     */
    int close(TezWork work, int rc) {
        try {
            for (BaseWork w : work.getAllWork()) {
                BaseWork target = (w instanceof MergeJoinWork) ? ((MergeJoinWork) w).getMainWork() : w;
                for (Operator<?> op : target.getAllOperators()) {
                    op.jobClose(this.conf, rc == 0);
                }
            }
        } catch (Exception e) {
            if (rc == 0) {
                rc = 3;
                String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
                this.console.printError(mesg, "\n" + StringUtils.stringifyException(e));
            } else {
                // The job already failed, so keep its code, but do not lose the cleanup error.
                LOG.warn("Failed to close operators after unsuccessful tez job (rc=" + rc + ")", e);
            }
        }
        return rc;
    }

    /**
     * Increments the {@code hive_tez_tasks} counter on the given metrics object.
     *
     * @param metrics metrics object whose counter is incremented
     */
    @Override
    public void updateTaskMetrics(Metrics metrics) {
        metrics.incrementCounter("hive_tez_tasks");
    }

    /**
     * Reports this task as a map-reduce style task.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isMapRedTask() {
        return true;
    }

    /**
     * Returns the stage type of this task.
     *
     * @return always {@link StageType#MAPRED}
     */
    @Override
    public StageType getType() {
        return StageType.MAPRED;
    }

    /**
     * Returns the name of this task type.
     *
     * @return the constant string {@code "TEZ"}
     */
    @Override
    public String getName() {
        return "TEZ";
    }

    /**
     * Collects the {@link MapWork} items of this task's work graph whose parents,
     * if any, are all {@link UnionWork} instances.
     *
     * @return the matching map work items, in the iteration order of
     *         {@code getAllWorkUnsorted()}; empty when none match
     */
    @Override
    public Collection<MapWork> getMapWork() {
        LinkedList<MapWork> selected = new LinkedList<MapWork>();
        TezWork tezWork = (TezWork) this.getWork();
        for (BaseWork item : tezWork.getAllWorkUnsorted()) {
            if (item instanceof MapWork) {
                // A single parent that is not a union disqualifies the item.
                boolean onlyUnionParents = true;
                for (BaseWork parent : tezWork.getParents(item)) {
                    if (!(parent instanceof UnionWork)) {
                        onlyUnionParents = false;
                        break;
                    }
                }
                if (onlyUnionParents) {
                    selected.add((MapWork) item);
                }
            }
        }
        return selected;
    }

    /**
     * Returns the reducer operator that follows the given map work.
     *
     * @param mapWork map work whose children are inspected
     * @return the reducer of the only child when that child is a {@link ReduceWork};
     *         {@code null} when the map work does not have exactly one child or the
     *         child is of another type
     */
    @Override
    public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) {
        List<BaseWork> successors = ((TezWork) this.getWork()).getChildren(mapWork);
        if (successors.size() == 1) {
            BaseWork onlyChild = successors.get(0);
            if (onlyChild instanceof ReduceWork) {
                return ((ReduceWork) onlyChild).getReducer();
            }
        }
        return null;
    }
}

