From aa91964c2e46033a767b472f773381ef709b7b20 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Sat, 21 Oct 2023 16:18:02 +0200 Subject: [PATCH] refactor spending conditions and add comments --- cashu/core/p2pk.py | 5 +- cashu/core/secret.py | 3 +- cashu/mint/conditions.py | 426 +++++++++++++++++++++++---------------- 3 files changed, 257 insertions(+), 177 deletions(-) diff --git a/cashu/core/p2pk.py b/cashu/core/p2pk.py index 181c784e..a39ff139 100644 --- a/cashu/core/p2pk.py +++ b/cashu/core/p2pk.py @@ -24,7 +24,10 @@ def from_secret(cls, secret: Secret): return cls(**secret.dict(exclude={"tags"}), tags=secret.tags) def get_p2pk_pubkey_from_secret(self) -> List[str]: - """Gets the P2PK pubkey from a Secret depending on the locktime + """Gets the P2PK pubkey from a Secret depending on the locktime. + + If locktime is passed, only the refund pubkeys are returned. + Else, the pubkeys in the data field and in the 'pubkeys' tag are returned. Args: secret (Secret): P2PK Secret in ecash token diff --git a/cashu/core/secret.py b/cashu/core/secret.py index 070b09e0..663f2be5 100644 --- a/cashu/core/secret.py +++ b/cashu/core/secret.py @@ -1,4 +1,5 @@ import json +from enum import Enum from typing import Any, Dict, List, Optional, Union from loguru import logger @@ -7,7 +8,7 @@ from .crypto.secp import PrivateKey -class SecretKind: +class SecretKind(Enum): P2PK = "P2PK" HTLC = "HTLC" diff --git a/cashu/mint/conditions.py b/cashu/mint/conditions.py index 312b1e1f..bcdd7aef 100644 --- a/cashu/mint/conditions.py +++ b/cashu/mint/conditions.py @@ -19,137 +19,140 @@ class LedgerSpendingConditions: - def _verify_input_spending_conditions(self, proof: Proof) -> bool: + def _verify_p2pk_spending_conditions(self, proof: Proof, secret: Secret) -> bool: """ - Verify spending conditions: - Condition: P2PK - Witness: proof.p2pksigs - Condition: HTLC - Witness: proof.htlcpreimage, proof.htlcsignature + Verify P2PK spending condition for a single input. + + We return True: + - if the secret is not a P2PKSecret spending condition + - if the locktime has passed and no refund pubkey is present + + We raise an exception: + - if the pubkeys in the secret are not unique + - if no signatures are present + - if the signatures are not unique + - if n_sigs is not positive + - if n_sigs is larger than the number of provided signatures + - if no valid signatures are present + - if the signature threshold is not met """ + if secret.kind != SecretKind.P2PK: + # not a P2PK secret + return True - try: - secret = Secret.deserialize(proof.secret) - logger.trace(f"proof.secret: {proof.secret}") - logger.trace(f"secret: {secret}") - except Exception: - # secret is not a spending condition so we treat is a normal secret + p2pk_secret = P2PKSecret.from_secret(secret) + + # extract pubkeys that we require signatures from depending on whether the + # locktime has passed (refund) or not (pubkeys in secret.data and in tags) + # This is implemented in get_p2pk_pubkey_from_secret() + pubkeys = p2pk_secret.get_p2pk_pubkey_from_secret() + # we will get an empty list if the locktime has passed and no refund pubkey is present + if not pubkeys: return True - # P2PK - if secret.kind == SecretKind.P2PK: - p2pk_secret = P2PKSecret.from_secret(secret) - # check if locktime is in the past - pubkeys = p2pk_secret.get_p2pk_pubkey_from_secret() - assert len(set(pubkeys)) == len(pubkeys), "pubkeys must be unique." - logger.trace(f"pubkeys: {pubkeys}") - # we will get an empty list if the locktime has passed and no refund pubkey is present - if not pubkeys: - return True - - # now we check the signature - if not proof.p2pksigs: - # no signature present although secret indicates one - logger.error(f"no p2pk signatures in proof: {proof.p2pksigs}") - raise TransactionError("no p2pk signatures in proof.") - - # we make sure that there are no duplicate signatures - if len(set(proof.p2pksigs)) != len(proof.p2pksigs): - raise TransactionError("p2pk signatures must be unique.") - - # we parse the secret as a P2PK commitment - # assert len(proof.secret.split(":")) == 5, "p2pk secret format invalid." - - # INPUTS: check signatures proof.p2pksigs against pubkey - # we expect the signature to be on the pubkey (=message) itself - n_sigs_required = p2pk_secret.n_sigs or 1 - assert n_sigs_required > 0, "n_sigs must be positive." + assert len(set(pubkeys)) == len(pubkeys), "pubkeys must be unique." + logger.trace(f"pubkeys: {pubkeys}") - # check if enough signatures are present - assert len(proof.p2pksigs) >= n_sigs_required, ( - f"not enough signatures provided: {len(proof.p2pksigs)} <" - f" {n_sigs_required}." - ) + # verify that signatures are present + if not proof.p2pksigs: + # no signature present although secret indicates one + logger.error(f"no p2pk signatures in proof: {proof.p2pksigs}") + raise TransactionError("no p2pk signatures in proof.") - n_valid_sigs_per_output = 0 - # loop over all signatures in output - for input_sig in proof.p2pksigs: - for pubkey in pubkeys: - logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.") - logger.trace(f"Message: {p2pk_secret.serialize().encode('utf-8')}") - if verify_p2pk_signature( - message=p2pk_secret.serialize().encode("utf-8"), - pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), - signature=bytes.fromhex(input_sig), - ): - n_valid_sigs_per_output += 1 - logger.trace( - f"p2pk signature on input is valid: {input_sig} on" - f" {pubkey}." - ) - continue - else: - logger.trace( - f"p2pk signature on input is invalid: {input_sig} on" - f" {pubkey}." - ) - # check if we have enough valid signatures - assert n_valid_sigs_per_output, "no valid signature provided for input." - assert n_valid_sigs_per_output >= n_sigs_required, ( - f"signature threshold not met. {n_valid_sigs_per_output} <" - f" {n_sigs_required}." - ) - logger.trace( - f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures" - " found." - ) + # we make sure that there are no duplicate signatures + if len(set(proof.p2pksigs)) != len(proof.p2pksigs): + raise TransactionError("p2pk signatures must be unique.") - logger.trace(proof.p2pksigs) - logger.trace("p2pk signature on inputs is valid.") + # we parse the secret as a P2PK commitment + # assert len(proof.secret.split(":")) == 5, "p2pk secret format invalid." - return True + # INPUTS: check signatures proof.p2pksigs against pubkey + # we expect the signature to be on the pubkey (=message) itself + n_sigs_required = p2pk_secret.n_sigs or 1 + assert n_sigs_required > 0, "n_sigs must be positive." - # HTLC - if secret.kind == SecretKind.HTLC: - htlc_secret = HTLCSecret.from_secret(secret) - # time lock - # check if locktime is in the past - if htlc_secret.locktime and htlc_secret.locktime < time.time(): - refund_pubkeys = htlc_secret.tags.get_tag_all("refund") - if refund_pubkeys: - assert proof.witness, TransactionError("no HTLC refund signature.") - signature = HTLCWitness.from_witness(proof.witness).signature - assert signature, TransactionError( - "no HTLC refund signature provided" + # check if enough signatures are present + assert ( + len(proof.p2pksigs) >= n_sigs_required + ), f"not enough signatures provided: {len(proof.p2pksigs)} < {n_sigs_required}." + + n_valid_sigs_per_output = 0 + # loop over all signatures in output + for input_sig in proof.p2pksigs: + for pubkey in pubkeys: + logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.") + logger.trace(f"Message: {p2pk_secret.serialize().encode('utf-8')}") + if verify_p2pk_signature( + message=p2pk_secret.serialize().encode("utf-8"), + pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), + signature=bytes.fromhex(input_sig), + ): + n_valid_sigs_per_output += 1 + logger.trace( + f"p2pk signature on input is valid: {input_sig} on {pubkey}." ) - for pubkey in refund_pubkeys: - if verify_p2pk_signature( - message=htlc_secret.serialize().encode("utf-8"), - pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), - signature=bytes.fromhex(signature), - ): - # a signature matches - return True - raise TransactionError("HTLC refund signatures did not match.") - # no pubkeys given in secret, anyone can spend - return True - - # hash lock - assert proof.htlcpreimage, TransactionError("no HTLC preimage provided") - - # first we check whether a correct preimage was included - if not hashlib.sha256( - bytes.fromhex(proof.htlcpreimage) - ).digest() == bytes.fromhex(htlc_secret.data): - raise TransactionError("HTLC preimage does not match.") - - # then we check whether a signature is required - hashlock_pubkeys = htlc_secret.tags.get_tag_all("pubkeys") - if hashlock_pubkeys: - assert proof.witness, TransactionError("no HTLC hash lock signature.") + + # check if we have enough valid signatures + assert n_valid_sigs_per_output, "no valid signature provided for input." + assert n_valid_sigs_per_output >= n_sigs_required, ( + f"signature threshold not met. {n_valid_sigs_per_output} <" + f" {n_sigs_required}." + ) + + logger.trace( + f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures found." + ) + logger.trace(proof.p2pksigs) + logger.trace("p2pk signature on inputs is valid.") + + return True + + def _verify_htlc_spending_conditions(self, proof: Proof, secret: Secret) -> bool: + """ + Verify HTLC spending condition for a single input. + + We return True: + - if the secret is not a HTLCSecret spending condition + + We first verify the time lock. If the locktime has passed, we require + a valid signature if a 'refund' pubkey is present. If it isn't present, + anyone can spend. + + We return True: + - if 'refund' pubkeys are present and a valid signature is provided for one of them + We raise an exception: + - if 'refund' but no valid signature is present + + + We then verify the hash lock. We require a valid preimage. We require a valid + signature if 'pubkeys' are present. If they aren't present, anyone who provides + a valid preimage can spend. + + We raise an exception: + - if no preimage is provided + - if preimage does not match the hash lock in the secret + + We return True: + - if 'pubkeys' are present and a valid signature is provided for one of them + + We raise an exception: + - if 'pubkeys' are present but no valid signature is provided + """ + + if secret.kind != SecretKind.HTLC: + # not a P2PK secret + return True + htlc_secret = HTLCSecret.from_secret(secret) + + # time lock + # check if locktime is in the past + if htlc_secret.locktime and htlc_secret.locktime < time.time(): + refund_pubkeys = htlc_secret.tags.get_tag_all("refund") + if refund_pubkeys: + assert proof.witness, TransactionError("no HTLC refund signature.") signature = HTLCWitness.from_witness(proof.witness).signature - assert signature, TransactionError( - "HTLC no hash lock signatures provided." - ) - for pubkey in hashlock_pubkeys: + assert signature, TransactionError("no HTLC refund signature provided") + for pubkey in refund_pubkeys: if verify_p2pk_signature( message=htlc_secret.serialize().encode("utf-8"), pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), @@ -157,99 +160,172 @@ def _verify_input_spending_conditions(self, proof: Proof) -> bool: ): # a signature matches return True - # none of the pubkeys had a match - raise TransactionError("HTLC hash lock signatures did not match.") - # no pubkeys were included, anyone can spend + raise TransactionError("HTLC refund signatures did not match.") + # no pubkeys given in secret, anyone can spend + return True + + # hash lock + assert proof.htlcpreimage, TransactionError("no HTLC preimage provided") + + # first we check whether a correct preimage was included + if not hashlib.sha256( + bytes.fromhex(proof.htlcpreimage) + ).digest() == bytes.fromhex(htlc_secret.data): + raise TransactionError("HTLC preimage does not match.") + + # then we check whether a signature is required + hashlock_pubkeys = htlc_secret.tags.get_tag_all("pubkeys") + if hashlock_pubkeys: + assert proof.witness, TransactionError("no HTLC hash lock signature.") + signature = HTLCWitness.from_witness(proof.witness).signature + assert signature, TransactionError("HTLC no hash lock signatures provided.") + for pubkey in hashlock_pubkeys: + if verify_p2pk_signature( + message=htlc_secret.serialize().encode("utf-8"), + pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), + signature=bytes.fromhex(signature), + ): + # a signature matches + return True + # none of the pubkeys had a match + raise TransactionError("HTLC hash lock signatures did not match.") + # no pubkeys were included, anyone can spend + return True + + def _verify_input_spending_conditions(self, proof: Proof) -> bool: + """ + Verify spending conditions: + Condition: P2PK - Checks if signature in proof.witness is valid for pubkey in proof.secret + Condition: HTLC - Checks if preimage in proof.witness is valid for hash in proof.secret + """ + + try: + secret = Secret.deserialize(proof.secret) + logger.trace(f"proof.secret: {proof.secret}") + logger.trace(f"secret: {secret}") + except Exception: + # secret is not a spending condition so we treat is a normal secret return True + # P2PK + if secret.kind == SecretKind.P2PK: + return self._verify_p2pk_spending_conditions(proof, secret) + + # HTLC + if secret.kind == SecretKind.HTLC: + return self._verify_htlc_spending_conditions(proof, secret) + # no spending condition present return True - def _verify_output_spending_conditions( + # ------ output spending conditions ------ + + def _verify_output_p2pk_spending_conditions( self, proofs: List[Proof], outputs: List[BlindedMessage] ) -> bool: """ - Verify spending conditions: - Condition: P2PK - Witness: output.p2pksigs + If sigflag==SIG_ALL in proof.secret, check if outputs + contain valid signatures for pubkeys in proof.secret. + + We return True + - if not all proof.secret are Secret spending condition + - if not all secrets are P2PKSecret spending condition + - if not all signature.sigflag are SIG_ALL + + We raise an exception: + - if not all pubkeys in all secrets are the same + - if not all n_sigs in all secrets are the same + - if not all signatures in all outputs are unique + - if not all signatures in all outputs are valid + - if no valid signatures are present + - if the signature threshold is not met + + We return True if we successfully validated the spending condition. """ - # P2PK - pubkeys_per_proof = [] - n_sigs = [] - for proof in proofs: - try: - secret = P2PKSecret.deserialize(proof.secret) - # get all p2pk pubkeys from secrets - pubkeys_per_proof.append(secret.get_p2pk_pubkey_from_secret()) - # get signature threshold from secrets - n_sigs.append(secret.n_sigs) - except Exception: - # secret is not a spending condition so we treat is a normal secret - return True - # for all proofs all pubkeys must be the same + try: + secrets_generic = [Secret.deserialize(p.secret) for p in proofs] + p2pk_secrets = [ + P2PKSecret.from_secret(secret) for secret in secrets_generic + ] + except Exception: + # secret is not a spending condition so we treat is a normal secret + return True + + # check if all secrets are P2PK + # NOTE: This is redundant, because P2PKSecret.from_secret() already checks for the kind + # Leaving it in for explicitness + if not all([secret.kind == SecretKind.P2PK for secret in p2pk_secrets]): + # not all secrets are P2PK + return True + + # check if all secrets are sigflag==SIG_ALL + if not all([secret.sigflag == SigFlags.SIG_ALL for secret in p2pk_secrets]): + # not all secrets have sigflag==SIG_ALL + return True + + # extract all pubkeys and n_sigs from secrets + pubkeys_per_proof = [ + secret.get_p2pk_pubkey_from_secret() for secret in p2pk_secrets + ] + n_sigs_per_proof = [secret.n_sigs for secret in p2pk_secrets] + + # all pubkeys and n_sigs must be the same assert ( len(set([tuple(pubs_output) for pubs_output in pubkeys_per_proof])) == 1 ), "pubkeys in all proofs must match." - pubkeys = pubkeys_per_proof[0] - if not pubkeys: - # no pubkeys present - return True + assert len(set(n_sigs_per_proof)) == 1, "n_sigs in all proofs must match." - logger.trace(f"pubkeys: {pubkeys}") # TODO: add limit for maximum number of pubkeys - # for all proofs all n_sigs must be the same - assert len(set(n_sigs)) == 1, "n_sigs in all proofs must match." - n_sigs_required = n_sigs[0] or 1 + # validation successful - # first we check if all secrets are P2PK - if not all( - [Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs] - ): - # not all secrets are P2PK - return True + pubkeys: List[str] = pubkeys_per_proof[0] + # if n_sigs is None, we set it to 1 + n_sigs: int = n_sigs_per_proof[0] or 1 - # now we check if any of the secrets has sigflag==SIG_ALL - if not any( - [ - P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL - for p in proofs - ] - ): - # no secret has sigflag==SIG_ALL - return True + logger.trace(f"pubkeys: {pubkeys}") # loop over all outputs and check if the signatures are valid for pubkeys with a threshold of n_sig for output in outputs: # we expect the signature to be on the pubkey (=message) itself - assert output.p2pksigs, "no signatures in output." + p2pksigs = output.p2pksigs + assert p2pksigs, "no signatures in output." # TODO: add limit for maximum number of signatures # we check whether any signature is duplicate - assert len(set(output.p2pksigs)) == len( - output.p2pksigs + assert len(set(p2pksigs)) == len( + p2pksigs ), "duplicate signatures in output." n_valid_sigs_per_output = 0 # loop over all signatures in output - for output_sig in output.p2pksigs: + for sig in p2pksigs: for pubkey in pubkeys: if verify_p2pk_signature( message=output.B_.encode("utf-8"), pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), - signature=bytes.fromhex(output_sig), + signature=bytes.fromhex(sig), ): n_valid_sigs_per_output += 1 assert n_valid_sigs_per_output, "no valid signature provided for output." - assert n_valid_sigs_per_output >= n_sigs_required, ( - f"signature threshold not met. {n_valid_sigs_per_output} <" - f" {n_sigs_required}." - ) + assert ( + n_valid_sigs_per_output >= n_sigs + ), f"signature threshold not met. {n_valid_sigs_per_output} < {n_sigs}." + logger.trace( - f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures" - " found." + f"{n_valid_sigs_per_output} of {n_sigs} valid signatures found." ) - logger.trace(output.p2pksigs) + logger.trace(p2pksigs) logger.trace("p2pk signatures on output is valid.") - return True + + def _verify_output_spending_conditions( + self, proofs: List[Proof], outputs: List[BlindedMessage] + ) -> bool: + """ + Verify spending conditions: + Condition: P2PK - If sigflag==SIG_ALL in proof.secret, check if outputs contain valid signatures for pubkeys in proof.secret. + """ + + return self._verify_output_p2pk_spending_conditions(proofs, outputs)