Home | History | Annotate | Download | only in runner
      1 // Copyright 2014 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package runner
      6 
      7 import (
      8 	"encoding/binary"
      9 	"fmt"
     10 	"io"
     11 	"net"
     12 	"time"
     13 )
     14 
     15 // opcodePacket signals a packet, encoded with a 32-bit length prefix, followed
     16 // by the payload.
     17 const opcodePacket = byte('P')
     18 
     19 // opcodeTimeout signals a read timeout, encoded by a 64-bit number of
     20 // nanoseconds. On receipt, the peer should reply with
     21 // opcodeTimeoutAck. opcodeTimeout may only be sent by the Go side.
     22 const opcodeTimeout = byte('T')
     23 
     24 // opcodeTimeoutAck acknowledges a read timeout. This opcode has no payload and
     25 // may only be sent by the C side. Timeout ACKs act as a synchronization point
     26 // at the timeout, to bracket one flight of messages from C.
     27 const opcodeTimeoutAck = byte('t')
     28 
     29 type packetAdaptor struct {
     30 	net.Conn
     31 	debug *recordingConn
     32 }
     33 
     34 // newPacketAdaptor wraps a reliable streaming net.Conn into a reliable
     35 // packet-based net.Conn. The stream contains packets and control commands,
     36 // distinguished by a one byte opcode.
     37 func newPacketAdaptor(conn net.Conn) *packetAdaptor {
     38 	return &packetAdaptor{conn, nil}
     39 }
     40 
     41 func (p *packetAdaptor) log(message string, data []byte) {
     42 	if p.debug == nil {
     43 		return
     44 	}
     45 
     46 	p.debug.LogSpecial(message, data)
     47 }
     48 
     49 func (p *packetAdaptor) readOpcode() (byte, error) {
     50 	out := make([]byte, 1)
     51 	if _, err := io.ReadFull(p.Conn, out); err != nil {
     52 		return 0, err
     53 	}
     54 	return out[0], nil
     55 }
     56 
     57 func (p *packetAdaptor) readPacketBody() ([]byte, error) {
     58 	var length uint32
     59 	if err := binary.Read(p.Conn, binary.BigEndian, &length); err != nil {
     60 		return nil, err
     61 	}
     62 	out := make([]byte, length)
     63 	if _, err := io.ReadFull(p.Conn, out); err != nil {
     64 		return nil, err
     65 	}
     66 	return out, nil
     67 }
     68 
     69 func (p *packetAdaptor) Read(b []byte) (int, error) {
     70 	opcode, err := p.readOpcode()
     71 	if err != nil {
     72 		return 0, err
     73 	}
     74 	if opcode != opcodePacket {
     75 		return 0, fmt.Errorf("unexpected opcode '%d'", opcode)
     76 	}
     77 	out, err := p.readPacketBody()
     78 	if err != nil {
     79 		return 0, err
     80 	}
     81 	return copy(b, out), nil
     82 }
     83 
     84 func (p *packetAdaptor) Write(b []byte) (int, error) {
     85 	payload := make([]byte, 1+4+len(b))
     86 	payload[0] = opcodePacket
     87 	binary.BigEndian.PutUint32(payload[1:5], uint32(len(b)))
     88 	copy(payload[5:], b)
     89 	if _, err := p.Conn.Write(payload); err != nil {
     90 		return 0, err
     91 	}
     92 	return len(b), nil
     93 }
     94 
     95 // SendReadTimeout instructs the peer to simulate a read timeout. It then waits
     96 // for acknowledgement of the timeout, buffering any packets received since
     97 // then. The packets are then returned.
     98 func (p *packetAdaptor) SendReadTimeout(d time.Duration) ([][]byte, error) {
     99 	p.log("Simulating read timeout: "+d.String(), nil)
    100 
    101 	payload := make([]byte, 1+8)
    102 	payload[0] = opcodeTimeout
    103 	binary.BigEndian.PutUint64(payload[1:], uint64(d.Nanoseconds()))
    104 	if _, err := p.Conn.Write(payload); err != nil {
    105 		return nil, err
    106 	}
    107 
    108 	var packets [][]byte
    109 	for {
    110 		opcode, err := p.readOpcode()
    111 		if err != nil {
    112 			return nil, err
    113 		}
    114 		switch opcode {
    115 		case opcodeTimeoutAck:
    116 			p.log("Received timeout ACK", nil)
    117 			// Done! Return the packets buffered and continue.
    118 			return packets, nil
    119 		case opcodePacket:
    120 			// Buffer the packet for the caller to process.
    121 			packet, err := p.readPacketBody()
    122 			if err != nil {
    123 				return nil, err
    124 			}
    125 			p.log("Simulating dropped packet", packet)
    126 			packets = append(packets, packet)
    127 		default:
    128 			return nil, fmt.Errorf("unexpected opcode '%d'", opcode)
    129 		}
    130 	}
    131 }
    132 
    133 type replayAdaptor struct {
    134 	net.Conn
    135 	prevWrite []byte
    136 }
    137 
    138 // newReplayAdaptor wraps a packeted net.Conn. It transforms it into
    139 // one which, after writing a packet, always replays the previous
    140 // write.
    141 func newReplayAdaptor(conn net.Conn) net.Conn {
    142 	return &replayAdaptor{Conn: conn}
    143 }
    144 
    145 func (r *replayAdaptor) Write(b []byte) (int, error) {
    146 	n, err := r.Conn.Write(b)
    147 
    148 	// Replay the previous packet and save the current one to
    149 	// replay next.
    150 	if r.prevWrite != nil {
    151 		r.Conn.Write(r.prevWrite)
    152 	}
    153 	r.prevWrite = append(r.prevWrite[:0], b...)
    154 
    155 	return n, err
    156 }
    157 
    158 type damageAdaptor struct {
    159 	net.Conn
    160 	damage bool
    161 }
    162 
    163 // newDamageAdaptor wraps a packeted net.Conn. It transforms it into one which
    164 // optionally damages the final byte of every Write() call.
    165 func newDamageAdaptor(conn net.Conn) *damageAdaptor {
    166 	return &damageAdaptor{Conn: conn}
    167 }
    168 
    169 func (d *damageAdaptor) setDamage(damage bool) {
    170 	d.damage = damage
    171 }
    172 
    173 func (d *damageAdaptor) Write(b []byte) (int, error) {
    174 	if d.damage && len(b) > 0 {
    175 		b = append([]byte{}, b...)
    176 		b[len(b)-1]++
    177 	}
    178 	return d.Conn.Write(b)
    179 }
    180