package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.AddressHelper;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;

/* loaded from: classes2.dex */
public class ClusteredEventBus extends EventBusImpl {
    // Net client built from the event bus options in the constructor; exposed through client().
    private final NetClient client;
    // Handed to both the event-bus context and the net client at construction; closed during close().
    private final CloseFuture closeFuture;
    // Cluster SPI used for node id / node info and for handler (un)registration.
    private final ClusterManager clusterManager;
    // One ConnectionHolder per remote node id; populated lazily by sendRemote.
    private final ConcurrentMap<String, ConnectionHolder> connections;
    // Context on which the server listen sequence is scheduled by start().
    private final EventLoopContext ebContext;
    // Source of the sequence number given to each handler holder (see createHandlerHolder).
    private final AtomicLong handlerSequence;
    // Assigned after the server is listening (lambda$null$0); null until then.
    private String nodeId;
    // Public host/port/metadata published to the cluster manager; assigned together with nodeId.
    private NodeInfo nodeInfo;
    // Chooses target node(s) for send and publish.
    private final NodeSelector nodeSelector;
    // Event bus options taken from the VertxOptions passed to the constructor.
    private final EventBusOptions options;
    // Created in start(); null until then.
    private NetServer server;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClusteredEventBus.class);
    // Single-byte payload written back on the socket when a ping message is received.
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});

    /* renamed from: io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$1 */
    /* loaded from: classes2.dex */
    /**
     * Frame handler plugged into a {@link RecordParser} for one inbound socket.
     *
     * <p>The parser alternates between two states: a 4-byte record carrying the
     * length of the next message, then a record of exactly that length carrying
     * the message itself. This class mirrors the anonymous handler built in
     * {@code lambda$getServerHandler$7}.
     */
    public class AnonymousClass1 implements Handler<Buffer> {
        /** Length of the message body currently expected, or -1 while waiting for a length prefix. */
        int size = -1;
        final /* synthetic */ RecordParser val$parser;
        final /* synthetic */ NetSocket val$socket;

        /**
         * @param recordParser parser whose record size is switched between prefix and body
         * @param netSocket    socket on which ping replies are written
         */
        public AnonymousClass1(RecordParser recordParser, NetSocket netSocket) {
            this.val$parser = recordParser;
            this.val$socket = netSocket;
        }

        /**
         * Consumes one parser record: either a length prefix or a complete message body.
         *
         * @param buffer the record emitted by the parser
         */
        @Override // io.vertx.core.Handler
        public void handle(Buffer buffer) {
            if (this.size == -1) {
                // Length prefix: ask the parser for exactly that many bytes next.
                int bodyLength = buffer.getInt(0);
                this.size = bodyLength;
                this.val$parser.fixedSizeMode(bodyLength);
                return;
            }
            ClusteredMessage clusteredMessage = new ClusteredMessage(ClusteredEventBus.this);
            clusteredMessage.readFromWire(buffer, ClusteredEventBus.this.codecManager);
            if (ClusteredEventBus.this.metrics != null) {
                ClusteredEventBus.this.metrics.messageRead(clusteredMessage.address(), buffer.length());
            }
            // Body consumed: go back to waiting for the next 4-byte length prefix.
            this.val$parser.fixedSizeMode(4);
            this.size = -1;
            if (clusteredMessage.hasFailure()) {
                clusteredMessage.internalError();
            } else if (clusteredMessage.codec() == CodecManager.PING_MESSAGE_CODEC) {
                // Pings are answered on the same socket and never delivered to handlers.
                this.val$socket.write(ClusteredEventBus.PONG);
            } else {
                ClusteredEventBus.this.deliverMessageLocally(clusteredMessage);
            }
        }
    }

    /**
     * Wires the clustered event bus: stores the cluster collaborators, creates the
     * close future, the dedicated event-loop context and the outbound net client.
     *
     * @param vertxInternal  owning Vert.x instance
     * @param vertxOptions   options from which the event bus options are read
     * @param clusterManager cluster manager SPI
     * @param nodeSelector   selector used to pick target nodes
     */
    public ClusteredEventBus(VertxInternal vertxInternal, VertxOptions vertxOptions, ClusterManager clusterManager, NodeSelector nodeSelector) {
        super(vertxInternal);
        this.clusterManager = clusterManager;
        this.nodeSelector = nodeSelector;
        this.options = vertxOptions.getEventBusOptions();
        this.handlerSequence = new AtomicLong(0L);
        this.connections = new ConcurrentHashMap<>();
        this.closeFuture = new CloseFuture(log);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        this.ebContext = vertxInternal.createEventLoopContext(null, this.closeFuture, null, contextClassLoader);
        // The net client is configured from the JSON form of the event bus options.
        NetClientOptions clientOptions = new NetClientOptions(this.options.toJson());
        this.client = createNetClient(vertxInternal, clientOptions, this.closeFuture);
    }

    /**
     * Routes a reply to the node that sent the original message.
     *
     * @param str                     id of the node being replied to
     * @param outboundDeliveryContext delivery context of the reply
     */
    private <T> void clusteredSendReply(String str, OutboundDeliveryContext<T> outboundDeliveryContext) {
        boolean repliedToSelf = str.equals(this.nodeId);
        if (!repliedToSelf) {
            sendRemote(outboundDeliveryContext, str, outboundDeliveryContext.message);
            return;
        }
        // The original sender is this node: use the base-class delivery path.
        super.sendOrPub(outboundDeliveryContext);
    }

    /**
     * Builds the net client, attaching client metrics when a metrics SPI is present.
     *
     * @param vertxInternal    owning Vert.x instance
     * @param netClientOptions options for the client
     * @param closeFuture      close future the client is bound to
     * @return the built client
     */
    private NetClient createNetClient(VertxInternal vertxInternal, NetClientOptions netClientOptions, CloseFuture closeFuture) {
        NetClientBuilder builder = new NetClientBuilder(vertxInternal, netClientOptions);
        VertxMetrics vertxMetrics = vertxInternal.metricsSPI();
        boolean metricsAvailable = vertxMetrics != null;
        if (metricsAvailable) {
            builder.metrics(vertxMetrics.createNetClientMetrics(netClientOptions));
        }
        builder.closeFuture(closeFuture);
        NetClient built = builder.build();
        return built;
    }

    /**
     * Resolves the host to bind to: the configured host, else the cluster
     * manager's host, else {@link AddressHelper#defaultAddress()}.
     *
     * @return the bind host
     */
    private String getClusterHost() {
        String configured = this.options.getHost();
        if (configured == null) {
            String fromClusterManager = this.clusterManager.clusterHost();
            if (fromClusterManager == null) {
                return AddressHelper.defaultAddress();
            }
            return fromClusterManager;
        }
        return configured;
    }

    /**
     * @return the port configured in the event bus options
     */
    private int getClusterPort() {
        int configuredPort = this.options.getPort();
        return configuredPort;
    }

    /**
     * Resolves the host advertised to other nodes. Precedence: configured public
     * host, configured host, cluster manager public host, then the fallback.
     *
     * @param str fallback host used when nothing else is set
     * @return the host to advertise
     */
    private String getClusterPublicHost(String str) {
        String configuredPublic = this.options.getClusterPublicHost();
        if (configuredPublic == null) {
            String configuredHost = this.options.getHost();
            if (configuredHost == null) {
                String managerPublic = this.clusterManager.clusterPublicHost();
                if (managerPublic == null) {
                    return str;
                }
                return managerPublic;
            }
            return configuredHost;
        }
        return configuredPublic;
    }

    /**
     * Resolves the port advertised to other nodes.
     *
     * @param i9 fallback port, used when the configured public port is not positive
     * @return the configured public port if greater than zero, otherwise {@code i9}
     */
    private int getClusterPublicPort(int i9) {
        int configuredPublicPort = this.options.getClusterPublicPort();
        if (configuredPublicPort <= 0) {
            return i9;
        }
        return configuredPublicPort;
    }

    /**
     * Creates the connect handler installed on the server.
     *
     * @return a new handler instance
     */
    private Handler<NetSocket> getServerHandler() {
        // NOTE(review): `b` is a synthetic dispatcher class declared outside this file;
        // selector 1 presumably routes to lambda$getServerHandler$7 - confirm.
        Handler<NetSocket> connectHandler = new b(this, 1);
        return connectHandler;
    }

    /**
     * @return server options built from the JSON form of the event bus options
     */
    private NetServerOptions getServerOptions() {
        NetServerOptions serverOptions = new NetServerOptions(this.options.toJson());
        return serverOptions;
    }

    /**
     * Close step: closes the close future regardless of the incoming result.
     *
     * @param asyncResult result of the previous close step (not inspected)
     * @return the future returned by {@code closeFuture.close()}
     */
    public /* synthetic */ Future lambda$close$3(AsyncResult asyncResult) {
        Future closed = this.closeFuture.close();
        return closed;
    }

    /**
     * Close step: closes every connection holder, but only if the server was created.
     *
     * @param asyncResult result of the previous close step (not inspected)
     */
    public /* synthetic */ void lambda$close$4(AsyncResult asyncResult) {
        if (this.server == null) {
            return;
        }
        for (ConnectionHolder holder : this.connections.values()) {
            holder.close();
        }
    }

    public /* synthetic */ void lambda$getServerHandler$7(NetSocket netSocket) {
        RecordParser newFixed = RecordParser.newFixed(4);
        newFixed.setOutput(new Handler<Buffer>() { // from class: io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.1
            int size = -1;
            final /* synthetic */ RecordParser val$parser;
            final /* synthetic */ NetSocket val$socket;

            public AnonymousClass1(RecordParser newFixed2, NetSocket netSocket2) {
                r2 = newFixed2;
                r3 = netSocket2;
            }

            @Override // io.vertx.core.Handler
            public void handle(Buffer buffer) {
                if (this.size == -1) {
                    int i9 = buffer.getInt(0);
                    this.size = i9;
                    r2.fixedSizeMode(i9);
                    return;
                }
                ClusteredMessage clusteredMessage = new ClusteredMessage(ClusteredEventBus.this);
                clusteredMessage.readFromWire(buffer, ((EventBusImpl) ClusteredEventBus.this).codecManager);
                if (((EventBusImpl) ClusteredEventBus.this).metrics != null) {
                    ((EventBusImpl) ClusteredEventBus.this).metrics.messageRead(clusteredMessage.address(), buffer.length());
                }
                r2.fixedSizeMode(4);
                this.size = -1;
                if (clusteredMessage.hasFailure()) {
                    clusteredMessage.internalError();
                } else if (clusteredMessage.codec() == CodecManager.PING_MESSAGE_CODEC) {
                    r3.write((NetSocket) ClusteredEventBus.PONG);
                } else {
                    ClusteredEventBus.this.deliverMessageLocally(clusteredMessage);
                }
            }
        });
        netSocket2.handler2((Handler<Buffer>) newFixed2);
    }

    /**
     * Post-listen step: records this node's public address and id, then publishes
     * the node info to the cluster manager.
     *
     * @param str       host the server was asked to listen on (fallback public host)
     * @param netServer the listening server (unused; the {@code server} field is read instead)
     * @return future completed when the cluster manager has stored the node info
     */
    public /* synthetic */ Future lambda$null$0(String str, NetServer netServer) {
        String publicHost = getClusterPublicHost(str);
        int publicPort = getClusterPublicPort(this.server.actualPort());
        this.nodeInfo = new NodeInfo(publicHost, publicPort, this.options.getClusterNodeMetadata());
        this.nodeId = this.clusterManager.getNodeId();
        Promise<Void> nodeInfoStored = Promise.promise();
        this.clusterManager.setNodeInfo(this.nodeInfo, nodeInfoStored);
        return nodeInfoStored.future();
    }

    /**
     * Final start step: on success marks the bus as started and notifies the node selector.
     * A failed result is ignored here.
     *
     * @param asyncResult outcome of the listen / node-info sequence
     */
    public /* synthetic */ void lambda$null$1(AsyncResult asyncResult) {
        if (!asyncResult.succeeded()) {
            return;
        }
        this.started = true;
        this.nodeSelector.eventBusStarted();
    }

    /**
     * Completion of node selection for a send: forwards to the chosen node, or
     * reports the selection failure.
     *
     * @param outboundDeliveryContext delivery context of the message
     * @param asyncResult             selection outcome; the result is a node id
     */
    public /* synthetic */ void lambda$sendOrPub$5(OutboundDeliveryContext outboundDeliveryContext, AsyncResult asyncResult) {
        if (!asyncResult.succeeded()) {
            sendOrPublishFailed(outboundDeliveryContext, asyncResult.cause());
            return;
        }
        String targetNodeId = (String) asyncResult.result();
        sendToNode(outboundDeliveryContext, targetNodeId);
    }

    /**
     * Completion of node selection for a publish: forwards to every chosen node,
     * or reports the selection failure.
     *
     * @param outboundDeliveryContext delivery context of the message
     * @param asyncResult             selection outcome; the result is an iterable of node ids
     */
    public /* synthetic */ void lambda$sendOrPub$6(OutboundDeliveryContext outboundDeliveryContext, AsyncResult asyncResult) {
        if (!asyncResult.succeeded()) {
            sendOrPublishFailed(outboundDeliveryContext, asyncResult.cause());
            return;
        }
        Iterable targetNodeIds = (Iterable) asyncResult.result();
        sendToNodes(outboundDeliveryContext, targetNodeIds);
    }

    /**
     * Start sequence run on the event-bus context: listen, publish node info,
     * run the follow-up step, then complete the caller's promise.
     *
     * @param i9      port to listen on
     * @param str     host to listen on
     * @param promise promise completed with the outcome of the whole chain
     * @param r42     unused context-task argument
     */
    public /* synthetic */ void lambda$start$2(int i9, final String str, Promise promise, Void r42) {
        Function publishNodeInfo = new Function() { // from class: io.vertx.core.eventbus.impl.clustered.e
            @Override // java.util.function.Function
            public final Object apply(Object listeningServer) {
                return ClusteredEventBus.this.lambda$null$0(str, (NetServer) listeningServer);
            }
        };
        // NOTE(review): `b` with selector 2 is declared outside this file; presumably
        // it routes to lambda$null$1 - confirm.
        this.server.listen(i9, str)
            .flatMap(publishNodeInfo)
            .andThen(new b(this, 2))
            .onComplete2(promise);
    }

    /**
     * Reports a failed send/publish back to the delivery context.
     *
     * @param outboundDeliveryContext context to notify
     * @param th                      cause of the failure
     */
    private void sendOrPublishFailed(OutboundDeliveryContext<?> outboundDeliveryContext, Throwable th) {
        // NOTE(review): the guard checks the debug level but the record is emitted at
        // error level - confirm this mismatch is intentional.
        if (log.isDebugEnabled()) {
            log.error("Failed to send message", th);
        }
        outboundDeliveryContext.written(th);
    }

    /**
     * Writes a message to a remote node, creating and connecting the node's
     * connection holder on first use.
     *
     * @param outboundDeliveryContext delivery context handed to the holder
     * @param str                     id of the remote node
     * @param messageImpl             the message (not read by this method)
     */
    private void sendRemote(OutboundDeliveryContext<?> outboundDeliveryContext, String str, MessageImpl messageImpl) {
        ConnectionHolder holder = this.connections.get(str);
        if (holder == null) {
            ConnectionHolder fresh = new ConnectionHolder(this, str);
            ConnectionHolder winner = this.connections.putIfAbsent(str, fresh);
            if (winner == null) {
                // Our holder was registered: we are responsible for connecting it.
                fresh.connect();
                holder = fresh;
            } else {
                // Another caller registered a holder first: reuse theirs.
                holder = winner;
            }
        }
        holder.writeMessage(outboundDeliveryContext);
    }

    /**
     * Delivers to a single node: remotely when the id names another node,
     * otherwise through the base-class path.
     *
     * @param outboundDeliveryContext delivery context of the message
     * @param str                     target node id; may be {@code null}
     */
    private <T> void sendToNode(OutboundDeliveryContext<T> outboundDeliveryContext, String str) {
        boolean targetsOtherNode = str != null && !str.equals(this.nodeId);
        if (targetsOtherNode) {
            sendRemote(outboundDeliveryContext, str, outboundDeliveryContext.message);
            return;
        }
        super.sendOrPub(outboundDeliveryContext);
    }

    /**
     * Delivers to every node in the iterable; when it is {@code null} or empty,
     * falls back to the base-class path.
     *
     * @param outboundDeliveryContext delivery context of the message
     * @param iterable                target node ids; may be {@code null}
     */
    private <T> void sendToNodes(OutboundDeliveryContext<T> outboundDeliveryContext, Iterable<String> iterable) {
        boolean dispatched = false;
        if (iterable != null) {
            for (String targetNodeId : iterable) {
                dispatched = true;
                sendToNode(outboundDeliveryContext, targetNodeId);
            }
        }
        if (!dispatched) {
            super.sendOrPub(outboundDeliveryContext);
        }
    }

    /**
     * @return the net client created in the constructor
     */
    public NetClient client() {
        return client;
    }

    /**
     * Closes the base event bus first, then runs two follow-up steps before
     * completing the caller's promise.
     *
     * @param promise completed with the outcome of the close chain
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl, io.vertx.core.eventbus.impl.EventBusInternal
    public void close(Promise<Void> promise) {
        Promise<Void> parentClosed = Promise.promise();
        super.close(parentClosed);
        // NOTE(review): `f` and `b` are synthetic dispatchers declared outside this file;
        // presumably they route to lambda$close$3 and lambda$close$4 - confirm.
        parentClosed.future()
            .transform(new f(this, 1))
            .andThen(new b(this, 0))
            .onComplete2(promise);
    }

    /**
     * @return the live map of connection holders keyed by remote node id (not a copy)
     */
    public ConcurrentMap<String, ConnectionHolder> connections() {
        return connections;
    }

    /**
     * Creates a clustered handler holder stamped with the next sequence number.
     *
     * @param handlerRegistration the registration being held
     * @param z8                  first flag forwarded to the holder
     * @param z9                  second flag forwarded to the holder
     * @param contextInternal     context forwarded to the holder
     * @return the new holder
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> handlerRegistration, boolean z8, boolean z9, ContextInternal contextInternal) {
        // NOTE(review): z8/z9 presumably mean replyHandler/localOnly - confirm against ClusteredHandlerHolder.
        long sequence = this.handlerSequence.getAndIncrement();
        return new ClusteredHandlerHolder(handlerRegistration, z8, z9, contextInternal, sequence);
    }

    /**
     * Creates a clustered message stamped with this node's id.
     *
     * @param z8       flag forwarded to the message
     * @param str      destination address; must not be {@code null}
     * @param multiMap headers
     * @param obj      body
     * @param str2     codec name used for the codec lookup
     * @return the new message
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public MessageImpl createMessage(boolean z8, String str, MultiMap multiMap, Object obj, String str2) {
        // NOTE(review): z8 is presumably the send-vs-publish flag - confirm against ClusteredMessage.
        Objects.requireNonNull(str, "no null address accepted");
        return new ClusteredMessage(
            this.nodeId,
            str,
            multiMap,
            obj,
            this.codecManager.lookupCodec(obj, str2, false),
            z8,
            this);
    }

    /**
     * @return a reply address made of the {@code __vertx.reply.} prefix and a random UUID
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public String generateReplyAddress() {
        String uniquePart = UUID.randomUUID().toString();
        return "__vertx.reply.".concat(uniquePart);
    }

    /**
     * @param messageImpl message to inspect; must be a {@code ClusteredMessage}
     * @return {@code true} when the message was not read from the wire
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public boolean isMessageLocal(MessageImpl messageImpl) {
        ClusteredMessage clustered = (ClusteredMessage) messageImpl;
        boolean cameFromWire = clustered.isFromWire();
        return !cameFromWire;
    }

    /**
     * Picks the handler that should receive the next message.
     *
     * @param concurrentCyclicSequence candidate handlers
     * @param z8                       when {@code true} the sequence's own {@code next()} is used;
     *                                 when {@code false} local-only non-reply handlers are skipped
     * @return the chosen handler, or {@code null} if the filtered scan finds none
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> concurrentCyclicSequence, boolean z8) {
        // NOTE(review): z8 presumably carries the isMessageLocal() result - confirm against the caller.
        if (!z8) {
            for (Iterator<HandlerHolder> candidates = concurrentCyclicSequence.iterator(false); candidates.hasNext(); ) {
                HandlerHolder candidate = candidates.next();
                boolean localOnlyNonReply = !candidate.isReplyHandler() && candidate.isLocalOnly();
                if (!localOnlyNonReply) {
                    return candidate;
                }
            }
            return null;
        }
        return concurrentCyclicSequence.next();
    }

    /**
     * Propagates a local handler registration to the cluster manager. Reply
     * handlers are not propagated; their promise, if any, is completed at once.
     *
     * @param handlerHolder the holder that was registered
     * @param promise       completion promise; may be {@code null} only for reply handlers
     * @throws NullPointerException if {@code promise} is {@code null} for a non-reply handler
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        if (!handlerHolder.isReplyHandler()) {
            RegistrationInfo registration = new RegistrationInfo(this.nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
            String address = handlerHolder.getHandler().address;
            Objects.requireNonNull(promise);
            this.clusterManager.addRegistration(address, registration, promise);
            return;
        }
        if (promise != null) {
            promise.complete();
        }
    }

    /**
     * Propagates a local handler removal to the cluster manager. Reply handlers
     * are not propagated; their promise is completed at once.
     *
     * @param handlerHolder the holder that was unregistered
     * @param promise       completed when the removal has been processed
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        if (!handlerHolder.isReplyHandler()) {
            RegistrationInfo registration = new RegistrationInfo(this.nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
            Promise<Void> removed = Promise.promise();
            this.clusterManager.removeRegistration(handlerHolder.getHandler().address, registration, removed);
            // The outcome is relayed to the caller through an intermediate promise.
            removed.future().onComplete2(promise);
        } else {
            promise.complete();
        }
    }

    /**
     * @return the event bus options this bus was created with
     */
    public EventBusOptions options() {
        return options;
    }

    /**
     * Entry point for outbound delivery.
     *
     * <ul>
     *   <li>Replies go straight to the node recorded in the message's replied-to field.</li>
     *   <li>Local-only deliveries use the base-class path.</li>
     *   <li>Otherwise the node selector is asked, through the context's serializer,
     *       for one node (send) or a set of nodes (publish); the message is then
     *       forwarded, or the selection failure is reported to the delivery context.</li>
     * </ul>
     *
     * @param outboundDeliveryContext delivery context of the message; its message
     *                                must be a {@code ClusteredMessage}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public <T> void sendOrPub(final OutboundDeliveryContext<T> outboundDeliveryContext) {
        String repliedTo = ((ClusteredMessage) outboundDeliveryContext.message).getRepliedTo();
        if (repliedTo != null) {
            clusteredSendReply(repliedTo, outboundDeliveryContext);
            return;
        }
        if (outboundDeliveryContext.options.isLocalOnly()) {
            super.sendOrPub(outboundDeliveryContext);
            return;
        }
        Serializer serializer = Serializer.get(outboundDeliveryContext.ctx);
        if (outboundDeliveryContext.message.isSend()) {
            // Point-to-point: the selector yields a single node id.
            Promise<String> selectedNode = outboundDeliveryContext.ctx.promise();
            serializer.queue(outboundDeliveryContext.message, this.nodeSelector::selectForSend, selectedNode);
            selectedNode.future().onComplete2(ar -> lambda$sendOrPub$5(outboundDeliveryContext, ar));
        } else {
            // Publish: the selector yields every node id that should receive the message.
            Promise<Iterable<String>> selectedNodes = outboundDeliveryContext.ctx.promise();
            serializer.queue(outboundDeliveryContext.message, this.nodeSelector::selectForPublish, selectedNodes);
            selectedNodes.future().onComplete2(ar -> lambda$sendOrPub$6(outboundDeliveryContext, ar));
        }
    }

    /**
     * Creates the server, installs the connect handler and schedules the listen
     * sequence on the event-bus context.
     *
     * @param promise completed by the scheduled sequence with the start outcome
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl, io.vertx.core.eventbus.impl.EventBusInternal
    public void start(final Promise<Void> promise) {
        this.server = this.vertx.createNetServer(getServerOptions());
        this.server.connectHandler(getServerHandler());
        final int listenPort = getClusterPort();
        final String listenHost = getClusterHost();
        Handler<Void> listenTask = new Handler<Void>() { // from class: io.vertx.core.eventbus.impl.clustered.a
            @Override // io.vertx.core.Handler
            public final void handle(Void ignored) {
                ClusteredEventBus.this.lambda$start$2(listenPort, listenHost, promise, ignored);
            }
        };
        this.ebContext.runOnContext(listenTask);
    }

    /**
     * @return the Vert.x instance this event bus belongs to
     */
    public VertxInternal vertx() {
        return vertx;
    }
}
