1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.collect; 18 19 import com.google.common.util.concurrent.Uninterruptibles; 20 21 import junit.framework.TestCase; 22 23 import java.util.Collection; 24 import java.util.List; 25 import java.util.concurrent.ArrayBlockingQueue; 26 import java.util.concurrent.BlockingQueue; 27 import java.util.concurrent.ExecutorService; 28 import java.util.concurrent.Executors; 29 import java.util.concurrent.Future; 30 import java.util.concurrent.LinkedBlockingQueue; 31 import java.util.concurrent.PriorityBlockingQueue; 32 import java.util.concurrent.SynchronousQueue; 33 import java.util.concurrent.TimeUnit; 34 35 /** 36 * Tests for {@link Queues}. 37 * 38 * @author Dimitris Andreou 39 */ 40 41 public class QueuesTest extends TestCase { 42 /* 43 * All the following tests relate to BlockingQueue methods in Queues. 44 */ 45 46 public static List<BlockingQueue<Object>> blockingQueues() { 47 return ImmutableList.<BlockingQueue<Object>>of( 48 new LinkedBlockingQueue<Object>(), 49 new LinkedBlockingQueue<Object>(10), 50 new SynchronousQueue<Object>(), 51 new ArrayBlockingQueue<Object>(10), 52 new PriorityBlockingQueue<Object>(10, Ordering.arbitrary())); 53 } 54 55 private ExecutorService threadPool; 56 57 @Override 58 public void setUp() { 59 threadPool = Executors.newCachedThreadPool(); 60 } 61 62 @Override 63 public void tearDown() throws InterruptedException { 64 // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite 65 // loop, which will be noticed here 66 threadPool.shutdown(); 67 assertTrue("Some worker didn't finish in time", 68 threadPool.awaitTermination(1, TimeUnit.SECONDS)); 69 } 70 71 private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, 72 long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException { 73 return interruptibly 74 ? Queues.drain(q, buffer, maxElements, timeout, unit) 75 : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit); 76 } 77 78 public void testMultipleProducers() throws Exception { 79 for (BlockingQueue<Object> q : blockingQueues()) { 80 testMultipleProducers(q); 81 } 82 } 83 84 private void testMultipleProducers(BlockingQueue<Object> q) 85 throws InterruptedException { 86 for (boolean interruptibly : new boolean[] { true, false }) { 87 threadPool.submit(new Producer(q, 20)); 88 threadPool.submit(new Producer(q, 20)); 89 threadPool.submit(new Producer(q, 20)); 90 threadPool.submit(new Producer(q, 20)); 91 threadPool.submit(new Producer(q, 20)); 92 93 List<Object> buf = Lists.newArrayList(); 94 int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly); 95 assertEquals(100, elements); 96 assertEquals(100, buf.size()); 97 assertDrained(q); 98 } 99 } 100 101 public void testDrainTimesOut() throws Exception { 102 for (BlockingQueue<Object> q : blockingQueues()) { 103 testDrainTimesOut(q); 104 } 105 } 106 107 private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception { 108 for (boolean interruptibly : new boolean[] { true, false }) { 109 assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS)); 110 111 // producing one, will ask for two 112 Future<?> submitter = threadPool.submit(new Producer(q, 1)); 113 114 // make sure we time out 115 long startTime = System.nanoTime(); 116 117 int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly); 118 assertTrue(drained <= 1); 119 120 assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10)); 121 122 // If even the first one wasn't there, clean up so that the next test doesn't see an element. 123 submitter.get(); 124 if (drained == 0) { 125 assertNotNull(q.poll()); 126 } 127 } 128 } 129 130 public void testZeroElements() throws Exception { 131 for (BlockingQueue<Object> q : blockingQueues()) { 132 testZeroElements(q); 133 } 134 } 135 136 private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException { 137 for (boolean interruptibly : new boolean[] { true, false }) { 138 // asking to drain zero elements 139 assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly)); 140 } 141 } 142 143 public void testEmpty() throws Exception { 144 for (BlockingQueue<Object> q : blockingQueues()) { 145 testEmpty(q); 146 } 147 } 148 149 private void testEmpty(BlockingQueue<Object> q) { 150 assertDrained(q); 151 } 152 153 public void testNegativeMaxElements() throws Exception { 154 for (BlockingQueue<Object> q : blockingQueues()) { 155 testNegativeMaxElements(q); 156 } 157 } 158 159 private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException { 160 threadPool.submit(new Producer(q, 1)); 161 162 List<Object> buf = Lists.newArrayList(); 163 int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 164 assertEquals(elements, 0); 165 assertTrue(buf.isEmpty()); 166 167 // Clean up produced element to free the producer thread, otherwise it will complain 168 // when we shutdown the threadpool. 169 Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 170 } 171 172 public void testDrain_throws() throws Exception { 173 for (BlockingQueue<Object> q : blockingQueues()) { 174 testDrain_throws(q); 175 } 176 } 177 178 private void testDrain_throws(BlockingQueue<Object> q) { 179 threadPool.submit(new Interrupter(Thread.currentThread())); 180 try { 181 Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 182 fail(); 183 } catch (InterruptedException expected) { 184 } 185 } 186 187 public void testDrainUninterruptibly_doesNotThrow() throws Exception { 188 for (BlockingQueue<Object> q : blockingQueues()) { 189 testDrainUninterruptibly_doesNotThrow(q); 190 } 191 } 192 193 private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) { 194 final Thread mainThread = Thread.currentThread(); 195 threadPool.submit(new Runnable() { 196 public void run() { 197 new Producer(q, 50).run(); 198 new Interrupter(mainThread).run(); 199 new Producer(q, 50).run(); 200 } 201 }); 202 List<Object> buf = Lists.newArrayList(); 203 int elements = 204 Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 205 // so when this drains all elements, we know the thread has also been interrupted in between 206 assertTrue(Thread.interrupted()); 207 assertEquals(100, elements); 208 assertEquals(100, buf.size()); 209 } 210 211 public void testNewLinkedBlockingQueueCapacity() { 212 try { 213 Queues.newLinkedBlockingQueue(0); 214 fail("Should have thrown IllegalArgumentException"); 215 } catch (IllegalArgumentException expected) { 216 // any capacity less than 1 should throw IllegalArgumentException 217 } 218 assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity()); 219 assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity()); 220 } 221 222 /** 223 * Checks that #drain() invocations behave correctly for a drained (empty) queue. 224 */ 225 private void assertDrained(BlockingQueue<Object> q) { 226 assertNull(q.peek()); 227 assertInterruptibleDrained(q); 228 assertUninterruptibleDrained(q); 229 } 230 231 private void assertInterruptibleDrained(BlockingQueue<Object> q) { 232 // nothing to drain, thus this should wait doing nothing 233 try { 234 assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS)); 235 } catch (InterruptedException e) { 236 throw new AssertionError(); 237 } 238 239 // but does the wait actually occurs? 240 threadPool.submit(new Interrupter(Thread.currentThread())); 241 try { 242 // if waiting works, this should get stuck 243 Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 244 fail(); 245 } catch (InterruptedException expected) { 246 // we indeed waited; a slow thread had enough time to interrupt us 247 } 248 } 249 250 // same as above; uninterruptible version 251 private void assertUninterruptibleDrained(BlockingQueue<Object> q) { 252 assertEquals(0, 253 Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS)); 254 255 // but does the wait actually occurs? 256 threadPool.submit(new Interrupter(Thread.currentThread())); 257 258 long startTime = System.nanoTime(); 259 Queues.drainUninterruptibly( 260 q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS); 261 assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10)); 262 // wait for interrupted status and clear it 263 while (!Thread.interrupted()) { Thread.yield(); } 264 } 265 266 private static class Producer implements Runnable { 267 final BlockingQueue<Object> q; 268 final int elements; 269 270 Producer(BlockingQueue<Object> q, int elements) { 271 this.q = q; 272 this.elements = elements; 273 } 274 275 @Override public void run() { 276 try { 277 for (int i = 0; i < elements; i++) { 278 q.put(new Object()); 279 } 280 } catch (InterruptedException e) { 281 // TODO(user): replace this when there is a better way to spawn threads in tests and 282 // have threads propagate their errors back to the test thread. 283 e.printStackTrace(); 284 // never returns, so that #tearDown() notices that one worker isn't done 285 Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 286 } 287 } 288 } 289 290 private static class Interrupter implements Runnable { 291 final Thread threadToInterrupt; 292 293 Interrupter(Thread threadToInterrupt) { 294 this.threadToInterrupt = threadToInterrupt; 295 } 296 297 @Override public void run() { 298 try { 299 Thread.sleep(100); 300 } catch (InterruptedException e) { 301 throw new AssertionError(); 302 } finally { 303 threadToInterrupt.interrupt(); 304 } 305 } 306 } 307 } 308