1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 // Package channelz defines APIs for enabling channelz service, entry 20 // registration/deletion, and accessing channelz data. It also defines channelz 21 // metric struct formats. 22 // 23 // All APIs in this package are experimental. 24 package channelz 25 26 import ( 27 "sort" 28 "sync" 29 "sync/atomic" 30 31 "google.golang.org/grpc/grpclog" 32 ) 33 34 var ( 35 db dbWrapper 36 idGen idGenerator 37 // EntryPerPage defines the number of channelz entries to be shown on a web page. 38 EntryPerPage = 50 39 curState int32 40 ) 41 42 // TurnOn turns on channelz data collection. 43 func TurnOn() { 44 if !IsOn() { 45 NewChannelzStorage() 46 atomic.StoreInt32(&curState, 1) 47 } 48 } 49 50 // IsOn returns whether channelz data collection is on. 51 func IsOn() bool { 52 return atomic.CompareAndSwapInt32(&curState, 1, 1) 53 } 54 55 // dbWarpper wraps around a reference to internal channelz data storage, and 56 // provide synchronized functionality to set and get the reference. 57 type dbWrapper struct { 58 mu sync.RWMutex 59 DB *channelMap 60 } 61 62 func (d *dbWrapper) set(db *channelMap) { 63 d.mu.Lock() 64 d.DB = db 65 d.mu.Unlock() 66 } 67 68 func (d *dbWrapper) get() *channelMap { 69 d.mu.RLock() 70 defer d.mu.RUnlock() 71 return d.DB 72 } 73 74 // NewChannelzStorage initializes channelz data storage and id generator. 75 // 76 // Note: This function is exported for testing purpose only. User should not call 77 // it in most cases. 78 func NewChannelzStorage() { 79 db.set(&channelMap{ 80 topLevelChannels: make(map[int64]struct{}), 81 channels: make(map[int64]*channel), 82 listenSockets: make(map[int64]*listenSocket), 83 normalSockets: make(map[int64]*normalSocket), 84 servers: make(map[int64]*server), 85 subChannels: make(map[int64]*subChannel), 86 }) 87 idGen.reset() 88 } 89 90 // GetTopChannels returns a slice of top channel's ChannelMetric, along with a 91 // boolean indicating whether there's more top channels to be queried for. 92 // 93 // The arg id specifies that only top channel with id at or above it will be included 94 // in the result. The returned slice is up to a length of EntryPerPage, and is 95 // sorted in ascending id order. 96 func GetTopChannels(id int64) ([]*ChannelMetric, bool) { 97 return db.get().GetTopChannels(id) 98 } 99 100 // GetServers returns a slice of server's ServerMetric, along with a 101 // boolean indicating whether there's more servers to be queried for. 102 // 103 // The arg id specifies that only server with id at or above it will be included 104 // in the result. The returned slice is up to a length of EntryPerPage, and is 105 // sorted in ascending id order. 106 func GetServers(id int64) ([]*ServerMetric, bool) { 107 return db.get().GetServers(id) 108 } 109 110 // GetServerSockets returns a slice of server's (identified by id) normal socket's 111 // SocketMetric, along with a boolean indicating whether there's more sockets to 112 // be queried for. 113 // 114 // The arg startID specifies that only sockets with id at or above it will be 115 // included in the result. The returned slice is up to a length of EntryPerPage, 116 // and is sorted in ascending id order. 117 func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) { 118 return db.get().GetServerSockets(id, startID) 119 } 120 121 // GetChannel returns the ChannelMetric for the channel (identified by id). 122 func GetChannel(id int64) *ChannelMetric { 123 return db.get().GetChannel(id) 124 } 125 126 // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). 127 func GetSubChannel(id int64) *SubChannelMetric { 128 return db.get().GetSubChannel(id) 129 } 130 131 // GetSocket returns the SocketInternalMetric for the socket (identified by id). 132 func GetSocket(id int64) *SocketMetric { 133 return db.get().GetSocket(id) 134 } 135 136 // RegisterChannel registers the given channel c in channelz database with ref 137 // as its reference name, and add it to the child list of its parent (identified 138 // by pid). pid = 0 means no parent. It returns the unique channelz tracking id 139 // assigned to this channel. 140 func RegisterChannel(c Channel, pid int64, ref string) int64 { 141 id := idGen.genID() 142 cn := &channel{ 143 refName: ref, 144 c: c, 145 subChans: make(map[int64]string), 146 nestedChans: make(map[int64]string), 147 id: id, 148 pid: pid, 149 } 150 if pid == 0 { 151 db.get().addChannel(id, cn, true, pid, ref) 152 } else { 153 db.get().addChannel(id, cn, false, pid, ref) 154 } 155 return id 156 } 157 158 // RegisterSubChannel registers the given channel c in channelz database with ref 159 // as its reference name, and add it to the child list of its parent (identified 160 // by pid). It returns the unique channelz tracking id assigned to this subchannel. 161 func RegisterSubChannel(c Channel, pid int64, ref string) int64 { 162 if pid == 0 { 163 grpclog.Error("a SubChannel's parent id cannot be 0") 164 return 0 165 } 166 id := idGen.genID() 167 sc := &subChannel{ 168 refName: ref, 169 c: c, 170 sockets: make(map[int64]string), 171 id: id, 172 pid: pid, 173 } 174 db.get().addSubChannel(id, sc, pid, ref) 175 return id 176 } 177 178 // RegisterServer registers the given server s in channelz database. It returns 179 // the unique channelz tracking id assigned to this server. 180 func RegisterServer(s Server, ref string) int64 { 181 id := idGen.genID() 182 svr := &server{ 183 refName: ref, 184 s: s, 185 sockets: make(map[int64]string), 186 listenSockets: make(map[int64]string), 187 id: id, 188 } 189 db.get().addServer(id, svr) 190 return id 191 } 192 193 // RegisterListenSocket registers the given listen socket s in channelz database 194 // with ref as its reference name, and add it to the child list of its parent 195 // (identified by pid). It returns the unique channelz tracking id assigned to 196 // this listen socket. 197 func RegisterListenSocket(s Socket, pid int64, ref string) int64 { 198 if pid == 0 { 199 grpclog.Error("a ListenSocket's parent id cannot be 0") 200 return 0 201 } 202 id := idGen.genID() 203 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid} 204 db.get().addListenSocket(id, ls, pid, ref) 205 return id 206 } 207 208 // RegisterNormalSocket registers the given normal socket s in channelz database 209 // with ref as its reference name, and add it to the child list of its parent 210 // (identified by pid). It returns the unique channelz tracking id assigned to 211 // this normal socket. 212 func RegisterNormalSocket(s Socket, pid int64, ref string) int64 { 213 if pid == 0 { 214 grpclog.Error("a NormalSocket's parent id cannot be 0") 215 return 0 216 } 217 id := idGen.genID() 218 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid} 219 db.get().addNormalSocket(id, ns, pid, ref) 220 return id 221 } 222 223 // RemoveEntry removes an entry with unique channelz trakcing id to be id from 224 // channelz database. 225 func RemoveEntry(id int64) { 226 db.get().removeEntry(id) 227 } 228 229 // channelMap is the storage data structure for channelz. 230 // Methods of channelMap can be divided in two two categories with respect to locking. 231 // 1. Methods acquire the global lock. 232 // 2. Methods that can only be called when global lock is held. 233 // A second type of method need always to be called inside a first type of method. 234 type channelMap struct { 235 mu sync.RWMutex 236 topLevelChannels map[int64]struct{} 237 servers map[int64]*server 238 channels map[int64]*channel 239 subChannels map[int64]*subChannel 240 listenSockets map[int64]*listenSocket 241 normalSockets map[int64]*normalSocket 242 } 243 244 func (c *channelMap) addServer(id int64, s *server) { 245 c.mu.Lock() 246 s.cm = c 247 c.servers[id] = s 248 c.mu.Unlock() 249 } 250 251 func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) { 252 c.mu.Lock() 253 cn.cm = c 254 c.channels[id] = cn 255 if isTopChannel { 256 c.topLevelChannels[id] = struct{}{} 257 } else { 258 c.findEntry(pid).addChild(id, cn) 259 } 260 c.mu.Unlock() 261 } 262 263 func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) { 264 c.mu.Lock() 265 sc.cm = c 266 c.subChannels[id] = sc 267 c.findEntry(pid).addChild(id, sc) 268 c.mu.Unlock() 269 } 270 271 func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) { 272 c.mu.Lock() 273 ls.cm = c 274 c.listenSockets[id] = ls 275 c.findEntry(pid).addChild(id, ls) 276 c.mu.Unlock() 277 } 278 279 func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) { 280 c.mu.Lock() 281 ns.cm = c 282 c.normalSockets[id] = ns 283 c.findEntry(pid).addChild(id, ns) 284 c.mu.Unlock() 285 } 286 287 // removeEntry triggers the removal of an entry, which may not indeed delete the 288 // entry, if it has to wait on the deletion of its children, or may lead to a chain 289 // of entry deletion. For example, deleting the last socket of a gracefully shutting 290 // down server will lead to the server being also deleted. 291 func (c *channelMap) removeEntry(id int64) { 292 c.mu.Lock() 293 c.findEntry(id).triggerDelete() 294 c.mu.Unlock() 295 } 296 297 // c.mu must be held by the caller. 298 func (c *channelMap) findEntry(id int64) entry { 299 var v entry 300 var ok bool 301 if v, ok = c.channels[id]; ok { 302 return v 303 } 304 if v, ok = c.subChannels[id]; ok { 305 return v 306 } 307 if v, ok = c.servers[id]; ok { 308 return v 309 } 310 if v, ok = c.listenSockets[id]; ok { 311 return v 312 } 313 if v, ok = c.normalSockets[id]; ok { 314 return v 315 } 316 return &dummyEntry{idNotFound: id} 317 } 318 319 // c.mu must be held by the caller 320 // deleteEntry simply deletes an entry from the channelMap. Before calling this 321 // method, caller must check this entry is ready to be deleted, i.e removeEntry() 322 // has been called on it, and no children still exist. 323 // Conditionals are ordered by the expected frequency of deletion of each entity 324 // type, in order to optimize performance. 325 func (c *channelMap) deleteEntry(id int64) { 326 var ok bool 327 if _, ok = c.normalSockets[id]; ok { 328 delete(c.normalSockets, id) 329 return 330 } 331 if _, ok = c.subChannels[id]; ok { 332 delete(c.subChannels, id) 333 return 334 } 335 if _, ok = c.channels[id]; ok { 336 delete(c.channels, id) 337 delete(c.topLevelChannels, id) 338 return 339 } 340 if _, ok = c.listenSockets[id]; ok { 341 delete(c.listenSockets, id) 342 return 343 } 344 if _, ok = c.servers[id]; ok { 345 delete(c.servers, id) 346 return 347 } 348 } 349 350 type int64Slice []int64 351 352 func (s int64Slice) Len() int { return len(s) } 353 func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 354 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } 355 356 func copyMap(m map[int64]string) map[int64]string { 357 n := make(map[int64]string) 358 for k, v := range m { 359 n[k] = v 360 } 361 return n 362 } 363 364 func min(a, b int) int { 365 if a < b { 366 return a 367 } 368 return b 369 } 370 371 func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) { 372 c.mu.RLock() 373 l := len(c.topLevelChannels) 374 ids := make([]int64, 0, l) 375 cns := make([]*channel, 0, min(l, EntryPerPage)) 376 377 for k := range c.topLevelChannels { 378 ids = append(ids, k) 379 } 380 sort.Sort(int64Slice(ids)) 381 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 382 count := 0 383 var end bool 384 var t []*ChannelMetric 385 for i, v := range ids[idx:] { 386 if count == EntryPerPage { 387 break 388 } 389 if cn, ok := c.channels[v]; ok { 390 cns = append(cns, cn) 391 t = append(t, &ChannelMetric{ 392 NestedChans: copyMap(cn.nestedChans), 393 SubChans: copyMap(cn.subChans), 394 }) 395 count++ 396 } 397 if i == len(ids[idx:])-1 { 398 end = true 399 break 400 } 401 } 402 c.mu.RUnlock() 403 if count == 0 { 404 end = true 405 } 406 407 for i, cn := range cns { 408 t[i].ChannelData = cn.c.ChannelzMetric() 409 t[i].ID = cn.id 410 t[i].RefName = cn.refName 411 } 412 return t, end 413 } 414 415 func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) { 416 c.mu.RLock() 417 l := len(c.servers) 418 ids := make([]int64, 0, l) 419 ss := make([]*server, 0, min(l, EntryPerPage)) 420 for k := range c.servers { 421 ids = append(ids, k) 422 } 423 sort.Sort(int64Slice(ids)) 424 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 425 count := 0 426 var end bool 427 var s []*ServerMetric 428 for i, v := range ids[idx:] { 429 if count == EntryPerPage { 430 break 431 } 432 if svr, ok := c.servers[v]; ok { 433 ss = append(ss, svr) 434 s = append(s, &ServerMetric{ 435 ListenSockets: copyMap(svr.listenSockets), 436 }) 437 count++ 438 } 439 if i == len(ids[idx:])-1 { 440 end = true 441 break 442 } 443 } 444 c.mu.RUnlock() 445 if count == 0 { 446 end = true 447 } 448 449 for i, svr := range ss { 450 s[i].ServerData = svr.s.ChannelzMetric() 451 s[i].ID = svr.id 452 s[i].RefName = svr.refName 453 } 454 return s, end 455 } 456 457 func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) { 458 var svr *server 459 var ok bool 460 c.mu.RLock() 461 if svr, ok = c.servers[id]; !ok { 462 // server with id doesn't exist. 463 c.mu.RUnlock() 464 return nil, true 465 } 466 svrskts := svr.sockets 467 l := len(svrskts) 468 ids := make([]int64, 0, l) 469 sks := make([]*normalSocket, 0, min(l, EntryPerPage)) 470 for k := range svrskts { 471 ids = append(ids, k) 472 } 473 sort.Sort((int64Slice(ids))) 474 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) 475 count := 0 476 var end bool 477 for i, v := range ids[idx:] { 478 if count == EntryPerPage { 479 break 480 } 481 if ns, ok := c.normalSockets[v]; ok { 482 sks = append(sks, ns) 483 count++ 484 } 485 if i == len(ids[idx:])-1 { 486 end = true 487 break 488 } 489 } 490 c.mu.RUnlock() 491 if count == 0 { 492 end = true 493 } 494 var s []*SocketMetric 495 for _, ns := range sks { 496 sm := &SocketMetric{} 497 sm.SocketData = ns.s.ChannelzMetric() 498 sm.ID = ns.id 499 sm.RefName = ns.refName 500 s = append(s, sm) 501 } 502 return s, end 503 } 504 505 func (c *channelMap) GetChannel(id int64) *ChannelMetric { 506 cm := &ChannelMetric{} 507 var cn *channel 508 var ok bool 509 c.mu.RLock() 510 if cn, ok = c.channels[id]; !ok { 511 // channel with id doesn't exist. 512 c.mu.RUnlock() 513 return nil 514 } 515 cm.NestedChans = copyMap(cn.nestedChans) 516 cm.SubChans = copyMap(cn.subChans) 517 c.mu.RUnlock() 518 cm.ChannelData = cn.c.ChannelzMetric() 519 cm.ID = cn.id 520 cm.RefName = cn.refName 521 return cm 522 } 523 524 func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { 525 cm := &SubChannelMetric{} 526 var sc *subChannel 527 var ok bool 528 c.mu.RLock() 529 if sc, ok = c.subChannels[id]; !ok { 530 // subchannel with id doesn't exist. 531 c.mu.RUnlock() 532 return nil 533 } 534 cm.Sockets = copyMap(sc.sockets) 535 c.mu.RUnlock() 536 cm.ChannelData = sc.c.ChannelzMetric() 537 cm.ID = sc.id 538 cm.RefName = sc.refName 539 return cm 540 } 541 542 func (c *channelMap) GetSocket(id int64) *SocketMetric { 543 sm := &SocketMetric{} 544 c.mu.RLock() 545 if ls, ok := c.listenSockets[id]; ok { 546 c.mu.RUnlock() 547 sm.SocketData = ls.s.ChannelzMetric() 548 sm.ID = ls.id 549 sm.RefName = ls.refName 550 return sm 551 } 552 if ns, ok := c.normalSockets[id]; ok { 553 c.mu.RUnlock() 554 sm.SocketData = ns.s.ChannelzMetric() 555 sm.ID = ns.id 556 sm.RefName = ns.refName 557 return sm 558 } 559 c.mu.RUnlock() 560 return nil 561 } 562 563 type idGenerator struct { 564 id int64 565 } 566 567 func (i *idGenerator) reset() { 568 atomic.StoreInt64(&i.id, 0) 569 } 570 571 func (i *idGenerator) genID() int64 { 572 return atomic.AddInt64(&i.id, 1) 573 } 574