1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.concurrent.ForkJoinPool; 30 31 /** 32 * Abstract base class for most fork-join tasks used to implement stream ops. 33 * Manages splitting logic, tracking of child tasks, and intermediate results. 34 * Each task is associated with a {@link Spliterator} that describes the portion 35 * of the input associated with the subtree rooted at this task. 36 * Tasks may be leaf nodes (which will traverse the elements of 37 * the {@code Spliterator}) or internal nodes (which split the 38 * {@code Spliterator} into multiple child tasks). 39 * 40 * @implNote 41 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task 42 * where each task has a semaphore-like count of uncompleted children, and the 43 * task is implicitly completed and notified when its last child completes. 44 * Internal node tasks will likely override the {@code onCompletion} method from 45 * {@code CountedCompleter} to merge the results from child tasks into the 46 * current task's result. 47 * 48 * <p>Splitting and setting up the child task links is done by {@code compute()} 49 * for internal nodes. At {@code compute()} time for leaf nodes, it is 50 * guaranteed that the parent's child-related fields (including sibling links 51 * for the parent's children) will be set up for all children. 52 * 53 * <p>For example, a task that performs a reduce would override {@code doLeaf()} 54 * to perform a reduction on that leaf node's chunk using the 55 * {@code Spliterator}, and override {@code onCompletion()} to merge the results 56 * of the child tasks for internal nodes: 57 * 58 * <pre>{@code 59 * protected S doLeaf() { 60 * spliterator.forEach(...); 61 * return localReductionResult; 62 * } 63 * 64 * public void onCompletion(CountedCompleter caller) { 65 * if (!isLeaf()) { 66 * ReduceTask<P_IN, P_OUT, T, R> child = children; 67 * R result = child.getLocalResult(); 68 * child = child.nextSibling; 69 * for (; child != null; child = child.nextSibling) 70 * result = combine(result, child.getLocalResult()); 71 * setLocalResult(result); 72 * } 73 * } 74 * }</pre> 75 * 76 * <p>Serialization is not supported as there is no intention to serialize 77 * tasks managed by stream ops. 78 * 79 * @paramType of elements input to the pipeline 80 * @param Type of elements output from the pipeline 81 * @param <R> Type of intermediate result, which may be different from operation 82 * result type 83 * @param <K> Type of parent, child and sibling tasks 84 * @since 1.8 85 */ 86 @SuppressWarnings("serial") 87 abstract class AbstractTask<P_IN, P_OUT, R, 88 K extends AbstractTask<P_IN, P_OUT, R, K>> 89 extends CountedCompleter<R> { 90 91 /** 92 * Default target factor of leaf tasks for parallel decomposition. 93 * To allow load balancing, we over-partition, currently to approximately 94 * four tasks per processor, which enables others to help out 95 * if leaf tasks are uneven or some processors are otherwise busy. 96 */ 97 static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 98 99 /** The pipeline helper, common to all tasks in a computation */ 100 protected final PipelineHelper<P_OUT> helper; 101 102 /** 103 * The spliterator for the portion of the input associated with the subtree 104 * rooted at this task 105 */ 106 protected Spliterator<P_IN> spliterator; 107 108 /** Target leaf size, common to all tasks in a computation */ 109 protected long targetSize; // may be laziliy initialized 110 111 /** 112 * The left child. 113 * null if no children 114 * if non-null rightChild is non-null 115 */ 116 protected K leftChild; 117 118 /** 119 * The right child. 120 * null if no children 121 * if non-null leftChild is non-null 122 */ 123 protected K rightChild; 124 125 /** The result of this node, if completed */ 126 private R localResult; 127 128 /** 129 * Constructor for root nodes. 130 * 131 * @param helper The {@code PipelineHelper} describing the stream pipeline 132 * up to this operation 133 * @param spliterator The {@code Spliterator} describing the source for this 134 * pipeline 135 */ 136 protected AbstractTask(PipelineHelper<P_OUT> helper, 137 Spliterator<P_IN> spliterator) { 138 super(null); 139 this.helper = helper; 140 this.spliterator = spliterator; 141 this.targetSize = 0L; 142 } 143 144 /** 145 * Constructor for non-root nodes. 146 * 147 * @param parent this node's parent task 148 * @param spliterator {@code Spliterator} describing the subtree rooted at 149 * this node, obtained by splitting the parent {@code Spliterator} 150 */ 151 protected AbstractTask(K parent, 152 Spliterator<P_IN> spliterator) { 153 super(parent); 154 this.spliterator = spliterator; 155 this.helper = parent.helper; 156 this.targetSize = parent.targetSize; 157 } 158 159 /** 160 * Constructs a new node of type T whose parent is the receiver; must call 161 * the AbstractTask(T, Spliterator) constructor with the receiver and the 162 * provided Spliterator. 163 * 164 * @param spliterator {@code Spliterator} describing the subtree rooted at 165 * this node, obtained by splitting the parent {@code Spliterator} 166 * @return newly constructed child node 167 */ 168 protected abstract K makeChild(Spliterator<P_IN> spliterator); 169 170 /** 171 * Computes the result associated with a leaf node. Will be called by 172 * {@code compute()} and the result passed to @{code setLocalResult()} 173 * 174 * @return the computed result of a leaf node 175 */ 176 protected abstract R doLeaf(); 177 178 /** 179 * Returns a suggested target leaf size based on the initial size estimate. 180 * 181 * @return suggested target leaf size 182 */ 183 public static long suggestTargetSize(long sizeEstimate) { 184 long est = sizeEstimate / LEAF_TARGET; 185 return est > 0L ? est : 1L; 186 } 187 188 /** 189 * Returns the targetSize, initializing it via the supplied 190 * size estimate if not already initialized. 191 */ 192 protected final long getTargetSize(long sizeEstimate) { 193 long s; 194 return ((s = targetSize) != 0 ? s : 195 (targetSize = suggestTargetSize(sizeEstimate))); 196 } 197 198 /** 199 * Returns the local result, if any. Subclasses should use 200 * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage 201 * results. This returns the local result so that calls from within the 202 * fork-join framework will return the correct result. 203 * 204 * @return local result for this node previously stored with 205 * {@link #setLocalResult} 206 */ 207 @Override 208 public R getRawResult() { 209 return localResult; 210 } 211 212 /** 213 * Does nothing; instead, subclasses should use 214 * {@link #setLocalResult(Object)}} to manage results. 215 * 216 * @param result must be null, or an exception is thrown (this is a safety 217 * tripwire to detect when {@code setRawResult()} is being used 218 * instead of {@code setLocalResult()} 219 */ 220 @Override 221 protected void setRawResult(R result) { 222 if (result != null) 223 throw new IllegalStateException(); 224 } 225 226 /** 227 * Retrieves a result previously stored with {@link #setLocalResult} 228 * 229 * @return local result for this node previously stored with 230 * {@link #setLocalResult} 231 */ 232 protected R getLocalResult() { 233 return localResult; 234 } 235 236 /** 237 * Associates the result with the task, can be retrieved with 238 * {@link #getLocalResult} 239 * 240 * @param localResult local result for this node 241 */ 242 protected void setLocalResult(R localResult) { 243 this.localResult = localResult; 244 } 245 246 /** 247 * Indicates whether this task is a leaf node. (Only valid after 248 * {@link #compute} has been called on this node). If the node is not a 249 * leaf node, then children will be non-null and numChildren will be 250 * positive. 251 * 252 * @return {@code true} if this task is a leaf node 253 */ 254 protected boolean isLeaf() { 255 return leftChild == null; 256 } 257 258 /** 259 * Indicates whether this task is the root node 260 * 261 * @return {@code true} if this task is the root node. 262 */ 263 protected boolean isRoot() { 264 return getParent() == null; 265 } 266 267 /** 268 * Returns the parent of this task, or null if this task is the root 269 * 270 * @return the parent of this task, or null if this task is the root 271 */ 272 @SuppressWarnings("unchecked") 273 protected K getParent() { 274 return (K) getCompleter(); 275 } 276 277 /** 278 * Decides whether or not to split a task further or compute it 279 * directly. If computing directly, calls {@code doLeaf} and pass 280 * the result to {@code setRawResult}. Otherwise splits off 281 * subtasks, forking one and continuing as the other. 282 * 283 * <p> The method is structured to conserve resources across a 284 * range of uses. The loop continues with one of the child tasks 285 * when split, to avoid deep recursion. To cope with spliterators 286 * that may be systematically biased toward left-heavy or 287 * right-heavy splits, we alternate which child is forked versus 288 * continued in the loop. 289 */ 290 @Override 291 public void compute() { 292 Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators 293 long sizeEstimate = rs.estimateSize(); 294 long sizeThreshold = getTargetSize(sizeEstimate); 295 boolean forkRight = false; 296 @SuppressWarnings("unchecked") K task = (K) this; 297 while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { 298 K leftChild, rightChild, taskToFork; 299 task.leftChild = leftChild = task.makeChild(ls); 300 task.rightChild = rightChild = task.makeChild(rs); 301 task.setPendingCount(1); 302 if (forkRight) { 303 forkRight = false; 304 rs = ls; 305 task = leftChild; 306 taskToFork = rightChild; 307 } 308 else { 309 forkRight = true; 310 task = rightChild; 311 taskToFork = leftChild; 312 } 313 taskToFork.fork(); 314 sizeEstimate = rs.estimateSize(); 315 } 316 task.setLocalResult(task.doLeaf()); 317 task.tryComplete(); 318 } 319 320 /** 321 * {@inheritDoc} 322 * 323 * @implNote 324 * Clears spliterator and children fields. Overriders MUST call 325 * {@code super.onCompletion} as the last thing they do if they want these 326 * cleared. 327 */ 328 @Override 329 public void onCompletion(CountedCompleter<?> caller) { 330 spliterator = null; 331 leftChild = rightChild = null; 332 } 333 334 /** 335 * Returns whether this node is a "leftmost" node -- whether the path from 336 * the root to this node involves only traversing leftmost child links. For 337 * a leaf node, this means it is the first leaf node in the encounter order. 338 * 339 * @return {@code true} if this node is a "leftmost" node 340 */ 341 protected boolean isLeftmostNode() { 342 @SuppressWarnings("unchecked") 343 K node = (K) this; 344 while (node != null) { 345 K parent = node.getParent(); 346 if (parent != null && parent.leftChild != node) 347 return false; 348 node = parent; 349 } 350 return true; 351 } 352 } 353