/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.accumulo;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.fate.Fate;
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadStoreCaster;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.accumulo.AccumuloStorageOptions;
import org.apache.pig.backend.hadoop.accumulo.Column;
import org.apache.pig.backend.hadoop.accumulo.Utils;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.thrift.TServiceClient;
import org.apache.zookeeper.ZooKeeper;
import org.joda.time.DateTime;

public abstract class AbstractAccumuloStorage
extends LoadFunc
implements StoreFuncInterface {
    private static final Log log = LogFactory.getLog(AbstractAccumuloStorage.class);
    protected static final char COLON = ':';
    protected static final char COMMA = ',';
    protected static final String ASTERISK = "*";
    private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
    private static final String OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
    private static final String STRING_CASTER = "UTF8StorageConverter";
    private static final String BYTE_CASTER = "AccumuloBinaryConverter";
    private static final String CASTER_PROPERTY = "pig.accumulo.caster";
    protected final AccumuloStorageOptions storageOptions;
    protected final CommandLine commandLine;
    private RecordReader<Key, Value> reader;
    private RecordWriter<Text, Mutation> writer;
    protected String inst;
    protected String zookeepers;
    protected String user;
    protected String password;
    protected String table;
    protected Text tableName;
    protected Authorizations authorizations;
    protected List<Column> columns;
    protected String start = null;
    protected String end = null;
    protected int maxWriteThreads = 3;
    protected long maxMutationBufferSize = 0x3200000L;
    protected long maxLatency = Long.MAX_VALUE;
    protected String columnSeparator = ",";
    protected boolean ignoreWhitespace = true;
    protected LoadStoreCaster caster;
    protected ResourceSchema schema;
    protected String contextSignature = null;

    public AbstractAccumuloStorage(String columns, String args) throws ParseException, IOException {
        this.storageOptions = new AccumuloStorageOptions();
        this.commandLine = this.storageOptions.getCommandLine(args);
        this.extractArgs(this.commandLine, this.storageOptions);
        this.parseColumns(columns);
    }

    private void parseColumns(String columnStr) {
        this.columns = new LinkedList<Column>();
        if (this.ignoreWhitespace) {
            columnStr = StringUtils.strip((String)columnStr);
        }
        if (!columnStr.isEmpty()) {
            for (String column : StringUtils.split((String)columnStr, (String)this.columnSeparator)) {
                this.columns.add(new Column(this.ignoreWhitespace ? StringUtils.strip((String)column) : column));
            }
        } else {
            this.columns.add(new Column(ASTERISK));
        }
    }

    protected void extractArgs(CommandLine cli, AccumuloStorageOptions opts) throws IOException {
        String casterOption;
        if (opts.hasAuthorizations(cli)) {
            this.authorizations = opts.getAuthorizations(cli);
        }
        this.start = cli.getOptionValue(AccumuloStorageOptions.START_ROW_OPTION.getOpt(), null);
        this.end = cli.getOptionValue(AccumuloStorageOptions.END_ROW_OPTION.getOpt(), null);
        if (cli.hasOption(AccumuloStorageOptions.MAX_LATENCY_OPTION.getOpt())) {
            this.maxLatency = opts.getInt(cli, AccumuloStorageOptions.MAX_LATENCY_OPTION);
        }
        if (cli.hasOption(AccumuloStorageOptions.WRITE_THREADS_OPTION.getOpt())) {
            this.maxWriteThreads = opts.getInt(cli, AccumuloStorageOptions.WRITE_THREADS_OPTION);
        }
        if (cli.hasOption(AccumuloStorageOptions.MUTATION_BUFFER_SIZE_OPTION.getOpt())) {
            this.maxMutationBufferSize = opts.getLong(cli, AccumuloStorageOptions.MUTATION_BUFFER_SIZE_OPTION);
        }
        Properties clientSystemProps = UDFContext.getUDFContext().getClientSystemProps();
        String defaultCaster = STRING_CASTER;
        if (null != clientSystemProps) {
            defaultCaster = clientSystemProps.getProperty(CASTER_PROPERTY, defaultCaster);
        }
        if (STRING_CASTER.equalsIgnoreCase(casterOption = cli.getOptionValue("caster", defaultCaster))) {
            this.caster = new Utf8StorageConverter();
        } else if (BYTE_CASTER.equalsIgnoreCase(casterOption)) {
            this.caster = new HBaseBinaryConverter();
        } else {
            try {
                this.caster = (LoadStoreCaster)PigContext.instantiateFuncFromSpec(casterOption);
            }
            catch (ClassCastException e) {
                log.error((Object)"Configured caster does not implement LoadCaster interface.");
                throw new IOException(e);
            }
            catch (RuntimeException e) {
                log.error((Object)"Configured caster class not found.", (Throwable)e);
                throw new IOException(e);
            }
        }
        log.debug((Object)("Using caster " + this.caster.getClass()));
        if (cli.hasOption(AccumuloStorageOptions.COLUMN_SEPARATOR_OPTION.getOpt())) {
            this.columnSeparator = cli.getOptionValue(AccumuloStorageOptions.COLUMN_SEPARATOR_OPTION.getOpt());
        }
        if (cli.hasOption(AccumuloStorageOptions.COLUMN_IGNORE_WHITESPACE_OPTION.getOpt())) {
            String value = cli.getOptionValue(AccumuloStorageOptions.COLUMN_IGNORE_WHITESPACE_OPTION.getOpt());
            if ("false".equalsIgnoreCase(value)) {
                this.ignoreWhitespace = false;
            } else if ("true".equalsIgnoreCase(value)) {
                this.ignoreWhitespace = true;
            } else {
                log.warn((Object)("Ignoring unknown value for " + AccumuloStorageOptions.COLUMN_IGNORE_WHITESPACE_OPTION.getOpt() + ": " + value));
            }
        }
    }

    protected CommandLine getCommandLine() {
        return this.commandLine;
    }

    protected Map<String, String> getInputFormatEntries(Configuration conf) {
        return this.getEntries(conf, INPUT_PREFIX);
    }

    protected Map<String, String> getOutputFormatEntries(Configuration conf) {
        return this.getEntries(conf, OUTPUT_PREFIX);
    }

    protected void unsetEntriesFromConfiguration(Configuration conf, Map<String, String> entriesToUnset) {
        boolean configurationHasUnset = true;
        try {
            conf.getClass().getMethod("unset", String.class);
        }
        catch (NoSuchMethodException e) {
            configurationHasUnset = false;
        }
        catch (SecurityException e) {
            configurationHasUnset = false;
        }
        if (configurationHasUnset) {
            this.simpleUnset(conf, entriesToUnset);
        } else {
            this.clearUnset(conf, entriesToUnset);
        }
    }

    protected void simpleUnset(Configuration conf, Map<String, String> entriesToUnset) {
        try {
            Method unset = conf.getClass().getMethod("unset", String.class);
            for (String key : entriesToUnset.keySet()) {
                unset.invoke((Object)conf, key);
            }
        }
        catch (NoSuchMethodException e) {
            log.error((Object)"Could not invoke Configuration.unset method", (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (IllegalAccessException e) {
            log.error((Object)"Could not invoke Configuration.unset method", (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (IllegalArgumentException e) {
            log.error((Object)"Could not invoke Configuration.unset method", (Throwable)e);
            throw new RuntimeException(e);
        }
        catch (InvocationTargetException e) {
            log.error((Object)"Could not invoke Configuration.unset method", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    protected void clearUnset(Configuration conf, Map<String, String> entriesToUnset) {
        Iterator originalEntries = conf.iterator();
        conf.clear();
        while (originalEntries.hasNext()) {
            Map.Entry originalEntry = (Map.Entry)originalEntries.next();
            if (entriesToUnset.containsKey(originalEntry.getKey())) continue;
            conf.set((String)originalEntry.getKey(), (String)originalEntry.getValue());
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!this.reader.nextKeyValue()) {
                return null;
            }
            Key key = (Key)this.reader.getCurrentKey();
            Value value = (Value)this.reader.getCurrentValue();
            assert (key != null && value != null);
            return this.getTuple(key, value);
        }
        catch (InterruptedException e) {
            throw new IOException(e.getMessage());
        }
    }

    protected abstract Tuple getTuple(Key var1, Value var2) throws IOException;

    @Override
    public InputFormat getInputFormat() {
        return new AccumuloInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = reader;
    }

    private void setLocationFromUri(String location) throws IOException {
        String columns = "";
        String auths = "";
        try {
            if (!location.startsWith("accumulo://")) {
                throw new Exception("Bad scheme.");
            }
            String[] urlParts = location.split("\\?");
            if (urlParts.length > 1) {
                for (String param : urlParts[1].split("&")) {
                    String[] pair = param.split("=");
                    if (pair[0].equals("instance")) {
                        this.inst = pair[1];
                        continue;
                    }
                    if (pair[0].equals("user")) {
                        this.user = pair[1];
                        continue;
                    }
                    if (pair[0].equals("password")) {
                        this.password = pair[1];
                        continue;
                    }
                    if (pair[0].equals("zookeepers")) {
                        this.zookeepers = pair[1];
                        continue;
                    }
                    if (pair[0].equals("auths")) {
                        auths = pair[1];
                        continue;
                    }
                    if (pair[0].equals("fetch_columns")) {
                        columns = pair[1];
                        continue;
                    }
                    if (pair[0].equals("start")) {
                        this.start = pair[1];
                        continue;
                    }
                    if (pair[0].equals("end")) {
                        this.end = pair[1];
                        continue;
                    }
                    if (pair[0].equals("write_buffer_size_bytes")) {
                        this.maxMutationBufferSize = Long.parseLong(pair[1]);
                        continue;
                    }
                    if (pair[0].equals("write_threads")) {
                        this.maxWriteThreads = Integer.parseInt(pair[1]);
                        continue;
                    }
                    if (!pair[0].equals("write_latency_ms")) continue;
                    this.maxLatency = Long.parseLong(pair[1]);
                }
            }
            String[] parts = urlParts[0].split("/+");
            this.table = parts[1];
            this.tableName = new Text(this.table);
            this.authorizations = null == this.authorizations && auths == null ? new Authorizations() : new Authorizations(StringUtils.split((String)auths, (char)','));
            if (!StringUtils.isEmpty((String)columns)) {
                this.parseColumns(columns);
            }
        }
        catch (Exception e) {
            throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[start=startRow,end=endRow,fetch_columns=[cf1:cq1,cf2:cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': " + e.getMessage());
        }
    }

    protected RecordWriter<Text, Mutation> getWriter() {
        return this.writer;
    }

    protected Map<String, String> getEntries(Configuration conf, String prefix) {
        HashMap<String, String> entries = new HashMap<String, String>();
        for (Map.Entry entry : conf) {
            String key = (String)entry.getKey();
            if (!key.startsWith(prefix)) continue;
            entries.put(key, (String)entry.getValue());
        }
        return entries;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        this.setLocationFromUri(location);
        this.loadDependentJars(job.getConfiguration());
        Map<String, String> entries = this.getInputFormatEntries(job.getConfiguration());
        this.unsetEntriesFromConfiguration(job.getConfiguration(), entries);
        try {
            AccumuloInputFormat.setConnectorInfo((Job)job, (String)this.user, (AuthenticationToken)new PasswordToken((CharSequence)this.password));
        }
        catch (AccumuloSecurityException e) {
            throw new IOException(e);
        }
        AccumuloInputFormat.setInputTableName((Job)job, (String)this.table);
        AccumuloInputFormat.setScanAuthorizations((Job)job, (Authorizations)this.authorizations);
        AccumuloInputFormat.setZooKeeperInstance((Job)job, (String)this.inst, (String)this.zookeepers);
        LinkedList<Pair<Text, Text>> inputFormatColumns = new LinkedList<Pair<Text, Text>>();
        int colfamPrefix = 0;
        block7: for (Column c : this.columns) {
            switch (c.getType()) {
                case LITERAL: {
                    inputFormatColumns.add(this.makePair(c.getColumnFamily(), c.getColumnQualifier()));
                    continue block7;
                }
                case COLFAM_PREFIX: {
                    ++colfamPrefix;
                    continue block7;
                }
                case COLQUAL_PREFIX: {
                    inputFormatColumns.add(this.makePair(c.getColumnFamily(), null));
                    continue block7;
                }
            }
            log.info((Object)"Ignoring unhandled column type");
        }
        if (0 == colfamPrefix && !inputFormatColumns.isEmpty()) {
            AccumuloInputFormat.fetchColumns((Job)job, inputFormatColumns);
        }
        Set<Range> ranges = Collections.singleton(new Range((CharSequence)this.start, (CharSequence)this.end));
        log.info((Object)("Scanning Accumulo for " + ranges + " for table " + this.table));
        AccumuloInputFormat.setRanges((Job)job, ranges);
        this.configureInputFormat(job);
    }

    protected Pair<Text, Text> makePair(String first, String second) {
        return new Pair((Object)(null == first ? null : new Text(first)), (Object)(null == second ? null : new Text(second)));
    }

    protected void loadDependentJars(Configuration conf) throws IOException {
        Utils.addDependencyJars(conf, Tracer.class, Instance.class, Fate.class, ZooKeeper.class, TServiceClient.class);
    }

    protected void configureInputFormat(Job job) {
    }

    protected void configureOutputFormat(Job job) {
    }

    @Override
    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
        return location;
    }

    @Override
    public void setUDFContextSignature(String signature) {
        this.contextSignature = signature;
    }

    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        this.contextSignature = signature;
    }

    protected Properties getUDFProperties() {
        return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{this.contextSignature});
    }

    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
        return this.relativeToAbsolutePath(location, curDir);
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        this.setLocationFromUri(location);
        this.loadDependentJars(job.getConfiguration());
        Map<String, String> entries = this.getOutputFormatEntries(job.getConfiguration());
        this.unsetEntriesFromConfiguration(job.getConfiguration(), entries);
        try {
            AccumuloOutputFormat.setConnectorInfo((Job)job, (String)this.user, (AuthenticationToken)new PasswordToken((CharSequence)this.password));
        }
        catch (AccumuloSecurityException e) {
            throw new IOException(e);
        }
        AccumuloOutputFormat.setCreateTables((Job)job, (boolean)true);
        AccumuloOutputFormat.setZooKeeperInstance((Job)job, (String)this.inst, (String)this.zookeepers);
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        bwConfig.setMaxLatency(this.maxLatency, TimeUnit.MILLISECONDS);
        bwConfig.setMaxMemory(this.maxMutationBufferSize);
        bwConfig.setMaxWriteThreads(this.maxWriteThreads);
        AccumuloOutputFormat.setBatchWriterOptions((Job)job, (BatchWriterConfig)bwConfig);
        log.info((Object)("Writing data to " + this.table));
        this.configureOutputFormat(job);
    }

    @Override
    public OutputFormat getOutputFormat() {
        return new AccumuloOutputFormat();
    }

    @Override
    public void prepareToWrite(RecordWriter writer) {
        this.writer = writer;
    }

    protected abstract Collection<Mutation> getMutations(Tuple var1) throws ExecException, IOException;

    @Override
    public void putNext(Tuple tuple) throws ExecException, IOException {
        Collection<Mutation> muts = this.getMutations(tuple);
        for (Mutation mut : muts) {
            try {
                this.getWriter().write((Object)this.tableName, (Object)mut);
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }

    @Override
    public void cleanupOnFailure(String failure, Job job) {
    }

    @Override
    public void cleanupOnSuccess(String location, Job job) {
    }

    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        if (!(this.caster instanceof LoadStoreCaster)) {
            log.error((Object)"Caster must implement LoadStoreCaster for writing to Accumulo.");
            throw new IOException("Bad Caster " + this.caster.getClass());
        }
        this.schema = s;
        this.getUDFProperties().setProperty(this.contextSignature + "_schema", ObjectSerializer.serialize(this.schema));
    }

    protected Text tupleToText(Tuple tuple, int i, ResourceSchema.ResourceFieldSchema[] fieldSchemas) throws IOException {
        Object o = tuple.get(i);
        byte type = this.schemaToType(o, i, fieldSchemas);
        return this.objToText(o, type);
    }

    protected Text objectToText(Object o, ResourceSchema.ResourceFieldSchema fieldSchema) throws IOException {
        byte type = this.schemaToType(o, fieldSchema);
        return this.objToText(o, type);
    }

    protected byte schemaToType(Object o, ResourceSchema.ResourceFieldSchema fieldSchema) {
        return fieldSchema == null ? DataType.findType(o) : fieldSchema.getType();
    }

    protected byte schemaToType(Object o, int i, ResourceSchema.ResourceFieldSchema[] fieldSchemas) {
        return fieldSchemas == null ? DataType.findType(o) : fieldSchemas[i].getType();
    }

    protected byte[] tupleToBytes(Tuple tuple, int i, ResourceSchema.ResourceFieldSchema[] fieldSchemas) throws IOException {
        Object o = tuple.get(i);
        byte type = this.schemaToType(o, i, fieldSchemas);
        return this.objToBytes(o, type);
    }

    protected Text objToText(Object o, byte type) throws IOException {
        byte[] bytes = this.objToBytes(o, type);
        if (null == bytes) {
            log.warn((Object)"Creating empty text from null value");
            return new Text();
        }
        return new Text(bytes);
    }

    protected byte[] objToBytes(Object o, byte type) throws IOException {
        if (o == null) {
            return null;
        }
        switch (type) {
            case 50: {
                return ((DataByteArray)o).get();
            }
            case 120: {
                return this.caster.toBytes((DataBag)o);
            }
            case 55: {
                return this.caster.toBytes((String)o);
            }
            case 25: {
                return this.caster.toBytes((Double)o);
            }
            case 20: {
                return this.caster.toBytes((Float)o);
            }
            case 10: {
                return this.caster.toBytes((Integer)o);
            }
            case 15: {
                return this.caster.toBytes((Long)o);
            }
            case 65: {
                return this.caster.toBytes((BigInteger)o);
            }
            case 70: {
                return this.caster.toBytes((BigDecimal)o);
            }
            case 5: {
                return this.caster.toBytes((Boolean)o);
            }
            case 30: {
                return this.caster.toBytes((DateTime)o);
            }
            case 100: {
                return this.caster.toBytes((Map)o);
            }
            case 1: {
                return null;
            }
            case 110: {
                return this.caster.toBytes((Tuple)o);
            }
            case -1: {
                throw new IOException("Unable to determine type of " + o.getClass());
            }
        }
        throw new IOException("Unable to find a converter for tuple field " + o);
    }

    @Override
    public LoadCaster getLoadCaster() throws IOException {
        return this.caster;
    }
}

