/**
 * Copyright (c) 2018-2024 Linagora
 * 
 * This program/library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 2.1 of the License, or (at your
 * option) any later version.
 * 
 * This program/library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program/library; If not, see http://www.gnu.org/licenses/
 * for the GNU Lesser General Public License version 2.1.
 */
package org.ow2.petals.bc.filetransfer;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange.Role;
import javax.xml.namespace.QName;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.ow2.easywsdl.wsdl.api.abstractItf.AbsItfOperation.MEPPatternConstants;
import org.ow2.petals.component.framework.junit.Message;
import org.ow2.petals.component.framework.junit.RequestMessage;
import org.ow2.petals.component.framework.junit.helpers.ServiceProviderImplementation;
import org.ow2.petals.component.framework.junit.impl.ConsumesServiceConfiguration;
import org.ow2.petals.component.framework.junit.impl.ProvidesServiceConfiguration;
import org.ow2.petals.component.framework.junit.impl.message.StatusToConsumerMessage;

import com.jayway.awaitility.Awaitility;

/**
 * Unit tests about the sliding window of the BC FileTransfer consumer part.
 * <p>
 * Each test deploys a consumer service unit polling a temporary folder, drops files into that folder and uses mock
 * service providers, blocked on a shared read/write lock, to observe how many exchanges the component sends before
 * the first ones are acknowledged.
 * </p>
 * 
 * @author Christophe DENEUX - Linagora
 * 
 */
public class SlidingWindowTest extends SimpleTestEnvironment {

    private static final Logger LOG = Logger.getLogger(SlidingWindowTest.class.getName());

    /**
     * Root of the temporary directory tree in which the listened folder is created.
     */
    @Rule
    public final TemporaryFolder consumesFolder = new TemporaryFolder();

    /**
     * <p>
     * Check the processing of the sliding window with the default window size (0, i.e. unlimited).
     * </p>
     * <p>
     * Expected results:
     * </p>
     * <ul>
     * <li>no error occurs during all processing,</li>
     * <li>all the files are pending on the service provider side at the same time,</li>
     * <li>all the files are finally processed.</li>
     * </ul>
     */
    @Test
    public void defaultSlidingWindow() throws Exception {
        this.validateSlidingWindow(0, 27);
    }

    /**
     * <p>
     * Check the processing of the sliding window with a window size defined to a custom value.
     * </p>
     * <p>
     * Expected results:
     * </p>
     * <ul>
     * <li>no error occurs during all processing,</li>
     * <li>the number of files pending on the service provider side reaches the window size,</li>
     * <li>all the files are finally processed.</li>
     * </ul>
     */
    @Test
    public void slidingWindowSizedToCustomValue() throws Exception {
        this.validateSlidingWindow(7, 23);
    }

    /**
     * <p>
     * Check the processing of the sliding window with a window size defined to 1.
     * </p>
     * <p>
     * Expected results:
     * </p>
     * <ul>
     * <li>no error occurs during all processing,</li>
     * <li>the number of files pending on the service provider side reaches the window size,</li>
     * <li>all the files are finally processed.</li>
     * </ul>
     */
    @Test
    public void slidingWindowSizedToOne() throws Exception {
        this.validateSlidingWindow(1, 5);
    }

    /**
     * Deploy a consumer configured with the given sliding window size, drop <code>nbFiles</code> files into the
     * listened folder and check that the expected number of exchanges is pending before the mock service providers
     * are released, then that all files are consumed.
     * 
     * @param slidingWindowSize
     *            the value set as external processor block size of the consumer; <code>0</code> is handled as
     *            "unlimited": all the files are expected to be pending at the same time
     * @param nbFiles
     *            the number of files to create in the listened folder, and so the number of mock service providers
     *            to start
     * @throws Exception
     *             if the deployment fails, if a wait expires or if a mock service provider failed
     */
    private void validateSlidingWindow(final int slidingWindowSize, final int nbFiles) throws Exception {

        // Polling period in seconds, converted into milliseconds for the service unit parameter
        final int pollingPeriod = 10;
        final String listeningFolderName = "sub-folder";
        final File listeningDirectory = new File(this.consumesFolder.getRoot(), listeningFolderName);
        this.consumesFolder.newFolder(listeningFolderName);

        final ConsumesServiceConfiguration serviceConsumer = createConsumesPut(listeningDirectory,
                FileTransferConstants.ENUM_TRANSFER_MODE_CONTENT);
        serviceConsumer.setParameter(
                new QName(FileTransferConstants.FILETRANSFER_SERVICE_NS, FileTransferConstants.PARAM_POLLING_PERIOD),
                String.valueOf(pollingPeriod * 1000));
        serviceConsumer.setParameter(new QName(FileTransferConstants.FILETRANSFER_SERVICE_NS,
                FileTransferConstants.PARAM_EXTERNAL_PROCESSOR_BLOCK_SIZE), String.valueOf(slidingWindowSize));
        COMPONENT_UNDER_TEST.deployService(SU_CONSUMER_NAME, serviceConsumer);
        assertTrue(COMPONENT_UNDER_TEST.isServiceDeployed(SU_CONSUMER_NAME));
        COMPONENT_UNDER_TEST.deployService(SU_PROVIDER_NAME,
                new ProvidesServiceConfiguration(TEST_ITF, TEST_SVC, TEST_EP));
        assertTrue(COMPONENT_UNDER_TEST.isServiceDeployed(SU_PROVIDER_NAME));

        // We create the service providers that will reply to external events
        final ReentrantReadWriteLock rwLockServiceProviderStartProcessing = new ReentrantReadWriteLock(false);
        final List<SlidingWindowThread> serviceProviders = new ArrayList<>(nbFiles);
        for (int i = 0; i < nbFiles; i++) {
            final SlidingWindowThread slidingWindowThread = new SlidingWindowThread(
                    rwLockServiceProviderStartProcessing);
            serviceProviders.add(slidingWindowThread);
            slidingWindowThread.start();
        }

        // Force all service providers to wait a signal to start incoming request processing. The test is
        // meaningless if the write lock is not held, so the acquisition result is checked.
        assertTrue(rwLockServiceProviderStartProcessing.writeLock().tryLock(1, TimeUnit.MINUTES));
        try {
            // We create the external events that fire the SU consume
            for (int i = 0; i < nbFiles; i++) {
                createXmlFile(listeningDirectory, null);
            }

            // Wait to fill the sliding window: each service provider blocked on the read lock is counted in the
            // queue of the lock
            Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(pollingPeriod, TimeUnit.SECONDS)
                    .pollInterval(1, TimeUnit.SECONDS).until(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {
                            final int currentSlidingWindowSize = rwLockServiceProviderStartProcessing
                                    .getQueueLength();
                            LOG.info("Current sliding window size: " + currentSlidingWindowSize);
                            if (slidingWindowSize == 0) {
                                return currentSlidingWindowSize == nbFiles;
                            } else {
                                return currentSlidingWindowSize == slidingWindowSize;
                            }
                        }
                    });
        } finally {
            // Signal that service providers can process incoming requests. Done in a 'finally' block to not let
            // the service providers blocked when the sliding window was not filled as expected.
            rwLockServiceProviderStartProcessing.writeLock().unlock();
        }

        for (final SlidingWindowThread serviceProvider : serviceProviders) {
            serviceProvider.join(60000);
            if (serviceProvider.isAlive()) {
                // The join has timed out: the outcome of the service provider is unknown
                throw new AssertionError("'" + serviceProvider.getName() + "' did not terminate in time");
            }

            // Rethrow the recorded problem with its original type
            final Throwable error = serviceProvider.getError();
            if (error instanceof Error) {
                throw (Error) error;
            } else if (error instanceof Exception) {
                throw (Exception) error;
            }
        }

        // Check that all files were processed
        Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(pollingPeriod, TimeUnit.SECONDS)
                .pollInterval(1, TimeUnit.SECONDS).until(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        // 'File.list()' returns null if the directory can not be listed
                        final String[] remainingFiles = listeningDirectory.list();
                        return remainingFiles != null && remainingFiles.length == 0;
                    }
                });

    }

    /**
     * Thread running a mock service provider that receives one exchange sent by the consumer, waits for the read
     * lock to be available, checks the exchange and acknowledges it with the status <code>DONE</code>.
     */
    private static class SlidingWindowThread extends Thread {

        /**
         * Counter used to build a distinct name for each thread.
         */
        private static final AtomicInteger THREAD_NUM = new AtomicInteger(0);

        /**
         * Lock whose read part must be acquired before processing the incoming request.
         */
        private final ReadWriteLock rwLockServiceProviderStartProcessing;

        /**
         * The problem caught during the run, or <code>null</code> if none. Volatile because it is written by this
         * thread and read by the thread driving the test.
         */
        private volatile Throwable error = null;

        /**
         * @param rwLockServiceProviderProcessing
         *            the lock whose read part is acquired by the mock service provider before processing a request
         */
        public SlidingWindowThread(final ReadWriteLock rwLockServiceProviderProcessing) {
            super("Sliding Window Thread #" + THREAD_NUM.getAndIncrement());
            this.rwLockServiceProviderStartProcessing = rwLockServiceProviderProcessing;
        }

        @Override
        public void run() {

            try {
                // We receive the exchange sent by the SU consumer
                COMPONENT.receiveAsExternalProvider(new ServiceProviderImplementation() {
                    @Override
                    public Message provides(final RequestMessage request) throws Exception {

                        LOG.info("Start of service provider mock");
                        try {
                            if (!rwLockServiceProviderStartProcessing.readLock().tryLock(1, TimeUnit.MINUTES)) {
                                throw new IllegalStateException(
                                        "Service provider mock processing not unlocked in time");
                            }
                            try {
                                LOG.info("Service provider mock processing unlocked");

                                assertEquals(ExchangeStatus.ACTIVE, request.getMessageExchange().getStatus());
                                assertEquals(FileTransferConstants.PUT_OPERATION,
                                        request.getMessageExchange().getOperation().getLocalPart());
                                assertEquals(MEPPatternConstants.IN_ONLY.value(),
                                        request.getMessageExchange().getPattern());
                                assertEquals(Role.PROVIDER, request.getMessageExchange().getRole());

                                return new StatusToConsumerMessage(request, ExchangeStatus.DONE);
                            } finally {
                                // The read lock is only used as a start signal, it is released as soon as possible
                                rwLockServiceProviderStartProcessing.readLock().unlock();
                            }
                        } finally {
                            LOG.info("End of service provider mock");
                        }
                    }

                    @Override
                    public boolean statusExpected() {
                        // No status is expected by this facade service provider because of InOnly pattern, the status
                        // is directly returned to the consumer by this facade service provider
                        return false;
                    }
                },
                        // Waiting time must be at least: ((nb-files / sliding-window-size) + 1) * polling-period
                        60000);
            } catch (final Exception | AssertionError e) {
                // Assertion failures are errors, not exceptions: they must be caught too, otherwise they are lost
                // with the thread and the test can not fail because of them
                LOG.log(Level.SEVERE, "Error in service provider", e);
                this.error = e;
            }
        }

        /**
         * @return the exception or assertion failure caught during the run, or <code>null</code> if no problem
         *         was caught
         */
        public Throwable getError() {
            return this.error;
        }
    }
}
