/*
 * Decompiled with CFR 0.152.
 */
package org.dcm4chee.arc.compress.impl;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.dcm4che3.data.UID;
import org.dcm4che3.net.ApplicationEntity;
import org.dcm4chee.arc.Scheduler;
import org.dcm4chee.arc.compress.impl.CompressionEJB;
import org.dcm4chee.arc.conf.ArchiveAEExtension;
import org.dcm4chee.arc.conf.ArchiveCompressionRule;
import org.dcm4chee.arc.conf.ArchiveDeviceExtension;
import org.dcm4chee.arc.conf.Duration;
import org.dcm4chee.arc.conf.ScheduleExpression;
import org.dcm4chee.arc.entity.Location;
import org.dcm4chee.arc.entity.Series;
import org.dcm4chee.arc.retrieve.LocationInputStream;
import org.dcm4chee.arc.retrieve.RetrieveContext;
import org.dcm4chee.arc.retrieve.RetrieveService;
import org.dcm4chee.arc.store.InstanceLocations;
import org.dcm4chee.arc.store.StoreContext;
import org.dcm4chee.arc.store.StoreService;
import org.dcm4chee.arc.store.StoreSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class CompressionScheduler
extends Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(CompressionScheduler.class);
    @Inject
    private CompressionEJB ejb;
    @Inject
    private RetrieveService retrieveService;
    @Inject
    private StoreService storeService;

    protected CompressionScheduler() {
        super(Scheduler.Mode.scheduleWithFixedDelay);
    }

    protected Duration getPollingInterval() {
        ArchiveDeviceExtension arcDev = (ArchiveDeviceExtension)this.device.getDeviceExtension(ArchiveDeviceExtension.class);
        return arcDev != null && arcDev.getCompressionAETitle() != null ? arcDev.getCompressionPollingInterval() : null;
    }

    protected Logger log() {
        return LOG;
    }

    protected void execute() {
        List<Series.Compression> compressions;
        ArchiveDeviceExtension arcDev = (ArchiveDeviceExtension)this.device.getDeviceExtension(ArchiveDeviceExtension.class);
        if (!ScheduleExpression.emptyOrAnyContainsNow((ScheduleExpression[])arcDev.getCompressionSchedules())) {
            return;
        }
        String aet = arcDev.getCompressionAETitle();
        ApplicationEntity ae = this.device.getApplicationEntity(aet, true);
        if (ae == null || !ae.isInstalled()) {
            LOG.warn("No such Application Entity: " + aet);
            return;
        }
        int fetchSize = arcDev.getCompressionFetchSize();
        int permits = arcDev.getCompressionThreads();
        Semaphore semaphore = new Semaphore(permits);
        do {
            compressions = this.ejb.findSeriesForCompression(fetchSize);
            LOG.info("Found {} Series scheduled for compression", (Object)compressions.size());
            for (Series.Compression compression : compressions) {
                if (!this.ejb.claimForCompression(compression)) continue;
                CompressionScheduler.acquire(semaphore, 1);
                this.device.execute(() -> {
                    this.process(ae, compression);
                    semaphore.release();
                });
            }
            int acquire = permits - (permits = arcDev.getCompressionThreads());
            if (acquire > 0) {
                CompressionScheduler.acquire(semaphore, acquire);
                continue;
            }
            if (acquire >= 0) continue;
            semaphore.release(-acquire);
        } while (arcDev.getCompressionPollingInterval() != null && compressions.size() == fetchSize);
        CompressionScheduler.acquire(semaphore, permits);
    }

    private static void acquire(Semaphore semaphore, int permits) {
        if (!semaphore.tryAcquire(permits)) {
            try {
                int n = permits - semaphore.availablePermits();
                LOG.debug("Wait for completion of compression of {} Series", (Object)n);
                semaphore.acquire(permits);
                LOG.debug("Compression of {} Series completed", (Object)n);
            }
            catch (InterruptedException e) {
                LOG.warn("Failed to wait for completion of compression of Series", (Throwable)e);
            }
        }
    }

    private void process(ApplicationEntity ae, Series.Compression compr) {
        ArchiveAEExtension arcAE = (ArchiveAEExtension)ae.getAEExtensionNotNull(ArchiveAEExtension.class);
        if (compr.instancePurgeState == Series.InstancePurgeState.PURGED) {
            try (StoreSession session = this.storeService.newStoreSession(ae);){
                this.storeService.restoreInstances(session, compr.studyInstanceUID, compr.seriesInstanceUID, arcAE.getPurgeInstanceRecordsDelay());
            }
            catch (Exception e) {
                LOG.warn("Failed to restore Instance records for compression of Series[iuid={}] of Study[iuid={}]:\n", new Object[]{compr.seriesInstanceUID, compr.studyInstanceUID, e});
                return;
            }
        }
        try (RetrieveContext retrCtx = this.retrieveService.newRetrieveContext(ae.getAETitle(), compr.studyInstanceUID, compr.seriesInstanceUID, null);
             StoreSession session = this.storeService.newStoreSession(ae);){
            this.retrieveService.calculateMatches(retrCtx);
            LOG.info("Start compression of {} Instances of Series[iuid={}] of Study[iuid={}]", new Object[]{retrCtx.getNumberOfMatches(), compr.seriesInstanceUID, compr.studyInstanceUID});
            int failures = 0;
            int completed = 0;
            int skipped = 0;
            ArchiveCompressionRule compressionRule = new ArchiveCompressionRule();
            compressionRule.setTransferSyntax(compr.transferSyntaxUID);
            compressionRule.setImageWriteParams(compr.imageWriteParams());
            for (InstanceLocations inst : retrCtx.getMatches()) {
                if (this.alreadyCompressed(inst.getLocations(), compr.transferSyntaxUID)) {
                    LOG.info("{} of Series[iuid={}] of Study[iuid={}] already compressed with {} - skipped", new Object[]{inst, compr.seriesInstanceUID, compr.studyInstanceUID, UID.nameOf((String)compr.transferSyntaxUID)});
                    ++skipped;
                    continue;
                }
                try {
                    LocationInputStream lis = this.retrieveService.openLocationInputStream(retrCtx, inst);
                    try {
                        StoreContext ctx = this.storeService.newStoreContext(session);
                        ctx.setCompressionRule(compressionRule);
                        this.storeService.compress(ctx, inst, lis.stream);
                        ++completed;
                    }
                    finally {
                        if (lis == null) continue;
                        lis.close();
                    }
                }
                catch (Exception e) {
                    LOG.info("Failed to compress {} of Series[iuid={}] of Study[iuid={}]:\n", new Object[]{inst, compr.seriesInstanceUID, compr.studyInstanceUID, e});
                    ++failures;
                }
            }
            this.ejb.updateDB(compr, completed, failures);
            LOG.info("Finished compression of {} Instances of Series[iuid={}] of Study[iuid={}] - {} failures, {} skipped", new Object[]{completed, compr.seriesInstanceUID, compr.studyInstanceUID, failures, skipped});
            this.retrieveService.updateLocations(retrCtx);
        }
        catch (IOException e) {
            LOG.warn("Failed to calculate Instances for compression of Series[iuid={}] of Study[iuid={}]:\n", new Object[]{compr.seriesInstanceUID, compr.studyInstanceUID, e});
        }
    }

    private boolean alreadyCompressed(List<Location> locations, String tsuid) {
        return locations.stream().anyMatch(l -> Location.isDicomFile((Location)l) && l.getTransferSyntaxUID().equals(tsuid));
    }
}

