From a8c4d2bdfa0cd50c7f37ee807d4db3510dcb2944 Mon Sep 17 00:00:00 2001 From: DanGould Date: Wed, 8 Nov 2023 17:32:23 -0500 Subject: [PATCH] De/Serialize v2 payload --- payjoin-relay/Cargo.toml | 1 - payjoin/Cargo.toml | 1 + payjoin/src/receive/error.rs | 9 + payjoin/src/receive/mod.rs | 23 +- payjoin/src/send/mod.rs | 62 +++- payjoin/tests/integration.rs | 606 ++++++++++++++++++----------------- 6 files changed, 385 insertions(+), 317 deletions(-) diff --git a/payjoin-relay/Cargo.toml b/payjoin-relay/Cargo.toml index f04b5f02..b374af0a 100644 --- a/payjoin-relay/Cargo.toml +++ b/payjoin-relay/Cargo.toml @@ -28,4 +28,3 @@ sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio"] } tokio = { version = "1.12.0", features = ["full"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } - diff --git a/payjoin/Cargo.toml b/payjoin/Cargo.toml index 63ea7384..ee2e4f1f 100644 --- a/payjoin/Cargo.toml +++ b/payjoin/Cargo.toml @@ -18,6 +18,7 @@ exclude = ["tests"] send = [] receive = ["rand"] base64 = ["bitcoin/base64"] +v2 = [] [dependencies] bitcoin = { version = "0.30.0", features = ["base64"] } diff --git a/payjoin/src/receive/error.rs b/payjoin/src/receive/error.rs index ffc91360..9bfdfcf1 100644 --- a/payjoin/src/receive/error.rs +++ b/payjoin/src/receive/error.rs @@ -65,6 +65,11 @@ pub(crate) enum InternalRequestError { /// Original PSBT input has been seen before. Only automatic receivers, aka "interactive" in the spec /// look out for these to prevent probing attacks. InputSeen(bitcoin::OutPoint), + /// Serde deserialization failed + #[cfg(feature = "v2")] + ParsePsbt(bitcoin::psbt::PsbtParseError), + #[cfg(feature = "v2")] + Utf8(std::string::FromUtf8Error), } impl From for RequestError { @@ -125,6 +130,10 @@ impl fmt::Display for RequestError { write_error(f, "original-psbt-rejected", &format!("Input Type Error: {}.", e)), InternalRequestError::InputSeen(_) => write_error(f, "original-psbt-rejected", "The receiver rejected the original PSBT."), + #[cfg(feature = "v2")] + InternalRequestError::ParsePsbt(e) => write_error(f, "Error parsing PSBT:", e), + #[cfg(feature = "v2")] + InternalRequestError::Utf8(e) => write_error(f, "Error parsing PSBT:", e), } } } diff --git a/payjoin/src/receive/mod.rs b/payjoin/src/receive/mod.rs index 72e257ae..88bd36c7 100644 --- a/payjoin/src/receive/mod.rs +++ b/payjoin/src/receive/mod.rs @@ -303,23 +303,22 @@ pub struct UncheckedProposal { } impl UncheckedProposal { + #[cfg(feature = "v2")] pub fn from_relay_response(mut body: impl std::io::Read) -> Result { + use std::str::FromStr; + let mut buf = Vec::new(); let _ = body.read_to_end(&mut buf); - let base64 = bitcoin::base64::decode(buf).map_err(InternalRequestError::Base64)?; - let unchecked_psbt = Psbt::deserialize(&base64).map_err(InternalRequestError::Psbt)?; - + let buf_as_string = String::from_utf8(buf.to_vec()).map_err(InternalRequestError::Utf8)?; + log::debug!("{}", &buf_as_string); + let (query, base64) = buf_as_string.split_once('\n').unwrap_or_default(); + let unchecked_psbt = Psbt::from_str(base64).map_err(InternalRequestError::ParsePsbt)?; let psbt = unchecked_psbt.validate().map_err(InternalRequestError::InconsistentPsbt)?; log::debug!("Received original psbt: {:?}", psbt); - - // TODO accept parameters - // let pairs = url::form_urlencoded::parse(query.as_bytes()); - // let params = Params::from_query_pairs(pairs).map_err(InternalRequestError::SenderParams)?; - // log::debug!("Received request with params: {:?}", params); - - // TODO handle v1 and v2 - - Ok(UncheckedProposal { psbt, params: Params::default() }) + let params = Params::from_query_pairs(url::form_urlencoded::parse(query.as_bytes())) + .map_err(InternalRequestError::SenderParams)?; + log::debug!("Received request with params: {:?}", params); + Ok(Self { psbt, params }) } pub fn from_request( diff --git a/payjoin/src/send/mod.rs b/payjoin/src/send/mod.rs index 32e5601f..161a9804 100644 --- a/payjoin/src/send/mod.rs +++ b/payjoin/src/send/mod.rs @@ -326,16 +326,34 @@ impl<'a> RequestBuilder<'a> { let sequence = zeroth_input.txin.sequence; let txout = zeroth_input.previous_txout().expect("We already checked this above"); let input_type = InputType::from_spent_input(txout, zeroth_input.psbtin).unwrap(); - let url = serialize_url( - self.uri.extras._endpoint.into(), - disable_output_substitution, - fee_contribution, - self.min_fee_rate, - ) - .map_err(InternalCreateRequestError::Url)?; - let body = serialize_psbt(&psbt); + + #[cfg(not(feature = "v2"))] + let request = { + let url = serialize_url( + self.uri.extras._endpoint.into(), + disable_output_substitution, + fee_contribution, + self.min_fee_rate, + ) + .map_err(InternalCreateRequestError::Url)?; + let body = psbt.to_string().as_bytes().to_vec(); + Request { url, body } + }; + + #[cfg(feature = "v2")] + let request = { + let url = self.uri.extras._endpoint; + let body = serialize_v2_body( + &psbt, + disable_output_substitution, + fee_contribution, + self.min_fee_rate, + )?; + Request { url, body } + }; + Ok(( - Request { url, body }, + request, Context { original_psbt: psbt, disable_output_substitution, @@ -372,6 +390,7 @@ pub struct Request { /// /// This type is used to process the response. Get it from [`RequestBuilder`](crate::send::RequestBuilder)'s build methods. /// Then you only need to call [`.process_response()`](crate::send::Context::process_response()) on it to continue BIP78 flow. +#[derive(Debug)] pub struct Context { original_psbt: Psbt, disable_output_substitution: bool, @@ -769,6 +788,26 @@ fn determine_fee_contribution( }) } +#[cfg(feature = "v2")] +fn serialize_v2_body( + psbt: &Psbt, + disable_output_substitution: bool, + fee_contribution: Option<(bitcoin::Amount, usize)>, + min_feerate: FeeRate, +) -> Result, CreateRequestError> { + // Grug say localhost base be discarded anyway. no big brain needed. + let placeholder_url = serialize_url( + "http:/localhost".to_string(), + disable_output_substitution, + fee_contribution, + min_feerate, + ) + .map_err(InternalCreateRequestError::Url)?; + let query_params = placeholder_url.query().unwrap_or_default(); + let body = psbt.to_string(); + Ok(format!("{}\n{}", query_params, body).into_bytes()) +} + fn serialize_url( endpoint: String, disable_output_substitution: bool, @@ -793,11 +832,6 @@ fn serialize_url( Ok(url) } -fn serialize_psbt(psbt: &Psbt) -> Vec { - let bytes = psbt.serialize(); - bitcoin::base64::encode(bytes).into_bytes() -} - #[cfg(test)] mod tests { #[test] diff --git a/payjoin/tests/integration.rs b/payjoin/tests/integration.rs index ab378487..5b57d91d 100644 --- a/payjoin/tests/integration.rs +++ b/payjoin/tests/integration.rs @@ -2,9 +2,7 @@ mod integration { use std::collections::HashMap; use std::env; - use std::process::Stdio; use std::str::FromStr; - use std::sync::Arc; use bitcoin::address::NetworkChecked; use bitcoin::psbt::Psbt; @@ -14,270 +12,339 @@ mod integration { use bitcoind::bitcoincore_rpc::RpcApi; use log::{debug, log_enabled, Level}; use payjoin::bitcoin::base64; - use payjoin::receive::{Headers, UncheckedProposal}; - use payjoin::send::{Request, RequestBuilder}; + use payjoin::receive::UncheckedProposal; + use payjoin::send::RequestBuilder; use payjoin::Uri; - use testcontainers::Container; - use testcontainers_modules::postgres::Postgres; - use testcontainers_modules::testcontainers::clients::Cli; - use tokio::process::{Child, Command}; - use tokio::task::spawn_blocking; - - const EXAMPLE_URL: &str = "https://example.com"; - const RELAY_URL: &str = "https://localhost:8088"; - const LOCAL_CERT_FILE: &str = "localhost.der"; type BoxError = Box; - #[test] - fn v1_to_v1() -> Result<(), BoxError> { - let _ = env_logger::try_init(); - let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver()?; - - // Receiver creates the payjoin URI - let pj_receiver_address = receiver.get_new_address(None, None)?.assume_checked(); - let pj_uri = build_pj_uri(pj_receiver_address, Amount::ONE_BTC, EXAMPLE_URL); - // Sender create a funded PSBT (not broadcasted) to address with amount given in the pj_uri - let psbt = build_original_psbt(&sender, &pj_uri)?; - debug!("Original psbt: {:#?}", psbt); - let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt, pj_uri)? - .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)?; - let headers = HeaderMock::from_vec(&req.body); - - // ********************** - // Inside the Receiver: - // this data would transit from one party to another over the network in production - let response = handle_pj_request(req, headers, receiver); - // this response would be returned as http response to the sender - - // ********************** - // Inside the Sender: - // Sender checks, signs, finalizes, extracts, and broadcasts - let checked_payjoin_proposal_psbt = ctx.process_response(&mut response.as_bytes())?; - let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; - sender.send_raw_transaction(&payjoin_tx)?; - Ok(()) - } + #[cfg(not(feature = "v2"))] + mod v1 { + use payjoin::receive::Headers; + use payjoin::send::RequestBuilder; + + use super::*; + + const EXAMPLE_URL: &str = "https://example.com"; + + #[test] + fn v1_to_v1() -> Result<(), BoxError> { + let _ = env_logger::try_init(); + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver()?; + + // Receiver creates the payjoin URI + let pj_receiver_address = receiver.get_new_address(None, None)?.assume_checked(); + let pj_uri = build_pj_uri(pj_receiver_address, Amount::ONE_BTC, EXAMPLE_URL); + // Sender create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let psbt = build_original_psbt(&sender, &pj_uri)?; + debug!("Original psbt: {:#?}", psbt); + let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt, pj_uri)? + .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)?; + let headers = HeaderMock::from_vec(&req.body); + + // ********************** + // Inside the Receiver: + // this data would transit from one party to another over the network in production + let response = handle_pj_request(req, headers, receiver); + // this response would be returned as http response to the sender + + // ********************** + // Inside the Sender: + // Sender checks, signs, finalizes, extracts, and broadcasts + let checked_payjoin_proposal_psbt = ctx.process_response(&mut response.as_bytes())?; + let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; + sender.send_raw_transaction(&payjoin_tx)?; + Ok(()) + } - #[tokio::test] - async fn v2_to_v2() -> Result<(), BoxError> { - std::env::set_var("RUST_LOG", "debug"); - let _ = env_logger::builder().is_test(true).try_init(); - let docker = Cli::default(); - let (mut relay, _db) = init_relay(&docker).await; - let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver()?; - - // ********************** - // Inside the Receiver: - // Enroll with relay - let secp = bitcoin::secp256k1::Secp256k1::new(); - let mut rng = bitcoin::secp256k1::rand::thread_rng(); - let key = bitcoin::secp256k1::KeyPair::new(&secp, &mut rng); - let b64_config = base64::Config::new(base64::CharacterSet::UrlSafe, false); - let pubkey_base64 = base64::encode_config(key.public_key().to_string(), b64_config); - let pk64 = pubkey_base64.clone(); - let enroll = - spawn_blocking(move || http_agent().post(RELAY_URL).send_string(&pk64)).await??; - assert!(enroll.status() == 204); - // Receiver creates the payjoin URI - let pj_receiver_address = receiver.get_new_address(None, None)?.assume_checked(); - let relay_endpoint = format!("{}/{}", RELAY_URL, &pubkey_base64); - let pj_uri = build_pj_uri(pj_receiver_address, Amount::ONE_BTC, &relay_endpoint); - - // ********************** - // Inside the Sender: - // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri - let psbt = build_original_psbt(&sender, &pj_uri)?; - debug!("Original psbt: {:#?}", psbt); - let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt, pj_uri)? - .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)?; - log::info!("send fallback v2"); - log::debug!("Request: {:#?}", &req.body); - let response = spawn_blocking(move || { - http_agent() - .post(req.url.as_str()) - .set("Content-Type", "text/plain") - .set("Async", "true") - .send_string(String::from_utf8(req.body).unwrap().as_ref()) - }) - .await??; - log::info!("Response: {:#?}", &response); - assert!(response.status() == 202); - // no response body yet since we are async and pushed fallback_psbt to the buffer - - // ********************** - // Inside the Receiver: - // this data would transit from one party to another over the network in production - let receive_endpoint = format!("{}/{}", RELAY_URL, &pubkey_base64); - let response = spawn_blocking(move || http_agent().get(&receive_endpoint).call()).await??; - let response = handle_relay_response(response.into_reader(), receiver); - // this response would be returned as http response to the sender - - // ********************** - // Inside the Sender: - // Sender checks, signs, finalizes, extracts, and broadcasts - let checked_payjoin_proposal_psbt = ctx.process_response(&mut response.as_bytes())?; - let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; - sender.send_raw_transaction(&payjoin_tx)?; - log::info!("sent"); - relay.kill().await?; - let output = &relay.wait_with_output().await?; - log::info!("Status: {}", output.status); - Ok(()) - } + struct HeaderMock(HashMap); - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn v1_to_v2() -> Result<(), BoxError> { - std::env::set_var("RUST_LOG", "debug"); - let _ = env_logger::builder().is_test(true).try_init(); - let docker = Cli::default(); - let (mut relay, _db) = init_relay(&docker).await; - let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver()?; - - // ********************** - // Inside the Receiver: - // Enroll with relay - let secp = bitcoin::secp256k1::Secp256k1::new(); - let mut rng = bitcoin::secp256k1::rand::thread_rng(); - let key = bitcoin::secp256k1::KeyPair::new(&secp, &mut rng); - let b64_config = base64::Config::new(base64::CharacterSet::UrlSafe, false); - let pubkey_base64 = base64::encode_config(key.public_key().to_string(), b64_config); - let pk64 = pubkey_base64.clone(); - let enroll = - spawn_blocking(move || http_agent().post(RELAY_URL).send_string(&pk64.clone())) - .await? - .unwrap(); - assert!(enroll.status() == 204); - - // Receiver creates the payjoin URI - let pj_receiver_address = receiver.get_new_address(None, None).unwrap().assume_checked(); - let relay_endpoint = format!("{}/{}", RELAY_URL, &pubkey_base64); - let pj_uri = build_pj_uri(pj_receiver_address, Amount::ONE_BTC, &relay_endpoint); - - // ********************** - // Inside the V1 Sender: - // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri - let psbt = build_original_psbt(&sender, &pj_uri)?; - debug!("Original psbt: {:#?}", psbt); - let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt, pj_uri)? - .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)?; - log::info!("send fallback v1 to offline receiver fail"); - let req_clone = req.clone(); - let res = spawn_blocking(move || { - http_agent() - .post(req_clone.url.as_str()) - .set("Content-Type", "text/plain") - .send_bytes(&req_clone.body) - }) - .await?; - match res { - Err(ureq::Error::Status(code, _)) => assert_eq!(code, 503), - _ => panic!("Expected response status code 503, found {:?}", res), + impl Headers for HeaderMock { + fn get_header(&self, key: &str) -> Option<&str> { self.0.get(key).map(|e| e.as_str()) } } - // ********************** - // Inside the Receiver: - let receiver_loop = tokio::task::spawn(async move { - let fallback_psbt_body = loop { - let pk64 = pubkey_base64.clone(); - let response = spawn_blocking(move || { - let receive_endpoint = format!("{}/{}", RELAY_URL, &pk64); - http_agent().get(&receive_endpoint).call() - }) - .await??; + impl HeaderMock { + fn from_vec(body: &[u8]) -> HeaderMock { + let mut h = HashMap::new(); + h.insert("content-type".to_string(), "text/plain".to_string()); + h.insert("content-length".to_string(), body.len().to_string()); + HeaderMock(h) + } + } + + // Receiver receive and process original_psbt from a sender + // In production it it will come in as an HTTP request (over ssl or onion) + fn handle_pj_request( + req: Request, + headers: impl Headers, + receiver: bitcoincore_rpc::Client, + ) -> String { + // Receiver receive payjoin proposal, IRL it will be an HTTP request (over ssl or onion) + let proposal = payjoin::receive::UncheckedProposal::from_request( + req.body.as_slice(), + req.url.query().unwrap_or(""), + headers, + ) + .unwrap(); + handle_proposal(proposal, receiver) + } + } - if response.status() == 200 { - debug!("GET'd fallback_psbt"); - break response.into_reader(); - } else if response.status() == 202 { - log::info!("No response yet for POST payjoin request, retrying some seconds"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } else { - log::error!("Unexpected response status: {}", response.status()); - panic!("Unexpected response status: {}", response.status()) - } - }; - debug!("handle relay response"); - let response = handle_relay_response(fallback_psbt_body, receiver); - debug!("Post payjoin_psbt to relay"); - // Respond with payjoin psbt within the time window the sender is willing to wait - let payjoin_endpoint = format!("{}/{}/payjoin", RELAY_URL, &pubkey_base64); + #[cfg(feature = "v2")] + mod v2 { + use std::process::Stdio; + use std::sync::Arc; + + use testcontainers::Container; + use testcontainers_modules::postgres::Postgres; + use testcontainers_modules::testcontainers::clients::Cli; + use tokio::process::{Child, Command}; + use tokio::task::spawn_blocking; + + use super::*; + + const RELAY_URL: &str = "https://localhost:8088"; + const LOCAL_CERT_FILE: &str = "localhost.der"; + + #[tokio::test] + async fn v2_to_v2() -> Result<(), BoxError> { + std::env::set_var("RUST_LOG", "debug"); + let _ = env_logger::builder().is_test(true).try_init(); + let docker = Cli::default(); + let (mut relay, _db) = init_relay(&docker).await; + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver()?; + + // ********************** + // Inside the Receiver: + // Enroll with relay + let secp = bitcoin::secp256k1::Secp256k1::new(); + let mut rng = bitcoin::secp256k1::rand::thread_rng(); + let key = bitcoin::secp256k1::KeyPair::new(&secp, &mut rng); + let b64_config = base64::Config::new(base64::CharacterSet::UrlSafe, false); + let pubkey_base64 = base64::encode_config(key.public_key().to_string(), b64_config); + let pk64 = pubkey_base64.clone(); + let enroll = + spawn_blocking(move || http_agent().post(RELAY_URL).send_string(&pk64)).await??; + assert!(enroll.status() == 204); + // Receiver creates the payjoin URI + let pj_receiver_address = receiver.get_new_address(None, None)?.assume_checked(); + let relay_endpoint = format!("{}/{}", RELAY_URL, &pubkey_base64); + let pj_uri = build_pj_uri(pj_receiver_address, Amount::ONE_BTC, &relay_endpoint); + + // ********************** + // Inside the Sender: + // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let psbt = build_original_psbt(&sender, &pj_uri)?; + debug!("Original psbt: {:#?}", psbt); + let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt, pj_uri)? + .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)?; + log::info!("send fallback v2"); + log::debug!("Request: {:#?}", &req.body); + let response = spawn_blocking(move || { + http_agent() + .post(req.url.as_str()) + .set("Content-Type", "text/plain") + .set("Async", "true") + .send_string(String::from_utf8(req.body).unwrap().as_ref()) + }) + .await??; + log::info!("Response: {:#?}", &response); + assert!(response.status() == 202); + // no response body yet since we are async and pushed fallback_psbt to the buffer + + // ********************** + // Inside the Receiver: + // this data would transit from one party to another over the network in production + let receive_endpoint = format!("{}/{}", RELAY_URL, &pubkey_base64); let response = - spawn_blocking(move || http_agent().post(&payjoin_endpoint).send_string(&response)) + spawn_blocking(move || http_agent().get(&receive_endpoint).call()).await??; + let response = handle_relay_response(response.into_reader(), receiver); + // this response would be returned as http response to the sender + + // ********************** + // Inside the Sender: + // Sender checks, signs, finalizes, extracts, and broadcasts + let checked_payjoin_proposal_psbt = ctx.process_response(&mut response.as_bytes())?; + let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; + sender.send_raw_transaction(&payjoin_tx)?; + log::info!("sent"); + relay.kill().await?; + let output = &relay.wait_with_output().await?; + log::info!("Status: {}", output.status); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[cfg(feature = "v2")] + async fn v1_to_v2() -> Result<(), BoxError> { + std::env::set_var("RUST_LOG", "debug"); + let _ = env_logger::builder().is_test(true).try_init(); + let docker = Cli::default(); + let (mut relay, _db) = init_relay(&docker).await; + let (_bitcoind, sender, receiver) = init_bitcoind_sender_receiver()?; + + // ********************** + // Inside the Receiver: + // Enroll with relay + let secp = bitcoin::secp256k1::Secp256k1::new(); + let mut rng = bitcoin::secp256k1::rand::thread_rng(); + let key = bitcoin::secp256k1::KeyPair::new(&secp, &mut rng); + let b64_config = base64::Config::new(base64::CharacterSet::UrlSafe, false); + let pubkey_base64 = base64::encode_config(key.public_key().to_string(), b64_config); + let pk64 = pubkey_base64.clone(); + let enroll = + spawn_blocking(move || http_agent().post(RELAY_URL).send_string(&pk64.clone())) + .await? + .unwrap(); + assert!(enroll.status() == 204); + + // Receiver creates the payjoin URI + let pj_receiver_address = + receiver.get_new_address(None, None).unwrap().assume_checked(); + let relay_endpoint = format!("{}/{}", RELAY_URL, &pubkey_base64); + let pj_uri = build_pj_uri(pj_receiver_address, Amount::ONE_BTC, &relay_endpoint); + + // ********************** + // Inside the V1 Sender: + // Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri + let psbt = build_original_psbt(&sender, &pj_uri)?; + debug!("Original psbt: {:#?}", psbt); + let (req, ctx) = RequestBuilder::from_psbt_and_uri(psbt, pj_uri)? + .build_with_additional_fee(Amount::from_sat(10000), None, FeeRate::ZERO, false)?; + log::info!("send fallback v1 to offline receiver fail"); + let req_clone = req.clone(); + let res = spawn_blocking(move || { + http_agent() + .post(req_clone.url.as_str()) + .set("Content-Type", "text/plain") + .send_bytes(&req_clone.body) + }) + .await?; + match res { + Err(ureq::Error::Status(code, _)) => assert_eq!(code, 503), + _ => panic!("Expected response status code 503, found {:?}", res), + } + + // ********************** + // Inside the Receiver: + let receiver_loop = tokio::task::spawn(async move { + let fallback_psbt_body = loop { + let pk64 = pubkey_base64.clone(); + let response = spawn_blocking(move || { + let receive_endpoint = format!("{}/{}", RELAY_URL, &pk64); + http_agent().get(&receive_endpoint).call() + }) .await??; - debug!("POSTed with payjoin_psbt response status {}", response.status()); - assert!(response.status() == 204); - Ok::<_, Box>(()) - }); - - // ********************** - // send fallback v1 to online receiver - log::info!("send fallback v1 to online receiver should succeed"); - let req_clone = req.clone(); - let response = spawn_blocking(move || { - http_agent() - .post(req_clone.url.as_str()) - .set("Content-Type", "text/plain") - .send_bytes(&req_clone.body) - .expect("Failed to send request") - }) - .await?; - log::info!("Response: {:#?}", &response); - assert!(response.status() == 200); - - let checked_payjoin_proposal_psbt = ctx.process_response(&mut response.into_reader())?; - let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; - sender.send_raw_transaction(&payjoin_tx)?; - log::info!("sent"); - assert!(receiver_loop.await.is_ok(), "The spawned task panicked or returned an error"); - relay.kill().await?; - let output = &relay.wait_with_output().await?; - log::info!("Status: {}", output.status); - Ok(()) - } - struct HeaderMock(HashMap); + if response.status() == 200 { + debug!("GET'd fallback_psbt"); + break response.into_reader(); + } else if response.status() == 202 { + log::info!( + "No response yet for POST payjoin request, retrying some seconds" + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } else { + log::error!("Unexpected response status: {}", response.status()); + panic!("Unexpected response status: {}", response.status()) + } + }; + debug!("handle relay response"); + let response = handle_relay_response(fallback_psbt_body, receiver); + debug!("Post payjoin_psbt to relay"); + // Respond with payjoin psbt within the time window the sender is willing to wait + let payjoin_endpoint = format!("{}/{}/payjoin", RELAY_URL, &pubkey_base64); + let response = spawn_blocking(move || { + http_agent().post(&payjoin_endpoint).send_string(&response) + }) + .await??; + debug!("POSTed with payjoin_psbt response status {}", response.status()); + assert!(response.status() == 204); + Ok::<_, Box>(()) + }); + + // ********************** + // send fallback v1 to online receiver + log::info!("send fallback v1 to online receiver should succeed"); + let req_clone = req.clone(); + let response = spawn_blocking(move || { + http_agent() + .post(req_clone.url.as_str()) + .set("Content-Type", "text/plain") + .send_bytes(&req_clone.body) + .expect("Failed to send request") + }) + .await?; + log::info!("Response: {:#?}", &response); + assert!(response.status() == 200); + + let checked_payjoin_proposal_psbt = + ctx.process_response(&mut response.into_reader())?; + let payjoin_tx = extract_pj_tx(&sender, checked_payjoin_proposal_psbt)?; + sender.send_raw_transaction(&payjoin_tx)?; + log::info!("sent"); + assert!(receiver_loop.await.is_ok(), "The spawned task panicked or returned an error"); + relay.kill().await?; + let output = &relay.wait_with_output().await?; + log::info!("Status: {}", output.status); + Ok(()) + } - impl Headers for HeaderMock { - fn get_header(&self, key: &str) -> Option<&str> { self.0.get(key).map(|e| e.as_str()) } - } + async fn init_relay<'a>(docker: &'a Cli) -> (Child, Container<'a, Postgres>) { + println!("Initializing relay server"); + env::set_var("PJ_RELAY_PORT", "8088"); + env::set_var("PJ_RELAY_TIMEOUT_SECS", "2"); + //env::set_var("PGPASSWORD", "welcome"); + let postgres = docker.run(Postgres::default()); + env::set_var("PJ_DB_HOST", format!("127.0.0.1:{}", postgres.get_host_port_ipv4(5432))); + println!("Postgres running on {}", postgres.get_host_port_ipv4(5432)); + compile_payjoin_relay().await.wait().await.unwrap(); + let workspace_root = env::var("CARGO_MANIFEST_DIR").unwrap(); + let binary_path = format!("{}/../target/debug/payjoin-relay", workspace_root); + let mut command = Command::new(binary_path); + command.stdout(Stdio::inherit()).stderr(Stdio::inherit()); + (command.spawn().unwrap(), postgres) + } - impl HeaderMock { - fn from_vec(body: &[u8]) -> HeaderMock { - let mut h = HashMap::new(); - h.insert("content-type".to_string(), "text/plain".to_string()); - h.insert("content-length".to_string(), body.len().to_string()); - HeaderMock(h) + async fn compile_payjoin_relay() -> Child { + // set payjoin relay target dir to payjoin-relay + let mut command = Command::new("cargo"); + command.stdout(Stdio::inherit()).stderr(Stdio::inherit()).args([ + "build", + "--package", + "payjoin-relay", + "--features", + "danger-local-https", + ]); + command.spawn().unwrap() } - } - async fn init_relay<'a>(docker: &'a Cli) -> (Child, Container<'a, Postgres>) { - println!("Initializing relay server"); - env::set_var("PJ_RELAY_PORT", "8088"); - env::set_var("PJ_RELAY_TIMEOUT_SECS", "2"); - //env::set_var("PGPASSWORD", "welcome"); - let postgres = docker.run(Postgres::default()); - env::set_var("PJ_DB_HOST", format!("127.0.0.1:{}", postgres.get_host_port_ipv4(5432))); - println!("Postgres running on {}", postgres.get_host_port_ipv4(5432)); - compile_payjoin_relay().await.wait().await.unwrap(); - let workspace_root = env::var("CARGO_MANIFEST_DIR").unwrap(); - let binary_path = format!("{}/../target/debug/payjoin-relay", workspace_root); - let mut command = Command::new(binary_path); - command.stdout(Stdio::inherit()).stderr(Stdio::inherit()); - (command.spawn().unwrap(), postgres) - } + fn handle_relay_response( + res: impl std::io::Read, + receiver: bitcoincore_rpc::Client, + ) -> String { + let proposal = payjoin::receive::UncheckedProposal::from_relay_response(res).unwrap(); + handle_proposal(proposal, receiver) + } - async fn compile_payjoin_relay() -> Child { - // set payjoin relay target dir to payjoin-relay - let mut command = Command::new("cargo"); - command.stdout(Stdio::inherit()).stderr(Stdio::inherit()).args([ - "build", - "--package", - "payjoin-relay", - "--features", - "danger-local-https", - ]); - command.spawn().unwrap() + fn http_agent() -> ureq::Agent { + use rustls::client::ClientConfig; + use rustls::{Certificate, RootCertStore}; + use ureq::AgentBuilder; + + let mut local_cert_path = std::env::temp_dir(); + local_cert_path.push(LOCAL_CERT_FILE); + println!("TEST CERT PATH {:?}", &local_cert_path); + let cert_der = std::fs::read(local_cert_path).unwrap(); + let mut root_cert_store = RootCertStore::empty(); + root_cert_store.add(&Certificate(cert_der)).unwrap(); + let client_config = ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + + AgentBuilder::new().tls_config(Arc::new(client_config)).build() + } } fn init_bitcoind_sender_receiver( @@ -347,28 +414,6 @@ mod integration { Ok(Psbt::from_str(&psbt)?) } - // Receiver receive and process original_psbt from a sender - // In production it it will come in as an HTTP request (over ssl or onion) - fn handle_pj_request( - req: Request, - headers: impl Headers, - receiver: bitcoincore_rpc::Client, - ) -> String { - // Receiver receive payjoin proposal, IRL it will be an HTTP request (over ssl or onion) - let proposal = payjoin::receive::UncheckedProposal::from_request( - req.body.as_slice(), - req.url.query().unwrap_or(""), - headers, - ) - .unwrap(); - handle_proposal(proposal, receiver) - } - - fn handle_relay_response(res: impl std::io::Read, receiver: bitcoincore_rpc::Client) -> String { - let proposal = payjoin::receive::UncheckedProposal::from_relay_response(res).unwrap(); - handle_proposal(proposal, receiver) - } - fn handle_proposal(proposal: UncheckedProposal, receiver: bitcoincore_rpc::Client) -> String { // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast(); @@ -471,23 +516,4 @@ mod integration { Ok(payjoin_psbt.extract_tx()) } - - fn http_agent() -> ureq::Agent { - use rustls::client::ClientConfig; - use rustls::{Certificate, RootCertStore}; - use ureq::AgentBuilder; - - let mut local_cert_path = std::env::temp_dir(); - local_cert_path.push(LOCAL_CERT_FILE); - println!("TEST CERT PATH {:?}", &local_cert_path); - let cert_der = std::fs::read(local_cert_path).unwrap(); - let mut root_cert_store = RootCertStore::empty(); - root_cert_store.add(&Certificate(cert_der)).unwrap(); - let client_config = ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_cert_store) - .with_no_client_auth(); - - AgentBuilder::new().tls_config(Arc::new(client_config)).build() - } }