1 /* 2 * Copyright (C) 2014 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.base.Preconditions; 22 import com.google.common.collect.Queues; 23 24 import java.util.Queue; 25 import java.util.concurrent.Executor; 26 import java.util.logging.Level; 27 import java.util.logging.Logger; 28 29 import javax.annotation.concurrent.GuardedBy; 30 31 /** 32 * A special purpose queue/executor that executes listener callbacks serially on a configured 33 * executor. Each callback task can be enqueued and executed as separate phases. 34 * 35 * <p>This class is very similar to {@link SerializingExecutor} with the exception that tasks can 36 * be enqueued without necessarily executing immediately. 37 */ 38 final class ListenerCallQueue<L> implements Runnable { 39 // TODO(cpovirk): consider using the logger associated with listener.getClass(). 40 private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName()); 41 42 abstract static class Callback<L> { 43 private final String methodCall; 44 45 Callback(String methodCall) { 46 this.methodCall = methodCall; 47 } 48 49 abstract void call(L listener); 50 51 /** Helper method to add this callback to all the queues. */ 52 void enqueueOn(Iterable<ListenerCallQueue<L>> queues) { 53 for (ListenerCallQueue<L> queue : queues) { 54 queue.add(this); 55 } 56 } 57 } 58 59 private final L listener; 60 private final Executor executor; 61 62 @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque(); 63 @GuardedBy("this") private boolean isThreadScheduled; 64 65 ListenerCallQueue(L listener, Executor executor) { 66 this.listener = checkNotNull(listener); 67 this.executor = checkNotNull(executor); 68 } 69 70 /** Enqueues a task to be run. */ 71 synchronized void add(Callback<L> callback) { 72 waitQueue.add(callback); 73 } 74 75 /** Executes all listeners {@linkplain #add added} prior to this call, serially and in order.*/ 76 void execute() { 77 boolean scheduleTaskRunner = false; 78 synchronized (this) { 79 if (!isThreadScheduled) { 80 isThreadScheduled = true; 81 scheduleTaskRunner = true; 82 } 83 } 84 if (scheduleTaskRunner) { 85 try { 86 executor.execute(this); 87 } catch (RuntimeException e) { 88 // reset state in case of an error so that later calls to execute will actually do something 89 synchronized (this) { 90 isThreadScheduled = false; 91 } 92 // Log it and keep going. 93 logger.log(Level.SEVERE, 94 "Exception while running callbacks for " + listener + " on " + executor, 95 e); 96 throw e; 97 } 98 } 99 } 100 101 @Override public void run() { 102 boolean stillRunning = true; 103 try { 104 while (true) { 105 Callback<L> nextToRun; 106 synchronized (ListenerCallQueue.this) { 107 Preconditions.checkState(isThreadScheduled); 108 nextToRun = waitQueue.poll(); 109 if (nextToRun == null) { 110 isThreadScheduled = false; 111 stillRunning = false; 112 break; 113 } 114 } 115 116 // Always run while _not_ holding the lock, to avoid deadlocks. 117 try { 118 nextToRun.call(listener); 119 } catch (RuntimeException e) { 120 // Log it and keep going. 121 logger.log(Level.SEVERE, 122 "Exception while executing callback: " + listener + "." + nextToRun.methodCall, 123 e); 124 } 125 } 126 } finally { 127 if (stillRunning) { 128 // An Error is bubbling up, we should mark ourselves as no longer 129 // running, that way if anyone tries to keep using us we won't be 130 // corrupted. 131 synchronized (ListenerCallQueue.this) { 132 isThreadScheduled = false; 133 } 134 } 135 } 136 } 137 } 138