/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.event;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.XLog;

public class MemoryEventQueue
implements EventQueue {
    private static ConcurrentLinkedQueue<EventQueue.EventQueueElement> eventQueue;
    private static AtomicInteger currentSize;
    private static int maxSize;
    private static XLog LOG;
    private static int batchSize;

    @Override
    public void init(Configuration conf) {
        eventQueue = new ConcurrentLinkedQueue();
        maxSize = ConfigurationService.getInt(conf, "oozie.service.EventHandlerService.queue.size");
        currentSize = new AtomicInteger();
        batchSize = ConfigurationService.getInt(conf, "oozie.service.EventHandlerService.batch.size");
        LOG = XLog.getLog(this.getClass());
        LOG.info("Memory Event Queue initialized with Max size = [{0}], Batch drain size = [{1}]", maxSize, batchSize);
    }

    @Override
    public int getBatchSize() {
        return batchSize;
    }

    @Override
    public void add(Event e) {
        EventQueue.EventQueueElement eqe = new EventQueue.EventQueueElement(e);
        try {
            if (this.size() <= maxSize) {
                if (eventQueue.add(eqe)) {
                    currentSize.incrementAndGet();
                }
            } else {
                LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", this.size(), e);
            }
        }
        catch (IllegalStateException ise) {
            LOG.warn("Unable to add event due to " + ise);
        }
    }

    @Override
    public List<Event> pollBatch() {
        ArrayList<Event> eventBatch = new ArrayList<Event>();
        for (int i = 0; i < batchSize; ++i) {
            EventQueue.EventQueueElement polled = eventQueue.poll();
            if (polled == null) {
                LOG.trace("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize);
                break;
            }
            currentSize.decrementAndGet();
            eventBatch.add(polled.event);
        }
        return eventBatch;
    }

    @Override
    public Event poll() {
        EventQueue.EventQueueElement polled = eventQueue.poll();
        if (polled != null) {
            currentSize.decrementAndGet();
            return polled.event;
        }
        return null;
    }

    @Override
    public boolean isEmpty() {
        return this.size() == 0;
    }

    @Override
    public int size() {
        return currentSize.intValue();
    }

    @Override
    public Event peek() {
        EventQueue.EventQueueElement peeked = eventQueue.peek();
        if (peeked != null) {
            return peeked.event;
        }
        return null;
    }

    @Override
    public void clear() {
        eventQueue.clear();
        currentSize.set(0);
    }
}

