Skip to content

Commit

Permalink
Add ability to re-try a recovery (if it has previously failed)
Browse files Browse the repository at this point in the history
  • Loading branch information
meejah committed Jun 4, 2022
1 parent 943ef87 commit f219e31
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/_zkapauthorizer/recover.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def recover(
:param cursor: A database cursor which can be used to populate the
database with recovered state.
"""
if self._state.stage != RecoveryStages.inactive:
if self._state.stage not in {RecoveryStages.inactive, RecoveryStages.download_failed, RecoveryStages.import_failed}:
return

self._set_state(RecoveryState(stage=RecoveryStages.started))
Expand Down
5 changes: 5 additions & 0 deletions src/_zkapauthorizer/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,14 @@ def err(f):
)
)
disconnect_clients()
self.recovering_d = None

def happy(_):
# note that the StatefulRecoverer eats download /
# import errors so we'll exit here sometimes even
# though an error message has been sent...
disconnect_clients()
self.recovering_d = None

self.recovering_d.addCallbacks(happy, err)

Expand Down
108 changes: 108 additions & 0 deletions src/_zkapauthorizer/tests/test_client_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,114 @@ async def recover():
),
)

@given(
tahoe_configs(),
api_auth_tokens(),
)
def test_recover_retry(self, get_config, api_auth_token):
"""
If at first our download fails, we can still retry using the API.
"""
downloads = []
fails = [RuntimeError("downloader fails")]

def get_sometimes_fail_downloader(cap):
async def do_download(set_state):
nonlocal downloads, fails
if fails:
raise fails.pop(0)
downloads.append(set_state)
return (
# this data is CBOR for {"version": 1, "statements": []}
lambda: BytesIO(b"\xa2gversion\x01jstatements\x80"),
[], # no event-streams
)

return do_download

clock = MemoryReactorClockResolver()
store = self.useFixture(TemporaryVoucherStore(aware_now, get_config)).store
recoverer = StatefulRecoverer()
factory = RecoverFactory(store, get_sometimes_fail_downloader, recoverer)
pumper = create_pumper()
self.addCleanup(pumper.stop)

def create_proto():
addr = IPv4Address("TCP", "127.0.0.1", "0")
proto = factory.buildProtocol(addr)
return proto

agent = create_memory_agent(clock, pumper, create_proto)
pumper.start()

async def recover():
proto = await agent.open(
"ws://127.0.0.1:1/storage-plugins/privatestorageio-zkapauthz-v2/recover",
{"headers": {"Authorization": f"tahoe-lafs {api_auth_token}"}},
)
updates = []
proto.on("message", lambda *args, **kw: updates.append((args, kw)))
await proto.is_open
proto.sendMessage(
json.dumps({"recovery-capability": self.GOOD_CAPABILITY}).encode("utf8")
)
await proto.is_closed
return updates

# first recovery will fails
d0 = Deferred.fromCoroutine(recover())
pumper._flush()

# try again
d1 = Deferred.fromCoroutine(recover())
pumper._flush()

self.assertThat(
d0,
succeeded(
AfterPreprocessing(
lambda messages: list(
loads(args[0]) for (args, kwargs) in messages
),
Equals([
{
"stage": "started",
"failure-reason": None,
},
# "our" downloader (above) doesn't set any downloading etc
# state-updates
{
"stage": "download_failed",
"failure-reason": "downloader fails",
},
]),
)
),
)
# second attempt should have succeeded
self.assertThat(
d1,
succeeded(
AfterPreprocessing(
lambda messages: list(
loads(args[0]) for (args, kwargs) in messages
),
Equals([
{
"stage": "started",
"failure-reason": None,
},
# "our" downloader (above) doesn't set any downloading etc
# state-updates
{
"stage": "succeeded",
"failure-reason": None,
},
]),
)
),
)


def maybe_extra_tokens():
"""
Expand Down

0 comments on commit f219e31

Please sign in to comment.