/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.events;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.events.EventListener;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.reactivestreams.extensions.Px;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConnectionEventInterceptor
implements DuplexConnection {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionEventInterceptor.class);
    private final DuplexConnection delegate;
    private final EventPublisher<? extends EventListener> publisher;

    public ConnectionEventInterceptor(DuplexConnection delegate, EventPublisher<? extends EventListener> publisher) {
        this.delegate = delegate;
        this.publisher = publisher;
    }

    @Override
    public Publisher<Void> send(Publisher<Frame> frame) {
        return this.delegate.send((Publisher<Frame>)Px.from(frame).map(f -> {
            try {
                this.publishEventsForFrameWrite((Frame)f);
            }
            catch (Exception e) {
                logger.info("Error while emitting events for frame " + f + " written. Ignoring error.", (Throwable)e);
            }
            return f;
        }));
    }

    @Override
    public Publisher<Void> sendOne(Frame frame) {
        return this.delegate.sendOne(frame);
    }

    @Override
    public Publisher<Frame> receive() {
        return Px.from(this.delegate.receive()).map(f -> {
            try {
                this.publishEventsForFrameRead((Frame)f);
            }
            catch (Exception e) {
                logger.info("Error while emitting events for frame " + f + " read. Ignoring error.", (Throwable)e);
            }
            return f;
        });
    }

    @Override
    public double availability() {
        return this.delegate.availability();
    }

    @Override
    public Publisher<Void> close() {
        return this.delegate.close();
    }

    @Override
    public Publisher<Void> onClose() {
        return this.delegate.onClose();
    }

    private void publishEventsForFrameRead(Frame frameRead) {
        if (!this.publisher.isEventPublishingEnabled()) {
            return;
        }
        EventListener listener = this.publisher.getEventListener();
        listener.frameRead(frameRead.getStreamId(), frameRead.getType());
        switch (frameRead.getType()) {
            case LEASE: {
                listener.leaseReceived(Frame.Lease.numberOfRequests(frameRead), Frame.Lease.ttl(frameRead));
                break;
            }
            case ERROR: {
                listener.errorReceived(frameRead.getStreamId(), Frame.Error.errorCode(frameRead));
            }
        }
    }

    private void publishEventsForFrameWrite(Frame frameWritten) {
        if (!this.publisher.isEventPublishingEnabled()) {
            return;
        }
        EventListener listener = this.publisher.getEventListener();
        listener.frameWritten(frameWritten.getStreamId(), frameWritten.getType());
        switch (frameWritten.getType()) {
            case LEASE: {
                listener.leaseSent(Frame.Lease.numberOfRequests(frameWritten), Frame.Lease.ttl(frameWritten));
                break;
            }
            case ERROR: {
                listener.errorSent(frameWritten.getStreamId(), Frame.Error.errorCode(frameWritten));
            }
        }
    }
}

