Home | History | Annotate | Download | only in util
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.gallery3d.util;
     18 
     19 import com.android.gallery3d.common.Utils;
     20 import com.android.gallery3d.util.ThreadPool.Job;
     21 import com.android.gallery3d.util.ThreadPool.JobContext;
     22 
     23 import java.util.LinkedList;
     24 
     25 // Limit the number of concurrent jobs that has been submitted into a ThreadPool
     26 @SuppressWarnings("rawtypes")
     27 public class JobLimiter implements FutureListener {
     28     private static final String TAG = "JobLimiter";
     29 
     30     // State Transition:
     31     //      INIT -> DONE, CANCELLED
     32     //      DONE -> CANCELLED
     33     private static final int STATE_INIT = 0;
     34     private static final int STATE_DONE = 1;
     35     private static final int STATE_CANCELLED = 2;
     36 
     37     private final LinkedList<JobWrapper<?>> mJobs = new LinkedList<JobWrapper<?>>();
     38     private final ThreadPool mPool;
     39     private int mLimit;
     40 
     41     private static class JobWrapper<T> implements Future<T>, Job<T> {
     42         private int mState = STATE_INIT;
     43         private Job<T> mJob;
     44         private Future<T> mDelegate;
     45         private FutureListener<T> mListener;
     46         private T mResult;
     47 
     48         public JobWrapper(Job<T> job, FutureListener<T> listener) {
     49             mJob = job;
     50             mListener = listener;
     51         }
     52 
     53         public synchronized void setFuture(Future<T> future) {
     54             if (mState != STATE_INIT) return;
     55             mDelegate = future;
     56         }
     57 
     58         @Override
     59         public void cancel() {
     60             FutureListener<T> listener = null;
     61             synchronized (this) {
     62                 if (mState != STATE_DONE) {
     63                     listener = mListener;
     64                     mJob = null;
     65                     mListener = null;
     66                     if (mDelegate != null) {
     67                         mDelegate.cancel();
     68                         mDelegate = null;
     69                     }
     70                 }
     71                 mState = STATE_CANCELLED;
     72                 mResult = null;
     73                 notifyAll();
     74             }
     75             if (listener != null) listener.onFutureDone(this);
     76         }
     77 
     78         @Override
     79         public synchronized boolean isCancelled() {
     80             return mState == STATE_CANCELLED;
     81         }
     82 
     83         @Override
     84         public boolean isDone() {
     85             // Both CANCELLED AND DONE is considered as done
     86             return mState !=  STATE_INIT;
     87         }
     88 
     89         @Override
     90         public synchronized T get() {
     91             while (mState == STATE_INIT) {
     92                 // handle the interrupted exception of wait()
     93                 Utils.waitWithoutInterrupt(this);
     94             }
     95             return mResult;
     96         }
     97 
     98         @Override
     99         public void waitDone() {
    100             get();
    101         }
    102 
    103         @Override
    104         public T run(JobContext jc) {
    105             Job<T> job = null;
    106             synchronized (this) {
    107                 if (mState == STATE_CANCELLED) return null;
    108                 job = mJob;
    109             }
    110             T result  = null;
    111             try {
    112                 result = job.run(jc);
    113             } catch (Throwable t) {
    114                 Log.w(TAG, "error executing job: " + job, t);
    115             }
    116             FutureListener<T> listener = null;
    117             synchronized (this) {
    118                 if (mState == STATE_CANCELLED) return null;
    119                 mState = STATE_DONE;
    120                 listener = mListener;
    121                 mListener = null;
    122                 mJob = null;
    123                 mResult = result;
    124                 notifyAll();
    125             }
    126             if (listener != null) listener.onFutureDone(this);
    127             return result;
    128         }
    129     }
    130 
    131     public JobLimiter(ThreadPool pool, int limit) {
    132         mPool = Utils.checkNotNull(pool);
    133         mLimit = limit;
    134     }
    135 
    136     public synchronized <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
    137         JobWrapper<T> future = new JobWrapper<T>(Utils.checkNotNull(job), listener);
    138         mJobs.addLast(future);
    139         submitTasksIfAllowed();
    140         return future;
    141     }
    142 
    143     @SuppressWarnings({"rawtypes", "unchecked"})
    144     private void submitTasksIfAllowed() {
    145         while (mLimit > 0 && !mJobs.isEmpty()) {
    146             JobWrapper wrapper = mJobs.removeFirst();
    147             if (!wrapper.isCancelled()) {
    148                 --mLimit;
    149                 wrapper.setFuture(mPool.submit(wrapper, this));
    150             }
    151         }
    152     }
    153 
    154     @Override
    155     public synchronized void onFutureDone(Future future) {
    156         ++mLimit;
    157         submitTasksIfAllowed();
    158     }
    159 }
    160