Home | History | Annotate | Download | only in util
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.gallery3d.util;
     18 
     19 import android.util.Log;
     20 
     21 import java.util.concurrent.Executor;
     22 import java.util.concurrent.LinkedBlockingQueue;
     23 import java.util.concurrent.ThreadPoolExecutor;
     24 import java.util.concurrent.TimeUnit;
     25 
     26 public class ThreadPool {
     27     @SuppressWarnings("unused")
     28     private static final String TAG = "ThreadPool";
     29     private static final int CORE_POOL_SIZE = 4;
     30     private static final int MAX_POOL_SIZE = 8;
     31     private static final int KEEP_ALIVE_TIME = 10; // 10 seconds
     32 
     33     // Resource type
     34     public static final int MODE_NONE = 0;
     35     public static final int MODE_CPU = 1;
     36     public static final int MODE_NETWORK = 2;
     37 
     38     public static final JobContext JOB_CONTEXT_STUB = new JobContextStub();
     39 
     40     ResourceCounter mCpuCounter = new ResourceCounter(2);
     41     ResourceCounter mNetworkCounter = new ResourceCounter(2);
     42 
     43     // A Job is like a Callable, but it has an addition JobContext parameter.
     44     public interface Job<T> {
     45         public T run(JobContext jc);
     46     }
     47 
     48     public interface JobContext {
     49         boolean isCancelled();
     50         void setCancelListener(CancelListener listener);
     51         boolean setMode(int mode);
     52     }
     53 
     54     private static class JobContextStub implements JobContext {
     55         @Override
     56         public boolean isCancelled() {
     57             return false;
     58         }
     59 
     60         @Override
     61         public void setCancelListener(CancelListener listener) {
     62         }
     63 
     64         @Override
     65         public boolean setMode(int mode) {
     66             return true;
     67         }
     68     }
     69 
     70     public interface CancelListener {
     71         public void onCancel();
     72     }
     73 
     74     private static class ResourceCounter {
     75         public int value;
     76         public ResourceCounter(int v) {
     77             value = v;
     78         }
     79     }
     80 
     81     private final Executor mExecutor;
     82 
     83     public ThreadPool() {
     84         this(CORE_POOL_SIZE, MAX_POOL_SIZE);
     85     }
     86 
     87     public ThreadPool(int initPoolSize, int maxPoolSize) {
     88         mExecutor = new ThreadPoolExecutor(
     89                 initPoolSize, maxPoolSize, KEEP_ALIVE_TIME,
     90                 TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
     91                 new PriorityThreadFactory("thread-pool",
     92                 android.os.Process.THREAD_PRIORITY_BACKGROUND));
     93     }
     94 
     95     // Submit a job to the thread pool. The listener will be called when the
     96     // job is finished (or cancelled).
     97     public <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
     98         Worker<T> w = new Worker<T>(job, listener);
     99         mExecutor.execute(w);
    100         return w;
    101     }
    102 
    103     public <T> Future<T> submit(Job<T> job) {
    104         return submit(job, null);
    105     }
    106 
    107     private class Worker<T> implements Runnable, Future<T>, JobContext {
    108         @SuppressWarnings("hiding")
    109         private static final String TAG = "Worker";
    110         private Job<T> mJob;
    111         private FutureListener<T> mListener;
    112         private CancelListener mCancelListener;
    113         private ResourceCounter mWaitOnResource;
    114         private volatile boolean mIsCancelled;
    115         private boolean mIsDone;
    116         private T mResult;
    117         private int mMode;
    118 
    119         public Worker(Job<T> job, FutureListener<T> listener) {
    120             mJob = job;
    121             mListener = listener;
    122         }
    123 
    124         // This is called by a thread in the thread pool.
    125         @Override
    126         public void run() {
    127             T result = null;
    128 
    129             // A job is in CPU mode by default. setMode returns false
    130             // if the job is cancelled.
    131             if (setMode(MODE_CPU)) {
    132                 try {
    133                     result = mJob.run(this);
    134                 } catch (Throwable ex) {
    135                     Log.w(TAG, "Exception in running a job", ex);
    136                 }
    137             }
    138 
    139             synchronized(this) {
    140                 setMode(MODE_NONE);
    141                 mResult = result;
    142                 mIsDone = true;
    143                 notifyAll();
    144             }
    145             if (mListener != null) mListener.onFutureDone(this);
    146         }
    147 
    148         // Below are the methods for Future.
    149         @Override
    150         public synchronized void cancel() {
    151             if (mIsCancelled) return;
    152             mIsCancelled = true;
    153             if (mWaitOnResource != null) {
    154                 synchronized (mWaitOnResource) {
    155                     mWaitOnResource.notifyAll();
    156                 }
    157             }
    158             if (mCancelListener != null) {
    159                 mCancelListener.onCancel();
    160             }
    161         }
    162 
    163         @Override
    164         public boolean isCancelled() {
    165             return mIsCancelled;
    166         }
    167 
    168         @Override
    169         public synchronized boolean isDone() {
    170             return mIsDone;
    171         }
    172 
    173         @Override
    174         public synchronized T get() {
    175             while (!mIsDone) {
    176                 try {
    177                     wait();
    178                 } catch (Exception ex) {
    179                     Log.w(TAG, "ingore exception", ex);
    180                     // ignore.
    181                 }
    182             }
    183             return mResult;
    184         }
    185 
    186         @Override
    187         public void waitDone() {
    188             get();
    189         }
    190 
    191         // Below are the methods for JobContext (only called from the
    192         // thread running the job)
    193         @Override
    194         public synchronized void setCancelListener(CancelListener listener) {
    195             mCancelListener = listener;
    196             if (mIsCancelled && mCancelListener != null) {
    197                 mCancelListener.onCancel();
    198             }
    199         }
    200 
    201         @Override
    202         public boolean setMode(int mode) {
    203             // Release old resource
    204             ResourceCounter rc = modeToCounter(mMode);
    205             if (rc != null) releaseResource(rc);
    206             mMode = MODE_NONE;
    207 
    208             // Acquire new resource
    209             rc = modeToCounter(mode);
    210             if (rc != null) {
    211                 if (!acquireResource(rc)) {
    212                     return false;
    213                 }
    214                 mMode = mode;
    215             }
    216 
    217             return true;
    218         }
    219 
    220         private ResourceCounter modeToCounter(int mode) {
    221             if (mode == MODE_CPU) {
    222                 return mCpuCounter;
    223             } else if (mode == MODE_NETWORK) {
    224                 return mNetworkCounter;
    225             } else {
    226                 return null;
    227             }
    228         }
    229 
    230         private boolean acquireResource(ResourceCounter counter) {
    231             while (true) {
    232                 synchronized (this) {
    233                     if (mIsCancelled) {
    234                         mWaitOnResource = null;
    235                         return false;
    236                     }
    237                     mWaitOnResource = counter;
    238                 }
    239 
    240                 synchronized (counter) {
    241                     if (counter.value > 0) {
    242                         counter.value--;
    243                         break;
    244                     } else {
    245                         try {
    246                             counter.wait();
    247                         } catch (InterruptedException ex) {
    248                             // ignore.
    249                         }
    250                     }
    251                 }
    252             }
    253 
    254             synchronized (this) {
    255                 mWaitOnResource = null;
    256             }
    257 
    258             return true;
    259         }
    260 
    261         private void releaseResource(ResourceCounter counter) {
    262             synchronized (counter) {
    263                 counter.value++;
    264                 counter.notifyAll();
    265             }
    266         }
    267     }
    268 }
    269