Skip to content

Commit

Permalink
refactor p2pk types from base
Browse files Browse the repository at this point in the history
  • Loading branch information
callebtc committed Sep 21, 2023
1 parent 7e74d36 commit 7aff9f4
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 149 deletions.
139 changes: 2 additions & 137 deletions cashu/core/base.py
Original file line number Diff line number Diff line change
@@ -1,154 +1,19 @@
import base64
import json
import time
from sqlite3 import Row
from typing import Any, Dict, List, Optional, Union
from typing import Dict, List, Optional, Union

from loguru import logger
from pydantic import BaseModel

from .crypto.keys import derive_keys, derive_keyset_id, derive_pubkeys
from .crypto.secp import PrivateKey, PublicKey
from .legacy import derive_keys_backwards_compatible_insecure_pre_0_12

# from .p2pk import sign_p2pk_sign
from .p2pk import P2SHScript

# ------- PROOFS -------


class SecretKind:
P2SH = "P2SH"
P2PK = "P2PK"


class SigFlags:
SIG_INPUTS = ( # require signatures only on the inputs (default signature flag)
"SIG_INPUTS"
)
SIG_ALL = "SIG_ALL" # require signatures on inputs and outputs


class Tags(BaseModel):
"""
Tags are used to encode additional information in the Secret of a Proof.
"""

__root__: List[List[str]] = []

def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs):
super().__init__(**kwargs)
self.__root__ = tags or []

def __setitem__(self, key: str, value: str) -> None:
self.__root__.append([key, value])

def __getitem__(self, key: str) -> Union[str, None]:
return self.get_tag(key)

def get_tag(self, tag_name: str) -> Union[str, None]:
for tag in self.__root__:
if tag[0] == tag_name:
return tag[1]
return None

def get_tag_all(self, tag_name: str) -> List[str]:
all_tags = []
for tag in self.__root__:
if tag[0] == tag_name:
for t in tag[1:]:
all_tags.append(t)
return all_tags


class Secret(BaseModel):
"""Describes spending condition encoded in the secret field of a Proof."""

kind: str
data: str
tags: Tags
nonce: Union[None, str] = None

def serialize(self) -> str:
data_dict: Dict[str, Any] = {
"data": self.data,
"nonce": self.nonce or PrivateKey().serialize()[:32],
}
if self.tags.__root__:
logger.debug(f"Serializing tags: {self.tags.__root__}")
data_dict["tags"] = self.tags.__root__
return json.dumps(
[self.kind, data_dict],
)

@classmethod
def deserialize(cls, from_proof: str):
kind, kwargs = json.loads(from_proof)
data = kwargs.pop("data")
nonce = kwargs.pop("nonce")
tags_list: List = kwargs.pop("tags", None)
tags = Tags(tags=tags_list)
logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}")
return cls(kind=kind, data=data, nonce=nonce, tags=tags)


class P2PKSecret(Secret):
@classmethod
def from_secret(cls, secret: Secret):
assert secret.kind == SecretKind.P2PK, "Secret is not a P2PK secret"
# NOTE: exclude tags in .dict() because it doesn't deserialize it properly
# need to add it back in manually with tags=secret.tags
return cls(**secret.dict(exclude={"tags"}), tags=secret.tags)

def get_p2pk_pubkey_from_secret(self) -> List[str]:
"""Gets the P2PK pubkey from a Secret depending on the locktime
Args:
secret (Secret): P2PK Secret in ecash token
Returns:
str: pubkey to use for P2PK, empty string if anyone can spend (locktime passed)
"""
# the pubkey in the data field is the pubkey to use for P2PK
pubkeys: List[str] = [self.data]

# get all additional pubkeys from tags for multisig
pubkeys += self.tags.get_tag_all("pubkeys")

# check if locktime is passed and if so, only return refund pubkeys
now = time.time()
if self.locktime and self.locktime < now:
logger.trace(f"p2pk locktime ran out ({self.locktime}<{now}).")
# check tags if a refund pubkey is present.
# If yes, we demand the signature to be from the refund pubkey
return self.tags.get_tag_all("refund")

return pubkeys

@property
def locktime(self) -> Union[None, int]:
locktime = self.tags.get_tag("locktime")
return int(locktime) if locktime else None

@property
def sigflag(self) -> Union[None, str]:
return self.tags.get_tag("sigflag")

@property
def n_sigs(self) -> Union[None, int]:
n_sigs = self.tags.get_tag("n_sigs")
return int(n_sigs) if n_sigs else None


class P2SHScript(BaseModel):
"""
Unlocks P2SH spending condition of a Proof
"""

script: str
signature: str
address: Union[str, None] = None


class Proof(BaseModel):
"""
Value token
Expand Down
139 changes: 139 additions & 0 deletions cashu/core/p2pk.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,147 @@
import hashlib
import json
import time
from typing import Any, Dict, List, Optional, Union

from loguru import logger
from pydantic import BaseModel

from .crypto.secp import PrivateKey, PublicKey


class SecretKind:
P2SH = "P2SH"
P2PK = "P2PK"


class SigFlags:
SIG_INPUTS = ( # require signatures only on the inputs (default signature flag)
"SIG_INPUTS"
)
SIG_ALL = "SIG_ALL" # require signatures on inputs and outputs


class Tags(BaseModel):
"""
Tags are used to encode additional information in the Secret of a Proof.
"""

__root__: List[List[str]] = []

def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs):
super().__init__(**kwargs)
self.__root__ = tags or []

def __setitem__(self, key: str, value: str) -> None:
self.__root__.append([key, value])

def __getitem__(self, key: str) -> Union[str, None]:
return self.get_tag(key)

def get_tag(self, tag_name: str) -> Union[str, None]:
for tag in self.__root__:
if tag[0] == tag_name:
return tag[1]
return None

def get_tag_all(self, tag_name: str) -> List[str]:
all_tags = []
for tag in self.__root__:
if tag[0] == tag_name:
for t in tag[1:]:
all_tags.append(t)
return all_tags


class Secret(BaseModel):
"""Describes spending condition encoded in the secret field of a Proof."""

kind: str
data: str
tags: Tags
nonce: Union[None, str] = None

def serialize(self) -> str:
data_dict: Dict[str, Any] = {
"data": self.data,
"nonce": self.nonce or PrivateKey().serialize()[:32],
}
if self.tags.__root__:
logger.debug(f"Serializing tags: {self.tags.__root__}")
data_dict["tags"] = self.tags.__root__
return json.dumps(
[self.kind, data_dict],
)

@classmethod
def deserialize(cls, from_proof: str):
kind, kwargs = json.loads(from_proof)
data = kwargs.pop("data")
nonce = kwargs.pop("nonce")
tags_list: List = kwargs.pop("tags", None)
tags = Tags(tags=tags_list)
logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}")
return cls(kind=kind, data=data, nonce=nonce, tags=tags)


class P2PKSecret(Secret):
@classmethod
def from_secret(cls, secret: Secret):
assert secret.kind == SecretKind.P2PK, "Secret is not a P2PK secret"
# NOTE: exclude tags in .dict() because it doesn't deserialize it properly
# need to add it back in manually with tags=secret.tags
return cls(**secret.dict(exclude={"tags"}), tags=secret.tags)

def get_p2pk_pubkey_from_secret(self) -> List[str]:
"""Gets the P2PK pubkey from a Secret depending on the locktime
Args:
secret (Secret): P2PK Secret in ecash token
Returns:
str: pubkey to use for P2PK, empty string if anyone can spend (locktime passed)
"""
# the pubkey in the data field is the pubkey to use for P2PK
pubkeys: List[str] = [self.data]

# get all additional pubkeys from tags for multisig
pubkeys += self.tags.get_tag_all("pubkeys")

# check if locktime is passed and if so, only return refund pubkeys
now = time.time()
if self.locktime and self.locktime < now:
logger.trace(f"p2pk locktime ran out ({self.locktime}<{now}).")
# check tags if a refund pubkey is present.
# If yes, we demand the signature to be from the refund pubkey
return self.tags.get_tag_all("refund")

return pubkeys

@property
def locktime(self) -> Union[None, int]:
locktime = self.tags.get_tag("locktime")
return int(locktime) if locktime else None

@property
def sigflag(self) -> Union[None, str]:
return self.tags.get_tag("sigflag")

@property
def n_sigs(self) -> Union[None, int]:
n_sigs = self.tags.get_tag("n_sigs")
return int(n_sigs) if n_sigs else None


class P2SHScript(BaseModel):
"""
Unlocks P2SH spending condition of a Proof
"""

script: str
signature: str
address: Union[str, None] = None


def sign_p2pk_sign(message: bytes, private_key: PrivateKey):
# ecdsa version
# signature = private_key.ecdsa_serialize(private_key.ecdsa_sign(message))
Expand Down
12 changes: 7 additions & 5 deletions cashu/mint/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
Invoice,
MintKeyset,
MintKeysets,
P2PKSecret,
Proof,
Secret,
SecretKind,
SigFlags,
)
from ..core.crypto import b_dhke
from ..core.crypto.keys import derive_pubkey, random_hash
Expand All @@ -34,7 +30,13 @@
TransactionError,
)
from ..core.helpers import fee_reserve, sum_proofs
from ..core.p2pk import verify_p2pk_signature
from ..core.p2pk import (
P2PKSecret,
Secret,
SecretKind,
SigFlags,
verify_p2pk_signature,
)
from ..core.script import verify_bitcoin_script
from ..core.settings import settings
from ..core.split import amount_split
Expand Down
2 changes: 1 addition & 1 deletion cashu/mint/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def main(
host=host,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
**d,
**d, # type: ignore
)
server = uvicorn.Server(config)
server.run()
10 changes: 6 additions & 4 deletions cashu/wallet/p2pk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
from ..core import bolt11 as bolt11
from ..core.base import (
BlindedMessage,
Proof,
)
from ..core.crypto.secp import PrivateKey
from ..core.db import Database
from ..core.p2pk import (
P2PKSecret,
P2SHScript,
Proof,
Secret,
SecretKind,
SigFlags,
Tags,
sign_p2pk_sign,
)
from ..core.crypto.secp import PrivateKey
from ..core.db import Database
from ..core.p2pk import sign_p2pk_sign
from ..core.script import (
step0_carol_checksig_redeemscrip,
step0_carol_privkey,
Expand Down
2 changes: 1 addition & 1 deletion cashu/wallet/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
PostRestoreResponse,
PostSplitRequest,
Proof,
Secret,
TokenV2,
TokenV2Mint,
TokenV3,
Expand All @@ -43,6 +42,7 @@
from ..core.db import Database
from ..core.helpers import calculate_number_of_blank_outputs, sum_proofs
from ..core.migrations import migrate_databases
from ..core.p2pk import Secret
from ..core.settings import settings
from ..core.split import amount_split
from ..tor.tor import TorProxy
Expand Down
3 changes: 2 additions & 1 deletion tests/test_wallet_p2pk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import pytest
import pytest_asyncio

from cashu.core.base import Proof, SigFlags, Tags
from cashu.core.base import Proof
from cashu.core.crypto.secp import PrivateKey, PublicKey
from cashu.core.migrations import migrate_databases
from cashu.core.p2pk import SigFlags, Tags
from cashu.wallet import migrations
from cashu.wallet.wallet import Wallet
from cashu.wallet.wallet import Wallet as Wallet1
Expand Down

0 comments on commit 7aff9f4

Please sign in to comment.