1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.mojo.bindings; 6 7 import org.chromium.mojo.system.AsyncWaiter; 8 import org.chromium.mojo.system.AsyncWaiter.Callback; 9 import org.chromium.mojo.system.Core; 10 import org.chromium.mojo.system.MessagePipeHandle; 11 import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; 12 import org.chromium.mojo.system.MojoException; 13 import org.chromium.mojo.system.MojoResult; 14 import org.chromium.mojo.system.Pair; 15 16 import java.nio.ByteBuffer; 17 import java.util.ArrayList; 18 import java.util.List; 19 import java.util.concurrent.Executor; 20 21 /** 22 * A factory which provides per-thread executors, which enable execution on the thread from which 23 * they were obtained. 24 */ 25 class ExecutorFactory { 26 27 /** 28 * A null buffer which is used to send messages without any data on the PipedExecutor's 29 * signaling handles. 30 */ 31 private static final ByteBuffer NOTIFY_BUFFER = null; 32 33 /** 34 * Implementation of the executor which uses a pair of {@link MessagePipeHandle} for signaling. 35 * The executor will wait asynchronously on one end of a {@link MessagePipeHandle} on the thread 36 * on which it was created. Other threads can call execute with a {@link Runnable}, and the 37 * executor will queue the {@link Runnable} and write a message on the other end of the handle. 38 * This will wake up the executor which is waiting on the handle, which will then dequeue the 39 * {@link Runnable} and execute it on the original thread. 40 */ 41 private static class PipedExecutor implements Executor, Callback { 42 43 /** 44 * The handle which is written to. Access to this object must be protected with |mLock|. 45 */ 46 private final MessagePipeHandle mWriteHandle; 47 /** 48 * The handle which is read from. 49 */ 50 private final MessagePipeHandle mReadHandle; 51 /** 52 * The list of actions left to be run. Access to this object must be protected with |mLock|. 53 */ 54 private final List<Runnable> mPendingActions; 55 /** 56 * Lock protecting access to |mWriteHandle| and |mPendingActions|. 57 */ 58 private final Object mLock; 59 /** 60 * The {@link AsyncWaiter} to get notified of new message availability on |mReadHandle|. 61 */ 62 private final AsyncWaiter mWaiter; 63 64 /** 65 * Constructor. 66 */ 67 public PipedExecutor(Core core) { 68 mWaiter = core.getDefaultAsyncWaiter(); 69 assert mWaiter != null; 70 mLock = new Object(); 71 Pair<MessagePipeHandle, MessagePipeHandle> handles = core.createMessagePipe( 72 new MessagePipeHandle.CreateOptions()); 73 mReadHandle = handles.first; 74 mWriteHandle = handles.second; 75 mPendingActions = new ArrayList<Runnable>(); 76 asyncWait(); 77 } 78 79 /** 80 * Asynchronously wait for the next command to arrive. This should only be called on the 81 * executor thread. 82 */ 83 private void asyncWait() { 84 mWaiter.asyncWait(mReadHandle, Core.HandleSignals.READABLE, Core.DEADLINE_INFINITE, 85 this); 86 } 87 88 /** 89 * @see Callback#onResult(int) 90 */ 91 @Override 92 public void onResult(int result) { 93 if (result == MojoResult.OK && readNotifyBufferMessage()) { 94 runNextAction(); 95 } else { 96 close(); 97 } 98 } 99 100 /** 101 * @see Callback#onError(MojoException) 102 */ 103 @Override 104 public void onError(MojoException exception) { 105 close(); 106 } 107 108 /** 109 * Close the handles. Should only be called on the executor thread. 110 */ 111 private void close() { 112 synchronized (mLock) { 113 mWriteHandle.close(); 114 mPendingActions.clear(); 115 } 116 mReadHandle.close(); 117 } 118 119 /** 120 * Read the next message on |mReadHandle|, and return |true| if successful, |false| 121 * otherwise. 122 */ 123 private boolean readNotifyBufferMessage() { 124 try { 125 ReadMessageResult readMessageResult = mReadHandle.readMessage(NOTIFY_BUFFER, 0, 126 MessagePipeHandle.ReadFlags.NONE); 127 if (readMessageResult.getMojoResult() == MojoResult.OK) { 128 asyncWait(); 129 return true; 130 } 131 } catch (MojoException e) { 132 // Will be closed by the fall back at the end of this method. 133 } 134 return false; 135 } 136 137 /** 138 * Run the next action in the |mPendingActions| queue. 139 */ 140 private void runNextAction() { 141 Runnable toRun = null; 142 synchronized (mWriteHandle) { 143 toRun = mPendingActions.remove(0); 144 } 145 toRun.run(); 146 } 147 148 /** 149 * Execute the given |command| in the executor thread. This can be called on any thread. 150 * 151 * @see Executor#execute(Runnable) 152 */ 153 @Override 154 public void execute(Runnable command) { 155 // Accessing the write handle must be protected by the lock, because it can be closed 156 // from the executor's thread. 157 synchronized (mLock) { 158 if (!mWriteHandle.isValid()) { 159 throw new IllegalStateException( 160 "Trying to execute an action on a closed executor."); 161 } 162 mPendingActions.add(command); 163 mWriteHandle.writeMessage(NOTIFY_BUFFER, null, MessagePipeHandle.WriteFlags.NONE); 164 } 165 } 166 } 167 168 /** 169 * Keep one executor per executor thread. 170 */ 171 private static final ThreadLocal<Executor> EXECUTORS = new ThreadLocal<Executor>(); 172 173 /** 174 * Returns an {@link Executor} that will run all of its actions in the current thread. 175 */ 176 public static Executor getExecutorForCurrentThread(Core core) { 177 Executor executor = EXECUTORS.get(); 178 if (executor == null) { 179 executor = new PipedExecutor(core); 180 EXECUTORS.set(executor); 181 } 182 return executor; 183 } 184 } 185