1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.mojo.bindings; 6 7 import org.chromium.mojo.system.Core; 8 import org.chromium.mojo.system.MessagePipeHandle; 9 import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; 10 import org.chromium.mojo.system.MojoException; 11 import org.chromium.mojo.system.MojoResult; 12 import org.chromium.mojo.system.Pair; 13 import org.chromium.mojo.system.ResultAnd; 14 import org.chromium.mojo.system.Watcher; 15 import org.chromium.mojo.system.Watcher.Callback; 16 17 import java.nio.ByteBuffer; 18 import java.util.ArrayList; 19 import java.util.List; 20 import java.util.concurrent.Executor; 21 22 /** 23 * A factory which provides per-thread executors, which enable execution on the thread from which 24 * they were obtained. 25 */ 26 class ExecutorFactory { 27 28 /** 29 * A null buffer which is used to send messages without any data on the PipedExecutor's 30 * signaling handles. 31 */ 32 private static final ByteBuffer NOTIFY_BUFFER = null; 33 34 /** 35 * Implementation of the executor which uses a pair of {@link MessagePipeHandle} for signaling. 36 * The executor will wait asynchronously on one end of a {@link MessagePipeHandle} on the thread 37 * on which it was created. Other threads can call execute with a {@link Runnable}, and the 38 * executor will queue the {@link Runnable} and write a message on the other end of the handle. 39 * This will wake up the executor which is waiting on the handle, which will then dequeue the 40 * {@link Runnable} and execute it on the original thread. 41 */ 42 private static class PipedExecutor implements Executor, Callback { 43 44 /** 45 * The handle which is written to. Access to this object must be protected with |mLock|. 46 */ 47 private final MessagePipeHandle mWriteHandle; 48 /** 49 * The handle which is read from. 50 */ 51 private final MessagePipeHandle mReadHandle; 52 /** 53 * The list of actions left to be run. Access to this object must be protected with |mLock|. 54 */ 55 private final List<Runnable> mPendingActions; 56 /** 57 * Lock protecting access to |mWriteHandle| and |mPendingActions|. 58 */ 59 private final Object mLock; 60 /** 61 * The {@link Watcher} to get notified of new message availability on |mReadHandle|. 62 */ 63 private final Watcher mWatcher; 64 65 /** 66 * Constructor. 67 */ 68 public PipedExecutor(Core core) { 69 mWatcher = core.getWatcher(); 70 assert mWatcher != null; 71 mLock = new Object(); 72 Pair<MessagePipeHandle, MessagePipeHandle> handles = core.createMessagePipe( 73 new MessagePipeHandle.CreateOptions()); 74 mReadHandle = handles.first; 75 mWriteHandle = handles.second; 76 mPendingActions = new ArrayList<Runnable>(); 77 mWatcher.start(mReadHandle, Core.HandleSignals.READABLE, this); 78 } 79 80 /** 81 * @see Callback#onResult(int) 82 */ 83 @Override 84 public void onResult(int result) { 85 if (result == MojoResult.OK && readNotifyBufferMessage()) { 86 runNextAction(); 87 } else { 88 close(); 89 } 90 } 91 92 /** 93 * Close the handles. Should only be called on the executor thread. 94 */ 95 private void close() { 96 synchronized (mLock) { 97 mWriteHandle.close(); 98 mPendingActions.clear(); 99 } 100 mWatcher.cancel(); 101 mWatcher.destroy(); 102 mReadHandle.close(); 103 } 104 105 /** 106 * Read the next message on |mReadHandle|, and return |true| if successful, |false| 107 * otherwise. 108 */ 109 private boolean readNotifyBufferMessage() { 110 try { 111 ResultAnd<ReadMessageResult> readMessageResult = 112 mReadHandle.readMessage(NOTIFY_BUFFER, 0, MessagePipeHandle.ReadFlags.NONE); 113 if (readMessageResult.getMojoResult() == MojoResult.OK) { 114 return true; 115 } 116 } catch (MojoException e) { 117 // Will be closed by the fall back at the end of this method. 118 } 119 return false; 120 } 121 122 /** 123 * Run the next action in the |mPendingActions| queue. 124 */ 125 private void runNextAction() { 126 Runnable toRun = null; 127 synchronized (mLock) { 128 toRun = mPendingActions.remove(0); 129 } 130 toRun.run(); 131 } 132 133 /** 134 * Execute the given |command| in the executor thread. This can be called on any thread. 135 * 136 * @see Executor#execute(Runnable) 137 */ 138 @Override 139 public void execute(Runnable command) { 140 // Accessing the write handle must be protected by the lock, because it can be closed 141 // from the executor's thread. 142 synchronized (mLock) { 143 if (!mWriteHandle.isValid()) { 144 throw new IllegalStateException( 145 "Trying to execute an action on a closed executor."); 146 } 147 mPendingActions.add(command); 148 mWriteHandle.writeMessage(NOTIFY_BUFFER, null, MessagePipeHandle.WriteFlags.NONE); 149 } 150 } 151 } 152 153 /** 154 * Keep one executor per executor thread. 155 */ 156 private static final ThreadLocal<Executor> EXECUTORS = new ThreadLocal<Executor>(); 157 158 /** 159 * Returns an {@link Executor} that will run all of its actions in the current thread. 160 */ 161 public static Executor getExecutorForCurrentThread(Core core) { 162 Executor executor = EXECUTORS.get(); 163 if (executor == null) { 164 executor = new PipedExecutor(core); 165 EXECUTORS.set(executor); 166 } 167 return executor; 168 } 169 } 170