/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.virtualnode.redundant.mixin;

import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.config.Config;
import io.datarouter.storage.node.op.raw.QueueStorage;
import io.datarouter.storage.queue.QueueMessage;
import io.datarouter.storage.queue.QueueMessageKey;
import io.datarouter.util.timer.PhaseTimer;
import io.datarouter.virtualnode.redundant.RedundantQueueNode;
import io.datarouter.virtualnode.redundant.mixin.SqsRedundantNodeTool;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
 * Default {@link QueueStorage} implementation for a redundant queue node: reads are spread over the
 * nodes returned by {@code getReadNodes()} and writes go to the single node returned by
 * {@code getWriteNode()}.
 *
 * <p>All node references are typed as {@code N}; no raw types or unchecked casts are needed because
 * {@code N} is bounded by {@code QueueStorage.QueueStorageNode<PK, D, F>}.
 *
 * @param <PK> primary key type of the databean
 * @param <D> databean type carried by the queue
 * @param <F> fielder type for the databean
 * @param <N> concrete type of the backing queue nodes
 */
public interface RedundantQueueStorageMixin<
        PK extends PrimaryKey<PK>,
        D extends Databean<PK, D>,
        F extends DatabeanFielder<PK, D>,
        N extends QueueStorage.QueueStorageNode<PK, D, F>>
extends QueueStorage<PK, D>, RedundantQueueNode<PK, D, F, N> {

    /**
     * Polls the read nodes in the order {@code getReadNodes()} yields them and returns the first
     * non-null result.
     *
     * @param config options forwarded unchanged to each node
     * @return the first databean obtained, or {@code null} when every read node returned {@code null}
     */
    @Override
    default D poll(Config config) {
        for (N node : getReadNodes()) {
            D databean = node.poll(config);
            if (databean != null) {
                return databean;
            }
        }
        return null;
    }

    /**
     * Calls {@code pollMulti} on every read node and concatenates the results into one list.
     *
     * @param config options forwarded unchanged to each node
     * @return the combined databeans of all read nodes, in node order
     */
    @Override
    default List<D> pollMulti(Config config) {
        return Scanner.of(getReadNodes())
                .concatIter(node -> node.pollMulti(config))
                .list();
    }

    /**
     * Chains the {@code pollUntilEmpty} scanners of all read nodes into a single scanner.
     *
     * @param config options forwarded unchanged to each node
     * @return a scanner over the databeans of each read node, in node order
     */
    @Override
    default Scanner<D> pollUntilEmpty(Config config) {
        return Scanner.of(getReadNodes())
                .concat(node -> node.pollUntilEmpty(config));
    }

    /**
     * Acknowledges a message on the first read node that accepts it.
     *
     * <p>Each read node is tried in order; the method returns as soon as one {@code ack} call
     * completes without a {@link RuntimeException}. A failure is handed to
     * {@code SqsRedundantNodeTool.swallowIfNotFound} and, if that call returns normally, the next
     * node is tried. If no node succeeds and no exception escapes, the method returns silently.
     *
     * @param key key of the message to acknowledge
     * @param config options forwarded unchanged to each node
     */
    @Override
    default void ack(QueueMessageKey key, Config config) {
        // NOTE(review): this timer is populated but never read in this method - confirm whether
        // it is still wanted (e.g. for debugging) before removing it.
        PhaseTimer phaseTimer = new PhaseTimer();
        for (N node : getReadNodes()) {
            try {
                node.ack(key, config);
                phaseTimer.add("success " + node);
                return;
            } catch (RuntimeException e) {
                // presumably rethrows anything other than a "not found" failure - verify in
                // SqsRedundantNodeTool
                SqsRedundantNodeTool.swallowIfNotFound(e, node);
                phaseTimer.add("failed node " + node);
            }
        }
    }

    /**
     * Acknowledges a batch of messages on the first read node that accepts the whole batch.
     *
     * <p>Follows the same try-next-node strategy as {@link #ack(QueueMessageKey, Config)}: the full
     * {@code keys} collection is passed to each node in turn until one call completes without a
     * {@link RuntimeException}.
     *
     * @param keys keys of the messages to acknowledge
     * @param config options forwarded unchanged to each node
     */
    @Override
    default void ackMulti(Collection<QueueMessageKey> keys, Config config) {
        // NOTE(review): this timer is populated but never read in this method - confirm whether
        // it is still wanted (e.g. for debugging) before removing it.
        PhaseTimer phaseTimer = new PhaseTimer();
        for (N node : getReadNodes()) {
            try {
                node.ackMulti(keys, config);
                phaseTimer.add("success " + node);
                return;
            } catch (RuntimeException e) {
                // presumably rethrows anything other than a "not found" failure - verify in
                // SqsRedundantNodeTool
                SqsRedundantNodeTool.swallowIfNotFound(e, node);
                phaseTimer.add("failed node " + node);
            }
        }
    }

    /**
     * Writes a single databean to the write node.
     *
     * @param databean databean to enqueue
     * @param config options forwarded unchanged to the write node
     */
    @Override
    default void put(D databean, Config config) {
        getWriteNode().put(databean, config);
    }

    /**
     * Writes a collection of databeans to the write node.
     *
     * @param databeans databeans to enqueue
     * @param config options forwarded unchanged to the write node
     */
    @Override
    default void putMulti(Collection<D> databeans, Config config) {
        getWriteNode().putMulti(databeans, config);
    }

    /**
     * Peeks the read nodes in shuffled order and returns the first non-null message.
     *
     * @param config options forwarded unchanged to each node
     * @return the first message found, or {@code null} when no node produced one
     */
    @Override
    default QueueMessage<PK, D> peek(Config config) {
        // NOTE(review): this timer is populated but never read in this method - confirm whether
        // it is still wanted (e.g. for debugging) before removing it.
        PhaseTimer phaseTimer = new PhaseTimer();
        return Scanner.of(getReadNodes())
                .shuffle()
                .map(node -> {
                    QueueMessage<PK, D> message = node.peek(config);
                    phaseTimer.add("node " + node);
                    return message;
                })
                .include(Objects::nonNull)
                .findFirst()
                .orElse(null);
    }

    /**
     * Peeks the read nodes in the order {@code getReadNodes()} yields them and returns the first
     * non-empty batch. Unlike {@link #peek(Config)}, the nodes are not shuffled.
     *
     * @param config options forwarded unchanged to each node
     * @return the first non-empty list of messages, or an empty immutable list when every node
     *         returned an empty list
     */
    @Override
    default List<QueueMessage<PK, D>> peekMulti(Config config) {
        // NOTE(review): this timer is populated but never read in this method - confirm whether
        // it is still wanted (e.g. for debugging) before removing it.
        PhaseTimer phaseTimer = new PhaseTimer();
        for (N node : getReadNodes()) {
            List<QueueMessage<PK, D>> messages = node.peekMulti(config);
            phaseTimer.add("node " + node);
            if (!messages.isEmpty()) {
                return messages;
            }
        }
        return List.of();
    }

    /**
     * Chains the {@code peekUntilEmpty} scanners of all read nodes into a single scanner.
     *
     * @param config options forwarded unchanged to each node
     * @return a scanner over the messages of each read node, in node order
     */
    @Override
    default Scanner<QueueMessage<PK, D>> peekUntilEmpty(Config config) {
        return Scanner.of(getReadNodes())
                .concat(node -> node.peekUntilEmpty(config));
    }
}

