funcProcessControlConn(l *conn.Listener) { for { // 等待frpc客户端程序连接 c, err := l.Accept() if err != nil { return } // frpc和frps建立 msgTunnel 消息传输隧道 go controlWorker(c) } }
funccontrolWorker(c *conn.Conn) { // if login message type is NewWorkConn, don't close this connection var closeFlag bool = true var s *server.ProxyServer deferfunc() { if closeFlag { c.Close() if s != nil { s.Close() } } }()
// get login message buf, err := c.ReadLine() if err != nil { return } // frpc客户端发送的消息 cliReq := &msg.ControlReq{} if err := json.Unmarshal([]byte(buf), &cliReq); err != nil { return } // 登录token验证 ret, info := doLogin(cliReq, c) s, ok := server.ProxyServers[cliReq.ProxyName] if !ok { return } if cliReq.Type != consts.NewWorkConn { cliRes := &msg.ControlRes{ Type: consts.NewCtlConnRes, Code: ret, Msg: info, } byteBuf, _ := json.Marshal(cliRes) err = c.Write(string(byteBuf) + "\n") if err != nil { time.Sleep(1 * time.Second) return } } else { closeFlag = false return }
// 创建channel保存要发送的控制消息 msgSendChan := make(chaninterface{}, 1024) // 通过msgTunnel消息传输隧道发送控制消息给frpc go msgSender(s, c, msgSendChan) // 用户访问frps,此时通知frpc开始和frps建立dataTunnel数据传输隧道 go noticeUserConn(s, msgSendChan)
funcdoLogin(req *msg.ControlReq, c *conn.Conn)(ret int64, info string) { ret = 1 // ...
// control conn if req.Type == consts.NewCtlConn { // frpc请求建立msgTunnel隧道 if s.Status == consts.Working { return }
// set infomations from frpc s.UseEncryption = req.UseEncryption
// start proxy and listen for user connections, no block err := s.Start() if err != nil { return } } elseif req.Type == consts.NewWorkConn { // work conn // frpc请求建立dataTunnel隧道 if s.Status != consts.Working { return } // the connection will close after join over // 一旦成功建立dataTunnel,保存起来,后续用于dataTunnel和userTunnel转发数据 s.RecvNewWorkConn(c) } else { return }
// start a goroutine for listener to accept user connection for _, listener := range p.listeners { gofunc(l Listener) { for { // block // if listener is closed, err returned c, err := l.Accept() if err != nil { return } // insert into list p.Lock() if p.Status != consts.Working { c.Close() p.Unlock() return } // 新用户到来时,加入到用户队列中 p.userConnList.PushBack(c) p.Unlock()
// put msg to control conn // 然后通知frpc开始建立dataTunnel隧道 p.ctlMsgChan <- 1
// set timeout time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() { p.Lock() element := p.userConnList.Front() p.Unlock() if element == nil { return }
userConn := element.Value.(*conn.Conn) if userConn == c { userConn.Close() } }) } }(listener) }
// start another goroutine for join two conns from frpc and user // 转发userTunnel和dataTunnel数据 gofunc() { for { // 即 dataTunnel workConn, ok := <-p.workConnChan if !ok { return }
p.Lock() // 从用户队列中取出一个userTunnel element := p.userConnList.Front() var userConn *conn.Conn if element != nil { userConn = element.Value.(*conn.Conn) p.userConnList.Remove(element) } else { workConn.Close() p.Unlock() continue } p.Unlock()
// 通过dataTunnel转发数据,这里分为加密传输和不加密传输 if p.UseEncryption { go conn.JoinMore(userConn, workConn, p.AuthToken) } else { go conn.Join(userConn, workConn) } } }()