diff --git a/cmd/daserver/daserver.go b/cmd/daserver/daserver.go index 2887ad4a5b..3b01a29da5 100644 --- a/cmd/daserver/daserver.go +++ b/cmd/daserver/daserver.go @@ -128,7 +128,6 @@ func startup() error { return err } <-sigint - server.Stop() - return nil + return server.Shutdown(ctx) } diff --git a/das/dasrpc/dasRpcClient.go b/das/dasrpc/dasRpcClient.go index ab787947a2..075d4a1cc8 100644 --- a/das/dasrpc/dasRpcClient.go +++ b/das/dasrpc/dasRpcClient.go @@ -6,75 +6,75 @@ package dasrpc import ( "context" "fmt" + + "github.com/ethereum/go-ethereum/common/hexutil" + + "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/das" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/blsSignatures" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type DASRPCClient struct { // implements DataAvailabilityService - clnt DASServiceImplClient + clnt *rpc.Client } func NewDASRPCClient(target string) (*DASRPCClient, error) { - // TODO revisit insecure setting - conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + clnt, err := rpc.Dial(target) if err != nil { return nil, err } - clnt := NewDASServiceImplClient(conn) return &DASRPCClient{clnt: clnt}, nil } -func (clnt *DASRPCClient) Retrieve(ctx context.Context, cert *arbstate.DataAvailabilityCertificate) ([]byte, error) { +func (c *DASRPCClient) Retrieve(ctx context.Context, cert *arbstate.DataAvailabilityCertificate) ([]byte, error) { certBytes := das.Serialize(cert) - response, err := clnt.clnt.Retrieve(ctx, &RetrieveRequest{CertBytes: certBytes}) - if err != nil { + var ret hexutil.Bytes + if err := c.clnt.CallContext(ctx, &ret, "das_retrieve", hexutil.Bytes(certBytes)); err != nil { return nil, err } - return response.Result, nil + return ret, nil } -func (clnt *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*arbstate.DataAvailabilityCertificate, error) { - response, err := clnt.clnt.Store(ctx, &StoreRequest{Message: message, Timeout: timeout, Sig: reqSig}) - if err != nil { +func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*arbstate.DataAvailabilityCertificate, error) { + var ret StoreResult + if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil { return nil, err } var keysetHash [32]byte - copy(keysetHash[:], response.KeysetHash) + copy(keysetHash[:], ret.KeysetHash) var dataHash [32]byte - copy(dataHash[:], response.DataHash) - respSig, err := blsSignatures.SignatureFromBytes(response.Sig) + copy(dataHash[:], ret.DataHash) + respSig, err := blsSignatures.SignatureFromBytes(ret.Sig) if err != nil { return nil, err } return &arbstate.DataAvailabilityCertificate{ DataHash: dataHash, - Timeout: response.Timeout, - SignersMask: response.SignersMask, + Timeout: uint64(ret.Timeout), + SignersMask: uint64(ret.SignersMask), Sig: respSig, KeysetHash: keysetHash, }, nil } -func (clnt *DASRPCClient) KeysetFromHash(ctx context.Context, ksHash []byte) ([]byte, error) { - response, err := clnt.clnt.KeysetFromHash(ctx, &KeysetFromHashRequest{KsHash: ksHash}) - if err != nil { +func (c *DASRPCClient) KeysetFromHash(ctx context.Context, ksHash []byte) ([]byte, error) { + var ret hexutil.Bytes + if err := c.clnt.CallContext(ctx, &ret, "das_keysetFromHash", hexutil.Bytes(ksHash)); err != nil { return nil, err } - return response.Result, nil + return ret, nil } -func (clnt *DASRPCClient) CurrentKeysetBytes(ctx context.Context) ([]byte, error) { - response, err := clnt.clnt.CurrentKeysetBytes(ctx, &CurrentKeysetBytesRequest{}) - if err != nil { +func (c *DASRPCClient) CurrentKeysetBytes(ctx context.Context) ([]byte, error) { + var ret hexutil.Bytes + if err := c.clnt.CallContext(ctx, &ret, "das_currentKeysetBytes"); err != nil { return nil, err } - return response.Result, nil + return ret, nil } -func (clnt *DASRPCClient) String() string { - return fmt.Sprintf("DASRPCClient{clnt:%v}", clnt.clnt) +func (c *DASRPCClient) String() string { + return fmt.Sprintf("DASRPCClient{c:%v}", c.clnt) } diff --git a/das/dasrpc/dasRpcServer.go b/das/dasrpc/dasRpcServer.go index 070bad1fa0..33e6ae8c71 100644 --- a/das/dasrpc/dasRpcServer.go +++ b/das/dasrpc/dasRpcServer.go @@ -8,20 +8,22 @@ import ( "context" "fmt" "net" + "net/http" + + "github.com/ethereum/go-ethereum/common/hexutil" + + "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/blsSignatures" "github.com/offchainlabs/nitro/das" - "google.golang.org/grpc" ) type DASRPCServer struct { - UnimplementedDASServiceImplServer // this allows grpc to verify its version invariant - grpcServer *grpc.Server - localDAS das.DataAvailabilityService + localDAS das.DataAvailabilityService } -func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDAS das.DataAvailabilityService) (*DASRPCServer, error) { +func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDAS das.DataAvailabilityService) (*http.Server, error) { listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr, portNum)) if err != nil { return nil, err @@ -29,43 +31,54 @@ func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDA return StartDASRPCServerOnListener(ctx, listener, localDAS) } -func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, localDAS das.DataAvailabilityService) (*DASRPCServer, error) { - grpcServer := grpc.NewServer() - dasServer := &DASRPCServer{grpcServer: grpcServer, localDAS: localDAS} - RegisterDASServiceImplServer(grpcServer, dasServer) +func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, localDAS das.DataAvailabilityService) (*http.Server, error) { + rpcServer := rpc.NewServer() + err := rpcServer.RegisterName("das", &DASRPCServer{localDAS: localDAS}) + if err != nil { + return nil, err + } + + srv := &http.Server{ + Handler: rpcServer, + } + go func() { - err := grpcServer.Serve(listener) + err := srv.Serve(listener) if err != nil { return } }() go func() { <-ctx.Done() - grpcServer.GracefulStop() + _ = srv.Shutdown(context.Background()) }() - return dasServer, nil + return srv, nil } -func (serv *DASRPCServer) Stop() { - serv.grpcServer.GracefulStop() +type StoreResult struct { + DataHash hexutil.Bytes `json:"dataHash,omitempty"` + Timeout hexutil.Uint64 `json:"timeout,omitempty"` + SignersMask hexutil.Uint64 `json:"signersMask,omitempty"` + KeysetHash hexutil.Bytes `json:"keysetHash,omitempty"` + Sig hexutil.Bytes `json:"sig,omitempty"` } -func (serv *DASRPCServer) Store(ctx context.Context, req *StoreRequest) (*StoreResponse, error) { - cert, err := serv.localDAS.Store(ctx, req.Message, req.Timeout, req.Sig) +func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) { + cert, err := serv.localDAS.Store(ctx, message, uint64(timeout), sig) if err != nil { return nil, err } - return &StoreResponse{ + return &StoreResult{ KeysetHash: cert.KeysetHash[:], DataHash: cert.DataHash[:], - Timeout: cert.Timeout, - SignersMask: cert.SignersMask, + Timeout: hexutil.Uint64(cert.Timeout), + SignersMask: hexutil.Uint64(cert.SignersMask), Sig: blsSignatures.SignatureToBytes(cert.Sig), }, nil } -func (serv *DASRPCServer) Retrieve(ctx context.Context, req *RetrieveRequest) (*RetrieveResponse, error) { - cert, err := arbstate.DeserializeDASCertFrom(bytes.NewReader(req.CertBytes)) +func (serv *DASRPCServer) Retrieve(ctx context.Context, certBytes hexutil.Bytes) (hexutil.Bytes, error) { + cert, err := arbstate.DeserializeDASCertFrom(bytes.NewReader(certBytes)) if err != nil { return nil, err } @@ -73,21 +86,21 @@ func (serv *DASRPCServer) Retrieve(ctx context.Context, req *RetrieveRequest) (* if err != nil { return nil, err } - return &RetrieveResponse{Result: result}, nil + return result, nil } -func (serv *DASRPCServer) KeysetFromHash(ctx context.Context, req *KeysetFromHashRequest) (*KeysetFromHashResponse, error) { - resp, err := serv.localDAS.KeysetFromHash(ctx, req.KsHash) +func (serv *DASRPCServer) KeysetFromHash(ctx context.Context, ksHash hexutil.Bytes) (hexutil.Bytes, error) { + resp, err := serv.localDAS.KeysetFromHash(ctx, ksHash) if err != nil { return nil, err } - return &KeysetFromHashResponse{Result: resp}, nil + return resp, nil } -func (serv *DASRPCServer) CurrentKeysetBytes(ctx context.Context, req *CurrentKeysetBytesRequest) (*CurrentKeysetBytesResponse, error) { +func (serv *DASRPCServer) CurrentKeysetBytes(ctx context.Context) (hexutil.Bytes, error) { resp, err := serv.localDAS.CurrentKeysetBytes(ctx) if err != nil { return nil, err } - return &CurrentKeysetBytesResponse{Result: resp}, nil + return resp, nil } diff --git a/das/dasrpc/rpc_aggregator.go b/das/dasrpc/rpc_aggregator.go index 3e6d22a9eb..7800a00652 100644 --- a/das/dasrpc/rpc_aggregator.go +++ b/das/dasrpc/rpc_aggregator.go @@ -5,6 +5,9 @@ package dasrpc import ( "encoding/json" + + "github.com/offchainlabs/nitro/solgen/go/bridgegen" + "github.com/ethereum/go-ethereum/common" "github.com/offchainlabs/nitro/arbutil" @@ -33,6 +36,14 @@ func NewRPCAggregatorWithL1Info(config das.AggregatorConfig, l1client arbutil.L1 return das.NewAggregatorWithL1Info(config, services, l1client, seqInboxAddress) } +func NewRPCAggregatorWithSeqInboxCaller(config das.AggregatorConfig, seqInboxCaller *bridgegen.SequencerInboxCaller) (*das.Aggregator, error) { + services, err := setUpServices(config) + if err != nil { + return nil, err + } + return das.NewAggregatorWithSeqInboxCaller(config, services, seqInboxCaller) +} + func setUpServices(config das.AggregatorConfig) ([]das.ServiceDetails, error) { var cs []BackendConfig err := json.Unmarshal([]byte(config.Backends), &cs) diff --git a/das/dasrpc/rpc_test.go b/das/dasrpc/rpc_test.go new file mode 100644 index 0000000000..e7fe3a83fc --- /dev/null +++ b/das/dasrpc/rpc_test.go @@ -0,0 +1,69 @@ +package dasrpc + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "net" + "testing" + + "github.com/offchainlabs/nitro/blsSignatures" + "github.com/offchainlabs/nitro/das" + "github.com/offchainlabs/nitro/util/testhelpers" +) + +func blsPubToBase64(pubkey *blsSignatures.PublicKey) string { + pubkeyBytes := blsSignatures.PublicKeyToBytes(*pubkey) + encodedPubkey := make([]byte, base64.StdEncoding.EncodedLen(len(pubkeyBytes))) + base64.StdEncoding.Encode(encodedPubkey, pubkeyBytes) + return string(encodedPubkey) +} + +func TestRPC(t *testing.T) { + ctx := context.Background() + lis, err := net.Listen("tcp", "localhost:0") + testhelpers.RequireImpl(t, err) + keyDir := t.TempDir() + dataDir := t.TempDir() + pubkey, _, err := das.GenerateAndStoreKeys(keyDir) + testhelpers.RequireImpl(t, err) + dasConfig := das.LocalDiskDASConfig{ + KeyDir: keyDir, + DataDir: dataDir, + } + localDas, err := das.NewLocalDiskDASWithSeqInboxCaller(dasConfig, nil) + testhelpers.RequireImpl(t, err) + dasServer, err := StartDASRPCServerOnListener(ctx, lis, localDas) + defer func() { + if err := dasServer.Shutdown(ctx); err != nil { + panic(err) + } + }() + testhelpers.RequireImpl(t, err) + config := BackendConfig{ + URL: "http://" + lis.Addr().String(), + PubKeyBase64Encoded: blsPubToBase64(pubkey), + SignerMask: 1, + } + + backendsJsonByte, err := json.Marshal([]BackendConfig{config}) + testhelpers.RequireImpl(t, err) + aggConf := das.AggregatorConfig{ + AssumedHonest: 1, + Backends: string(backendsJsonByte), + } + rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil) + testhelpers.RequireImpl(t, err) + + msg := testhelpers.RandomizeSlice(make([]byte, 100)) + cert, err := rpcAgg.Store(ctx, msg, 0, nil) + testhelpers.RequireImpl(t, err) + + retrievedMessage, err := rpcAgg.Retrieve(ctx, cert) + testhelpers.RequireImpl(t, err) + + if !bytes.Equal(msg, retrievedMessage) { + testhelpers.FailImpl(t, "failed to retrieve correct message") + } +} diff --git a/system_tests/das_test.go b/system_tests/das_test.go index 24904f2136..d6bdc6533a 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -4,13 +4,15 @@ import ( "context" "encoding/base64" "encoding/json" - "github.com/ethereum/go-ethereum/common" - "github.com/offchainlabs/nitro/arbutil" "math/big" "net" + "net/http" "testing" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/offchainlabs/nitro/arbutil" + "github.com/ethereum/go-ethereum/ethclient" "github.com/offchainlabs/nitro/blsSignatures" @@ -30,7 +32,7 @@ func startLocalDASServer( dataDir string, l1client arbutil.L1Interface, seqInboxAddress common.Address, -) (*dasrpc.DASRPCServer, *blsSignatures.PublicKey, dasrpc.BackendConfig) { +) (*http.Server, *blsSignatures.PublicKey, dasrpc.BackendConfig) { lis, err := net.Listen("tcp", "localhost:0") Require(t, err) keyDir := t.TempDir() @@ -45,7 +47,7 @@ func startLocalDASServer( dasServer, err := dasrpc.StartDASRPCServerOnListener(ctx, lis, localDas) Require(t, err) config := dasrpc.BackendConfig{ - URL: lis.Addr().String(), + URL: "http://" + lis.Addr().String(), PubKeyBase64Encoded: blsPubToBase64(pubkey), SignerMask: 1, } @@ -109,9 +111,13 @@ func TestDASRekey(t *testing.T) { nodeA.StopAndWait() nodeB.StopAndWait() - dasServerA.Stop() + err = dasServerA.Shutdown(ctx) + Require(t, err) dasServerB, pubkeyB, backendConfigB := startLocalDASServer(t, ctx, dasDataDir, l1client, addresses.SequencerInbox) - defer dasServerB.Stop() + defer func() { + err = dasServerB.Shutdown(ctx) + Require(t, err) + }() authorizeDASKeyset(t, ctx, pubkeyB, l1info, l1client) // Restart the node on the new keyset against the new DAS server running on the same disk as the first with new keys