1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/channel.h" 6 7 #include "base/basictypes.h" 8 #include "base/bind.h" 9 #include "base/compiler_specific.h" 10 #include "base/logging.h" 11 #include "base/message_loop/message_loop.h" 12 #include "base/strings/stringprintf.h" 13 14 namespace mojo { 15 namespace system { 16 17 COMPILE_ASSERT(Channel::kBootstrapEndpointId != 18 MessageInTransit::kInvalidEndpointId, 19 kBootstrapEndpointId_is_invalid); 20 21 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId 22 Channel::kBootstrapEndpointId; 23 24 Channel::EndpointInfo::EndpointInfo() { 25 } 26 27 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe, 28 unsigned port) 29 : message_pipe(message_pipe), 30 port(port) { 31 } 32 33 Channel::EndpointInfo::~EndpointInfo() { 34 } 35 36 Channel::Channel() 37 : next_local_id_(kBootstrapEndpointId) { 38 } 39 40 bool Channel::Init(const PlatformChannelHandle& handle) { 41 DCHECK(creation_thread_checker_.CalledOnValidThread()); 42 43 // No need to take |lock_|, since this must be called before this object 44 // becomes thread-safe. 45 DCHECK(!raw_channel_.get()); 46 47 raw_channel_.reset( 48 RawChannel::Create(handle, this, base::MessageLoop::current())); 49 if (!raw_channel_->Init()) { 50 raw_channel_.reset(); 51 return false; 52 } 53 54 return true; 55 } 56 57 void Channel::Shutdown() { 58 DCHECK(creation_thread_checker_.CalledOnValidThread()); 59 60 base::AutoLock locker(lock_); 61 DCHECK(raw_channel_.get()); 62 raw_channel_->Shutdown(); 63 raw_channel_.reset(); 64 65 // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that 66 // it's empty? 67 } 68 69 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( 70 scoped_refptr<MessagePipe> message_pipe, unsigned port) { 71 MessageInTransit::EndpointId local_id; 72 { 73 base::AutoLock locker(lock_); 74 75 while (next_local_id_ == MessageInTransit::kInvalidEndpointId || 76 local_id_to_endpoint_info_map_.find(next_local_id_) != 77 local_id_to_endpoint_info_map_.end()) 78 next_local_id_++; 79 80 local_id = next_local_id_; 81 next_local_id_++; 82 83 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid 84 // some expensive reference count increment/decrements.) Once this is done, 85 // we should be able to delete |EndpointInfo|'s default constructor. 86 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); 87 } 88 89 message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id); 90 return local_id; 91 } 92 93 void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, 94 MessageInTransit::EndpointId remote_id) { 95 EndpointInfo endpoint_info; 96 { 97 base::AutoLock locker(lock_); 98 99 IdToEndpointInfoMap::const_iterator it = 100 local_id_to_endpoint_info_map_.find(local_id); 101 CHECK(it != local_id_to_endpoint_info_map_.end()); 102 endpoint_info = it->second; 103 } 104 105 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); 106 } 107 108 bool Channel::WriteMessage(MessageInTransit* message) { 109 base::AutoLock locker(lock_); 110 if (!raw_channel_.get()) { 111 // TODO(vtl): I think this is probably not an error condition, but I should 112 // think about it (and the shutdown sequence) more carefully. 113 LOG(INFO) << "WriteMessage() after shutdown"; 114 return false; 115 } 116 117 return raw_channel_->WriteMessage(message); 118 } 119 120 void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { 121 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 122 123 base::AutoLock locker_(lock_); 124 local_id_to_endpoint_info_map_.erase(local_id); 125 } 126 127 Channel::~Channel() { 128 // The channel should have been shut down first. 129 DCHECK(!raw_channel_.get()); 130 } 131 132 void Channel::OnReadMessage(const MessageInTransit& message) { 133 switch (message.type()) { 134 case MessageInTransit::kTypeMessagePipeEndpoint: 135 case MessageInTransit::kTypeMessagePipe: 136 OnReadMessageForDownstream(message); 137 break; 138 case MessageInTransit::TYPE_CHANNEL: 139 OnReadMessageForChannel(message); 140 break; 141 default: 142 HandleRemoteError(base::StringPrintf( 143 "Received message of invalid type %u", 144 static_cast<unsigned>(message.type()))); 145 break; 146 } 147 } 148 149 void Channel::OnFatalError(FatalError fatal_error) { 150 // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead. 151 NOTIMPLEMENTED(); 152 } 153 154 void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { 155 DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint || 156 message.type() == MessageInTransit::kTypeMessagePipe); 157 158 MessageInTransit::EndpointId local_id = message.destination_id(); 159 if (local_id == MessageInTransit::kInvalidEndpointId) { 160 HandleRemoteError("Received message with no destination ID"); 161 return; 162 } 163 164 EndpointInfo endpoint_info; 165 { 166 base::AutoLock locker(lock_); 167 168 // Since we own |raw_channel_|, and this method and |Shutdown()| should only 169 // be called from the creation thread, |raw_channel_| should never be null 170 // here. 171 DCHECK(raw_channel_.get()); 172 173 IdToEndpointInfoMap::const_iterator it = 174 local_id_to_endpoint_info_map_.find(local_id); 175 if (it == local_id_to_endpoint_info_map_.end()) { 176 HandleRemoteError(base::StringPrintf( 177 "Received a message for nonexistent local destination ID %u", 178 static_cast<unsigned>(local_id))); 179 return; 180 } 181 endpoint_info = it->second; 182 } 183 184 // We need to duplicate the message, because |EnqueueMessage()| will take 185 // ownership of it. 186 MessageInTransit* own_message = MessageInTransit::Create( 187 message.type(), message.subtype(), message.data(), message.data_size()); 188 if (endpoint_info.message_pipe->EnqueueMessage( 189 MessagePipe::GetPeerPort(endpoint_info.port), own_message, NULL) != 190 MOJO_RESULT_OK) { 191 HandleLocalError(base::StringPrintf( 192 "Failed to enqueue message to local destination ID %u", 193 static_cast<unsigned>(local_id))); 194 return; 195 } 196 } 197 198 void Channel::OnReadMessageForChannel(const MessageInTransit& message) { 199 // TODO(vtl): Currently no channel-only messages yet. 200 HandleRemoteError("Received invalid channel message"); 201 NOTREACHED(); 202 } 203 204 void Channel::HandleRemoteError(const base::StringPiece& error_message) { 205 // TODO(vtl): Is this how we really want to handle this? 206 LOG(INFO) << error_message; 207 } 208 209 void Channel::HandleLocalError(const base::StringPiece& error_message) { 210 // TODO(vtl): Is this how we really want to handle this? 211 LOG(FATAL) << error_message; 212 } 213 214 } // namespace system 215 } // namespace mojo 216