123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- // Copyright 2017 fatedier, [email protected]
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package udp
- import (
- "encoding/base64"
- "net"
- "sync"
- "time"
- "github.com/fatedier/frp/pkg/msg"
- "github.com/fatedier/golib/errors"
- "github.com/fatedier/golib/pool"
- )
- func NewUDPPacket(buf []byte, laddr, raddr *net.UDPAddr) *msg.UDPPacket {
- return &msg.UDPPacket{
- Content: base64.StdEncoding.EncodeToString(buf),
- LocalAddr: laddr,
- RemoteAddr: raddr,
- }
- }
- func GetContent(m *msg.UDPPacket) (buf []byte, err error) {
- buf, err = base64.StdEncoding.DecodeString(m.Content)
- return
- }
- func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh chan<- *msg.UDPPacket, bufSize int) {
- // read
- go func() {
- for udpMsg := range readCh {
- buf, err := GetContent(udpMsg)
- if err != nil {
- continue
- }
- udpConn.WriteToUDP(buf, udpMsg.RemoteAddr)
- }
- }()
- // write
- buf := pool.GetBuf(bufSize)
- defer pool.PutBuf(buf)
- for {
- n, remoteAddr, err := udpConn.ReadFromUDP(buf)
- if err != nil {
- return
- }
- // buf[:n] will be encoded to string, so the bytes can be reused
- udpMsg := NewUDPPacket(buf[:n], nil, remoteAddr)
- select {
- case sendCh <- udpMsg:
- default:
- }
- }
- }
- func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UDPPacket, sendCh chan<- msg.Message, bufSize int) {
- var (
- mu sync.RWMutex
- )
- udpConnMap := make(map[string]*net.UDPConn)
- // read from dstAddr and write to sendCh
- writerFn := func(raddr *net.UDPAddr, udpConn *net.UDPConn) {
- addr := raddr.String()
- defer func() {
- mu.Lock()
- delete(udpConnMap, addr)
- mu.Unlock()
- udpConn.Close()
- }()
- buf := pool.GetBuf(bufSize)
- for {
- udpConn.SetReadDeadline(time.Now().Add(30 * time.Second))
- n, _, err := udpConn.ReadFromUDP(buf)
- if err != nil {
- return
- }
- udpMsg := NewUDPPacket(buf[:n], nil, raddr)
- if err = errors.PanicToError(func() {
- select {
- case sendCh <- udpMsg:
- default:
- }
- }); err != nil {
- return
- }
- }
- }
- // read from readCh
- go func() {
- for udpMsg := range readCh {
- buf, err := GetContent(udpMsg)
- if err != nil {
- continue
- }
- mu.Lock()
- udpConn, ok := udpConnMap[udpMsg.RemoteAddr.String()]
- if !ok {
- udpConn, err = net.DialUDP("udp", nil, dstAddr)
- if err != nil {
- mu.Unlock()
- continue
- }
- udpConnMap[udpMsg.RemoteAddr.String()] = udpConn
- }
- mu.Unlock()
- _, err = udpConn.Write(buf)
- if err != nil {
- udpConn.Close()
- }
- if !ok {
- go writerFn(udpMsg.RemoteAddr, udpConn)
- }
- }
- }()
- }
|