/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.MappingRule;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

public class HCatAccessorService
implements Service {
    public static final String CONF_PREFIX = "oozie.service.HCatAccessorService.";
    public static final String JMS_CONNECTIONS_PROPERTIES = "oozie.service.HCatAccessorService.jmsconnections";
    public static final String HCAT_CONFIGURATION = "oozie.service.HCatAccessorService.hcat.configuration";
    private static XLog LOG;
    private static String DELIMITER;
    private Configuration conf;
    private JMSAccessorService jmsService;
    private List<MappingRule> mappingRules;
    private JMSConnectionInfo defaultJMSConnInfo;
    private Configuration hcatConf;
    private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
    private Set<String> nonJMSPublishers;
    private Map<String, String> registeredTopicsMap;

    @Override
    public void init(Services services) throws ServiceException {
        LOG = XLog.getLog(this.getClass());
        this.conf = services.getConf();
        this.jmsService = services.get(JMSAccessorService.class);
        this.initializeMappingRules();
        this.nonJMSPublishers = new HashSet<String>();
        this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
        this.registeredTopicsMap = new HashMap<String, String>();
        try {
            this.loadHCatConf(services);
        }
        catch (IOException ioe) {
            throw new ServiceException(ErrorCode.E0100, HCatAccessorService.class.getName(), "An exception occured while attemptingto load the HCat Configuration", ioe);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loadHCatConf(Services services) throws IOException {
        String path = this.conf.get(HCAT_CONFIGURATION);
        if (path != null) {
            if (path.startsWith("hdfs")) {
                Path p = new Path(path);
                HadoopAccessorService has = services.get(HadoopAccessorService.class);
                try {
                    FileSystem fs = has.createFileSystem(System.getProperty("user.name"), p.toUri(), (Configuration)has.createJobConf(p.toUri().getAuthority()));
                    if (fs.exists(p)) {
                        FSDataInputStream is = null;
                        try {
                            is = fs.open(p);
                            this.hcatConf = new XConfiguration((InputStream)is);
                        }
                        finally {
                            if (is != null) {
                                is.close();
                            }
                        }
                        LOG.info("Loaded HCat Configuration: " + path);
                    }
                    LOG.warn("HCat Configuration could not be found at [" + path + "]");
                }
                catch (HadoopAccessorException hae) {
                    throw new IOException(hae);
                }
            } else {
                File f = new File(path);
                if (f.exists()) {
                    FileInputStream is = null;
                    try {
                        is = new FileInputStream(f);
                        this.hcatConf = new XConfiguration(is);
                    }
                    finally {
                        if (is != null) {
                            ((InputStream)is).close();
                        }
                    }
                    LOG.info("Loaded HCat Configuration: " + path);
                } else {
                    LOG.warn("HCat Configuration could not be found at [" + path + "]");
                }
            }
        } else {
            LOG.info("HCat Configuration not specified");
        }
    }

    public Configuration getHCatConf() {
        return this.hcatConf;
    }

    private void initializeMappingRules() {
        String[] connections = ConfigurationService.getStrings(this.conf, JMS_CONNECTIONS_PROPERTIES);
        if (connections != null) {
            this.mappingRules = new ArrayList<MappingRule>(connections.length);
            for (String connection : connections) {
                String[] values = connection.split("=", 2);
                String key = values[0].trim();
                String value = values[1].trim();
                if (key.equals("default")) {
                    this.defaultJMSConnInfo = new JMSConnectionInfo(value);
                    continue;
                }
                this.mappingRules.add(new MappingRule(key, value));
            }
        } else {
            LOG.warn("No JMS connection defined");
        }
    }

    public boolean isKnownPublisher(URI sourceURI) {
        if (this.nonJMSPublishers.contains(sourceURI.getAuthority())) {
            return true;
        }
        JMSConnectionInfo connInfo = this.publisherJMSConnInfoMap.get(sourceURI.getAuthority());
        return connInfo == null ? this.getJMSConnectionInfo(sourceURI) != null : true;
    }

    public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
        String publisherAuthority = publisherURI.getAuthority();
        JMSConnectionInfo connInfo = null;
        if (this.publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
            connInfo = this.publisherJMSConnInfoMap.get(publisherAuthority);
        } else {
            String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
            for (MappingRule mr : this.mappingRules) {
                String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
                if (jndiPropertiesString == null) continue;
                connInfo = new JMSConnectionInfo(jndiPropertiesString);
                this.publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
                LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
                break;
            }
            if (connInfo == null && this.defaultJMSConnInfo != null) {
                connInfo = this.defaultJMSConnInfo;
                this.publisherJMSConnInfoMap.put(publisherAuthority, this.defaultJMSConnInfo);
                LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
            } else {
                this.nonJMSPublishers.add(publisherAuthority);
                LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
            }
        }
        return connInfo;
    }

    public boolean isRegisteredForNotification(HCatURI hcatURI) {
        return this.registeredTopicsMap.containsKey(this.getKeyForRegisteredTopicsMap(hcatURI));
    }

    public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
        JMSConnectionInfo connInfo = this.getJMSConnectionInfo(hcatURI.getURI());
        this.jmsService.registerForNotification(connInfo, topic, msgHandler);
        this.registeredTopicsMap.put(this.getKeyForRegisteredTopicsMap(hcatURI), topic);
    }

    public void unregisterFromNotification(HCatURI hcatURI) {
        String topic = this.registeredTopicsMap.remove(this.getKeyForRegisteredTopicsMap(hcatURI));
        if (topic != null) {
            JMSConnectionInfo connInfo = this.getJMSConnectionInfo(hcatURI.getURI());
            this.jmsService.unregisterFromNotification(connInfo, topic);
        }
    }

    public void unregisterFromNotification(String server, String database, String table) {
        String key = server + DELIMITER + database + DELIMITER + table;
        String topic = this.registeredTopicsMap.remove(key);
        if (topic != null) {
            try {
                JMSConnectionInfo connInfo = this.getJMSConnectionInfo(new URI("hcat://" + server));
                this.jmsService.unregisterFromNotification(connInfo, topic);
            }
            catch (URISyntaxException e) {
                LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
            }
        }
    }

    private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
        return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
    }

    @Override
    public void destroy() {
        this.publisherJMSConnInfoMap.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return HCatAccessorService.class;
    }

    static {
        DELIMITER = "#";
    }
}

