udp.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. // Copyright 2017 fatedier, [email protected]
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package udp
  15. import (
  16. "encoding/base64"
  17. "net"
  18. "sync"
  19. "time"
  20. "github.com/fatedier/frp/pkg/msg"
  21. "github.com/fatedier/golib/errors"
  22. "github.com/fatedier/golib/pool"
  23. )
  24. func NewUDPPacket(buf []byte, laddr, raddr *net.UDPAddr) *msg.UDPPacket {
  25. return &msg.UDPPacket{
  26. Content: base64.StdEncoding.EncodeToString(buf),
  27. LocalAddr: laddr,
  28. RemoteAddr: raddr,
  29. }
  30. }
  31. func GetContent(m *msg.UDPPacket) (buf []byte, err error) {
  32. buf, err = base64.StdEncoding.DecodeString(m.Content)
  33. return
  34. }
  35. func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh chan<- *msg.UDPPacket, bufSize int) {
  36. // read
  37. go func() {
  38. for udpMsg := range readCh {
  39. buf, err := GetContent(udpMsg)
  40. if err != nil {
  41. continue
  42. }
  43. udpConn.WriteToUDP(buf, udpMsg.RemoteAddr)
  44. }
  45. }()
  46. // write
  47. buf := pool.GetBuf(bufSize)
  48. defer pool.PutBuf(buf)
  49. for {
  50. n, remoteAddr, err := udpConn.ReadFromUDP(buf)
  51. if err != nil {
  52. return
  53. }
  54. // buf[:n] will be encoded to string, so the bytes can be reused
  55. udpMsg := NewUDPPacket(buf[:n], nil, remoteAddr)
  56. select {
  57. case sendCh <- udpMsg:
  58. default:
  59. }
  60. }
  61. }
  62. func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UDPPacket, sendCh chan<- msg.Message, bufSize int) {
  63. var (
  64. mu sync.RWMutex
  65. )
  66. udpConnMap := make(map[string]*net.UDPConn)
  67. // read from dstAddr and write to sendCh
  68. writerFn := func(raddr *net.UDPAddr, udpConn *net.UDPConn) {
  69. addr := raddr.String()
  70. defer func() {
  71. mu.Lock()
  72. delete(udpConnMap, addr)
  73. mu.Unlock()
  74. udpConn.Close()
  75. }()
  76. buf := pool.GetBuf(bufSize)
  77. for {
  78. udpConn.SetReadDeadline(time.Now().Add(30 * time.Second))
  79. n, _, err := udpConn.ReadFromUDP(buf)
  80. if err != nil {
  81. return
  82. }
  83. udpMsg := NewUDPPacket(buf[:n], nil, raddr)
  84. if err = errors.PanicToError(func() {
  85. select {
  86. case sendCh <- udpMsg:
  87. default:
  88. }
  89. }); err != nil {
  90. return
  91. }
  92. }
  93. }
  94. // read from readCh
  95. go func() {
  96. for udpMsg := range readCh {
  97. buf, err := GetContent(udpMsg)
  98. if err != nil {
  99. continue
  100. }
  101. mu.Lock()
  102. udpConn, ok := udpConnMap[udpMsg.RemoteAddr.String()]
  103. if !ok {
  104. udpConn, err = net.DialUDP("udp", nil, dstAddr)
  105. if err != nil {
  106. mu.Unlock()
  107. continue
  108. }
  109. udpConnMap[udpMsg.RemoteAddr.String()] = udpConn
  110. }
  111. mu.Unlock()
  112. _, err = udpConn.Write(buf)
  113. if err != nil {
  114. udpConn.Close()
  115. }
  116. if !ok {
  117. go writerFn(udpMsg.RemoteAddr, udpConn)
  118. }
  119. }
  120. }()
  121. }