/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.ael.adapters;

import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.engine.api.Engine;
import org.pentaho.di.engine.api.ExecutionContext;
import org.pentaho.di.engine.api.ExecutionResult;
import org.pentaho.di.engine.api.events.PDIEvent;
import org.pentaho.di.engine.api.model.LogicalModelElement;
import org.pentaho.di.engine.api.model.Operation;
import org.pentaho.di.engine.api.model.Transformation;
import org.pentaho.di.engine.api.reporting.LogEntry;
import org.pentaho.di.engine.api.reporting.Status;
import org.pentaho.di.engine.model.ActingPrincipal;
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.ael.adapters.StepDataInterfaceEngineAdapter;
import org.pentaho.di.trans.ael.adapters.StepInterfaceEngineAdapter;
import org.pentaho.di.trans.ael.adapters.TransMetaConverter;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaDataCombi;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class TransEngineAdapter
extends Trans {
    public static final String ANONYMOUS_PRINCIPAL = "anonymous";
    private final Transformation transformation;
    private final ExecutionContext executionContext;
    private CompletableFuture<ExecutionResult> executionResultFuture;
    public static final Map<LogLevel, org.pentaho.di.engine.api.reporting.LogLevel> LEVEL_MAP = new HashMap<LogLevel, org.pentaho.di.engine.api.reporting.LogLevel>();

    public TransEngineAdapter(Engine engine, TransMeta transMeta) {
        this.transformation = TransMetaConverter.convert(transMeta);
        this.executionContext = engine.prepare(this.transformation);
        this.executionContext.setActingPrincipal(this.getActingPrincipal(transMeta));
        this.transMeta = transMeta;
    }

    @Override
    public void setLogLevel(LogLevel logLogLevel) {
        this.executionContext.setLoggingLogLevel(LEVEL_MAP.getOrDefault(logLogLevel, org.pentaho.di.engine.api.reporting.LogLevel.MINIMAL));
    }

    @Override
    public void killAll() {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void prepareExecution(String[] arguments) throws KettleException {
        this.activateParameters();
        this.transMeta.activateParameters();
        this.transMeta.setInternalKettleVariables();
        Map env = Arrays.stream(this.transMeta.listVariables()).collect(Collectors.toMap(Function.identity(), this.transMeta::getVariable));
        this.executionContext.setEnvironment(env);
        this.setSteps(new ArrayList<StepMetaDataCombi>(this.opsToSteps()));
        this.wireStatusToTransListeners();
        this.subscribeToOpLogging();
        this.executionContext.subscribe((LogicalModelElement)this.transformation, LogEntry.class, (Subscriber)new Subscriber<PDIEvent<Transformation, LogEntry>>(){

            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            public void onNext(PDIEvent<Transformation, LogEntry> event) {
                LogEntry data = (LogEntry)event.getData();
                TransEngineAdapter.this.logToChannel(TransEngineAdapter.this.getLogChannel(), data);
            }

            public void onError(Throwable throwable) {
            }

            public void onComplete() {
            }
        });
        this.setReadyToStart(true);
    }

    private void logToChannel(LogChannelInterface logChannel, LogEntry data) {
        org.pentaho.di.engine.api.reporting.LogLevel logLogLevel = data.getLogLogLevel();
        switch (logLogLevel) {
            case ERROR: {
                logChannel.logError(data.getMessage());
                break;
            }
            case MINIMAL: {
                logChannel.logMinimal(data.getMessage());
                break;
            }
            case BASIC: {
                logChannel.logBasic(data.getMessage());
                break;
            }
            case DETAILED: {
                logChannel.logDetailed(data.getMessage());
                break;
            }
            case DEBUG: {
                logChannel.logDebug(data.getMessage());
                break;
            }
            case TRACE: {
                logChannel.logRowlevel(data.getMessage());
            }
        }
    }

    private void subscribeToOpLogging() {
        this.transformation.getOperations().forEach(operation -> this.executionContext.subscribe((LogicalModelElement)operation, LogEntry.class, logEntry -> {
            StepInterface stepInterface = this.findStepInterface(operation.getId(), 0);
            if (stepInterface != null) {
                LogChannelInterface logChannel = stepInterface.getLogChannel();
                this.logToChannel(logChannel, (LogEntry)logEntry);
            } else {
                this.logToChannel(this.getLogChannel(), (LogEntry)logEntry);
            }
        }));
    }

    private void wireStatusToTransListeners() {
        this.executionContext.subscribe((LogicalModelElement)this.transformation, Status.class, (Subscriber)new Subscriber<PDIEvent<Transformation, Status>>(){

            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            public void onNext(PDIEvent<Transformation, Status> transStatusEvent) {
                TransEngineAdapter.this.addStepPerformanceSnapShot();
                TransEngineAdapter.this.getTransListeners().forEach(l -> {
                    try {
                        switch ((Status)transStatusEvent.getData()) {
                            case RUNNING: {
                                l.transStarted(TransEngineAdapter.this);
                                l.transActive(TransEngineAdapter.this);
                                break;
                            }
                            case PAUSED: {
                                break;
                            }
                            case STOPPED: {
                                break;
                            }
                            case FAILED: 
                            case FINISHED: {
                                l.transFinished(TransEngineAdapter.this);
                                TransEngineAdapter.this.setFinished(true);
                            }
                        }
                    }
                    catch (KettleException e) {
                        throw new RuntimeException(e);
                    }
                });
            }

            public void onError(Throwable t) {
                TransEngineAdapter.this.getLogChannel().logError("Error Executing Transformation", t);
                TransEngineAdapter.this.setFinished(true);
                TransEngineAdapter.this.getSteps().stream().map(stepMetaDataCombi -> stepMetaDataCombi.step).forEach(step -> {
                    step.setStopped(true);
                    step.setRunning(false);
                });
                TransEngineAdapter.this.getTransListeners().forEach(l -> {
                    try {
                        l.transFinished(TransEngineAdapter.this);
                    }
                    catch (KettleException e) {
                        TransEngineAdapter.this.getLogChannel().logError("Error notifying trans listener", (Throwable)e);
                    }
                });
            }

            public void onComplete() {
                TransEngineAdapter.this.setFinished(true);
                TransEngineAdapter.this.getTransListeners().forEach(l -> {
                    try {
                        l.transFinished(TransEngineAdapter.this);
                    }
                    catch (KettleException e) {
                        TransEngineAdapter.this.getLogChannel().logError("Error notifying trans listener", (Throwable)e);
                    }
                });
            }
        });
    }

    private Collection<StepMetaDataCombi> opsToSteps() {
        Map operationToCombi = this.transformation.getOperations().stream().collect(Collectors.toMap(Function.identity(), op -> {
            StepMetaDataCombi combi = new StepMetaDataCombi();
            combi.stepMeta = StepMeta.fromXml((String)op.getConfig().get("StepMeta"));
            combi.data = new StepDataInterfaceEngineAdapter((Operation)op, this.executionContext);
            combi.step = new StepInterfaceEngineAdapter((Operation)op, this.executionContext, combi.stepMeta, this.transMeta, combi.data, this);
            combi.meta = combi.stepMeta.getStepMetaInterface();
            combi.stepname = combi.stepMeta.getName();
            return combi;
        }));
        return operationToCombi.values();
    }

    @Override
    public void startThreads() throws KettleException {
        this.executionResultFuture = this.executionContext.execute();
    }

    @Override
    public void waitUntilFinished() {
        try {
            ExecutionResult executionResult = this.executionResultFuture.get();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Waiting for transformation to be finished interrupted!", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Error executing Transformation or waiting for it to stop", e);
        }
    }

    @Override
    public RowProducer addRowProducer(String stepname, int copynr) throws KettleException {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    private Principal getActingPrincipal(TransMeta transMeta) {
        if (transMeta.getRepository() == null || transMeta.getRepository().getUserInfo() == null) {
            return new ActingPrincipal(ANONYMOUS_PRINCIPAL);
        }
        return new ActingPrincipal(transMeta.getRepository().getUserInfo().getName());
    }

    static {
        LEVEL_MAP.put(LogLevel.BASIC, org.pentaho.di.engine.api.reporting.LogLevel.BASIC);
        LEVEL_MAP.put(LogLevel.DEBUG, org.pentaho.di.engine.api.reporting.LogLevel.DEBUG);
        LEVEL_MAP.put(LogLevel.DETAILED, org.pentaho.di.engine.api.reporting.LogLevel.DETAILED);
        LEVEL_MAP.put(LogLevel.ERROR, org.pentaho.di.engine.api.reporting.LogLevel.ERROR);
        LEVEL_MAP.put(LogLevel.MINIMAL, org.pentaho.di.engine.api.reporting.LogLevel.MINIMAL);
        LEVEL_MAP.put(LogLevel.ROWLEVEL, org.pentaho.di.engine.api.reporting.LogLevel.TRACE);
    }
}

