/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.impl.builtin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Pair;

/**
 * Eval function that turns a bag of key samples into a reducer partition list.
 *
 * <p>{@link #exec(Tuple)} reads the total reducer count from field 0 of its input and a
 * bag of sample tuples from field 1. In every sample tuple the last field is read as a
 * {@code Long} row count, the second-to-last field as a {@code Long} memory size, and all
 * preceding fields are compared as the key. Consecutive samples with equal keys are
 * grouped (the grouping only looks at neighbours, so equal keys must be adjacent), and
 * for every key that is estimated to need more than one reducer a tuple
 * {@code (key fields..., minReducerIndex, maxReducerIndex)} is emitted.
 *
 * <p>The result map holds the bag of those tuples under {@link #PARTITION_LIST} and the
 * reducer count under {@link #TOTAL_REDUCERS}.
 *
 * <p>Instances carry mutable state (the next reducer index and values cached by
 * {@code exec}); nothing in this class synchronizes access to it.
 */
public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
    /** Key of the result map entry holding the bag of per-key reducer ranges. */
    public static final String PARTITION_LIST = "partition.list";
    /** Key of the result map entry holding the total number of reducers. */
    public static final String TOTAL_REDUCERS = "totalreducers";
    /** Fraction of the maximum heap used when no constructor arguments are given. */
    public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;

    private final Log log = LogFactory.getLog(this.getClass());
    BagFactory mBagFactory = BagFactory.getInstance();
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    /** Next reducer index to hand out; advances (with wrap-around) for every skewed key. */
    private int currentIndex_ = 0;
    private int totalReducers_ = -1;
    /** Memory budget, recomputed on every {@code exec} call from the JVM max heap. */
    private long totalMemory_;
    private String inputFile_;
    private long totalSampleCount_;
    private double heapPercentage_;
    /** Explicit tuples-per-reducer capacity; values {@code <= 0} mean "derive from memory". */
    private int tupleMCount_;

    /** Creates an instance using {@link #DEFAULT_PERCENT_MEMUSAGE} and no explicit capacity. */
    public PartitionSkewedKeys() {
        this(null);
    }

    /**
     * Creates an instance from positional string arguments.
     *
     * @param args optional settings: {@code args[0]} heap fraction (parsed as double),
     *             {@code args[1]} tuples-per-reducer capacity (parsed as int),
     *             {@code args[2]} input file name (only logged by this class). Trailing
     *             arguments may be omitted; {@code null} or an empty array selects the
     *             default heap fraction.
     * @throws NumberFormatException if a supplied numeric argument cannot be parsed
     */
    public PartitionSkewedKeys(String[] args) {
        if (args != null && args.length > 0) {
            this.heapPercentage_ = Double.parseDouble(args[0]);
            // Read the remaining arguments only when present, so a short array no longer
            // fails with ArrayIndexOutOfBoundsException.
            if (args.length > 1) {
                this.tupleMCount_ = Integer.parseInt(args[1]);
            }
            if (args.length > 2) {
                this.inputFile_ = args[2];
            }
        } else {
            this.heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("pig.skewedjoin.reduce.memusage=" + this.heapPercentage_);
            this.log.debug("input file: " + this.inputFile_);
        }
        this.log.info("input file: " + this.inputFile_);
    }

    /**
     * Builds the partition map for the supplied samples.
     *
     * @param in tuple whose field 0 is the reducer count ({@code Integer}) and whose
     *           field 1 is the {@code DataBag} of samples
     * @return a map with {@link #PARTITION_LIST} and {@link #TOTAL_REDUCERS} entries, or
     *         {@code null} when {@code in} is {@code null} or has no fields
     * @throws IOException declared by the overridden method; this implementation wraps
     *         every failure in a {@link RuntimeException} instead
     * @throws RuntimeException wrapping any exception raised while processing the samples
     */
    @Override
    public Map<String, Object> exec(Tuple in) throws IOException {
        if (in == null || in.size() == 0) {
            return null;
        }
        Map<String, Object> output = new HashMap<String, Object>();
        this.totalMemory_ = (long)((double)Runtime.getRuntime().maxMemory() * this.heapPercentage_);
        this.log.info("Maximum of available memory is " + this.totalMemory_);
        ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
        Tuple currentTuple = null;
        long count = 0L;
        long totalSampleMSize = 0L;
        long totalInputRows = 0L;
        try {
            this.totalReducers_ = (Integer)in.get(0);
            DataBag samples = (DataBag)in.get(1);
            this.totalSampleCount_ = samples.size();
            this.log.info("totalSample: " + this.totalSampleCount_);
            this.log.info("totalReducers: " + this.totalReducers_);
            int maxReducers = 0;
            // First pass: the last field of every sample contributes to the row total.
            for (Tuple t : samples) {
                totalInputRows += ((Long)t.get(t.size() - 1)).longValue();
            }
            // Second pass: accumulate runs of equal keys and flush on every key change.
            for (Tuple t : samples) {
                if (currentTuple == null || this.hasSameKey(currentTuple, t)) {
                    ++count;
                    totalSampleMSize += this.getMemorySize(t);
                } else {
                    maxReducers = Math.max(maxReducers,
                            this.flushKey(reducerList, currentTuple, count, totalSampleMSize, totalInputRows));
                    count = 1L;
                    totalSampleMSize = this.getMemorySize(t);
                }
                currentTuple = t;
            }
            // Flush the final run, if the bag was not empty.
            if (count > 0L) {
                maxReducers = Math.max(maxReducers,
                        this.flushKey(reducerList, currentTuple, count, totalSampleMSize, totalInputRows));
            }
            if (maxReducers > this.totalReducers_) {
                if (this.pigLogger != null) {
                    this.pigLogger.warn(this, "You need at least " + maxReducers + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
                } else {
                    this.log.warn("You need at least " + maxReducers + " reducers to avoid spillage and run this job efficiently.");
                }
            }
            output.put(PARTITION_LIST, this.mBagFactory.newDefaultBag(reducerList));
            output.put(TOTAL_REDUCERS, this.totalReducers_);
            this.log.info(output.toString());
            if (this.log.isDebugEnabled()) {
                this.log.debug(output.toString());
            }
            return output;
        }
        catch (Exception e) {
            // Report through the logger rather than printing to stderr, then rethrow
            // with the same wrapper type the method has always used.
            this.log.error("Failed to compute skewed key partitions", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Computes the reducer requirement for one finished key run, appends its range tuple
     * to {@code reducerList} when the key is skewed, and returns the reducer count.
     */
    private int flushKey(ArrayList<Tuple> reducerList, Tuple keyTuple, long count, long totalMSize, long totalInputRows) {
        Pair<Tuple, Integer> p = this.calculateReducers(keyTuple, count, totalMSize, totalInputRows);
        if (p.first != null) {
            reducerList.add(p.first);
        }
        return p.second;
    }

    /**
     * Estimates how many reducers a key needs and, for skewed keys, assigns a reducer range.
     *
     * @param currentTuple a sample tuple carrying the key in its leading fields
     * @param count number of samples seen for this key
     * @param totalMSize sum of the memory-size fields of those samples
     * @param totalTuples total row count summed over all samples
     * @return a pair of (range tuple, reducers needed); the tuple is {@code null} and the
     *         count is 1 when a single reducer suffices
     */
    private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple, long count, long totalMSize, long totalTuples) {
        double avgM = (double)totalMSize / (double)count;
        long tupleMCount = this.tupleMCount_ <= 0 ? (long)((double)this.totalMemory_ / avgM) : (long)this.tupleMCount_;
        // A reducer always holds at least one tuple. Without this floor an average tuple
        // larger than the memory budget gave a capacity of 0, the division below produced
        // Infinity, and the narrowing to int turned that into -1, i.e. "not skewed".
        tupleMCount = Math.max(1L, tupleMCount);
        long keyTupleCount = (long)((double)count / (double)this.totalSampleCount_ * (double)totalTuples);
        long reducersNeeded = Math.round(Math.ceil((double)keyTupleCount / (double)tupleMCount));
        // Saturate instead of wrapping when the estimate does not fit in an int.
        int redCount = (int)Math.min(reducersNeeded, (long)Integer.MAX_VALUE);
        if (this.log.isDebugEnabled()) {
            this.log.debug("avgM: " + avgM);
            this.log.debug("tuple count: " + keyTupleCount);
            this.log.debug("count: " + count);
            this.log.debug("A reducer can take " + tupleMCount + " tuples and " + keyTupleCount + " tuples are find for " + currentTuple);
            this.log.debug("key " + currentTuple + " need " + redCount + " reducers");
        }
        if (redCount <= 1) {
            return new Pair<Tuple, Integer>(null, 1);
        }
        Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
        try {
            int i;
            // Copy the key fields; the two trailing slots receive the reducer range.
            for (i = 0; i < currentTuple.size() - 2; ++i) {
                t.set(i, currentTuple.get(i));
            }
            int effectiveRedCount = redCount > this.totalReducers_ ? this.totalReducers_ : redCount;
            // First reducer index of the range.
            t.set(i++, this.currentIndex_);
            // Last reducer index of the range, wrapping around the reducer ring.
            this.currentIndex_ = (this.currentIndex_ + effectiveRedCount) % this.totalReducers_ - 1;
            if (this.currentIndex_ < 0) {
                this.currentIndex_ += this.totalReducers_;
            }
            t.set(i++, this.currentIndex_);
        }
        catch (ExecException e) {
            // Keep the original exception as the cause instead of flattening it to text.
            throw new RuntimeException("Failed to set value to tuple.", e);
        }
        // The next skewed key starts on the reducer after this range.
        this.currentIndex_ = (this.currentIndex_ + 1) % this.totalReducers_;
        return new Pair<Tuple, Integer>(t, redCount);
    }

    /**
     * Reads the memory-size field (second-to-last field, a {@code Long}) of a sample tuple.
     *
     * @throws RuntimeException if the field cannot be read
     */
    private long getMemorySize(Tuple t) {
        int s = t.size();
        try {
            return (Long)t.get(s - 2);
        }
        catch (ExecException e) {
            throw new RuntimeException("Unable to retrive the size field from tuple.", e);
        }
    }

    /**
     * Tells whether two sample tuples carry the same key, i.e. have the same arity and
     * compare equal on every field except the last two. A {@code null} tuple is treated
     * as having size 0.
     *
     * @throws RuntimeException if a field cannot be read
     */
    private boolean hasSameKey(Tuple t1, Tuple t2) {
        int sz1 = t1 == null ? 0 : t1.size();
        int sz2 = t2 == null ? 0 : t2.size();
        if (sz2 != sz1) {
            return false;
        }
        for (int i = 0; i < sz1 - 2; ++i) {
            try {
                if (DataType.compare(t1.get(i), t2.get(i)) != 0) {
                    return false;
                }
            }
            catch (ExecException e) {
                throw new RuntimeException("Unable to compare tuples", e);
            }
        }
        return true;
    }
}

