/*
 * Decompiled with CFR 0.152.
 */
package com.azure.ai.openai.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public final class OpenAIServerSentEvents<T> {
    private static final String STREAM_COMPLETION_EVENT = "data: [DONE]";
    private static final ClientLogger LOGGER = new ClientLogger(OpenAIServerSentEvents.class);
    private final Flux<ByteBuffer> source;
    private final Class<T> type;
    private ByteArrayOutputStream outStream;
    private static final ObjectMapper SERIALIZER = new ObjectMapper().enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS).disable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES).disable(DeserializationFeature.FAIL_ON_NULL_CREATOR_PROPERTIES).disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).disable(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES);

    public OpenAIServerSentEvents(Flux<ByteBuffer> source, Class<T> type) {
        this.source = source;
        this.type = type;
        this.outStream = new ByteArrayOutputStream();
    }

    public Flux<T> getEvents() {
        return this.mapByteBuffersToEvents();
    }

    private Flux<T> mapByteBuffersToEvents() {
        return this.source.publishOn(Schedulers.boundedElastic()).concatMap(byteBuffer -> {
            byte[] byteArray;
            ArrayList values = new ArrayList();
            for (byte currentByte : byteArray = byteBuffer.array()) {
                if (currentByte == 10 || currentByte == 13) {
                    try {
                        String currentLine = this.outStream.toString(StandardCharsets.UTF_8.name());
                        this.handleCurrentLine(currentLine, values);
                    }
                    catch (JsonProcessingException | UnsupportedEncodingException e) {
                        return Flux.error((Throwable)e);
                    }
                    this.outStream = new ByteArrayOutputStream();
                    continue;
                }
                this.outStream.write(currentByte);
            }
            try {
                this.handleCurrentLine(this.outStream.toString(StandardCharsets.UTF_8.name()), values);
                this.outStream = new ByteArrayOutputStream();
            }
            catch (JsonProcessingException | IllegalStateException e) {
                return Flux.fromIterable(values);
            }
            catch (UnsupportedEncodingException e) {
                return Flux.error((Throwable)e);
            }
            return Flux.fromIterable(values);
        }).cache();
    }

    private void handleCurrentLine(String currentLine, List<T> values) throws JsonProcessingException {
        if (currentLine.isEmpty() || STREAM_COMPLETION_EVENT.equals(currentLine)) {
            return;
        }
        String[] split = currentLine.split(":", 2);
        if (split.length != 2) {
            throw new IllegalStateException("Invalid data format " + currentLine);
        }
        String dataValue = split[1];
        if (split[1].startsWith(" ")) {
            dataValue = split[1].substring(1);
        }
        Object value = SERIALIZER.readValue(dataValue, this.type);
        values.add(value);
    }
}

