/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.eventsourcing;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingObject;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.modeling.AggregateRoot;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.caching.NoOpCache;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.Aggregate;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.AggregateEventStream;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.DefaultEventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcedModel;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandler;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventStore;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventStoreSerializer;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.NoOpSnapshotRepository;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.NoSnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.PeriodicSnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.SnapshotRepository;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.SnapshotTrigger;
import java.beans.ConstructorProperties;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSourcingRepository
implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(EventSourcingRepository.class);

    /** Derives the cache key for an aggregate id: the simple name of this class, a colon, then the id. */
    private static final Function<String, String> keyFunction = aggregateId -> EventSourcingRepository.class.getSimpleName() + ":" + aggregateId;

    /** Source of persisted events; handed to every aggregate created by this repository. */
    private final EventStore eventStore;

    /** Snapshot store used for aggregate types whose snapshot period is positive. */
    private final SnapshotRepository snapshotRepository;

    /** Cache used for aggregate types that are marked as cached, and for {@code onlyCached} lookups. */
    private final Cache cache;

    /** Serializer handed to every aggregate created by this repository. */
    private final EventStoreSerializer serializer;

    /** Creates the event sourcing handler for an aggregate type. */
    private final EventSourcingHandlerFactory handlerFactory;

    /** Per aggregate type: a factory taking (aggregateId, readOnly) and producing an initialized aggregate. */
    private final Map<Class<?>, BiFunction<String, Boolean, EventSourcedAggregate<?>>> aggregateFactory = new ConcurrentHashMap<>();

    /** Aggregates of the current thread that have applied events which are not committed yet. */
    private final ThreadLocal<Collection<EventSourcedAggregate<?>>> loadedModels = new ThreadLocal<>();

    /**
     * Creates a repository that uses a {@link DefaultEventSourcingHandlerFactory} built from
     * {@code DeserializingMessage.defaultParameterResolvers}.
     *
     * @param eventStore         store the aggregates read events from and write events to
     * @param snapshotRepository repository used for aggregate snapshots
     * @param cache              cache used for aggregate models
     * @param serializer         serializer handed to the created aggregates
     */
    public EventSourcingRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache, EventStoreSerializer serializer) {
        this(eventStore, snapshotRepository, cache, serializer, new DefaultEventSourcingHandlerFactory(DeserializingMessage.defaultParameterResolvers));
    }

    /**
     * Tells whether this repository can handle the given type.
     *
     * @param aggregateType the candidate aggregate class
     * @return {@code true} if an {@link Aggregate} annotation is present on the class
     */
    @Override
    public boolean supports(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        return annotation != null;
    }

    /**
     * Returns the {@code cached} attribute of the {@link Aggregate} annotation on the given type.
     *
     * @param aggregateType the aggregate class to inspect
     * @return the value of {@code Aggregate.cached()} for this type
     * @throws UnsupportedOperationException if the type carries no {@link Aggregate} annotation
     */
    @Override
    public boolean cachingAllowed(Class<?> aggregateType) {
        return Optional.ofNullable(aggregateType.getAnnotation(Aggregate.class))
                .map(Aggregate::cached)
                .orElseThrow(() -> new UnsupportedOperationException("Unsupported aggregate type: " + aggregateType));
    }

    /**
     * Loads the aggregate with the given id.
     *
     * <p>With {@code onlyCached} set, only the cache is consulted and whatever is stored under the
     * aggregate's cache key is returned (possibly {@code null}); no type check is performed on it.
     * Otherwise an aggregate already tracked for the current thread is reused when its id matches and
     * its type is assignable to {@code aggregateType}; failing that, a new aggregate is created.
     *
     * @param aggregateId   id of the aggregate
     * @param aggregateType expected aggregate class
     * @param readOnly      passed on when a new aggregate has to be created
     * @param onlyCached    if {@code true}, never load from the snapshot repository or event store
     * @return the aggregate, or {@code null} if {@code onlyCached} is set and nothing is cached
     */
    @Override
    public <T> AggregateRoot<T> load(String aggregateId, Class<T> aggregateType, boolean readOnly, boolean onlyCached) {
        if (onlyCached) {
            return (EventSourcedModel) this.cache.getIfPresent(keyFunction.apply(aggregateId));
        }
        Collection<EventSourcedAggregate<?>> tracked = this.loadedModels.get();
        if (tracked != null) {
            for (EventSourcedAggregate<?> candidate : tracked) {
                boolean sameId = candidate.id.equals(aggregateId);
                if (sameId && aggregateType.isAssignableFrom(candidate.getAggregateType())) {
                    return (EventSourcedAggregate<T>) candidate;
                }
            }
        }
        return this.createAggregate(aggregateType, aggregateId, readOnly);
    }

    /**
     * Creates and initializes a new aggregate instance.
     *
     * <p>The per-type settings (handler, cache, snapshot repository and trigger, domain, event sourcing
     * flag) are resolved once per aggregate type and captured in a factory that is stored in
     * {@code aggregateFactory}; later calls for the same type reuse that factory.
     *
     * @param aggregateType class of the aggregate
     * @param aggregateId   id of the aggregate
     * @param readOnly      whether the created aggregate refuses {@code apply}
     * @return a freshly initialized aggregate
     */
    protected <T> EventSourcedAggregate<T> createAggregate(Class<T> aggregateType, String aggregateId, boolean readOnly) {
        BiFunction<String, Boolean, EventSourcedAggregate<?>> factory = this.aggregateFactory.computeIfAbsent(aggregateType, type -> {
            EventSourcingHandler handler = this.handlerFactory.forType(aggregateType);
            // Types that are not cached get the no-op cache instead of the repository cache.
            Cache aggregateCache = this.isCached(aggregateType) ? this.cache : NoOpCache.INSTANCE;
            SnapshotRepository snapshots = this.snapshotRepository(aggregateType);
            SnapshotTrigger trigger = this.snapshotTrigger(aggregateType);
            String aggregateDomain = this.domain(aggregateType);
            boolean replayEvents = this.eventSourced(aggregateType);
            return (id, ro) -> {
                EventSourcedAggregate aggregate = new EventSourcedAggregate(aggregateType, handler, aggregateCache, this.serializer, this.eventStore, snapshots, trigger, aggregateDomain, replayEvents, ro, id);
                aggregate.initialize();
                return aggregate;
            };
        });
        return (EventSourcedAggregate) factory.apply(aggregateId, readOnly);
    }

    /**
     * Selects the snapshot repository for an aggregate type.
     *
     * @param aggregateType class of the aggregate
     * @return this repository's snapshot repository when the type's snapshot period is positive,
     *         otherwise {@code NoOpSnapshotRepository.INSTANCE}
     */
    protected SnapshotRepository snapshotRepository(Class<?> aggregateType) {
        if (this.snapshotPeriod(aggregateType) > 0) {
            return this.snapshotRepository;
        }
        return NoOpSnapshotRepository.INSTANCE;
    }

    /**
     * Selects the snapshot trigger for an aggregate type.
     *
     * @param aggregateType class of the aggregate
     * @return a new {@link PeriodicSnapshotTrigger} with the type's snapshot period when that period is
     *         positive, otherwise {@code NoSnapshotTrigger.INSTANCE}
     */
    protected SnapshotTrigger snapshotTrigger(Class<?> aggregateType) {
        int period = this.snapshotPeriod(aggregateType);
        if (period > 0) {
            return new PeriodicSnapshotTrigger(period);
        }
        return NoSnapshotTrigger.INSTANCE;
    }

    /**
     * Determines the snapshot period for an aggregate type.
     *
     * <p>If the type is annotated with {@link Aggregate}: the annotation's {@code snapshotPeriod} when
     * the aggregate is event sourced, otherwise {@code 1}. If the type is not annotated, the declared
     * default of {@code Aggregate.snapshotPeriod} is used; it is only looked up in that case.
     *
     * @param aggregateType class of the aggregate
     * @return the snapshot period
     * @throws IllegalStateException if the {@code snapshotPeriod} attribute cannot be found on {@link Aggregate}
     */
    protected int snapshotPeriod(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.eventSourced() ? annotation.snapshotPeriod() : 1;
        }
        try {
            return (Integer) Aggregate.class.getMethod("snapshotPeriod").getDefaultValue();
        } catch (NoSuchMethodException e) {
            // getMethod declares a checked exception; surface it as a programming error with its cause.
            throw new IllegalStateException("Aggregate annotation has no attribute 'snapshotPeriod'", e);
        }
    }

    /**
     * Determines whether models of the given aggregate type should be cached.
     *
     * <p>Uses the {@code cached} attribute of the type's {@link Aggregate} annotation; if the type is
     * not annotated, the declared default of {@code Aggregate.cached} is used. The default is only
     * looked up in that case.
     *
     * @param aggregateType class of the aggregate
     * @return {@code true} if the aggregate type is cached
     * @throws IllegalStateException if the {@code cached} attribute cannot be found on {@link Aggregate}
     */
    protected boolean isCached(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.cached();
        }
        try {
            return (Boolean) Aggregate.class.getMethod("cached").getDefaultValue();
        } catch (NoSuchMethodException e) {
            // getMethod declares a checked exception; surface it as a programming error with its cause.
            throw new IllegalStateException("Aggregate annotation has no attribute 'cached'", e);
        }
    }

    /**
     * Determines the domain name for an aggregate type.
     *
     * @param aggregateType class of the aggregate
     * @return the non-empty {@code domain} attribute of the type's {@link Aggregate} annotation, or the
     *         simple class name when the annotation is missing or its domain is empty
     */
    protected String domain(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            String configured = annotation.domain();
            if (!configured.isEmpty()) {
                return configured;
            }
        }
        return aggregateType.getSimpleName();
    }

    /**
     * Determines whether the given aggregate type is event sourced.
     *
     * <p>Uses the {@code eventSourced} attribute of the type's {@link Aggregate} annotation; if the
     * type is not annotated, the declared default of {@code Aggregate.eventSourced} is used. The
     * default is only looked up in that case.
     *
     * @param aggregateType class of the aggregate
     * @return {@code true} if the aggregate type is event sourced
     * @throws IllegalStateException if the {@code eventSourced} attribute cannot be found on {@link Aggregate}
     */
    protected boolean eventSourced(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.eventSourced();
        }
        try {
            return (Boolean) Aggregate.class.getMethod("eventSourced").getDefaultValue();
        } catch (NoSuchMethodException e) {
            // getMethod declares a checked exception; surface it as a programming error with its cause.
            throw new IllegalStateException("Aggregate annotation has no attribute 'eventSourced'", e);
        }
    }

    /**
     * Creates a repository from its collaborators. The arguments are stored as given; no validation
     * is performed here.
     *
     * @param eventStore         store the aggregates read events from and write events to
     * @param snapshotRepository repository used for aggregate snapshots
     * @param cache              cache used for aggregate models
     * @param serializer         serializer handed to the created aggregates
     * @param handlerFactory     factory that supplies the event sourcing handler per aggregate type
     */
    @ConstructorProperties(value={"eventStore", "snapshotRepository", "cache", "serializer", "handlerFactory"})
    public EventSourcingRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache, EventStoreSerializer serializer, EventSourcingHandlerFactory handlerFactory) {
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
        this.cache = cache;
        this.serializer = serializer;
        this.handlerFactory = handlerFactory;
    }

    protected class EventSourcedAggregate<T>
    implements AggregateRoot<T> {
        // Class this aggregate was requested as; returned by type().
        private final Class<T> aggregateType;
        // Invoked with the current state and an event to produce the next state.
        private final EventSourcingHandler<T> eventSourcingHandler;
        // Cache for this aggregate's model; note this field hides the enclosing repository's cache field.
        private final Cache cache;
        // Serializes applied messages and converts their payloads.
        private final EventStoreSerializer serializer;
        // Read during initialize() (event replay) and written during commit().
        private final EventStore eventStore;
        // Read during initialize() and written during commit() when the trigger fires.
        private final SnapshotRepository snapshotRepository;
        // Decides in commit() whether a snapshot of the model is stored.
        private final SnapshotTrigger snapshotTrigger;
        // Passed to the event store when events are stored.
        private final String domain;
        // When false, initialize() does not replay events from the event store.
        private final boolean eventSourced;
        // Events added by apply() that have not been committed yet; cleared at the end of commit().
        private final List<DeserializingMessage> unpublishedEvents = new ArrayList<DeserializingMessage>();
        // When true, apply() throws UnsupportedOperationException.
        private final boolean readOnly;
        // Aggregate id; used for cache keys, snapshot lookup and event lookup.
        private final String id;
        // Current model; assigned by initialize() and replaced on every apply().
        private EventSourcedModel<T> model;

        /**
         * Loads the current model of this aggregate.
         *
         * <p>Lookup order: the cache entry for this id; otherwise the stored snapshot for this id;
         * otherwise a new empty model carrying only id and type. A cached or snapshot model is accepted
         * when its state is {@code null} or an instance of the requested aggregate type. Models with
         * {@code null} state used to cause a {@code NullPointerException} here.
         *
         * <p>When the model did not come from the cache and the aggregate is event sourced, events
         * after the model's sequence number are fetched from the event store and applied one by one.
         */
        protected void initialize() {
            this.model = Optional.ofNullable((EventSourcedModel) this.cache.getIfPresent(keyFunction.apply(this.id)))
                    .filter(a -> a.get() == null || this.aggregateType.isAssignableFrom(a.get().getClass()))
                    .orElseGet(() -> {
                        EventSourcedModel model = this.snapshotRepository.getSnapshot(this.id)
                                .filter(a -> a.get() == null || this.aggregateType.isAssignableFrom(a.get().getClass()))
                                .orElse(EventSourcedModel.builder().id(this.id).type(this.aggregateType).build());
                        if (!this.eventSourced) {
                            // No replay for non event sourced aggregates; the model is rebuilt with an unchanged sequence number.
                            return model.toBuilder().sequenceNumber(model.sequenceNumber()).build();
                        }
                        AggregateEventStream<DeserializingMessage> eventStream = this.eventStore.getEvents(this.id, model.sequenceNumber());
                        Iterator<DeserializingMessage> iterator = eventStream.iterator();
                        while (iterator.hasNext()) {
                            DeserializingMessage event = iterator.next();
                            // Each event yields a new model that links back to the one it was derived from.
                            model = model.toBuilder()
                                    .sequenceNumber(model.sequenceNumber() + 1L)
                                    .type(this.aggregateType)
                                    .id(this.id)
                                    .lastEventId(event.getSerializedObject().getMessageId())
                                    .timestamp(Instant.ofEpochMilli(event.getSerializedObject().getTimestamp()))
                                    .model(this.eventSourcingHandler.invoke(model.get(), event))
                                    .previous(model)
                                    .build();
                        }
                        // Prefer the sequence number reported by the stream, if it reports one.
                        return model.toBuilder().sequenceNumber(eventStream.getLastSequenceNumber().orElse(model.sequenceNumber())).build();
                    });
        }

        /**
         * Returns the runtime class of the current aggregate state.
         *
         * @return the class of the state held by the model, or the requested aggregate type when there
         *         is no model yet or the model holds no state
         */
        public Class<?> getAggregateType() {
            if (this.model != null) {
                T state = this.model.get();
                if (state != null) {
                    return state.getClass();
                }
            }
            return this.aggregateType;
        }

        /**
         * Applies the given message to this aggregate as a new event.
         *
         * <p>The message is enriched with the aggregate id and type as metadata, passed through the
         * event sourcing handler to obtain the next state, and recorded as an unpublished event. The
         * aggregate is then registered in the enclosing repository's thread-local collection of
         * aggregates awaiting commit. The first aggregate registered on a thread also schedules the
         * commit of all registered aggregates: at batch completion when its {@link Aggregate}
         * annotation has {@code commitInBatch} set, otherwise at message completion.
         *
         * <p>If awaiting the commit fails, every aggregate that took part is invalidated in its own
         * cache. Previously the cache of the scheduling aggregate was used for all of them, which did
         * nothing when that aggregate was not cached.
         *
         * @param message the message to apply
         * @return this aggregate
         * @throws UnsupportedOperationException if this aggregate is read-only
         */
        @Override
        public AggregateRoot<T> apply(Message message) {
            if (this.readOnly) {
                throw new UnsupportedOperationException(String.format("Not allowed to apply a %s. The model is readonly.", message));
            }
            Metadata metadata = message.getMetadata().with(new Object[]{"$aggregateId", this.id, "$aggregateType", this.getAggregateType().getName()});
            Message eventMessage = message.withMetadata(metadata);
            DeserializingMessage deserializingMessage = new DeserializingMessage(
                    new DeserializingObject<byte[], SerializedMessage>(this.serializer.serialize(eventMessage), type -> this.serializer.convert(eventMessage.getPayload(), type)),
                    MessageType.EVENT);
            this.model = this.model.toBuilder()
                    .sequenceNumber(this.model.sequenceNumber() + 1L)
                    .model(this.eventSourcingHandler.invoke(this.model.get(), deserializingMessage))
                    .previous(this.model)
                    .lastEventId(eventMessage.getMessageId())
                    .timestamp(eventMessage.getTimestamp())
                    .build();
            this.unpublishedEvents.add(deserializingMessage);

            Collection<EventSourcedAggregate<?>> pending = EventSourcingRepository.this.loadedModels.get();
            if (pending == null) {
                // First aggregate with changes on this thread: start tracking and schedule the commit.
                pending = Collections.asLifoQueue(new ArrayDeque<EventSourcedAggregate<?>>());
                EventSourcingRepository.this.loadedModels.set(pending);
                pending.add(this);
                Runnable commit = () -> {
                    Collection<EventSourcedAggregate<?>> models = EventSourcingRepository.this.loadedModels.get();
                    EventSourcingRepository.this.loadedModels.remove();
                    models.stream().map(EventSourcedAggregate::commit).reduce(Awaitable::join).ifPresent(a -> {
                        try {
                            a.await();
                        } catch (Exception e) {
                            if (e instanceof InterruptedException) {
                                // Keep the interrupt visible to the code that runs this callback.
                                Thread.currentThread().interrupt();
                            }
                            List<String> aggregateIds = models.stream().map(m -> m.id).collect(Collectors.toList());
                            log.error("Failed to commit events for aggregates {}. Clearing aggregates from the cache.", aggregateIds, e);
                            // Invalidate through each aggregate's own cache, i.e. the one commit() wrote to.
                            models.forEach(m -> m.cache.invalidate(keyFunction.apply(m.id)));
                        }
                    });
                };
                // NOTE(review): assumes the aggregate type carries @Aggregate; a missing annotation fails here with a NullPointerException.
                if (this.aggregateType.getAnnotation(Aggregate.class).commitInBatch()) {
                    DeserializingMessage.whenBatchCompletes(commit);
                } else {
                    DeserializingMessage.whenMessageCompletes(commit);
                }
            } else if (pending.stream().noneMatch(e -> e == this)) {
                pending.add(this);
            }
            return this;
        }

        /** Returns the aggregate state held by the current model. */
        @Override
        public T get() {
            return this.model.get();
        }

        /** Returns the model that the current model was derived from, as recorded in the current model. */
        @Override
        public AggregateRoot<T> previous() {
            return this.model.previous();
        }

        /** Returns the id of the last event recorded in the current model. */
        @Override
        public String lastEventId() {
            return this.model.lastEventId();
        }

        /** Returns the timestamp recorded in the current model. */
        @Override
        public Instant timestamp() {
            return this.model.timestamp();
        }

        /** Returns the id this aggregate was created with. */
        @Override
        public String id() {
            return this.id;
        }

        /** Returns the aggregate type this aggregate was created with (not the runtime class of its state). */
        @Override
        public Class<T> type() {
            return this.aggregateType;
        }

        /**
         * Commits the events applied since the last commit.
         *
         * <p>Does nothing when there are no unpublished events. Otherwise the current model is put in
         * the cache, the unpublished events are handed to the event store, and a snapshot is stored if
         * the snapshot trigger asks for one. A failure in any of these steps is logged and the cache
         * entry is invalidated; the exception is not rethrown. The unpublished events are cleared in
         * all cases.
         *
         * @return the awaitable returned by the event store, or a ready awaitable when nothing was
         *         stored or storing failed synchronously
         */
        protected Awaitable commit() {
            if (unpublishedEvents.isEmpty()) {
                return Awaitable.ready();
            }
            Awaitable outcome = Awaitable.ready();
            try {
                String aggregateId = model.id();
                cache.put(keyFunction.apply(aggregateId), model);
                // Hand over a copy: the live list is cleared in the finally block below.
                List<DeserializingMessage> batch = new ArrayList<DeserializingMessage>(unpublishedEvents);
                outcome = eventStore.storeEvents(aggregateId, domain, model.sequenceNumber(), batch);
                boolean snapshotDue = snapshotTrigger.shouldCreateSnapshot(model, unpublishedEvents);
                if (snapshotDue) {
                    snapshotRepository.storeSnapshot(model);
                }
            } catch (Exception e) {
                log.error("Failed to commit new events of aggregate {}", (Object) model.id(), (Object) e);
                cache.invalidate(keyFunction.apply(model.id()));
            } finally {
                unpublishedEvents.clear();
            }
            return outcome;
        }

        /**
         * Creates an aggregate without loading its model; {@code initialize()} assigns the model.
         * The arguments are stored as given.
         *
         * @param aggregateType        class this aggregate is requested as
         * @param eventSourcingHandler handler that derives the next state from the current state and an event
         * @param cache                cache for this aggregate's model
         * @param serializer           serializer for applied messages
         * @param eventStore           store for reading and writing events
         * @param snapshotRepository   repository for reading and writing snapshots
         * @param snapshotTrigger      decides when a snapshot is stored on commit
         * @param domain               domain passed to the event store when storing events
         * @param eventSourced         whether events are replayed during initialization
         * @param readOnly             whether applying messages is rejected
         * @param id                   aggregate id
         */
        @ConstructorProperties(value={"aggregateType", "eventSourcingHandler", "cache", "serializer", "eventStore", "snapshotRepository", "snapshotTrigger", "domain", "eventSourced", "readOnly", "id"})
        public EventSourcedAggregate(Class<T> aggregateType, EventSourcingHandler<T> eventSourcingHandler, Cache cache, EventStoreSerializer serializer, EventStore eventStore, SnapshotRepository snapshotRepository, SnapshotTrigger snapshotTrigger, String domain, boolean eventSourced, boolean readOnly, String id) {
            this.aggregateType = aggregateType;
            this.eventSourcingHandler = eventSourcingHandler;
            this.cache = cache;
            this.serializer = serializer;
            this.eventStore = eventStore;
            this.snapshotRepository = snapshotRepository;
            this.snapshotTrigger = snapshotTrigger;
            this.domain = domain;
            this.eventSourced = eventSourced;
            this.readOnly = readOnly;
            this.id = id;
        }
    }
}

