Home | History | Annotate | Download | only in textproto
      1 // Copyright 2010 The Go Authors.  All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package textproto
      6 
      7 import (
      8 	"sync"
      9 )
     10 
     11 // A Pipeline manages a pipelined in-order request/response sequence.
     12 //
     13 // To use a Pipeline p to manage multiple clients on a connection,
     14 // each client should run:
     15 //
     16 //	id := p.Next()	// take a number
     17 //
     18 //	p.StartRequest(id)	// wait for turn to send request
     19 //	send request
     20 //	p.EndRequest(id)	// notify Pipeline that request is sent
     21 //
     22 //	p.StartResponse(id)	// wait for turn to read response
     23 //	read response
     24 //	p.EndResponse(id)	// notify Pipeline that response is read
     25 //
     26 // A pipelined server can use the same calls to ensure that
     27 // responses computed in parallel are written in the correct order.
     28 type Pipeline struct {
     29 	mu       sync.Mutex
     30 	id       uint
     31 	request  sequencer
     32 	response sequencer
     33 }
     34 
     35 // Next returns the next id for a request/response pair.
     36 func (p *Pipeline) Next() uint {
     37 	p.mu.Lock()
     38 	id := p.id
     39 	p.id++
     40 	p.mu.Unlock()
     41 	return id
     42 }
     43 
     44 // StartRequest blocks until it is time to send (or, if this is a server, receive)
     45 // the request with the given id.
     46 func (p *Pipeline) StartRequest(id uint) {
     47 	p.request.Start(id)
     48 }
     49 
     50 // EndRequest notifies p that the request with the given id has been sent
     51 // (or, if this is a server, received).
     52 func (p *Pipeline) EndRequest(id uint) {
     53 	p.request.End(id)
     54 }
     55 
     56 // StartResponse blocks until it is time to receive (or, if this is a server, send)
     57 // the request with the given id.
     58 func (p *Pipeline) StartResponse(id uint) {
     59 	p.response.Start(id)
     60 }
     61 
     62 // EndResponse notifies p that the response with the given id has been received
     63 // (or, if this is a server, sent).
     64 func (p *Pipeline) EndResponse(id uint) {
     65 	p.response.End(id)
     66 }
     67 
     68 // A sequencer schedules a sequence of numbered events that must
     69 // happen in order, one after the other.  The event numbering must start
     70 // at 0 and increment without skipping.  The event number wraps around
     71 // safely as long as there are not 2^32 simultaneous events pending.
     72 type sequencer struct {
     73 	mu   sync.Mutex
     74 	id   uint
     75 	wait map[uint]chan uint
     76 }
     77 
     78 // Start waits until it is time for the event numbered id to begin.
     79 // That is, except for the first event, it waits until End(id-1) has
     80 // been called.
     81 func (s *sequencer) Start(id uint) {
     82 	s.mu.Lock()
     83 	if s.id == id {
     84 		s.mu.Unlock()
     85 		return
     86 	}
     87 	c := make(chan uint)
     88 	if s.wait == nil {
     89 		s.wait = make(map[uint]chan uint)
     90 	}
     91 	s.wait[id] = c
     92 	s.mu.Unlock()
     93 	<-c
     94 }
     95 
     96 // End notifies the sequencer that the event numbered id has completed,
     97 // allowing it to schedule the event numbered id+1.  It is a run-time error
     98 // to call End with an id that is not the number of the active event.
     99 func (s *sequencer) End(id uint) {
    100 	s.mu.Lock()
    101 	if s.id != id {
    102 		panic("out of sync")
    103 	}
    104 	id++
    105 	s.id = id
    106 	if s.wait == nil {
    107 		s.wait = make(map[uint]chan uint)
    108 	}
    109 	c, ok := s.wait[id]
    110 	if ok {
    111 		delete(s.wait, id)
    112 	}
    113 	s.mu.Unlock()
    114 	if ok {
    115 		c <- 1
    116 	}
    117 }
    118