/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.pipes;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.pipes.BinaryProtocol;
import org.apache.hadoop.mapred.pipes.DownwardProtocol;
import org.apache.hadoop.mapred.pipes.OutputHandler;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

class Application<K1 extends WritableComparable, V1 extends Writable, K2 extends WritableComparable, V2 extends Writable> {
    private static final Log LOG = LogFactory.getLog((String)Application.class.getName());
    private ServerSocket serverSocket = new ServerSocket(0);
    private Process process;
    private Socket clientSocket;
    private OutputHandler<K2, V2> handler;
    private DownwardProtocol<K1, V1> downlink;
    static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

    Application(JobConf conf, RecordReader<FloatWritable, NullWritable> recordReader, OutputCollector<K2, V2> output, Reporter reporter, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass) throws IOException, InterruptedException {
        String executable;
        HashMap<String, String> env = new HashMap<String, String>();
        env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
        env.put("mapreduce.pipes.command.port", Integer.toString(this.serverSocket.getLocalPort()));
        Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf.getCredentials());
        byte[] password = jobToken.getPassword();
        String localPasswordFile = new File(".") + "/" + "jobTokenPassword";
        this.writePasswordToLocalFile(localPasswordFile, password, conf);
        env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
        List<String> cmd = new ArrayList<String>();
        String interpretor = conf.get("mapreduce.pipes.executable.interpretor");
        if (interpretor != null) {
            cmd.add(interpretor);
        }
        if (!FileUtil.canExecute((File)new File(executable = DistributedCache.getLocalCacheFiles(conf)[0].toString()))) {
            FileUtil.chmod((String)executable, (String)"u+x");
        }
        cmd.add(executable);
        TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapreduce.task.attempt.id"));
        File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
        File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
        long logLength = TaskLog.getTaskLogLength(conf);
        cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength, false);
        this.process = Application.runClient(cmd, env);
        this.clientSocket = this.serverSocket.accept();
        String challenge = this.getSecurityChallenge();
        String digestToSend = Application.createDigest(password, challenge);
        String digestExpected = Application.createDigest(password, digestToSend);
        this.handler = new OutputHandler<K2, V2>(output, reporter, recordReader, digestExpected);
        WritableComparable outputKey = (WritableComparable)ReflectionUtils.newInstance(outputKeyClass, (Configuration)conf);
        Writable outputValue = (Writable)ReflectionUtils.newInstance(outputValueClass, (Configuration)conf);
        this.downlink = new BinaryProtocol(this.clientSocket, this.handler, outputKey, outputValue, conf);
        this.downlink.authenticate(digestToSend, challenge);
        this.waitForAuthentication();
        LOG.debug((Object)"Authentication succeeded");
        this.downlink.start();
        this.downlink.setJobConf(conf);
    }

    private String getSecurityChallenge() {
        Random rand = new Random(System.currentTimeMillis());
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(rand.nextInt(Integer.MAX_VALUE));
        strBuilder.append(rand.nextInt(Integer.MAX_VALUE));
        strBuilder.append(rand.nextInt(Integer.MAX_VALUE));
        strBuilder.append(rand.nextInt(Integer.MAX_VALUE));
        return strBuilder.toString();
    }

    private void writePasswordToLocalFile(String localPasswordFile, byte[] password, JobConf conf) throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        Path localPath = new Path(localPasswordFile);
        FSDataOutputStream out = FileSystem.create((FileSystem)localFs, (Path)localPath, (FsPermission)new FsPermission("400"));
        out.write(password);
        out.close();
    }

    DownwardProtocol<K1, V1> getDownlink() {
        return this.downlink;
    }

    void waitForAuthentication() throws IOException, InterruptedException {
        this.downlink.flush();
        LOG.debug((Object)"Waiting for authentication response");
        this.handler.waitForAuthentication();
    }

    boolean waitForFinish() throws Throwable {
        this.downlink.flush();
        return this.handler.waitForFinish();
    }

    void abort(Throwable t) throws IOException {
        LOG.info((Object)("Aborting because of " + StringUtils.stringifyException((Throwable)t)));
        try {
            this.downlink.abort();
            this.downlink.flush();
        }
        catch (IOException e) {
            // empty catch block
        }
        try {
            this.handler.waitForFinish();
        }
        catch (Throwable ignored) {
            this.process.destroy();
        }
        IOException wrapper = new IOException("pipe child exception");
        wrapper.initCause(t);
        throw wrapper;
    }

    void cleanup() throws IOException {
        this.serverSocket.close();
        try {
            this.downlink.close();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    static Process runClient(List<String> command, Map<String, String> env) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(command);
        if (env != null) {
            builder.environment().putAll(env);
        }
        Process result = builder.start();
        return result;
    }

    public static String createDigest(byte[] password, String data) throws IOException {
        SecretKey key = JobTokenSecretManager.createSecretKey(password);
        return SecureShuffleUtils.hashFromString(data, key);
    }
}

