Skip to content

Commit

Permalink
Add transparent proxy; closes #29
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Aug 10, 2023
1 parent 2474242 commit 2f73eee
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 6 deletions.
20 changes: 19 additions & 1 deletion cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func newCmd_rpc() *cli.Command {
var includePatterns cli.StringSlice
var excludePatterns cli.StringSlice
var watch bool
var pathForProxyForUnknownRpcMethods string
return &cli.Command{
Name: "rpc",
Description: "Provide multiple epoch config files, and start a Solana JSON RPC that exposes getTransaction, getBlock, and (optionally) getSignaturesForAddress",
Expand Down Expand Up @@ -72,6 +73,12 @@ func newCmd_rpc() *cli.Command {
Value: false,
Destination: &watch,
},
&cli.StringFlag{
Name: "proxy",
Usage: "Path to a config file that will be used to proxy unknown RPC methods",
Value: "",
Destination: &pathForProxyForUnknownRpcMethods,
},
),
Action: func(c *cli.Context) error {
src := c.Args().Slice()
Expand Down Expand Up @@ -209,7 +216,18 @@ func newCmd_rpc() *cli.Command {
}
}

return multi.ListenAndServe(listenOn)
var listenerConfig *ListenerConfig
if pathForProxyForUnknownRpcMethods != "" {
proxyConfig, err := LoadProxyConfig(pathForProxyForUnknownRpcMethods)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load proxy config file %q: %s", pathForProxyForUnknownRpcMethods, err.Error()), 1)
}
listenerConfig = &ListenerConfig{
ProxyConfig: proxyConfig,
}
}

return multi.ListenAndServe(listenOn, listenerConfig)
},
}
}
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (

require (
github.com/fsnotify/fsnotify v1.5.4
github.com/goware/urlx v0.3.2
github.com/ipld/go-car v0.5.0
github.com/mr-tron/base58 v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand All @@ -74,6 +75,8 @@ require (
require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.14 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
Expand Down Expand Up @@ -325,6 +329,8 @@ github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36j
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/goware/urlx v0.3.2 h1:gdoo4kBHlkqZNaf6XlQ12LGtQOmpKJrR04Rc3RnpJEo=
github.com/goware/urlx v0.3.2/go.mod h1:h8uwbJy68o+tQXCGZNa9D73WN8n0r9OBae5bUnLcgjw=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
Expand Down
87 changes: 82 additions & 5 deletions multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/goware/urlx"
"github.com/mr-tron/base58"
sigtoepoch "github.com/rpcpool/yellowstone-faithful/sig-to-epoch"
"github.com/sourcegraph/jsonrpc2"
Expand Down Expand Up @@ -143,10 +144,35 @@ func (m *MultiEpoch) GetFirstAvailableEpoch() (*Epoch, error) {
return nil, fmt.Errorf("no epochs available")
}

type ListenerConfig struct {
ProxyConfig *ProxyConfig
}

type ProxyConfig struct {
Target string `json:"target" yaml:"target"`
Headers map[string]string `json:"headers" yaml:"headers"`
}

func LoadProxyConfig(configFilepath string) (*ProxyConfig, error) {
var proxyConfig ProxyConfig
if isJSONFile(configFilepath) {
if err := loadFromJSON(configFilepath, &proxyConfig); err != nil {
return nil, err
}
} else if isYAMLFile(configFilepath) {
if err := loadFromYAML(configFilepath, &proxyConfig); err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("config file %q must be JSON or YAML", configFilepath)
}
return &proxyConfig, nil
}

// ListeAndServe starts listening on the configured address and serves the RPC API.
func (m *MultiEpoch) ListenAndServe(listenOn string) error {
h := newMultiEpochHandler(m)
h = fasthttp.CompressHandler(h)
func (m *MultiEpoch) ListenAndServe(listenOn string, lsConf *ListenerConfig) error {
handler := newMultiEpochHandler(m, lsConf)
handler = fasthttp.CompressHandler(handler)

if sigToEpochIndexDir := m.options.PathToSigToEpoch; sigToEpochIndexDir != "" {
klog.Infof("Opening sig-to-epoch index from %s", sigToEpochIndexDir)
Expand Down Expand Up @@ -175,7 +201,7 @@ func (m *MultiEpoch) ListenAndServe(listenOn string) error {
}

klog.Infof("RPC server listening on %s", listenOn)
return fasthttp.ListenAndServe(listenOn, h)
return fasthttp.ListenAndServe(listenOn, handler)
}

func randomRequestID() string {
Expand All @@ -186,7 +212,25 @@ func randomRequestID() string {
return strings.ToUpper(base58.Encode(b))
}

func newMultiEpochHandler(handler *MultiEpoch) func(ctx *fasthttp.RequestCtx) {
func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx *fasthttp.RequestCtx) {
// create a transparent reverse proxy
var proxy *fasthttp.HostClient
if lsConf != nil && lsConf.ProxyConfig != nil && lsConf.ProxyConfig.Target != "" {
target := lsConf.ProxyConfig.Target
parsedTargetURL, err := urlx.Parse(target)
if err != nil {
panic(fmt.Errorf("invalid proxy target URL %q: %w", target, err))
}
addr := parsedTargetURL.Hostname()
if parsedTargetURL.Port() != "" {
addr += ":" + parsedTargetURL.Port()
}
proxy = &fasthttp.HostClient{
Addr: addr,
IsTLS: parsedTargetURL.Scheme == "https",
}
klog.Infof("Will proxy unhandled RPC methods to %q", target)
}
return func(c *fasthttp.RequestCtx) {
startedAt := time.Now()
reqID := randomRequestID()
Expand Down Expand Up @@ -234,6 +278,39 @@ func newMultiEpochHandler(handler *MultiEpoch) func(ctx *fasthttp.RequestCtx) {

klog.Infof("[%s] received request: %q", reqID, strings.TrimSpace(string(body)))

if proxy != nil && !isValidMethod(rpcRequest.Method) {
klog.Infof("[%s] Unhandled method %q, proxying to %q", reqID, rpcRequest.Method, lsConf.ProxyConfig.Target)
// proxy the request to the target
proxyReq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(proxyReq)
{
for k, v := range lsConf.ProxyConfig.Headers {
proxyReq.Header.Set(k, v)
}
}
proxyReq.Header.SetMethod("POST")
proxyReq.Header.SetContentType("application/json")
proxyReq.SetRequestURI(lsConf.ProxyConfig.Target)
proxyReq.SetBody(body)
proxyResp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(proxyResp)
if err := proxy.Do(proxyReq, proxyResp); err != nil {
klog.Errorf("[%s] failed to proxy request: %v", reqID, err)
replyJSON(c, http.StatusInternalServerError, jsonrpc2.Response{
Error: &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
},
})
return
}
c.Response.Header.Set("Content-Type", "application/json")
c.Response.SetStatusCode(proxyResp.StatusCode())
c.Response.SetBody(proxyResp.Body())
// TODO: handle compression.
return
}

rqCtx := &requestContext{ctx: c}
method := rpcRequest.Method

Expand Down

0 comments on commit 2f73eee

Please sign in to comment.