From dab51c32a27578548ed64fb737cd46b1c1049307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E6=B2=B3?= Date: Mon, 18 Feb 2019 01:05:05 +0800 Subject: [PATCH] bug repair when high concurrent --- README.md | 2 + client/client.go | 120 +---------------------------- client/control.go | 180 +++++++++++++++++++++++++++++++++++++++++++ client/status.go | 62 --------------- cmd/nps/nps.go | 1 - conf/npc.conf | 6 +- lib/common/util.go | 6 +- lib/conn/conn.go | 8 +- lib/conn/snappy.go | 36 ++------- lib/file/file.go | 2 +- server/proxy/base.go | 5 ++ server/proxy/http.go | 3 +- server/proxy/tcp.go | 4 + server/server.go | 36 ++++----- 14 files changed, 229 insertions(+), 242 deletions(-) create mode 100644 client/control.go delete mode 100644 client/status.go diff --git a/README.md b/README.md index 3d938c0a..7cca32be 100644 --- a/README.md +++ b/README.md @@ -272,6 +272,7 @@ server { listen 80; server_name *.proxy.com; location / { + proxy_set_header Host $http_host; proxy_pass http://127.0.0.1:8024; } } @@ -290,6 +291,7 @@ server { ssl_protocols TLSv1 TLSv1.1 TLSv1.2; ssl_prefer_server_ciphers on; location / { + proxy_set_header Host $http_host; proxy_pass http://127.0.0.1:8024; } } diff --git a/client/client.go b/client/client.go index 8027dfb9..df6bb5e2 100755 --- a/client/client.go +++ b/client/client.go @@ -1,18 +1,11 @@ package client import ( - "errors" "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/config" "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/lg" "github.com/cnlh/nps/lib/pool" - "github.com/cnlh/nps/vender/github.com/xtaci/kcp" - "github.com/cnlh/nps/vender/golang.org/x/net/proxy" - "io/ioutil" "net" - "net/url" - "path/filepath" "sync" "time" ) @@ -133,7 +126,7 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) { } pool.PutBufPoolCopy(buf) s.Lock() - delete(s.linkMap, link.Id) + //TODO 删除map s.Unlock() } @@ -173,7 +166,6 @@ func (s *TRPClient) dealChan() { go func() { for { if id, err := s.tunnel.GetLen(); err != nil { - lg.Println("get id error", err, id) break } else { s.Lock() @@ -193,113 +185,3 @@ func (s *TRPClient) dealChan() { }() <-s.stop } - -var errAdd = errors.New("The server returned an error, which port or host may have been occupied or not allowed to open.") - -func StartFromFile(path string) { - first := true - cnf, err := config.NewConfig(path) - if err != nil { - lg.Fatalln(err) - } - lg.Printf("Loading configuration file %s successfully", path) -re: - if first || cnf.CommonConfig.AutoReconnection { - if !first { - lg.Println("Reconnecting...") - time.Sleep(time.Second * 5) - } - } else { - return - } - first = false - c, err := NewConn(cnf.CommonConfig.Tp, cnf.CommonConfig.VKey, cnf.CommonConfig.Server, common.WORK_CONFIG, cnf.CommonConfig.ProxyUrl) - if err != nil { - lg.Println(err) - goto re - } - if _, err := c.SendConfigInfo(cnf.CommonConfig.Cnf); err != nil { - lg.Println(err) - goto re - } - var b []byte - if b, err = c.ReadLen(16); err != nil { - lg.Println(err) - goto re - } else { - ioutil.WriteFile(filepath.Join(common.GetTmpPath(), "npc_vkey.txt"), []byte(string(b)), 0600) - } - if !c.GetAddStatus() { - lg.Println(errAdd) - goto re - } - for _, v := range cnf.Hosts { - if _, err := c.SendHostInfo(v); err != nil { - lg.Println(err) - goto re - } - if !c.GetAddStatus() { - lg.Println(errAdd, v.Host) - goto re - } - } - for _, v := range cnf.Tasks { - if _, err := c.SendTaskInfo(v); err != nil { - lg.Println(err) - goto re - } - if !c.GetAddStatus() { - lg.Println(errAdd, v.Ports) - goto re - } - } - - c.Close() - - NewRPClient(cnf.CommonConfig.Server, string(b), cnf.CommonConfig.Tp, cnf.CommonConfig.ProxyUrl).Start() - goto re -} - -//Create a new connection with the server and verify it -func NewConn(tp string, vkey string, server string, connType string, proxyUrl string) (*conn.Conn, error) { - var err error - var connection net.Conn - var sess *kcp.UDPSession - if tp == "tcp" { - if proxyUrl != "" { - u, er := url.Parse(proxyUrl) - if er != nil { - return nil, er - } - n, er := proxy.FromURL(u, nil) - if er != nil { - return nil, er - } - connection, err = n.Dial("tcp", server) - } else { - connection, err = net.Dial("tcp", server) - } - } else { - sess, err = kcp.DialWithOptions(server, nil, 10, 3) - conn.SetUdpSession(sess) - connection = sess - } - if err != nil { - return nil, err - } - c := conn.NewConn(connection) - if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil { - lg.Println(err) - } - if s, err := c.ReadFlag(); err != nil { - lg.Println(err) - } else if s == common.VERIFY_EER { - lg.Fatalf("Validation key %s incorrect", vkey) - } - if _, err := c.Write([]byte(connType)); err != nil { - lg.Println(err) - } - c.SetAlive(tp) - - return c, nil -} diff --git a/client/control.go b/client/control.go new file mode 100644 index 00000000..9737aca4 --- /dev/null +++ b/client/control.go @@ -0,0 +1,180 @@ +package client + +import ( + "errors" + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/config" + "github.com/cnlh/nps/lib/conn" + "github.com/cnlh/nps/lib/lg" + "github.com/cnlh/nps/vender/github.com/xtaci/kcp" + "github.com/cnlh/nps/vender/golang.org/x/net/proxy" + "io/ioutil" + "log" + "net" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +func GetTaskStatus(path string) { + cnf, err := config.NewConfig(path) + if err != nil { + log.Fatalln(err) + } + c, err := NewConn(cnf.CommonConfig.Tp, cnf.CommonConfig.VKey, cnf.CommonConfig.Server, common.WORK_CONFIG, cnf.CommonConfig.ProxyUrl) + if err != nil { + log.Fatalln(err) + } + if _, err := c.Write([]byte(common.WORK_STATUS)); err != nil { + log.Fatalln(err) + } + if f, err := common.ReadAllFromFile(filepath.Join(common.GetTmpPath(), "npc_vkey.txt")); err != nil { + log.Fatalln(err) + } else if _, err := c.Write([]byte(string(f))); err != nil { + log.Fatalln(err) + } + if l, err := c.GetLen(); err != nil { + log.Fatalln(err) + } else if b, err := c.ReadLen(l); err != nil { + lg.Fatalln(err) + } else { + arr := strings.Split(string(b), common.CONN_DATA_SEQ) + for _, v := range cnf.Hosts { + if common.InStrArr(arr, v.Remark) { + log.Println(v.Remark, "ok") + } else { + log.Println(v.Remark, "not running") + } + } + for _, v := range cnf.Tasks { + ports := common.GetPorts(v.Ports) + for _, vv := range ports { + var remark string + if len(ports) > 1 { + remark = v.Remark + "_" + strconv.Itoa(vv) + } else { + remark = v.Remark + } + if common.InStrArr(arr, remark) { + log.Println(remark, "ok") + } else { + log.Println(remark, "not running") + } + } + } + } + os.Exit(0) +} + +var errAdd = errors.New("The server returned an error, which port or host may have been occupied or not allowed to open.") + +func StartFromFile(path string) { + first := true + cnf, err := config.NewConfig(path) + if err != nil { + lg.Fatalln(err) + } + lg.Printf("Loading configuration file %s successfully", path) +re: + if first || cnf.CommonConfig.AutoReconnection { + if !first { + lg.Println("Reconnecting...") + time.Sleep(time.Second * 5) + } + } else { + return + } + first = false + c, err := NewConn(cnf.CommonConfig.Tp, cnf.CommonConfig.VKey, cnf.CommonConfig.Server, common.WORK_CONFIG, cnf.CommonConfig.ProxyUrl) + if err != nil { + lg.Println(err) + goto re + } + if _, err := c.SendConfigInfo(cnf.CommonConfig.Cnf); err != nil { + lg.Println(err) + goto re + } + var b []byte + if b, err = c.ReadLen(16); err != nil { + lg.Println(err) + goto re + } else { + ioutil.WriteFile(filepath.Join(common.GetTmpPath(), "npc_vkey.txt"), []byte(string(b)), 0600) + } + if !c.GetAddStatus() { + lg.Println(errAdd) + goto re + } + for _, v := range cnf.Hosts { + if _, err := c.SendHostInfo(v); err != nil { + lg.Println(err) + goto re + } + if !c.GetAddStatus() { + lg.Println(errAdd, v.Host) + goto re + } + } + for _, v := range cnf.Tasks { + if _, err := c.SendTaskInfo(v); err != nil { + lg.Println(err) + goto re + } + if !c.GetAddStatus() { + lg.Println(errAdd, v.Ports) + goto re + } + } + + c.Close() + + NewRPClient(cnf.CommonConfig.Server, string(b), cnf.CommonConfig.Tp, cnf.CommonConfig.ProxyUrl).Start() + goto re +} + +//Create a new connection with the server and verify it +func NewConn(tp string, vkey string, server string, connType string, proxyUrl string) (*conn.Conn, error) { + var err error + var connection net.Conn + var sess *kcp.UDPSession + if tp == "tcp" { + if proxyUrl != "" { + u, er := url.Parse(proxyUrl) + if er != nil { + return nil, er + } + n, er := proxy.FromURL(u, nil) + if er != nil { + return nil, er + } + connection, err = n.Dial("tcp", server) + } else { + connection, err = net.Dial("tcp", server) + } + } else { + sess, err = kcp.DialWithOptions(server, nil, 10, 3) + conn.SetUdpSession(sess) + connection = sess + } + if err != nil { + return nil, err + } + c := conn.NewConn(connection) + if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil { + lg.Println(err) + } + if s, err := c.ReadFlag(); err != nil { + lg.Println(err) + } else if s == common.VERIFY_EER { + lg.Fatalf("Validation key %s incorrect", vkey) + } + if _, err := c.Write([]byte(connType)); err != nil { + lg.Println(err) + } + c.SetAlive(tp) + + return c, nil +} diff --git a/client/status.go b/client/status.go deleted file mode 100644 index a79015d4..00000000 --- a/client/status.go +++ /dev/null @@ -1,62 +0,0 @@ -package client - -import ( - "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/config" - "github.com/cnlh/nps/lib/lg" - "log" - "os" - "path/filepath" - "strconv" - "strings" -) - -func GetTaskStatus(path string) { - cnf, err := config.NewConfig(path) - if err != nil { - log.Fatalln(err) - } - c, err := NewConn(cnf.CommonConfig.Tp, cnf.CommonConfig.VKey, cnf.CommonConfig.Server, common.WORK_CONFIG, cnf.CommonConfig.ProxyUrl) - if err != nil { - log.Fatalln(err) - } - if _, err := c.Write([]byte(common.WORK_STATUS)); err != nil { - log.Fatalln(err) - } - if f, err := common.ReadAllFromFile(filepath.Join(common.GetTmpPath(), "npc_vkey.txt")); err != nil { - log.Fatalln(err) - } else if _, err := c.Write([]byte(string(f))); err != nil { - log.Fatalln(err) - } - if l, err := c.GetLen(); err != nil { - log.Fatalln(err) - } else if b, err := c.ReadLen(l); err != nil { - lg.Fatalln(err) - } else { - arr := strings.Split(string(b), common.CONN_DATA_SEQ) - for _, v := range cnf.Hosts { - if common.InArr(arr, v.Remark) { - log.Println(v.Remark, "ok") - } else { - log.Println(v.Remark, "not running") - } - } - for _, v := range cnf.Tasks { - ports := common.GetPorts(v.Ports) - for _, vv := range ports { - var remark string - if len(ports) > 1 { - remark = v.Remark + "_" + strconv.Itoa(vv) - } else { - remark = v.Remark - } - if common.InArr(arr, remark) { - log.Println(remark, "ok") - } else { - log.Println(remark, "not running") - } - } - } - } - os.Exit(0) -} diff --git a/cmd/nps/nps.go b/cmd/nps/nps.go index 18f1600a..cfe40738 100644 --- a/cmd/nps/nps.go +++ b/cmd/nps/nps.go @@ -23,7 +23,6 @@ var ( ) func main() { - log.SetFlags(log.Lshortfile) flag.Parse() if len(os.Args) > 1 { switch os.Args[1] { diff --git a/conf/npc.conf b/conf/npc.conf index 325d32f1..75e7cecd 100644 --- a/conf/npc.conf +++ b/conf/npc.conf @@ -3,11 +3,11 @@ server=127.0.0.1:8284 tp=tcp vkey=123 auto_reconnection=true - +crypt=true [web1] host=a.o.com host_change=www.proxy.com -target=127.0.0.1:8080 +target=127.0.0.1:8082 location=/ [web2] @@ -18,7 +18,7 @@ header_set_proxy=nps [tcp] mode=tcpServer -target=8001-8005,8080 +target=8001-8005,8082 port=9001-9005,9006 [socks5] diff --git a/lib/common/util.go b/lib/common/util.go index e1a106b5..2ee4e07d 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "encoding/binary" "github.com/cnlh/nps/lib/crypt" - "github.com/cnlh/nps/lib/lg" "io/ioutil" "net" "net/http" @@ -23,7 +22,7 @@ func GetCompressType(compress string) (int, int) { case "snappy": return COMPRESS_SNAPY_DECODE, COMPRESS_SNAPY_ENCODE default: - lg.Fatalln("数据压缩格式错误") + return COMPRESS_NONE_DECODE, COMPRESS_NONE_ENCODE } return COMPRESS_NONE_DECODE, COMPRESS_NONE_ENCODE } @@ -184,7 +183,7 @@ func BinaryWrite(raw *bytes.Buffer, v ...string) { binary.Write(raw, binary.LittleEndian, buffer.Bytes()) } -func InArr(arr []string, val string) bool { +func InStrArr(arr []string, val string) bool { for _, v := range arr { if v == val { return true @@ -224,6 +223,7 @@ func GetPorts(p string) []int { } return ps } + func IsPort(p string) bool { pi, err := strconv.Atoi(p) if err != nil { diff --git a/lib/conn/conn.go b/lib/conn/conn.go index f142af1b..769117b1 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -89,11 +89,9 @@ func (s *Conn) ReadLen(cLen int) ([]byte, error) { //read length or id (content length=4) func (s *Conn) GetLen() (int, error) { - val, err := s.ReadLen(4) - if err != nil { - return 0, err - } - return GetLenByBytes(val) + var l int32 + err := binary.Read(s, binary.LittleEndian, &l) + return int(l), err } //read flag diff --git a/lib/conn/snappy.go b/lib/conn/snappy.go index b6cf84e4..285f7370 100644 --- a/lib/conn/snappy.go +++ b/lib/conn/snappy.go @@ -1,41 +1,29 @@ package conn import ( - "github.com/cnlh/nps/lib/crypt" - "github.com/cnlh/nps/lib/lg" "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/lib/rate" "github.com/cnlh/nps/vender/github.com/golang/snappy" - "log" "net" ) type SnappyConn struct { - w *snappy.Writer - r *snappy.Reader - crypt bool - rate *rate.Rate + w *snappy.Writer + r *snappy.Reader + rate *rate.Rate } func NewSnappyConn(conn net.Conn, crypt bool, rate *rate.Rate) *SnappyConn { c := new(SnappyConn) c.w = snappy.NewBufferedWriter(conn) c.r = snappy.NewReader(conn) - c.crypt = crypt c.rate = rate return c } -//snappy压缩写 包含加密 +//snappy压缩写 func (s *SnappyConn) Write(b []byte) (n int, err error) { - n = len(b) - if s.crypt { - if b, err = crypt.AesEncrypt(b, []byte(cryptKey)); err != nil { - lg.Println("encode crypt error:", err) - return - } - } - if _, err = s.w.Write(b); err != nil { + if n, err = s.w.Write(b); err != nil { return } if err = s.w.Flush(); err != nil { @@ -47,24 +35,14 @@ func (s *SnappyConn) Write(b []byte) (n int, err error) { return } -//snappy压缩读 包含解密 +//snappy压缩读 func (s *SnappyConn) Read(b []byte) (n int, err error) { buf := pool.BufPool.Get().([]byte) defer pool.BufPool.Put(buf) if n, err = s.r.Read(buf); err != nil { return } - var bs []byte - if s.crypt { - if bs, err = crypt.AesDecrypt(buf[:n], []byte(cryptKey)); err != nil { - log.Println("decode crypt error:", err) - return - } - } else { - bs = buf[:n] - } - n = len(bs) - copy(b, bs) + copy(b, buf[:n]) if s.rate != nil { s.rate.Get(int64(n)) } diff --git a/lib/file/file.go b/lib/file/file.go index f13e8460..0160444b 100644 --- a/lib/file/file.go +++ b/lib/file/file.go @@ -157,7 +157,7 @@ func (s *Csv) UpdateTask(t *Tunnel) error { return nil } } - return errors.New("不存在") + return errors.New("the task is not exist") } func (s *Csv) DelTask(id int) error { diff --git a/server/proxy/base.go b/server/proxy/base.go index 4fcf3f2f..19dbae41 100644 --- a/server/proxy/base.go +++ b/server/proxy/base.go @@ -12,6 +12,11 @@ import ( "sync" ) +type Service interface { + Start() error + Close() error +} + //server base struct type server struct { id int diff --git a/server/proxy/http.go b/server/proxy/http.go index 6ec783d3..4816ad6e 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -89,8 +89,9 @@ func (s *httpServer) Start() error { return nil } -func (s *httpServer) Close() { +func (s *httpServer) Close() error { s.stop <- true + return nil } func (s *httpServer) handleTunneling(w http.ResponseWriter, r *http.Request) { diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 54553957..d38f8f81 100755 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -87,6 +87,10 @@ func (s *WebServer) Start() error { return errors.New("Web management startup failure") } +func (s *WebServer) Close() error { + return nil +} + //new func NewWebServer(bridge *bridge.Bridge) *WebServer { s := new(WebServer) diff --git a/server/server.go b/server/server.go index 08cde882..d33e43d0 100644 --- a/server/server.go +++ b/server/server.go @@ -9,7 +9,6 @@ import ( "github.com/cnlh/nps/server/proxy" "github.com/cnlh/nps/server/tool" "github.com/cnlh/nps/vender/github.com/astaxie/beego" - "reflect" ) var ( @@ -37,6 +36,7 @@ func InitFromCsv() { } } } + func DealBridgeTask() { for { select { @@ -59,27 +59,27 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) { } go DealBridgeTask() if svr := NewMode(Bridge, cnf); svr != nil { - RunList[cnf.Id] = svr - err := reflect.ValueOf(svr).MethodByName("Start").Call(nil)[0] - if err.Interface() != nil { + if err := svr.Start(); err != nil { lg.Fatalln(err) } + RunList[cnf.Id] = svr } else { lg.Fatalln("启动模式%s不正确", cnf.Mode) } } //new a server by mode name -func NewMode(Bridge *bridge.Bridge, c *file.Tunnel) interface{} { +func NewMode(Bridge *bridge.Bridge, c *file.Tunnel) proxy.Service { + var service proxy.Service switch c.Mode { case "tcpServer": - return proxy.NewTunnelModeServer(proxy.ProcessTunnel, Bridge, c) + service = proxy.NewTunnelModeServer(proxy.ProcessTunnel, Bridge, c) case "socks5Server": - return proxy.NewSock5ModeServer(Bridge, c) + service = proxy.NewSock5ModeServer(Bridge, c) case "httpProxyServer": - return proxy.NewTunnelModeServer(proxy.ProcessHttp, Bridge, c) + service = proxy.NewTunnelModeServer(proxy.ProcessHttp, Bridge, c) case "udpServer": - return proxy.NewUdpModeServer(Bridge, c) + service = proxy.NewUdpModeServer(Bridge, c) case "webServer": InitFromCsv() t := &file.Tunnel{ @@ -89,19 +89,20 @@ func NewMode(Bridge *bridge.Bridge, c *file.Tunnel) interface{} { Status: true, } AddTask(t) - return proxy.NewWebServer(Bridge) + service = proxy.NewWebServer(Bridge) case "httpHostServer": - return proxy.NewHttp(Bridge, c) + service = proxy.NewHttp(Bridge, c) } - return nil + return service } //stop server func StopServer(id int) error { if v, ok := RunList[id]; ok { - if reflect.ValueOf(v).IsValid() { - //TODO 错误处理 - reflect.ValueOf(v).MethodByName("Close").Call(nil) + if svr, ok := v.(proxy.Service); ok { + if err := svr.Close(); err != nil { + return err + } if t, err := file.GetCsvDb().GetTask(id); err != nil { return err } else { @@ -118,14 +119,13 @@ func StopServer(id int) error { //add task func AddTask(t *file.Tunnel) error { if b := tool.TestServerPort(t.Port, t.Mode); !b && t.Mode != "httpHostServer" { - lg.Printf("taskId %d start error Port %d Open Failed", t.Id, t.Port) + lg.Printf("taskId %d start error port %d Open Failed", t.Id, t.Port) return errors.New("the port open error") } if svr := NewMode(Bridge, t); svr != nil { RunList[t.Id] = svr go func() { - err := reflect.ValueOf(svr).MethodByName("Start").Call(nil)[0] - if err.Interface() != nil { + if err := svr.Start(); err != nil { lg.Println("clientId %d taskId %d start error %s", t.Client.Id, t.Id, err) delete(RunList, t.Id) return