/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;

import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * Unit tests for {@link Futures#transform(ListenableFuture, AsyncFunction)}.
 *
 * @author Nishant Thakkar
 */
public class FuturesTransformAsyncFunctionTest
    extends AbstractChainedListenableFutureTest<String> {
  protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
  protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
  private static final String RESULT_DATA = "SUCCESS";

  private SettableFuture<String> outputFuture;
  // Signals that the function is waiting to complete
  private CountDownLatch funcIsWaitingLatch;
  // Signals the function so it will complete
  private CountDownLatch funcCompletionLatch;

  @Override protected ListenableFuture<String> buildChainingFuture(
      ListenableFuture<Integer> inputFuture) {
    outputFuture = SettableFuture.create();
    funcIsWaitingLatch = new CountDownLatch(1);
    funcCompletionLatch = new CountDownLatch(1);
    return Futures.transform(inputFuture, new ChainingFunction());
  }

  @Override protected String getSuccessfulResult() {
    return RESULT_DATA;
  }

  private class ChainingFunction implements AsyncFunction<Integer, String> {
    @Override
    public ListenableFuture<String> apply(Integer input) {
      switch (input) {
        case VALID_INPUT_DATA: outputFuture.set(RESULT_DATA); break;
        case SLOW_OUTPUT_VALID_INPUT_DATA: break;  // do nothing to the result
        case SLOW_FUNC_VALID_INPUT_DATA:
          funcIsWaitingLatch.countDown();
          awaitUninterruptibly(funcCompletionLatch);
          break;
        default: throw new UndeclaredThrowableException(EXCEPTION);
      }
      return outputFuture;
    }
  }

  public void testFutureGetThrowsFunctionException() throws Exception {
    inputFuture.set(EXCEPTION_DATA);
    listener.assertException(EXCEPTION);
  }

  public void testFutureGetThrowsCancellationIfInputCancelled()
      throws Exception {
    inputFuture.cancel(true); // argument is ignored
    try {
      resultFuture.get();
      fail("Result future must throw CancellationException"
          + " if input future is cancelled.");
    } catch (CancellationException expected) {}
  }

  public void testFutureGetThrowsCancellationIfOutputCancelled()
      throws Exception {
    inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
    outputFuture.cancel(true); // argument is ignored
    try {
      resultFuture.get();
      fail("Result future must throw CancellationException"
          + " if function output future is cancelled.");
    } catch (CancellationException expected) {}
  }

  public void testFutureCancelBeforeInputCompletion() throws Exception {
    assertTrue(resultFuture.cancel(true));
    assertTrue(resultFuture.isCancelled());
    assertTrue(inputFuture.isCancelled());
    assertFalse(outputFuture.isCancelled());
    try {
      resultFuture.get();
      fail("Result future is cancelled and should have thrown a"
          + " CancellationException");
    } catch (CancellationException expected) {}
  }

  public void testFutureCancellableBeforeOutputCompletion() throws Exception {
    inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
    assertTrue(resultFuture.cancel(true));
    assertTrue(resultFuture.isCancelled());
    assertFalse(inputFuture.isCancelled());
    assertTrue(outputFuture.isCancelled());
    try {
      resultFuture.get();
      fail("Result future is cancelled and should have thrown a"
          + " CancellationException");
    } catch (CancellationException expected) {}
  }

  public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
    // Set the result in a separate thread since this test runs the function
    // (which will block) in the same thread.
    new Thread() {
      @Override
      public void run() {
        inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
      }
    }.start();
    funcIsWaitingLatch.await();

    assertTrue(resultFuture.cancel(true));
    assertTrue(resultFuture.isCancelled());
    assertFalse(inputFuture.isCancelled());
    assertFalse(outputFuture.isCancelled());
    try {
      resultFuture.get();
      fail("Result future is cancelled and should have thrown a"
          + " CancellationException");
    } catch (CancellationException expected) {}

    funcCompletionLatch.countDown();  // allow the function to complete
    try {
      outputFuture.get();
      fail("The function output future is cancelled and should have thrown a"
          + " CancellationException");
    } catch (CancellationException expected) {}
  }

  public void testFutureCancelAfterCompletion() throws Exception {
    inputFuture.set(VALID_INPUT_DATA);
    assertFalse(resultFuture.cancel(true));
    assertFalse(resultFuture.isCancelled());
    assertFalse(inputFuture.isCancelled());
    assertFalse(outputFuture.isCancelled());
    assertEquals(RESULT_DATA, resultFuture.get());
  }

  public void testFutureGetThrowsRuntimeException() throws Exception {
    BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
    ListenableFuture<String> chain = buildChainingFuture(badInput);
    try {
      chain.get();
      fail("Future.get must throw an exception when the input future fails.");
    } catch (ExecutionException e) {
      assertSame(RuntimeException.class, e.getCause().getClass());
    }
  }

  /**
   * Proxy to throw a {@link RuntimeException} out of the {@link #get()} method.
   */
  public static class BadFuture
      extends SimpleForwardingListenableFuture<Integer> {
    protected BadFuture(ListenableFuture<Integer> delegate) {
      super(delegate);
    }

    @Override
    public Integer get() {
      throw new RuntimeException("Oops");
    }
  }
}
