Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 
 
 

167 řádků
4.2 KiB

  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package main
  5. import (
  6. "encoding/binary"
  7. "fmt"
  8. "io"
  9. "net"
  10. "time"
  11. )
  12. // opcodePacket signals a packet, encoded with a 32-bit length prefix, followed
  13. // by the payload.
  14. const opcodePacket = byte('P')
  15. // opcodeTimeout signals a read timeout, encoded by a 64-bit number of
  16. // nanoseconds. On receipt, the peer should reply with
  17. // opcodeTimeoutAck. opcodeTimeout may only be sent by the Go side.
  18. const opcodeTimeout = byte('T')
  19. // opcodeTimeoutAck acknowledges a read timeout. This opcode has no payload and
  20. // may only be sent by the C side. Timeout ACKs act as a synchronization point
  21. // at the timeout, to bracket one flight of messages from C.
  22. const opcodeTimeoutAck = byte('t')
  23. type packetAdaptor struct {
  24. net.Conn
  25. }
  26. // newPacketAdaptor wraps a reliable streaming net.Conn into a reliable
  27. // packet-based net.Conn. The stream contains packets and control commands,
  28. // distinguished by a one byte opcode.
  29. func newPacketAdaptor(conn net.Conn) *packetAdaptor {
  30. return &packetAdaptor{conn}
  31. }
  32. func (p *packetAdaptor) readOpcode() (byte, error) {
  33. out := make([]byte, 1)
  34. if _, err := io.ReadFull(p.Conn, out); err != nil {
  35. return 0, err
  36. }
  37. return out[0], nil
  38. }
  39. func (p *packetAdaptor) readPacketBody() ([]byte, error) {
  40. var length uint32
  41. if err := binary.Read(p.Conn, binary.BigEndian, &length); err != nil {
  42. return nil, err
  43. }
  44. out := make([]byte, length)
  45. if _, err := io.ReadFull(p.Conn, out); err != nil {
  46. return nil, err
  47. }
  48. return out, nil
  49. }
  50. func (p *packetAdaptor) Read(b []byte) (int, error) {
  51. opcode, err := p.readOpcode()
  52. if err != nil {
  53. return 0, err
  54. }
  55. if opcode != opcodePacket {
  56. return 0, fmt.Errorf("unexpected opcode '%s'", opcode)
  57. }
  58. out, err := p.readPacketBody()
  59. if err != nil {
  60. return 0, err
  61. }
  62. return copy(b, out), nil
  63. }
  64. func (p *packetAdaptor) Write(b []byte) (int, error) {
  65. payload := make([]byte, 1+4+len(b))
  66. payload[0] = opcodePacket
  67. binary.BigEndian.PutUint32(payload[1:5], uint32(len(b)))
  68. copy(payload[5:], b)
  69. if _, err := p.Conn.Write(payload); err != nil {
  70. return 0, err
  71. }
  72. return len(b), nil
  73. }
  74. // SendReadTimeout instructs the peer to simulate a read timeout. It then waits
  75. // for acknowledgement of the timeout, buffering any packets received since
  76. // then. The packets are then returned.
  77. func (p *packetAdaptor) SendReadTimeout(d time.Duration) ([][]byte, error) {
  78. payload := make([]byte, 1+8)
  79. payload[0] = opcodeTimeout
  80. binary.BigEndian.PutUint64(payload[1:], uint64(d.Nanoseconds()))
  81. if _, err := p.Conn.Write(payload); err != nil {
  82. return nil, err
  83. }
  84. packets := make([][]byte, 0)
  85. for {
  86. opcode, err := p.readOpcode()
  87. if err != nil {
  88. return nil, err
  89. }
  90. switch opcode {
  91. case opcodeTimeoutAck:
  92. // Done! Return the packets buffered and continue.
  93. return packets, nil
  94. case opcodePacket:
  95. // Buffer the packet for the caller to process.
  96. packet, err := p.readPacketBody()
  97. if err != nil {
  98. return nil, err
  99. }
  100. packets = append(packets, packet)
  101. default:
  102. return nil, fmt.Errorf("unexpected opcode '%s'", opcode)
  103. }
  104. }
  105. }
  106. type replayAdaptor struct {
  107. net.Conn
  108. prevWrite []byte
  109. }
  110. // newReplayAdaptor wraps a packeted net.Conn. It transforms it into
  111. // one which, after writing a packet, always replays the previous
  112. // write.
  113. func newReplayAdaptor(conn net.Conn) net.Conn {
  114. return &replayAdaptor{Conn: conn}
  115. }
  116. func (r *replayAdaptor) Write(b []byte) (int, error) {
  117. n, err := r.Conn.Write(b)
  118. // Replay the previous packet and save the current one to
  119. // replay next.
  120. if r.prevWrite != nil {
  121. r.Conn.Write(r.prevWrite)
  122. }
  123. r.prevWrite = append(r.prevWrite[:0], b...)
  124. return n, err
  125. }
  126. type damageAdaptor struct {
  127. net.Conn
  128. damage bool
  129. }
  130. // newDamageAdaptor wraps a packeted net.Conn. It transforms it into one which
  131. // optionally damages the final byte of every Write() call.
  132. func newDamageAdaptor(conn net.Conn) *damageAdaptor {
  133. return &damageAdaptor{Conn: conn}
  134. }
  135. func (d *damageAdaptor) setDamage(damage bool) {
  136. d.damage = damage
  137. }
  138. func (d *damageAdaptor) Write(b []byte) (int, error) {
  139. if d.damage && len(b) > 0 {
  140. b = append([]byte{}, b...)
  141. b[len(b)-1]++
  142. }
  143. return d.Conn.Write(b)
  144. }