Skip to content

Commit

Permalink
Continually update config and better config API (#57)
Browse files Browse the repository at this point in the history
* Add code to continually update the fronting config and to use better config API

* Remove public facing config methods
  • Loading branch information
myleshorton authored Jan 16, 2025
1 parent 1c3f5ce commit 4ed3a2a
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 230 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ jobs:
with:
name: ctrf-report
path: ctrf-report.json
- name: Publish Test Report
uses: ctrf-io/github-test-reporter@v1
with:
report-path: 'ctrf-report.json'
if: always()
- name: Publish Test Summary Results
run: npx github-actions-ctrf ctrf-report.json
- name: Install goveralls
Expand Down
4 changes: 2 additions & 2 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func TestCaching(t *testing.T) {
cloudsackID := "cloudsack"

providers := map[string]*Provider{
testProviderID: NewProvider(nil, "", nil, nil, nil, nil, nil, ""),
cloudsackID: NewProvider(nil, "", nil, nil, nil, nil, nil, ""),
testProviderID: NewProvider(nil, "", nil, nil, nil, nil, ""),
cloudsackID: NewProvider(nil, "", nil, nil, nil, nil, ""),
}

log.Debug("Creating fronted")
Expand Down
2 changes: 1 addition & 1 deletion front.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ type SNIConfig struct {
}

// Create a Provider with the given details
func NewProvider(hosts map[string]string, testURL string, masquerades []*Masquerade, validator ResponseValidator, passthrough []string, frontingSNIs map[string]*SNIConfig, verifyHostname *string, countryCode string) *Provider {
func NewProvider(hosts map[string]string, testURL string, masquerades []*Masquerade, passthrough []string, frontingSNIs map[string]*SNIConfig, verifyHostname *string, countryCode string) *Provider {
p := &Provider{
HostAliases: make(map[string]string),
TestURL: testURL,
Expand Down
5 changes: 1 addition & 4 deletions front_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fronted

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -14,7 +13,6 @@ func TestNewProvider(t *testing.T) {
givenHosts map[string]string
givenTestURL string
givenMasquerades []*Masquerade
givenValidator ResponseValidator
givenPassthrough []string
//givenSNIConfig *SNIConfig
givenFrontingSNIs map[string]*SNIConfig
Expand All @@ -39,7 +37,6 @@ func TestNewProvider(t *testing.T) {
givenHosts: map[string]string{"host1": "alias1", "host2": "alias2"},
givenTestURL: "http://test.com",
givenMasquerades: []*Masquerade{{Domain: "domain1", IpAddress: "127.0.0.1"}},
givenValidator: func(*http.Response) error { return nil },
givenPassthrough: []string{"passthrough1", "passthrough2"},
givenFrontingSNIs: map[string]*SNIConfig{
"test": &SNIConfig{
Expand All @@ -64,7 +61,7 @@ func TestNewProvider(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
actual := NewProvider(tt.givenHosts, tt.givenTestURL, tt.givenMasquerades, tt.givenValidator, tt.givenPassthrough, tt.givenFrontingSNIs, tt.givenVerifyHostname, "test")
actual := NewProvider(tt.givenHosts, tt.givenTestURL, tt.givenMasquerades, tt.givenPassthrough, tt.givenFrontingSNIs, tt.givenVerifyHostname, "test")
tt.assert(t, actual)
})
}
Expand Down
194 changes: 149 additions & 45 deletions fronted.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
Expand All @@ -24,6 +26,7 @@ import (
tls "github.com/refraction-networking/utls"

"github.com/getlantern/golog"
"github.com/getlantern/keepcurrent"
"github.com/getlantern/ops"

"github.com/alitto/pond/v2"
Expand All @@ -48,6 +51,7 @@ type fronted struct {
fronts sortedFronts
maxAllowedCachedAge time.Duration
maxCacheSize int
cacheFile string
cacheSaveInterval time.Duration
cacheDirty chan interface{}
cacheClosed chan interface{}
Expand All @@ -61,17 +65,23 @@ type fronted struct {
stopCh chan interface{}
crawlOnce sync.Once
stopped atomic.Bool
countryCode string
httpClient *http.Client
configURL string
}

// configURL is the URL from which to continually fetch updated domain fronting configurations.
const configURL = "https://media.githubusercontent.com/media/getlantern/fronted/refs/heads/main/fronted.yaml.gz"

// Interface for sending HTTP traffic over domain fronting.
type Fronted interface {
http.RoundTripper

// OnNewFrontsConfig updates the set of domain fronts to try from a YAML configuration.
OnNewFrontsConfig(yml []byte, countryCode string)
// onNewFrontsConfig updates the set of domain fronts to try from a YAML configuration.
onNewFrontsConfig(yml []byte)

// OnNewFronts updates the set of domain fronts to try.
OnNewFronts(pool *x509.CertPool, providers map[string]*Provider, countryCode string)
// onNewFronts updates the set of domain fronts to try.
onNewFronts(pool *x509.CertPool, providers map[string]*Provider)

// Close closes any resources, such as goroutines that are testing fronts.
Close()
Expand All @@ -80,10 +90,13 @@ type Fronted interface {
//go:embed fronted.yaml.gz
var embedFS embed.FS

// Option is a functional option type that allows us to configure the fronted client.
type Option func(*fronted)

// NewFronted creates a new Fronted instance with the given cache file.
// At this point it does not have the actual IPs, domains, etc of the fronts to try.
// defaultProviderID is used when a front without a provider is encountered (eg in a cache file)
func NewFronted(cacheFile string) Fronted {
func NewFronted(options ...Option) Fronted {
log.Debug("Creating new fronted")

f := &fronted{
Expand All @@ -100,77 +113,126 @@ func NewFronted(cacheFile string) Fronted {
connectingFronts: newConnectingFronts(4000),
stopCh: make(chan interface{}, 10),
defaultProviderID: defaultFrontedProviderID,
httpClient: http.DefaultClient,
cacheFile: defaultCacheFilePath(),
configURL: "",
}

if cacheFile != "" {
f.initCaching(cacheFile)
for _, opt := range options {
opt(f)
}

f.readFrontsFromEmbeddedConfig()
f.keepCurrent()

return f
}

func (f *fronted) readFrontsFromEmbeddedConfig() {
yml, err := embedFS.ReadFile("fronted.yaml.gz")
if err != nil {
slog.Error("Failed to read smart dialer config", "error", err)
// WithHTTPClient sets the HTTP client to use for fetching the fronted configuration. For example, the client
// could be censorship-resistant in some way.
func WithHTTPClient(httpClient *http.Client) Option {
return func(f *fronted) {
f.httpClient = httpClient
}
f.OnNewFrontsConfig(yml, "")
}

func (f *fronted) OnNewFrontsConfig(gzippedYaml []byte, countryCode string) {
r, gzipErr := gzip.NewReader(bytes.NewReader(gzippedYaml))
if gzipErr != nil {
slog.Error("Failed to create gzip reader", "error", gzipErr)
return
// WithCacheFile sets the file to use for caching domains that have successfully connected.
func WithCacheFile(file string) Option {
return func(f *fronted) {
f.initCaching(file)
}
yml, err := io.ReadAll(r)
if err != nil {
slog.Error("Failed to read gzipped file", "error", err)
return
}

// WithCountryCode sets the country code to use for fronting, which is particularly relevant for the
// SNI to use when connecting to the fronting domain.
func WithCountryCode(cc string) Option {
return func(f *fronted) {
f.countryCode = cc
}
path, err := yaml.PathString("$.providers")
if err != nil {
slog.Error("Failed to create providers dpath", "error", err)
return
}

// WithConfigURL sets the URL from which to continually fetch updated domain fronting configurations.
func WithConfigURL(configURL string) Option {
return func(f *fronted) {
f.configURL = configURL
}
providers := make(map[string]*Provider)
pathErr := path.Read(bytes.NewReader(yml), &providers)
if pathErr != nil {
slog.Error("Failed to read providers", "error", pathErr)
}

func defaultCacheFilePath() string {
if dir, err := os.UserConfigDir(); err != nil {
log.Errorf("Unable to get user config dir: %v", err)
// Use the temporary directory.
return filepath.Join(os.TempDir(), "fronted_cache.json")
} else {
return filepath.Join(dir, "domainfronting", "fronted_cache.json")
}
}

func (f *fronted) keepCurrent() {
if f.configURL == "" {
slog.Debug("No config URL provided -- not updating fronting configuration")
return
}

trustedCAsPath, err := yaml.PathString("$.trustedcas")
source := keepcurrent.FromTarGz(
keepcurrent.FromWebWithClient(configURL, f.httpClient), "fronted.yaml.gz")
chDB := make(chan []byte)
dest := keepcurrent.ToChannel(chDB)

runner := keepcurrent.NewWithValidator(
f.validator(),
source,
keepcurrent.ToFile("fronted.yaml.gz"),
dest,
)

go func() {
for data := range chDB {
f.onNewFrontsConfig(data)
}
}()

runner.Start(12 * time.Hour)
}

func (f *fronted) validator() func([]byte) error {
return func(data []byte) error {
_, _, err := processYaml(data)
if err != nil {
return err
}
return nil
}
}

func (f *fronted) readFrontsFromEmbeddedConfig() {
yml, err := embedFS.ReadFile("fronted.yaml.gz")
if err != nil {
slog.Error("Failed to create trusted CA path", "error", err)
return
slog.Error("Failed to read smart dialer config", "error", err)
}
var trustedCAs []*CA
trustedCAsErr := trustedCAsPath.Read(bytes.NewReader(yml), &trustedCAs)
if trustedCAsErr != nil {
slog.Error("Failed to read trusted CAs", "error", trustedCAsErr)
f.onNewFrontsConfig(yml)
}

func (f *fronted) onNewFrontsConfig(gzippedYaml []byte) {
pool, providers, err := processYaml(gzippedYaml)
if err != nil {
log.Errorf("Failed to process fronted config: %v", err)
return
}
pool := x509.NewCertPool()
for _, ca := range trustedCAs {
pool.AppendCertsFromPEM([]byte(ca.Cert))
}
f.OnNewFronts(pool, providers, countryCode)
f.onNewFronts(pool, providers)
}

// OnNewFronts sets the domain fronts to use, the trusted root CAs and the fronting providers
// onNewFronts sets the domain fronts to use, the trusted root CAs and the fronting providers
// (such as Akamai, Cloudfront, etc)
func (f *fronted) OnNewFronts(pool *x509.CertPool, providers map[string]*Provider, countryCode string) {
func (f *fronted) onNewFronts(pool *x509.CertPool, providers map[string]*Provider) {
// Make copies just to avoid any concurrency issues with access that may be happening on the
// caller side.
log.Debug("Updating fronted configuration")
if len(providers) == 0 {
log.Errorf("No providers configured")
return
}
providersCopy := copyProviders(providers, countryCode)
providersCopy := copyProviders(providers, f.countryCode)
f.addProviders(providersCopy)
f.addFronts(loadFronts(providersCopy))
f.certPool.Store(pool)
Expand Down Expand Up @@ -623,7 +685,7 @@ func (f *fronted) isStopped() bool {
func copyProviders(providers map[string]*Provider, countryCode string) map[string]*Provider {
providersCopy := make(map[string]*Provider, len(providers))
for key, p := range providers {
providersCopy[key] = NewProvider(p.HostAliases, p.TestURL, p.Masquerades, nil, p.PassthroughPatterns, p.FrontingSNIs, p.VerifyHostname, countryCode)
providersCopy[key] = NewProvider(p.HostAliases, p.TestURL, p.Masquerades, p.PassthroughPatterns, p.FrontingSNIs, p.VerifyHostname, countryCode)
}
return providersCopy
}
Expand Down Expand Up @@ -705,3 +767,45 @@ func Vet(m *Masquerade, pool *x509.CertPool, testURL string) bool {
defer conn.Close()
return masq.verifyWithPost(conn, testURL)
}

func processYaml(gzippedYaml []byte) (*x509.CertPool, map[string]*Provider, error) {
r, gzipErr := gzip.NewReader(bytes.NewReader(gzippedYaml))
if gzipErr != nil {
slog.Error("Failed to create gzip reader", "error", gzipErr)
// Wrap the error
return nil, nil, fmt.Errorf("failed to create gzip reader: %w", gzipErr)
}
yml, err := io.ReadAll(r)
if err != nil {
slog.Error("Failed to read gzipped file", "error", err)
return nil, nil, fmt.Errorf("failed to read gzipped file: %w", err)
}
path, err := yaml.PathString("$.providers")
if err != nil {
slog.Error("Failed to create providers path", "error", err)
return nil, nil, fmt.Errorf("failed to create providers path: %w", err)
}
providers := make(map[string]*Provider)
pathErr := path.Read(bytes.NewReader(yml), &providers)
if pathErr != nil {
slog.Error("Failed to read providers", "error", pathErr)
return nil, nil, fmt.Errorf("failed to read providers: %w", pathErr)
}

trustedCAsPath, err := yaml.PathString("$.trustedcas")
if err != nil {
slog.Error("Failed to create trusted CA path", "error", err)
return nil, nil, fmt.Errorf("failed to create trusted CA path: %w", err)
}
var trustedCAs []*CA
trustedCAsErr := trustedCAsPath.Read(bytes.NewReader(yml), &trustedCAs)
if trustedCAsErr != nil {
slog.Error("Failed to read trusted CAs", "error", trustedCAsErr)
return nil, nil, fmt.Errorf("failed to read trusted CAs: %w", trustedCAsErr)
}
pool := x509.NewCertPool()
for _, ca := range trustedCAs {
pool.AppendCertsFromPEM([]byte(ca.Cert))
}
return pool, providers, nil
}
Loading

0 comments on commit 4ed3a2a

Please sign in to comment.