/*
 * Decompiled with CFR 0.152.
 */
package com.espertech.esper.dataflow.core;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatementState;
import com.espertech.esper.client.EventPropertyDescriptor;
import com.espertech.esper.client.EventType;
import com.espertech.esper.client.annotation.AuditEnum;
import com.espertech.esper.client.dataflow.EPDataFlowAlreadyExistsException;
import com.espertech.esper.client.dataflow.EPDataFlowDescriptor;
import com.espertech.esper.client.dataflow.EPDataFlowInstance;
import com.espertech.esper.client.dataflow.EPDataFlowInstantiationException;
import com.espertech.esper.client.dataflow.EPDataFlowInstantiationOptions;
import com.espertech.esper.client.dataflow.EPDataFlowNotFoundException;
import com.espertech.esper.client.dataflow.EPDataFlowOperatorParameterProvider;
import com.espertech.esper.client.dataflow.EPDataFlowOperatorProviderContext;
import com.espertech.esper.client.dataflow.EPDataFlowSavedConfiguration;
import com.espertech.esper.client.dataflow.EPDataFlowState;
import com.espertech.esper.core.context.util.AgentInstanceContext;
import com.espertech.esper.core.service.EPRuntimeEventSender;
import com.espertech.esper.core.service.EPServicesContext;
import com.espertech.esper.core.service.StatementContext;
import com.espertech.esper.dataflow.annotations.DataFlowOpPropertyHolder;
import com.espertech.esper.dataflow.annotations.DataFlowOpProvideSignal;
import com.espertech.esper.dataflow.annotations.DataFlowOperator;
import com.espertech.esper.dataflow.annotations.OutputType;
import com.espertech.esper.dataflow.annotations.OutputTypes;
import com.espertech.esper.dataflow.core.DataFlowConfigurationStateService;
import com.espertech.esper.dataflow.core.DataFlowService;
import com.espertech.esper.dataflow.core.DataFlowServiceEntry;
import com.espertech.esper.dataflow.core.DataFlowStmtDesc;
import com.espertech.esper.dataflow.core.DataflowStartDesc;
import com.espertech.esper.dataflow.core.EPDataFlowInstanceImpl;
import com.espertech.esper.dataflow.core.RealizationFactoryInterface;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializateContext;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInitializeResult;
import com.espertech.esper.dataflow.interfaces.DataFlowOpInputPort;
import com.espertech.esper.dataflow.interfaces.DataFlowOpLifecycle;
import com.espertech.esper.dataflow.interfaces.DataFlowOpOutputPort;
import com.espertech.esper.dataflow.interfaces.DataFlowOperatorFactory;
import com.espertech.esper.dataflow.interfaces.DataFlowSourceOperator;
import com.espertech.esper.dataflow.runnables.GraphSourceRunnable;
import com.espertech.esper.dataflow.util.DataFlowSignalManager;
import com.espertech.esper.dataflow.util.GraphTypeDesc;
import com.espertech.esper.dataflow.util.LogicalChannel;
import com.espertech.esper.dataflow.util.LogicalChannelBinding;
import com.espertech.esper.dataflow.util.LogicalChannelBindingMethodDesc;
import com.espertech.esper.dataflow.util.LogicalChannelBindingTypePassAlong;
import com.espertech.esper.dataflow.util.LogicalChannelBindingTypePassAlongWStream;
import com.espertech.esper.dataflow.util.LogicalChannelBindingTypeUnwind;
import com.espertech.esper.dataflow.util.LogicalChannelProducingPortCompiled;
import com.espertech.esper.dataflow.util.LogicalChannelProducingPortDeclared;
import com.espertech.esper.dataflow.util.LogicalChannelUtil;
import com.espertech.esper.dataflow.util.OperatorDependencyEntry;
import com.espertech.esper.dataflow.util.OperatorMetadataDescriptor;
import com.espertech.esper.epl.annotation.AnnotationUtil;
import com.espertech.esper.epl.core.EngineImportException;
import com.espertech.esper.epl.core.EngineImportService;
import com.espertech.esper.epl.expression.core.ExprValidationException;
import com.espertech.esper.epl.spec.CreateDataFlowDesc;
import com.espertech.esper.epl.spec.CreateSchemaDesc;
import com.espertech.esper.epl.spec.GraphOperatorInput;
import com.espertech.esper.epl.spec.GraphOperatorInputNamesAlias;
import com.espertech.esper.epl.spec.GraphOperatorOutputItem;
import com.espertech.esper.epl.spec.GraphOperatorOutputItemType;
import com.espertech.esper.epl.spec.GraphOperatorSpec;
import com.espertech.esper.event.EventAdapterService;
import com.espertech.esper.event.EventTypeUtility;
import com.espertech.esper.event.arr.ObjectArrayEventType;
import com.espertech.esper.util.CollectionUtil;
import com.espertech.esper.util.DependencyGraph;
import com.espertech.esper.util.JavaClassHelper;
import com.espertech.esper.util.PopulateUtil;
import java.io.StringWriter;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DataFlowServiceImpl
implements DataFlowService {
    private static final Log log = LogFactory.getLog(DataFlowServiceImpl.class);
    private static final String EVENT_WRAPPED_TYPE = "eventbean";
    private final Map<String, DataFlowServiceEntry> graphs = new HashMap<String, DataFlowServiceEntry>();
    private final Map<String, EPDataFlowInstance> instances = new HashMap<String, EPDataFlowInstance>();
    private final EPServiceProvider epService;
    private final DataFlowConfigurationStateService configurationState;

    public DataFlowServiceImpl(EPServiceProvider epService, DataFlowConfigurationStateService configurationState) {
        this.epService = epService;
        this.configurationState = configurationState;
    }

    @Override
    public synchronized EPDataFlowDescriptor getDataFlow(String dataFlowName) {
        DataFlowServiceEntry entry = this.graphs.get(dataFlowName);
        if (entry == null) {
            return null;
        }
        return new EPDataFlowDescriptor(dataFlowName, entry.getState(), entry.getDataFlowDesc().getStatementContext().getStatementName());
    }

    @Override
    public synchronized String[] getDataFlows() {
        Set<String> names = this.graphs.keySet();
        return names.toArray(new String[names.size()]);
    }

    @Override
    public synchronized void addStartGraph(CreateDataFlowDesc desc, StatementContext statementContext, EPServicesContext servicesContext, AgentInstanceContext agentInstanceContext, boolean newStatement) throws ExprValidationException {
        this.compileTimeValidate(desc);
        DataFlowServiceEntry existing = this.graphs.get(desc.getGraphName());
        if (existing != null && (existing.getState() == EPStatementState.STARTED || newStatement)) {
            throw new ExprValidationException("Data flow by name '" + desc.getGraphName() + "' has already been declared");
        }
        if (existing != null) {
            existing.setState(EPStatementState.STARTED);
            return;
        }
        HashMap<GraphOperatorSpec, Annotation[]> operatorAnnotations = new HashMap<GraphOperatorSpec, Annotation[]>();
        for (GraphOperatorSpec spec : desc.getOperators()) {
            Annotation[] operatorAnnotation = AnnotationUtil.compileAnnotations(spec.getAnnotations(), servicesContext.getEngineImportService(), null);
            operatorAnnotations.put(spec, operatorAnnotation);
        }
        DataFlowStmtDesc stmtDesc = new DataFlowStmtDesc(desc, statementContext, servicesContext, agentInstanceContext, operatorAnnotations);
        this.graphs.put(desc.getGraphName(), new DataFlowServiceEntry(stmtDesc, EPStatementState.STARTED));
    }

    @Override
    public synchronized void stopGraph(String graphName) {
        DataFlowServiceEntry existing = this.graphs.get(graphName);
        if (existing != null && existing.getState() == EPStatementState.STARTED) {
            existing.setState(EPStatementState.STOPPED);
        }
    }

    @Override
    public synchronized void removeGraph(String graphName) {
        this.graphs.remove(graphName);
    }

    @Override
    public EPDataFlowInstance instantiate(String dataFlowName) {
        return this.instantiate(dataFlowName, null);
    }

    @Override
    public synchronized EPDataFlowInstance instantiate(String dataFlowName, EPDataFlowInstantiationOptions options) {
        DataFlowServiceEntry serviceDesc = this.graphs.get(dataFlowName);
        if (serviceDesc == null) {
            throw new EPDataFlowInstantiationException("Data flow by name '" + dataFlowName + "' has not been defined");
        }
        if (serviceDesc.getState() != EPStatementState.STARTED) {
            throw new EPDataFlowInstantiationException("Data flow by name '" + dataFlowName + "' is currently in STOPPED statement state");
        }
        DataFlowStmtDesc stmtDesc = serviceDesc.getDataFlowDesc();
        try {
            return this.instantiateInternal(dataFlowName, options, stmtDesc.getGraphDesc(), stmtDesc.getStatementContext(), stmtDesc.getServicesContext(), stmtDesc.getAgentInstanceContext(), stmtDesc.getOperatorAnnotations());
        }
        catch (Exception ex) {
            String message = "Failed to instantiate data flow '" + dataFlowName + "': " + ex.getMessage();
            log.debug((Object)message, (Throwable)ex);
            throw new EPDataFlowInstantiationException(message, ex);
        }
    }

    @Override
    public synchronized void destroy() {
        this.graphs.clear();
    }

    @Override
    public synchronized void saveConfiguration(String dataflowConfigName, String dataFlowName, EPDataFlowInstantiationOptions options) {
        DataFlowServiceEntry dataFlow = this.graphs.get(dataFlowName);
        if (dataFlow == null) {
            String message = "Failed to locate data flow '" + dataFlowName + "'";
            throw new EPDataFlowNotFoundException(message);
        }
        if (this.configurationState.exists(dataflowConfigName)) {
            String message = "Data flow saved configuration by name '" + dataflowConfigName + "' already exists";
            throw new EPDataFlowAlreadyExistsException(message);
        }
        this.configurationState.add(new EPDataFlowSavedConfiguration(dataflowConfigName, dataFlowName, options));
    }

    @Override
    public synchronized String[] getSavedConfigurations() {
        return this.configurationState.getSavedConfigNames();
    }

    @Override
    public synchronized EPDataFlowSavedConfiguration getSavedConfiguration(String configurationName) {
        return this.configurationState.getSavedConfig(configurationName);
    }

    @Override
    public synchronized EPDataFlowInstance instantiateSavedConfiguration(String configurationName) throws EPDataFlowInstantiationException {
        EPDataFlowSavedConfiguration savedConfiguration = this.configurationState.getSavedConfig(configurationName);
        if (savedConfiguration == null) {
            throw new EPDataFlowInstantiationException("Dataflow saved configuration '" + configurationName + "' could not be found");
        }
        EPDataFlowInstantiationOptions options = savedConfiguration.getOptions();
        if (options == null) {
            options = new EPDataFlowInstantiationOptions();
            options.setDataFlowInstanceId(configurationName);
        }
        return this.instantiate(savedConfiguration.getDataflowName(), options);
    }

    @Override
    public synchronized boolean removeSavedConfiguration(String configurationName) {
        return this.configurationState.removePrototype(configurationName) != null;
    }

    @Override
    public synchronized void saveInstance(String instanceName, EPDataFlowInstance instance) throws EPDataFlowAlreadyExistsException {
        if (this.instances.containsKey(instanceName)) {
            throw new EPDataFlowAlreadyExistsException("Data flow instance name '" + instanceName + "' already saved");
        }
        this.instances.put(instanceName, instance);
    }

    @Override
    public synchronized String[] getSavedInstances() {
        Set<String> instanceids = this.instances.keySet();
        return instanceids.toArray(new String[instanceids.size()]);
    }

    @Override
    public synchronized EPDataFlowInstance getSavedInstance(String instanceName) {
        return this.instances.get(instanceName);
    }

    @Override
    public synchronized boolean removeSavedInstance(String instanceName) {
        return this.instances.remove(instanceName) != null;
    }

    private EPDataFlowInstance instantiateInternal(String dataFlowName, EPDataFlowInstantiationOptions options, CreateDataFlowDesc desc, StatementContext statementContext, EPServicesContext servicesContext, AgentInstanceContext agentInstanceContext, Map<GraphOperatorSpec, Annotation[]> operatorAnnotations) throws ExprValidationException {
        if (options == null) {
            options = new EPDataFlowInstantiationOptions();
        }
        Map<String, EventType> declaredTypes = this.resolveTypes(desc, statementContext, servicesContext);
        Map<Integer, OperatorMetadataDescriptor> operatorMetadata = this.resolveMetadata(desc, options, servicesContext.getEngineImportService(), operatorAnnotations);
        Map<Integer, OperatorDependencyEntry> operatorDependencies = this.analyzeDependencies(desc);
        Set<Integer> operatorBuildOrder = this.analyzeBuildOrder(operatorDependencies);
        Map<Integer, Object> operators = this.instantiateOperators(operatorMetadata, desc, options, servicesContext.getEngineImportService());
        EPRuntimeEventSender runtimeEventSender = (EPRuntimeEventSender)((Object)this.epService.getEPRuntime());
        List<LogicalChannel> operatorChannels = this.determineChannels(dataFlowName, operatorBuildOrder, operatorDependencies, operators, declaredTypes, operatorMetadata, options, servicesContext.getEventAdapterService(), servicesContext.getEngineImportService(), statementContext, servicesContext, agentInstanceContext, runtimeEventSender);
        if (log.isDebugEnabled()) {
            log.debug((Object)("For flow '" + dataFlowName + "' channels are: " + LogicalChannelUtil.printChannels(operatorChannels)));
        }
        ArrayList<LogicalChannelBinding> operatorChannelBindings = new ArrayList<LogicalChannelBinding>();
        for (LogicalChannel channel : operatorChannels) {
            Class<?> targetClass = operators.get(channel.getConsumingOpNum()).getClass();
            LogicalChannelBindingMethodDesc consumingMethod = this.findMatchingMethod(channel.getConsumingOpPrettyPrint(), targetClass, channel, false);
            LogicalChannelBindingMethodDesc onSignalMethod = null;
            if (channel.getOutputPort().isHasPunctuation()) {
                onSignalMethod = this.findMatchingMethod(channel.getConsumingOpPrettyPrint(), targetClass, channel, true);
            }
            operatorChannelBindings.add(new LogicalChannelBinding(channel, consumingMethod, onSignalMethod));
        }
        DataFlowSignalManager dataFlowSignalManager = new DataFlowSignalManager();
        DataflowStartDesc startDesc = RealizationFactoryInterface.realize(dataFlowName, operators, operatorMetadata, operatorBuildOrder, operatorChannelBindings, dataFlowSignalManager, options, servicesContext, statementContext);
        ArrayList<GraphSourceRunnable> sourceRunnables = new ArrayList<GraphSourceRunnable>();
        boolean audit = AuditEnum.DATAFLOW_SOURCE.getAudit(statementContext.getAnnotations()) != null;
        for (Map.Entry<Integer, Object> operatorEntry : operators.entrySet()) {
            if (!(operatorEntry.getValue() instanceof DataFlowSourceOperator)) continue;
            OperatorMetadataDescriptor meta = operatorMetadata.get(operatorEntry.getKey());
            DataFlowSourceOperator graphSource = (DataFlowSourceOperator)operatorEntry.getValue();
            GraphSourceRunnable runnable = new GraphSourceRunnable(statementContext.getEngineURI(), statementContext.getStatementName(), graphSource, dataFlowName, meta.getOperatorName(), operatorEntry.getKey(), meta.getOperatorPrettyPrint(), options.getExceptionHandler(), audit);
            sourceRunnables.add(runnable);
            dataFlowSignalManager.addSignalListener(operatorEntry.getKey(), runnable);
        }
        boolean auditStates = AuditEnum.DATAFLOW_TRANSITION.getAudit(statementContext.getAnnotations()) != null;
        return new EPDataFlowInstanceImpl(servicesContext.getEngineURI(), statementContext.getStatementName(), auditStates, dataFlowName, options.getDataFlowInstanceUserObject(), options.getDataFlowInstanceId(), EPDataFlowState.INSTANTIATED, sourceRunnables, operators, operatorBuildOrder, startDesc.getStatisticsProvider(), options.getParametersURIs());
    }

    private Map<String, EventType> resolveTypes(CreateDataFlowDesc desc, StatementContext statementContext, EPServicesContext servicesContext) throws ExprValidationException {
        HashMap<String, EventType> types = new HashMap<String, EventType>();
        for (CreateSchemaDesc spec : desc.getSchemas()) {
            EventType eventType = EventTypeUtility.createNonVariantType(true, spec, statementContext.getAnnotations(), statementContext.getConfigSnapshot(), statementContext.getEventAdapterService(), servicesContext.getEngineImportService());
            types.put(spec.getSchemaName(), eventType);
        }
        return types;
    }

    private Map<Integer, Object> instantiateOperators(Map<Integer, OperatorMetadataDescriptor> operatorClasses, CreateDataFlowDesc desc, EPDataFlowInstantiationOptions options, EngineImportService engineImportService) throws ExprValidationException {
        HashMap<Integer, Object> operators = new HashMap<Integer, Object>();
        for (Map.Entry<Integer, OperatorMetadataDescriptor> operatorEntry : operatorClasses.entrySet()) {
            Object operator = this.instantiateOperator(desc.getGraphName(), operatorEntry.getKey(), operatorEntry.getValue(), desc.getOperators().get(operatorEntry.getKey()), options, engineImportService);
            operators.put(operatorEntry.getKey(), operator);
        }
        return operators;
    }

    private Object instantiateOperator(String dataFlowName, int operatorNum, OperatorMetadataDescriptor desc, GraphOperatorSpec graphOperator, EPDataFlowInstantiationOptions options, EngineImportService engineImportService) throws ExprValidationException {
        Object operatorObject = desc.getOptionalOperatorObject();
        if (operatorObject == null) {
            Class clazz = desc.getOperatorFactoryClass() != null ? desc.getOperatorFactoryClass() : desc.getOperatorClass();
            try {
                operatorObject = clazz.newInstance();
            }
            catch (Exception e) {
                throw new ExprValidationException("Failed to instantiate: " + e.getMessage());
            }
        }
        Map<String, Object> configs = graphOperator.getDetail() == null ? Collections.emptyMap() : graphOperator.getDetail().getConfigs();
        this.injectObjectProperties(dataFlowName, graphOperator.getOperatorName(), operatorNum, configs, operatorObject, options.getParameterProvider(), options.getParametersURIs(), engineImportService);
        if (operatorObject instanceof DataFlowOperatorFactory) {
            try {
                operatorObject = ((DataFlowOperatorFactory)operatorObject).create();
            }
            catch (RuntimeException ex) {
                throw new ExprValidationException("Failed to obtain operator '" + desc.getOperatorName() + "', encountered an exception raised by factory class " + operatorObject.getClass().getSimpleName() + ": " + ex.getMessage(), ex);
            }
        }
        return operatorObject;
    }

    private void injectObjectProperties(String dataFlowName, String operatorName, int operatorNum, Map<String, Object> configs, Object instance, EPDataFlowOperatorParameterProvider optionalParameterProvider, Map<String, Object> optionalParameterURIs, EngineImportService engineImportService) throws ExprValidationException {
        Object propertyInstance;
        Set<Field> propertyHolderFields = JavaClassHelper.findAnnotatedFields(instance.getClass(), DataFlowOpPropertyHolder.class);
        if (propertyHolderFields.size() > 1) {
            throw new IllegalArgumentException("May apply " + DataFlowOpPropertyHolder.class.getSimpleName() + " annotation only to a single field");
        }
        if (propertyHolderFields.isEmpty()) {
            propertyInstance = instance;
        } else {
            Class<?> propertyHolderClass = propertyHolderFields.iterator().next().getType();
            try {
                propertyInstance = propertyHolderClass.newInstance();
            }
            catch (Exception e) {
                throw new ExprValidationException("Failed to instantiate '" + propertyHolderClass + "': " + e.getMessage(), e);
            }
        }
        PopulateUtil.populateObject(operatorName, operatorNum, dataFlowName, configs, propertyInstance, engineImportService, optionalParameterProvider, optionalParameterURIs);
        if (!propertyHolderFields.isEmpty()) {
            Field field = propertyHolderFields.iterator().next();
            try {
                field.setAccessible(true);
                field.set(instance, propertyInstance);
            }
            catch (Exception e) {
                throw new ExprValidationException("Failed to set field '" + field.getName() + "': " + e.getMessage(), e);
            }
        }
    }

    private List<LogicalChannel> determineChannels(String dataflowName, Set<Integer> operatorBuildOrder, Map<Integer, OperatorDependencyEntry> operatorDependencies, Map<Integer, Object> operators, Map<String, EventType> types, Map<Integer, OperatorMetadataDescriptor> operatorMetadata, EPDataFlowInstantiationOptions options, EventAdapterService eventAdapterService, EngineImportService engineImportService, StatementContext statementContext, EPServicesContext servicesContext, AgentInstanceContext agentInstanceContext, EPRuntimeEventSender runtimeEventSender) throws ExprValidationException {
        HashMap<Integer, List<LogicalChannelProducingPortDeclared>> declaredOutputPorts = new HashMap<Integer, List<LogicalChannelProducingPortDeclared>>();
        for (int operatorNum : operatorBuildOrder) {
            OperatorMetadataDescriptor metadata = operatorMetadata.get(operatorNum);
            Object operator = operators.get(operatorNum);
            List<LogicalChannelProducingPortDeclared> annotationPorts = this.determineAnnotatedOutputPorts(operatorNum, operator, metadata, engineImportService, eventAdapterService);
            List<LogicalChannelProducingPortDeclared> graphDeclaredPorts = this.determineGraphDeclaredOutputPorts(operator, operatorNum, metadata, types, servicesContext);
            ArrayList<LogicalChannelProducingPortDeclared> allDeclaredPorts = new ArrayList<LogicalChannelProducingPortDeclared>();
            allDeclaredPorts.addAll(annotationPorts);
            allDeclaredPorts.addAll(graphDeclaredPorts);
            declaredOutputPorts.put(operatorNum, allDeclaredPorts);
        }
        HashMap<Integer, List<LogicalChannelProducingPortCompiled>> compiledOutputPorts = new HashMap<Integer, List<LogicalChannelProducingPortCompiled>>();
        for (int myOpNum : operatorBuildOrder) {
            GraphOperatorSpec operatorSpec = operatorMetadata.get(myOpNum).getOperatorSpec();
            Object operator = operators.get(myOpNum);
            OperatorMetadataDescriptor metadata = operatorMetadata.get(myOpNum);
            Set<Integer> incomingDependentOpNums = operatorDependencies.get(myOpNum).getIncoming();
            GraphTypeDesc[] typesPerOutput = this.determineOutputForInput(dataflowName, myOpNum, operator, metadata, operatorSpec, declaredOutputPorts, compiledOutputPorts, types, incomingDependentOpNums, options, statementContext, servicesContext, agentInstanceContext, runtimeEventSender);
            List<LogicalChannelProducingPortCompiled> outgoingPorts = this.determineOutgoingPorts(myOpNum, operator, operatorSpec, metadata, compiledOutputPorts, declaredOutputPorts, typesPerOutput, incomingDependentOpNums);
            compiledOutputPorts.put(myOpNum, outgoingPorts);
        }
        ArrayList<LogicalChannel> channels = new ArrayList<LogicalChannel>();
        int channelId = 0;
        for (Integer myOpNum : operatorBuildOrder) {
            OperatorDependencyEntry dependencies = operatorDependencies.get(myOpNum);
            List<GraphOperatorInputNamesAlias> inputNames = operatorMetadata.get(myOpNum).getOperatorSpec().getInput().getStreamNamesAndAliases();
            OperatorMetadataDescriptor descriptor = operatorMetadata.get(myOpNum);
            int streamNum = -1;
            for (GraphOperatorInputNamesAlias inputName : inputNames) {
                ++streamNum;
                List<LogicalChannelProducingPortCompiled> producingPorts = LogicalChannelUtil.getOutputPortByStreamName(dependencies.getIncoming(), inputName.getInputStreamNames(), compiledOutputPorts);
                if (producingPorts.size() < inputName.getInputStreamNames().length) {
                    throw new IllegalStateException("Failed to find producing ports");
                }
                if (producingPorts.size() > 1) {
                    LogicalChannelProducingPortCompiled first = producingPorts.get(0);
                    for (int i = 1; i < producingPorts.size(); ++i) {
                        LogicalChannelProducingPortCompiled other = producingPorts.get(i);
                        this.compareTypeInfo(descriptor.getOperatorName(), first.getStreamName(), first.getGraphTypeDesc(), other.getStreamName(), other.getGraphTypeDesc());
                    }
                }
                String optionalAlias = inputName.getOptionalAsName();
                for (String streamName : inputName.getInputStreamNames()) {
                    for (LogicalChannelProducingPortCompiled port : producingPorts) {
                        if (!port.getStreamName().equals(streamName)) continue;
                        LogicalChannel channel = new LogicalChannel(channelId++, descriptor.getOperatorName(), myOpNum, streamNum, streamName, optionalAlias, descriptor.getOperatorPrettyPrint(), port);
                        channels.add(channel);
                    }
                }
            }
        }
        return channels;
    }

    private void compareTypeInfo(String operatorName, String firstName, GraphTypeDesc firstType, String otherName, GraphTypeDesc otherType) throws ExprValidationException {
        if (firstType.getEventType() != null && otherType.getEventType() != null && !firstType.getEventType().equals(otherType.getEventType())) {
            throw new ExprValidationException("For operator '" + operatorName + "' stream '" + firstName + "'" + " typed '" + firstType.getEventType().getName() + "'" + " is not the same type as stream '" + otherName + "'" + " typed '" + otherType.getEventType().getName() + "'");
        }
        if (firstType.isWildcard() != otherType.isWildcard()) {
            throw new ExprValidationException("For operator '" + operatorName + "' streams '" + firstName + "'" + " and '" + otherName + "' have differing wildcard type information");
        }
        if (firstType.isUnderlying() != otherType.isUnderlying()) {
            throw new ExprValidationException("For operator '" + operatorName + "' streams '" + firstName + "'" + " and '" + otherName + "' have differing underlying information");
        }
    }

    private List<LogicalChannelProducingPortCompiled> determineOutgoingPorts(int myOpNum, Object operator, GraphOperatorSpec operatorSpec, OperatorMetadataDescriptor metadata, Map<Integer, List<LogicalChannelProducingPortCompiled>> compiledOutputPorts, Map<Integer, List<LogicalChannelProducingPortDeclared>> declaredOutputPorts, GraphTypeDesc[] typesPerOutput, Set<Integer> incomingDependentOpNums) throws ExprValidationException {
        int numPorts = operatorSpec.getOutput().getItems().size();
        ArrayList<LogicalChannelProducingPortCompiled> result = new ArrayList<LogicalChannelProducingPortCompiled>();
        HashMap<String, GraphTypeDesc> types = new HashMap<String, GraphTypeDesc>();
        for (int port = 0; port < numPorts; ++port) {
            String portStreamName = operatorSpec.getOutput().getItems().get(port).getStreamName();
            LogicalChannelProducingPortDeclared foundDeclared = null;
            List<LogicalChannelProducingPortDeclared> declaredList = declaredOutputPorts.get(myOpNum);
            for (LogicalChannelProducingPortDeclared declared : declaredList) {
                if (declared.getStreamNumber() != port) continue;
                if (foundDeclared != null) {
                    throw new ExprValidationException("Found a declaration twice for port " + port);
                }
                foundDeclared = declared;
            }
            if (foundDeclared == null && (typesPerOutput == null || typesPerOutput.length <= port || typesPerOutput[port] == null)) {
                throw new ExprValidationException("Operator neither declares an output type nor provided by the operator itself in a 'prepare' method");
            }
            if (foundDeclared != null && typesPerOutput != null && typesPerOutput.length > port && typesPerOutput[port] != null) {
                throw new ExprValidationException("Operator both declares an output type and provided a type in the 'prepare' method");
            }
            boolean hasPunctuationSignal = foundDeclared != null && foundDeclared.isHasPunctuation() || this.determineReceivesPunctuation(incomingDependentOpNums, operatorSpec.getInput(), compiledOutputPorts);
            GraphTypeDesc compiledType = foundDeclared != null ? foundDeclared.getTypeDesc() : typesPerOutput[port];
            LogicalChannelProducingPortCompiled compiled = new LogicalChannelProducingPortCompiled(myOpNum, metadata.getOperatorPrettyPrint(), portStreamName, port, compiledType, hasPunctuationSignal);
            result.add(compiled);
            GraphTypeDesc existingType = (GraphTypeDesc)types.get(portStreamName);
            types.put(portStreamName, compiledType);
            if (existingType == null) continue;
            this.compareTypeInfo(operatorSpec.getOperatorName(), portStreamName, existingType, portStreamName, compiledType);
        }
        return result;
    }

    private boolean determineReceivesPunctuation(Set<Integer> incomingDependentOpNums, GraphOperatorInput input, Map<Integer, List<LogicalChannelProducingPortCompiled>> compiledOutputPorts) {
        for (GraphOperatorInputNamesAlias inputItem : input.getStreamNamesAndAliases()) {
            List<LogicalChannelProducingPortCompiled> list = LogicalChannelUtil.getOutputPortByStreamName(incomingDependentOpNums, inputItem.getInputStreamNames(), compiledOutputPorts);
            for (LogicalChannelProducingPortCompiled port : list) {
                if (!port.isHasPunctuation()) continue;
                return true;
            }
        }
        return false;
    }

    private GraphTypeDesc[] determineOutputForInput(String dataFlowName, int myOpNum, Object operator, OperatorMetadataDescriptor meta, GraphOperatorSpec operatorSpec, Map<Integer, List<LogicalChannelProducingPortDeclared>> declaredOutputPorts, Map<Integer, List<LogicalChannelProducingPortCompiled>> compiledOutputPorts, Map<String, EventType> types, Set<Integer> incomingDependentOpNums, EPDataFlowInstantiationOptions options, StatementContext statementContext, EPServicesContext servicesContext, AgentInstanceContext agentInstanceContext, EPRuntimeEventSender runtimeEventSender) throws ExprValidationException {
        DataFlowOpInitializeResult prepareResult;
        if (!(operator instanceof DataFlowOpLifecycle)) {
            return null;
        }
        int numDeclared = operatorSpec.getInput().getStreamNamesAndAliases().size();
        LinkedHashMap<Integer, DataFlowOpInputPort> inputPorts = new LinkedHashMap<Integer, DataFlowOpInputPort>();
        for (int inputPortNum = 0; inputPortNum < numDeclared; ++inputPortNum) {
            DataFlowOpInputPort port;
            GraphOperatorInputNamesAlias inputItem = operatorSpec.getInput().getStreamNamesAndAliases().get(inputPortNum);
            List<LogicalChannelProducingPortCompiled> producingPorts = LogicalChannelUtil.getOutputPortByStreamName(incomingDependentOpNums, inputItem.getInputStreamNames(), compiledOutputPorts);
            if (producingPorts.isEmpty()) {
                List<LogicalChannelProducingPortDeclared> declareds = declaredOutputPorts.get(myOpNum);
                if (declareds == null || declareds.isEmpty()) {
                    throw new ExprValidationException("Failed validation for operator '" + operatorSpec.getOperatorName() + "': No output ports declared");
                }
                LogicalChannelProducingPortDeclared foundDeclared = null;
                for (LogicalChannelProducingPortDeclared declared : declareds) {
                    if (!Arrays.asList(inputItem.getInputStreamNames()).contains(declared.getStreamName())) continue;
                    foundDeclared = declared;
                    break;
                }
                if (foundDeclared == null) {
                    throw new ExprValidationException("Failed validation for operator '" + operatorSpec.getOperatorName() + "': Failed to find output port declared");
                }
                port = new DataFlowOpInputPort(foundDeclared.getTypeDesc(), new HashSet<String>(Arrays.asList(inputItem.getInputStreamNames())), inputItem.getOptionalAsName(), false);
            } else {
                port = new DataFlowOpInputPort(new GraphTypeDesc(false, false, producingPorts.get(0).getGraphTypeDesc().getEventType()), new HashSet<String>(Arrays.asList(inputItem.getInputStreamNames())), inputItem.getOptionalAsName(), producingPorts.get(0).isHasPunctuation());
            }
            inputPorts.put(inputPortNum, port);
        }
        Map<Integer, DataFlowOpOutputPort> outputPorts = DataFlowServiceImpl.getDeclaredOutputPorts(operatorSpec, types, servicesContext);
        EPRuntimeEventSender dfRuntimeEventSender = runtimeEventSender;
        if (options.getSurrogateEventSender() != null) {
            dfRuntimeEventSender = options.getSurrogateEventSender();
        }
        DataFlowOpLifecycle preparable = (DataFlowOpLifecycle)operator;
        DataFlowOpInitializateContext context = new DataFlowOpInitializateContext(dataFlowName, options.getDataFlowInstanceId(), options.getDataFlowInstanceUserObject(), inputPorts, outputPorts, statementContext, servicesContext, agentInstanceContext, dfRuntimeEventSender, this.epService, meta.getOperatorAnnotations());
        try {
            prepareResult = preparable.initialize(context);
        }
        catch (ExprValidationException e) {
            throw new ExprValidationException("Failed validation for operator '" + operatorSpec.getOperatorName() + "': " + e.getMessage(), e);
        }
        catch (Exception e) {
            throw new ExprValidationException("Failed initialization for operator '" + operatorSpec.getOperatorName() + "': " + e.getMessage(), e);
        }
        if (prepareResult == null) {
            return null;
        }
        return prepareResult.getTypeDescriptors();
    }

    private List<LogicalChannelProducingPortDeclared> determineAnnotatedOutputPorts(int producingOpNum, Object operator, OperatorMetadataDescriptor descriptor, EngineImportService engineImportService, EventAdapterService eventAdapterService) throws ExprValidationException {
        ArrayList<LogicalChannelProducingPortDeclared> ports = new ArrayList<LogicalChannelProducingPortDeclared>();
        List<Annotation> annotations = JavaClassHelper.getAnnotations(OutputTypes.class, operator.getClass().getDeclaredAnnotations());
        for (Annotation annotation : annotations) {
            OutputType[] outputTypeArr;
            OutputTypes outputTypes = (OutputTypes)annotation;
            LinkedHashMap<String, Object> propertiesRaw = new LinkedHashMap<String, Object>();
            for (OutputType outputType : outputTypeArr = outputTypes.value()) {
                Class clazz;
                if (outputType.type() != null && outputType.type() != OutputType.class) {
                    clazz = outputType.type();
                } else {
                    String typeName = outputType.typeName();
                    clazz = JavaClassHelper.getClassForSimpleName(typeName);
                    if (clazz == null) {
                        try {
                            clazz = engineImportService.resolveClass(typeName, false);
                        }
                        catch (EngineImportException e) {
                            throw new RuntimeException("Failed to resolve type '" + typeName + "'");
                        }
                    }
                }
                propertiesRaw.put(outputType.name(), clazz);
            }
            Map<String, Object> propertiesCompiled = EventTypeUtility.compileMapTypeProperties(propertiesRaw, eventAdapterService);
            EventType eventType = eventAdapterService.createAnonymousObjectArrayType("TYPE_" + operator.getClass(), propertiesCompiled);
            List<GraphOperatorOutputItem> declaredOutput = descriptor.getOperatorSpec().getOutput().getItems();
            if (declaredOutput.isEmpty()) {
                throw new ExprValidationException("No output stream declared");
            }
            if (declaredOutput.size() < outputTypes.portNumber()) {
                throw new ExprValidationException("No output stream declared for this port");
            }
            String streamName = declaredOutput.get(outputTypes.portNumber()).getStreamName();
            boolean isDeclaredPunctuated = JavaClassHelper.isAnnotationListed(DataFlowOpProvideSignal.class, operator.getClass().getAnnotations());
            LogicalChannelProducingPortDeclared port = new LogicalChannelProducingPortDeclared(producingOpNum, descriptor.getOperatorPrettyPrint(), streamName, outputTypes.portNumber(), new GraphTypeDesc(false, false, eventType), isDeclaredPunctuated);
            ports.add(port);
        }
        return ports;
    }

    private List<LogicalChannelProducingPortDeclared> determineGraphDeclaredOutputPorts(Object operator, int producingOpNum, OperatorMetadataDescriptor metadata, Map<String, EventType> types, EPServicesContext servicesContext) throws ExprValidationException {
        ArrayList<LogicalChannelProducingPortDeclared> ports = new ArrayList<LogicalChannelProducingPortDeclared>();
        int portNumber = 0;
        for (GraphOperatorOutputItem outputItem : metadata.getOperatorSpec().getOutput().getItems()) {
            if (outputItem.getTypeInfo().size() > 1) {
                throw new ExprValidationException("Multiple parameter types are not supported");
            }
            if (!outputItem.getTypeInfo().isEmpty()) {
                GraphTypeDesc typeDesc = DataFlowServiceImpl.determineTypeOutputPort(outputItem.getTypeInfo().get(0), types, servicesContext);
                boolean isDeclaredPunctuated = JavaClassHelper.isAnnotationListed(DataFlowOpProvideSignal.class, operator.getClass().getAnnotations());
                ports.add(new LogicalChannelProducingPortDeclared(producingOpNum, metadata.getOperatorPrettyPrint(), outputItem.getStreamName(), portNumber, typeDesc, isDeclaredPunctuated));
            }
            ++portNumber;
        }
        return ports;
    }

    private Map<Integer, OperatorDependencyEntry> analyzeDependencies(CreateDataFlowDesc graphDesc) throws ExprValidationException {
        OperatorDependencyEntry entry;
        HashMap<Integer, OperatorDependencyEntry> logicalOpDependencies = new HashMap<Integer, OperatorDependencyEntry>();
        for (int i = 0; i < graphDesc.getOperators().size(); ++i) {
            entry = new OperatorDependencyEntry();
            logicalOpDependencies.put(i, entry);
        }
        for (int consumingOpNum = 0; consumingOpNum < graphDesc.getOperators().size(); ++consumingOpNum) {
            entry = (OperatorDependencyEntry)logicalOpDependencies.get(consumingOpNum);
            GraphOperatorSpec op = graphDesc.getOperators().get(consumingOpNum);
            for (GraphOperatorInputNamesAlias input : op.getInput().getStreamNamesAndAliases()) {
                for (String inputStreamName : input.getInputStreamNames()) {
                    boolean found = false;
                    for (int providerOpNum = 0; providerOpNum < graphDesc.getOperators().size(); ++providerOpNum) {
                        GraphOperatorSpec from = graphDesc.getOperators().get(providerOpNum);
                        for (GraphOperatorOutputItem outputItem : from.getOutput().getItems()) {
                            if (!outputItem.getStreamName().equals(inputStreamName)) continue;
                            found = true;
                            entry.addIncoming(providerOpNum);
                            ((OperatorDependencyEntry)logicalOpDependencies.get(providerOpNum)).addOutgoing(consumingOpNum);
                        }
                    }
                    if (found) continue;
                    throw new ExprValidationException("Input stream '" + inputStreamName + "' consumed by operator '" + op.getOperatorName() + "' could not be found");
                }
            }
        }
        return logicalOpDependencies;
    }

    private Map<Integer, OperatorMetadataDescriptor> resolveMetadata(CreateDataFlowDesc graphDesc, EPDataFlowInstantiationOptions options, EngineImportService engineImportService, Map<GraphOperatorSpec, Annotation[]> operatorAnnotations) throws ExprValidationException {
        HashMap<Integer, OperatorMetadataDescriptor> operatorClasses = new HashMap<Integer, OperatorMetadataDescriptor>();
        for (int i = 0; i < graphDesc.getOperators().size(); ++i) {
            Class clazz;
            OperatorMetadataDescriptor descriptor;
            Object operator;
            GraphOperatorSpec operatorSpec = graphDesc.getOperators().get(i);
            String operatorPrettyPrint = this.toPrettyPrint(i, operatorSpec);
            Annotation[] operatorAnnotation = operatorAnnotations.get(operatorSpec);
            if (options.getOperatorProvider() != null && (operator = options.getOperatorProvider().provide(new EPDataFlowOperatorProviderContext(graphDesc.getGraphName(), operatorSpec.getOperatorName(), operatorSpec))) != null) {
                descriptor = new OperatorMetadataDescriptor(operatorSpec, i, operator.getClass(), null, operator, operatorPrettyPrint, operatorAnnotation);
                operatorClasses.put(i, descriptor);
                continue;
            }
            Class factoryClass = null;
            try {
                factoryClass = engineImportService.resolveClass(operatorSpec.getOperatorName() + "Factory", false);
            }
            catch (EngineImportException e) {
                // empty catch block
            }
            if (factoryClass != null && JavaClassHelper.isImplementsInterface(factoryClass, DataFlowOperatorFactory.class)) {
                descriptor = new OperatorMetadataDescriptor(operatorSpec, i, null, factoryClass, null, operatorPrettyPrint, operatorAnnotation);
                operatorClasses.put(i, descriptor);
                continue;
            }
            try {
                clazz = engineImportService.resolveClass(operatorSpec.getOperatorName(), false);
            }
            catch (EngineImportException e) {
                throw new ExprValidationException("Failed to resolve operator '" + operatorSpec.getOperatorName() + "': " + e.getMessage(), e);
            }
            if (!JavaClassHelper.isImplementsInterface(clazz, DataFlowSourceOperator.class) && !JavaClassHelper.isAnnotationListed(DataFlowOperator.class, clazz.getDeclaredAnnotations())) {
                throw new ExprValidationException("Failed to resolve operator '" + operatorSpec.getOperatorName() + "', operator class " + clazz.getName() + " does not declare the " + DataFlowOperator.class.getSimpleName() + " annotation or implement the " + DataFlowSourceOperator.class.getSimpleName() + " interface");
            }
            OperatorMetadataDescriptor descriptor2 = new OperatorMetadataDescriptor(operatorSpec, i, clazz, null, null, operatorPrettyPrint, operatorAnnotation);
            operatorClasses.put(i, descriptor2);
        }
        return operatorClasses;
    }

    private String toPrettyPrint(int operatorNum, GraphOperatorSpec spec) {
        StringWriter writer = new StringWriter();
        writer.write(spec.getOperatorName());
        writer.write("#");
        writer.write(Integer.toString(operatorNum));
        writer.write("(");
        String delimiter = "";
        for (GraphOperatorInputNamesAlias inputItem : spec.getInput().getStreamNamesAndAliases()) {
            writer.write(delimiter);
            this.toPrettyPrintInput(inputItem, writer);
            if (inputItem.getOptionalAsName() != null) {
                writer.write(" as ");
                writer.write(inputItem.getOptionalAsName());
            }
            delimiter = ", ";
        }
        writer.write(")");
        if (spec.getOutput().getItems().isEmpty()) {
            return writer.toString();
        }
        writer.write(" -> ");
        delimiter = "";
        for (GraphOperatorOutputItem outputItem : spec.getOutput().getItems()) {
            writer.write(delimiter);
            writer.write(outputItem.getStreamName());
            this.writeTypes(outputItem.getTypeInfo(), writer);
            delimiter = ",";
        }
        return writer.toString();
    }

    private void toPrettyPrintInput(GraphOperatorInputNamesAlias inputItem, StringWriter writer) {
        if (inputItem.getInputStreamNames().length == 1) {
            writer.write(inputItem.getInputStreamNames()[0]);
        } else {
            writer.write("(");
            String delimiterNames = "";
            for (String name : inputItem.getInputStreamNames()) {
                writer.write(delimiterNames);
                writer.write(name);
                delimiterNames = ",";
            }
            writer.write(")");
        }
    }

    private void writeTypes(List<GraphOperatorOutputItemType> types, StringWriter writer) {
        if (types.isEmpty()) {
            return;
        }
        writer.write("<");
        String typeDelimiter = "";
        for (GraphOperatorOutputItemType type : types) {
            writer.write(typeDelimiter);
            this.writeType(type, writer);
            typeDelimiter = ",";
        }
        writer.write(">");
    }

    private void writeType(GraphOperatorOutputItemType type, StringWriter writer) {
        if (type.isWildcard()) {
            writer.append('?');
            return;
        }
        writer.append(type.getTypeOrClassname());
        this.writeTypes(type.getTypeParameters(), writer);
    }

    private Set<Integer> analyzeBuildOrder(Map<Integer, OperatorDependencyEntry> operators) throws ExprValidationException {
        DependencyGraph graph = new DependencyGraph(operators.size(), true);
        for (Map.Entry<Integer, OperatorDependencyEntry> entry : operators.entrySet()) {
            int myOpNum = entry.getKey();
            Set<Integer> incomings = entry.getValue().getIncoming();
            for (int incoming : incomings) {
                graph.addDependency(myOpNum, incoming);
            }
        }
        LinkedHashSet<Integer> topDownSet = new LinkedHashSet<Integer>();
        while (topDownSet.size() < operators.size()) {
            TreeSet<Integer> rootNodes = new TreeSet<Integer>(new Comparator<Integer>(){

                @Override
                public int compare(Integer o1, Integer o2) {
                    return -1 * o1.compareTo(o2);
                }
            });
            rootNodes.addAll(graph.getRootNodes(topDownSet));
            if (rootNodes.isEmpty()) {
                for (int i = 0; i < operators.size(); ++i) {
                    if (topDownSet.contains(i)) continue;
                    rootNodes.add(i);
                    break;
                }
            }
            topDownSet.addAll(rootNodes);
        }
        LinkedHashSet<Integer> inverted = new LinkedHashSet<Integer>();
        Integer[] arr = topDownSet.toArray(new Integer[topDownSet.size()]);
        for (int i = arr.length - 1; i >= 0; --i) {
            inverted.add(arr[i]);
        }
        return inverted;
    }

    private LogicalChannelBindingMethodDesc findMatchingMethod(String operatorName, Class target, LogicalChannel channelDesc, boolean isPunctuation) throws ExprValidationException {
        EventType expectedUnderlyingType;
        Class expectedUnderlying;
        Class[] expectedIndividual;
        if (isPunctuation) {
            for (Method method : target.getMethods()) {
                if (!method.getName().equals("onSignal")) continue;
                return new LogicalChannelBindingMethodDesc(method, LogicalChannelBindingTypePassAlong.INSTANCE);
            }
            return null;
        }
        LogicalChannelProducingPortCompiled outputPort = channelDesc.getOutputPort();
        GraphTypeDesc typeDesc = outputPort.getGraphTypeDesc();
        if (typeDesc.isWildcard()) {
            expectedIndividual = new Class[]{};
            expectedUnderlying = null;
            expectedUnderlyingType = null;
        } else {
            expectedIndividual = new Class[typeDesc.getEventType().getPropertyNames().length];
            int i = 0;
            for (EventPropertyDescriptor descriptor : typeDesc.getEventType().getPropertyDescriptors()) {
                expectedIndividual[i] = descriptor.getPropertyType();
                ++i;
            }
            expectedUnderlying = typeDesc.getEventType().getUnderlyingType();
            expectedUnderlyingType = typeDesc.getEventType();
        }
        String channelSpecificMethodName = null;
        if (channelDesc.getConsumingOptStreamAliasName() != null) {
            channelSpecificMethodName = "on" + channelDesc.getConsumingOptStreamAliasName();
        }
        for (Method method : target.getMethods()) {
            boolean eligible = method.getName().equals("onInput");
            if (!eligible && method.getName().equals(channelSpecificMethodName)) {
                eligible = true;
            }
            if (!eligible) continue;
            int numParams = method.getParameterTypes().length;
            Class<?>[] paramTypes = method.getParameterTypes();
            if (expectedUnderlying != null) {
                if (numParams == 1 && JavaClassHelper.isSubclassOrImplementsInterface(paramTypes[0], expectedUnderlying)) {
                    return new LogicalChannelBindingMethodDesc(method, LogicalChannelBindingTypePassAlong.INSTANCE);
                }
                if (numParams == 2 && JavaClassHelper.getBoxedType(paramTypes[0]) == Integer.class && JavaClassHelper.isSubclassOrImplementsInterface(paramTypes[1], expectedUnderlying)) {
                    return new LogicalChannelBindingMethodDesc(method, new LogicalChannelBindingTypePassAlongWStream(channelDesc.getConsumingOpStreamNum()));
                }
            }
            if (numParams == 1 && (paramTypes[0] == Object.class || paramTypes[0] == Object[].class && method.isVarArgs())) {
                return new LogicalChannelBindingMethodDesc(method, LogicalChannelBindingTypePassAlong.INSTANCE);
            }
            if (numParams == 2 && paramTypes[0] == Integer.TYPE && (paramTypes[1] == Object.class || paramTypes[1] == Object[].class && method.isVarArgs())) {
                return new LogicalChannelBindingMethodDesc(method, new LogicalChannelBindingTypePassAlongWStream(channelDesc.getConsumingOpStreamNum()));
            }
            if (!(expectedUnderlyingType instanceof ObjectArrayEventType) || !JavaClassHelper.isSignatureCompatible(expectedIndividual, method.getParameterTypes())) continue;
            return new LogicalChannelBindingMethodDesc(method, LogicalChannelBindingTypeUnwind.INSTANCE);
        }
        LinkedHashSet<String> choices = new LinkedHashSet<String>();
        choices.add(Object.class.getSimpleName());
        choices.add("Object[]");
        if (expectedUnderlying != null) {
            choices.add(expectedUnderlying.getSimpleName());
        }
        throw new ExprValidationException("Failed to find onInput method on for operator '" + operatorName + "' class " + target.getName() + ", expected an onInput method that takes any of {" + CollectionUtil.toString(choices) + "}");
    }

    private static Map<Integer, DataFlowOpOutputPort> getDeclaredOutputPorts(GraphOperatorSpec operatorSpec, Map<String, EventType> types, EPServicesContext servicesContext) throws ExprValidationException {
        LinkedHashMap<Integer, DataFlowOpOutputPort> outputPorts = new LinkedHashMap<Integer, DataFlowOpOutputPort>();
        for (int outputPortNum = 0; outputPortNum < operatorSpec.getOutput().getItems().size(); ++outputPortNum) {
            GraphOperatorOutputItem outputItem = operatorSpec.getOutput().getItems().get(outputPortNum);
            GraphTypeDesc typeDesc = null;
            if (!outputItem.getTypeInfo().isEmpty()) {
                typeDesc = DataFlowServiceImpl.determineTypeOutputPort(outputItem.getTypeInfo().get(0), types, servicesContext);
            }
            outputPorts.put(outputPortNum, new DataFlowOpOutputPort(outputItem.getStreamName(), typeDesc));
        }
        return outputPorts;
    }

    private static GraphTypeDesc determineTypeOutputPort(GraphOperatorOutputItemType outType, Map<String, EventType> types, EPServicesContext servicesContext) throws ExprValidationException {
        EventType eventType = null;
        boolean isWildcard = false;
        boolean isUnderlying = true;
        String typeOrClassname = outType.getTypeOrClassname();
        if (typeOrClassname != null && typeOrClassname.toLowerCase().equals(EVENT_WRAPPED_TYPE)) {
            isUnderlying = false;
            if (!outType.getTypeParameters().isEmpty() && !outType.getTypeParameters().get(0).isWildcard()) {
                String typeName = outType.getTypeParameters().get(0).getTypeOrClassname();
                eventType = DataFlowServiceImpl.resolveType(typeName, types, servicesContext);
            } else {
                isWildcard = true;
            }
        } else if (typeOrClassname != null) {
            eventType = DataFlowServiceImpl.resolveType(typeOrClassname, types, servicesContext);
        } else {
            isWildcard = true;
        }
        return new GraphTypeDesc(isWildcard, isUnderlying, eventType);
    }

    private static EventType resolveType(String typeOrClassname, Map<String, EventType> types, EPServicesContext servicesContext) throws ExprValidationException {
        EventType eventType = types.get(typeOrClassname);
        if (eventType == null) {
            eventType = servicesContext.getEventAdapterService().getExistsTypeByName(typeOrClassname);
        }
        if (eventType == null) {
            throw new ExprValidationException("Failed to find event type '" + typeOrClassname + "'");
        }
        return eventType;
    }

    private void compileTimeValidate(CreateDataFlowDesc desc) throws ExprValidationException {
        for (GraphOperatorSpec spec : desc.getOperators()) {
            for (GraphOperatorOutputItem out : spec.getOutput().getItems()) {
                if (out.getTypeInfo().size() <= 1) continue;
                throw new ExprValidationException("Failed to validate operator '" + spec.getOperatorName() + "': Multiple output types for a single stream '" + out.getStreamName() + "' are not supported");
            }
        }
        HashSet<String> schemaNames = new HashSet<String>();
        for (CreateSchemaDesc schema : desc.getSchemas()) {
            if (schemaNames.contains(schema.getSchemaName())) {
                throw new ExprValidationException("Schema name '" + schema.getSchemaName() + "' is declared more then once");
            }
            schemaNames.add(schema.getSchemaName());
        }
    }
}

