Skip to content

Commit

Permalink
Store and forward v2 messages with payjoin-relay
Browse files Browse the repository at this point in the history
Use postgres and hyper to store and notify clients' updates.
  • Loading branch information
DanGould committed Dec 11, 2023
1 parent 745326d commit 933fd5b
Show file tree
Hide file tree
Showing 11 changed files with 1,814 additions and 99 deletions.
988 changes: 970 additions & 18 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["payjoin", "payjoin-cli"]
members = ["payjoin", "payjoin-cli", "payjoin-relay"]
resolver = "2"

[patch.crates-io.payjoin]
Expand Down
31 changes: 31 additions & 0 deletions payjoin-relay/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "payjoin-relay"
version = "0.0.1"
authors = ["Dan Gould <[email protected]>"]
description = "A relay server for Payjoin V2 coordination"
repository = "https://github.com/payjoin/rust-payjoin"
readme = "README.md"
keywords = ["bip78", "payjoin", "bitcoin", "relay"]
categories = ["cryptography::cryptocurrencies", "network-programming"]
license = "MITNFA"
edition = "2021"
resolver = "2"
exclude = ["tests"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
danger-local-https = ["hyper-rustls", "rcgen", "rustls"]

[dependencies]
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = { version = "0.24", optional = true }
anyhow = "1.0.71"
payjoin = { path = "../payjoin", features = ["base64"] }
rcgen = { version = "0.11", optional = true }
rustls = { version = "0.21", optional = true }
sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio"] }
tokio = { version = "1.12.0", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

15 changes: 15 additions & 0 deletions payjoin-relay/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# payjoin-relay

## Payjoin v2 Relay

Payjoin v2 peers relay HTTP client messages in order to cordinate an asynchronous Payjoin transaction. Version 1 Requires the receiver to host a public HTTP server and to set up security using either HTTPS or Onion Services above and beyond typical HTTP client operation.

V2 clients use Hybrid Pubkey Encryption established in the bitcoin URI payment request for security instead, allowing lightweight clients secure communication without the burden of setup, which is done by the operator of this third-party relay server. This relay only sees OHTTP encapsulated, encrypted requests to prevent it from collecting metadata to break the privacy benefits of payjoin for messages who follow the spec.

This relay *only* accepts v2 payloads via Oblivious HTTP (OHTTP), preventing it from identifying IP addresses of clients.

## Architecture

The relay is a simple mailbox. Receivers may enroll by making a request to a pubkey identified subdirectory. After success response, they may share this subdirectory as payjoin endpoint to the sender in a bitcoin URI. The sender may poll the subdirectory with a request posting their encrypted Fallback PSBT expecting a Payjoin Proposal PSBT response. The receiver may poll the enroll endpoint to await a request, later posting their Payjoin Proposal PSBT for the sender to receive, sign, and broadcast.

The relay does depend on a second independent Oblivious HTTP Relay to help secure request/response metadata from the Payjoin Relay.
189 changes: 189 additions & 0 deletions payjoin-relay/src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::time::Duration;

use anyhow::Result;
use sqlx::postgres::{PgListener, PgPoolOptions};
use sqlx::{PgPool, Pool, Postgres};
use tracing::debug;

const RES_COLUMN: &str = "res";
const REQ_COLUMN: &str = "req";

pub(crate) struct DbPool {
pool: Pool<Postgres>,
timeout: Duration,
}

impl DbPool {
/// Initialize a database connection pool with specified peek timeout
pub async fn new(timeout: Duration, db_host: String) -> Result<Self> {
let pool = init_postgres(db_host).await?;
Ok(Self { pool, timeout })
}

pub async fn peek_req(&self, pubkey_id: &str) -> Option<Result<Vec<u8>, sqlx::Error>> {
peek_with_timeout(&self.pool, pubkey_id, REQ_COLUMN, self.timeout).await
}
pub async fn peek_res(&self, pubkey_id: &str) -> Option<Result<Vec<u8>, sqlx::Error>> {
debug!("peek res");
peek_with_timeout(&self.pool, pubkey_id, RES_COLUMN, self.timeout).await
}

pub async fn push_req(&self, pubkey_id: &str, data: Vec<u8>) -> Result<(), sqlx::Error> {
push(&self.pool, pubkey_id, REQ_COLUMN, data).await
}

pub async fn push_res(&self, pubkey_id: &str, data: Vec<u8>) -> Result<(), sqlx::Error> {
debug!("push res");
push(&self.pool, pubkey_id, RES_COLUMN, data).await
}
}

impl Clone for DbPool {
fn clone(&self) -> Self { Self { pool: self.pool.clone(), timeout: self.timeout } }
}

async fn init_postgres(db_host: String) -> Result<PgPool> {
let pool = PgPoolOptions::new()
.connect(&format!("postgres://postgres:welcome@{}/postgres", db_host))
.await?;
// Create table if not exist yet
let (table_exists,): (bool,) =
sqlx::query_as("SELECT EXISTS (SELECT FROM pg_tables WHERE tablename = 'relay')")
.fetch_one(&pool)
.await?;

if !table_exists {
// Create the table
sqlx::query(
r#"
CREATE TABLE relay (
pubkey_id VARCHAR PRIMARY KEY,
req BYTEA,
res BYTEA
);
"#,
)
.execute(&pool)
.await?;

// Create the function for notification
sqlx::query(
r#"
CREATE OR REPLACE FUNCTION notify_change()
RETURNS TRIGGER AS $$
DECLARE
channel_name text;
BEGIN
channel_name := NEW.pubkey_id || '_' || TG_ARGV[0];
PERFORM pg_notify(channel_name, TG_TABLE_NAME || ', ' || NEW.pubkey_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"#,
)
.execute(&pool)
.await?;

// Create triggers
sqlx::query(
r#"
CREATE TRIGGER relay_req_trigger
AFTER INSERT OR UPDATE OF req ON relay
FOR EACH ROW
EXECUTE FUNCTION notify_change('req');
"#,
)
.execute(&pool)
.await?;

sqlx::query(
r#"
CREATE TRIGGER relay_res_trigger
AFTER INSERT OR UPDATE OF res ON relay
FOR EACH ROW
EXECUTE FUNCTION notify_change('res');
"#,
)
.execute(&pool)
.await?;
}
Ok(pool)
}

async fn push(
pool: &Pool<Postgres>,
pubkey_id: &str,
channel_type: &str,
data: Vec<u8>,
) -> Result<(), sqlx::Error> {
// Use an UPSERT operation to insert or update the record
let query = format!(
"INSERT INTO relay (pubkey_id, {}) VALUES ($1, $2) \
ON CONFLICT (pubkey_id) DO UPDATE SET {} = EXCLUDED.{}",
channel_type, channel_type, channel_type
);

sqlx::query(&query).bind(pubkey_id).bind(data).execute(pool).await?;

Ok(())
}

async fn peek_with_timeout(
pool: &Pool<Postgres>,
pubkey_id: &str,
channel_type: &str,
timeout: Duration,
) -> Option<Result<Vec<u8>, sqlx::Error>> {
tokio::time::timeout(timeout, peek(pool, pubkey_id, channel_type)).await.ok()
}

async fn peek(
pool: &Pool<Postgres>,
pubkey_id: &str,
channel_type: &str,
) -> Result<Vec<u8>, sqlx::Error> {
// Step 1: Attempt to fetch existing content for the given pubkey_id and channel_type
match sqlx::query_as::<Postgres, (Option<Vec<u8>>,)>(&format!(
"SELECT {} FROM relay WHERE pubkey_id = $1",
channel_type
))
.bind(pubkey_id)
.fetch_one(pool)
.await
{
Ok(row) =>
if let Some(content) = row.0 {
if !content.is_empty() {
return Ok(content);
}
},
Err(e) => {
debug!("Failed to fetch content initially: {}", e);
// We'll continue to the next step even if the query failed
}
}

// Step 2: If no content was found, set up a listener
let mut listener = PgListener::connect_with(pool).await?;
let channel_name = format!("{}_{}", pubkey_id, channel_type);
listener.listen(&channel_name).await?;
debug!("Listening on channel: {}", channel_name);

// Step 3: Wait for a notification and then fetch the new content
loop {
let notification = listener.recv().await?;
debug!("Received notification: {:?}", notification);
if notification.channel() == channel_name {
let row: (Vec<u8>,) =
sqlx::query_as(&format!("SELECT {} FROM relay WHERE pubkey_id = $1", channel_type))
.bind(pubkey_id)
.fetch_one(pool)
.await?;

let updated_content = row.0;
if !updated_content.is_empty() {
return Ok(updated_content);
}
}
}
}
Loading

0 comments on commit 933fd5b

Please sign in to comment.