/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.apm.toolkit.logging.common.log;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.dependencies.io.grpc.CallOptions;
import org.apache.skywalking.apm.dependencies.io.grpc.Channel;
import org.apache.skywalking.apm.dependencies.io.grpc.ClientCall;
import org.apache.skywalking.apm.dependencies.io.grpc.ClientInterceptor;
import org.apache.skywalking.apm.dependencies.io.grpc.ClientInterceptors;
import org.apache.skywalking.apm.dependencies.io.grpc.ForwardingClientCall;
import org.apache.skywalking.apm.dependencies.io.grpc.ManagedChannel;
import org.apache.skywalking.apm.dependencies.io.grpc.ManagedChannelBuilder;
import org.apache.skywalking.apm.dependencies.io.grpc.Metadata;
import org.apache.skywalking.apm.dependencies.io.grpc.MethodDescriptor;
import org.apache.skywalking.apm.dependencies.io.grpc.StatusRuntimeException;
import org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;
import org.apache.skywalking.apm.toolkit.logging.common.log.ToolkitConfig;
import org.apache.skywalking.apm.util.StringUtil;

/**
 * {@link LogReportServiceClient} replacement that buffers {@link LogData} in a
 * {@link DataCarrier} and streams each consumed batch to the gRPC endpoint configured
 * under {@code ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter}.
 *
 * <p>The channel is plaintext. When {@code Config.Agent.AUTHENTICATION} is non-empty it is
 * attached to every call as the {@code Authentication} header.
 */
@OverrideImplementor(value = LogReportServiceClient.class)
public class GRPCLogReportServiceClient
extends LogReportServiceClient {
    private static final ILog LOGGER = LogManager.getLogger(GRPCLogReportServiceClient.class);
    private static final Metadata.Key<String> AUTH_HEAD_HEADER_NAME =
        Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER);

    /** Buffer between {@link #produce(LogData)} and {@link #consume(List)}; null until {@link #boot()} succeeds. */
    private volatile DataCarrier<LogData> carrier;
    private LogReportServiceGrpc.LogReportServiceStub asyncStub;
    private ManagedChannel channel;
    /** Set on the first stream error and cleared on the next completed stream; gates the one-off error log. */
    private final AtomicBoolean disconnected = new AtomicBoolean(false);

    /**
     * Builds the gRPC channel and stub, then creates the buffer and registers this instance
     * as its single consumer.
     *
     * @throws Throwable if the channel, stub or carrier cannot be created
     */
    public void boot() throws Throwable {
        // The stub is created before the carrier is published and consumed, because
        // consume() dereferences asyncStub.
        // NOTE(review): assumes DataCarrier#consume may start delivering batches right away - confirm.
        this.channel = ManagedChannelBuilder
            .forAddress(
                ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.SERVER_HOST,
                ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.SERVER_PORT)
            .usePlaintext()
            .build();
        Channel decoratedChannel = this.decorateLogChannelWithAuthentication(this.channel);
        this.asyncStub = LogReportServiceGrpc.newStub(decoratedChannel)
            .withMaxOutboundMessageSize(ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.MAX_MESSAGE_SIZE);

        this.carrier = new DataCarrier<>(
            "gRPC-log", "gRPC-log", Config.Buffer.CHANNEL_SIZE, Config.Buffer.BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
        this.carrier.consume((IConsumer<LogData>) this, 1);
    }

    /**
     * Stops the carrier's consumers and shuts the channel down. Each resource is released
     * independently, so a failure (or absence) of one does not prevent releasing the other.
     * Failures are logged, never thrown.
     */
    public void shutdown() {
        final DataCarrier<LogData> currentCarrier = this.carrier;
        if (currentCarrier != null) {
            try {
                currentCarrier.shutdownConsumers();
            }
            catch (Throwable t) {
                LOGGER.error(t.getMessage(), t);
            }
        }
        if (this.channel != null) {
            try {
                this.channel.shutdownNow();
            }
            catch (Throwable t) {
                LOGGER.error(t.getMessage(), t);
            }
        }
    }

    /**
     * Offers one log record to the buffer. Null records are ignored. A record is dropped
     * (with a debug message) when the carrier rejects it or when the reporter has not been
     * booted yet.
     *
     * @param logData the record to enqueue; may be null
     */
    public void produce(LogData logData) {
        if (Objects.isNull(logData)) {
            return;
        }
        // Read the volatile field once so the null check and the call see the same instance.
        final DataCarrier<LogData> currentCarrier = this.carrier;
        if (currentCarrier == null) {
            if (LOGGER.isDebugEnable()) {
                LOGGER.debug("One log has been abandoned, cause by reporter is not booted.");
            }
            return;
        }
        if (!currentCarrier.produce(logData) && LOGGER.isDebugEnable()) {
            LOGGER.debug("One log has been abandoned, cause by buffer is full.");
        }
    }

    /**
     * Streams one batch to the collector: opens a {@code collect} stream with the configured
     * deadline, sends every record, half-closes the stream and waits for the server-side
     * callback. Empty or null batches are ignored. Failures are logged, except
     * {@link StatusRuntimeException}, which is not logged here.
     *
     * @param dataList the batch handed over by the carrier
     */
    public void consume(final List<LogData> dataList) {
        if (CollectionUtil.isEmpty(dataList)) {
            return;
        }
        StreamObserver<LogData> reportStreamObserver = null;
        final GRPCStreamServiceStatus waitStatus = new GRPCStreamServiceStatus(false);
        try {
            reportStreamObserver = this.asyncStub
                .withDeadlineAfter(ToolkitConfig.Plugin.Toolkit.Log.GRPC.Reporter.UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                .collect(new StreamObserver<Commands>() {

                    public void onNext(Commands commands) {
                        // Commands returned by the collector are not used by this reporter.
                    }

                    public void onError(Throwable t) {
                        waitStatus.finished();
                        // Only the transition into the disconnected state gets the generic message.
                        if (GRPCLogReportServiceClient.this.disconnected.compareAndSet(false, true)) {
                            LOGGER.error("Send log to gRPC server fail with an internal exception.", t);
                        }
                        LOGGER.error(t, "Try to send {} log data to collector, with unexpected exception.", dataList.size());
                    }

                    public void onCompleted() {
                        GRPCLogReportServiceClient.this.disconnected.compareAndSet(true, false);
                        waitStatus.finished();
                    }
                });
            for (LogData logData : dataList) {
                reportStreamObserver.onNext(logData);
            }
        }
        catch (Throwable e) {
            if (!(e instanceof StatusRuntimeException)) {
                LOGGER.error(e, "Report log failure with the gRPC client.");
            }
        }
        finally {
            // Wait only when a stream was actually opened. If opening it failed, no callback
            // exists to call waitStatus.finished().
            // NOTE(review): wait4Finish() presumably blocks until finished() is invoked, which
            // would stall the consumer thread in that case - confirm against GRPCStreamServiceStatus.
            if (reportStreamObserver != null) {
                reportStreamObserver.onCompleted();
                waitStatus.wait4Finish();
            }
        }
    }

    /**
     * Wraps the channel with an interceptor that adds the {@code Authentication} header to
     * every call, or returns the channel unchanged when no authentication value is configured.
     *
     * @param channel the channel to decorate
     * @return the decorated channel, or {@code channel} itself
     */
    private Channel decorateLogChannelWithAuthentication(Channel channel) {
        if (StringUtil.isEmpty(Config.Agent.AUTHENTICATION)) {
            return channel;
        }
        return ClientInterceptors.intercept(channel, new ClientInterceptor() {

            public <REQ, RESP> ClientCall<REQ, RESP> interceptCall(MethodDescriptor<REQ, RESP> method, CallOptions options, Channel next) {
                return new ForwardingClientCall.SimpleForwardingClientCall<REQ, RESP>(next.newCall(method, options)) {

                    public void start(ClientCall.Listener<RESP> responseListener, Metadata headers) {
                        headers.put(AUTH_HEAD_HEADER_NAME, Config.Agent.AUTHENTICATION);
                        super.start(responseListener, headers);
                    }
                };
            }
        });
    }
}

