frp v0.5.0 源码分析

frp是什么?引用官网的一句话就是:

frp 是一个专注于内网穿透的高性能的反向代理应用,支持 TCP、UDP、HTTP、HTTPS 等多种协议。可以将内网服务以安全、便捷的方式通过具有公网 IP 节点的中转暴露到公网。

概述

截止到目前为止,frp版本更新已经迭代到了 v0.45.0 ,加入了很多新功能。如果直接从最新版本阅读源码将会十分困难,因此为了学习frp最基本的实现过程,这里通过frp 0.5.0 版本来进行学习。因此下面的所有代码分析都是基于该版本的。

获取frp 0.5.0版本需要指定一个tag

1
git clone -b v0.5.0 https://github.com/fatedier/frp.git

下面是一个经典的内网穿透NAT模型,要实现内网穿透,肯定需要一台具有公网IP地址的服务器。

image-20220616142944246

该NAT模型主要分为以下几个部分:

  • 内网用户:用户所处的局域网环境,且能够访问互联网
  • 公网服务器:具备公网IP地址的服务器,frps程序运行在该环境
  • 内网客户端:位于另一端的局域网环境,也能够访问互联网,frpc程序运行在该环境

对于上面这三部分,在frp中就是:

  • user客户端和frps服务端建立连接
    • userTunnel 用于frps和user数据传输
  • frps服务端和frpc客户端建立连接
    • msgTunnel 控制消息传输隧道,用于处理消息控制传输而非实际数据转发
      • 向服务器发送心跳包,检测服务器是否关闭
      • 当用户访问服务器时,检查是否启动dataTunnel
      • token验证
    • dataTunnel 数据传输隧道,用于实际转发数据,这里又分为加密和不加密两种情况
      • 采用类似 io.Copy() 方式实时传输数据
  • user<->frps<->frpc,在frps实现关键点就是 userTunneldataTunnel 之间的数据传输

当然对于服务器Server来说需要通过msgTunnel 响应心跳包,除此之外当外部用户User访问服务器Server时,会通过msgTunnel通道传递消息以便通知Client启动dataTunnel进行数据传输 io.Copy(),接着由服务器将中转数据返回给User,最后关闭dataTunnel,但是需要注意msgTunnel会保留,还是会和Server建立连接。

总之就是frpc启动时需要和frps建立起msgTunnel,之后每当user访问frps时,建立userTunnel,并由frps通过msgTunnel通知frpc建立dataTunnel,然后由frps实现userTunnel和dataTunnel的数据转发。

最初版本的frp实现过程就是这么简单,下面对其进行源码解析,当前也会省略一些不必要的内容

源码分析

frps

frps 入口源码位于 https://github.com/fatedier/frp/blob/v0.5.0/src/frp/cmd/frps/main.gohttps://github.com/fatedier/frp/blob/v0.5.0/src/frp/cmd/frps/control.go

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main() {
// ...
// 监听端口
l, err := conn.Listen(server.BindAddr, server.BindPort)
// ...

// create vhost if VhostHttpPort != 0
// 对于HTTP转发,需要特别处理,拆分TCP数据包
if server.VhostHttpPort != 0 {
vhostListener, err := conn.Listen(server.BindAddr, server.VhostHttpPort)
if err != nil {
os.Exit(1)
}
server.VhostMuxer, err = vhost.NewHttpMuxer(vhostListener, 30*time.Second)
if err != nil {
}
}

ProcessControlConn(l)
}

conn.Listen 返回一个 Listener 对象,实际上封装了 ListenAcceptClose 等函数。

1
2
3
4
5
6
type Listener struct {
addr net.Addr
l *net.TCPListener
accept chan *Conn
closeFlag bool
}

处理控制消息传输过程是 ProcessControlConn,等待frpc客户端连接frps服务器,然后为每一个msgTunnel隧道创建一个goroutine处理控制消息传输。ProcessControlConn 主要完成了以下的功能:

  • 接收frpc发送的心跳包并响应
  • 处理frpc的token验证
  • 当有新用户连接到frps时,通过frpc创建dataTunnel转发数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
func ProcessControlConn(l *conn.Listener) {
for {
// 等待frpc客户端程序连接
c, err := l.Accept()
if err != nil {
return
}
// frpc和frps建立 msgTunnel 消息传输隧道
go controlWorker(c)
}
}

func controlWorker(c *conn.Conn) {
// if login message type is NewWorkConn, don't close this connection
var closeFlag bool = true
var s *server.ProxyServer
defer func() {
if closeFlag {
c.Close()
if s != nil {
s.Close()
}
}
}()

// get login message
buf, err := c.ReadLine()
if err != nil {
return
}
// frpc客户端发送的消息
cliReq := &msg.ControlReq{}
if err := json.Unmarshal([]byte(buf), &cliReq); err != nil {
return
}
// 登录token验证
ret, info := doLogin(cliReq, c)
s, ok := server.ProxyServers[cliReq.ProxyName]
if !ok {
return
}
if cliReq.Type != consts.NewWorkConn {
cliRes := &msg.ControlRes{
Type: consts.NewCtlConnRes,
Code: ret,
Msg: info,
}
byteBuf, _ := json.Marshal(cliRes)
err = c.Write(string(byteBuf) + "\n")
if err != nil {
time.Sleep(1 * time.Second)
return
}
} else {
closeFlag = false
return
}

// 创建channel保存要发送的控制消息
msgSendChan := make(chan interface{}, 1024)
// 通过msgTunnel消息传输隧道发送控制消息给frpc
go msgSender(s, c, msgSendChan)
// 用户访问frps,此时通知frpc开始和frps建立dataTunnel数据传输隧道
go noticeUserConn(s, msgSendChan)

// 接受fprc发送的消息并处理,如接受frpc发送的心跳包数据并对其进行响应
msgReader(s, c, msgSendChan)

close(msgSendChan)
return
}

func msgSender(s *server.ProxyServer, c *conn.Conn, msgSendChan chan interface{}) {
for {
msg, ok := <-msgSendChan
if !ok {
break
}

buf, _ := json.Marshal(msg)
err := c.Write(string(buf) + "\n")
if err != nil {
s.Close()
break
}
}
}

func noticeUserConn(s *server.ProxyServer, msgSendChan chan interface{}) {
for {
// 当有新用户到来时,通知frpc正式和frps建立dataTunnel隧道
// 并用于之后的数据转发
closeFlag := s.WaitUserConn()
if closeFlag {
break
}
notice := &msg.ControlRes{
Type: consts.NoticeUserConn,
}
msgSendChan <- notice
}
}

func msgReader(s *server.ProxyServer, c *conn.Conn, msgSendChan chan interface{}) error {
var heartbeatTimeout bool = false
// 接收frpc心跳包超时,说明frpc已经断开连接
timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, func() {
heartbeatTimeout = true
s.Close()
c.Close()
})
defer timer.Stop()

for {
// 通过msgTunnel接收frpc发送的控制消息
buf, err := c.ReadLine()
if err != nil {
if err == io.EOF {
return err
} else if c == nil || c.IsClosed() {
return err
}
continue
}

cliReq := &msg.ControlReq{}
if err := json.Unmarshal([]byte(buf), &cliReq); err != nil {
continue
}

switch cliReq.Type {
case consts.HeartbeatReq:
// 响应心跳包
timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
heartbeatRes := &msg.ControlRes{
Type: consts.HeartbeatRes,
}
msgSendChan <- heartbeatRes
default:
}
}
return nil
}

func doLogin(req *msg.ControlReq, c *conn.Conn) (ret int64, info string) {
ret = 1
// ...

// control conn
if req.Type == consts.NewCtlConn {
// frpc请求建立msgTunnel隧道
if s.Status == consts.Working {
return
}

// set infomations from frpc
s.UseEncryption = req.UseEncryption

// start proxy and listen for user connections, no block
err := s.Start()
if err != nil {
return
}
} else if req.Type == consts.NewWorkConn {
// work conn
// frpc请求建立dataTunnel隧道
if s.Status != consts.Working {
return
}
// the connection will close after join over
// 一旦成功建立dataTunnel,保存起来,后续用于dataTunnel和userTunnel转发数据
s.RecvNewWorkConn(c)
} else {
return
}

ret = 0
return
}

上面就是frps和frpc接收和发送 控制消息 的大致过程,下面继续分析frps和用户user的接收和发送数据。

具体源码位置:https://github.com/fatedier/frp/blob/v0.5.0/src/frp/models/server/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type ProxyServer struct {
Name string
AuthToken string
Type string
BindAddr string
ListenPort int64
UseEncryption bool
CustomDomains []string

Status int64
listeners []Listener
ctlMsgChan chan int64 // 用于通知msgTunnel发送控制消息给frpc,建立dataTunnel
workConnChan chan *conn.Conn
userConnList *list.List // 用户连接队列
mutex sync.Mutex
}

ProxyServer 表示frps需要反向代理的服务,比如ssh、http等,每一个服务监听一个公开端口,用于外部用户user访问,对于HTTP web请求来说,复用一个80端口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
func (p *ProxyServer) Start() (err error) {
p.Init()
if p.Type == "tcp" {
l, err := conn.Listen(p.BindAddr, p.ListenPort)
if err != nil {
return err
}
p.listeners = append(p.listeners, l)
} else if p.Type == "http" {
for _, domain := range p.CustomDomains {
l, err := VhostMuxer.Listen(domain)
if err != nil {
return err
}
p.listeners = append(p.listeners, l)
}
}

p.Status = consts.Working

// start a goroutine for listener to accept user connection
for _, listener := range p.listeners {
go func(l Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
return
}
// insert into list
p.Lock()
if p.Status != consts.Working {
c.Close()
p.Unlock()
return
}
// 新用户到来时,加入到用户队列中
p.userConnList.PushBack(c)
p.Unlock()

// put msg to control conn
// 然后通知frpc开始建立dataTunnel隧道
p.ctlMsgChan <- 1

// set timeout
time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() {
p.Lock()
element := p.userConnList.Front()
p.Unlock()
if element == nil {
return
}

userConn := element.Value.(*conn.Conn)
if userConn == c {
userConn.Close()
}
})
}
}(listener)
}

// start another goroutine for join two conns from frpc and user
// 转发userTunnel和dataTunnel数据
go func() {
for {
// 即 dataTunnel
workConn, ok := <-p.workConnChan
if !ok {
return
}

p.Lock()
// 从用户队列中取出一个userTunnel
element := p.userConnList.Front()
var userConn *conn.Conn
if element != nil {
userConn = element.Value.(*conn.Conn)
p.userConnList.Remove(element)
} else {
workConn.Close()
p.Unlock()
continue
}
p.Unlock()

// 通过dataTunnel转发数据,这里分为加密传输和不加密传输
if p.UseEncryption {
go conn.JoinMore(userConn, workConn, p.AuthToken)
} else {
go conn.Join(userConn, workConn)
}
}
}()

return nil
}

上面每启动一个服务就监听响应的端口等待用户user连接,一旦用户user连接成功,即 userTunnel 隧道建立成功,则可以开始和dataTunnel转发数据了

转发数据过程比较简单,可以使用 io.Copy,对于加密的数据传输,这里不再讲述了,可以自行查看源码:https://github.com/fatedier/frp/blob/v0.5.0/src/frp/utils/conn/conn.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Join(c1 *Conn, c2 *Conn) {
var wait sync.WaitGroup
pipe := func(to *Conn, from *Conn) {
defer to.Close()
defer from.Close()
defer wait.Done()

var err error
_, err = io.Copy(to.TcpConn, from.TcpConn)
if err != nil {
log.Warn("join connections error, %v", err)
}
}

wait.Add(2)
go pipe(c1, c2)
go pipe(c2, c1)
wait.Wait()
return
}

frpc

对于frpc客户端程序来说,就比较简单了,具体源码位置:https://github.com/fatedier/frp/blob/v0.5.0/src/frp/cmd/frpc/main.gohttps://github.com/fatedier/frp/blob/v0.5.0/src/frp/cmd/frpc/control.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func main() {
// ...
var wait sync.WaitGroup
wait.Add(len(client.ProxyClients))
for _, client := range client.ProxyClients {
go ControlProcess(client, &wait)
}
wait.Wait()
}

func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
defer wait.Done()

msgSendChan := make(chan interface{}, 1024)
// frpc连接到frps服务器
c, err := loginToServer(cli)
if err != nil {
return
}
defer c.Close()
// 发送心跳包
go heartbeatSender(c, msgSendChan)
go msgSender(cli, c, msgSendChan)
// 接收心跳包
msgReader(cli, c, msgSendChan)

close(msgSendChan)
}

func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {
c, err = conn.ConnectServer(client.ServerAddr, client.ServerPort)
if err != nil {
return
}

nowTime := time.Now().Unix()
authKey := pcrypto.GetAuthKey(cli.Name + cli.AuthToken + fmt.Sprintf("%d", nowTime))
req := &msg.ControlReq{
Type: consts.NewCtlConn, // 请求建立msgTunnel隧道
ProxyName: cli.Name,
AuthKey: authKey,
UseEncryption: cli.UseEncryption, // 转发数据是加密
Timestamp: nowTime,
}
buf, _ := json.Marshal(req)
err = c.Write(string(buf) + "\n")
if err != nil {
return
}

res, err := c.ReadLine()
if err != nil {
return
}

ctlRes := &msg.ControlRes{}
if err = json.Unmarshal([]byte(res), &ctlRes); err != nil {
return
}

if ctlRes.Code != 0 {
return c, fmt.Errorf("%s", ctlRes.Msg)
}

return
}

func msgReader(cli *client.ProxyClient, c *conn.Conn, msgSendChan chan interface{}) error {
// for heartbeat
var heartbeatTimeout bool = false
// 接收心跳包超时,frps服务端断开
timer := time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, func() {
heartbeatTimeout = true
c.Close()
})
defer timer.Stop()

for {
buf, err := c.ReadLine()
if err == io.EOF || c == nil || c.IsClosed() {
c.Close()
var delayTime time.Duration = 1

// loop until reconnect to frps
// 重连frps
for {
c, err = loginToServer(cli)
if err == nil {
close(msgSendChan)
msgSendChan = make(chan interface{}, 1024)
go heartbeatSender(c, msgSendChan)
go msgSender(cli, c, msgSendChan)
break
}

if delayTime < 60 {
delayTime = delayTime * 2
}
time.Sleep(delayTime * time.Second)
}
continue
} else if err != nil {
continue
}

ctlRes := &msg.ControlRes{}
if err := json.Unmarshal([]byte(buf), &ctlRes); err != nil {
continue
}

switch ctlRes.Type {
case consts.HeartbeatRes: // 接收心跳包
timer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second)
case consts.NoticeUserConn: // 新用户连接到frps,frpc可以建立dataTunnel隧道
cli.StartTunnel(client.ServerAddr, client.ServerPort)
default:
}
}
return nil
}

以上就是frp大致实现过程,可以看到早期实现的frp相对来说较为简单的,如果你自己去阅读的话,相信也很快就能理解如何设计一个简单的反向代理工具。

如果还不理解的话,可以查看下面的这两幅图,我将一些关键点标了出来。

image-20220616204233727

image-20220617105802908

vhost

由于TCP是面向字节流的协议,在发送多条数据时TCP看作一条数据流而不会区分数据边界,因此需要由上层协议划分数据边界,比如HTTP协议通过包头Header和包体Body、Content-Length、固定分隔符 \r\n 区分客户端发送的每个HTTP请求数据包。

frp支持自定义域名绑定,即可通过域名方式访问内网HTTP服务,因此在处理HTTP请求时需要获取 Host 主机参数,并将请求转发到相应的主机,这个时候就需要解析TCP数据,好在 Golang 标准库提供了 http.ReadRequest() 函数来解析HTTP请求。

具体源码:https://github.com/fatedier/frp/blob/v0.5.0/src/frp/utils/vhost/vhost.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func GetHttpHostname(c *conn.Conn) (_ net.Conn, routerName string, err error) {
sc, rd := newShareConn(c.TcpConn)

// 解析HTTP请求
request, err := http.ReadRequest(bufio.NewReader(rd))
if err != nil {
return sc, "", err
}
// 获取Host主机
routerName = request.Host
request.Body.Close()

return sc, routerName, nil
}

注意到上面的代码中 sc, rd := newShareConn(c.TcpConn),主要作用是在解析HTTP请求时(Reader)将已读的数据保存到Buffer中(Writer),也就是通过 io.TeeReader() 将Reader和Writer绑定,这样才可以保证转发的HTTP请求是完整的。

而如果直接调用 http.ReadRequest(bufio.NewReader(c.TcpConn)) 那么会丢失一部分已读数据,因为 http.ReadRequest() 不会保存解析的TCP数据,那么转发的数据也是不完整的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type sharedConn struct {
net.Conn
sync.Mutex
buff *bytes.Buffer
}

func newShareConn(conn net.Conn) (*sharedConn, io.Reader) {
sc := &sharedConn{
Conn: conn,
buff: bytes.NewBuffer(make([]byte, 0, 1024)),
}
return sc, io.TeeReader(conn, sc.buff)
}

func (sc *sharedConn) Read(p []byte) (n int, err error) {
sc.Lock()
// buff 已经读取完,从 net.Conn 继续读取
if sc.buff == nil {
sc.Unlock()
return sc.Conn.Read(p)
}
// buff 还有数据
n, err = sc.buff.Read(p)

if err == io.EOF {
sc.buff = nil
var n2 int
n2, err = sc.Conn.Read(p[n:])

n += n2
}
sc.Unlock()
return
}

总的来说frp 0.5.0 版本的源码还是非常简单易读的,对于想要理解内网穿透实现原理也有很大的帮助,而且frp也是Golang实现的,不得不说Golang非常适用于网络工具的开发,也许是Golang语法简单和goroutine并发吧 😃


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!