Skip to content

Commit

Permalink
Merge #966
Browse files Browse the repository at this point in the history
966: Feat/track pnl r=da-kami a=da-kami

Especially the first commit will be interesting for everybody to see - we are finally associating the `positions` with the contracts - Thanks for the great pairing session `@luckysori` :)

note: This PR only handles the realized pnl, I will see to add tracking for the unrealized pnl in a follow up.

related ticket: #932
(I'll add the *fixes* to the follow up :)

Co-authored-by: Daniel Karzel <[email protected]>
  • Loading branch information
bors[bot] and da-kami authored Jul 21, 2023
2 parents 2e737dd + ac754fc commit 830378e
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE
"positions" DROP COLUMN "temporary_contract_id";
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Your SQL goes here
ALTER TABLE
positions
ADD
COLUMN "temporary_contract_id" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file should undo anything in `up.sql`
-- ... but in this case it does not fully.
-- Postgres does not allow removing enum type values. One can only re-create an enum type with fewer values and replace the references.
-- However, there is no proper way to replace the values to be removed where they are used (i.e. referenced in `positions` table)
-- We opt to NOT remove enum values that were added at a later point.
ALTER TABLE
"positions" DROP COLUMN "realized_pnl";
10 changes: 10 additions & 0 deletions coordinator/migrations/2023-07-19-055143_closed_positions/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Your SQL goes here
-- Note that the `IF NOT EXISTS` is essential because there is no `down` migration for removing this value because it is not really feasible to remove enum values!
-- In order to allow re-running this migration we thus have to make sure to only add the value if it does not exist yet.
ALTER TYPE "PositionState_Type"
ADD
VALUE IF NOT EXISTS 'Closed';
ALTER TABLE
positions
ADD
COLUMN "realized_pnl" BIGINT;
124 changes: 16 additions & 108 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
use anyhow::Context;
use anyhow::Result;
use coordinator::cli::Opts;
use coordinator::db;
use coordinator::logger;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::storage::NodeStorage;
use coordinator::node::Node;
use coordinator::node::TradeAction;
use coordinator::position::models::Position;
use coordinator::position::models::PositionState;
use coordinator::routes::router;
use coordinator::run_migration;
use coordinator::settings::Settings;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
use hex::FromHex;
use lightning::ln::PaymentHash;
use lightning::util::events::Event;
use ln_dlc_node::seed::Bip39Seed;
use rand::thread_rng;
Expand All @@ -30,15 +26,14 @@ use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tracing::metadata::LevelFilter;
use trade::bitmex_client::BitmexClient;

const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_secs(5);
const POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);

#[tokio::main]
Expand Down Expand Up @@ -171,106 +166,19 @@ async fn main() -> Result<()> {
let node = node.clone();
async move {
loop {
tokio::time::sleep(POSITION_SYNC_INTERVAL).await;

let mut conn = match node.pool.get() {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Failed to get pool connection. Error: {e:?}");
continue;
}
};

let positions = match db::positions::Position::get_all_open_positions(&mut conn) {
Ok(positions) => positions,
Err(e) => {
tracing::error!("Failed to get positions. Error: {e:?}");
continue;
}
};

let positions = positions
.into_iter()
.filter(|p| {
p.position_state == PositionState::Open
&& OffsetDateTime::now_utc().ge(&p.expiry_timestamp)
})
.collect::<Vec<Position>>();

for position in positions.iter() {
tracing::trace!(trader_pk=%position.trader, %position.expiry_timestamp, "Attempting to close expired position");

if !node.is_connected(&position.trader) {
tracing::debug!(
"Could not close expired position with {} as trader is not connected.",
position.trader
);
continue;
}

let channel_id = match node.decide_trade_action(&position.trader) {
Ok(TradeAction::Close(channel_id)) => channel_id,
Ok(_) => {
tracing::error!(
?position,
"Unable to find sub channel of expired position."
);
continue;
}
Err(e) => {
tracing::error!(
?position,
"Failed to decide trade action. Error: {e:?}"
);
continue;
}
};

let closing_price =
match BitmexClient::get_quote(&position.expiry_timestamp).await {
Ok(quote) => match position.direction {
trade::Direction::Long => quote.bid_price,
trade::Direction::Short => quote.ask_price,
},
Err(e) => {
tracing::warn!(
"Failed to get quote from bitmex for {} at {}. Error: {e:?}",
position.trader,
position.expiry_timestamp
);
continue;
}
};

// Upon collab closing an expired position we cannot charge a fee using an
// invoice. This dummy hash exists in the database to
// represent zero-amount invoices.
let zero_amount_payment_hash_dummy = PaymentHash(
<[u8; 32]>::from_hex(
"6f9b8c95c2ba7b1857b19f975372308161fedf50feb78a252200135a41875210",
)
.expect("static payment hash to decode"),
);
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
expired_positions::close(node.clone()).await;
}
}
});

match node
.close_position(
position,
closing_price,
channel_id,
zero_amount_payment_hash_dummy,
)
.await
{
Ok(_) => tracing::info!(
"Successfully proposed to close expired position with {}",
position.trader
),
Err(e) => tracing::warn!(
?position,
"Failed to close expired position with {}. Error: {e:?}",
position.trader
),
}
tokio::spawn({
let node = node.clone();
async move {
loop {
tokio::time::sleep(CLOSED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = closed_positions::sync(node.clone()) {
tracing::error!("Failed to sync closed DLCs with positions in database: {e:#}");
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/db/custom_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl ToSql<PositionStateType, Pg> for PositionState {
match *self {
PositionState::Open => out.write_all(b"Open")?,
PositionState::Closing => out.write_all(b"Closing")?,
PositionState::Closed => out.write_all(b"Closed")?,
}
Ok(IsNull::No)
}
Expand All @@ -53,6 +54,7 @@ impl FromSql<PositionStateType, Pg> for PositionState {
match bytes.as_bytes() {
b"Open" => Ok(PositionState::Open),
b"Closing" => Ok(PositionState::Closing),
b"Closed" => Ok(PositionState::Closed),
_ => Err("Unrecognized enum variant".into()),
}
}
Expand Down
66 changes: 61 additions & 5 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use crate::schema::sql_types::PositionStateType;
use anyhow::bail;
use anyhow::Result;
use autometrics::autometrics;
use bitcoin::hashes::hex::ToHex;
use diesel::prelude::*;
use diesel::query_builder::QueryId;
use diesel::result::QueryResult;
use diesel::AsExpression;
use diesel::FromSqlRow;
use dlc_manager::ContractId;
use hex::FromHex;
use std::any::TypeId;
use time::OffsetDateTime;

Expand All @@ -28,6 +31,8 @@ pub struct Position {
pub expiry_timestamp: OffsetDateTime,
pub update_timestamp: OffsetDateTime,
pub trader_pubkey: String,
pub temporary_contract_id: String,
pub realized_pnl: Option<i64>,
}

impl Position {
Expand Down Expand Up @@ -62,6 +67,26 @@ impl Position {
Ok(positions)
}

#[autometrics]
pub fn get_all_open_or_closing_positions(
conn: &mut PgConnection,
) -> QueryResult<Vec<crate::position::models::Position>> {
let positions = positions::table
.filter(
positions::position_state
.eq(PositionState::Open)
.or(positions::position_state.eq(PositionState::Closing)),
)
.load::<Position>(conn)?;

let positions = positions
.into_iter()
.map(crate::position::models::Position::from)
.collect();

Ok(positions)
}

/// sets the status of all open position to closing (note, we expect that number to be always
/// exactly 1)
pub fn set_open_position_to_closing(
Expand All @@ -71,7 +96,10 @@ impl Position {
let effected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey.clone()))
.filter(positions::position_state.eq(PositionState::Open))
.set(positions::position_state.eq(PositionState::Closing))
.set((
positions::position_state.eq(PositionState::Closing),
positions::update_timestamp.eq(OffsetDateTime::now_utc()),
))
.execute(conn)?;

if effected_rows == 0 {
Expand All @@ -81,6 +109,23 @@ impl Position {
Ok(())
}

pub fn set_position_to_closed(conn: &mut PgConnection, id: i32, pnl: i64) -> Result<()> {
let effected_rows = diesel::update(positions::table)
.filter(positions::id.eq(id))
.set((
positions::position_state.eq(PositionState::Closed),
positions::realized_pnl.eq(Some(pnl)),
positions::update_timestamp.eq(OffsetDateTime::now_utc()),
))
.execute(conn)?;

if effected_rows == 0 {
bail!("Could not update position to Closed with realized pnl {pnl} for position {id}")
}

Ok(())
}

/// inserts the given position into the db. Returns the position if successful
#[autometrics]
pub fn insert(
Expand All @@ -105,12 +150,17 @@ impl From<Position> for crate::position::models::Position {
direction: trade::Direction::from(value.direction),
average_entry_price: value.average_entry_price,
liquidation_price: value.liquidation_price,
position_state: crate::position::models::PositionState::from(value.position_state),
position_state: crate::position::models::PositionState::from((
value.position_state,
value.realized_pnl,
)),
collateral: value.collateral,
creation_timestamp: value.creation_timestamp,
expiry_timestamp: value.expiry_timestamp,
update_timestamp: value.update_timestamp,
trader: value.trader_pubkey.parse().expect("to be valid public key"),
temporary_contract_id: ContractId::from_hex(value.temporary_contract_id.as_str())
.expect("contract id to decode"),
}
}
}
Expand All @@ -128,6 +178,7 @@ struct NewPosition {
pub collateral: i64,
pub expiry_timestamp: OffsetDateTime,
pub trader_pubkey: String,
pub temporary_contract_id: String,
}

impl From<crate::position::models::NewPosition> for NewPosition {
Expand All @@ -143,6 +194,7 @@ impl From<crate::position::models::NewPosition> for NewPosition {
collateral: value.collateral,
expiry_timestamp: value.expiry_timestamp,
trader_pubkey: value.trader.to_string(),
temporary_contract_id: value.temporary_contract_id.to_hex(),
}
}
}
Expand All @@ -152,6 +204,7 @@ impl From<crate::position::models::NewPosition> for NewPosition {
pub enum PositionState {
Open,
Closing,
Closed,
}

impl QueryId for PositionStateType {
Expand All @@ -163,11 +216,14 @@ impl QueryId for PositionStateType {
}
}

impl From<PositionState> for crate::position::models::PositionState {
fn from(value: PositionState) -> Self {
match value {
impl From<(PositionState, Option<i64>)> for crate::position::models::PositionState {
fn from((position_state, realized_pnl): (PositionState, Option<i64>)) -> Self {
match position_state {
PositionState::Open => crate::position::models::PositionState::Open,
PositionState::Closing => crate::position::models::PositionState::Closing,
PositionState::Closed => crate::position::models::PositionState::Closed {
pnl: realized_pnl.expect("realized pnl to be set when position is closed"),
},
}
}
}
Expand Down
Loading

0 comments on commit 830378e

Please sign in to comment.