/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.scheduling.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.IndexUtils;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.scheduling.ScheduledMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemorySchedulingClient
extends InMemoryMessageStore
implements SchedulingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemorySchedulingClient.class);
    private final ConcurrentSkipListMap<Long, String> times = new ConcurrentSkipListMap();
    private volatile Clock clock = Clock.systemUTC();

    @Override
    public MessageBatch readAndWait(String consumer, String trackerId, Long previousLastIndex, ConsumerConfiguration configuration) {
        Long lastIndex;
        MessageBatch messageBatch = super.readAndWait(consumer, trackerId, previousLastIndex, configuration);
        List messages = messageBatch.getMessages().stream().filter(m -> this.times.containsKey(m.getIndex()) && this.clock.millis() >= IndexUtils.millisFromIndex((long)m.getIndex())).collect(Collectors.toList());
        Long l = lastIndex = messages.isEmpty() ? null : ((SerializedMessage)messages.get(messages.size() - 1)).getIndex();
        if (configuration.getTypeFilter() != null) {
            messages = messages.stream().filter(m -> m.getData().getType().matches(configuration.getTypeFilter())).collect(Collectors.toList());
        }
        return new MessageBatch(messageBatch.getSegment(), messages, lastIndex);
    }

    @Override
    protected boolean shouldWait(Map<Long, SerializedMessage> tailMap) {
        long deadline = IndexUtils.indexFromMillis((long)this.clock.millis());
        return tailMap.isEmpty() || tailMap.keySet().stream().noneMatch(index -> index <= deadline);
    }

    @Override
    public Awaitable storePosition(String consumer, int[] segment, long lastIndex) {
        this.times.headMap((Object)lastIndex).clear();
        return super.storePosition(consumer, segment, lastIndex);
    }

    @Override
    public Awaitable schedule(ScheduledMessage ... schedules) {
        for (ScheduledMessage schedule : schedules) {
            this.cancelSchedule(schedule.getScheduleId());
            long index = IndexUtils.indexFromMillis((long)schedule.getTimestamp());
            while (this.times.putIfAbsent(index, schedule.getScheduleId()) != null) {
                ++index;
            }
            schedule.getMessage().setIndex(Long.valueOf(index));
        }
        super.send((SerializedMessage[])Arrays.stream(schedules).map(ScheduledMessage::getMessage).toArray(SerializedMessage[]::new));
        return Awaitable.ready();
    }

    @Override
    public Awaitable cancelSchedule(String scheduleId) {
        this.times.values().removeIf(s -> s.equals(scheduleId));
        return Awaitable.ready();
    }

    @Override
    public Awaitable send(SerializedMessage ... messages) {
        throw new UnsupportedOperationException("Use method #schedule instead");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setClock(@NonNull Clock clock) {
        if (clock == null) {
            throw new NullPointerException("clock is marked non-null but is null");
        }
        InMemorySchedulingClient inMemorySchedulingClient = this;
        synchronized (inMemorySchedulingClient) {
            this.clock = clock;
            this.notifyAll();
        }
    }

    public List<Schedule> removeExpiredSchedules(Serializer serializer) {
        NavigableMap expiredEntries = this.times.headMap((Object)IndexUtils.indexFromMillis((long)this.clock.millis()), true);
        List<Schedule> result = expiredEntries.entrySet().stream().map(e -> {
            SerializedMessage m = this.getMessage((Long)e.getKey());
            return new Schedule(serializer.deserializeMessages(Stream.of(m), true, MessageType.SCHEDULE).findFirst().get().getPayload(), m.getMetadata(), (String)e.getValue(), IndexUtils.timestampFromIndex((long)((Long)e.getKey())));
        }).collect(Collectors.toList());
        expiredEntries.clear();
        return result;
    }
}

