Home | History | Annotate | Download | only in channelz
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 // Package channelz defines APIs for enabling channelz service, entry
     20 // registration/deletion, and accessing channelz data. It also defines channelz
     21 // metric struct formats.
     22 //
     23 // All APIs in this package are experimental.
     24 package channelz
     25 
     26 import (
     27 	"sort"
     28 	"sync"
     29 	"sync/atomic"
     30 
     31 	"google.golang.org/grpc/grpclog"
     32 )
     33 
var (
	// db wraps the reference to the current channelz storage. The reference
	// is replaced each time NewChannelzStorage is called.
	db    dbWrapper
	// idGen hands out the tracking ids returned by the Register* functions.
	idGen idGenerator
	// EntryPerPage defines the number of channelz entries to be shown on a web page.
	EntryPerPage = 50
	// curState is 1 once TurnOn has enabled data collection. It is only
	// accessed through sync/atomic operations.
	curState     int32
)
     41 
     42 // TurnOn turns on channelz data collection.
     43 func TurnOn() {
     44 	if !IsOn() {
     45 		NewChannelzStorage()
     46 		atomic.StoreInt32(&curState, 1)
     47 	}
     48 }
     49 
     50 // IsOn returns whether channelz data collection is on.
     51 func IsOn() bool {
     52 	return atomic.CompareAndSwapInt32(&curState, 1, 1)
     53 }
     54 
     55 // dbWarpper wraps around a reference to internal channelz data storage, and
     56 // provide synchronized functionality to set and get the reference.
     57 type dbWrapper struct {
     58 	mu sync.RWMutex
     59 	DB *channelMap
     60 }
     61 
     62 func (d *dbWrapper) set(db *channelMap) {
     63 	d.mu.Lock()
     64 	d.DB = db
     65 	d.mu.Unlock()
     66 }
     67 
     68 func (d *dbWrapper) get() *channelMap {
     69 	d.mu.RLock()
     70 	defer d.mu.RUnlock()
     71 	return d.DB
     72 }
     73 
     74 // NewChannelzStorage initializes channelz data storage and id generator.
     75 //
     76 // Note: This function is exported for testing purpose only. User should not call
     77 // it in most cases.
     78 func NewChannelzStorage() {
     79 	db.set(&channelMap{
     80 		topLevelChannels: make(map[int64]struct{}),
     81 		channels:         make(map[int64]*channel),
     82 		listenSockets:    make(map[int64]*listenSocket),
     83 		normalSockets:    make(map[int64]*normalSocket),
     84 		servers:          make(map[int64]*server),
     85 		subChannels:      make(map[int64]*subChannel),
     86 	})
     87 	idGen.reset()
     88 }
     89 
     90 // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
     91 // boolean indicating whether there's more top channels to be queried for.
     92 //
     93 // The arg id specifies that only top channel with id at or above it will be included
     94 // in the result. The returned slice is up to a length of EntryPerPage, and is
     95 // sorted in ascending id order.
     96 func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
     97 	return db.get().GetTopChannels(id)
     98 }
     99 
    100 // GetServers returns a slice of server's ServerMetric, along with a
    101 // boolean indicating whether there's more servers to be queried for.
    102 //
    103 // The arg id specifies that only server with id at or above it will be included
    104 // in the result. The returned slice is up to a length of EntryPerPage, and is
    105 // sorted in ascending id order.
    106 func GetServers(id int64) ([]*ServerMetric, bool) {
    107 	return db.get().GetServers(id)
    108 }
    109 
    110 // GetServerSockets returns a slice of server's (identified by id) normal socket's
    111 // SocketMetric, along with a boolean indicating whether there's more sockets to
    112 // be queried for.
    113 //
    114 // The arg startID specifies that only sockets with id at or above it will be
    115 // included in the result. The returned slice is up to a length of EntryPerPage,
    116 // and is sorted in ascending id order.
    117 func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
    118 	return db.get().GetServerSockets(id, startID)
    119 }
    120 
    121 // GetChannel returns the ChannelMetric for the channel (identified by id).
    122 func GetChannel(id int64) *ChannelMetric {
    123 	return db.get().GetChannel(id)
    124 }
    125 
    126 // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
    127 func GetSubChannel(id int64) *SubChannelMetric {
    128 	return db.get().GetSubChannel(id)
    129 }
    130 
    131 // GetSocket returns the SocketInternalMetric for the socket (identified by id).
    132 func GetSocket(id int64) *SocketMetric {
    133 	return db.get().GetSocket(id)
    134 }
    135 
    136 // RegisterChannel registers the given channel c in channelz database with ref
    137 // as its reference name, and add it to the child list of its parent (identified
    138 // by pid). pid = 0 means no parent. It returns the unique channelz tracking id
    139 // assigned to this channel.
    140 func RegisterChannel(c Channel, pid int64, ref string) int64 {
    141 	id := idGen.genID()
    142 	cn := &channel{
    143 		refName:     ref,
    144 		c:           c,
    145 		subChans:    make(map[int64]string),
    146 		nestedChans: make(map[int64]string),
    147 		id:          id,
    148 		pid:         pid,
    149 	}
    150 	if pid == 0 {
    151 		db.get().addChannel(id, cn, true, pid, ref)
    152 	} else {
    153 		db.get().addChannel(id, cn, false, pid, ref)
    154 	}
    155 	return id
    156 }
    157 
    158 // RegisterSubChannel registers the given channel c in channelz database with ref
    159 // as its reference name, and add it to the child list of its parent (identified
    160 // by pid). It returns the unique channelz tracking id assigned to this subchannel.
    161 func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
    162 	if pid == 0 {
    163 		grpclog.Error("a SubChannel's parent id cannot be 0")
    164 		return 0
    165 	}
    166 	id := idGen.genID()
    167 	sc := &subChannel{
    168 		refName: ref,
    169 		c:       c,
    170 		sockets: make(map[int64]string),
    171 		id:      id,
    172 		pid:     pid,
    173 	}
    174 	db.get().addSubChannel(id, sc, pid, ref)
    175 	return id
    176 }
    177 
    178 // RegisterServer registers the given server s in channelz database. It returns
    179 // the unique channelz tracking id assigned to this server.
    180 func RegisterServer(s Server, ref string) int64 {
    181 	id := idGen.genID()
    182 	svr := &server{
    183 		refName:       ref,
    184 		s:             s,
    185 		sockets:       make(map[int64]string),
    186 		listenSockets: make(map[int64]string),
    187 		id:            id,
    188 	}
    189 	db.get().addServer(id, svr)
    190 	return id
    191 }
    192 
    193 // RegisterListenSocket registers the given listen socket s in channelz database
    194 // with ref as its reference name, and add it to the child list of its parent
    195 // (identified by pid). It returns the unique channelz tracking id assigned to
    196 // this listen socket.
    197 func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
    198 	if pid == 0 {
    199 		grpclog.Error("a ListenSocket's parent id cannot be 0")
    200 		return 0
    201 	}
    202 	id := idGen.genID()
    203 	ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
    204 	db.get().addListenSocket(id, ls, pid, ref)
    205 	return id
    206 }
    207 
    208 // RegisterNormalSocket registers the given normal socket s in channelz database
    209 // with ref as its reference name, and add it to the child list of its parent
    210 // (identified by pid). It returns the unique channelz tracking id assigned to
    211 // this normal socket.
    212 func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
    213 	if pid == 0 {
    214 		grpclog.Error("a NormalSocket's parent id cannot be 0")
    215 		return 0
    216 	}
    217 	id := idGen.genID()
    218 	ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
    219 	db.get().addNormalSocket(id, ns, pid, ref)
    220 	return id
    221 }
    222 
    223 // RemoveEntry removes an entry with unique channelz trakcing id to be id from
    224 // channelz database.
    225 func RemoveEntry(id int64) {
    226 	db.get().removeEntry(id)
    227 }
    228 
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided into two categories with respect to locking.
// 1. Methods that acquire the global lock.
// 2. Methods that can only be called when the global lock is held.
// A method of the second type must always be called from inside a method of
// the first type.
type channelMap struct {
	// mu is the global lock guarding every map below.
	mu               sync.RWMutex
	// topLevelChannels is the set of ids of channels registered without a parent.
	topLevelChannels map[int64]struct{}
	// servers maps a tracking id to its registered server entry.
	servers          map[int64]*server
	// channels maps a tracking id to its registered channel entry (top-level
	// and nested alike).
	channels         map[int64]*channel
	// subChannels maps a tracking id to its registered subchannel entry.
	subChannels      map[int64]*subChannel
	// listenSockets maps a tracking id to its registered listen socket entry.
	listenSockets    map[int64]*listenSocket
	// normalSockets maps a tracking id to its registered normal socket entry.
	normalSockets    map[int64]*normalSocket
}
    243 
    244 func (c *channelMap) addServer(id int64, s *server) {
    245 	c.mu.Lock()
    246 	s.cm = c
    247 	c.servers[id] = s
    248 	c.mu.Unlock()
    249 }
    250 
    251 func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
    252 	c.mu.Lock()
    253 	cn.cm = c
    254 	c.channels[id] = cn
    255 	if isTopChannel {
    256 		c.topLevelChannels[id] = struct{}{}
    257 	} else {
    258 		c.findEntry(pid).addChild(id, cn)
    259 	}
    260 	c.mu.Unlock()
    261 }
    262 
    263 func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
    264 	c.mu.Lock()
    265 	sc.cm = c
    266 	c.subChannels[id] = sc
    267 	c.findEntry(pid).addChild(id, sc)
    268 	c.mu.Unlock()
    269 }
    270 
    271 func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
    272 	c.mu.Lock()
    273 	ls.cm = c
    274 	c.listenSockets[id] = ls
    275 	c.findEntry(pid).addChild(id, ls)
    276 	c.mu.Unlock()
    277 }
    278 
    279 func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
    280 	c.mu.Lock()
    281 	ns.cm = c
    282 	c.normalSockets[id] = ns
    283 	c.findEntry(pid).addChild(id, ns)
    284 	c.mu.Unlock()
    285 }
    286 
    287 // removeEntry triggers the removal of an entry, which may not indeed delete the
    288 // entry, if it has to wait on the deletion of its children, or may lead to a chain
    289 // of entry deletion. For example, deleting the last socket of a gracefully shutting
    290 // down server will lead to the server being also deleted.
    291 func (c *channelMap) removeEntry(id int64) {
    292 	c.mu.Lock()
    293 	c.findEntry(id).triggerDelete()
    294 	c.mu.Unlock()
    295 }
    296 
    297 // c.mu must be held by the caller.
    298 func (c *channelMap) findEntry(id int64) entry {
    299 	var v entry
    300 	var ok bool
    301 	if v, ok = c.channels[id]; ok {
    302 		return v
    303 	}
    304 	if v, ok = c.subChannels[id]; ok {
    305 		return v
    306 	}
    307 	if v, ok = c.servers[id]; ok {
    308 		return v
    309 	}
    310 	if v, ok = c.listenSockets[id]; ok {
    311 		return v
    312 	}
    313 	if v, ok = c.normalSockets[id]; ok {
    314 		return v
    315 	}
    316 	return &dummyEntry{idNotFound: id}
    317 }
    318 
    319 // c.mu must be held by the caller
    320 // deleteEntry simply deletes an entry from the channelMap. Before calling this
    321 // method, caller must check this entry is ready to be deleted, i.e removeEntry()
    322 // has been called on it, and no children still exist.
    323 // Conditionals are ordered by the expected frequency of deletion of each entity
    324 // type, in order to optimize performance.
    325 func (c *channelMap) deleteEntry(id int64) {
    326 	var ok bool
    327 	if _, ok = c.normalSockets[id]; ok {
    328 		delete(c.normalSockets, id)
    329 		return
    330 	}
    331 	if _, ok = c.subChannels[id]; ok {
    332 		delete(c.subChannels, id)
    333 		return
    334 	}
    335 	if _, ok = c.channels[id]; ok {
    336 		delete(c.channels, id)
    337 		delete(c.topLevelChannels, id)
    338 		return
    339 	}
    340 	if _, ok = c.listenSockets[id]; ok {
    341 		delete(c.listenSockets, id)
    342 		return
    343 	}
    344 	if _, ok = c.servers[id]; ok {
    345 		delete(c.servers, id)
    346 		return
    347 	}
    348 }
    349 
    350 type int64Slice []int64
    351 
    352 func (s int64Slice) Len() int           { return len(s) }
    353 func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
    354 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
    355 
    356 func copyMap(m map[int64]string) map[int64]string {
    357 	n := make(map[int64]string)
    358 	for k, v := range m {
    359 		n[k] = v
    360 	}
    361 	return n
    362 }
    363 
    364 func min(a, b int) int {
    365 	if a < b {
    366 		return a
    367 	}
    368 	return b
    369 }
    370 
    371 func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
    372 	c.mu.RLock()
    373 	l := len(c.topLevelChannels)
    374 	ids := make([]int64, 0, l)
    375 	cns := make([]*channel, 0, min(l, EntryPerPage))
    376 
    377 	for k := range c.topLevelChannels {
    378 		ids = append(ids, k)
    379 	}
    380 	sort.Sort(int64Slice(ids))
    381 	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
    382 	count := 0
    383 	var end bool
    384 	var t []*ChannelMetric
    385 	for i, v := range ids[idx:] {
    386 		if count == EntryPerPage {
    387 			break
    388 		}
    389 		if cn, ok := c.channels[v]; ok {
    390 			cns = append(cns, cn)
    391 			t = append(t, &ChannelMetric{
    392 				NestedChans: copyMap(cn.nestedChans),
    393 				SubChans:    copyMap(cn.subChans),
    394 			})
    395 			count++
    396 		}
    397 		if i == len(ids[idx:])-1 {
    398 			end = true
    399 			break
    400 		}
    401 	}
    402 	c.mu.RUnlock()
    403 	if count == 0 {
    404 		end = true
    405 	}
    406 
    407 	for i, cn := range cns {
    408 		t[i].ChannelData = cn.c.ChannelzMetric()
    409 		t[i].ID = cn.id
    410 		t[i].RefName = cn.refName
    411 	}
    412 	return t, end
    413 }
    414 
    415 func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
    416 	c.mu.RLock()
    417 	l := len(c.servers)
    418 	ids := make([]int64, 0, l)
    419 	ss := make([]*server, 0, min(l, EntryPerPage))
    420 	for k := range c.servers {
    421 		ids = append(ids, k)
    422 	}
    423 	sort.Sort(int64Slice(ids))
    424 	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
    425 	count := 0
    426 	var end bool
    427 	var s []*ServerMetric
    428 	for i, v := range ids[idx:] {
    429 		if count == EntryPerPage {
    430 			break
    431 		}
    432 		if svr, ok := c.servers[v]; ok {
    433 			ss = append(ss, svr)
    434 			s = append(s, &ServerMetric{
    435 				ListenSockets: copyMap(svr.listenSockets),
    436 			})
    437 			count++
    438 		}
    439 		if i == len(ids[idx:])-1 {
    440 			end = true
    441 			break
    442 		}
    443 	}
    444 	c.mu.RUnlock()
    445 	if count == 0 {
    446 		end = true
    447 	}
    448 
    449 	for i, svr := range ss {
    450 		s[i].ServerData = svr.s.ChannelzMetric()
    451 		s[i].ID = svr.id
    452 		s[i].RefName = svr.refName
    453 	}
    454 	return s, end
    455 }
    456 
    457 func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
    458 	var svr *server
    459 	var ok bool
    460 	c.mu.RLock()
    461 	if svr, ok = c.servers[id]; !ok {
    462 		// server with id doesn't exist.
    463 		c.mu.RUnlock()
    464 		return nil, true
    465 	}
    466 	svrskts := svr.sockets
    467 	l := len(svrskts)
    468 	ids := make([]int64, 0, l)
    469 	sks := make([]*normalSocket, 0, min(l, EntryPerPage))
    470 	for k := range svrskts {
    471 		ids = append(ids, k)
    472 	}
    473 	sort.Sort((int64Slice(ids)))
    474 	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
    475 	count := 0
    476 	var end bool
    477 	for i, v := range ids[idx:] {
    478 		if count == EntryPerPage {
    479 			break
    480 		}
    481 		if ns, ok := c.normalSockets[v]; ok {
    482 			sks = append(sks, ns)
    483 			count++
    484 		}
    485 		if i == len(ids[idx:])-1 {
    486 			end = true
    487 			break
    488 		}
    489 	}
    490 	c.mu.RUnlock()
    491 	if count == 0 {
    492 		end = true
    493 	}
    494 	var s []*SocketMetric
    495 	for _, ns := range sks {
    496 		sm := &SocketMetric{}
    497 		sm.SocketData = ns.s.ChannelzMetric()
    498 		sm.ID = ns.id
    499 		sm.RefName = ns.refName
    500 		s = append(s, sm)
    501 	}
    502 	return s, end
    503 }
    504 
    505 func (c *channelMap) GetChannel(id int64) *ChannelMetric {
    506 	cm := &ChannelMetric{}
    507 	var cn *channel
    508 	var ok bool
    509 	c.mu.RLock()
    510 	if cn, ok = c.channels[id]; !ok {
    511 		// channel with id doesn't exist.
    512 		c.mu.RUnlock()
    513 		return nil
    514 	}
    515 	cm.NestedChans = copyMap(cn.nestedChans)
    516 	cm.SubChans = copyMap(cn.subChans)
    517 	c.mu.RUnlock()
    518 	cm.ChannelData = cn.c.ChannelzMetric()
    519 	cm.ID = cn.id
    520 	cm.RefName = cn.refName
    521 	return cm
    522 }
    523 
    524 func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
    525 	cm := &SubChannelMetric{}
    526 	var sc *subChannel
    527 	var ok bool
    528 	c.mu.RLock()
    529 	if sc, ok = c.subChannels[id]; !ok {
    530 		// subchannel with id doesn't exist.
    531 		c.mu.RUnlock()
    532 		return nil
    533 	}
    534 	cm.Sockets = copyMap(sc.sockets)
    535 	c.mu.RUnlock()
    536 	cm.ChannelData = sc.c.ChannelzMetric()
    537 	cm.ID = sc.id
    538 	cm.RefName = sc.refName
    539 	return cm
    540 }
    541 
    542 func (c *channelMap) GetSocket(id int64) *SocketMetric {
    543 	sm := &SocketMetric{}
    544 	c.mu.RLock()
    545 	if ls, ok := c.listenSockets[id]; ok {
    546 		c.mu.RUnlock()
    547 		sm.SocketData = ls.s.ChannelzMetric()
    548 		sm.ID = ls.id
    549 		sm.RefName = ls.refName
    550 		return sm
    551 	}
    552 	if ns, ok := c.normalSockets[id]; ok {
    553 		c.mu.RUnlock()
    554 		sm.SocketData = ns.s.ChannelzMetric()
    555 		sm.ID = ns.id
    556 		sm.RefName = ns.refName
    557 		return sm
    558 	}
    559 	c.mu.RUnlock()
    560 	return nil
    561 }
    562 
    563 type idGenerator struct {
    564 	id int64
    565 }
    566 
    567 func (i *idGenerator) reset() {
    568 	atomic.StoreInt64(&i.id, 0)
    569 }
    570 
    571 func (i *idGenerator) genID() int64 {
    572 	return atomic.AddInt64(&i.id, 1)
    573 }
    574