Skip to content

Commit

Permalink
Merge pull request #610 from OffchainLabs/das-rpc
Browse files Browse the repository at this point in the history
Switch DAS server over to rpc
  • Loading branch information
Tristan-Wilson authored May 13, 2022
2 parents f88b6fa + 1b826b7 commit 7e3a9b4
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 63 deletions.
3 changes: 1 addition & 2 deletions cmd/daserver/daserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func startup() error {
return err
}
<-sigint
server.Stop()

return nil
return server.Shutdown(ctx)
}
56 changes: 28 additions & 28 deletions das/dasrpc/dasRpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,75 +6,75 @@ package dasrpc
import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/das"

"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/blsSignatures"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type DASRPCClient struct { // implements DataAvailabilityService
clnt DASServiceImplClient
clnt *rpc.Client
}

func NewDASRPCClient(target string) (*DASRPCClient, error) {
// TODO revisit insecure setting
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
clnt, err := rpc.Dial(target)
if err != nil {
return nil, err
}
clnt := NewDASServiceImplClient(conn)
return &DASRPCClient{clnt: clnt}, nil
}

func (clnt *DASRPCClient) Retrieve(ctx context.Context, cert *arbstate.DataAvailabilityCertificate) ([]byte, error) {
func (c *DASRPCClient) Retrieve(ctx context.Context, cert *arbstate.DataAvailabilityCertificate) ([]byte, error) {
certBytes := das.Serialize(cert)
response, err := clnt.clnt.Retrieve(ctx, &RetrieveRequest{CertBytes: certBytes})
if err != nil {
var ret hexutil.Bytes
if err := c.clnt.CallContext(ctx, &ret, "das_retrieve", hexutil.Bytes(certBytes)); err != nil {
return nil, err
}
return response.Result, nil
return ret, nil
}

func (clnt *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*arbstate.DataAvailabilityCertificate, error) {
response, err := clnt.clnt.Store(ctx, &StoreRequest{Message: message, Timeout: timeout, Sig: reqSig})
if err != nil {
func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*arbstate.DataAvailabilityCertificate, error) {
var ret StoreResult
if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil {
return nil, err
}
var keysetHash [32]byte
copy(keysetHash[:], response.KeysetHash)
copy(keysetHash[:], ret.KeysetHash)
var dataHash [32]byte
copy(dataHash[:], response.DataHash)
respSig, err := blsSignatures.SignatureFromBytes(response.Sig)
copy(dataHash[:], ret.DataHash)
respSig, err := blsSignatures.SignatureFromBytes(ret.Sig)
if err != nil {
return nil, err
}
return &arbstate.DataAvailabilityCertificate{
DataHash: dataHash,
Timeout: response.Timeout,
SignersMask: response.SignersMask,
Timeout: uint64(ret.Timeout),
SignersMask: uint64(ret.SignersMask),
Sig: respSig,
KeysetHash: keysetHash,
}, nil
}

func (clnt *DASRPCClient) KeysetFromHash(ctx context.Context, ksHash []byte) ([]byte, error) {
response, err := clnt.clnt.KeysetFromHash(ctx, &KeysetFromHashRequest{KsHash: ksHash})
if err != nil {
func (c *DASRPCClient) KeysetFromHash(ctx context.Context, ksHash []byte) ([]byte, error) {
var ret hexutil.Bytes
if err := c.clnt.CallContext(ctx, &ret, "das_keysetFromHash", hexutil.Bytes(ksHash)); err != nil {
return nil, err
}
return response.Result, nil
return ret, nil
}

func (clnt *DASRPCClient) CurrentKeysetBytes(ctx context.Context) ([]byte, error) {
response, err := clnt.clnt.CurrentKeysetBytes(ctx, &CurrentKeysetBytesRequest{})
if err != nil {
func (c *DASRPCClient) CurrentKeysetBytes(ctx context.Context) ([]byte, error) {
var ret hexutil.Bytes
if err := c.clnt.CallContext(ctx, &ret, "das_currentKeysetBytes"); err != nil {
return nil, err
}
return response.Result, nil
return ret, nil
}

func (clnt *DASRPCClient) String() string {
return fmt.Sprintf("DASRPCClient{clnt:%v}", clnt.clnt)
func (c *DASRPCClient) String() string {
return fmt.Sprintf("DASRPCClient{c:%v}", c.clnt)
}
67 changes: 40 additions & 27 deletions das/dasrpc/dasRpcServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,86 +8,99 @@ import (
"context"
"fmt"
"net"
"net/http"

"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/das"
"google.golang.org/grpc"
)

type DASRPCServer struct {
UnimplementedDASServiceImplServer // this allows grpc to verify its version invariant
grpcServer *grpc.Server
localDAS das.DataAvailabilityService
localDAS das.DataAvailabilityService
}

func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDAS das.DataAvailabilityService) (*DASRPCServer, error) {
func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDAS das.DataAvailabilityService) (*http.Server, error) {
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr, portNum))
if err != nil {
return nil, err
}
return StartDASRPCServerOnListener(ctx, listener, localDAS)
}

func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, localDAS das.DataAvailabilityService) (*DASRPCServer, error) {
grpcServer := grpc.NewServer()
dasServer := &DASRPCServer{grpcServer: grpcServer, localDAS: localDAS}
RegisterDASServiceImplServer(grpcServer, dasServer)
func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, localDAS das.DataAvailabilityService) (*http.Server, error) {
rpcServer := rpc.NewServer()
err := rpcServer.RegisterName("das", &DASRPCServer{localDAS: localDAS})
if err != nil {
return nil, err
}

srv := &http.Server{
Handler: rpcServer,
}

go func() {
err := grpcServer.Serve(listener)
err := srv.Serve(listener)
if err != nil {
return
}
}()
go func() {
<-ctx.Done()
grpcServer.GracefulStop()
_ = srv.Shutdown(context.Background())
}()
return dasServer, nil
return srv, nil
}

func (serv *DASRPCServer) Stop() {
serv.grpcServer.GracefulStop()
type StoreResult struct {
DataHash hexutil.Bytes `json:"dataHash,omitempty"`
Timeout hexutil.Uint64 `json:"timeout,omitempty"`
SignersMask hexutil.Uint64 `json:"signersMask,omitempty"`
KeysetHash hexutil.Bytes `json:"keysetHash,omitempty"`
Sig hexutil.Bytes `json:"sig,omitempty"`
}

func (serv *DASRPCServer) Store(ctx context.Context, req *StoreRequest) (*StoreResponse, error) {
cert, err := serv.localDAS.Store(ctx, req.Message, req.Timeout, req.Sig)
func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) {
cert, err := serv.localDAS.Store(ctx, message, uint64(timeout), sig)
if err != nil {
return nil, err
}
return &StoreResponse{
return &StoreResult{
KeysetHash: cert.KeysetHash[:],
DataHash: cert.DataHash[:],
Timeout: cert.Timeout,
SignersMask: cert.SignersMask,
Timeout: hexutil.Uint64(cert.Timeout),
SignersMask: hexutil.Uint64(cert.SignersMask),
Sig: blsSignatures.SignatureToBytes(cert.Sig),
}, nil
}

func (serv *DASRPCServer) Retrieve(ctx context.Context, req *RetrieveRequest) (*RetrieveResponse, error) {
cert, err := arbstate.DeserializeDASCertFrom(bytes.NewReader(req.CertBytes))
func (serv *DASRPCServer) Retrieve(ctx context.Context, certBytes hexutil.Bytes) (hexutil.Bytes, error) {
cert, err := arbstate.DeserializeDASCertFrom(bytes.NewReader(certBytes))
if err != nil {
return nil, err
}
result, err := serv.localDAS.Retrieve(ctx, cert)
if err != nil {
return nil, err
}
return &RetrieveResponse{Result: result}, nil
return result, nil
}

func (serv *DASRPCServer) KeysetFromHash(ctx context.Context, req *KeysetFromHashRequest) (*KeysetFromHashResponse, error) {
resp, err := serv.localDAS.KeysetFromHash(ctx, req.KsHash)
func (serv *DASRPCServer) KeysetFromHash(ctx context.Context, ksHash hexutil.Bytes) (hexutil.Bytes, error) {
resp, err := serv.localDAS.KeysetFromHash(ctx, ksHash)
if err != nil {
return nil, err
}
return &KeysetFromHashResponse{Result: resp}, nil
return resp, nil
}

func (serv *DASRPCServer) CurrentKeysetBytes(ctx context.Context, req *CurrentKeysetBytesRequest) (*CurrentKeysetBytesResponse, error) {
func (serv *DASRPCServer) CurrentKeysetBytes(ctx context.Context) (hexutil.Bytes, error) {
resp, err := serv.localDAS.CurrentKeysetBytes(ctx)
if err != nil {
return nil, err
}
return &CurrentKeysetBytesResponse{Result: resp}, nil
return resp, nil
}
11 changes: 11 additions & 0 deletions das/dasrpc/rpc_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package dasrpc

import (
"encoding/json"

"github.com/offchainlabs/nitro/solgen/go/bridgegen"

"github.com/ethereum/go-ethereum/common"
"github.com/offchainlabs/nitro/arbutil"

Expand Down Expand Up @@ -33,6 +36,14 @@ func NewRPCAggregatorWithL1Info(config das.AggregatorConfig, l1client arbutil.L1
return das.NewAggregatorWithL1Info(config, services, l1client, seqInboxAddress)
}

func NewRPCAggregatorWithSeqInboxCaller(config das.AggregatorConfig, seqInboxCaller *bridgegen.SequencerInboxCaller) (*das.Aggregator, error) {
services, err := setUpServices(config)
if err != nil {
return nil, err
}
return das.NewAggregatorWithSeqInboxCaller(config, services, seqInboxCaller)
}

func setUpServices(config das.AggregatorConfig) ([]das.ServiceDetails, error) {
var cs []BackendConfig
err := json.Unmarshal([]byte(config.Backends), &cs)
Expand Down
69 changes: 69 additions & 0 deletions das/dasrpc/rpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dasrpc

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"net"
"testing"

"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/util/testhelpers"
)

func blsPubToBase64(pubkey *blsSignatures.PublicKey) string {
pubkeyBytes := blsSignatures.PublicKeyToBytes(*pubkey)
encodedPubkey := make([]byte, base64.StdEncoding.EncodedLen(len(pubkeyBytes)))
base64.StdEncoding.Encode(encodedPubkey, pubkeyBytes)
return string(encodedPubkey)
}

func TestRPC(t *testing.T) {
ctx := context.Background()
lis, err := net.Listen("tcp", "localhost:0")
testhelpers.RequireImpl(t, err)
keyDir := t.TempDir()
dataDir := t.TempDir()
pubkey, _, err := das.GenerateAndStoreKeys(keyDir)
testhelpers.RequireImpl(t, err)
dasConfig := das.LocalDiskDASConfig{
KeyDir: keyDir,
DataDir: dataDir,
}
localDas, err := das.NewLocalDiskDASWithSeqInboxCaller(dasConfig, nil)
testhelpers.RequireImpl(t, err)
dasServer, err := StartDASRPCServerOnListener(ctx, lis, localDas)
defer func() {
if err := dasServer.Shutdown(ctx); err != nil {
panic(err)
}
}()
testhelpers.RequireImpl(t, err)
config := BackendConfig{
URL: "http://" + lis.Addr().String(),
PubKeyBase64Encoded: blsPubToBase64(pubkey),
SignerMask: 1,
}

backendsJsonByte, err := json.Marshal([]BackendConfig{config})
testhelpers.RequireImpl(t, err)
aggConf := das.AggregatorConfig{
AssumedHonest: 1,
Backends: string(backendsJsonByte),
}
rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil)
testhelpers.RequireImpl(t, err)

msg := testhelpers.RandomizeSlice(make([]byte, 100))
cert, err := rpcAgg.Store(ctx, msg, 0, nil)
testhelpers.RequireImpl(t, err)

retrievedMessage, err := rpcAgg.Retrieve(ctx, cert)
testhelpers.RequireImpl(t, err)

if !bytes.Equal(msg, retrievedMessage) {
testhelpers.FailImpl(t, "failed to retrieve correct message")
}
}
18 changes: 12 additions & 6 deletions system_tests/das_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"encoding/base64"
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/offchainlabs/nitro/arbutil"
"math/big"
"net"
"net/http"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/offchainlabs/nitro/arbutil"

"github.com/ethereum/go-ethereum/ethclient"

"github.com/offchainlabs/nitro/blsSignatures"
Expand All @@ -30,7 +32,7 @@ func startLocalDASServer(
dataDir string,
l1client arbutil.L1Interface,
seqInboxAddress common.Address,
) (*dasrpc.DASRPCServer, *blsSignatures.PublicKey, dasrpc.BackendConfig) {
) (*http.Server, *blsSignatures.PublicKey, dasrpc.BackendConfig) {
lis, err := net.Listen("tcp", "localhost:0")
Require(t, err)
keyDir := t.TempDir()
Expand All @@ -45,7 +47,7 @@ func startLocalDASServer(
dasServer, err := dasrpc.StartDASRPCServerOnListener(ctx, lis, localDas)
Require(t, err)
config := dasrpc.BackendConfig{
URL: lis.Addr().String(),
URL: "http://" + lis.Addr().String(),
PubKeyBase64Encoded: blsPubToBase64(pubkey),
SignerMask: 1,
}
Expand Down Expand Up @@ -109,9 +111,13 @@ func TestDASRekey(t *testing.T) {
nodeA.StopAndWait()
nodeB.StopAndWait()

dasServerA.Stop()
err = dasServerA.Shutdown(ctx)
Require(t, err)
dasServerB, pubkeyB, backendConfigB := startLocalDASServer(t, ctx, dasDataDir, l1client, addresses.SequencerInbox)
defer dasServerB.Stop()
defer func() {
err = dasServerB.Shutdown(ctx)
Require(t, err)
}()
authorizeDASKeyset(t, ctx, pubkeyB, l1info, l1client)

// Restart the node on the new keyset against the new DAS server running on the same disk as the first with new keys
Expand Down

0 comments on commit 7e3a9b4

Please sign in to comment.