From c4e714296e23d2430d09ac2dacab9c5713ea120c Mon Sep 17 00:00:00 2001 From: Cheuk <90270663+cheukt@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:54:34 -0400 Subject: [PATCH] RSDK-8598 - Replace cache after sync (#4343) --- config/config.go | 30 +++++++++++++++++++++++ config/reader.go | 22 ++--------------- config/reader_test.go | 51 ++++++++++++++++++++++++++++++--------- config/watcher_test.go | 13 ++++++++++ robot/impl/local_robot.go | 16 ++++++++++-- 5 files changed, 98 insertions(+), 34 deletions(-) diff --git a/config/config.go b/config/config.go index 476ba07de64..ca708883684 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( + "bytes" "crypto/tls" "encoding/json" "fmt" @@ -15,6 +16,7 @@ import ( "time" "github.com/pkg/errors" + "go.viam.com/utils/artifact" "go.viam.com/utils/jwks" "go.viam.com/utils/pexec" "go.viam.com/utils/rpc" @@ -68,6 +70,11 @@ type Config struct { // Revision contains the current revision of the config. Revision string + + // toCache stores the JSON marshalled version of the config to be cached. It should be a copy of + // the config pulled from cloud with minor changes. + // This version is kept because the config is changed as it moves through the system. + toCache []byte } // NOTE: This data must be maintained with what is in Config. @@ -238,6 +245,29 @@ func (c Config) FindComponent(name string) *resource.Config { return nil } +// SetToCache sets toCache with a marshalled copy of the config passed in. +func (c *Config) SetToCache(cfg *Config) error { + md, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return err + } + c.toCache = md + return nil +} + +// StoreToCache caches the toCache. +func (c *Config) StoreToCache() error { + if c.toCache == nil { + return errors.New("no unprocessed config to cache") + } + if err := os.MkdirAll(ViamDotDir, 0o700); err != nil { + return err + } + reader := bytes.NewReader(c.toCache) + path := getCloudCacheFilePath(c.Cloud.ID) + return artifact.AtomicStore(path, reader, c.Cloud.ID) +} + // UnmarshalJSON unmarshals JSON into the config and adjusts some // names if they are not fully filled in. func (c *Config) UnmarshalJSON(data []byte) error { diff --git a/config/reader.go b/config/reader.go index d0cedc7eeae..b0452a40b7a 100644 --- a/config/reader.go +++ b/config/reader.go @@ -17,7 +17,6 @@ import ( "github.com/pkg/errors" apppb "go.viam.com/api/app/v1" "go.viam.com/utils" - "go.viam.com/utils/artifact" "go.viam.com/utils/rpc" "golang.org/x/sys/cpu" @@ -114,22 +113,6 @@ func readFromCache(id string) (*Config, error) { return unprocessedConfig, nil } -func storeToCache(id string, cfg *Config) error { - if err := os.MkdirAll(ViamDotDir, 0o700); err != nil { - return err - } - - md, err := json.MarshalIndent(cfg, "", " ") - if err != nil { - return err - } - reader := bytes.NewReader(md) - - path := getCloudCacheFilePath(id) - - return artifact.AtomicStore(path, reader, id) -} - func clearCache(id string) { utils.UncheckedErrorFunc(func() error { return os.Remove(getCloudCacheFilePath(id)) @@ -318,10 +301,9 @@ func readFromCloud( unprocessedConfig.Cloud.TLSCertificate = tls.certificate unprocessedConfig.Cloud.TLSPrivateKey = tls.privateKey - if err := storeToCache(cloudCfg.ID, unprocessedConfig); err != nil { - logger.Errorw("failed to cache config", "error", err) + if err := cfg.SetToCache(unprocessedConfig); err != nil { + logger.Errorw("failed to set toCache on config", "error", err) } - return cfg, nil } diff --git a/config/reader_test.go b/config/reader_test.go index 495bda36e17..570d7fcb5d8 100644 --- a/config/reader_test.go +++ b/config/reader_test.go @@ -2,7 +2,6 @@ package config import ( "context" - "errors" "fmt" "io/fs" "os" @@ -11,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/pkg/errors" pb "go.viam.com/api/app/v1" "go.viam.com/test" @@ -69,7 +69,6 @@ func TestFromReader(t *testing.T) { appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String()) cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret) gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger) - defer clearCache(robotPartID) test.That(t, err, test.ShouldBeNil) expectedCloud := *cloudResponse @@ -79,6 +78,8 @@ func TestFromReader(t *testing.T) { expectedCloud.RefreshInterval = time.Duration(10000000000) test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud) + test.That(t, gotCfg.StoreToCache(), test.ShouldBeNil) + defer clearCache(robotPartID) cachedCfg, err := readFromCache(robotPartID) test.That(t, err, test.ShouldBeNil) expectedCloud.AppAddress = "" @@ -102,7 +103,10 @@ func TestFromReader(t *testing.T) { MachineID: "the-machine", } cachedConf := &Config{Cloud: cachedCloud} - err := storeToCache(robotPartID, cachedConf) + + cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}} + cfgToCache.SetToCache(cachedConf) + err := cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) defer clearCache(robotPartID) @@ -153,7 +157,6 @@ func TestFromReader(t *testing.T) { appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String()) cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret) gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger) - defer clearCache(robotPartID) test.That(t, err, test.ShouldBeNil) expectedCloud := *cloudResponse @@ -161,6 +164,9 @@ func TestFromReader(t *testing.T) { expectedCloud.RefreshInterval = time.Duration(10000000000) test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud) + err = gotCfg.StoreToCache() + defer clearCache(robotPartID) + test.That(t, err, test.ShouldBeNil) cachedCfg, err := readFromCache(robotPartID) test.That(t, err, test.ShouldBeNil) expectedCloud.AppAddress = "" @@ -191,13 +197,20 @@ func TestStoreToCache(t *testing.T) { } cfg.Cloud = cloud - // store our config to the cloud - err = storeToCache(cfg.Cloud.ID, cfg) + // errors if no unprocessed config to cache + cfgToCache := &Config{Cloud: &Cloud{ID: "forCachingTest"}} + err = cfgToCache.StoreToCache() + test.That(t, err.Error(), test.ShouldContainSubstring, "no unprocessed config to cache") + + // store our config to the cache + cfgToCache.SetToCache(cfg) + err = cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) // read config from cloud, confirm consistency cloudCfg, err := readFromCloud(ctx, cfg, nil, true, false, logger) test.That(t, err, test.ShouldBeNil) + cloudCfg.toCache = nil test.That(t, cloudCfg, test.ShouldResemble, cfg) // Modify our config @@ -207,10 +220,12 @@ func TestStoreToCache(t *testing.T) { // read config from cloud again, confirm that the cached config differs from cfg cloudCfg2, err := readFromCloud(ctx, cfg, nil, true, false, logger) test.That(t, err, test.ShouldBeNil) - test.That(t, cloudCfg2, test.ShouldNotResemble, cfg) + cloudCfg2.toCache = nil + test.That(t, cloudCfg2, test.ShouldNotResemble, cfgToCache) // store the updated config to the cloud - err = storeToCache(cfg.Cloud.ID, cfg) + cfgToCache.SetToCache(cfg) + err = cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) test.That(t, cfg.Ensure(true, logger), test.ShouldBeNil) @@ -218,6 +233,7 @@ func TestStoreToCache(t *testing.T) { // read updated cloud config, confirm that it now matches our updated cfg cloudCfg3, err := readFromCloud(ctx, cfg, nil, true, false, logger) test.That(t, err, test.ShouldBeNil) + cloudCfg3.toCache = nil test.That(t, cloudCfg3, test.ShouldResemble, cfg) } @@ -304,7 +320,9 @@ func TestReadTLSFromCache(t *testing.T) { defer clearCache(robotPartID) cfg.Cloud = nil - err = storeToCache(robotPartID, cfg) + cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}} + cfgToCache.SetToCache(cfg) + err = cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) tls := tlsConfig{} @@ -315,11 +333,14 @@ func TestReadTLSFromCache(t *testing.T) { t.Run("invalid cached TLS", func(t *testing.T) { defer clearCache(robotPartID) cloud := &Cloud{ + ID: robotPartID, TLSPrivateKey: "key", } cfg.Cloud = cloud - err = storeToCache(robotPartID, cfg) + cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}} + cfgToCache.SetToCache(cfg) + err = cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) tls := tlsConfig{} @@ -333,12 +354,15 @@ func TestReadTLSFromCache(t *testing.T) { t.Run("invalid cached TLS but insecure signaling", func(t *testing.T) { defer clearCache(robotPartID) cloud := &Cloud{ + ID: robotPartID, TLSPrivateKey: "key", SignalingInsecure: true, } cfg.Cloud = cloud - err = storeToCache(robotPartID, cfg) + cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}} + cfgToCache.SetToCache(cfg) + err = cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) tls := tlsConfig{} @@ -352,12 +376,15 @@ func TestReadTLSFromCache(t *testing.T) { t.Run("valid cached TLS", func(t *testing.T) { defer clearCache(robotPartID) cloud := &Cloud{ + ID: robotPartID, TLSCertificate: "cert", TLSPrivateKey: "key", } cfg.Cloud = cloud - err = storeToCache(robotPartID, cfg) + cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}} + cfgToCache.SetToCache(cfg) + err = cfgToCache.StoreToCache() test.That(t, err, test.ShouldBeNil) // the config is missing several fields required to start the robot, but this diff --git a/config/watcher_test.go b/config/watcher_test.go index 3cb7df459a0..d1fcdf9c36c 100644 --- a/config/watcher_test.go +++ b/config/watcher_test.go @@ -259,6 +259,16 @@ func TestNewWatcherCloud(t *testing.T) { }}, } + unprocessedFromCfg := func(cfg config.Config) *config.Config { + // the unprocessed config uses the original config read from the cloud, + // and the cloud config is missing a few fields in the proto, meaning a few fields need to be cleared out. + unprocessed, err := cfg.CopyOnlyPublicFields() + test.That(t, err, test.ShouldBeNil) + unprocessed.Cloud.AppAddress = "" + unprocessed.Cloud.RefreshInterval = 10 * time.Second + return unprocessed + } + storeConfigInServer(confToReturn) watcher, err := config.NewWatcher(context.Background(), &config.Config{Cloud: newCloudConf()}, logger) @@ -268,6 +278,7 @@ func TestNewWatcherCloud(t *testing.T) { confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil) + confToExpect.SetToCache(unprocessedFromCfg(confToExpect)) newConf := <-watcher.Config() test.That(t, newConf, test.ShouldResemble, &confToExpect) @@ -305,6 +316,7 @@ func TestNewWatcherCloud(t *testing.T) { confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil) + confToExpect.SetToCache(unprocessedFromCfg(confToExpect)) newConf = <-watcher.Config() test.That(t, newConf, test.ShouldResemble, &confToExpect) @@ -356,6 +368,7 @@ func TestNewWatcherCloud(t *testing.T) { confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil) + confToExpect.SetToCache(unprocessedFromCfg(confToExpect)) newConf = <-watcher.Config() test.That(t, newConf, test.ShouldResemble, &confToExpect) diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index cbc1408006c..431aab4b6c2 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -1190,14 +1190,26 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config, // if anything has changed. err := r.packageManager.Sync(ctx, newConfig.Packages, newConfig.Modules) if err != nil { - allErrs = multierr.Combine(allErrs, err) + r.Logger().CErrorw(ctx, "reconfiguration aborted because cloud modules or packages download failed", "error", err) + return } // For local tarball modules, we create synthetic versions for package management. The `localRobot` keeps track of these because // config reader would overwrite if we just stored it in config. Here, we copy the synthetic version from the `localRobot` into the // appropriate `config.Module` object inside the `cfg.Modules` slice. Thus, when a local tarball module is reloaded, the viam-server // can unpack it into a fresh directory rather than reusing the previous one. r.applyLocalModuleVersions(newConfig) - allErrs = multierr.Combine(allErrs, r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules)) + err = r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules) + if err != nil { + r.Logger().CErrorw(ctx, "reconfiguration aborted because local modules or packages sync failed", "error", err) + return + } + + if newConfig.Cloud != nil { + r.Logger().CDebug(ctx, "updating cached config") + if err := newConfig.StoreToCache(); err != nil { + r.logger.CErrorw(ctx, "error storing the config", "error", err) + } + } // Add default services and process their dependencies. Dependencies may // already come from config validation so we check that here.