From 211b8dd40fd1e032c4f3b13f1b448b19dd05c6b3 Mon Sep 17 00:00:00 2001 From: taoky Date: Fri, 4 Aug 2023 17:37:52 +0800 Subject: [PATCH] feat: add protocol proxy option (#16) * Add protocol proxy option * docs: update config example * chore: use Sprintf to construct proxy protocol msg --- README.md | 2 +- dist/config.example.toml | 7 ++++ pkg/server/config.go | 5 +-- pkg/server/server.go | 43 ++++++++++++++++++++----- test/e2e/e2e_test.go | 28 ++++++++++++++++ test/e2e/main_test.go | 6 ++++ test/fixtures/proxy/config4.toml | 8 +++++ test/fixtures/rsyncd/proxyprotocol.conf | 8 +++++ 8 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 test/fixtures/proxy/config4.toml create mode 100644 test/fixtures/rsyncd/proxyprotocol.conf diff --git a/README.md b/README.md index 9ef3e34..a11da35 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ rsync-proxy 可以根据 module name 反向代理不同 host 上的 rsync daemon ```shell mkdir /etc/rsync-proxy -cp cofig.example.toml /etc/rsync-proxy/config.toml +cp config.example.toml /etc/rsync-proxy/config.toml vim /etc/rsync-proxy/config.toml # 根据实际情况修改配置 ``` diff --git a/dist/config.example.toml b/dist/config.example.toml index be3734c..b0f875f 100644 --- a/dist/config.example.toml +++ b/dist/config.example.toml @@ -19,3 +19,10 @@ modules = ["bar"] [upstreams.u3] address = "rsync.example.internal:1235" modules = ["baz"] + +[upstreams.u4] +address = "rsync.example.internal:1236" +modules = ["pro"] +# This option requires rsync upstream to support and enable proxy protocol +# See: https://github.com/WayneD/rsync/blob/2f9b963abaa52e44891180fe6c0d1c2219f6686d/rsyncd.conf.5.md?plain=1#L268 +use_proxy_protocol = true diff --git a/pkg/server/config.go b/pkg/server/config.go index 8036090..39ccb87 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -9,8 +9,9 @@ import ( ) type Upstream struct { - Address string `toml:"address"` - Modules []string `toml:"modules"` + Address string `toml:"address"` + Modules []string `toml:"modules"` + UseProxyProtocol bool `toml:"use_proxy_protocol"` } type ProxySettings struct { diff --git a/pkg/server/server.go b/pkg/server/server.go index be83fc4..fb1069d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -12,6 +12,7 @@ import ( "net/http" "os" "sort" + "strings" "sync" "sync/atomic" "time" @@ -59,6 +60,8 @@ type Server struct { bufPool sync.Pool // name -> address modules map[string]string + // address -> enable proxy protocol or not + proxyProtocol map[string]bool activeConnCount atomic.Int64 connIndex atomic.Uint32 @@ -89,6 +92,7 @@ func (s *Server) loadConfig(c *Config) error { } modules := map[string]string{} + proxyProtocol := map[string]bool{} for upstreamName, v := range c.Upstreams { addr := v.Address _, err := net.ResolveTCPAddr("tcp", addr) @@ -101,6 +105,7 @@ func (s *Server) loadConfig(c *Config) error { } modules[moduleName] = addr } + proxyProtocol[addr] = v.UseProxyProtocol } s.reloadLock.Lock() @@ -119,6 +124,7 @@ func (s *Server) loadConfig(c *Config) error { } s.Motd = c.Proxy.Motd s.modules = modules + s.proxyProtocol = proxyProtocol return nil } @@ -159,42 +165,44 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn *net.TCPConn) defer s.bufPool.Put(bufPtr) buf := *bufPtr - ip := downConn.RemoteAddr().String() + addr := downConn.RemoteAddr().String() + ip := downConn.RemoteAddr().(*net.TCPAddr).IP.String() + port := downConn.RemoteAddr().(*net.TCPAddr).Port writeTimeout := s.WriteTimeout readTimeout := s.ReadTimeout n, err := readLine(downConn, buf, readTimeout) if err != nil { - return fmt.Errorf("read version from client %s: %w", ip, err) + return fmt.Errorf("read version from client %s: %w", addr, err) } rsyncdClientVersion := make([]byte, n) copy(rsyncdClientVersion, buf[:n]) if !bytes.HasPrefix(rsyncdClientVersion, RsyncdVersionPrefix) { - return fmt.Errorf("unknown version from client %s: %q", ip, rsyncdClientVersion) + return fmt.Errorf("unknown version from client %s: %q", addr, rsyncdClientVersion) } _, err = writeWithTimeout(downConn, RsyncdServerVersion, writeTimeout) if err != nil { - return fmt.Errorf("send version to client %s: %w", ip, err) + return fmt.Errorf("send version to client %s: %w", addr, err) } n, err = readLine(downConn, buf, readTimeout) if err != nil { - return fmt.Errorf("read module from client %s: %w", ip, err) + return fmt.Errorf("read module from client %s: %w", addr, err) } if n == 0 { - return fmt.Errorf("empty request from client %s", ip) + return fmt.Errorf("empty request from client %s", addr) } data := buf[:n] if s.Motd != "" { _, err = writeWithTimeout(downConn, []byte(s.Motd+"\n"), writeTimeout) if err != nil { - return fmt.Errorf("send motd to client %s: %w", ip, err) + return fmt.Errorf("send motd to client %s: %w", addr, err) } } if len(data) == 1 { // single '\n' - s.accessLog.F("client %s requests listing all modules", ip) + s.accessLog.F("client %s requests listing all modules", addr) return s.listAllModules(downConn) } @@ -204,6 +212,10 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn *net.TCPConn) s.reloadLock.RLock() upstreamAddr, ok := s.modules[moduleName] + var useProxyProtocol bool + if ok { + useProxyProtocol = s.proxyProtocol[upstreamAddr] + } s.reloadLock.RUnlock() if !ok { @@ -220,6 +232,21 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn *net.TCPConn) upConn := conn.(*net.TCPConn) defer upConn.Close() upIp := upConn.RemoteAddr().(*net.TCPAddr).IP.String() + upPort := upConn.RemoteAddr().(*net.TCPAddr).Port + + if useProxyProtocol { + var IPVersion string + if strings.Contains(ip, ":") { + IPVersion = "TCP6" + } else { + IPVersion = "TCP4" + } + proxyHeader := fmt.Sprintf("PROXY %s %s %s %d %d\r\n", IPVersion, ip, upIp, port, upPort) + _, err = writeWithTimeout(upConn, []byte(proxyHeader), writeTimeout) + if err != nil { + return fmt.Errorf("send proxy protocol header to upstream %s: %w", upIp, err) + } + } _, err = writeWithTimeout(upConn, rsyncdClientVersion, writeTimeout) if err != nil { diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9f9cd18..9d11244 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -136,3 +136,31 @@ func TestReloadConfigWithDuplicatedModules(t *testing.T) { r.Error(err) r.Contains(reloadOutput.String(), "Failed to reload config") } + +func TestProxyProtocol(t *testing.T) { + r := require.New(t) + dst, err := os.CreateTemp("", "rsync-proxy-e2e-*") + r.NoError(err) + r.NoError(dst.Close()) + + r.NoError(copyFile(getProxyConfigPath("config4.toml"), dst.Name())) + + proxy := startProxy(t, func(s *server.Server) { + s.ConfigPath = dst.Name() + }) + + tmpFile, err := os.CreateTemp("", "rsync-proxy-e2e-*") + r.NoError(err) + r.NoError(tmpFile.Close()) + defer os.Remove(tmpFile.Name()) + + outputBytes, err := newRsyncCommand(getRsyncPath(proxy, "/pro/v3.5/data"), tmpFile.Name()).CombinedOutput() + if err != nil { + t.Log(string(outputBytes)) + r.NoError(err) + } + + got, err := os.ReadFile(tmpFile.Name()) + r.NoError(err) + r.Equal("3.5", string(got)) +} diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index ba85104..d310761 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -54,6 +54,10 @@ func testMain(m *testing.M) (int, error) { port: 1235, name: "bar.conf", }, + { + port: 1236, + name: "proxyprotocol.conf", + }, } { prog, err := runRsyncd(ctx, cfg.port, filepath.Join(rsyncdConfDir, cfg.name)) if err != nil { @@ -222,6 +226,8 @@ func setupDataDirs() error { "/tmp/rsync-proxy-e2e/bar/v3.2/data": []byte("3.2"), "/tmp/rsync-proxy-e2e/bar/v3.3/data": []byte("3.3"), "/tmp/rsync-proxy-e2e/baz/v3.4/data": []byte("3.4"), + "/tmp/rsync-proxy-e2e/pro/v3.5/data": []byte("3.5"), + "/tmp/rsync-proxy-e2e/pro/v3.6/data": []byte("3.6"), } for fp, data := range files { err := writeFile(fp, data) diff --git a/test/fixtures/proxy/config4.toml b/test/fixtures/proxy/config4.toml new file mode 100644 index 0000000..f260cfe --- /dev/null +++ b/test/fixtures/proxy/config4.toml @@ -0,0 +1,8 @@ +[upstreams.u1] +address = "127.0.0.1:1236" +modules = ["pro"] +use_proxy_protocol = true + +[upstreams.u2] +address = "127.0.0.1:1235" +modules = ["bar", "baz"] \ No newline at end of file diff --git a/test/fixtures/rsyncd/proxyprotocol.conf b/test/fixtures/rsyncd/proxyprotocol.conf new file mode 100644 index 0000000..424b6aa --- /dev/null +++ b/test/fixtures/rsyncd/proxyprotocol.conf @@ -0,0 +1,8 @@ +use chroot = false +proxy protocol = true + +[pro] +path = /tmp/rsync-proxy-e2e/pro/ +comment = PRO FILES +read only = true +timeout = 300