Skip to content

Commit

Permalink
rpcserver: add channel checks to RFQ orders
Browse files Browse the repository at this point in the history
  • Loading branch information
guggero committed Nov 19, 2024
1 parent 49c8857 commit 41d36d3
Showing 1 changed file with 86 additions and 14 deletions.
100 changes: 86 additions & 14 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6322,7 +6322,7 @@ func unmarshalAssetBuyOrder(

// AddAssetBuyOrder upserts a new buy order for the given asset into the RFQ
// manager. If the order already exists for the given asset, it will be updated.
func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
func (r *rpcServer) AddAssetBuyOrder(ctx context.Context,
req *rfqrpc.AddAssetBuyOrderRequest) (*rfqrpc.AddAssetBuyOrderResponse,
error) {

Expand All @@ -6336,13 +6336,24 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
return nil, fmt.Errorf("error unmarshalling buy order: %w", err)
}

peerStr := fn.MapOptionZ(
buyOrder.Peer, func(peerVertex route.Vertex) string {
return peerVertex.String()
},
// Currently, we require the peer to be specified in the buy order.
peer, err := buyOrder.Peer.UnwrapOrErr(
fmt.Errorf("buy order peer must be specified"),
)
if err != nil {
return nil, err
}

// Check if we have a channel with the peer.
err = r.checkPeerChannel(
ctx, peer, buyOrder.AssetSpecifier, req.SkipAssetChannelCheck,
)
if err != nil {
return nil, fmt.Errorf("error checking peer channel: %w", err)
}

rpcsLog.Debugf("[AddAssetBuyOrder]: upserting buy order "+
"(dest_peer=%s)", peerStr)
"(dest_peer=%s)", peer.String())

// Register an event listener before actually inserting the order, so we
// definitely don't miss any responses.
Expand Down Expand Up @@ -6384,11 +6395,61 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,

case <-timeout:
return nil, fmt.Errorf("timeout waiting for response "+
"(peer=%s)", peerStr)
"(peer=%s)", peer.String())
}
}
}

// checkPeerChannel checks if there is a channel with the given peer. If the
// asset channel check is enabled, it will also check if there is a channel with
// the given asset with the peer.
func (r *rpcServer) checkPeerChannel(ctx context.Context, peer route.Vertex,
specifier asset.Specifier, skipAssetChannelCheck bool) error {

// We want to make sure there is at least a channel between us and the
// peer, otherwise RFQ negotiation doesn't make sense.
switch {
// For integration tests, we can't create asset channels, so we allow
// the asset channel check to be skipped. In this case we simply check
// that we have any channel with the peer.
case skipAssetChannelCheck:
activeChannels, err := r.cfg.Lnd.Client.ListChannels(
ctx, true, false,
)
if err != nil {
return fmt.Errorf("unable to fetch channels: %w", err)
}
peerChannels := fn.Filter(
activeChannels, func(c lndclient.ChannelInfo) bool {
return c.PubKeyBytes == peer
},
)
if len(peerChannels) == 0 {
return fmt.Errorf("no active channel found with peer "+
"%x", peer[:])
}

// For any other case, we'll want to make sure there is enough balance
// of the given asset in the channel to carry the order.
default:
assetID, err := specifier.UnwrapIdOrErr()
if err != nil {
return fmt.Errorf("cannot check asset channel, " +
"missing asset ID")
}

// If we don't get an error here, it means we do have an asset
// channel with the peer.
_, err = r.rfqChannel(ctx, assetID, &peer)
if err != nil {
return fmt.Errorf("error checking asset channel: %w",
err)
}
}

return nil
}

// unmarshalAssetSellOrder unmarshals an asset sell order from the RPC form.
func unmarshalAssetSellOrder(
req *rfqrpc.AddAssetSellOrderRequest) (*rfq.SellOrder, error) {
Expand Down Expand Up @@ -6439,7 +6500,7 @@ func unmarshalAssetSellOrder(

// AddAssetSellOrder upserts a new sell order for the given asset into the RFQ
// manager. If the order already exists for the given asset, it will be updated.
func (r *rpcServer) AddAssetSellOrder(_ context.Context,
func (r *rpcServer) AddAssetSellOrder(ctx context.Context,
req *rfqrpc.AddAssetSellOrderRequest) (*rfqrpc.AddAssetSellOrderResponse,
error) {

Expand All @@ -6454,13 +6515,24 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,
err)
}

// Extract peer identifier as a string for logging.
peerStr := fn.MapOptionZ(sellOrder.Peer, func(p route.Vertex) string {
return p.String()
})
// Currently, we require the peer to be specified in the buy order.
peer, err := sellOrder.Peer.UnwrapOrErr(
fmt.Errorf("sell order peer must be specified"),
)
if err != nil {
return nil, err
}

// Check if we have a channel with the peer.
err = r.checkPeerChannel(
ctx, peer, sellOrder.AssetSpecifier, req.SkipAssetChannelCheck,
)
if err != nil {
return nil, fmt.Errorf("error checking peer channel: %w", err)
}

rpcsLog.Debugf("[AddAssetSellOrder]: upserting sell order "+
"(dest_peer=%s)", peerStr)
"(dest_peer=%s)", peer.String())

// Register an event listener before actually inserting the order, so we
// definitely don't miss any responses.
Expand Down Expand Up @@ -6502,7 +6574,7 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,

case <-timeout:
return nil, fmt.Errorf("timeout waiting for response "+
"from peer %s", peerStr)
"from peer %s", peer.String())
}
}
}
Expand Down

0 comments on commit 41d36d3

Please sign in to comment.