Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
      6 #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
      7 
      8 #include <memory>
      9 #include <queue>
     10 #include <unordered_map>
     11 #include <unordered_set>
     12 #include <utility>
     13 #include <vector>
     14 
     15 #include "base/callback.h"
     16 #include "base/containers/hash_tables.h"
     17 #include "base/macros.h"
     18 #include "base/memory/ref_counted.h"
     19 #include "base/task_runner.h"
     20 #include "mojo/edk/embedder/platform_handle_vector.h"
     21 #include "mojo/edk/embedder/platform_shared_buffer.h"
     22 #include "mojo/edk/embedder/scoped_platform_handle.h"
     23 #include "mojo/edk/system/atomic_flag.h"
     24 #include "mojo/edk/system/node_channel.h"
     25 #include "mojo/edk/system/ports/name.h"
     26 #include "mojo/edk/system/ports/node.h"
     27 #include "mojo/edk/system/ports/node_delegate.h"
     28 
     29 namespace base {
     30 class PortProvider;
     31 }
     32 
     33 namespace mojo {
     34 namespace edk {
     35 
     36 class Broker;
     37 class Core;
     38 class MachPortRelay;
     39 class PortsMessage;
     40 
     41 // The owner of ports::Node which facilitates core EDK implementation. All
     42 // public interface methods are safe to call from any thread.
     43 class NodeController : public ports::NodeDelegate,
     44                        public NodeChannel::Delegate {
     45  public:
     46   class PortObserver : public ports::UserData {
     47    public:
     48     virtual void OnPortStatusChanged() = 0;
     49 
     50    protected:
     51     ~PortObserver() override {}
     52   };
     53 
     54   // |core| owns and out-lives us.
     55   explicit NodeController(Core* core);
     56   ~NodeController() override;
     57 
     58   const ports::NodeName& name() const { return name_; }
     59   Core* core() const { return core_; }
     60   ports::Node* node() const { return node_.get(); }
     61   scoped_refptr<base::TaskRunner> io_task_runner() const {
     62     return io_task_runner_;
     63   }
     64 
     65 #if defined(OS_MACOSX) && !defined(OS_IOS)
     66   // Create the relay used to transfer mach ports between processes.
     67   void CreateMachPortRelay(base::PortProvider* port_provider);
     68 #endif
     69 
     70   // Called exactly once, shortly after construction, and before any other
     71   // methods are called on this object.
     72   void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner);
     73 
     74   // Connects this node to a child node. This node will initiate a handshake.
     75   void ConnectToChild(base::ProcessHandle process_handle,
     76                       ScopedPlatformHandle platform_handle,
     77                       const std::string& child_token,
     78                       const ProcessErrorCallback& process_error_callback);
     79 
     80   // Closes all reserved ports which associated with the child process
     81   // |child_token|.
     82   void CloseChildPorts(const std::string& child_token);
     83 
     84   // Connects this node to a parent node. The parent node will initiate a
     85   // handshake.
     86   void ConnectToParent(ScopedPlatformHandle platform_handle);
     87 
     88   // Sets a port's observer. If |observer| is null the port's current observer
     89   // is removed.
     90   void SetPortObserver(const ports::PortRef& port,
     91                        const scoped_refptr<PortObserver>& observer);
     92 
     93   // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
     94   // it ensures the port's observer has also been removed.
     95   void ClosePort(const ports::PortRef& port);
     96 
     97   // Sends a message on a port to its peer.
     98   int SendMessage(const ports::PortRef& port_ref,
     99                   std::unique_ptr<PortsMessage> message);
    100 
    101   // Reserves a local port |port| associated with |token|. A peer holding a copy
    102   // of |token| can merge one of its own ports into this one.
    103   void ReservePort(const std::string& token, const ports::PortRef& port,
    104                    const std::string& child_token);
    105 
    106   // Merges a local port |port| into a port reserved by |token| in the parent.
    107   void MergePortIntoParent(const std::string& token,
    108                            const ports::PortRef& port);
    109 
    110   // Merges two local ports together.
    111   int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
    112 
    113   // Creates a new shared buffer for use in the current process.
    114   scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
    115 
    116   // Request that the Node be shut down cleanly. This may take an arbitrarily
    117   // long time to complete, at which point |callback| will be called.
    118   //
    119   // Note that while it is safe to continue using the NodeController's public
    120   // interface after requesting shutdown, you do so at your own risk and there
    121   // is NO guarantee that new messages will be sent or ports will complete
    122   // transfer.
    123   void RequestShutdown(const base::Closure& callback);
    124 
    125   // Notifies the NodeController that we received a bad message from the given
    126   // node.
    127   void NotifyBadMessageFrom(const ports::NodeName& source_node,
    128                             const std::string& error);
    129 
    130  private:
    131   friend Core;
    132 
    133   using NodeMap = std::unordered_map<ports::NodeName,
    134                                      scoped_refptr<NodeChannel>>;
    135   using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
    136 
    137   struct ReservedPort {
    138     ports::PortRef port;
    139     const std::string child_token;
    140   };
    141 
    142   void ConnectToChildOnIOThread(
    143       base::ProcessHandle process_handle,
    144       ScopedPlatformHandle platform_handle,
    145       ports::NodeName token,
    146       const ProcessErrorCallback& process_error_callback);
    147   void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle);
    148 
    149   scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
    150   scoped_refptr<NodeChannel> GetParentChannel();
    151   scoped_refptr<NodeChannel> GetBrokerChannel();
    152 
    153   void AddPeer(const ports::NodeName& name,
    154                scoped_refptr<NodeChannel> channel,
    155                bool start_channel);
    156   void DropPeer(const ports::NodeName& name, NodeChannel* channel);
    157   void SendPeerMessage(const ports::NodeName& name,
    158                        ports::ScopedMessage message);
    159   void AcceptIncomingMessages();
    160   void ProcessIncomingMessages();
    161   void DropAllPeers();
    162 
    163   // ports::NodeDelegate:
    164   void GenerateRandomPortName(ports::PortName* port_name) override;
    165   void AllocMessage(size_t num_header_bytes,
    166                     ports::ScopedMessage* message) override;
    167   void ForwardMessage(const ports::NodeName& node,
    168                       ports::ScopedMessage message) override;
    169   void BroadcastMessage(ports::ScopedMessage message) override;
    170   void PortStatusChanged(const ports::PortRef& port) override;
    171 
    172   // NodeChannel::Delegate:
    173   void OnAcceptChild(const ports::NodeName& from_node,
    174                      const ports::NodeName& parent_name,
    175                      const ports::NodeName& token) override;
    176   void OnAcceptParent(const ports::NodeName& from_node,
    177                       const ports::NodeName& token,
    178                       const ports::NodeName& child_name) override;
    179   void OnAddBrokerClient(const ports::NodeName& from_node,
    180                          const ports::NodeName& client_name,
    181                          base::ProcessHandle process_handle) override;
    182   void OnBrokerClientAdded(const ports::NodeName& from_node,
    183                            const ports::NodeName& client_name,
    184                            ScopedPlatformHandle broker_channel) override;
    185   void OnAcceptBrokerClient(const ports::NodeName& from_node,
    186                             const ports::NodeName& broker_name,
    187                             ScopedPlatformHandle broker_channel) override;
    188   void OnPortsMessage(const ports::NodeName& from_node,
    189                       Channel::MessagePtr message) override;
    190   void OnRequestPortMerge(const ports::NodeName& from_node,
    191                           const ports::PortName& connector_port_name,
    192                           const std::string& token) override;
    193   void OnRequestIntroduction(const ports::NodeName& from_node,
    194                              const ports::NodeName& name) override;
    195   void OnIntroduce(const ports::NodeName& from_node,
    196                    const ports::NodeName& name,
    197                    ScopedPlatformHandle channel_handle) override;
    198   void OnBroadcast(const ports::NodeName& from_node,
    199                    Channel::MessagePtr message) override;
    200 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    201   void OnRelayPortsMessage(const ports::NodeName& from_node,
    202                            base::ProcessHandle from_process,
    203                            const ports::NodeName& destination,
    204                            Channel::MessagePtr message) override;
    205   void OnPortsMessageFromRelay(const ports::NodeName& from_node,
    206                                const ports::NodeName& source_node,
    207                                Channel::MessagePtr message) override;
    208 #endif
    209   void OnChannelError(const ports::NodeName& from_node,
    210                       NodeChannel* channel) override;
    211 #if defined(OS_MACOSX) && !defined(OS_IOS)
    212   MachPortRelay* GetMachPortRelay() override;
    213 #endif
    214 
    215   // Marks this NodeController for destruction when the IO thread shuts down.
    216   // This is used in case Core is torn down before the IO thread. Must only be
    217   // called on the IO thread.
    218   void DestroyOnIOThreadShutdown();
    219 
    220   // If there is a registered shutdown callback (meaning shutdown has been
    221   // requested, this checks the Node's status to see if clean shutdown is
    222   // possible. If so, shutdown is performed and the shutdown callback is run.
    223   void AttemptShutdownIfRequested();
    224 
    225   // These are safe to access from any thread as long as the Node is alive.
    226   Core* const core_;
    227   const ports::NodeName name_;
    228   const std::unique_ptr<ports::Node> node_;
    229   scoped_refptr<base::TaskRunner> io_task_runner_;
    230 
    231   // Guards |peers_| and |pending_peer_messages_|.
    232   base::Lock peers_lock_;
    233 
    234   // Channels to known peers, including parent and children, if any.
    235   NodeMap peers_;
    236 
    237   // Outgoing message queues for peers we've heard of but can't yet talk to.
    238   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
    239       pending_peer_messages_;
    240 
    241   // Guards |reserved_ports_| and |pending_child_tokens_|.
    242   base::Lock reserved_ports_lock_;
    243 
    244   // Ports reserved by token. Key is the port token.
    245   base::hash_map<std::string, ReservedPort> reserved_ports_;
    246   // TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't
    247   // have one yet :(
    248   std::unordered_map<ports::NodeName, std::string> pending_child_tokens_;
    249 
    250   // Guards |pending_port_merges_| and |reject_pending_merges_|.
    251   base::Lock pending_port_merges_lock_;
    252 
    253   // A set of port merge requests awaiting parent connection.
    254   std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
    255 
    256   // Indicates that new merge requests should be rejected because the parent has
    257   // disconnected.
    258   bool reject_pending_merges_ = false;
    259 
    260   // Guards |parent_name_| and |bootstrap_parent_channel_|.
    261   base::Lock parent_lock_;
    262 
    263   // The name of our parent node, if any.
    264   ports::NodeName parent_name_;
    265 
    266   // A temporary reference to the parent channel before we know their name.
    267   scoped_refptr<NodeChannel> bootstrap_parent_channel_;
    268 
    269   // Guards |broker_name_|, |pending_broker_clients_|, and
    270   // |pending_relay_messages_|.
    271   base::Lock broker_lock_;
    272 
    273   // The name of our broker node, if any.
    274   ports::NodeName broker_name_;
    275 
    276   // A queue of pending child names waiting to be connected to a broker.
    277   std::queue<ports::NodeName> pending_broker_clients_;
    278 
    279   // Messages waiting to be relayed by the broker once it's known.
    280   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
    281       pending_relay_messages_;
    282 
    283   // Guards |incoming_messages_| and |incoming_messages_task_posted_|.
    284   base::Lock messages_lock_;
    285   std::queue<ports::ScopedMessage> incoming_messages_;
    286   // Ensures that there is only one incoming messages task posted to the IO
    287   // thread.
    288   bool incoming_messages_task_posted_ = false;
    289 
    290   // Guards |shutdown_callback_|.
    291   base::Lock shutdown_lock_;
    292 
    293   // Set by RequestShutdown(). If this is non-null, the controller will
    294   // begin polling the Node to see if clean shutdown is possible any time the
    295   // Node's state is modified by the controller.
    296   base::Closure shutdown_callback_;
    297   // Flag to fast-path checking |shutdown_callback_|.
    298   AtomicFlag shutdown_callback_flag_;
    299 
    300   // All other fields below must only be accessed on the I/O thread, i.e., the
    301   // thread on which core_->io_task_runner() runs tasks.
    302 
    303   // Channels to children during handshake.
    304   NodeMap pending_children_;
    305 
    306   // Indicates whether this object should delete itself on IO thread shutdown.
    307   // Must only be accessed from the IO thread.
    308   bool destroy_on_io_thread_shutdown_ = false;
    309 
    310 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
    311   // Broker for sync shared buffer creation (non-Mac posix-only) in children.
    312   std::unique_ptr<Broker> broker_;
    313 #endif
    314 
    315 #if defined(OS_MACOSX) && !defined(OS_IOS)
    316   base::Lock mach_port_relay_lock_;
    317   // Relay for transferring mach ports to/from children.
    318   std::unique_ptr<MachPortRelay> mach_port_relay_;
    319 #endif
    320 
    321   DISALLOW_COPY_AND_ASSIGN(NodeController);
    322 };
    323 
    324 }  // namespace edk
    325 }  // namespace mojo
    326 
    327 #endif  // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
    328