Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2014 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.base.Preconditions.checkNotNull;
     20 
     21 import com.google.common.base.Preconditions;
     22 import com.google.common.collect.Queues;
     23 
     24 import java.util.Queue;
     25 import java.util.concurrent.Executor;
     26 import java.util.logging.Level;
     27 import java.util.logging.Logger;
     28 
     29 import javax.annotation.concurrent.GuardedBy;
     30 
     31 /**
     32  * A special purpose queue/executor that executes listener callbacks serially on a configured
     33  * executor.  Each callback task can be enqueued and executed as separate phases.
     34  *
     35  * <p>This class is very similar to {@link SerializingExecutor} with the exception that tasks can
     36  * be enqueued without necessarily executing immediately.
     37  */
     38 final class ListenerCallQueue<L> implements Runnable {
     39   // TODO(cpovirk): consider using the logger associated with listener.getClass().
     40   private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
     41 
     42   abstract static class Callback<L> {
     43     private final String methodCall;
     44 
     45     Callback(String methodCall) {
     46       this.methodCall = methodCall;
     47     }
     48 
     49     abstract void call(L listener);
     50 
     51     /** Helper method to add this callback to all the queues. */
     52     void enqueueOn(Iterable<ListenerCallQueue<L>> queues) {
     53       for (ListenerCallQueue<L> queue : queues) {
     54         queue.add(this);
     55       }
     56     }
     57   }
     58 
     59   private final L listener;
     60   private final Executor executor;
     61 
     62   @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque();
     63   @GuardedBy("this") private boolean isThreadScheduled;
     64 
     65   ListenerCallQueue(L listener, Executor executor) {
     66     this.listener = checkNotNull(listener);
     67     this.executor = checkNotNull(executor);
     68   }
     69 
     70   /** Enqueues a task to be run. */
     71   synchronized void add(Callback<L> callback) {
     72     waitQueue.add(callback);
     73   }
     74 
     75   /** Executes all listeners {@linkplain #add added} prior to this call, serially and in order.*/
     76   void execute() {
     77     boolean scheduleTaskRunner = false;
     78     synchronized (this) {
     79       if (!isThreadScheduled) {
     80         isThreadScheduled = true;
     81         scheduleTaskRunner = true;
     82       }
     83     }
     84     if (scheduleTaskRunner) {
     85       try {
     86         executor.execute(this);
     87       } catch (RuntimeException e) {
     88         // reset state in case of an error so that later calls to execute will actually do something
     89         synchronized (this) {
     90           isThreadScheduled = false;
     91         }
     92         // Log it and keep going.
     93         logger.log(Level.SEVERE,
     94             "Exception while running callbacks for " + listener + " on " + executor,
     95             e);
     96         throw e;
     97       }
     98     }
     99   }
    100 
    101   @Override public void run() {
    102     boolean stillRunning = true;
    103     try {
    104       while (true) {
    105         Callback<L> nextToRun;
    106         synchronized (ListenerCallQueue.this) {
    107           Preconditions.checkState(isThreadScheduled);
    108           nextToRun = waitQueue.poll();
    109           if (nextToRun == null) {
    110             isThreadScheduled = false;
    111             stillRunning = false;
    112             break;
    113           }
    114         }
    115 
    116         // Always run while _not_ holding the lock, to avoid deadlocks.
    117         try {
    118           nextToRun.call(listener);
    119         } catch (RuntimeException e) {
    120           // Log it and keep going.
    121           logger.log(Level.SEVERE,
    122               "Exception while executing callback: " + listener + "." + nextToRun.methodCall,
    123               e);
    124         }
    125       }
    126     } finally {
    127       if (stillRunning) {
    128         // An Error is bubbling up, we should mark ourselves as no longer
    129         // running, that way if anyone tries to keep using us we won't be
    130         // corrupted.
    131         synchronized (ListenerCallQueue.this) {
    132           isThreadScheduled = false;
    133         }
    134       }
    135     }
    136   }
    137 }
    138