diff --git a/move/sources/api.move b/move/sources/api.move index cf5c112..c800f12 100644 --- a/move/sources/api.move +++ b/move/sources/api.move @@ -10,8 +10,6 @@ module dtp::api { use dtp::pipe::{Pipe}; use dtp::inner_pipe::{InnerPipe}; - use dtp::conn_objects::{ConnObjects}; - // === Friends === // === Errors === @@ -76,30 +74,30 @@ module dtp::api { // Transmit a request toward the server. // // The encoding of the 'data' depends on the service. - public fun send_request(service_idx: u8, data: &vector, ipipe: &mut InnerPipe, args: vector, ctx: &mut TxContext): vector + public fun send_request(ipipe: &mut InnerPipe, data: vector, args: vector, ctx: &mut TxContext): vector { let kvargs = kvalues::from_bytes(&args); - let ret_value = dtp::api_impl::send_request(service_idx, data, ipipe, &kvargs, ctx); + let ret_value = dtp::api_impl::send_request(ipipe, data, &kvargs, ctx); kvalues::to_bytes(&ret_value) } // Transmit a response toward the client. // // The encoding of the 'data' depends on the service. - public fun send_response(service_idx: u8, data: &vector, seq_number: u64, ipipe: &mut InnerPipe, args: vector, ctx: &mut TxContext): vector + public fun send_response( ipipe: &mut InnerPipe, seq_num: u64, data: vector, args: vector, ctx: &mut TxContext): vector { let kvargs = kvalues::from_bytes(&args); - let ret_value = dtp::api_impl::send_response(service_idx, data, seq_number, ipipe, &kvargs, ctx); + let ret_value = dtp::api_impl::send_response(ipipe, seq_num, data, &kvargs, ctx); kvalues::to_bytes(&ret_value) } // Transmit a notification toward the peer (no response expected). // // The encoding of the 'data' depends on the service. - public fun send_notification(service_idx: u8, data: &vector, ipipe: &mut InnerPipe, args: vector, ctx: &mut TxContext): vector + public fun send_notification(ipipe: &mut InnerPipe, data: &vector, args: vector, ctx: &mut TxContext): vector { let kvargs = kvalues::from_bytes(&args); - let ret_value = dtp::api_impl::send_notification(service_idx, data, ipipe,&kvargs, ctx); + let ret_value = dtp::api_impl::send_notification( ipipe, data,&kvargs, ctx); kvalues::to_bytes(&ret_value) } diff --git a/move/sources/api_impl.move b/move/sources/api_impl.move index cc9540c..0ddd6f3 100644 --- a/move/sources/api_impl.move +++ b/move/sources/api_impl.move @@ -5,11 +5,12 @@ module dtp::api_impl { use dtp::host::{Self,Host}; use dtp::transport_control::{Self}; - use dtp::conn_objects::{Self,ConnObjects}; + use dtp::conn_objects::{Self}; use dtp::transport_control::{TransportControl}; use dtp::pipe::{Pipe}; use dtp::inner_pipe::{Self,InnerPipe}; use dtp::kvalues::{Self,KValues}; + use dtp::events::{Self}; use dtp::weak_ref::{Self}; @@ -58,9 +59,9 @@ module dtp::api_impl { // Create the connection. Will emit an event on success transport_control::create_best_effort(service_idx, cli_host, srv_host, &mut conn, ctx); - // TODO Add references in Host object for slow discovery. - //host::add_connection(cli_host, &conn.transport_control); - //host::add_connection(srv_host, &conn.transport_control); + // Add weak references in Host objects for slow discovery. + host::add_connection(cli_host, weak_ref::new_from_address(conn_objects::get_tc_address(&conn))); + host::add_connection(srv_host, weak_ref::new_from_address(conn_objects::get_tc_address(&conn))); kvalues::new() } @@ -96,23 +97,43 @@ module dtp::api_impl { // Transmit a request toward the server. // // The encoding of the 'data' depends on the service. - public(friend) fun send_request(_service_idx: u8, _data: &vector, _ipipe: &InnerPipe, _kvargs: &KValues, _ctx: &mut TxContext): KValues - { + public(friend) fun send_request(ipipe: &mut InnerPipe, data: vector, _kvargs: &KValues, _ctx: &mut TxContext): KValues + { + let seq_num = inner_pipe::inc_seq_num(ipipe); + + // Emit a request event. + let ipipe_ref = weak_ref::new_from_obj(ipipe); + let tc_ref = inner_pipe::get_tc_ref(ipipe); + let service_idx = inner_pipe::get_service_idx(ipipe); + events::emit_request(service_idx, seq_num, tc_ref, ipipe_ref, data); + + // Update stats for debugging. + inner_pipe::inc_emit_cnt(ipipe); + kvalues::new() } // Transmit a response toward the client. // // The encoding of the 'data' depends on the service. - public(friend) fun send_response(_service_idx: u8, _data: &vector, _seq_number: u64, _ipipe: &InnerPipe, _kvargs: &KValues, _ctx: &mut TxContext): KValues + public(friend) fun send_response(ipipe: &mut InnerPipe, seq_num: u64, data: vector, _kvargs: &KValues, _ctx: &mut TxContext): KValues { + // Emit a response event. + let ipipe_ref = weak_ref::new_from_obj(ipipe); + let tc_ref = inner_pipe::get_tc_ref(ipipe); + let service_idx = inner_pipe::get_service_idx(ipipe); + events::emit_response(service_idx, seq_num, tc_ref, ipipe_ref, data); + + // Update stats for debugging. + inner_pipe::inc_emit_cnt(ipipe); + kvalues::new() } // Transmit a notification toward the peer (no response expected). // // The encoding of the 'data' depends on the service. - public(friend) fun send_notification(_service_idx: u8, _data: &vector, _ipipe: &InnerPipe, _kvargs: &KValues, _ctx: &mut TxContext): KValues + public(friend) fun send_notification(_ipipe: &InnerPipe, _data: &vector, _kvargs: &KValues, _ctx: &mut TxContext): KValues { kvalues::new() } diff --git a/move/sources/conn_objects.move b/move/sources/conn_objects.move index 2081467..cc4f691 100644 --- a/move/sources/conn_objects.move +++ b/move/sources/conn_objects.move @@ -21,6 +21,8 @@ module dtp::conn_objects { // If an end-point loose these references, they can be // re-discovered using one of the related Host object. tc: address, // TransportControl + cli_auth: address, + srv_auth: address, cli_tx_pipe: address, srv_tx_pipe: address, cli_tx_ipipes: vector
, @@ -37,6 +39,8 @@ module dtp::conn_objects { public(friend) fun new(): ConnObjects { ConnObjects{ tc: @0x0, + cli_auth: @0x0, + srv_auth: @0x0, cli_tx_pipe: @0x0, srv_tx_pipe: @0x0, cli_tx_ipipes: vector::empty(), @@ -48,6 +52,26 @@ module dtp::conn_objects { self.tc = tc; } + public(friend) fun get_tc_address(self: &ConnObjects): address { + self.tc + } + + public(friend) fun set_cli_auth(self: &mut ConnObjects, cli_auth: address) { + self.cli_auth = cli_auth; + } + + public(friend) fun get_cli_auth_address(self: &ConnObjects): address { + self.cli_auth + } + + public(friend) fun set_srv_auth(self: &mut ConnObjects, srv_auth: address) { + self.srv_auth = srv_auth; + } + + public(friend) fun get_srv_auth_address(self: &ConnObjects): address { + self.srv_auth + } + public(friend) fun set_cli_tx_pipe(self: &mut ConnObjects, cli_tx_pipe: address) { self.cli_tx_pipe = cli_tx_pipe; } diff --git a/move/sources/events.move b/move/sources/events.move index ac06676..939c422 100644 --- a/move/sources/events.move +++ b/move/sources/events.move @@ -4,22 +4,38 @@ module dtp::events { // === Imports === use sui::event; use dtp::conn_objects::ConnObjects; + use dtp::weak_ref::WeakRef; // === Friends === friend dtp::host; friend dtp::inner_pipe; - friend dtp::transport_control; + friend dtp::transport_control; + friend dtp::api_impl; // === Errors === // === Constants === // === Structs === + + // TODO Add KValues to all events for future proofing. struct ConnReq has copy, drop { - service_idx: u8, // Service Type - conn: ConnObjects, // Info to get the connection started (e.g. Pipes and InnerPipes addresses). + flags: u8, // Reserve for future. + src: u8, // Typically 0x03 because coming from Host. + src_addr: address, // Host Address + service_idx: u8, // Service Type [1..253] + conn: ConnObjects, // Enough info to get the connection started (e.g. TC, Pipes and InnerPipes addresses). } + struct Datagram has copy, drop { + flags: u8, // Reserve for future. + src: u8, // 0x01 or 0x02 for respectively cli_tx_ipipe and srv_tx_ipipe. + src_addr: address, // InnerPipe Address + service_idx: u8, // Service Type [1..253] + seq_num: u64, + tc_ref: WeakRef, // TransportControl Address. + data: vector, // The endpoint response/request (e.g. JSON-RPC). + } // === Public-Mutative Functions === @@ -28,13 +44,23 @@ module dtp::events { // === Admin Functions === // === Public-Friend Functions === - public(friend) fun emit_conn_req( service_idx: u8, conn: ConnObjects ) { - event::emit(ConnReq { service_idx, conn }); + public(friend) fun emit_conn_req( src_addr: address, service_idx: u8, conn: ConnObjects ) { + event::emit(ConnReq { flags: 0, src: 0x03, src_addr, service_idx, conn }); + } + + public(friend) fun emit_response( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector ) { + let src_addr = dtp::weak_ref::get_address(&ipipe_ref); + event::emit(Datagram { flags: 0, src: 0x02, src_addr, service_idx, seq_num, tc_ref, data }); + } + + public(friend) fun emit_request( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector ) { + let src_addr = dtp::weak_ref::get_address(&ipipe_ref); + event::emit(Datagram { flags: 0, src: 0x01, src_addr, service_idx, seq_num, tc_ref, data }); } // === Private Functions === // === Test Functions === - + } \ No newline at end of file diff --git a/move/sources/host.move b/move/sources/host.move index c14593d..00c9c88 100644 --- a/move/sources/host.move +++ b/move/sources/host.move @@ -145,7 +145,7 @@ module dtp::host { } } - #[allow(lint(share_owned))] + //#[allow(lint(share_owned))] public(friend) fun new_transfered( ctx: &mut TxContext ): WeakRef { let new_obj = dtp::host::new(ctx); @@ -154,7 +154,7 @@ module dtp::host { new_obj_ref } - public(friend) fun upsert_service(self: &mut Host, service_id: u8, service_type: u8, _args: &KValues, ctx: &mut TxContext ) + public(friend) fun upsert_service(self: &mut Host, service_id: u8, service_type: u8, _args: &KValues, _ctx: &mut TxContext ) { /*if (!table::contains(&self.services, service_idx )) { //assert!(table::contains(&self.services, service_idx) == false, 1); @@ -185,14 +185,19 @@ module dtp::host { uid_to_address(&self.id) } - public(friend) fun authority(host: &Host): address { - host.authority + public(friend) fun authority(self: &Host): address { + self.authority } - public(friend) fun is_caller_authority(host: &Host, ctx: &TxContext): bool { - tx_context::sender(ctx) == host.authority + public(friend) fun is_caller_authority(self: &Host, ctx: &TxContext): bool { + tx_context::sender(ctx) == self.authority } + public(friend) fun add_connection(_self: &mut Host, _tc_ref: WeakRef ) { + // TODO Keep track of connections for slow discovery. + } + + // === Private Functions === // === Test Functions === diff --git a/move/sources/inner_pipe.move b/move/sources/inner_pipe.move index a859bb8..698fec7 100644 --- a/move/sources/inner_pipe.move +++ b/move/sources/inner_pipe.move @@ -8,7 +8,7 @@ module dtp::inner_pipe { // === Imports === - use sui::object::{Self, UID, uid_to_address}; + use sui::object::{Self, UID, ID, uid_to_address}; use sui::transfer::{Self}; use sui::tx_context::{TxContext}; use dtp::weak_ref::{Self,WeakRef}; @@ -32,20 +32,17 @@ module dtp::inner_pipe { struct InnerPipe has key, store { id: UID, flgs: u8, // DTP version+esc flags always after UID. - - pipe_id: WeakRef, - + service_idx: u8, + tc_ref: WeakRef, + pipe_ref: WeakRef, sync_data: PipeSyncData, + seq_num: u64, + // Stats to help debugging. + emit_cnt: u64, + sync_cnt: u64, } // === Public-Mutative Functions === - public entry fun send( - _self: &mut InnerPipe, - _data: vector, - _ctx: &mut TxContext ) - { - // TODO Emit the event. Add sequential number logic. - } // === Public-View Functions === @@ -53,29 +50,67 @@ module dtp::inner_pipe { // === Public-Friend Functions === - public(friend) fun new( pipe_address: &address, ctx: &mut TxContext ): InnerPipe { + public(friend) fun new( service_idx: u8, tc_id: &ID, pipe_addr: address, ctx: &mut TxContext ): InnerPipe { let new_obj = InnerPipe { id: object::new(ctx), flgs: 0u8, - pipe_id: weak_ref::new_from_address(*pipe_address), - sync_data: pipe_sync_data::new(), + service_idx, + tc_ref: weak_ref::new(tc_id), + pipe_ref: weak_ref::new_from_address(pipe_addr), + sync_data: pipe_sync_data::new(), + seq_num: 1, + emit_cnt:0, + sync_cnt: 0, }; new_obj } - public(friend) fun new_transfered( pipe_address: &address, recipient: address, ctx: &mut TxContext ): WeakRef + public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, pipe_addr: address, recipient: address, ctx: &mut TxContext ): address { - let new_obj = new(pipe_address,ctx); - let new_obj_ref = weak_ref::new_from_address(uid_to_address(&new_obj.id)); + let new_obj = new(service_idx, tc_id, pipe_addr, ctx); + let new_obj_addr = uid_to_address(&new_obj.id); transfer::transfer(new_obj, recipient); - new_obj_ref + new_obj_addr } public(friend) fun delete( self: InnerPipe ) { - let InnerPipe { id, flgs: _, pipe_id: _, sync_data: _ } = self; + let InnerPipe { id, flgs: _, service_idx: _, + tc_ref: _, pipe_ref: _, + sync_data: _, + seq_num: _ , + emit_cnt: _, sync_cnt: _ + } = self; + object::delete(id); } + public(friend) fun inc_seq_num( self: &mut InnerPipe ): u64 { + self.seq_num = self.seq_num + 1; + self.seq_num + } + + public(friend) fun inc_emit_cnt( self: &mut InnerPipe ): u64 { + self.emit_cnt = self.emit_cnt + 1; + self.emit_cnt + } + + public(friend) fun inc_sync_cnt( self: &mut InnerPipe ): u64 { + self.sync_cnt = self.sync_cnt + 1; + self.sync_cnt + } + + public(friend) fun get_address( self: &InnerPipe ): address { + weak_ref::get_address(&self.tc_ref ) + } + + public (friend) fun get_tc_ref( self: &InnerPipe ): WeakRef { + self.tc_ref + } + + public (friend) fun get_service_idx( self: &InnerPipe ): u8 { + self.service_idx + } + // === Private Functions === // === Test Functions === diff --git a/move/sources/pipe.move b/move/sources/pipe.move index 2da3f31..365e1a5 100644 --- a/move/sources/pipe.move +++ b/move/sources/pipe.move @@ -35,11 +35,10 @@ module dtp::pipe { struct Pipe has key { id: UID, flgs: u8, // DTP version+esc flags always after UID. - + service_idx: u8, + tc_ref: WeakRef, // TransportControl + ipipe_refs: vector, // InnerPipe(s) sync_data: PipeSyncData, // Merged of all InnerPipe sync_data. - - tc: WeakRef, // TransportControl - ipipes: vector, // InnerPipe(s) } // === Public-Mutative Functions === @@ -50,15 +49,16 @@ module dtp::pipe { // === Public-Friend Functions === - public(friend) fun new_transfered( tc: &ID, ipipe_count: u8, recipient: address, is_cli_tx_pipe: bool, conn: &mut ConnObjects, ctx: &mut TxContext ): WeakRef { + public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, ipipe_count: u8, recipient: address, is_cli_tx_pipe: bool, conn: &mut ConnObjects, ctx: &mut TxContext ): address { assert!(ipipe_count > 0, errors::EInvalidPipeCount()); let new_pipe = Pipe { id: object::new(ctx), - flgs: 0, + flgs: 0, + service_idx, + tc_ref: weak_ref::new(tc_id), + ipipe_refs: vector::empty(), sync_data: pipe_sync_data::new(), - tc: weak_ref::new(tc), - ipipes: vector::empty(), }; let pipe_addr = uid_to_address(&new_pipe.id); @@ -71,23 +71,23 @@ module dtp::pipe { // Create InnerPipes. let i: u8 = 0; while (i < ipipe_count) { - let ipipe_ref = inner_pipe::new_transfered(&pipe_addr, recipient, ctx); - let ipipe_addr = weak_ref::get_address(&ipipe_ref); + let ipipe_addr = inner_pipe::new_transfered(service_idx, tc_id, pipe_addr, recipient, ctx); // Save WeakRef in the Pipe object (for slow discovery), and the addresses in // the ConnObjects (to be return/emitted to the end-points). - vector::push_back(&mut new_pipe.ipipes, ipipe_ref); if (is_cli_tx_pipe) { conn_objects::add_cli_tx_ipipe(conn, ipipe_addr); } else { conn_objects::add_srv_tx_ipipe(conn, ipipe_addr); }; + let ipipe_ref = weak_ref::new_from_address(ipipe_addr); + vector::push_back(&mut new_pipe.ipipe_refs, ipipe_ref); i = i + 1; }; transfer::transfer(new_pipe, recipient); - weak_ref::new_from_address(pipe_addr) + pipe_addr } /* TODO diff --git a/move/sources/transport_control.move b/move/sources/transport_control.move index ca02fc8..f3effb6 100644 --- a/move/sources/transport_control.move +++ b/move/sources/transport_control.move @@ -122,14 +122,22 @@ module dtp::transport_control { }; // Initialize the Weak references (for slow discovery). - tc.cli_tx_pipe = dtp::pipe::new_transfered(object::borrow_id(&tc), + let cli_tx_pipe_addr = dtp::pipe::new_transfered(service_idx, + object::borrow_id(&tc), 2, tc.cli_addr, true, conn, ctx); - tc.srv_tx_pipe = dtp::pipe::new_transfered(object::borrow_id(&tc), - 2, tc.srv_addr, false, conn, ctx); - // Update the ConnObjects (returned to the end-points when a connection is completed). - conn_objects::set_cli_tx_pipe(conn, weak_ref::get_address(&tc.cli_tx_pipe)); - conn_objects::set_srv_tx_pipe(conn, weak_ref::get_address(&tc.srv_tx_pipe)); + let srv_tx_pipe_addr = dtp::pipe::new_transfered(service_idx, + object::borrow_id(&tc), + 2, tc.srv_addr, false, conn, ctx ); + + // Update the ConnObjects (observed by the end-points when a connection is initiated). + conn_objects::set_cli_tx_pipe(conn, cli_tx_pipe_addr); + conn_objects::set_srv_tx_pipe(conn, srv_tx_pipe_addr); + conn_objects::set_cli_auth(conn, host::authority(cli_host)); + conn_objects::set_srv_auth(conn, host::authority(srv_host)); + + tc.cli_tx_pipe = weak_ref::new_from_address(cli_tx_pipe_addr); + tc.srv_tx_pipe = weak_ref::new_from_address(srv_tx_pipe_addr); tc } @@ -199,7 +207,7 @@ module dtp::transport_control { // - cli_tx_ipipe: First InnerPipe used by client to TX to server. // - server_tx_ipipe: First InnerPipe used by server to TX to client. - #[allow(lint(share_owned))] + //#[allow(lint(share_owned))] public fun create_best_effort( service_idx: u8, cli_host: &mut Host, srv_host: &Host, @@ -221,8 +229,9 @@ module dtp::transport_control { conn_objects::set_tc(conn, object::id_to_address( object::borrow_id(&tc) )); // Emit the "Connection Request" Move event. - // The server will see the sender address therefore will know the TC and plenty of info! - dtp::events::emit_conn_req( service_idx, *conn ); + // "src_addr" helps events consumer to filter for this srv_host. + let src_addr = host::get_address(srv_host); + dtp::events::emit_conn_req( src_addr, service_idx, *conn ); transfer::share_object(tc); // TODO Add the TC address to the Client Host object registry (for slow discovery). diff --git a/move/sources/weak_ref.move b/move/sources/weak_ref.move index fd4219d..23835e6 100644 --- a/move/sources/weak_ref.move +++ b/move/sources/weak_ref.move @@ -32,6 +32,7 @@ module dtp::weak_ref { friend dtp::inner_pipe; friend dtp::host; friend dtp::api_impl; + friend dtp::events; // === Errors ===