package reactor.netty.resources;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.Metrics;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import reactor.netty.resources.ConnectionProvider;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.NonNull;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider.class */
public final class PooledConnectionProvider implements ConnectionProvider {
    final String name;
    final PoolFactory defaultPoolFactory;
    static final Logger log = Loggers.getLogger((Class<?>) PooledConnectionProvider.class);
    static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf("connectionOwner");
    final ConcurrentMap<PoolKey, InstrumentedPool<PooledConnection>> channelPools = PlatformDependent.newConcurrentHashMap();
    final Map<SocketAddress, PoolFactory> poolFactoryPerRemoteHost = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$DisposableAcquire.class */
    public static final class DisposableAcquire implements ConnectionObserver, Runnable, CoreSubscriber<PooledRef<PooledConnection>>, Disposable {
        final Disposable.Composite cancellations;
        final MonoSink<Connection> sink;
        final InstrumentedPool<PooledConnection> pool;
        final ConnectionObserver obs;
        final ChannelOperations.OnSetup opsFactory;
        final long pendingAcquireTimeout;
        final boolean retried;
        PooledRef<PooledConnection> pooledRef;
        Subscription subscription;

        DisposableAcquire(MonoSink<Connection> monoSink, InstrumentedPool<PooledConnection> instrumentedPool, ConnectionObserver connectionObserver, ChannelOperations.OnSetup onSetup, long j, boolean z) {
            this.cancellations = Disposables.composite();
            this.pool = instrumentedPool;
            this.sink = monoSink;
            this.obs = connectionObserver;
            this.opsFactory = onSetup;
            this.pendingAcquireTimeout = j;
            this.retried = z;
        }

        DisposableAcquire(DisposableAcquire disposableAcquire) {
            this.cancellations = disposableAcquire.cancellations;
            this.sink = disposableAcquire.sink;
            this.pool = disposableAcquire.pool;
            this.obs = disposableAcquire.obs;
            this.opsFactory = disposableAcquire.opsFactory;
            this.pendingAcquireTimeout = disposableAcquire.pendingAcquireTimeout;
            this.retried = true;
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(PooledRef<PooledConnection> pooledRef) {
            this.pooledRef = pooledRef;
            PooledConnection poolable = pooledRef.poolable();
            poolable.pooledRef = this.pooledRef;
            Channel channel = poolable.channel;
            if (channel.eventLoop().inEventLoop()) {
                run();
            } else {
                channel.eventLoop().execute(this);
            }
        }

        @Override // reactor.core.Disposable
        public void dispose() {
            this.subscription.cancel();
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.sink.error(th);
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (Operators.validate(this.subscription, subscription)) {
                this.subscription = subscription;
                this.cancellations.add(this);
                if (!this.retried) {
                    this.sink.onCancel(this.cancellations);
                }
                subscription.request(Long.MAX_VALUE);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
        }

        @Override // reactor.netty.ConnectionObserver
        public Context currentContext() {
            return this.sink.currentContext();
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th) {
            this.sink.error(th);
            this.obs.onUncaughtException(connection, th);
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            if (state == ConnectionObserver.State.CONFIGURED) {
                this.sink.success(connection);
            }
            this.obs.onStateChange(connection, state);
        }

        @Override // java.lang.Runnable
        public void run() {
            PooledConnection poolable = this.pooledRef.poolable();
            Channel channel = poolable.channel;
            if (!channel.isActive()) {
                this.pooledRef.invalidate().subscribe(null, null, () -> {
                    if (PooledConnectionProvider.log.isDebugEnabled()) {
                        PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel closed, now {} active connections and {} inactive connections"), Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize()));
                    }
                });
                if (this.retried) {
                    this.sink.error(new IOException("Error while acquiring from " + this.pool));
                    return;
                }
                if (PooledConnectionProvider.log.isDebugEnabled()) {
                    PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Immediately aborted pooled channel, re-acquiring new channel"));
                }
                PooledConnectionProvider.disposableAcquire(new DisposableAcquire(this));
                return;
            }
            ConnectionObserver connectionObserver = (ConnectionObserver) channel.attr(PooledConnectionProvider.OWNER).getAndSet(this);
            if (connectionObserver instanceof PendingConnectionObserver) {
                PendingConnectionObserver pendingConnectionObserver = (PendingConnectionObserver) connectionObserver;
                connectionObserver = null;
                registerClose(this.pooledRef, this.pool);
                while (true) {
                    PendingConnectionObserver.Pending poll = pendingConnectionObserver.pendingQueue.poll();
                    if (poll == null) {
                        break;
                    }
                    if (poll.error != null) {
                        onUncaughtException(poll.connection, poll.error);
                    } else if (poll.state != null) {
                        onStateChange(poll.connection, poll.state);
                    }
                }
            } else if (connectionObserver == null) {
                registerClose(this.pooledRef, this.pool);
            }
            if (connectionObserver == null) {
                if (PooledConnectionProvider.log.isDebugEnabled()) {
                    PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel connected, now {} active connections and {} inactive connections"), Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize()));
                }
                if (this.opsFactory == ChannelOperations.OnSetup.empty()) {
                    this.sink.success(Connection.from(channel));
                    return;
                }
                return;
            }
            if (PooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel acquired, now {} active connections and {} inactive connections"), Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize()));
            }
            this.obs.onStateChange(poolable, ConnectionObserver.State.ACQUIRED);
            ChannelOperations<?, ?> create = this.opsFactory.create(poolable, poolable, null);
            if (create == null) {
                this.sink.success(poolable);
                return;
            }
            create.bind();
            this.sink.success(create);
            this.obs.onStateChange(create, ConnectionObserver.State.CONFIGURED);
        }

        void registerClose(PooledRef<PooledConnection> pooledRef, InstrumentedPool<PooledConnection> instrumentedPool) {
            Channel channel = pooledRef.poolable().channel;
            if (PooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Registering pool release on close event for channel"));
            }
            channel.closeFuture().addListener2(future -> {
                ConnectionObserver connectionObserver = (ConnectionObserver) channel.attr(PooledConnectionProvider.OWNER).get();
                if (connectionObserver instanceof DisposableAcquire) {
                    ((DisposableAcquire) connectionObserver).pooledRef.invalidate().subscribe(null, null, () -> {
                        if (PooledConnectionProvider.log.isDebugEnabled()) {
                            PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Channel closed, now {} active connections and {} inactive connections"), Integer.valueOf(instrumentedPool.metrics().acquiredSize()), Integer.valueOf(instrumentedPool.metrics().idleSize()));
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PendingConnectionObserver.class */
    public static final class PendingConnectionObserver implements ConnectionObserver {
        final Queue<Pending> pendingQueue = (Queue) Queues.unbounded(4).get();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PendingConnectionObserver$Pending.class */
        public static class Pending {
            final Connection connection;
            final Throwable error;
            final ConnectionObserver.State state;

            Pending(Connection connection, @Nullable Throwable th, @Nullable ConnectionObserver.State state) {
                this.connection = connection;
                this.error = th;
                this.state = state;
            }
        }

        PendingConnectionObserver() {
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th) {
            this.pendingQueue.add(new Pending(connection, th, null));
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            this.pendingQueue.add(new Pending(connection, null, state));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PoolFactory.class */
    static final class PoolFactory {
        final int maxConnections;
        final int pendingAcquireMaxCount;
        final long pendingAcquireTimeout;
        final long maxIdleTime;
        final long maxLifeTime;
        final boolean metricsEnabled;
        final Function<PoolBuilder<PooledConnection, ?>, InstrumentedPool<PooledConnection>> leasingStrategy;
        static final BiPredicate<PooledConnection, PooledRefMetadata> DEFAULT_EVICTION_PREDICATE = (pooledConnection, pooledRefMetadata) -> {
            return (pooledConnection.channel.isActive() && pooledConnection.isPersistent()) ? false : true;
        };
        static final Function<PooledConnection, Publisher<Void>> DEFAULT_DESTROY_HANDLER = pooledConnection -> {
            return !pooledConnection.channel.isActive() ? Mono.empty() : FutureMono.from(pooledConnection.channel.close());
        };

        PoolFactory(ConnectionProvider.ConnectionPoolSpec<?> connectionPoolSpec) {
            this.maxConnections = connectionPoolSpec.maxConnections;
            this.pendingAcquireMaxCount = connectionPoolSpec.pendingAcquireMaxCount == -2 ? 2 * connectionPoolSpec.maxConnections : connectionPoolSpec.pendingAcquireMaxCount;
            this.pendingAcquireTimeout = connectionPoolSpec.pendingAcquireTimeout.toMillis();
            this.maxIdleTime = connectionPoolSpec.maxIdleTime != null ? connectionPoolSpec.maxIdleTime.toMillis() : -1L;
            this.maxLifeTime = connectionPoolSpec.maxLifeTime != null ? connectionPoolSpec.maxLifeTime.toMillis() : -1L;
            this.metricsEnabled = connectionPoolSpec.metricsEnabled;
            this.leasingStrategy = connectionPoolSpec.leasingStrategy;
        }

        InstrumentedPool<PooledConnection> newPool(Publisher<PooledConnection> publisher) {
            return this.leasingStrategy.apply(PoolBuilder.from(publisher).destroyHandler(DEFAULT_DESTROY_HANDLER).evictionPredicate(DEFAULT_EVICTION_PREDICATE.or((pooledConnection, pooledRefMetadata) -> {
                return (this.maxIdleTime != -1 && pooledRefMetadata.idleTime() >= this.maxIdleTime) || (this.maxLifeTime != -1 && pooledRefMetadata.lifeTime() >= this.maxLifeTime);
            })).maxPendingAcquire(this.pendingAcquireMaxCount).sizeBetween(0, this.maxConnections));
        }

        public String toString() {
            return "PoolFactory {maxConnections=" + this.maxConnections + ", pendingAcquireMaxCount=" + this.pendingAcquireMaxCount + ", pendingAcquireTimeout=" + this.pendingAcquireTimeout + ", maxIdleTime=" + this.maxIdleTime + ", maxLifeTime=" + this.maxLifeTime + ", metricsEnabled=" + this.metricsEnabled + '}';
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PoolKey.class */
    static final class PoolKey {
        final SocketAddress holder;
        final int pipelineKey;
        final String fqdn;

        PoolKey(SocketAddress socketAddress, int i) {
            this.holder = socketAddress;
            this.fqdn = socketAddress instanceof InetSocketAddress ? socketAddress.toString() : "null";
            this.pipelineKey = i;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            PoolKey poolKey = (PoolKey) obj;
            return this.pipelineKey == poolKey.pipelineKey && Objects.equals(this.holder, poolKey.holder) && Objects.equals(this.fqdn, poolKey.fqdn);
        }

        public int hashCode() {
            return Objects.hash(this.holder, Integer.valueOf(this.pipelineKey), this.fqdn);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PooledConnection.class */
    public static final class PooledConnection implements Connection, ConnectionObserver {
        final Channel channel;
        final InstrumentedPool<PooledConnection> pool;
        final MonoProcessor<Void> onTerminate = MonoProcessor.create();
        PooledRef<PooledConnection> pooledRef;

        PooledConnection(Channel channel, InstrumentedPool<PooledConnection> instrumentedPool) {
            this.channel = channel;
            this.pool = instrumentedPool;
        }

        ConnectionObserver owner() {
            PendingConnectionObserver pendingConnectionObserver;
            do {
                ConnectionObserver connectionObserver = (ConnectionObserver) this.channel.attr(PooledConnectionProvider.OWNER).get();
                if (connectionObserver != null) {
                    return connectionObserver;
                }
                pendingConnectionObserver = new PendingConnectionObserver();
            } while (!this.channel.attr(PooledConnectionProvider.OWNER).compareAndSet(null, pendingConnectionObserver));
            return pendingConnectionObserver;
        }

        @Override // reactor.netty.Connection
        public Mono<Void> onTerminate() {
            return this.onTerminate.or(onDispose());
        }

        @Override // reactor.netty.DisposableChannel
        public Channel channel() {
            return this.channel;
        }

        @Override // reactor.netty.ConnectionObserver
        public Context currentContext() {
            return owner().currentContext();
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th) {
            owner().onUncaughtException(connection, th);
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            if (PooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.log.debug(ReactorNetty.format(connection.channel(), "onStateChange({}, {})"), connection, state);
            }
            if (state != ConnectionObserver.State.DISCONNECTING) {
                owner().onStateChange(connection, state);
                return;
            }
            if (!isPersistent() && this.channel.isActive()) {
                this.channel.close();
                owner().onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
            } else {
                if (!this.channel.isActive()) {
                    owner().onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
                    return;
                }
                if (PooledConnectionProvider.log.isDebugEnabled()) {
                    PooledConnectionProvider.log.debug(ReactorNetty.format(connection.channel(), "Releasing channel"));
                }
                ConnectionObserver connectionObserver = (ConnectionObserver) this.channel.attr(PooledConnectionProvider.OWNER).getAndSet(ConnectionObserver.emptyListener());
                if (this.pooledRef == null) {
                    return;
                }
                this.pooledRef.release().subscribe(null, th -> {
                    if (PooledConnectionProvider.log.isDebugEnabled()) {
                        PooledConnectionProvider.log.debug("Failed cleaning the channel from pool, now {} active connections and {} inactive connections", Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize()), th);
                    }
                    this.onTerminate.onComplete();
                    connectionObserver.onStateChange(connection, ConnectionObserver.State.RELEASED);
                }, () -> {
                    if (PooledConnectionProvider.log.isDebugEnabled()) {
                        PooledConnectionProvider.log.debug(ReactorNetty.format(this.pooledRef.poolable().channel, "Channel cleaned, now {} active connections and {} inactive connections"), Integer.valueOf(this.pool.metrics().acquiredSize()), Integer.valueOf(this.pool.metrics().idleSize()));
                    }
                    this.onTerminate.onComplete();
                    connectionObserver.onStateChange(connection, ConnectionObserver.State.RELEASED);
                });
            }
        }

        public String toString() {
            return "PooledConnection{channel=" + this.channel + '}';
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PooledConnectionAllocator.class */
    static final class PooledConnectionAllocator {
        final InstrumentedPool<PooledConnection> pool;
        final Bootstrap bootstrap;
        final ChannelOperations.OnSetup opsFactory;

        /* loaded from: input_file:BOOT-INF/lib/reactor-netty-0.9.10.RELEASE.jar:reactor/netty/resources/PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.class */
        final class PooledConnectionInitializer implements ChannelHandler, ChannelFutureListener {
            final MonoSink<PooledConnection> sink;
            PooledConnection pooledConnection;

            PooledConnectionInitializer(MonoSink<PooledConnection> monoSink) {
                this.sink = monoSink;
            }

            @Override // io.netty.channel.ChannelHandler
            public void handlerAdded(ChannelHandlerContext channelHandlerContext) {
                Channel channel = channelHandlerContext.channel();
                if (PooledConnectionProvider.log.isDebugEnabled()) {
                    PooledConnectionProvider.log.debug(ReactorNetty.format(channel, "Created a new pooled channel, now {} active connections and {} inactive connections"), Integer.valueOf(PooledConnectionAllocator.this.pool.metrics().acquiredSize()), Integer.valueOf(PooledConnectionAllocator.this.pool.metrics().idleSize()));
                }
                PooledConnection pooledConnection = new PooledConnection(channel, PooledConnectionAllocator.this.pool);
                this.pooledConnection = pooledConnection;
                pooledConnection.bind();
                Bootstrap mo1456clone = PooledConnectionAllocator.this.bootstrap.mo1456clone();
                BootstrapHandlers.finalizeHandler(mo1456clone, PooledConnectionAllocator.this.opsFactory, pooledConnection);
                channel.pipeline().addFirst(mo1456clone.config2().handler());
                channelHandlerContext.pipeline().remove(this);
            }

            @Override // io.netty.channel.ChannelHandler
            public void handlerRemoved(ChannelHandlerContext channelHandlerContext) {
            }

            @Override // io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
                channelHandlerContext.pipeline().remove(this);
            }

            @Override // io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(ChannelFuture channelFuture) {
                if (channelFuture.isSuccess()) {
                    this.sink.success(this.pooledConnection);
                } else {
                    this.sink.error(channelFuture.cause());
                }
            }
        }

        PooledConnectionAllocator(Bootstrap bootstrap, PoolFactory poolFactory, ChannelOperations.OnSetup onSetup) {
            this.bootstrap = bootstrap.mo1456clone();
            this.opsFactory = onSetup;
            this.pool = poolFactory.newPool(connectChannel());
        }

        Publisher<PooledConnection> connectChannel() {
            return Mono.create(monoSink -> {
                Bootstrap mo1456clone = this.bootstrap.mo1456clone();
                PooledConnectionInitializer pooledConnectionInitializer = new PooledConnectionInitializer(monoSink);
                mo1456clone.handler(pooledConnectionInitializer);
                ChannelFuture connect = mo1456clone.connect();
                if (connect.isDone()) {
                    pooledConnectionInitializer.operationComplete(connect);
                } else {
                    connect.addListener2((GenericFutureListener<? extends Future<? super Void>>) pooledConnectionInitializer);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PooledConnectionProvider(ConnectionProvider.Builder builder) {
        this.name = builder.name;
        this.defaultPoolFactory = new PoolFactory(builder);
        for (Map.Entry<SocketAddress, ConnectionProvider.ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
            this.poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory(entry.getValue()));
        }
    }

    @Override // reactor.netty.resources.ConnectionProvider
    public void disposeWhen(@NonNull SocketAddress socketAddress) {
        ((List) this.channelPools.entrySet().stream().filter(entry -> {
            return compareAddresses(((PoolKey) entry.getKey()).holder, socketAddress);
        }).collect(Collectors.toList())).forEach(entry2 -> {
            if (this.channelPools.remove(entry2.getKey(), entry2.getValue())) {
                if (log.isDebugEnabled()) {
                    log.debug("Disposing pool for {}", ((PoolKey) entry2.getKey()).fqdn);
                }
                ((InstrumentedPool) entry2.getValue()).dispose();
            }
        });
    }

    private boolean compareAddresses(SocketAddress socketAddress, SocketAddress socketAddress2) {
        if (socketAddress.equals(socketAddress2)) {
            return true;
        }
        if (!(socketAddress instanceof InetSocketAddress) || !(socketAddress2 instanceof InetSocketAddress)) {
            return false;
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        InetSocketAddress inetSocketAddress2 = (InetSocketAddress) socketAddress2;
        if (inetSocketAddress.getPort() != inetSocketAddress2.getPort()) {
            return false;
        }
        InetAddress address = inetSocketAddress2.getAddress();
        return (address != null && address.isAnyLocalAddress()) || Objects.equals(inetSocketAddress.getHostString(), inetSocketAddress2.getHostString());
    }

    @Override // reactor.netty.resources.ConnectionProvider
    public Mono<Connection> acquire(Bootstrap bootstrap) {
        return Mono.create(monoSink -> {
            Bootstrap mo1456clone = bootstrap.mo1456clone();
            ChannelOperations.OnSetup channelOperationFactory = BootstrapHandlers.channelOperationFactory(mo1456clone);
            ConnectionObserver connectionObserver = BootstrapHandlers.connectionObserver(mo1456clone);
            NewConnectionProvider.convertLazyRemoteAddress(mo1456clone);
            ChannelHandler handler = mo1456clone.config2().handler();
            SocketAddress remoteAddress = mo1456clone.config2().remoteAddress();
            PoolKey poolKey = new PoolKey(remoteAddress, handler != null ? handler.hashCode() : -1);
            PoolFactory orDefault = this.poolFactoryPerRemoteHost.getOrDefault(remoteAddress, this.defaultPoolFactory);
            disposableAcquire(new DisposableAcquire(monoSink, this.channelPools.computeIfAbsent(poolKey, poolKey2 -> {
                if (log.isDebugEnabled()) {
                    log.debug("Creating a new client pool [{}] for [{}]", orDefault, remoteAddress);
                }
                InstrumentedPool<PooledConnection> instrumentedPool = new PooledConnectionAllocator(mo1456clone, orDefault, channelOperationFactory).pool;
                if (orDefault.metricsEnabled || BootstrapHandlers.findMetricsSupport(mo1456clone) != null) {
                    PooledConnectionProviderMetrics.registerMetrics(this.name, poolKey2.hashCode() + "", Metrics.formatSocketAddress(remoteAddress), instrumentedPool.metrics());
                }
                return instrumentedPool;
            }), connectionObserver, channelOperationFactory, orDefault.pendingAcquireTimeout, false));
        });
    }

    @Override // reactor.netty.resources.ConnectionProvider
    public Mono<Void> disposeLater() {
        return Mono.defer(() -> {
            ArrayList arrayList = new ArrayList();
            Iterator<PoolKey> it = this.channelPools.keySet().iterator();
            while (it.hasNext()) {
                arrayList.add(this.channelPools.remove(it.next()).disposeLater());
            }
            return arrayList.isEmpty() ? Mono.empty() : Mono.when(arrayList);
        });
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.channelPools.isEmpty() || this.channelPools.values().stream().allMatch((v0) -> {
            return v0.isDisposed();
        });
    }

    @Override // reactor.netty.resources.ConnectionProvider
    public int maxConnections() {
        return this.defaultPoolFactory.maxConnections;
    }

    static void disposableAcquire(DisposableAcquire disposableAcquire) {
        disposableAcquire.pool.acquire(Duration.ofMillis(disposableAcquire.pendingAcquireTimeout)).subscribe((CoreSubscriber<? super PooledRef<PooledConnection>>) disposableAcquire);
    }
}
