1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.volley; 18 19 import android.os.Process; 20 21 import java.util.ArrayList; 22 import java.util.HashMap; 23 import java.util.List; 24 import java.util.Map; 25 import java.util.concurrent.BlockingQueue; 26 27 /** 28 * Provides a thread for performing cache triage on a queue of requests. 29 * 30 * Requests added to the specified cache queue are resolved from cache. 31 * Any deliverable response is posted back to the caller via a 32 * {@link ResponseDelivery}. Cache misses and responses that require 33 * refresh are enqueued on the specified network queue for processing 34 * by a {@link NetworkDispatcher}. 35 */ 36 public class CacheDispatcher extends Thread { 37 38 private static final boolean DEBUG = VolleyLog.DEBUG; 39 40 /** The queue of requests coming in for triage. */ 41 private final BlockingQueue<Request<?>> mCacheQueue; 42 43 /** The queue of requests going out to the network. */ 44 private final BlockingQueue<Request<?>> mNetworkQueue; 45 46 /** The cache to read from. */ 47 private final Cache mCache; 48 49 /** For posting responses. */ 50 private final ResponseDelivery mDelivery; 51 52 /** Used for telling us to die. */ 53 private volatile boolean mQuit = false; 54 55 /** Manage list of waiting requests and de-duplicate requests with same cache key. */ 56 private final WaitingRequestManager mWaitingRequestManager; 57 58 /** 59 * Creates a new cache triage dispatcher thread. You must call {@link #start()} 60 * in order to begin processing. 61 * 62 * @param cacheQueue Queue of incoming requests for triage 63 * @param networkQueue Queue to post requests that require network to 64 * @param cache Cache interface to use for resolution 65 * @param delivery Delivery interface to use for posting responses 66 */ 67 public CacheDispatcher( 68 BlockingQueue<Request<?>> cacheQueue, BlockingQueue<Request<?>> networkQueue, 69 Cache cache, ResponseDelivery delivery) { 70 mCacheQueue = cacheQueue; 71 mNetworkQueue = networkQueue; 72 mCache = cache; 73 mDelivery = delivery; 74 mWaitingRequestManager = new WaitingRequestManager(this); 75 } 76 77 /** 78 * Forces this dispatcher to quit immediately. If any requests are still in 79 * the queue, they are not guaranteed to be processed. 80 */ 81 public void quit() { 82 mQuit = true; 83 interrupt(); 84 } 85 86 @Override 87 public void run() { 88 if (DEBUG) VolleyLog.v("start new dispatcher"); 89 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); 90 91 // Make a blocking call to initialize the cache. 92 mCache.initialize(); 93 94 while (true) { 95 try { 96 processRequest(); 97 } catch (InterruptedException e) { 98 // We may have been interrupted because it was time to quit. 99 if (mQuit) { 100 return; 101 } 102 } 103 } 104 } 105 106 // Extracted to its own method to ensure locals have a constrained liveness scope by the GC. 107 // This is needed to avoid keeping previous request references alive for an indeterminate amount 108 // of time. Update consumer-proguard-rules.pro when modifying this. See also 109 // https://github.com/google/volley/issues/114 110 private void processRequest() throws InterruptedException { 111 // Get a request from the cache triage queue, blocking until 112 // at least one is available. 113 final Request<?> request = mCacheQueue.take(); 114 request.addMarker("cache-queue-take"); 115 116 // If the request has been canceled, don't bother dispatching it. 117 if (request.isCanceled()) { 118 request.finish("cache-discard-canceled"); 119 return; 120 } 121 122 // Attempt to retrieve this item from cache. 123 Cache.Entry entry = mCache.get(request.getCacheKey()); 124 if (entry == null) { 125 request.addMarker("cache-miss"); 126 // Cache miss; send off to the network dispatcher. 127 if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) { 128 mNetworkQueue.put(request); 129 } 130 return; 131 } 132 133 // If it is completely expired, just send it to the network. 134 if (entry.isExpired()) { 135 request.addMarker("cache-hit-expired"); 136 request.setCacheEntry(entry); 137 if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) { 138 mNetworkQueue.put(request); 139 } 140 return; 141 } 142 143 // We have a cache hit; parse its data for delivery back to the request. 144 request.addMarker("cache-hit"); 145 Response<?> response = request.parseNetworkResponse( 146 new NetworkResponse(entry.data, entry.responseHeaders)); 147 request.addMarker("cache-hit-parsed"); 148 149 if (!entry.refreshNeeded()) { 150 // Completely unexpired cache hit. Just deliver the response. 151 mDelivery.postResponse(request, response); 152 } else { 153 // Soft-expired cache hit. We can deliver the cached response, 154 // but we need to also send the request to the network for 155 // refreshing. 156 request.addMarker("cache-hit-refresh-needed"); 157 request.setCacheEntry(entry); 158 // Mark the response as intermediate. 159 response.intermediate = true; 160 161 if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) { 162 // Post the intermediate response back to the user and have 163 // the delivery then forward the request along to the network. 164 mDelivery.postResponse(request, response, new Runnable() { 165 @Override 166 public void run() { 167 try { 168 mNetworkQueue.put(request); 169 } catch (InterruptedException e) { 170 // Restore the interrupted status 171 Thread.currentThread().interrupt(); 172 } 173 } 174 }); 175 } else { 176 // request has been added to list of waiting requests 177 // to receive the network response from the first request once it returns. 178 mDelivery.postResponse(request, response); 179 } 180 } 181 } 182 183 private static class WaitingRequestManager implements Request.NetworkRequestCompleteListener { 184 185 /** 186 * Staging area for requests that already have a duplicate request in flight. 187 * 188 * <ul> 189 * <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache 190 * key.</li> 191 * <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request 192 * is <em>not</em> contained in that list. Is null if no requests are staged.</li> 193 * </ul> 194 */ 195 private final Map<String, List<Request<?>>> mWaitingRequests = new HashMap<>(); 196 197 private final CacheDispatcher mCacheDispatcher; 198 199 WaitingRequestManager(CacheDispatcher cacheDispatcher) { 200 mCacheDispatcher = cacheDispatcher; 201 } 202 203 /** Request received a valid response that can be used by other waiting requests. */ 204 @Override 205 public void onResponseReceived(Request<?> request, Response<?> response) { 206 if (response.cacheEntry == null || response.cacheEntry.isExpired()) { 207 onNoUsableResponseReceived(request); 208 return; 209 } 210 String cacheKey = request.getCacheKey(); 211 List<Request<?>> waitingRequests; 212 synchronized (this) { 213 waitingRequests = mWaitingRequests.remove(cacheKey); 214 } 215 if (waitingRequests != null) { 216 if (VolleyLog.DEBUG) { 217 VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.", 218 waitingRequests.size(), cacheKey); 219 } 220 // Process all queued up requests. 221 for (Request<?> waiting : waitingRequests) { 222 mCacheDispatcher.mDelivery.postResponse(waiting, response); 223 } 224 } 225 } 226 227 /** No valid response received from network, release waiting requests. */ 228 @Override 229 public synchronized void onNoUsableResponseReceived(Request<?> request) { 230 String cacheKey = request.getCacheKey(); 231 List<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey); 232 if (waitingRequests != null && !waitingRequests.isEmpty()) { 233 if (VolleyLog.DEBUG) { 234 VolleyLog.v("%d waiting requests for cacheKey=%s; resend to network", 235 waitingRequests.size(), cacheKey); 236 } 237 Request<?> nextInLine = waitingRequests.remove(0); 238 mWaitingRequests.put(cacheKey, waitingRequests); 239 nextInLine.setNetworkRequestCompleteListener(this); 240 try { 241 mCacheDispatcher.mNetworkQueue.put(nextInLine); 242 } catch (InterruptedException iex) { 243 VolleyLog.e("Couldn't add request to queue. %s", iex.toString()); 244 // Restore the interrupted status of the calling thread (i.e. NetworkDispatcher) 245 Thread.currentThread().interrupt(); 246 // Quit the current CacheDispatcher thread. 247 mCacheDispatcher.quit(); 248 } 249 } 250 } 251 252 /** 253 * For cacheable requests, if a request for the same cache key is already in flight, 254 * add it to a queue to wait for that in-flight request to finish. 255 * @return whether the request was queued. If false, we should continue issuing the request 256 * over the network. If true, we should put the request on hold to be processed when 257 * the in-flight request finishes. 258 */ 259 private synchronized boolean maybeAddToWaitingRequests(Request<?> request) { 260 String cacheKey = request.getCacheKey(); 261 // Insert request into stage if there's already a request with the same cache key 262 // in flight. 263 if (mWaitingRequests.containsKey(cacheKey)) { 264 // There is already a request in flight. Queue up. 265 List<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey); 266 if (stagedRequests == null) { 267 stagedRequests = new ArrayList<Request<?>>(); 268 } 269 request.addMarker("waiting-for-response"); 270 stagedRequests.add(request); 271 mWaitingRequests.put(cacheKey, stagedRequests); 272 if (VolleyLog.DEBUG) { 273 VolleyLog.d("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); 274 } 275 return true; 276 } else { 277 // Insert 'null' queue for this cacheKey, indicating there is now a request in 278 // flight. 279 mWaitingRequests.put(cacheKey, null); 280 request.setNetworkRequestCompleteListener(this); 281 if (VolleyLog.DEBUG) { 282 VolleyLog.d("new request, sending to network %s", cacheKey); 283 } 284 return false; 285 } 286 } 287 } 288 } 289