/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import com.google.common.annotations.VisibleForTesting;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.dependency.hcat.HCatDependencyCache;
import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JobsConcurrencyService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;

/**
 * Service that tracks which coordinator actions are waiting on HCatalog partition
 * dependencies. All dependency bookkeeping is delegated to an {@link HCatDependencyCache};
 * this class adds the queueing of update commands when a partition becomes available and,
 * when the server runs in highly-available mode, a periodic purge of registrations whose
 * coordinator action is gone from the database or no longer in WAITING status.
 */
public class PartitionDependencyManagerService implements Service {
    public static final String CONF_PREFIX = "oozie.service.PartitionDependencyManagerService.";
    public static final String CACHE_MANAGER_IMPL = "oozie.service.PartitionDependencyManagerService.cache.manager.impl";
    public static final String CACHE_PURGE_INTERVAL = "oozie.service.PartitionDependencyManagerService.cache.purge.interval";
    public static final String CACHE_PURGE_TTL = "oozie.service.PartitionDependencyManagerService.cache.purge.ttl";
    private static final XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
    private HCatDependencyCache dependencyCache;
    /** Action id -> registration time in epoch millis; only created when purging is enabled. */
    private ConcurrentMap<String, Long> registeredCoordActionMap;
    private boolean purgeEnabled = false;

    /**
     * Initializes the service from the configuration held by {@code services}.
     *
     * @param services the services container supplying the configuration
     * @throws ServiceException if initialization fails
     */
    @Override
    public void init(Services services) throws ServiceException {
        this.init(services.getConf());
    }

    /**
     * Creates and initializes the dependency cache and, in highly-available mode, sets up
     * the registration map and schedules the periodic purge worker.
     *
     * @param conf configuration to read the cache implementation class from
     * @throws ServiceException if initialization fails
     */
    private void init(Configuration conf) throws ServiceException {
        Class<?> cacheClass = conf.getClass(CACHE_MANAGER_IMPL, null);
        // Fall back to the simple in-memory cache when no implementation is configured.
        this.dependencyCache = cacheClass == null
                ? new SimpleHCatDependencyCache()
                : (HCatDependencyCache) ReflectionUtils.newInstance(cacheClass, null);
        this.dependencyCache.init(conf);
        LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", this.dependencyCache.getClass().getName());
        this.purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode();
        if (this.purgeEnabled) {
            // Create the map BEFORE handing the worker to the scheduler so the worker can
            // never observe a null map, however soon it is first run.
            this.registeredCoordActionMap = new ConcurrentHashMap<String, Long>();
            CachePurgeWorker purgeThread = new CachePurgeWorker(this.dependencyCache);
            // Interval is configurable in seconds (default 600).
            Services.get().get(SchedulerService.class).schedule(purgeThread, 10L, (long) Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), SchedulerService.Unit.SEC);
        }
    }

    /** Destroys the underlying dependency cache. */
    @Override
    public void destroy() {
        this.dependencyCache.destroy();
    }

    /**
     * @return the interface class under which this service is registered
     */
    @Override
    public Class<? extends Service> getInterface() {
        return PartitionDependencyManagerService.class;
    }

    /**
     * Registers {@code actionID} as waiting on {@code hcatURI}. When purging is enabled the
     * current time is recorded (or refreshed) as the action's registration time.
     *
     * @param hcatURI the missing dependency
     * @param actionID the coordinator action waiting on it
     */
    public void addMissingDependency(HCatURI hcatURI, String actionID) {
        if (this.purgeEnabled) {
            this.registeredCoordActionMap.put(actionID, System.currentTimeMillis());
        }
        this.dependencyCache.addMissingDependency(hcatURI, actionID);
    }

    /**
     * Removes the missing-dependency entry of {@code actionID} for {@code hcatURI}.
     *
     * @param hcatURI the dependency
     * @param actionID the coordinator action id
     * @return the result reported by the dependency cache
     */
    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
        return this.dependencyCache.removeMissingDependency(hcatURI, actionID);
    }

    /**
     * @param hcatURI the dependency
     * @return the actions the dependency cache reports as waiting on {@code hcatURI}
     */
    public Collection<String> getWaitingActions(HCatURI hcatURI) {
        return this.dependencyCache.getWaitingActions(hcatURI);
    }

    /**
     * Marks a partition as available in the cache and queues a
     * {@link CoordActionUpdatePushMissingDependency} command for every action the cache
     * returns. A failure to queue is logged as a warning and otherwise ignored.
     *
     * @param server the HCatalog server
     * @param db the database name
     * @param table the table name
     * @param partitions partition key/value pairs
     */
    public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
        Collection<String> actionsWithAvailableDep = this.dependencyCache.markDependencyAvailable(server, db, table, partitions);
        if (actionsWithAvailableDep == null) {
            return;
        }
        for (String actionID : actionsWithAvailableDep) {
            boolean queued = Services.get().get(CallableQueueService.class).queue(new CoordActionUpdatePushMissingDependency(actionID), 100L);
            if (!queued) {
                XLog.getLog(this.getClass()).warn("Unable to queue the callable commands for PartitionDependencyManagerService for actionID " + actionID + ".Most possibly command queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
            }
        }
    }

    /**
     * @param actionID the coordinator action id
     * @return the available dependency URIs the cache holds for {@code actionID}
     */
    public Collection<String> getAvailableDependencyURIs(String actionID) {
        return this.dependencyCache.getAvailableDependencyURIs(actionID);
    }

    /**
     * Removes the given available dependency URIs of {@code actionID} from the cache.
     *
     * @param actionID the coordinator action id
     * @param dependencyURIs the URIs to remove
     * @return the result reported by the dependency cache
     */
    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
        return this.dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
    }

    /**
     * Drops {@code actionID} from the registration map (when purging is enabled) and from
     * the dependency cache.
     *
     * @param actionID the coordinator action id
     */
    public void removeCoordActionWithDependenciesAvailable(String actionID) {
        if (this.purgeEnabled) {
            this.registeredCoordActionMap.remove(actionID);
        }
        this.dependencyCache.removeCoordActionWithDependenciesAvailable(actionID);
    }

    /** Runs one purge pass synchronously on the calling thread; intended for tests. */
    @VisibleForTesting
    public void runCachePurgeWorker() {
        new CachePurgeWorker(this.dependencyCache).run();
    }

    /**
     * Periodic task that removes registrations older than the configured TTL whose
     * coordinator action is missing from the database or is no longer WAITING.
     */
    private class CachePurgeWorker implements Runnable {
        HCatDependencyCache cache;

        public CachePurgeWorker(HCatDependencyCache cache) {
            this.cache = cache;
        }

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                // TTL is configured in seconds (default 1800).
                this.purgeMissingDependency(Services.get().getConf().getInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 1800));
            }
            catch (Throwable error) {
                // Best effort: a failed pass is logged and never propagated to the caller.
                XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error);
            }
        }

        /**
         * Removes every registration older than {@code timeToLive} seconds whose action is
         * absent from the database (error code E0605) or has a status other than WAITING,
         * then passes the removed ids to the cache's {@code removeNonWaitingCoordActions}.
         *
         * @param timeToLive minimum age, in seconds, before a registration is examined
         */
        private void purgeMissingDependency(int timeToLive) {
            ConcurrentMap<String, Long> registered = PartitionDependencyManagerService.this.registeredCoordActionMap;
            if (registered == null) {
                // Purging is not enabled, so nothing was ever registered.
                return;
            }
            // Widen to long BEFORE multiplying so a large TTL cannot overflow int.
            long expiryCutoff = System.currentTimeMillis() - (long) timeToLive * 1000L;
            HashSet<String> staleActions = new HashSet<String>();
            // Iterate entries rather than keys + get(): a concurrent remove() between the
            // two calls would yield a null value and abort the whole pass on unboxing.
            Iterator<Map.Entry<String, Long>> entryItr = registered.entrySet().iterator();
            while (entryItr.hasNext()) {
                Map.Entry<String, Long> entry = entryItr.next();
                String actionId = entry.getKey();
                long regTime = entry.getValue();
                if (regTime >= expiryCutoff) {
                    continue;
                }
                CoordinatorActionBean caBean = null;
                try {
                    caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
                }
                catch (JPAExecutorException e) {
                    if (e.getErrorCode() == ErrorCode.E0605) {
                        LOG.info(MessageFormat.format("Coord action {0} is not in database, deleting it from cache", actionId));
                        staleActions.add(actionId);
                        entryItr.remove();
                    }
                    else {
                        // Only unexpected lookup failures deserve the warning and stack trace.
                        LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
                    }
                }
                // caBean stays null after any exception, so those entries are skipped here.
                if (caBean == null || caBean.getStatus().equals(CoordinatorAction.Status.WAITING)) {
                    continue;
                }
                staleActions.add(actionId);
                entryItr.remove();
            }
            PartitionDependencyManagerService.this.dependencyCache.removeNonWaitingCoordActions(staleActions);
        }
    }
}

