Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
      6 #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
      7 
      8 #include <memory>
      9 #include <queue>
     10 #include <unordered_map>
     11 #include <unordered_set>
     12 #include <utility>
     13 #include <vector>
     14 
     15 #include "base/callback.h"
     16 #include "base/containers/hash_tables.h"
     17 #include "base/macros.h"
     18 #include "base/memory/ref_counted.h"
     19 #include "base/task_runner.h"
     20 #include "mojo/edk/embedder/platform_handle_vector.h"
     21 #include "mojo/edk/embedder/platform_shared_buffer.h"
     22 #include "mojo/edk/embedder/scoped_platform_handle.h"
     23 #include "mojo/edk/system/atomic_flag.h"
     24 #include "mojo/edk/system/node_channel.h"
     25 #include "mojo/edk/system/ports/name.h"
     26 #include "mojo/edk/system/ports/node.h"
     27 #include "mojo/edk/system/ports/node_delegate.h"
     28 
     29 namespace base {
     30 class PortProvider;
     31 }
     32 
     33 namespace mojo {
     34 namespace edk {
     35 
     36 class Broker;
     37 class Core;
     38 class MachPortRelay;
     39 class PortsMessage;
     40 
     41 // The owner of ports::Node which facilitates core EDK implementation. All
     42 // public interface methods are safe to call from any thread.
     43 class NodeController : public ports::NodeDelegate,
     44                        public NodeChannel::Delegate {
     45  public:
          // Receives port status notifications through OnPortStatusChanged().
          // Derives from ports::UserData; presumably so that an instance can be
          // attached to a port as its user data (see SetPortObserver()) -- confirm
          // against the out-of-line definitions.
     46   class PortObserver : public ports::UserData {
     47    public:
     48     virtual void OnPortStatusChanged() = 0;
     49 
     50    protected:
            // Protected so that an observer cannot be deleted directly by callers;
            // observers are handed around as scoped_refptr<PortObserver>.
     51     ~PortObserver() override {}
     52   };
     53 
     54   // |core| owns and out-lives us.
     55   explicit NodeController(Core* core);
     56   ~NodeController() override;
     57 
          // Simple accessors. io_task_runner() returns a new reference to the task
          // runner stored by SetIOTaskRunner().
     58   const ports::NodeName& name() const { return name_; }
     59   Core* core() const { return core_; }
     60   ports::Node* node() const { return node_.get(); }
     61   scoped_refptr<base::TaskRunner> io_task_runner() const {
     62     return io_task_runner_;
     63   }
     64 
     65 #if defined(OS_MACOSX) && !defined(OS_IOS)
     66   // Create the relay used to transfer mach ports between processes.
     67   void CreateMachPortRelay(base::PortProvider* port_provider);
     68 #endif
     69 
     70   // Called exactly once, shortly after construction, and before any other
     71   // methods are called on this object.
     72   void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner);
     73 
     74   // Connects this node to a child node. This node will initiate a handshake.
     75   void ConnectToChild(base::ProcessHandle process_handle,
     76                       ConnectionParams connection_params,
     77                       const std::string& child_token,
     78                       const ProcessErrorCallback& process_error_callback);
     79 
     80   // Closes all reserved ports which are associated with the child process
     81   // |child_token|.
     82   void CloseChildPorts(const std::string& child_token);
     83 
     84   // Closes a connection to a peer associated with |peer_token|.
     85   void ClosePeerConnection(const std::string& peer_token);
     86 
     87   // Connects this node to a parent node. The parent node will initiate a
     88   // handshake.
     89   void ConnectToParent(ConnectionParams connection_params);
     90 
     91   // Connects this node to a peer node. On success, |port| will be merged with
     92   // the corresponding port in the peer node.
     93   void ConnectToPeer(ConnectionParams connection_params,
     94                      const ports::PortRef& port,
     95                      const std::string& peer_token);
     96 
     97   // Sets a port's observer. If |observer| is null the port's current observer
     98   // is removed.
     99   void SetPortObserver(const ports::PortRef& port,
    100                        scoped_refptr<PortObserver> observer);
    101 
    102   // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
    103   // it ensures the port's observer has also been removed.
    104   void ClosePort(const ports::PortRef& port);
    105 
    106   // Sends a message on a port to its peer.
    107   int SendMessage(const ports::PortRef& port_ref,
    108                   std::unique_ptr<PortsMessage> message);
    109 
    110   // Reserves a local port |port| associated with |token|. A peer holding a copy
    111   // of |token| can merge one of its own ports into this one.
    112   void ReservePort(const std::string& token, const ports::PortRef& port,
    113                    const std::string& child_token);
    114 
    115   // Merges a local port |port| into a port reserved by |token| in the parent.
    116   void MergePortIntoParent(const std::string& token,
    117                            const ports::PortRef& port);
    118 
    119   // Merges two local ports together.
    120   int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
    121 
    122   // Creates a new shared buffer for use in the current process.
    123   scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
    124 
    125   // Request that the Node be shut down cleanly. This may take an arbitrarily
    126   // long time to complete, at which point |callback| will be called.
    127   //
    128   // Note that while it is safe to continue using the NodeController's public
    129   // interface after requesting shutdown, you do so at your own risk and there
    130   // is NO guarantee that new messages will be sent or ports will complete
    131   // transfer.
    132   void RequestShutdown(const base::Closure& callback);
    133 
    134   // Notifies the NodeController that we received a bad message from the given
    135   // node.
    136   void NotifyBadMessageFrom(const ports::NodeName& source_node,
    137                             const std::string& error);
    138 
    139  private:
          // Core is granted access to the private members below.
    140   friend Core;
    141 
          // Maps a node name to the NodeChannel associated with that node.
    142   using NodeMap = std::unordered_map<ports::NodeName,
    143                                      scoped_refptr<NodeChannel>>;
          // FIFO of channel messages; used for messages that cannot be sent yet (see
          // |pending_peer_messages_| and |pending_relay_messages_|).
    144   using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
    145 
          // Value type of |reserved_ports_|: a port paired with a child token (cf.
          // the |child_token| argument of ReservePort()).
    146   struct ReservedPort {
    147     ports::PortRef port;
    148     const std::string child_token;
    149   };
    150 
          // Value type of |peer_connections_|: the channel, local port and peer token
          // kept for one peer connection. All special member functions are declared
          // here and defined out of line.
    151   struct PeerConnection {
    152     PeerConnection();
    153     PeerConnection(const PeerConnection& other);
    154     PeerConnection(PeerConnection&& other);
    155     PeerConnection(scoped_refptr<NodeChannel> channel,
    156                    const ports::PortRef& local_port,
    157                    const std::string& peer_token);
    158     ~PeerConnection();
    159 
    160     PeerConnection& operator=(const PeerConnection& other);
    161     PeerConnection& operator=(PeerConnection&& other);
    162 
    163 
    164     scoped_refptr<NodeChannel> channel;
    165     ports::PortRef local_port;
    166     std::string peer_token;
    167   };
    168 
          // Going by their names, the *OnIOThread methods below are the IO-thread
          // halves of the corresponding public methods; how they are scheduled is
          // decided in the out-of-line definitions (not visible in this header).
    169   void ConnectToChildOnIOThread(
    170       base::ProcessHandle process_handle,
    171       ConnectionParams connection_params,
    172       ports::NodeName token,
    173       const ProcessErrorCallback& process_error_callback);
    174   void ConnectToParentOnIOThread(ConnectionParams connection_params);
    175 
    176   void ConnectToPeerOnIOThread(ConnectionParams connection_params,
    177                                ports::NodeName token,
    178                                ports::PortRef port,
    179                                const std::string& peer_token);
          // NOTE(review): the parameter is named |node_name| here, while the public
          // ClosePeerConnection() takes a |peer_token| -- confirm which one the
          // definition actually expects.
    180   void ClosePeerConnectionOnIOThread(const std::string& node_name);
    181 
          // Channel lookups for a named peer, the parent and the broker. What is
          // returned when no such channel exists is determined by the definitions
          // (presumably a null scoped_refptr -- TODO confirm).
    182   scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
    183   scoped_refptr<NodeChannel> GetParentChannel();
    184   scoped_refptr<NodeChannel> GetBrokerChannel();
    185 
          // Peer bookkeeping and message pumping helpers.
    186   void AddPeer(const ports::NodeName& name,
    187                scoped_refptr<NodeChannel> channel,
    188                bool start_channel);
    189   void DropPeer(const ports::NodeName& name, NodeChannel* channel);
    190   void SendPeerMessage(const ports::NodeName& name,
    191                        ports::ScopedMessage message);
    192   void AcceptIncomingMessages();
    193   void ProcessIncomingMessages();
    194   void DropAllPeers();
    195 
    196   // ports::NodeDelegate:
    197   void GenerateRandomPortName(ports::PortName* port_name) override;
    198   void AllocMessage(size_t num_header_bytes,
    199                     ports::ScopedMessage* message) override;
    200   void ForwardMessage(const ports::NodeName& node,
    201                       ports::ScopedMessage message) override;
    202   void BroadcastMessage(ports::ScopedMessage message) override;
    203   void PortStatusChanged(const ports::PortRef& port) override;
    204 
    205   // NodeChannel::Delegate:
    206   void OnAcceptChild(const ports::NodeName& from_node,
    207                      const ports::NodeName& parent_name,
    208                      const ports::NodeName& token) override;
    209   void OnAcceptParent(const ports::NodeName& from_node,
    210                       const ports::NodeName& token,
    211                       const ports::NodeName& child_name) override;
    212   void OnAddBrokerClient(const ports::NodeName& from_node,
    213                          const ports::NodeName& client_name,
    214                          base::ProcessHandle process_handle) override;
    215   void OnBrokerClientAdded(const ports::NodeName& from_node,
    216                            const ports::NodeName& client_name,
    217                            ScopedPlatformHandle broker_channel) override;
    218   void OnAcceptBrokerClient(const ports::NodeName& from_node,
    219                             const ports::NodeName& broker_name,
    220                             ScopedPlatformHandle broker_channel) override;
    221   void OnPortsMessage(const ports::NodeName& from_node,
    222                       Channel::MessagePtr message) override;
    223   void OnRequestPortMerge(const ports::NodeName& from_node,
    224                           const ports::PortName& connector_port_name,
    225                           const std::string& token) override;
    226   void OnRequestIntroduction(const ports::NodeName& from_node,
    227                              const ports::NodeName& name) override;
    228   void OnIntroduce(const ports::NodeName& from_node,
    229                    const ports::NodeName& name,
    230                    ScopedPlatformHandle channel_handle) override;
    231   void OnBroadcast(const ports::NodeName& from_node,
    232                    Channel::MessagePtr message) override;
          // The relay callbacks are only declared on Windows and desktop macOS.
    233 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    234   void OnRelayPortsMessage(const ports::NodeName& from_node,
    235                            base::ProcessHandle from_process,
    236                            const ports::NodeName& destination,
    237                            Channel::MessagePtr message) override;
    238   void OnPortsMessageFromRelay(const ports::NodeName& from_node,
    239                                const ports::NodeName& source_node,
    240                                Channel::MessagePtr message) override;
    241 #endif
    242   void OnAcceptPeer(const ports::NodeName& from_node,
    243                     const ports::NodeName& token,
    244                     const ports::NodeName& peer_name,
    245                     const ports::PortName& port_name) override;
    246   void OnChannelError(const ports::NodeName& from_node,
    247                       NodeChannel* channel) override;
    248 #if defined(OS_MACOSX) && !defined(OS_IOS)
    249   MachPortRelay* GetMachPortRelay() override;
    250 #endif
    251 
    252   // Cancels all pending port merges. These are merges which are supposed to
    253   // be requested from the parent ASAP, and they may be cancelled if the
    254   // connection to the parent is broken or never established.
    255   void CancelPendingPortMerges();
    256 
    257   // Marks this NodeController for destruction when the IO thread shuts down.
    258   // This is used in case Core is torn down before the IO thread. Must only be
    259   // called on the IO thread.
    260   void DestroyOnIOThreadShutdown();
    261 
    262   // If there is a registered shutdown callback (meaning shutdown has been
    263   // requested), this checks the Node's status to see if clean shutdown is
    264   // possible. If so, shutdown is performed and the shutdown callback is run.
    265   void AttemptShutdownIfRequested();
    266 
    267   // These are safe to access from any thread as long as the Node is alive.
    268   Core* const core_;
    269   const ports::NodeName name_;
    270   const std::unique_ptr<ports::Node> node_;
    271   scoped_refptr<base::TaskRunner> io_task_runner_;
    272 
    273   // Guards |peers_| and |pending_peer_messages_|.
    274   base::Lock peers_lock_;
    275 
    276   // Channels to known peers, including parent and children, if any.
    277   NodeMap peers_;
    278 
    279   // Outgoing message queues for peers we've heard of but can't yet talk to.
    280   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
    281       pending_peer_messages_;
    282 
    283   // Guards |reserved_ports_| and |pending_child_tokens_|.
    284   base::Lock reserved_ports_lock_;
    285 
    286   // Ports reserved by token. Key is the port token.
    287   base::hash_map<std::string, ReservedPort> reserved_ports_;
    288   // TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't
    289   // have one yet :(
    290   std::unordered_map<ports::NodeName, std::string> pending_child_tokens_;
    291 
    292   // Guards |pending_port_merges_| and |reject_pending_merges_|.
    293   base::Lock pending_port_merges_lock_;
    294 
    295   // A set of port merge requests awaiting parent connection.
    296   std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
    297 
    298   // Indicates that new merge requests should be rejected because the parent has
    299   // disconnected.
    300   bool reject_pending_merges_ = false;
    301 
    302   // Guards |parent_name_| and |bootstrap_parent_channel_|.
    303   base::Lock parent_lock_;
    304 
    305   // The name of our parent node, if any.
    306   ports::NodeName parent_name_;
    307 
    308   // A temporary reference to the parent channel before we know their name.
    309   scoped_refptr<NodeChannel> bootstrap_parent_channel_;
    310 
    311   // Guards |broker_name_|, |pending_broker_clients_|, and
    312   // |pending_relay_messages_|.
    313   base::Lock broker_lock_;
    314 
    315   // The name of our broker node, if any.
    316   ports::NodeName broker_name_;
    317 
    318   // A queue of pending child names waiting to be connected to a broker.
    319   std::queue<ports::NodeName> pending_broker_clients_;
    320 
    321   // Messages waiting to be relayed by the broker once it's known.
    322   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
    323       pending_relay_messages_;
    324 
    325   // Guards |incoming_messages_| and |incoming_messages_task_posted_|.
    326   base::Lock messages_lock_;
    327   std::queue<ports::ScopedMessage> incoming_messages_;
    328   // Ensures that there is only one incoming messages task posted to the IO
    329   // thread.
    330   bool incoming_messages_task_posted_ = false;
    331   // Flag to fast-path checking |incoming_messages_|.
    332   AtomicFlag incoming_messages_flag_;
    333 
    334   // Guards |shutdown_callback_|.
    335   base::Lock shutdown_lock_;
    336 
    337   // Set by RequestShutdown(). If this is non-null, the controller will
    338   // begin polling the Node to see if clean shutdown is possible any time the
    339   // Node's state is modified by the controller.
    340   base::Closure shutdown_callback_;
    341   // Flag to fast-path checking |shutdown_callback_|.
    342   AtomicFlag shutdown_callback_flag_;
    343 
    344   // All other fields below must only be accessed on the I/O thread, i.e., the
    345   // thread on which core_->io_task_runner() runs tasks.
    346 
    347   // Channels to children during handshake.
    348   NodeMap pending_children_;
    349 
          // Per-peer connection state, keyed by node name.
    350   using PeerNodeMap =
    351       std::unordered_map<ports::NodeName, PeerConnection>;
    352   PeerNodeMap peer_connections_;
    353 
    354   // Maps from peer token to node name, pending or not.
    355   std::unordered_map<std::string, ports::NodeName> peers_by_token_;
    356 
    357   // Indicates whether this object should delete itself on IO thread shutdown.
    358   // Must only be accessed from the IO thread.
    359   bool destroy_on_io_thread_shutdown_ = false;
    360 
    361 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
    362   // Broker for sync shared buffer creation in children.
    363   std::unique_ptr<Broker> broker_;
    364 #endif
    365 
    366 #if defined(OS_MACOSX) && !defined(OS_IOS)
          // Presumably guards |mach_port_relay_| -- TODO confirm in the definitions.
    367   base::Lock mach_port_relay_lock_;
    368   // Relay for transferring mach ports to/from children.
    369   std::unique_ptr<MachPortRelay> mach_port_relay_;
    370 #endif
    371 
    372   DISALLOW_COPY_AND_ASSIGN(NodeController);
    373 };
    374 
    375 }  // namespace edk
    376 }  // namespace mojo
    377 
    378 #endif  // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
    379