diff --git a/cashu/core/base.py b/cashu/core/base.py index 525b4b65..7978d962 100644 --- a/cashu/core/base.py +++ b/cashu/core/base.py @@ -1,8 +1,7 @@ import base64 import json -import time from sqlite3 import Row -from typing import Any, Dict, List, Optional, Union +from typing import Dict, List, Optional, Union from loguru import logger from pydantic import BaseModel @@ -10,150 +9,11 @@ from .crypto.keys import derive_keys, derive_keyset_id, derive_pubkeys from .crypto.secp import PrivateKey, PublicKey from .legacy import derive_keys_backwards_compatible_insecure_pre_0_12 - -# from .p2pk import sign_p2pk_sign +from .p2pk import P2SHScript # ------- PROOFS ------- -class SecretKind: - P2SH = "P2SH" - P2PK = "P2PK" - - -class SigFlags: - SIG_INPUTS = ( # require signatures only on the inputs (default signature flag) - "SIG_INPUTS" - ) - SIG_ALL = "SIG_ALL" # require signatures on inputs and outputs - - -class Tags(BaseModel): - """ - Tags are used to encode additional information in the Secret of a Proof. - """ - - __root__: List[List[str]] = [] - - def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs): - super().__init__(**kwargs) - self.__root__ = tags or [] - - def __setitem__(self, key: str, value: str) -> None: - self.__root__.append([key, value]) - - def __getitem__(self, key: str) -> Union[str, None]: - return self.get_tag(key) - - def get_tag(self, tag_name: str) -> Union[str, None]: - for tag in self.__root__: - if tag[0] == tag_name: - return tag[1] - return None - - def get_tag_all(self, tag_name: str) -> List[str]: - all_tags = [] - for tag in self.__root__: - if tag[0] == tag_name: - all_tags.append(tag[1]) - return all_tags - - -class Secret(BaseModel): - """Describes spending condition encoded in the secret field of a Proof.""" - - kind: str - data: str - nonce: Union[None, str] = None - tags: Union[None, Tags] = None - - def serialize(self) -> str: - data_dict: Dict[str, Any] = { - "data": self.data, - "nonce": self.nonce or PrivateKey().serialize()[:32], - } - if self.tags and self.tags.__root__: - logger.debug(f"Serializing tags: {self.tags.__root__}") - data_dict["tags"] = self.tags.__root__ - return json.dumps( - [self.kind, data_dict], - ) - - @classmethod - def deserialize(cls, from_proof: str): - kind, kwargs = json.loads(from_proof) - data = kwargs.pop("data") - nonce = kwargs.pop("nonce") - tags_list = kwargs.pop("tags", None) - if tags_list: - tags = Tags(tags=tags_list) - else: - tags = None - logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}") - return cls(kind=kind, data=data, nonce=nonce, tags=tags) - - @property - def locktime(self) -> Union[None, int]: - if self.tags: - locktime = self.tags.get_tag("locktime") - if locktime: - return int(locktime) - return None - - @property - def sigflag(self) -> Union[None, str]: - if self.tags: - sigflag = self.tags.get_tag("sigflag") - if sigflag: - return sigflag - return None - - @property - def n_sigs(self) -> Union[None, int]: - if self.tags: - n_sigs = self.tags.get_tag("n_sigs") - if n_sigs: - return int(n_sigs) - return None - - def get_p2pk_pubkey_from_secret(self) -> List[str]: - """Gets the P2PK pubkey from a Secret depending on the locktime - - Args: - secret (Secret): P2PK Secret in ecash token - - Returns: - str: pubkey to use for P2PK, empty string if anyone can spend (locktime passed) - """ - pubkeys: List[str] = [self.data] # for now we only support one pubkey - # get all additional pubkeys from tags for multisig - if self.tags and self.tags.get_tag("pubkey"): - pubkeys += self.tags.get_tag_all("pubkey") - - now = time.time() - if self.locktime and self.locktime < now: - logger.trace(f"p2pk locktime ran out ({self.locktime}<{now}).") - # check tags if a refund pubkey is present. - # If yes, we demand the signature to be from the refund pubkey - if self.tags: - refund_pubkey = self.tags.get_tag("refund") - if refund_pubkey: - pubkeys = [refund_pubkey] - return pubkeys - return [] - return pubkeys - - -class P2SHScript(BaseModel): - """ - Unlocks P2SH spending condition of a Proof - """ - - script: str - signature: str - address: Union[str, None] = None - - class Proof(BaseModel): """ Value token diff --git a/cashu/core/p2pk.py b/cashu/core/p2pk.py index e48fe030..0d97b345 100644 --- a/cashu/core/p2pk.py +++ b/cashu/core/p2pk.py @@ -1,8 +1,147 @@ import hashlib +import json +import time +from typing import Any, Dict, List, Optional, Union + +from loguru import logger +from pydantic import BaseModel from .crypto.secp import PrivateKey, PublicKey +class SecretKind: + P2SH = "P2SH" + P2PK = "P2PK" + + +class SigFlags: + SIG_INPUTS = ( # require signatures only on the inputs (default signature flag) + "SIG_INPUTS" + ) + SIG_ALL = "SIG_ALL" # require signatures on inputs and outputs + + +class Tags(BaseModel): + """ + Tags are used to encode additional information in the Secret of a Proof. + """ + + __root__: List[List[str]] = [] + + def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs): + super().__init__(**kwargs) + self.__root__ = tags or [] + + def __setitem__(self, key: str, value: str) -> None: + self.__root__.append([key, value]) + + def __getitem__(self, key: str) -> Union[str, None]: + return self.get_tag(key) + + def get_tag(self, tag_name: str) -> Union[str, None]: + for tag in self.__root__: + if tag[0] == tag_name: + return tag[1] + return None + + def get_tag_all(self, tag_name: str) -> List[str]: + all_tags = [] + for tag in self.__root__: + if tag[0] == tag_name: + for t in tag[1:]: + all_tags.append(t) + return all_tags + + +class Secret(BaseModel): + """Describes spending condition encoded in the secret field of a Proof.""" + + kind: str + data: str + tags: Tags + nonce: Union[None, str] = None + + def serialize(self) -> str: + data_dict: Dict[str, Any] = { + "data": self.data, + "nonce": self.nonce or PrivateKey().serialize()[:32], + } + if self.tags.__root__: + logger.debug(f"Serializing tags: {self.tags.__root__}") + data_dict["tags"] = self.tags.__root__ + return json.dumps( + [self.kind, data_dict], + ) + + @classmethod + def deserialize(cls, from_proof: str): + kind, kwargs = json.loads(from_proof) + data = kwargs.pop("data") + nonce = kwargs.pop("nonce") + tags_list: List = kwargs.pop("tags", None) + tags = Tags(tags=tags_list) + logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}") + return cls(kind=kind, data=data, nonce=nonce, tags=tags) + + +class P2PKSecret(Secret): + @classmethod + def from_secret(cls, secret: Secret): + assert secret.kind == SecretKind.P2PK, "Secret is not a P2PK secret" + # NOTE: exclude tags in .dict() because it doesn't deserialize it properly + # need to add it back in manually with tags=secret.tags + return cls(**secret.dict(exclude={"tags"}), tags=secret.tags) + + def get_p2pk_pubkey_from_secret(self) -> List[str]: + """Gets the P2PK pubkey from a Secret depending on the locktime + + Args: + secret (Secret): P2PK Secret in ecash token + + Returns: + str: pubkey to use for P2PK, empty string if anyone can spend (locktime passed) + """ + # the pubkey in the data field is the pubkey to use for P2PK + pubkeys: List[str] = [self.data] + + # get all additional pubkeys from tags for multisig + pubkeys += self.tags.get_tag_all("pubkeys") + + # check if locktime is passed and if so, only return refund pubkeys + now = time.time() + if self.locktime and self.locktime < now: + logger.trace(f"p2pk locktime ran out ({self.locktime}<{now}).") + # check tags if a refund pubkey is present. + # If yes, we demand the signature to be from the refund pubkey + return self.tags.get_tag_all("refund") + + return pubkeys + + @property + def locktime(self) -> Union[None, int]: + locktime = self.tags.get_tag("locktime") + return int(locktime) if locktime else None + + @property + def sigflag(self) -> Union[None, str]: + return self.tags.get_tag("sigflag") + + @property + def n_sigs(self) -> Union[None, int]: + n_sigs = self.tags.get_tag("n_sigs") + return int(n_sigs) if n_sigs else None + + +class P2SHScript(BaseModel): + """ + Unlocks P2SH spending condition of a Proof + """ + + script: str + signature: str + address: Union[str, None] = None + + def sign_p2pk_sign(message: bytes, private_key: PrivateKey): # ecdsa version # signature = private_key.ecdsa_serialize(private_key.ecdsa_sign(message)) diff --git a/cashu/mint/ledger.py b/cashu/mint/ledger.py index 8f972d58..d508029b 100644 --- a/cashu/mint/ledger.py +++ b/cashu/mint/ledger.py @@ -13,9 +13,6 @@ MintKeyset, MintKeysets, Proof, - Secret, - SecretKind, - SigFlags, ) from ..core.crypto import b_dhke from ..core.crypto.keys import derive_pubkey, random_hash @@ -33,7 +30,13 @@ TransactionError, ) from ..core.helpers import fee_reserve, sum_proofs -from ..core.p2pk import verify_p2pk_signature +from ..core.p2pk import ( + P2PKSecret, + Secret, + SecretKind, + SigFlags, + verify_p2pk_signature, +) from ..core.script import verify_bitcoin_script from ..core.settings import settings from ..core.split import amount_split @@ -254,12 +257,13 @@ def _verify_input_spending_conditions(self, proof: Proof) -> bool: # secret is not a spending condition so we treat is a normal secret return True if secret.kind == SecretKind.P2SH: + p2pk_secret = P2PKSecret.from_secret(secret) # check if locktime is in the past now = time.time() - if secret.locktime and secret.locktime < now: - logger.trace(f"p2sh locktime ran out ({secret.locktime}<{now}).") + if p2pk_secret.locktime and p2pk_secret.locktime < now: + logger.trace(f"p2sh locktime ran out ({p2pk_secret.locktime}<{now}).") return True - logger.trace(f"p2sh locktime still active ({secret.locktime}>{now}).") + logger.trace(f"p2sh locktime still active ({p2pk_secret.locktime}>{now}).") if ( proof.p2shscript is None @@ -284,8 +288,9 @@ def _verify_input_spending_conditions(self, proof: Proof) -> bool: # P2PK if secret.kind == SecretKind.P2PK: + p2pk_secret = P2PKSecret.from_secret(secret) # check if locktime is in the past - pubkeys = secret.get_p2pk_pubkey_from_secret() + pubkeys = p2pk_secret.get_p2pk_pubkey_from_secret() assert len(set(pubkeys)) == len(pubkeys), "pubkeys must be unique." logger.trace(f"pubkeys: {pubkeys}") # we will get an empty list if the locktime has passed and no refund pubkey is present @@ -307,7 +312,7 @@ def _verify_input_spending_conditions(self, proof: Proof) -> bool: # INPUTS: check signatures proof.p2pksigs against pubkey # we expect the signature to be on the pubkey (=message) itself - n_sigs_required = secret.n_sigs or 1 + n_sigs_required = p2pk_secret.n_sigs or 1 assert n_sigs_required > 0, "n_sigs must be positive." # check if enough signatures are present @@ -321,9 +326,9 @@ def _verify_input_spending_conditions(self, proof: Proof) -> bool: for input_sig in proof.p2pksigs: for pubkey in pubkeys: logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.") - logger.trace(f"Message: {secret.serialize().encode('utf-8')}") + logger.trace(f"Message: {p2pk_secret.serialize().encode('utf-8')}") if verify_p2pk_signature( - message=secret.serialize().encode("utf-8"), + message=p2pk_secret.serialize().encode("utf-8"), pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), signature=bytes.fromhex(input_sig), ): @@ -370,7 +375,7 @@ def _verify_output_spending_conditions( n_sigs = [] for proof in proofs: try: - secret = Secret.deserialize(proof.secret) + secret = P2PKSecret.deserialize(proof.secret) # get all p2pk pubkeys from secrets pubkeys_per_proof.append(secret.get_p2pk_pubkey_from_secret()) # get signature threshold from secrets @@ -403,7 +408,10 @@ def _verify_output_spending_conditions( # now we check if any of the secrets has sigflag==SIG_ALL if not any( - [Secret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL for p in proofs] + [ + P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL + for p in proofs + ] ): # no secret has sigflag==SIG_ALL return True @@ -798,7 +806,7 @@ async def _generate_change_promises( return_amounts_sorted = sorted(return_amounts, reverse=True) # we need to imprint these amounts into the blanket outputs for i in range(len(outputs)): - outputs[i].amount = return_amounts_sorted[i] + outputs[i].amount = return_amounts_sorted[i] # type: ignore if not self._verify_no_duplicate_outputs(outputs): raise TransactionError("duplicate promises.") return_promises = await self._generate_promises(outputs, keyset) diff --git a/cashu/mint/main.py b/cashu/mint/main.py index a279d93b..999863d4 100644 --- a/cashu/mint/main.py +++ b/cashu/mint/main.py @@ -47,7 +47,7 @@ def main( host=host, ssl_keyfile=ssl_keyfile, ssl_certfile=ssl_certfile, - **d, + **d, # type: ignore ) server = uvicorn.Server(config) server.run() diff --git a/cashu/wallet/p2pk.py b/cashu/wallet/p2pk.py new file mode 100644 index 00000000..ae95eb31 --- /dev/null +++ b/cashu/wallet/p2pk.py @@ -0,0 +1,262 @@ +import base64 +from datetime import datetime, timedelta +from typing import List, Optional + +from loguru import logger + +from ..core import bolt11 as bolt11 +from ..core.base import ( + BlindedMessage, + Proof, +) +from ..core.crypto.secp import PrivateKey +from ..core.db import Database +from ..core.p2pk import ( + P2PKSecret, + P2SHScript, + Secret, + SecretKind, + SigFlags, + Tags, + sign_p2pk_sign, +) +from ..core.script import ( + step0_carol_checksig_redeemscrip, + step0_carol_privkey, + step1_carol_create_p2sh_address, + step2_carol_sign_tx, +) +from ..wallet.crud import ( + get_unused_locks, + store_p2sh, +) +from .protocols import SupportsDb, SupportsPrivateKey + + +class WalletP2PK(SupportsPrivateKey, SupportsDb): + db: Database + private_key: Optional[PrivateKey] = None + # ---------- P2SH and P2PK ---------- + + async def create_p2sh_address_and_store(self) -> str: + """Creates a P2SH lock script and stores the script and signature in the database.""" + alice_privkey = step0_carol_privkey() + txin_redeemScript = step0_carol_checksig_redeemscrip(alice_privkey.pub) + txin_p2sh_address = step1_carol_create_p2sh_address(txin_redeemScript) + txin_signature = step2_carol_sign_tx(txin_redeemScript, alice_privkey).scriptSig + txin_redeemScript_b64 = base64.urlsafe_b64encode(txin_redeemScript).decode() + txin_signature_b64 = base64.urlsafe_b64encode(txin_signature).decode() + p2shScript = P2SHScript( + script=txin_redeemScript_b64, + signature=txin_signature_b64, + address=str(txin_p2sh_address), + ) + await store_p2sh(p2shScript, db=self.db) + assert p2shScript.address + return p2shScript.address + + async def create_p2pk_pubkey(self): + assert ( + self.private_key + ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" + public_key = self.private_key.pubkey + # logger.debug(f"Private key: {self.private_key.bech32()}") + assert public_key + return public_key.serialize().hex() + + async def create_p2pk_lock( + self, + pubkey: str, + locktime_seconds: Optional[int] = None, + tags: Optional[Tags] = None, + sig_all: bool = False, + n_sigs: int = 1, + ) -> P2PKSecret: + logger.debug(f"Provided tags: {tags}") + if not tags: + tags = Tags() + logger.debug(f"Before tags: {tags}") + if locktime_seconds: + tags["locktime"] = str( + int((datetime.now() + timedelta(seconds=locktime_seconds)).timestamp()) + ) + tags["sigflag"] = SigFlags.SIG_ALL if sig_all else SigFlags.SIG_INPUTS + if n_sigs > 1: + tags["n_sigs"] = str(n_sigs) + logger.debug(f"After tags: {tags}") + return P2PKSecret( + kind=SecretKind.P2PK, + data=pubkey, + tags=tags, + ) + + async def create_p2sh_lock( + self, + address: str, + locktime: Optional[int] = None, + tags: Tags = Tags(), + ) -> Secret: + if locktime: + tags["locktime"] = str( + (datetime.now() + timedelta(seconds=locktime)).timestamp() + ) + + return Secret( + kind=SecretKind.P2SH, + data=address, + tags=tags, + ) + + async def sign_p2pk_proofs(self, proofs: List[Proof]) -> List[str]: + assert ( + self.private_key + ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" + private_key = self.private_key + assert private_key.pubkey + logger.trace( + f"Signing with private key: {private_key.serialize()} public key:" + f" {private_key.pubkey.serialize().hex()}" + ) + for proof in proofs: + logger.trace(f"Signing proof: {proof}") + logger.trace(f"Signing message: {proof.secret}") + + signatures = [ + sign_p2pk_sign( + message=proof.secret.encode("utf-8"), + private_key=private_key, + ) + for proof in proofs + ] + logger.debug(f"Signatures: {signatures}") + return signatures + + async def sign_p2pk_outputs(self, outputs: List[BlindedMessage]) -> List[str]: + assert ( + self.private_key + ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" + private_key = self.private_key + assert private_key.pubkey + return [ + sign_p2pk_sign( + message=output.B_.encode("utf-8"), + private_key=private_key, + ) + for output in outputs + ] + + async def add_p2pk_witnesses_to_outputs( + self, outputs: List[BlindedMessage] + ) -> List[BlindedMessage]: + """Takes a list of outputs and adds a P2PK signatures to each. + Args: + outputs (List[BlindedMessage]): Outputs to add P2PK signatures to + Returns: + List[BlindedMessage]: Outputs with P2PK signatures added + """ + p2pk_signatures = await self.sign_p2pk_outputs(outputs) + for o, s in zip(outputs, p2pk_signatures): + o.p2pksigs = [s] + return outputs + + async def add_witnesses_to_outputs( + self, proofs: List[Proof], outputs: List[BlindedMessage] + ) -> List[BlindedMessage]: + """Adds witnesses to outputs if the inputs (proofs) indicate an appropriate signature flag + + Args: + proofs (List[Proof]): Inputs to the transaction + outputs (List[BlindedMessage]): Outputs to add witnesses to + Returns: + List[BlindedMessage]: Outputs with signatures added + """ + # first we check whether all tokens have serialized secrets as their secret + try: + for p in proofs: + Secret.deserialize(p.secret) + except Exception: + # if not, we do not add witnesses (treat as regular token secret) + return outputs + + # if any of the proofs provided require SIG_ALL, we must provide it + if any( + [ + P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL + for p in proofs + ] + ): + outputs = await self.add_p2pk_witnesses_to_outputs(outputs) + return outputs + + async def add_p2sh_witnesses_to_proofs( + self: SupportsDb, proofs: List[Proof] + ) -> List[Proof]: + # Quirk: we use a single P2SH script and signature pair for all tokens in proofs + address = Secret.deserialize(proofs[0].secret).data + p2shscripts = await get_unused_locks(address, db=self.db) + assert len(p2shscripts) == 1, Exception("lock not found.") + p2sh_script, p2sh_signature = ( + p2shscripts[0].script, + p2shscripts[0].signature, + ) + logger.debug(f"Unlock script: {p2sh_script} signature: {p2sh_signature}") + + # attach unlock scripts to proofs + for p in proofs: + p.p2shscript = P2SHScript(script=p2sh_script, signature=p2sh_signature) + return proofs + + async def add_p2pk_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: + p2pk_signatures = await self.sign_p2pk_proofs(proofs) + logger.debug(f"Unlock signatures for {len(proofs)} proofs: {p2pk_signatures}") + logger.debug(f"Proofs: {proofs}") + # attach unlock signatures to proofs + assert len(proofs) == len(p2pk_signatures), "wrong number of signatures" + for p, s in zip(proofs, p2pk_signatures): + if p.p2pksigs: + p.p2pksigs.append(s) + else: + p.p2pksigs = [s] + return proofs + + async def add_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: + """Adds witnesses to proofs for P2SH or P2PK redemption. + + This method parses the secret of each proof and determines the correct + witness type and adds it to the proof if we have it available. + + Note: In order for this method to work, all proofs must have the same secret type. + This is because we use a single P2SH script and signature pair for all tokens in proofs. + + For P2PK, we use an individual signature for each token in proofs. + + Args: + proofs (List[Proof]): List of proofs to add witnesses to + + Returns: + List[Proof]: List of proofs with witnesses added + """ + + # iterate through proofs and produce witnesses for each + + # first we check whether all tokens have serialized secrets as their secret + try: + for p in proofs: + Secret.deserialize(p.secret) + except Exception: + # if not, we do not add witnesses (treat as regular token secret) + return proofs + logger.debug("Spending conditions detected.") + # P2SH scripts + if all([Secret.deserialize(p.secret).kind == SecretKind.P2SH for p in proofs]): + logger.debug("P2SH redemption detected.") + proofs = await self.add_p2sh_witnesses_to_proofs(proofs) + + # P2PK signatures + elif all( + [Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs] + ): + logger.debug("P2PK redemption detected.") + proofs = await self.add_p2pk_witnesses_to_proofs(proofs) + + return proofs diff --git a/cashu/wallet/protocols.py b/cashu/wallet/protocols.py new file mode 100644 index 00000000..effb430b --- /dev/null +++ b/cashu/wallet/protocols.py @@ -0,0 +1,16 @@ +from typing import Protocol + +from ..core.crypto.secp import PrivateKey +from ..core.db import Database + + +class SupportsPrivateKey(Protocol): + private_key: PrivateKey + + +class SupportsDb(Protocol): + db: Database + + +class SupportsKeysets(Protocol): + keyset_id: str diff --git a/cashu/wallet/secrets.py b/cashu/wallet/secrets.py new file mode 100644 index 00000000..7c5342a2 --- /dev/null +++ b/cashu/wallet/secrets.py @@ -0,0 +1,198 @@ +import base64 +import hashlib +from typing import List, Optional, Tuple + +from bip32 import BIP32 +from loguru import logger +from mnemonic import Mnemonic + +from ..core import bolt11 as bolt11 +from ..core.crypto.secp import PrivateKey +from ..core.db import Database +from ..core.settings import settings +from ..wallet.crud import ( + bump_secret_derivation, + get_seed_and_mnemonic, + store_seed_and_mnemonic, +) +from .protocols import SupportsDb, SupportsKeysets + + +class WalletSecrets(SupportsDb, SupportsKeysets): + keyset_id: str + db: Database + + async def _init_private_key(self, from_mnemonic: Optional[str] = None) -> None: + """Initializes the private key of the wallet from the mnemonic. + There are three ways to initialize the private key: + 1. If the database does not contain a seed, and no mnemonic is given, a new seed is generated. + 2. If the database does not contain a seed, and a mnemonic is given, the seed is generated from the mnemonic. + 3. If the database contains a seed, the seed is loaded from the database. + + If the mnemonic was not loaded from the database, the seed and mnemonic are stored in the database. + + Args: + from_mnemonic (Optional[str], optional): Mnemonic to use. Defaults to None. + + Raises: + ValueError: If the mnemonic is not BIP39 compliant. + """ + ret_db = await get_seed_and_mnemonic(self.db) + + mnemo = Mnemonic("english") + + if ret_db is None and from_mnemonic is None: + # if there is no seed in the database, generate a new one + mnemonic_str = mnemo.generate() + wallet_command_prefix_str = ( + f" --wallet {settings.wallet_name}" + if settings.wallet_name != "wallet" + else "" + ) + wallet_name = ( + f' for wallet "{settings.wallet_name}"' + if settings.wallet_name != "wallet" + else "" + ) + print( + f"Generated a new mnemonic{wallet_name}. To view it, run" + f' "cashu{wallet_command_prefix_str} info --mnemonic".' + ) + elif from_mnemonic: + # or use the one provided + mnemonic_str = from_mnemonic.lower().strip() + elif ret_db is not None: + # if there is a seed in the database, use it + _, mnemonic_str = ret_db[0], ret_db[1] + else: + logger.debug("No mnemonic provided") + return + + if not mnemo.check(mnemonic_str): + raise ValueError("Invalid mnemonic") + + self.seed = mnemo.to_seed(mnemonic_str) + self.mnemonic = mnemonic_str + + logger.debug(f"Using seed: {self.seed.hex()}") + logger.debug(f"Using mnemonic: {mnemonic_str}") + + # if no mnemonic was in the database, store the new one + if ret_db is None: + await store_seed_and_mnemonic( + self.db, seed=self.seed.hex(), mnemonic=mnemonic_str + ) + + try: + self.bip32 = BIP32.from_seed(self.seed) + self.private_key = PrivateKey( + self.bip32.get_privkey_from_path("m/129372'/0'/0'/0'") + ) + except ValueError: + raise ValueError("Invalid seed") + except Exception as e: + logger.error(e) + + async def _generate_secret(self, randombits=128) -> str: + """Returns base64 encoded deterministic random string. + + NOTE: This method should probably retire after `deterministic_secrets`. We are + deriving secrets from a counter but don't store the respective blinding factor. + We won't be able to restore any ecash generated with these secrets. + """ + secret_counter = await bump_secret_derivation( + db=self.db, keyset_id=self.keyset_id + ) + logger.trace(f"secret_counter: {secret_counter}") + s, _, _ = await self.generate_determinstic_secret(secret_counter) + # return s.decode("utf-8") + return hashlib.sha256(s).hexdigest() + + async def generate_determinstic_secret( + self, counter: int + ) -> Tuple[bytes, bytes, str]: + """ + Determinstically generates two secrets (one as the secret message, + one as the blinding factor). + """ + assert self.bip32, "BIP32 not initialized yet." + # integer keyset id modulo max number of bip32 child keys + keyest_id = int.from_bytes(base64.b64decode(self.keyset_id), "big") % ( + 2**31 - 1 + ) + logger.trace(f"keyset id: {self.keyset_id} becomes {keyest_id}") + token_derivation_path = f"m/129372'/0'/{keyest_id}'/{counter}'" + # for secret + secret_derivation_path = f"{token_derivation_path}/0" + logger.trace(f"secret derivation path: {secret_derivation_path}") + secret = self.bip32.get_privkey_from_path(secret_derivation_path) + # blinding factor + r_derivation_path = f"{token_derivation_path}/1" + logger.trace(f"r derivation path: {r_derivation_path}") + r = self.bip32.get_privkey_from_path(r_derivation_path) + return secret, r, token_derivation_path + + async def generate_n_secrets( + self, n: int = 1, skip_bump: bool = False + ) -> Tuple[List[str], List[PrivateKey], List[str]]: + """Generates n secrets and blinding factors and returns a tuple of secrets, + blinding factors, and derivation paths. + + Args: + n (int, optional): Number of secrets to generate. Defaults to 1. + skip_bump (bool, optional): Skip increment of secret counter in the database. + You want to set this to false if you don't know whether the following operation + will succeed or not (like a POST /mint request). Defaults to False. + + Returns: + Tuple[List[str], List[PrivateKey], List[str]]: Secrets, blinding factors, derivation paths + + """ + secret_counters_start = await bump_secret_derivation( + db=self.db, keyset_id=self.keyset_id, by=n, skip=skip_bump + ) + logger.trace(f"secret_counters_start: {secret_counters_start}") + secret_counters = list(range(secret_counters_start, secret_counters_start + n)) + logger.trace( + f"Generating secret nr {secret_counters[0]} to {secret_counters[-1]}." + ) + secrets_rs_derivationpaths = [ + await self.generate_determinstic_secret(s) for s in secret_counters + ] + # secrets are supplied as str + secrets = [hashlib.sha256(s[0]).hexdigest() for s in secrets_rs_derivationpaths] + # rs are supplied as PrivateKey + rs = [PrivateKey(privkey=s[1], raw=True) for s in secrets_rs_derivationpaths] + + derivation_paths = [s[2] for s in secrets_rs_derivationpaths] + + return secrets, rs, derivation_paths + + async def generate_secrets_from_to( + self, from_counter: int, to_counter: int + ) -> Tuple[List[str], List[PrivateKey], List[str]]: + """Generates secrets and blinding factors from `from_counter` to `to_counter` + + Args: + from_counter (int): Start counter + to_counter (int): End counter + + Returns: + Tuple[List[str], List[PrivateKey], List[str]]: Secrets, blinding factors, derivation paths + + Raises: + ValueError: If `from_counter` is larger than `to_counter` + """ + assert ( + from_counter <= to_counter + ), "from_counter must be smaller than to_counter" + secret_counters = [c for c in range(from_counter, to_counter + 1)] + secrets_rs_derivationpaths = [ + await self.generate_determinstic_secret(s) for s in secret_counters + ] + # secrets are supplied as str + secrets = [hashlib.sha256(s[0]).hexdigest() for s in secrets_rs_derivationpaths] + # rs are supplied as PrivateKey + rs = [PrivateKey(privkey=s[1], raw=True) for s in secrets_rs_derivationpaths] + derivation_paths = [s[2] for s in secrets_rs_derivationpaths] + return secrets, rs, derivation_paths diff --git a/cashu/wallet/wallet.py b/cashu/wallet/wallet.py index 69a40b50..72650d93 100644 --- a/cashu/wallet/wallet.py +++ b/cashu/wallet/wallet.py @@ -1,18 +1,15 @@ import base64 -import hashlib import json import math import secrets as scrts import time import uuid -from datetime import datetime, timedelta from itertools import groupby from typing import Dict, List, Optional, Tuple, Union import requests from bip32 import BIP32 from loguru import logger -from mnemonic import Mnemonic from requests import Response from ..core import bolt11 as bolt11 @@ -27,17 +24,12 @@ GetMintResponse, Invoice, KeysetsResponse, - P2SHScript, PostMeltRequest, PostMintRequest, PostMintResponse, PostRestoreResponse, PostSplitRequest, Proof, - Secret, - SecretKind, - SigFlags, - Tags, TokenV2, TokenV2Mint, TokenV3, @@ -50,13 +42,7 @@ from ..core.db import Database from ..core.helpers import calculate_number_of_blank_outputs, sum_proofs from ..core.migrations import migrate_databases -from ..core.p2pk import sign_p2pk_sign -from ..core.script import ( - step0_carol_checksig_redeemscrip, - step0_carol_privkey, - step1_carol_create_p2sh_address, - step2_carol_sign_tx, -) +from ..core.p2pk import Secret from ..core.settings import settings from ..core.split import amount_split from ..tor.tor import TorProxy @@ -64,20 +50,18 @@ bump_secret_derivation, get_keyset, get_proofs, - get_seed_and_mnemonic, - get_unused_locks, invalidate_proof, secret_used, set_secret_derivation, store_keyset, store_lightning_invoice, - store_p2sh, store_proof, - store_seed_and_mnemonic, update_lightning_invoice, update_proof_reserved, ) from . import migrations +from .p2pk import WalletP2PK +from .secrets import WalletSecrets def async_set_requests(func): @@ -126,13 +110,13 @@ def __init__(self, url: str, db: Database): self.s = requests.Session() self.db = db - async def generate_n_secrets( - self, n: int = 1, skip_bump: bool = False - ) -> Tuple[List[str], List[PrivateKey], List[str]]: - return await self.generate_n_secrets(n, skip_bump) + # async def generate_n_secrets( + # self, n: int = 1, skip_bump: bool = False + # ) -> Tuple[List[str], List[PrivateKey], List[str]]: + # return await self.generate_n_secrets(n, skip_bump) - async def _generate_secret(self, skip_bump: bool = False) -> str: - return await self._generate_secret(skip_bump) + # async def _generate_secret(self, skip_bump: bool = False) -> str: + # return await self._generate_secret(skip_bump) @async_set_requests async def _init_s(self): @@ -455,11 +439,13 @@ async def request_mint(self, amount) -> Invoice: return Invoice(amount=amount, pr=mint_response.pr, hash=mint_response.hash) @async_set_requests - async def mint(self, amounts: List[int], hash: Optional[str] = None) -> List[Proof]: + async def mint( + self, outputs: List[BlindedMessage], hash: Optional[str] = None + ) -> List[BlindedSignature]: """Mints new coins and returns a proof of promise. Args: - amounts (List[int]): Amounts of tokens to mint + outputs (List[BlindedMessage]): Outputs to mint new tokens with hash (str, optional): Hash of the paid invoice. Defaults to None. Returns: @@ -468,14 +454,6 @@ async def mint(self, amounts: List[int], hash: Optional[str] = None) -> List[Pro Raises: Exception: If the minting fails """ - # quirk: we skip bumping the secret counter in the database since we are - # not sure if the minting will succeed. If it succeeds, we will bump it - # in the next step. - secrets, rs, derivation_paths = await self.generate_n_secrets( - len(amounts), skip_bump=True - ) - await self._check_used_secrets(secrets) - outputs, rs = self._construct_outputs(amounts, secrets, rs) outputs_payload = PostMintRequest(outputs=outputs) logger.trace("Checking Lightning invoice. POST /mint") resp = self.s.post( @@ -490,12 +468,7 @@ async def mint(self, amounts: List[int], hash: Optional[str] = None) -> List[Pro reponse_dict = resp.json() logger.trace("Lightning invoice checked. POST /mint") promises = PostMintResponse.parse_obj(reponse_dict).promises - - # bump secret counter in database - await bump_secret_derivation( - db=self.db, keyset_id=self.keyset_id, by=len(amounts) - ) - return self._construct_proofs(promises, secrets, rs, derivation_paths) + return promises @async_set_requests async def split( @@ -609,14 +582,14 @@ async def restore_promises( return returnObj.outputs, returnObj.promises -class Wallet(LedgerAPI): +class Wallet(LedgerAPI, WalletP2PK, WalletSecrets): """Minimal wallet wrapper.""" mnemonic: str # holds mnemonic of the wallet seed: bytes # holds private key of the wallet generated from the mnemonic - db: Database + # db: Database bip32: BIP32 - private_key: Optional[PrivateKey] = None + # private_key: Optional[PrivateKey] = None def __init__( self, @@ -669,184 +642,6 @@ async def _migrate_database(self): except Exception as e: logger.error(f"Could not run migrations: {e}") - async def _init_private_key(self, from_mnemonic: Optional[str] = None) -> None: - """Initializes the private key of the wallet from the mnemonic. - There are three ways to initialize the private key: - 1. If the database does not contain a seed, and no mnemonic is given, a new seed is generated. - 2. If the database does not contain a seed, and a mnemonic is given, the seed is generated from the mnemonic. - 3. If the database contains a seed, the seed is loaded from the database. - - If the mnemonic was not loaded from the database, the seed and mnemonic are stored in the database. - - Args: - from_mnemonic (Optional[str], optional): Mnemonic to use. Defaults to None. - - Raises: - ValueError: If the mnemonic is not BIP39 compliant. - """ - ret_db = await get_seed_and_mnemonic(self.db) - - mnemo = Mnemonic("english") - - if ret_db is None and from_mnemonic is None: - # if there is no seed in the database, generate a new one - mnemonic_str = mnemo.generate() - wallet_command_prefix_str = ( - f" --wallet {settings.wallet_name}" - if settings.wallet_name != "wallet" - else "" - ) - wallet_name = ( - f' for wallet "{settings.wallet_name}"' - if settings.wallet_name != "wallet" - else "" - ) - print( - f"Generated a new mnemonic{wallet_name}. To view it, run" - f' "cashu{wallet_command_prefix_str} info --mnemonic".' - ) - elif from_mnemonic: - # or use the one provided - mnemonic_str = from_mnemonic.lower().strip() - elif ret_db is not None: - # if there is a seed in the database, use it - _, mnemonic_str = ret_db[0], ret_db[1] - else: - logger.debug("No mnemonic provided") - return - - if not mnemo.check(mnemonic_str): - raise ValueError("Invalid mnemonic") - - self.seed = mnemo.to_seed(mnemonic_str) - self.mnemonic = mnemonic_str - - logger.debug(f"Using seed: {self.seed.hex()}") - logger.debug(f"Using mnemonic: {mnemonic_str}") - - # if no mnemonic was in the database, store the new one - if ret_db is None: - await store_seed_and_mnemonic( - self.db, seed=self.seed.hex(), mnemonic=mnemonic_str - ) - - try: - self.bip32 = BIP32.from_seed(self.seed) - self.private_key = PrivateKey( - self.bip32.get_privkey_from_path("m/129372'/0'/0'/0'") - ) - except ValueError: - raise ValueError("Invalid seed") - except Exception as e: - logger.error(e) - - async def _generate_secret(self, randombits=128) -> str: - """Returns base64 encoded deterministic random string. - - NOTE: This method should probably retire after `deterministic_secrets`. We are - deriving secrets from a counter but don't store the respective blinding factor. - We won't be able to restore any ecash generated with these secrets. - """ - secret_counter = await bump_secret_derivation( - db=self.db, keyset_id=self.keyset_id - ) - logger.trace(f"secret_counter: {secret_counter}") - s, _, _ = await self.generate_determinstic_secret(secret_counter) - # return s.decode("utf-8") - return hashlib.sha256(s).hexdigest() - - async def generate_determinstic_secret( - self, counter: int - ) -> Tuple[bytes, bytes, str]: - """ - Determinstically generates two secrets (one as the secret message, - one as the blinding factor). - """ - assert self.bip32, "BIP32 not initialized yet." - # integer keyset id modulo max number of bip32 child keys - keyest_id = int.from_bytes(base64.b64decode(self.keyset_id), "big") % ( - 2**31 - 1 - ) - logger.trace(f"keyset id: {self.keyset_id} becomes {keyest_id}") - token_derivation_path = f"m/129372'/0'/{keyest_id}'/{counter}'" - # for secret - secret_derivation_path = f"{token_derivation_path}/0" - logger.trace(f"secret derivation path: {secret_derivation_path}") - secret = self.bip32.get_privkey_from_path(secret_derivation_path) - # blinding factor - r_derivation_path = f"{token_derivation_path}/1" - logger.trace(f"r derivation path: {r_derivation_path}") - r = self.bip32.get_privkey_from_path(r_derivation_path) - return secret, r, token_derivation_path - - async def generate_n_secrets( - self, n: int = 1, skip_bump: bool = False - ) -> Tuple[List[str], List[PrivateKey], List[str]]: - """Generates n secrets and blinding factors and returns a tuple of secrets, - blinding factors, and derivation paths. - - Args: - n (int, optional): Number of secrets to generate. Defaults to 1. - skip_bump (bool, optional): Skip increment of secret counter in the database. - You want to set this to false if you don't know whether the following operation - will succeed or not (like a POST /mint request). Defaults to False. - - Returns: - Tuple[List[str], List[PrivateKey], List[str]]: Secrets, blinding factors, derivation paths - - """ - secret_counters_start = await bump_secret_derivation( - db=self.db, keyset_id=self.keyset_id, by=n, skip=skip_bump - ) - logger.trace(f"secret_counters_start: {secret_counters_start}") - secret_counters = list(range(secret_counters_start, secret_counters_start + n)) - logger.trace( - f"Generating secret nr {secret_counters[0]} to {secret_counters[-1]}." - ) - secrets_rs_derivationpaths = [ - await self.generate_determinstic_secret(s) for s in secret_counters - ] - # secrets are supplied as str - secrets = [hashlib.sha256(s[0]).hexdigest() for s in secrets_rs_derivationpaths] - # rs are supplied as PrivateKey - rs = [PrivateKey(privkey=s[1], raw=True) for s in secrets_rs_derivationpaths] - - derivation_paths = [s[2] for s in secrets_rs_derivationpaths] - # sanity check to make sure we're not reusing secrets - # NOTE: this step is probably wasting more resources than it helps - await self._check_used_secrets(secrets) - - return secrets, rs, derivation_paths - - async def generate_secrets_from_to( - self, from_counter: int, to_counter: int - ) -> Tuple[List[str], List[PrivateKey], List[str]]: - """Generates secrets and blinding factors from `from_counter` to `to_counter` - - Args: - from_counter (int): Start counter - to_counter (int): End counter - - Returns: - Tuple[List[str], List[PrivateKey], List[str]]: Secrets, blinding factors, derivation paths - - Raises: - ValueError: If `from_counter` is larger than `to_counter` - """ - assert ( - from_counter <= to_counter - ), "from_counter must be smaller than to_counter" - secret_counters = [c for c in range(from_counter, to_counter + 1)] - secrets_rs_derivationpaths = [ - await self.generate_determinstic_secret(s) for s in secret_counters - ] - # secrets are supplied as str - secrets = [hashlib.sha256(s[0]).hexdigest() for s in secrets_rs_derivationpaths] - # rs are supplied as PrivateKey - rs = [PrivateKey(privkey=s[1], raw=True) for s in secrets_rs_derivationpaths] - derivation_paths = [s[2] for s in secrets_rs_derivationpaths] - return secrets, rs, derivation_paths - # ---------- API ---------- async def load_mint(self, keyset_id: str = ""): @@ -914,7 +709,25 @@ async def mint( # if no split was specified, we use the canonical split amounts = split or amount_split(amount) - proofs = await super().mint(amounts, hash) + + # quirk: we skip bumping the secret counter in the database since we are + # not sure if the minting will succeed. If it succeeds, we will bump it + # in the next step. + secrets, rs, derivation_paths = await self.generate_n_secrets( + len(amounts), skip_bump=True + ) + await self._check_used_secrets(secrets) + outputs, rs = self._construct_outputs(amounts, secrets, rs) + + # will raise exception if mint is unsuccessful + promises = await super().mint(outputs, hash) + + # success, bump secret counter in database + await bump_secret_derivation( + db=self.db, keyset_id=self.keyset_id, by=len(amounts) + ) + proofs = self._construct_proofs(promises, secrets, rs, derivation_paths) + if proofs == []: raise Exception("received no proofs.") await self._store_proofs(proofs) @@ -925,112 +738,6 @@ async def mint( self.proofs += proofs return proofs - async def add_p2pk_witnesses_to_outputs( - self, outputs: List[BlindedMessage] - ) -> List[BlindedMessage]: - p2pk_signatures = await self.sign_p2pk_outputs(outputs) - for o, s in zip(outputs, p2pk_signatures): - o.p2pksigs = [s] - return outputs - - async def add_witnesses_to_outputs( - self, proofs: List[Proof], outputs: List[BlindedMessage] - ) -> List[BlindedMessage]: - """Adds witnesses to outputs if the inputs (proofs) indicate an appropriate signature flag - - Args: - proofs (List[Proof]): _description_ - outputs (List[BlindedMessage]): _description_ - """ - # first we check whether all tokens have serialized secrets as their secret - try: - for p in proofs: - Secret.deserialize(p.secret) - except Exception: - # if not, we do not add witnesses (treat as regular token secret) - return outputs - - # if any of the proofs provided require SIG_ALL, we must provide it - if any( - [Secret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL for p in proofs] - ): - # p2pk_signatures = await self.sign_p2pk_outputs(outputs) - # for o, s in zip(outputs, p2pk_signatures): - # o.p2pksigs = [s] - outputs = await self.add_p2pk_witnesses_to_outputs(outputs) - return outputs - - async def add_p2sh_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: - # Quirk: we use a single P2SH script and signature pair for all tokens in proofs - address = Secret.deserialize(proofs[0].secret).data - p2shscripts = await get_unused_locks(address, db=self.db) - assert len(p2shscripts) == 1, Exception("lock not found.") - p2sh_script, p2sh_signature = ( - p2shscripts[0].script, - p2shscripts[0].signature, - ) - logger.debug(f"Unlock script: {p2sh_script} signature: {p2sh_signature}") - - # attach unlock scripts to proofs - for p in proofs: - p.p2shscript = P2SHScript(script=p2sh_script, signature=p2sh_signature) - return proofs - - async def add_p2pk_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: - p2pk_signatures = await self.sign_p2pk_proofs(proofs) - logger.debug(f"Unlock signatures for {len(proofs)} proofs: {p2pk_signatures}") - logger.debug(f"Proofs: {proofs}") - # attach unlock signatures to proofs - assert len(proofs) == len(p2pk_signatures), "wrong number of signatures" - for p, s in zip(proofs, p2pk_signatures): - if p.p2pksigs: - p.p2pksigs.append(s) - else: - p.p2pksigs = [s] - return proofs - - async def add_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: - """Adds witnesses to proofs for P2SH or P2PK redemption. - - This method parses the secret of each proof and determines the correct - witness type and adds it to the proof if we have it available. - - Note: In order for this method to work, all proofs must have the same secret type. - This is because we use a single P2SH script and signature pair for all tokens in proofs. - - For P2PK, we use an individual signature for each token in proofs. - - Args: - proofs (List[Proof]): List of proofs to add witnesses to - - Returns: - List[Proof]: List of proofs with witnesses added - """ - - # iterate through proofs and produce witnesses for each - - # first we check whether all tokens have serialized secrets as their secret - try: - for p in proofs: - Secret.deserialize(p.secret) - except Exception: - # if not, we do not add witnesses (treat as regular token secret) - return proofs - logger.debug("Spending conditions detected.") - # P2SH scripts - if all([Secret.deserialize(p.secret).kind == SecretKind.P2SH for p in proofs]): - logger.debug("P2SH redemption detected.") - proofs = await self.add_p2sh_witnesses_to_proofs(proofs) - - # P2PK signatures - elif all( - [Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs] - ): - logger.debug("P2PK redemption detected.") - proofs = await self.add_p2pk_witnesses_to_proofs(proofs) - - return proofs - async def redeem( self, proofs: List[Proof], @@ -1186,7 +893,7 @@ async def pay_lightning( async def check_proof_state(self, proofs): return await super().check_proof_state(proofs) - # ---------- TOKEN MECHANIS ---------- + # ---------- TOKEN MECHANICS ---------- async def _store_proofs(self, proofs): async with self.db.connect() as conn: @@ -1483,115 +1190,6 @@ async def split_to_send( await self.set_reserved(send_proofs, reserved=True) return keep_proofs, send_proofs - # ---------- P2SH and P2PK ---------- - - async def create_p2sh_address_and_store(self) -> str: - """Creates a P2SH lock script and stores the script and signature in the database.""" - alice_privkey = step0_carol_privkey() - txin_redeemScript = step0_carol_checksig_redeemscrip(alice_privkey.pub) - txin_p2sh_address = step1_carol_create_p2sh_address(txin_redeemScript) - txin_signature = step2_carol_sign_tx(txin_redeemScript, alice_privkey).scriptSig - txin_redeemScript_b64 = base64.urlsafe_b64encode(txin_redeemScript).decode() - txin_signature_b64 = base64.urlsafe_b64encode(txin_signature).decode() - p2shScript = P2SHScript( - script=txin_redeemScript_b64, - signature=txin_signature_b64, - address=str(txin_p2sh_address), - ) - await store_p2sh(p2shScript, db=self.db) - assert p2shScript.address - return p2shScript.address - - async def create_p2pk_pubkey(self): - assert ( - self.private_key - ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" - public_key = self.private_key.pubkey - # logger.debug(f"Private key: {self.private_key.bech32()}") - assert public_key - return public_key.serialize().hex() - - async def create_p2pk_lock( - self, - pubkey: str, - locktime_seconds: Optional[int] = None, - tags: Optional[Tags] = None, - sig_all: bool = False, - n_sigs: int = 1, - ) -> Secret: - logger.debug(f"Provided tags: {tags}") - if not tags: - tags = Tags() - logger.debug(f"Before tags: {tags}") - if locktime_seconds: - tags["locktime"] = str( - int((datetime.now() + timedelta(seconds=locktime_seconds)).timestamp()) - ) - tags["sigflag"] = SigFlags.SIG_ALL if sig_all else SigFlags.SIG_INPUTS - if n_sigs > 1: - tags["n_sigs"] = str(n_sigs) - logger.debug(f"After tags: {tags}") - return Secret( - kind=SecretKind.P2PK, - data=pubkey, - tags=tags, - ) - - async def create_p2sh_lock( - self, - address: str, - locktime: Optional[int] = None, - tags: Tags = Tags(), - ) -> Secret: - if locktime: - tags["locktime"] = str( - (datetime.now() + timedelta(seconds=locktime)).timestamp() - ) - - return Secret( - kind=SecretKind.P2SH, - data=address, - tags=tags, - ) - - async def sign_p2pk_proofs(self, proofs: List[Proof]) -> List[str]: - assert ( - self.private_key - ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" - private_key = self.private_key - assert private_key.pubkey - logger.trace( - f"Signing with private key: {private_key.serialize()} public key:" - f" {private_key.pubkey.serialize().hex()}" - ) - for proof in proofs: - logger.trace(f"Signing proof: {proof}") - logger.trace(f"Signing message: {proof.secret}") - - signatures = [ - sign_p2pk_sign( - message=proof.secret.encode("utf-8"), - private_key=private_key, - ) - for proof in proofs - ] - logger.debug(f"Signatures: {signatures}") - return signatures - - async def sign_p2pk_outputs(self, outputs: List[BlindedMessage]) -> List[str]: - assert ( - self.private_key - ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" - private_key = self.private_key - assert private_key.pubkey - return [ - sign_p2pk_sign( - message=output.B_.encode("utf-8"), - private_key=private_key, - ) - for output in outputs - ] - # ---------- BALANCE CHECKS ---------- @property diff --git a/tests/test_wallet_p2pk.py b/tests/test_wallet_p2pk.py index 50101af9..9a864242 100644 --- a/tests/test_wallet_p2pk.py +++ b/tests/test_wallet_p2pk.py @@ -6,9 +6,10 @@ import pytest import pytest_asyncio -from cashu.core.base import Proof, SigFlags, Tags +from cashu.core.base import Proof from cashu.core.crypto.secp import PrivateKey, PublicKey from cashu.core.migrations import migrate_databases +from cashu.core.p2pk import SigFlags, Tags from cashu.wallet import migrations from cashu.wallet.wallet import Wallet from cashu.wallet.wallet import Wallet as Wallet1 @@ -134,6 +135,7 @@ async def test_p2pk_locktime_with_refund_pubkey(wallet1: Wallet, wallet2: Wallet ) send_proofs_copy = copy.deepcopy(send_proofs) # receiver side: can't redeem since we used a garbage pubkey + # and locktime has not passed await assert_err( wallet2.redeem(send_proofs), "Mint Error: no valid signature provided for input.", @@ -162,6 +164,7 @@ async def test_p2pk_locktime_with_wrong_refund_pubkey(wallet1: Wallet, wallet2: ) send_proofs_copy = copy.deepcopy(send_proofs) # receiver side: can't redeem since we used a garbage pubkey + # and locktime has not passed await assert_err( wallet2.redeem(send_proofs), "Mint Error: no valid signature provided for input.", @@ -174,6 +177,38 @@ async def test_p2pk_locktime_with_wrong_refund_pubkey(wallet1: Wallet, wallet2: ) +@pytest.mark.asyncio +async def test_p2pk_locktime_with_second_refund_pubkey( + wallet1: Wallet, wallet2: Wallet +): + await wallet1.mint(64) + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() # receiver side + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side + # sender side + garbage_pubkey = PrivateKey().pubkey + assert garbage_pubkey + secret_lock = await wallet1.create_p2pk_lock( + garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey + locktime_seconds=2, # locktime + tags=Tags( + [["refund", pubkey_wallet2, pubkey_wallet1]] + ), # multiple refund pubkeys + ) # sender side + _, send_proofs = await wallet1.split_to_send( + wallet1.proofs, 8, secret_lock=secret_lock + ) + send_proofs_copy = copy.deepcopy(send_proofs) + # receiver side: can't redeem since we used a garbage pubkey + # and locktime has not passed + await assert_err( + wallet1.redeem(send_proofs), + "Mint Error: no valid signature provided for input.", + ) + await asyncio.sleep(2) + # we can now redeem because of the refund locktime + await wallet1.redeem(send_proofs_copy) + + @pytest.mark.asyncio async def test_p2pk_multisig_2_of_2(wallet1: Wallet, wallet2: Wallet): await wallet1.mint(64) @@ -182,15 +217,15 @@ async def test_p2pk_multisig_2_of_2(wallet1: Wallet, wallet2: Wallet): assert pubkey_wallet1 != pubkey_wallet2 # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=2 + pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet1]]), n_sigs=2 ) _, send_proofs = await wallet1.split_to_send( wallet1.proofs, 8, secret_lock=secret_lock ) - # add signatures of wallet2 + # add signatures of wallet1 send_proofs = await wallet1.add_p2pk_witnesses_to_proofs(send_proofs) - # here we add the signatures of wallet1 + # here we add the signatures of wallet2 await wallet2.redeem(send_proofs) @@ -202,15 +237,15 @@ async def test_p2pk_multisig_duplicate_signature(wallet1: Wallet, wallet2: Walle assert pubkey_wallet1 != pubkey_wallet2 # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=2 + pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet1]]), n_sigs=2 ) _, send_proofs = await wallet1.split_to_send( wallet1.proofs, 8, secret_lock=secret_lock ) - # add signatures of wallet2 + # add signatures of wallet2 – this is a duplicate signature send_proofs = await wallet2.add_p2pk_witnesses_to_proofs(send_proofs) - # here we add the signatures of wallet1 + # here we add the signatures of wallet2 await assert_err( wallet2.redeem(send_proofs), "Mint Error: p2pk signatures must be unique." ) @@ -224,7 +259,7 @@ async def test_p2pk_multisig_quorum_not_met_1_of_2(wallet1: Wallet, wallet2: Wal assert pubkey_wallet1 != pubkey_wallet2 # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=2 + pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet1]]), n_sigs=2 ) _, send_proofs = await wallet1.split_to_send( wallet1.proofs, 8, secret_lock=secret_lock @@ -243,7 +278,7 @@ async def test_p2pk_multisig_quorum_not_met_2_of_3(wallet1: Wallet, wallet2: Wal assert pubkey_wallet1 != pubkey_wallet2 # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=3 + pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet1]]), n_sigs=3 ) _, send_proofs = await wallet1.split_to_send( @@ -264,7 +299,7 @@ async def test_p2pk_multisig_with_duplicate_publickey(wallet1: Wallet, wallet2: pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet2]]), n_sigs=2 + pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet2]]), n_sigs=2 ) _, send_proofs = await wallet1.split_to_send( wallet1.proofs, 8, secret_lock=secret_lock @@ -287,7 +322,7 @@ async def test_p2pk_multisig_with_wrong_first_private_key( # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkey", wrong_public_key_hex]]), n_sigs=2 + pubkey_wallet2, tags=Tags([["pubkeys", wrong_public_key_hex]]), n_sigs=2 ) _, send_proofs = await wallet1.split_to_send( wallet1.proofs, 8, secret_lock=secret_lock @@ -300,14 +335,16 @@ async def test_p2pk_multisig_with_wrong_first_private_key( def test_tags(): - tags = Tags([["key1", "value1"], ["key2", "value2"], ["key2", "value3"]]) + tags = Tags( + [["key1", "value1"], ["key2", "value2", "value2_1"], ["key2", "value3"]] + ) assert tags.get_tag("key1") == "value1" assert tags["key1"] == "value1" assert tags.get_tag("key2") == "value2" assert tags["key2"] == "value2" assert tags.get_tag("key3") is None assert tags["key3"] is None - assert tags.get_tag_all("key2") == ["value2", "value3"] + assert tags.get_tag_all("key2") == ["value2", "value2_1", "value3"] @pytest.mark.asyncio