diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 015bc7c1eb..9669f1a826 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -3,12 +3,13 @@ package main import ( "bytes" "context" + "crypto/ecdsa" "errors" "fmt" "sync/atomic" + "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" - "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" @@ -22,7 +23,6 @@ import ( v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" @@ -49,8 +49,7 @@ type objectSvc struct { search *searchsvc.Service - get *getsvcV2.Service - get_ *getsvc.Service + get *getsvc.Service delete *deletesvc.Service } @@ -71,7 +70,7 @@ func (s *objectSvc) Put(ctx context.Context) (*putsvc.Streamer, error) { } func (s *objectSvc) Head(ctx context.Context, prm getsvc.HeadPrm) error { - return s.get_.Head(ctx, prm) + return s.get.Head(ctx, prm) } func (s *objectSvc) Search(ctx context.Context, prm searchsvc.Prm) error { @@ -79,7 +78,7 @@ func (s *objectSvc) Search(ctx context.Context, prm searchsvc.Prm) error { } func (s *objectSvc) Get(ctx context.Context, prm getsvc.Prm) error { - return s.get_.Get(ctx, prm) + return s.get.Get(ctx, prm) } func (s *objectSvc) Delete(ctx context.Context, prm deletesvc.Prm) error { @@ -87,11 +86,11 @@ func (s *objectSvc) Delete(ctx context.Context, prm deletesvc.Prm) error { } func (s *objectSvc) GetRange(ctx context.Context, prm getsvc.RangePrm) error { - return s.get_.GetRange(ctx, prm) + return s.get.GetRange(ctx, prm) } -func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - return s.get.GetRangeHash(ctx, req) +func (s *objectSvc) GetRangeHash(ctx context.Context, prm getsvc.RangeHashPrm) (*getsvc.RangeHashRes, error) { + return s.get.GetRangeHash(ctx, prm) } type delNetInfo struct { @@ -230,11 +229,6 @@ func initObjectService(c *cfg) { *c.cfgObject.getSvc = *sGet // need smth better - sGetV2 := getsvcV2.NewService( - getsvcV2.WithInternalService(sGet), - getsvcV2.WithKeyStorage(keyStorage), - ) - cnrNodes, err := newContainerNodes(c.cfgObject.cnrSource, c.netMapSource) fatalOnErr(err) c.cfgObject.containerNodes = cnrNodes @@ -279,8 +273,7 @@ func initObjectService(c *cfg) { objSvc := &objectSvc{ put: sPut, search: sSearch, - get: sGetV2, - get_: sGet, + get: sGet, delete: sDelete, } @@ -308,7 +301,11 @@ func initObjectService(c *cfg) { SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber, c.log)), ) - server := objectService.New(objSvc, mNumber, fsChain, (*putObjectServiceWrapper)(sPut), c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc) + storage := storageForObjectService{ + putSvc: sPut, + keys: keyStorage, + } + server := objectService.New(objSvc, mNumber, fsChain, storage, c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) @@ -617,10 +614,21 @@ func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool { // maintenance now. func (x *fsChainForObjects) LocalNodeUnderMaintenance() bool { return x.isMaintenance.Load() } -type putObjectServiceWrapper putsvc.Service +type storageForObjectService struct { + putSvc *putsvc.Service + keys *util.KeyStorage +} + +func (x storageForObjectService) VerifyAndStoreObjectLocally(obj objectSDK.Object) error { + return x.putSvc.ValidateAndStoreObjectLocally(obj) +} -func (x *putObjectServiceWrapper) VerifyAndStoreObject(obj objectSDK.Object) error { - return (*putsvc.Service)(x).ValidateAndStoreObjectLocally(obj) +func (x storageForObjectService) GetSessionPrivateKey(usr user.ID, uid uuid.UUID) (ecdsa.PrivateKey, error) { + k, err := x.keys.GetKey(&util.SessionInfo{ID: uid, Owner: usr}) + if err != nil { + return ecdsa.PrivateKey{}, err + } + return *k, nil } type objectSource struct { diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go deleted file mode 100644 index 23ed610a1b..0000000000 --- a/pkg/services/object/get/v2/service.go +++ /dev/null @@ -1,64 +0,0 @@ -package getsvc - -import ( - "context" - - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" -) - -// Service implements Get operation of Object service v2. -type Service struct { - *cfg -} - -// Option represents Service constructor option. -type Option func(*cfg) - -type cfg struct { - svc *getsvc.Service - - keyStorage *objutil.KeyStorage -} - -// NewService constructs Service instance from provided options. -func NewService(opts ...Option) *Service { - c := new(cfg) - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -// GetRangeHash calls internal service and returns v2 response. -func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { - p, err := s.toHashRangePrm(req) - if err != nil { - return nil, err - } - - res, err := s.svc.GetRangeHash(ctx, *p) - if err != nil { - return nil, err - } - - return toHashResponse(req.GetBody().GetType(), res), nil -} - -func WithInternalService(v *getsvc.Service) Option { - return func(c *cfg) { - c.svc = v - } -} - -// WithKeyStorage returns option to set local private key storage. -func WithKeyStorage(ks *objutil.KeyStorage) Option { - return func(c *cfg) { - c.keyStorage = ks - } -} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go deleted file mode 100644 index 8af1e9b056..0000000000 --- a/pkg/services/object/get/v2/util.go +++ /dev/null @@ -1,222 +0,0 @@ -package getsvc - -import ( - "context" - "crypto/ecdsa" - "crypto/sha256" - "errors" - "fmt" - "hash" - "sync" - - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" - "github.com/nspcc-dev/neofs-api-go/v2/refs" - "github.com/nspcc-dev/neofs-api-go/v2/session" - "github.com/nspcc-dev/neofs-api-go/v2/signature" - "github.com/nspcc-dev/neofs-api-go/v2/status" - protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" - "github.com/nspcc-dev/neofs-node/pkg/core/client" - "github.com/nspcc-dev/neofs-node/pkg/network" - getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" - "github.com/nspcc-dev/tzhash/tz" - "google.golang.org/grpc" -) - -func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.RangeHashPrm, error) { - body := req.GetBody() - - addrV2 := body.GetAddress() - if addrV2 == nil { - return nil, errors.New("missing object address") - } - - var addr oid.Address - - err := addr.ReadFromV2(*addrV2) - if err != nil { - return nil, fmt.Errorf("invalid object address: %w", err) - } - - commonPrm, err := util.CommonPrmFromV2(req) - if err != nil { - return nil, err - } - - p := new(getsvc.RangeHashPrm) - p.SetCommonParameters(commonPrm) - - p.WithAddress(addr) - - if tok := commonPrm.SessionToken(); tok != nil { - signerKey, err := s.keyStorage.GetKey(&util.SessionInfo{ - ID: tok.ID(), - Owner: tok.Issuer(), - }) - if err != nil && errors.As(err, new(apistatus.SessionTokenNotFound)) { - commonPrm.ForgetTokens() - signerKey, err = s.keyStorage.GetKey(nil) - } - - if err != nil { - return nil, fmt.Errorf("fetching session key: %w", err) - } - - p.WithCachedSignerKey(signerKey) - } - - rngsV2 := body.GetRanges() - rngs := make([]object.Range, len(rngsV2)) - - for i := range rngsV2 { - rngs[i] = *object.NewRangeFromV2(&rngsV2[i]) - } - - p.SetRangeList(rngs) - p.SetSalt(body.GetSalt()) - - switch t := body.GetType(); t { - default: - return nil, fmt.Errorf("unknown checksum type %v", t) - case refs.SHA256: - p.SetHashGenerator(func() hash.Hash { - return sha256.New() - }) - case refs.TillichZemor: - p.SetHashGenerator(func() hash.Hash { - return tz.New() - }) - } - - if !commonPrm.LocalOnly() { - var onceResign sync.Once - var key *ecdsa.PrivateKey - - key, err = s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - p.SetRangeHashRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) ([][]byte, error) { - meta := req.GetMetaHeader() - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - if err != nil { - return nil, err - } - - var resp *protoobject.GetRangeHashResponse - err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { - resp, err = protoobject.NewObjectServiceClient(conn).GetRangeHash(ctx, req.ToGRPCMessage().(*protoobject.GetRangeHashRequest)) - return err - }) - if err != nil { - return nil, fmt.Errorf("GetRangeHash rpc failure: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - resp2 := new(objectV2.GetRangeResponse) - if err = resp2.FromGRPCMessage(resp); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - if err := signature.VerifyServiceMessage(resp2); err != nil { - return nil, fmt.Errorf("could not verify %T: %w", resp, err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - return resp.GetBody().GetHashList(), nil - })) - } - - return p, nil -} - -func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { - resp := new(objectV2.GetRangeHashResponse) - - body := new(objectV2.GetRangeHashResponseBody) - resp.SetBody(body) - - body.SetType(typ) - body.SetHashList(res.Hashes()) - - return resp -} - -func groupAddressRequestForwarder[V any](f func(context.Context, network.Address, client.MultiAddressClient, []byte) (V, error)) func(context.Context, client.NodeInfo, client.MultiAddressClient) (V, error) { - return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (V, error) { - var ( - firstErr error - res V - - key = info.PublicKey() - ) - - info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) { - var err error - - defer func() { - stop = err == nil - - if stop || firstErr == nil { - firstErr = err - } - - // would be nice to log otherwise - }() - - res, err = f(ctx, addr, c, key) - - return - }) - - return res, firstErr - } -} - -func writeCurrentVersion(metaHdr *session.RequestMetaHeader) { - versionV2 := new(refs.Version) - - apiVersion := versionSDK.Current() - apiVersion.WriteToV2(versionV2) - - metaHdr.SetVersion(versionV2) -} - -func checkStatus(st *protostatus.Status) error { - stV2 := new(status.Status) - if err := stV2.FromGRPCMessage(st); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - if !status.IsSuccess(stV2.Code()) { - return apistatus.ErrorFromV2(stV2) - } - - return nil -} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 865d8eeeac..ec610a9eb3 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/ecdsa" + "crypto/sha256" "encoding/binary" "errors" "fmt" @@ -11,6 +12,7 @@ import ( "sync" "time" + "github.com/google/uuid" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" refsv2 "github.com/nspcc-dev/neofs-api-go/v2/refs" @@ -41,19 +43,20 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/nspcc-dev/tzhash/tz" "google.golang.org/grpc" ) -// ServiceServer is an interface of utility -// serving v2 Object service. -type ServiceServer interface { +// Handlers represents storage node's internal handler Object service op +// payloads. +type Handlers interface { Get(context.Context, getsvc.Prm) error Put(context.Context) (*putsvc.Streamer, error) Head(context.Context, getsvc.HeadPrm) error Search(context.Context, searchsvc.Prm) error Delete(context.Context, deletesvc.Prm) error GetRange(context.Context, getsvc.RangePrm) error - GetRangeHash(context.Context, *v2object.GetRangeHashRequest) (*v2object.GetRangeHashResponse, error) + GetRangeHash(context.Context, getsvc.RangeHashPrm) (*getsvc.RangeHashRes, error) } // Various NeoFS protocol status codes. @@ -102,13 +105,22 @@ type FSChain interface { LocalNodeUnderMaintenance() bool } -// Storage groups ops of the node's object storage required to serve NeoFS API -// Object service. +type sessions interface { + // GetSessionPrivateKey reads private session key by user and session IDs. + // Returns [apistatus.ErrSessionTokenNotFound] if there is no data for the + // referenced session. + GetSessionPrivateKey(usr user.ID, uid uuid.UUID) (ecdsa.PrivateKey, error) +} + +// Storage groups ops of the node's storage required to serve NeoFS API Object +// service. type Storage interface { - // VerifyAndStoreObject checks whether given object has correct format and, if - // so, saves it in the Storage. StoreObject is called only when local node - // complies with the container's storage policy. - VerifyAndStoreObject(object.Object) error + sessions + + // VerifyAndStoreObjectLocally checks whether given object has correct format + // and, if so, saves it in the Storage. StoreObject is called only when local + // node complies with the container's storage policy. + VerifyAndStoreObjectLocally(object.Object) error } type RequestInfoProcessor interface { @@ -146,8 +158,7 @@ const ( ) type server struct { - srv ServiceServer - + handlers Handlers fsChain FSChain storage Storage signer ecdsa.PrivateKey @@ -158,9 +169,9 @@ type server struct { } // New provides protoobject.ObjectServiceServer for the given parameters. -func New(c ServiceServer, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp RequestInfoProcessor) protoobject.ObjectServiceServer { +func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp RequestInfoProcessor) protoobject.ObjectServiceServer { return &server{ - srv: c, + handlers: hs, fsChain: fsChain, storage: st, signer: signer, @@ -175,12 +186,16 @@ func (s *server) pushOpExecResult(op stat.Method, err error, startedAt time.Time s.metrics.HandleOpExecResult(op, err == nil, time.Since(startedAt)) } -func (s *server) makeResponseMetaHeader(st *protostatus.Status) *protosession.ResponseMetaHeader { +func newCurrentProtoVersionMessage() *refs.Version { v := version.Current() var v2 refsv2.Version v.WriteToV2(&v2) + return v2.ToGRPCMessage().(*refs.Version) +} + +func (s *server) makeResponseMetaHeader(st *protostatus.Status) *protosession.ResponseMetaHeader { return &protosession.ResponseMetaHeader{ - Version: v2.ToGRPCMessage().(*refs.Version), + Version: newCurrentProtoVersionMessage(), Epoch: s.fsChain.CurrentEpoch(), Status: st, } @@ -385,7 +400,7 @@ func (x *putStream) close() (*protoobject.PutResponse, error) { func (s *server) Put(gStream protoobject.ObjectService_PutServer) error { startTime := time.Now() - stream, err := s.srv.Put(gStream.Context()) + stream, err := s.handlers.Put(gStream.Context()) if err != nil { return s.sendStatusPutResponse(startTime, gStream, err) } @@ -514,7 +529,7 @@ func (s *server) Delete(ctx context.Context, req *protoobject.DeleteRequest) (*p p.SetCommonParameters(cp) p.WithAddress(addr) p.WithTombstoneAddressTarget((*deleteResponseBody)(&rb)) - if err := s.srv.Delete(ctx, p); err != nil { + if err := s.handlers.Delete(ctx, p); err != nil { return s.makeStatusDeleteResponse(startTime, err) } @@ -574,7 +589,7 @@ func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto if err != nil { return s.makeStatusHeadResponse(startTime, err) } - err = s.srv.Head(ctx, p) + err = s.handlers.Head(ctx, p) if err != nil { return s.makeStatusHeadResponse(startTime, err) } @@ -841,12 +856,155 @@ func (s *server) GetRangeHash(ctx context.Context, req *protoobject.GetRangeHash return s.makeStatusHashResponse(startTime, eACLErr(reqInfo, err)) } - resp, err := s.srv.GetRangeHash(ctx, hashRngReq) + p, err := convertHashPrm(s.signer, s.storage, req) if err != nil { return s.makeStatusHashResponse(startTime, err) } + res, err := s.handlers.GetRangeHash(ctx, p) + if err != nil { + return s.makeStatusHashResponse(startTime, err) + } + + return s.signHashResponse(startTime, util.StatusOKErr, &protoobject.GetRangeHashResponse{ + Body: &protoobject.GetRangeHashResponse_Body{ + Type: req.Body.Type, + HashList: res.Hashes(), + }, + }) +} + +// converts original request into parameters accepted by the internal handler. +func convertHashPrm(signer ecdsa.PrivateKey, ss sessions, req *protoobject.GetRangeHashRequest) (getsvc.RangeHashPrm, error) { + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.RangeHashPrm{}, errors.New("missing object address") + } + + var addr oid.Address + var addr2 refsv2.Address + if err := addr2.FromGRPCMessage(ma); err != nil { + panic(err) + } + if err := addr.ReadFromV2(addr2); err != nil { + return getsvc.RangeHashPrm{}, fmt.Errorf("invalid object address: %w", err) + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.RangeHashPrm{}, err + } + + var p getsvc.RangeHashPrm + + switch t := body.GetType(); t { + default: + return getsvc.RangeHashPrm{}, fmt.Errorf("unknown checksum type %v", t) + case refs.ChecksumType_SHA256: + p.SetHashGenerator(sha256.New) + case refs.ChecksumType_TZ: + p.SetHashGenerator(tz.New) + } + + if tok := cp.SessionToken(); tok != nil { + signerKey, err := ss.GetSessionPrivateKey(tok.Issuer(), tok.ID()) + if err != nil { + if !errors.Is(err, apistatus.ErrSessionTokenNotFound) { + return getsvc.RangeHashPrm{}, fmt.Errorf("fetching session key: %w", err) + } + cp.ForgetTokens() + signerKey = signer + } + p.WithCachedSignerKey(&signerKey) + } + + mr := body.GetRanges() + rngs := make([]object.Range, len(mr)) + for i := range mr { + var r2 v2object.Range + if err := r2.FromGRPCMessage(mr[i]); err != nil { + panic(err) + } + rngs[i] = *object.NewRangeFromV2(&r2) + } + + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.SetRangeList(rngs) + p.SetSalt(body.GetSalt()) + + if cp.LocalOnly() { + return p, nil + } - return s.signHashResponse(startTime, nil, resp.ToGRPCMessage().(*protoobject.GetRangeHashResponse)) + var onceResign sync.Once + meta := req.GetMetaHeader() + p.SetRangeHashRequestForwarder(func(ctx context.Context, node client.NodeInfo, c client.MultiAddressClient) ([][]byte, error) { + var err error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + // TODO: #1165 think how to set the other fields + Version: newCurrentProtoVersionMessage(), + Ttl: meta.GetTtl() - 1, // FIXME: meta can be nil + Origin: meta, + } + var req2 v2object.GetRangeHashRequest + if err := req2.FromGRPCMessage(req); err != nil { + panic(err) + } + if err = signature.SignServiceMessage(&signer, &req2); err == nil { + req = req2.ToGRPCMessage().(*protoobject.GetRangeHashRequest) + } + }) + if err != nil { + return nil, err + } + + var firstErr error + nodePub := node.PublicKey() + addrs := node.AddressGroup() + for i := range addrs { + hs, err := getHashesFromRemoteNode(ctx, c, addrs[i], nodePub, req) + if err == nil { + return hs, nil + } + if firstErr == nil { + firstErr = err + } + // TODO: log error + } + return nil, firstErr + }) + return p, nil +} + +func getHashesFromRemoteNode(ctx context.Context, c client.MultiAddressClient, addr network.Address, nodePub []byte, + req *protoobject.GetRangeHashRequest) ([][]byte, error) { + var resp *protoobject.GetRangeHashResponse + err := c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + var err error + resp, err = protoobject.NewObjectServiceClient(conn).GetRangeHash(ctx, req) + return err + }) + if err != nil { + return nil, fmt.Errorf("GetRangeHash rpc failure: %w", err) + } + + if err := internal.VerifyResponseKeyV2(nodePub, resp); err != nil { + return nil, err + } + resp2 := new(v2object.GetRangeHashResponse) + if err := resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return nil, err + } + // TODO: verify number of hashes + return resp.GetBody().GetHashList(), nil } func (s *server) sendGetResponse(stream protoobject.ObjectService_GetServer, resp *protoobject.GetResponse) error { @@ -947,7 +1105,7 @@ func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ if err != nil { return s.sendStatusGetResponse(startTime, gStream, err) } - err = s.srv.Get(gStream.Context(), p) + err = s.handlers.Get(gStream.Context(), p) if err != nil { return s.sendStatusGetResponse(startTime, gStream, err) } @@ -1212,7 +1370,7 @@ func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. if err != nil { return s.sendStatusRangeResponse(startTime, gStream, err) } - err = s.srv.GetRange(gStream.Context(), p) + err = s.handlers.GetRange(gStream.Context(), p) if err != nil { return s.sendStatusRangeResponse(startTime, gStream, err) } @@ -1458,7 +1616,7 @@ func (s *server) Search(req *protoobject.SearchRequest, gStream protoobject.Obje if err != nil { return s.sendStatusSearchResponse(startTime, gStream, err) } - err = s.srv.Search(gStream.Context(), p) + err = s.handlers.Search(gStream.Context(), p) if err != nil { return s.sendStatusSearchResponse(startTime, gStream, err) } @@ -1749,7 +1907,7 @@ func (s *server) Replicate(_ context.Context, req *protoobject.ReplicateRequest) }}, nil } - err = s.storage.VerifyAndStoreObject(*obj) + err = s.storage.VerifyAndStoreObjectLocally(*obj) if err != nil { return &protoobject.ReplicateResponse{Status: &protostatus.Status{ Code: codeInternal, diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index 80f7a56faa..c46d15db72 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/google/uuid" objectgrpc "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" refsv2 "github.com/nspcc-dev/neofs-api-go/v2/refs" refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" @@ -22,6 +22,7 @@ import ( getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" @@ -68,7 +69,7 @@ func (x noCallObjectService) GetRange(context.Context, getsvc.RangePrm) error { panic("must not be called") } -func (x noCallObjectService) GetRangeHash(context.Context, *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { +func (x noCallObjectService) GetRangeHash(context.Context, getsvc.RangeHashPrm) (*getsvc.RangeHashRes, error) { panic("must not be called") } @@ -85,7 +86,12 @@ func (*noCallTestFSChain) LocalNodeUnderMaintenance() bool { panic("must not be type noCallTestStorage struct{} -func (noCallTestStorage) VerifyAndStoreObject(object.Object) error { panic("must not be called") } +func (noCallTestStorage) VerifyAndStoreObjectLocally(object.Object) error { + panic("must not be called") +} +func (noCallTestStorage) GetSessionPrivateKey(user.ID, uuid.UUID) (ecdsa.PrivateKey, error) { + panic("implement me") +} type noCallTestACLChecker struct{} @@ -212,11 +218,15 @@ func newTestStorage(t testing.TB, obj *objectgrpc.Object) *testStorage { return &testStorage{t: t, obj: obj} } -func (x *testStorage) VerifyAndStoreObject(obj object.Object) error { +func (x *testStorage) VerifyAndStoreObjectLocally(obj object.Object) error { require.Equal(x.t, x.obj, obj.ToV2().ToGRPCMessage().(*objectgrpc.Object)) return x.storeErr } +func (x *testStorage) GetSessionPrivateKey(user.ID, uuid.UUID) (ecdsa.PrivateKey, error) { + return ecdsa.PrivateKey{}, apistatus.ErrSessionTokenNotFound +} + func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID oid.ID) (*objectgrpc.ReplicateRequest, object.Object) { obj := objecttest.Object() obj.SetType(object.TypeRegular) @@ -546,7 +556,10 @@ func (nopFSChain) LocalNodeUnderMaintenance() bool { return false } type nopStorage struct{} -func (nopStorage) VerifyAndStoreObject(object.Object) error { return nil } +func (nopStorage) VerifyAndStoreObjectLocally(object.Object) error { return nil } +func (nopStorage) GetSessionPrivateKey(user.ID, uuid.UUID) (ecdsa.PrivateKey, error) { + return ecdsa.PrivateKey{}, apistatus.ErrSessionTokenNotFound +} func BenchmarkServer_Replicate(b *testing.B) { ctx := context.Background() diff --git a/pkg/services/object/util/prm.go b/pkg/services/object/util/prm.go index 93c62b684c..750aa9678a 100644 --- a/pkg/services/object/util/prm.go +++ b/pkg/services/object/util/prm.go @@ -84,53 +84,6 @@ func (p *CommonPrm) ForgetTokens() { } } -func CommonPrmFromV2(req interface { - GetMetaHeader() *session.RequestMetaHeader -}) (*CommonPrm, error) { - meta := req.GetMetaHeader() - ttl := meta.GetTTL() - - // unwrap meta header to get original request meta information - for meta.GetOrigin() != nil { - meta = meta.GetOrigin() - } - - var tokenSession *sessionsdk.Object - var err error - - if tokenSessionV2 := meta.GetSessionToken(); tokenSessionV2 != nil { - tokenSession = new(sessionsdk.Object) - - err = tokenSession.ReadFromV2(*tokenSessionV2) - if err != nil { - return nil, fmt.Errorf("invalid session token: %w", err) - } - } - - xHdrs := meta.GetXHeaders() - - prm := &CommonPrm{ - local: ttl <= maxLocalTTL, - token: tokenSession, - ttl: ttl - 1, // decrease TTL for new requests - xhdrs: make([]string, 0, 2*len(xHdrs)), - } - - if tok := meta.GetBearerToken(); tok != nil { - prm.bearer = new(bearer.Token) - err = prm.bearer.ReadFromV2(*tok) - if err != nil { - return nil, fmt.Errorf("invalid bearer token: %w", err) - } - } - - for i := range xHdrs { - prm.xhdrs = append(prm.xhdrs, xHdrs[i].GetKey(), xHdrs[i].GetValue()) - } - - return prm, nil -} - // CommonPrmFromRequest is a temporary copy-paste of [CommonPrmFromV2]. func CommonPrmFromRequest(req interface { GetMetaHeader() *protosession.RequestMetaHeader