/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine;

import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.tools.pigstats.PigStats;

/**
 * Base {@link ExecutionEngine} for Hadoop-backed execution.
 *
 * <p>The class builds the Hadoop {@link JobConf} for local or cluster mode,
 * creates the {@link HDataStorage} used by the engine, translates logical
 * plans into physical plans and hands them to a {@link Launcher}.
 *
 * <p>The {@link #launcher} field is never assigned in this class; concrete
 * subclasses are expected to set it. Methods that only need the launcher for
 * clean-up tolerate it being {@code null}.
 */
public abstract class HExecutionEngine implements ExecutionEngine {
    private static final Log LOG = LogFactory.getLog(HExecutionEngine.class);

    /** Resource names looked up on the classpath / added to configurations. */
    public static final String HADOOP_SITE = "hadoop-site.xml";
    public static final String CORE_SITE = "core-site.xml";
    public static final String YARN_SITE = "yarn-site.xml";
    public static final String CORE_DEFAULT_SITE = "core-default.xml";
    public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
    public static final String YARN_DEFAULT_SITE = "yarn-default.xml";

    /** Property keys holding the default file system location. */
    public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
    public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";

    /** Value used for the job tracker / framework name in local mode. */
    public static final String LOCAL = "local";

    /** Context whose properties and exec type drive this engine. */
    protected PigContext pigContext;
    /** Data storage created by {@code init}; {@code null} until then. */
    protected DataStorage ds;
    /** Launcher used to run and explain physical plans; set by subclasses. */
    protected Launcher launcher;
    protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
    /** Logical-to-physical operator map produced by the last {@link #compile} call. */
    protected Map<Operator, PhysicalOperator> newLogToPhyMap;

    /**
     * Creates an engine bound to the given context. The data storage is not
     * created here; call {@link #init()} first.
     *
     * @param pigContext context providing properties and the exec type
     */
    public HExecutionEngine(PigContext pigContext) {
        this.pigContext = pigContext;
        this.ds = null;
        this.logicalToPhysicalKeys = Maps.newHashMap();
    }

    /**
     * Builds a {@link JobConf} without default resources and populates it from
     * the context properties via {@code Utils.recomputeProperties}.
     *
     * @return a freshly created job configuration
     */
    @Deprecated
    public JobConf getJobConf() {
        JobConf jc = new JobConf(false);
        Utils.recomputeProperties(jc, this.pigContext.getProperties());
        return jc;
    }

    /**
     * @return the data storage created during initialisation, or {@code null}
     *         if the engine has not been initialised yet
     */
    @Override
    public DataStorage getDataStorage() {
        return this.ds;
    }

    /**
     * Initialises the engine from the properties of the bound context.
     *
     * @throws ExecException if the Hadoop configuration cannot be set up
     */
    @Override
    public void init() throws ExecException {
        this.init(this.pigContext.getProperties());
    }

    /**
     * Collects every configuration entry whose key starts with {@code fs.s3}
     * from a default {@link JobConf} extended with {@code core-site.xml}.
     *
     * @return a configuration (created without defaults) holding only those entries
     * @throws ExecException declared for callers; not thrown by this implementation
     */
    public JobConf getS3Conf() throws ExecException {
        JobConf jc = new JobConf();
        jc.addResource(CORE_SITE);
        JobConf s3Jc = new JobConf(false);
        for (Map.Entry<String, String> entry : jc) {
            String key = entry.getKey();
            // "fs.s3" is a prefix of "fs.s3n", so a single test covers both families.
            if (key.startsWith("fs.s3")) {
                s3Jc.set(key, entry.getValue());
            }
        }
        return s3Jc;
    }

    /**
     * Builds the configuration used in local mode from the core, mapred and
     * yarn default resources only.
     *
     * @return the local-mode job configuration
     */
    public JobConf getLocalConf() {
        JobConf jc = new JobConf(false);
        jc.addResource(CORE_DEFAULT_SITE);
        jc.addResource(MAPRED_DEFAULT_SITE);
        jc.addResource(YARN_DEFAULT_SITE);
        return jc;
    }

    /**
     * Builds the configuration used in cluster mode.
     *
     * <p>When {@code pig.use.overriden.hadoop.configs} is {@code "true"} the
     * configuration is derived from {@code properties}; otherwise a default
     * {@link JobConf} is used, provided that {@code hadoop-site.xml} or
     * {@code core-site.xml} is visible to the configuration class loader.
     * In both cases {@code pig-cluster-hadoop-site.xml} and
     * {@code yarn-site.xml} are added as resources afterwards.
     *
     * @param properties properties consulted for the override flag and, if set,
     *                   converted into the configuration
     * @return the cluster-mode job configuration
     * @throws ExecException if neither site file is found on the classpath
     */
    public JobConf getExecConf(Properties properties) throws ExecException {
        JobConf jc;
        String isHadoopConfigsOverriden = properties.getProperty("pig.use.overriden.hadoop.configs");
        if ("true".equals(isHadoopConfigsOverriden)) {
            jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
        } else {
            // Probe the classpath through the same class loader Configuration uses.
            ClassLoader cl = new Configuration().getClassLoader();
            URL hadoopSite = cl.getResource(HADOOP_SITE);
            URL coreSite = cl.getResource(CORE_SITE);
            if (hadoopSite == null && coreSite == null) {
                throw new ExecException("Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath). If you plan to use local mode, please put -x local option in command line", 4010);
            }
            jc = new JobConf();
        }
        jc.addResource("pig-cluster-hadoop-site.xml");
        jc.addResource(YARN_SITE);
        return jc;
    }

    /**
     * Builds the job configuration for the current exec type, folds it into
     * {@code properties}, normalises the job tracker and file system
     * addresses, and creates the data storage.
     *
     * @param properties properties to update and to create the data storage from
     * @throws ExecException if the cluster configuration cannot be located
     */
    private void init(Properties properties) throws ExecException {
        JobConf jc;
        if (!this.pigContext.getExecType().isLocal()) {
            jc = this.getExecConf(properties);
            // NOTE(review): the instance is discarded; presumably it is created only
            // for its class-initialisation side effects (e.g. registering HDFS
            // default resources) - confirm before removing.
            new DistributedFileSystem();
        } else {
            // Keep a caller-supplied framework name, but force local tracker and file system.
            if (properties.getProperty("mapreduce.framework.name") == null) {
                properties.setProperty("mapreduce.framework.name", LOCAL);
            }
            properties.setProperty("mapred.job.tracker", LOCAL);
            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
            properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
            jc = this.getLocalConf();
            JobConf s3Jc = this.getS3Conf();
            ConfigurationUtil.mergeConf(jc, s3Jc);
        }
        Utils.recomputeProperties(jc, properties);

        String cluster = jc.get("mapred.job.tracker");
        String nameNode = jc.get(FILE_SYSTEM_LOCATION);
        if (nameNode == null) {
            // NOTE(review): this fallback reads the context properties, not the
            // 'properties' argument; they differ when called via setConfiguration.
            nameNode = (String) this.pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION);
        }

        if (cluster != null && cluster.length() > 0) {
            // Append a default port when the address carries none.
            if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
                cluster = cluster + ":50020";
            }
            properties.setProperty("mapred.job.tracker", cluster);
        }
        if (nameNode != null && nameNode.length() > 0) {
            if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
                nameNode = nameNode + ":8020";
            }
            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
        }

        LOG.info("Connecting to hadoop file system at: " + (nameNode == null ? LOCAL : nameNode));
        this.ds = new HDataStorage(properties);
        if (cluster != null && !cluster.equalsIgnoreCase(LOCAL)) {
            LOG.info("Connecting to map-reduce job tracker at: " + jc.get("mapred.job.tracker"));
        }
    }

    /**
     * Translates a logical plan into a physical plan and remembers the
     * resulting logical-to-physical operator map.
     *
     * @param plan       logical plan to translate; must not be {@code null}
     * @param properties not used by this implementation
     * @return the physical plan produced by the translation visitor
     * @throws FrontendException if {@code plan} is {@code null} or translation fails
     */
    public PhysicalPlan compile(LogicalPlan plan, Properties properties) throws FrontendException {
        if (plan == null) {
            // Third argument is the error source; presumably PigException.BUG - confirm.
            throw new FrontendException("No Plan to compile", 2041, 4);
        }
        LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitor(plan);
        translator.setPigContext(this.pigContext);
        translator.visit();
        this.newLogToPhyMap = translator.getLogToPhyMap();
        return translator.getPhysicalPlan();
    }

    /**
     * @return the operator map recorded by the last {@link #compile} call, or
     *         {@code null} if nothing has been compiled yet
     */
    public Map<Operator, PhysicalOperator> getLogToPhyMap() {
        return this.newLogToPhyMap;
    }

    /**
     * For every {@link LOForEach} in {@code plan}, maps each operator of its
     * inner plan to the physical operator recorded by the last
     * {@link #compile} call (the value is {@code null} when none was recorded).
     *
     * @param plan logical plan whose for-each operators are inspected
     * @return one inner map per for-each operator found in {@code plan}
     */
    public Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap(LogicalPlan plan) {
        Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> result = Maps.newHashMap();
        Iterator<Operator> outerIter = plan.getOperators();
        while (outerIter.hasNext()) {
            Operator oper = outerIter.next();
            if (!(oper instanceof LOForEach)) {
                continue;
            }
            LOForEach forEach = (LOForEach) oper;
            Map<LogicalRelationalOperator, PhysicalOperator> innerOpMap = Maps.newHashMap();
            Iterator<Operator> innerIter = forEach.getInnerPlan().getOperators();
            while (innerIter.hasNext()) {
                Operator innerOper = innerIter.next();
                innerOpMap.put((LogicalRelationalOperator) innerOper, this.newLogToPhyMap.get(innerOper));
            }
            result.put(forEach, innerOpMap);
        }
        return result;
    }

    /**
     * Compiles the logical plan and runs it, using the fetch launcher when the
     * plan is fetchable and the engine launcher otherwise. The engine launcher
     * is reset afterwards regardless of the outcome.
     *
     * @param lp      logical plan to run
     * @param grpName group name forwarded to the launcher
     * @param pc      context used for compilation and the fetch decision
     * @return statistics reported by whichever launcher ran the plan
     * @throws FrontendException propagated unchanged from compilation or launch
     * @throws ExecException     propagated unchanged, or wrapping any other exception
     */
    @Override
    public PigStats launchPig(LogicalPlan lp, String grpName, PigContext pc) throws FrontendException, ExecException {
        try {
            PhysicalPlan pp = this.compile(lp, pc.getProperties());
            if (FetchOptimizer.isPlanFetchable(pc, pp)) {
                new PhyPlanSetter(pp).visit();
                return new FetchLauncher(pc).launchPig(pp);
            }
            return this.launcher.launchPig(pp, grpName, this.pigContext);
        }
        catch (ExecException e) {
            throw e;
        }
        catch (FrontendException e) {
            throw e;
        }
        catch (Exception e) {
            throw new ExecException(e);
        }
        finally {
            // Guarded like killJob/destroy so a missing launcher cannot mask
            // the exception that is already propagating.
            if (this.launcher != null) {
                this.launcher.reset();
            }
        }
    }

    /**
     * Prints the physical plan and the execution plan for a logical plan.
     *
     * <p>When {@code file} is non-null, output goes to the files
     * {@code physical_plan-<suffix>} and {@code exec_plan-<suffix>} inside it;
     * otherwise both plans are written to {@code ps}. Only streams opened by
     * this method are closed; {@code ps} is always left open for the caller.
     *
     * @param lp      logical plan to explain
     * @param pc      context used for compilation and the fetch decision
     * @param ps      stream used when {@code file} is {@code null}
     * @param format  output format forwarded to the plan/launcher
     * @param verbose verbosity flag forwarded to the plan/launcher
     * @param file    directory for the output files, or {@code null}
     * @param suffix  suffix appended to the output file names
     * @throws IOException if an output file cannot be opened
     */
    @Override
    public void explain(LogicalPlan lp, PigContext pc, PrintStream ps, String format, boolean verbose, File file, String suffix) throws PlanException, VisitorException, IOException, FrontendException {
        PrintStream pps = ps;
        PrintStream eps = ps;
        boolean isFetchable = false;
        try {
            if (file != null) {
                pps = new PrintStream(new File(file, "physical_plan-" + suffix));
                eps = new PrintStream(new File(file, "exec_plan-" + suffix));
            }
            PhysicalPlan pp = this.compile(lp, pc.getProperties());
            pp.explain(pps, format, verbose);
            MapRedUtil.checkLeafIsStore(pp, this.pigContext);
            isFetchable = FetchOptimizer.isPlanFetchable(pc, pp);
            if (isFetchable) {
                new FetchLauncher(this.pigContext).explain(pp, pc, eps, format);
                return;
            }
            this.launcher.explain(pp, this.pigContext, eps, format, verbose);
        }
        finally {
            try {
                if (this.launcher != null) {
                    this.launcher.reset();
                }
                if (isFetchable) {
                    this.pigContext.getProperties().remove("pig.job.converted.fetch");
                }
            }
            finally {
                // Identity comparison on purpose: a stream that still aliases 'ps'
                // was not opened here (e.g. opening a file failed) and belongs to
                // the caller, so it must not be closed.
                if (pps != ps) {
                    pps.close();
                }
                if (eps != ps) {
                    eps.close();
                }
            }
        }
    }

    /**
     * @return a new {@link Properties} object holding a copy of the entries
     *         currently stored in the context properties
     */
    @Override
    public Properties getConfiguration() {
        Properties properties = new Properties();
        properties.putAll(this.pigContext.getProperties());
        return properties;
    }

    /**
     * Re-initialises the engine from the given properties.
     *
     * @param newConfiguration properties to initialise from; updated in place
     * @throws ExecException if the Hadoop configuration cannot be set up
     */
    @Override
    public void setConfiguration(Properties newConfiguration) throws ExecException {
        this.init(newConfiguration);
    }

    /**
     * Stores a single entry in the context properties.
     *
     * @param property key to set
     * @param value    value to associate with the key
     */
    @Override
    public void setProperty(String property, String value) {
        this.pigContext.getProperties().put(property, value);
    }

    /**
     * @return a new {@link HadoopExecutableManager}
     */
    @Override
    public ExecutableManager getExecutableManager() {
        return new HadoopExecutableManager();
    }

    /**
     * Asks the launcher, if one is set, to kill the given job.
     *
     * @param jobID identifier of the job to kill
     * @throws BackendException if the launcher fails to kill the job
     */
    @Override
    public void killJob(String jobID) throws BackendException {
        if (this.launcher != null) {
            this.launcher.killJob(jobID, this.getJobConf());
        }
    }

    /** Destroys the launcher, if one is set. */
    @Override
    public void destroy() {
        if (this.launcher != null) {
            this.launcher.destroy();
        }
    }
}

