Home | History | Annotate | Download | only in ports

Lines Matching refs:port

30 bool CanAcceptMoreMessages(const Port* port) {
33 uint64_t next_sequence_num = port->message_queue.next_sequence_num();
34 if (port->state == Port::kClosed)
36 if (port->peer_closed || port->remove_proxy_on_last_message) {
37 if (port->last_sequence_num_to_receive == next_sequence_num - 1)
47 explicit LockedPort(Port* port) : port_(port) {
51 Port* get() const { return port_; }
52 Port* operator->() const { return port_; }
55 Port* const port_;
74 DVLOG(2) << "Port " << entry.first << " referencing node "
91 entry.second->state != Port::kReceiving) {
94 DVLOG(2) << "Port " << entry.first << " referencing node "
108 scoped_refptr<Port> port = GetPort(port_name);
109 if (!port)
112 *port_ref = PortRef(port_name, std::move(port));
120 scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
121 int rv = AddPortWithName(port_name, port);
125 *port_ref = PortRef(port_name, std::move(port));
132 Port* port = port_ref.port();
135 base::AutoLock lock(port->lock);
136 if (port->state != Port::kUninitialized)
139 port->state = Port::kReceiving;
140 port->peer_node_name = peer_node_name;
141 port->peer_port_name = peer_port_name;
173 Port* port = port_ref.port();
175 base::AutoLock lock(port->lock);
176 if (port->state == Port::kClosed)
179 port->user_data = std::move(user_data);
186 Port* port = port_ref.port();
188 base::AutoLock lock(port->lock);
189 if (port->state == Port::kClosed)
192 *user_data = port->user_data;
204 Port* port = port_ref.port();
206 // We may need to erase the port, which requires ports_lock_ to be held,
207 // but ports_lock_ must be acquired before any individual port locks.
210 base::AutoLock lock(port->lock);
211 if (port->state == Port::kUninitialized) {
212 // If the port was not yet initialized, there's nothing interesting to do.
217 if (port->state != Port::kReceiving)
220 port->state = Port::kClosed;
223 // port to allow the peer to have the opportunity to consume all inbound
224 // messages before notifying the embedder that this port is closed.
225 data.last_sequence_num = port->next_sequence_num_to_send - 1;
227 peer_node_name = port->peer_node_name;
228 peer_port_name = port->peer_port_name;
230 // If the port being closed still has unread messages, then we need to take
232 port->message_queue.GetReferencedPorts(&referenced_port_names);
253 Port* port = port_ref.port();
255 base::AutoLock lock(port->lock);
257 if (port->state != Port::kReceiving)
260 port_status->has_messages = port->message_queue.HasNextMessage();
261 port_status->receiving_messages = CanAcceptMoreMessages(port);
262 port_status->peer_closed = port->peer_closed;
273 Port* port = port_ref.port();
275 base::AutoLock lock(port->lock);
277 // This could also be treated like the port being unknown since the
278 // embedder should no longer be referring to a port that has been sent.
279 if (port->state != Port::kReceiving)
284 if (!CanAcceptMoreMessages(port))
287 port->message_queue.GetNextMessage(message, filter);
294 scoped_refptr<Port> new_port = GetPort(new_port_name);
296 DCHECK(new_port) << "Port " << new_port_name << "@" << name_
301 DCHECK(new_port->state == Port::kReceiving);
313 // close the sending port itself if it happened to be one of the encoded
319 PortRef port;
320 if (GetPort(message->ports()[i], &port) == OK)
321 ClosePort(port);
356 Port* port = port_ref.port();
359 base::AutoLock lock(port->lock);
364 // Send the port-to-merge over to the destination node so it can be merged
365 // into the port cycle atomically there.
367 WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
378 Port* port0 = port0_ref.port();
379 Port* port1 = port1_ref.port();
382 // |ports_lock_| must be held when acquiring overlapping port locks.
390 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
432 scoped_refptr<Port> port = GetPort(port_name);
434 // Even if this port does not exist, cannot receive anymore messages or is
449 if (port) {
450 // We may want to forward messages once the port lock is held, so we must
453 base::AutoLock lock(port->lock);
457 if (CanAcceptMoreMessages(port.get())) {
459 port->message_queue.AcceptMessage(std::move(message), &has_next_message);
461 if (port->state == Port::kBuffering) {
463 } else if (port->state == Port::kProxying) {
469 // port has seen all of the messages it is expected to see.
470 int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
474 MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
487 DLOG(WARNING) << "Cannot close non-existent port!\n";
491 PortRef port_ref(port_name, port);
499 scoped_refptr<Port> port = GetPort(port_name);
500 if (!port)
505 << port->peer_port_name << "@" << port->peer_node_name;
507 return BeginProxying(PortRef(port_name, std::move(port)));
513 // An ObserveProxy with an invalid target port name is a broadcast used to
526 // The port may have already been closed locally, in which case the
529 scoped_refptr<Port> port
530 if (!port) {
542 base::AutoLock lock(port->lock);
544 if (port->peer_node_name == event.proxy_node_name &&
545 port->peer_port_name == event.proxy_port_name) {
546 if (port->state == Port::kReceiving) {
547 port->peer_node_name = event.proxy_to_node_name;
548 port->peer_port_name = event.proxy_to_port_name;
551 ack.last_sequence_num = port->next_sequence_num_to_send - 1;
561 // Afterall, another port could be sending messages to our peer now
575 port->send_on_proxy_removal.reset(
584 // port referring to the proxy.
586 port->peer_node_name,
587 NewInternalMessage(port->peer_port_name,
600 scoped_refptr<Port> port = GetPort(port_name);
601 if (!port)
602 return ERROR_PORT_UNKNOWN; // The port may have observed closure first, so
606 base::AutoLock lock(port->lock);
608 if (port->state != Port::kProxying)
613 InitiateProxyRemoval(LockedPort(port.get()), port_name);
617 // We can now remove this port once we have received and forwarded the last
618 // message addressed to this port.
619 port->remove_proxy_on_last_message = true;
620 port->last_sequence_num_to_receive = last_sequence_num;
622 TryRemoveProxy(PortRef(port_name, std::move(port)));
628 // OK if the port doesn't exist, as it may have been closed already.
629 scoped_refptr<Port> port = GetPort(port_name);
630 if (!port)
633 // This message tells the port that it should no longer expect more messages
644 base::AutoLock lock(port->lock);
646 port->peer_closed = true;
647 port->last_sequence_num_to_receive = last_sequence_num;
650 << " (state=" << port->state << ") pointing to "
651 << port->peer_port_name << "@" << port->peer_node_name
654 // We always forward ObserveClosure, even beyond the receiving port which
655 // cares about it. This ensures that any dead-end proxies beyond that port
658 if (port->state == Port::kReceiving) {
661 // When forwarding along the other half of the port cycle, this will only
666 // may be semantically confusing since the forwarding port is not actually
668 forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
670 // We haven't yet reached the receiving peer of the closed port, so
674 // See about removing the port if it is a proxy as our peer won't be able
676 port->remove_proxy_on_last_message = true;
677 if (port->state == Port::kProxying)
683 << port->peer_port_name << "@" << port->peer_node_name
687 peer_node_name = port->peer_node_name;
688 peer_port_name = port->peer_port_name;
691 TryRemoveProxy(PortRef(port_name, port));
699 PortRef port_ref(port_name, std::move(port));
707 scoped_refptr<Port> port = GetPort(port_name);
710 << (port ? port->state : -1) << ") merging with proxy "
721 // Accept the new port. This is now the receiving end of the other port cycle
726 } else if (port) {
728 // needs to hold |ports_lock_|. We also acquire multiple port locks within.
730 base::AutoLock lock(port->lock);
732 if (port->state != Port::kReceiving) {
735 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
737 DCHECK(new_port->state == Port::kReceiving);
742 PortRef port0_ref(port_name, port);
774 int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
777 if (!ports_.insert(std::make_pair(port_name, std::move(port))).second)
780 DVLOG(2) << "Created port " << port_name << "@" << name_;
792 DVLOG(2) << "Deleted port " << port_name << "@" << name_;
795 scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
800 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
821 Port* port = port_ref.port();
824 // We must acquire |ports_lock_| before grabbing any port locks, because
827 base::AutoLock lock(port->lock);
829 if (port->state != Port::kReceiving)
832 if (port->peer_closed)
835 int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
844 peer_node_name = port->peer_node_name;
863 Port* port0 = port0_ref.port();
864 Port* port1 = port1_ref.port();
870 CHECK(port0->state == Port::kReceiving);
871 CHECK(port1->state == Port::kReceiving);
891 port0->state = Port::kBuffering;
895 port1->state = Port::kBuffering;
903 // If either merged port had a closed peer, its new peer needs to be
933 port0->state = Port::kReceiving;
934 port1->state = Port::kReceiving;
940 void Node::WillSendPort(const LockedPort& port,
944 port->lock.AssertAcquired();
953 DCHECK(port->state == Port::kReceiving);
954 port->state = Port::kBuffering;
958 if (port->peer_closed)
959 port->remove_proxy_on_last_message = true;
963 port_descriptor->peer_node_name = port->peer_node_name;
964 port_descriptor->peer_port_name = port->peer_port_name;
967 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
969 port->message_queue.next_sequence_num();
971 port->last_sequence_num_to_receive;
972 port_descriptor->peer_closed = port->peer_closed;
975 // Configure the local port to point to the new port.
976 port->peer_node_name = to_node_name;
977 port->peer_port_name = new_port_name;
982 scoped_refptr<Port> port = make_scoped_refptr(
983 new Port(port_descriptor.next_sequence_num_to_send,
985 port->state = Port::kReceiving;
986 port->peer_node_name = port_descriptor.peer_node_name;
987 port->peer_port_name = port_descriptor.peer_port_name;
988 port->last_sequence_num_to_receive =
990 port->peer_closed = port_descriptor.peer_closed;
992 DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
993 << port->peer_closed << "; last_sequence_num_to_receive="
994 << port->last_sequence_num_to_receive << "]";
996 // A newly accepted port is not signalable until the message referencing the
997 // new port finds its way to the consumer (see GetMessage).
998 port->message_queue.set_signalable(false);
1000 int rv = AddPortWithName(port_name, std::move(port));
1004 // Allow referring port to forward messages.
1012 int Node::WillSendMessage_Locked(const LockedPort& port,
1016 port->lock.AssertAcquired();
1025 *sequence_num = port->next_sequence_num_to_send++;
1040 std::vector<scoped_refptr<Port>> ports;
1050 if (ports[i]->state != Port::kReceiving)
1052 else if (message->ports()[i] == port->peer_port_name)
1056 // Oops, we cannot send this port.
1060 port->next_sequence_num_to_send--;
1071 port->peer_node_name,
1085 << " to " << port->peer_port_name << "@" << port->peer_node_name;
1088 GetMutableEventHeader(message)->port_name = port->peer_port_name;
1092 int Node::BeginProxying_Locked(const LockedPort& port,
1095 port->lock.AssertAcquired();
1097 if (port->state != Port::kBuffering)
1100 port->state = Port::kProxying;
1102 int rv = ForwardMessages_Locked(LockedPort(port), port_name);
1110 if (port->remove_proxy_on_last_message) {
1111 MaybeRemoveProxy_Locked(LockedPort(port), port_name);
1115 data.last_sequence_num = port->last_sequence_num_to_receive;
1117 port->peer_node_name,
1118 NewInternalMessage(port->peer_port_name,
1121 InitiateProxyRemoval(LockedPort(port), port_name);
1128 Port* port = port_ref.port();
1131 base::AutoLock lock(port->lock);
1133 if (port->state != Port::kBuffering)
1136 port->state = Port::kProxying;
1138 int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
1147 base::AutoLock lock(port->lock);
1148 if (port->state != Port::kProxying)
1151 should_remove = port->remove_proxy_on_last_message;
1155 data.last_sequence_num = port->last_sequence_num_to_receive;
1156 peer_node_name = port->peer_node_name;
1157 closure_message = NewInternalMessage(port->peer_port_name,
1160 InitiateProxyRemoval(LockedPort(port), port_ref.name());
1172 int Node::ForwardMessages_Locked(const LockedPort& port,
1175 port->lock.AssertAcquired();
1179 port->message_queue.GetNextMessage(&message, nullptr);
1183 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
1187 delegate_->ForwardMessage(port->peer_node_name, std::move(message));
1192 void Node::InitiateProxyRemoval(const LockedPort& port,
1194 port->lock.AssertAcquired();
1197 // a proxy. This allows whatever port is referencing this node to skip it.
1204 data.proxy_to_node_name = port->peer_node_name;
1205 data.proxy_to_port_name = port->peer_port_name;
1208 port->peer_node_name,
1209 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
1212 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
1216 port->lock.AssertAcquired();
1218 DCHECK(port->state == Port::kProxying);
1220 // Make sure we have seen ObserveProxyAck before removing the port.
1221 if (!port->remove_proxy_on_last_message)
1224 if (!CanAcceptMoreMessages(port.get())) {
1225 // This proxy port is done. We can now remove it!
1228 if (port->send_on_proxy_removal) {
1229 NodeName to_node = port->send_on_proxy_removal->first;
1230 ScopedMessage& message = port->send_on_proxy_removal->second;
1233 port->send_on_proxy_removal.reset();
1236 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
1242 Port* port = port_ref.port();
1247 base::AutoLock lock(port->lock);
1249 // Port already removed. Nothing to do.
1250 if (port->state == Port::kClosed)
1253 DCHECK(port->state == Port::kProxying);
1255 // Make sure we have seen ObserveProxyAck before removing the port.
1256 if (!port->remove_proxy_on_last_message)
1259 if (!CanAcceptMoreMessages(port)) {
1260 // This proxy port is done. We can now remove it!
1263 if (port->send_on_proxy_removal) {
1264 to_node = port->send_on_proxy_removal->first;
1265 msg = std::move(port->send_on_proxy_removal->second);
1266 port->send_on_proxy_removal.reset();
1269 DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1283 // Wipes out all ports whose peer node matches |node_name| and whose peer port
1295 Port* port = iter->second.get();
1297 base::AutoLock port_lock(port->lock);
1299 if (port->peer_node_name == node_name &&
1301 port->peer_port_name == port_name)) {
1302 if (!port->peer_closed) {
1307 port->peer_closed = true;
1308 port->last_sequence_num_to_receive =
1309 port->message_queue.next_sequence_num() - 1;
1311 if (port->state == Port::kReceiving)
1312 ports_to_notify.push_back(PortRef(iter->first, port));
1316 // expect to receive a Port{Accepted,Rejected} event. Because we're
1321 if (port->state != Port::kReceiving) {
1332 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1337 for (const auto& port : ports_to_notify)
1338 delegate_->PortStatusChanged(port);
1350 // Also process death locally since the port that points this closed one
1352 // Note: Although this is recursive, only a single port is involved which