1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ 6 #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ 7 8 #include <memory> 9 #include <queue> 10 #include <unordered_map> 11 #include <unordered_set> 12 #include <utility> 13 #include <vector> 14 15 #include "base/callback.h" 16 #include "base/containers/hash_tables.h" 17 #include "base/macros.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/task_runner.h" 20 #include "mojo/edk/embedder/platform_handle_vector.h" 21 #include "mojo/edk/embedder/platform_shared_buffer.h" 22 #include "mojo/edk/embedder/scoped_platform_handle.h" 23 #include "mojo/edk/system/atomic_flag.h" 24 #include "mojo/edk/system/node_channel.h" 25 #include "mojo/edk/system/ports/name.h" 26 #include "mojo/edk/system/ports/node.h" 27 #include "mojo/edk/system/ports/node_delegate.h" 28 29 namespace base { 30 class PortProvider; 31 } 32 33 namespace mojo { 34 namespace edk { 35 36 class Broker; 37 class Core; 38 class MachPortRelay; 39 class PortsMessage; 40 41 // The owner of ports::Node which facilitates core EDK implementation. All 42 // public interface methods are safe to call from any thread. 43 class NodeController : public ports::NodeDelegate, 44 public NodeChannel::Delegate { 45 public: 46 class PortObserver : public ports::UserData { 47 public: 48 virtual void OnPortStatusChanged() = 0; 49 50 protected: 51 ~PortObserver() override {} 52 }; 53 54 // |core| owns and out-lives us. 55 explicit NodeController(Core* core); 56 ~NodeController() override; 57 58 const ports::NodeName& name() const { return name_; } 59 Core* core() const { return core_; } 60 ports::Node* node() const { return node_.get(); } 61 scoped_refptr<base::TaskRunner> io_task_runner() const { 62 return io_task_runner_; 63 } 64 65 #if defined(OS_MACOSX) && !defined(OS_IOS) 66 // Create the relay used to transfer mach ports between processes. 67 void CreateMachPortRelay(base::PortProvider* port_provider); 68 #endif 69 70 // Called exactly once, shortly after construction, and before any other 71 // methods are called on this object. 72 void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner); 73 74 // Connects this node to a child node. This node will initiate a handshake. 75 void ConnectToChild(base::ProcessHandle process_handle, 76 ConnectionParams connection_params, 77 const std::string& child_token, 78 const ProcessErrorCallback& process_error_callback); 79 80 // Closes all reserved ports which associated with the child process 81 // |child_token|. 82 void CloseChildPorts(const std::string& child_token); 83 84 // Close a connection to a peer associated with |peer_token|. 85 void ClosePeerConnection(const std::string& peer_token); 86 87 // Connects this node to a parent node. The parent node will initiate a 88 // handshake. 89 void ConnectToParent(ConnectionParams connection_params); 90 91 // Connects this node to a peer node. On success, |port| will be merged with 92 // the corresponding port in the peer node. 93 void ConnectToPeer(ConnectionParams connection_params, 94 const ports::PortRef& port, 95 const std::string& peer_token); 96 97 // Sets a port's observer. If |observer| is null the port's current observer 98 // is removed. 99 void SetPortObserver(const ports::PortRef& port, 100 scoped_refptr<PortObserver> observer); 101 102 // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as 103 // it ensures the port's observer has also been removed. 104 void ClosePort(const ports::PortRef& port); 105 106 // Sends a message on a port to its peer. 107 int SendMessage(const ports::PortRef& port_ref, 108 std::unique_ptr<PortsMessage> message); 109 110 // Reserves a local port |port| associated with |token|. A peer holding a copy 111 // of |token| can merge one of its own ports into this one. 112 void ReservePort(const std::string& token, const ports::PortRef& port, 113 const std::string& child_token); 114 115 // Merges a local port |port| into a port reserved by |token| in the parent. 116 void MergePortIntoParent(const std::string& token, 117 const ports::PortRef& port); 118 119 // Merges two local ports together. 120 int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1); 121 122 // Creates a new shared buffer for use in the current process. 123 scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes); 124 125 // Request that the Node be shut down cleanly. This may take an arbitrarily 126 // long time to complete, at which point |callback| will be called. 127 // 128 // Note that while it is safe to continue using the NodeController's public 129 // interface after requesting shutdown, you do so at your own risk and there 130 // is NO guarantee that new messages will be sent or ports will complete 131 // transfer. 132 void RequestShutdown(const base::Closure& callback); 133 134 // Notifies the NodeController that we received a bad message from the given 135 // node. 136 void NotifyBadMessageFrom(const ports::NodeName& source_node, 137 const std::string& error); 138 139 private: 140 friend Core; 141 142 using NodeMap = std::unordered_map<ports::NodeName, 143 scoped_refptr<NodeChannel>>; 144 using OutgoingMessageQueue = std::queue<Channel::MessagePtr>; 145 146 struct ReservedPort { 147 ports::PortRef port; 148 const std::string child_token; 149 }; 150 151 struct PeerConnection { 152 PeerConnection(); 153 PeerConnection(const PeerConnection& other); 154 PeerConnection(PeerConnection&& other); 155 PeerConnection(scoped_refptr<NodeChannel> channel, 156 const ports::PortRef& local_port, 157 const std::string& peer_token); 158 ~PeerConnection(); 159 160 PeerConnection& operator=(const PeerConnection& other); 161 PeerConnection& operator=(PeerConnection&& other); 162 163 164 scoped_refptr<NodeChannel> channel; 165 ports::PortRef local_port; 166 std::string peer_token; 167 }; 168 169 void ConnectToChildOnIOThread( 170 base::ProcessHandle process_handle, 171 ConnectionParams connection_params, 172 ports::NodeName token, 173 const ProcessErrorCallback& process_error_callback); 174 void ConnectToParentOnIOThread(ConnectionParams connection_params); 175 176 void ConnectToPeerOnIOThread(ConnectionParams connection_params, 177 ports::NodeName token, 178 ports::PortRef port, 179 const std::string& peer_token); 180 void ClosePeerConnectionOnIOThread(const std::string& node_name); 181 182 scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); 183 scoped_refptr<NodeChannel> GetParentChannel(); 184 scoped_refptr<NodeChannel> GetBrokerChannel(); 185 186 void AddPeer(const ports::NodeName& name, 187 scoped_refptr<NodeChannel> channel, 188 bool start_channel); 189 void DropPeer(const ports::NodeName& name, NodeChannel* channel); 190 void SendPeerMessage(const ports::NodeName& name, 191 ports::ScopedMessage message); 192 void AcceptIncomingMessages(); 193 void ProcessIncomingMessages(); 194 void DropAllPeers(); 195 196 // ports::NodeDelegate: 197 void GenerateRandomPortName(ports::PortName* port_name) override; 198 void AllocMessage(size_t num_header_bytes, 199 ports::ScopedMessage* message) override; 200 void ForwardMessage(const ports::NodeName& node, 201 ports::ScopedMessage message) override; 202 void BroadcastMessage(ports::ScopedMessage message) override; 203 void PortStatusChanged(const ports::PortRef& port) override; 204 205 // NodeChannel::Delegate: 206 void OnAcceptChild(const ports::NodeName& from_node, 207 const ports::NodeName& parent_name, 208 const ports::NodeName& token) override; 209 void OnAcceptParent(const ports::NodeName& from_node, 210 const ports::NodeName& token, 211 const ports::NodeName& child_name) override; 212 void OnAddBrokerClient(const ports::NodeName& from_node, 213 const ports::NodeName& client_name, 214 base::ProcessHandle process_handle) override; 215 void OnBrokerClientAdded(const ports::NodeName& from_node, 216 const ports::NodeName& client_name, 217 ScopedPlatformHandle broker_channel) override; 218 void OnAcceptBrokerClient(const ports::NodeName& from_node, 219 const ports::NodeName& broker_name, 220 ScopedPlatformHandle broker_channel) override; 221 void OnPortsMessage(const ports::NodeName& from_node, 222 Channel::MessagePtr message) override; 223 void OnRequestPortMerge(const ports::NodeName& from_node, 224 const ports::PortName& connector_port_name, 225 const std::string& token) override; 226 void OnRequestIntroduction(const ports::NodeName& from_node, 227 const ports::NodeName& name) override; 228 void OnIntroduce(const ports::NodeName& from_node, 229 const ports::NodeName& name, 230 ScopedPlatformHandle channel_handle) override; 231 void OnBroadcast(const ports::NodeName& from_node, 232 Channel::MessagePtr message) override; 233 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 234 void OnRelayPortsMessage(const ports::NodeName& from_node, 235 base::ProcessHandle from_process, 236 const ports::NodeName& destination, 237 Channel::MessagePtr message) override; 238 void OnPortsMessageFromRelay(const ports::NodeName& from_node, 239 const ports::NodeName& source_node, 240 Channel::MessagePtr message) override; 241 #endif 242 void OnAcceptPeer(const ports::NodeName& from_node, 243 const ports::NodeName& token, 244 const ports::NodeName& peer_name, 245 const ports::PortName& port_name) override; 246 void OnChannelError(const ports::NodeName& from_node, 247 NodeChannel* channel) override; 248 #if defined(OS_MACOSX) && !defined(OS_IOS) 249 MachPortRelay* GetMachPortRelay() override; 250 #endif 251 252 // Cancels all pending port merges. These are merges which are supposed to 253 // be requested from the parent ASAP, and they may be cancelled if the 254 // connection to the parent is broken or never established. 255 void CancelPendingPortMerges(); 256 257 // Marks this NodeController for destruction when the IO thread shuts down. 258 // This is used in case Core is torn down before the IO thread. Must only be 259 // called on the IO thread. 260 void DestroyOnIOThreadShutdown(); 261 262 // If there is a registered shutdown callback (meaning shutdown has been 263 // requested, this checks the Node's status to see if clean shutdown is 264 // possible. If so, shutdown is performed and the shutdown callback is run. 265 void AttemptShutdownIfRequested(); 266 267 // These are safe to access from any thread as long as the Node is alive. 268 Core* const core_; 269 const ports::NodeName name_; 270 const std::unique_ptr<ports::Node> node_; 271 scoped_refptr<base::TaskRunner> io_task_runner_; 272 273 // Guards |peers_| and |pending_peer_messages_|. 274 base::Lock peers_lock_; 275 276 // Channels to known peers, including parent and children, if any. 277 NodeMap peers_; 278 279 // Outgoing message queues for peers we've heard of but can't yet talk to. 280 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 281 pending_peer_messages_; 282 283 // Guards |reserved_ports_| and |pending_child_tokens_|. 284 base::Lock reserved_ports_lock_; 285 286 // Ports reserved by token. Key is the port token. 287 base::hash_map<std::string, ReservedPort> reserved_ports_; 288 // TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't 289 // have one yet :( 290 std::unordered_map<ports::NodeName, std::string> pending_child_tokens_; 291 292 // Guards |pending_port_merges_| and |reject_pending_merges_|. 293 base::Lock pending_port_merges_lock_; 294 295 // A set of port merge requests awaiting parent connection. 296 std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_; 297 298 // Indicates that new merge requests should be rejected because the parent has 299 // disconnected. 300 bool reject_pending_merges_ = false; 301 302 // Guards |parent_name_| and |bootstrap_parent_channel_|. 303 base::Lock parent_lock_; 304 305 // The name of our parent node, if any. 306 ports::NodeName parent_name_; 307 308 // A temporary reference to the parent channel before we know their name. 309 scoped_refptr<NodeChannel> bootstrap_parent_channel_; 310 311 // Guards |broker_name_|, |pending_broker_clients_|, and 312 // |pending_relay_messages_|. 313 base::Lock broker_lock_; 314 315 // The name of our broker node, if any. 316 ports::NodeName broker_name_; 317 318 // A queue of pending child names waiting to be connected to a broker. 319 std::queue<ports::NodeName> pending_broker_clients_; 320 321 // Messages waiting to be relayed by the broker once it's known. 322 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 323 pending_relay_messages_; 324 325 // Guards |incoming_messages_| and |incoming_messages_task_posted_|. 326 base::Lock messages_lock_; 327 std::queue<ports::ScopedMessage> incoming_messages_; 328 // Ensures that there is only one incoming messages task posted to the IO 329 // thread. 330 bool incoming_messages_task_posted_ = false; 331 // Flag to fast-path checking |incoming_messages_|. 332 AtomicFlag incoming_messages_flag_; 333 334 // Guards |shutdown_callback_|. 335 base::Lock shutdown_lock_; 336 337 // Set by RequestShutdown(). If this is non-null, the controller will 338 // begin polling the Node to see if clean shutdown is possible any time the 339 // Node's state is modified by the controller. 340 base::Closure shutdown_callback_; 341 // Flag to fast-path checking |shutdown_callback_|. 342 AtomicFlag shutdown_callback_flag_; 343 344 // All other fields below must only be accessed on the I/O thread, i.e., the 345 // thread on which core_->io_task_runner() runs tasks. 346 347 // Channels to children during handshake. 348 NodeMap pending_children_; 349 350 using PeerNodeMap = 351 std::unordered_map<ports::NodeName, PeerConnection>; 352 PeerNodeMap peer_connections_; 353 354 // Maps from peer token to node name, pending or not. 355 std::unordered_map<std::string, ports::NodeName> peers_by_token_; 356 357 // Indicates whether this object should delete itself on IO thread shutdown. 358 // Must only be accessed from the IO thread. 359 bool destroy_on_io_thread_shutdown_ = false; 360 361 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 362 // Broker for sync shared buffer creation in children. 363 std::unique_ptr<Broker> broker_; 364 #endif 365 366 #if defined(OS_MACOSX) && !defined(OS_IOS) 367 base::Lock mach_port_relay_lock_; 368 // Relay for transferring mach ports to/from children. 369 std::unique_ptr<MachPortRelay> mach_port_relay_; 370 #endif 371 372 DISALLOW_COPY_AND_ASSIGN(NodeController); 373 }; 374 375 } // namespace edk 376 } // namespace mojo 377 378 #endif // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ 379