diff --git a/src/_zkapauthorizer/recover.py b/src/_zkapauthorizer/recover.py index 46d9df77a..a85da1958 100644 --- a/src/_zkapauthorizer/recover.py +++ b/src/_zkapauthorizer/recover.py @@ -119,7 +119,7 @@ async def recover( :param cursor: A database cursor which can be used to populate the database with recovered state. """ - if self._state.stage != RecoveryStages.inactive: + if self._state.stage not in {RecoveryStages.inactive, RecoveryStages.download_failed, RecoveryStages.import_failed}: return self._set_state(RecoveryState(stage=RecoveryStages.started)) diff --git a/src/_zkapauthorizer/resource.py b/src/_zkapauthorizer/resource.py index 4052b0618..9adbeb659 100644 --- a/src/_zkapauthorizer/resource.py +++ b/src/_zkapauthorizer/resource.py @@ -327,9 +327,14 @@ def err(f): ) ) disconnect_clients() + self.recovering_d = None def happy(_): + # note that the StatefulRecoverer eats download / + # import errors so we'll exit here sometimes even + # though an error message has been sent... disconnect_clients() + self.recovering_d = None self.recovering_d.addCallbacks(happy, err) diff --git a/src/_zkapauthorizer/tests/test_client_resource.py b/src/_zkapauthorizer/tests/test_client_resource.py index 3dd86d184..0bcfc4c28 100644 --- a/src/_zkapauthorizer/tests/test_client_resource.py +++ b/src/_zkapauthorizer/tests/test_client_resource.py @@ -987,6 +987,114 @@ async def recover(): ), ) + @given( + tahoe_configs(), + api_auth_tokens(), + ) + def test_recover_retry(self, get_config, api_auth_token): + """ + If at first our download fails, we can still retry using the API. + """ + downloads = [] + fails = [RuntimeError("downloader fails")] + + def get_sometimes_fail_downloader(cap): + async def do_download(set_state): + nonlocal downloads, fails + if fails: + raise fails.pop(0) + downloads.append(set_state) + return ( + # this data is CBOR for {"version": 1, "statements": []} + lambda: BytesIO(b"\xa2gversion\x01jstatements\x80"), + [], # no event-streams + ) + + return do_download + + clock = MemoryReactorClockResolver() + store = self.useFixture(TemporaryVoucherStore(aware_now, get_config)).store + recoverer = StatefulRecoverer() + factory = RecoverFactory(store, get_sometimes_fail_downloader, recoverer) + pumper = create_pumper() + self.addCleanup(pumper.stop) + + def create_proto(): + addr = IPv4Address("TCP", "127.0.0.1", "0") + proto = factory.buildProtocol(addr) + return proto + + agent = create_memory_agent(clock, pumper, create_proto) + pumper.start() + + async def recover(): + proto = await agent.open( + "ws://127.0.0.1:1/storage-plugins/privatestorageio-zkapauthz-v2/recover", + {"headers": {"Authorization": f"tahoe-lafs {api_auth_token}"}}, + ) + updates = [] + proto.on("message", lambda *args, **kw: updates.append((args, kw))) + await proto.is_open + proto.sendMessage( + json.dumps({"recovery-capability": self.GOOD_CAPABILITY}).encode("utf8") + ) + await proto.is_closed + return updates + + # first recovery will fails + d0 = Deferred.fromCoroutine(recover()) + pumper._flush() + + # try again + d1 = Deferred.fromCoroutine(recover()) + pumper._flush() + + self.assertThat( + d0, + succeeded( + AfterPreprocessing( + lambda messages: list( + loads(args[0]) for (args, kwargs) in messages + ), + Equals([ + { + "stage": "started", + "failure-reason": None, + }, + # "our" downloader (above) doesn't set any downloading etc + # state-updates + { + "stage": "download_failed", + "failure-reason": "downloader fails", + }, + ]), + ) + ), + ) + # second attempt should have succeeded + self.assertThat( + d1, + succeeded( + AfterPreprocessing( + lambda messages: list( + loads(args[0]) for (args, kwargs) in messages + ), + Equals([ + { + "stage": "started", + "failure-reason": None, + }, + # "our" downloader (above) doesn't set any downloading etc + # state-updates + { + "stage": "succeeded", + "failure-reason": None, + }, + ]), + ) + ), + ) + def maybe_extra_tokens(): """