Skip to content

Commit

Permalink
Simplify recurrent request handling (except Object service) (#3081)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider authored Jan 15, 2025
2 parents c627b53 + c5b0800 commit 1d699a9
Show file tree
Hide file tree
Showing 51 changed files with 2,004 additions and 3,268 deletions.
14 changes: 1 addition & 13 deletions cmd/neofs-node/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package main
import (
accountingGRPC "github.com/nspcc-dev/neofs-api-go/v2/accounting/grpc"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/balance"
accountingTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/accounting/grpc"
accountingService "github.com/nspcc-dev/neofs-node/pkg/services/accounting"
accounting "github.com/nspcc-dev/neofs-node/pkg/services/accounting/morph"
)

func initAccountingService(c *cfg) {
Expand All @@ -16,17 +14,7 @@ func initAccountingService(c *cfg) {
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.shared.basics.balanceSH, 0)
fatalOnErr(err)

server := accountingTransportGRPC.New(
accountingService.NewSignService(
&c.key.PrivateKey,
accountingService.NewResponseService(
accountingService.NewExecutionService(
accounting.NewExecutor(balanceMorphWrapper),
),
c.respSvc,
),
),
)
server := accountingService.New(&c.key.PrivateKey, c.networkState, balanceMorphWrapper)

for _, srv := range c.cfgGRPC.servers {
accountingGRPC.RegisterAccountingServiceServer(srv, server)
Expand Down
23 changes: 10 additions & 13 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient"
contractsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/contracts"
Expand Down Expand Up @@ -418,21 +417,22 @@ type cfg struct {
cfgObject cfgObject
}

// ReadCurrentNetMap reads network map which has been cached at the
// latest epoch. Returns an error if value has not been cached yet.
// GetNetworkMap reads network map which has been cached at the latest epoch.
// Returns an error if value has not been cached yet.
//
// Provides interface for NetmapService server.
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
func (c *cfg) GetNetworkMap() (netmap.NetMap, error) {
val := c.netMap.Load()
if val == nil {
return errors.New("missing local network map")
return netmap.NetMap{}, errors.New("missing local network map")
}

val.(netmap.NetMap).WriteToV2(msg)

return nil
return val.(netmap.NetMap), nil
}

// CurrentEpoch returns the latest cached epoch.
func (c *cfg) CurrentEpoch() uint64 { return c.networkState.CurrentEpoch() }

type cfgGRPC struct {
listeners []net.Listener

Expand Down Expand Up @@ -831,14 +831,11 @@ func (c *cfg) reloadObjectPoolSizes() {
c.cfgObject.pool.replication.Tune(c.cfgObject.pool.replicatorPoolSize)
}

func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
func (c *cfg) LocalNodeInfo() (netmap.NodeInfo, error) {
c.cfgNodeInfo.localInfoLock.RLock()
defer c.cfgNodeInfo.localInfoLock.RUnlock()

var res netmapV2.NodeInfo
c.cfgNodeInfo.localInfo.WriteToV2(&res)

return &res, nil
return c.cfgNodeInfo.localInfo, nil
}

// handleLocalNodeInfoFromNetwork rewrites cached node info from the NeoFS network map.
Expand Down
196 changes: 128 additions & 68 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"errors"
"fmt"

containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
apicontainer "github.com/nspcc-dev/neofs-api-go/v2/container"
protocontainer "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
apirefs "github.com/nspcc-dev/neofs-api-go/v2/refs"
refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc"
protosession "github.com/nspcc-dev/neofs-api-go/v2/session/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/signature"
protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc"
containerrpc "github.com/nspcc-dev/neofs-contract/rpc/container"
"github.com/nspcc-dev/neofs-node/pkg/core/client"
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
Expand All @@ -19,19 +24,21 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
netmapEv "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
loadroute "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route"
placementrouter "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/storage"
containerMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
apiClient "github.com/nspcc-dev/neofs-sdk-go/client"
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"go.uber.org/zap"
)

Expand All @@ -47,17 +54,16 @@ func initContainerService(c *cfg) {

cnrCli := c.shared.basics.cCli

cnrRdr := new(morphContainerReader)
cnrWrt := &morphContainerWriter{neoClient: cnrCli}
cnrs := &containersInChain{contractClient: cnrCli}
cnrSrc := cntClient.AsContainerSource(cnrCli)
eaclFetcher := &morphEACLFetcher{cnrCli}

if c.shared.basics.ttl <= 0 {
c.cfgObject.eaclSource = eaclFetcher
cnrRdr.eacl = eaclFetcher
cnrs.eacl = eaclFetcher
c.cfgObject.cnrSource = cnrSrc
cnrRdr.get = cnrSrc
cnrRdr.lister = cnrCli
cnrs.get = cnrSrc
cnrs.lister = cnrCli
} else {
cnrCache := c.shared.basics.containerCache
cnrListCache := c.shared.basics.containerListCache
Expand Down Expand Up @@ -109,12 +115,11 @@ func initContainerService(c *cfg) {
c.cfgObject.eaclSource = eaclCache
c.cfgObject.cnrSource = cnrCache

cnrRdr.lister = cnrListCache
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.get = c.cfgObject.cnrSource

cnrWrt.cacheEnabled = true
cnrWrt.eacls = eaclCache
cnrs.lister = cnrListCache
cnrs.eacl = c.cfgObject.eaclSource
cnrs.get = c.cfgObject.cnrSource
cnrs.cacheEnabled = true
cnrs.eacls = eaclCache
}

estimationsLogger := c.log.With(zap.String("component", "container_estimations"))
Expand Down Expand Up @@ -187,24 +192,16 @@ func initContainerService(c *cfg) {
})
})

server := containerTransportGRPC.New(
containerService.NewSignService(
&c.key.PrivateKey,
containerService.NewResponseService(
&usedSpaceService{
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt, c.networkState)),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
c.respSvc,
),
),
)
server := &usedSpaceService{
ContainerServiceServer: containerService.New(&c.key.PrivateKey, c.networkState, cnrs),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
}

for _, srv := range c.cfgGRPC.servers {
containerGRPC.RegisterContainerServiceServer(srv, server)
protocontainer.RegisterContainerServiceServer(srv, server)
}
}

Expand Down Expand Up @@ -469,7 +466,7 @@ func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontr
}

type usedSpaceService struct {
containerService.Server
protocontainer.ContainerServiceServer

loadWriterProvider loadcontroller.WriterProvider

Expand Down Expand Up @@ -519,10 +516,37 @@ func (c *usedSpaceService) ExternalAddresses() []string {
return c.cfg.ExternalAddresses()
}

func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
func (c *usedSpaceService) makeResponse(body *protocontainer.AnnounceUsedSpaceResponse_Body, st *protostatus.Status) (*protocontainer.AnnounceUsedSpaceResponse, error) {
v := version.Current()
var v2 apirefs.Version
v.WriteToV2(&v2)
resp := &protocontainer.AnnounceUsedSpaceResponse{
Body: body,
MetaHeader: &protosession.ResponseMetaHeader{
Version: v2.ToGRPCMessage().(*refs.Version),
Epoch: c.cfg.networkState.CurrentEpoch(),
Status: st,
},
}
return util.SignResponse(&c.cfg.key.PrivateKey, resp, apicontainer.AnnounceUsedSpaceResponse{}), nil
}

func (c *usedSpaceService) makeStatusResponse(err error) (*protocontainer.AnnounceUsedSpaceResponse, error) {
return c.makeResponse(nil, util.ToStatus(err))
}

func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *protocontainer.AnnounceUsedSpaceRequest) (*protocontainer.AnnounceUsedSpaceResponse, error) {
putReq := new(apicontainer.AnnounceUsedSpaceRequest)
if err := putReq.FromGRPCMessage(req); err != nil {
return nil, err
}
if err := signature.VerifyServiceMessage(putReq); err != nil {
return c.makeStatusResponse(util.ToRequestSignatureVerificationError(err))
}

var passedRoute []loadroute.ServerInfo

for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
for hdr := req.GetVerifyHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
key: hdr.GetBodySignature().GetKey(),
})
Expand All @@ -536,28 +560,27 @@ func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *container

w, err := c.loadWriterProvider.InitWriter(loadroute.NewRouteContext(ctx, passedRoute))
if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
return c.makeStatusResponse(fmt.Errorf("could not initialize container's used space writer: %w", err))
}

var est containerSDK.SizeEstimation

for _, aV2 := range req.GetBody().GetAnnouncements() {
err = est.ReadFromV2(aV2)
for _, a := range req.GetBody().GetAnnouncements() {
var a2 apicontainer.UsedSpaceAnnouncement
if err := a2.FromGRPCMessage(a); err != nil {
panic(err)
}
err = est.ReadFromV2(a2)
if err != nil {
return nil, fmt.Errorf("invalid size announcement: %w", err)
return c.makeStatusResponse(fmt.Errorf("invalid size announcement: %w", err))
}

if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
return nil, err
return c.makeStatusResponse(err)
}
}

respBody := new(containerV2.AnnounceUsedSpaceResponseBody)

resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)

return resp, nil
return c.makeResponse(nil, util.StatusOK)
}

var errNodeOutsideContainer = errors.New("node outside the container")
Expand Down Expand Up @@ -620,53 +643,90 @@ func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.Si
return nil
}

// implements interface required by container service provided by morph executor.
type morphContainerReader struct {
type containersInChain struct {
eacl containerCore.EACLSource

get containerCore.Source

lister interface {
List(*user.ID) ([]cid.ID, error)
}

contractClient *cntClient.Client

cacheEnabled bool
eacls *ttlEACLStorage
}

func (x *morphContainerReader) Get(id cid.ID) (*containerCore.Container, error) {
return x.get.Get(id)
func (x *containersInChain) Get(id cid.ID) (containerSDK.Container, error) {
c, err := x.get.Get(id)
if err != nil {
return containerSDK.Container{}, err
}
return c.Value, nil
}

func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
return x.eacl.GetEACL(id)
func (x *containersInChain) GetEACL(id cid.ID) (eacl.Table, error) {
c, err := x.eacl.GetEACL(id)
if err != nil {
return eacl.Table{}, err
}
return *c.Value, nil
}

func (x *morphContainerReader) List(id *user.ID) ([]cid.ID, error) {
return x.lister.List(id)
func (x *containersInChain) List(id user.ID) ([]cid.ID, error) {
return x.lister.List(&id)
}

type morphContainerWriter struct {
neoClient *cntClient.Client
func (x *containersInChain) Put(cnr containerSDK.Container, pub, sig []byte, st *session.Container) (cid.ID, error) {
data := cnr.Marshal()
d := cnr.ReadDomain()

cacheEnabled bool
eacls *ttlEACLStorage
}
var prm cntClient.PutPrm
prm.SetContainer(data)
prm.SetName(d.Name())
prm.SetZone(d.Zone())
prm.SetKey(pub)
prm.SetSignature(sig)
if st != nil {
prm.SetToken(st.Marshal())
}
if v := cnr.Attribute("__NEOFS__METAINFO_CONSISTENCY"); v == "optimistic" || v == "strict" {
prm.EnableMeta()
}
if err := x.contractClient.Put(prm); err != nil {
return cid.ID{}, err
}

func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
return cntClient.Put(m.neoClient, cnr)
return cid.NewFromMarshalledContainer(data), nil
}

func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
return cntClient.Delete(m.neoClient, witness)
func (x *containersInChain) Delete(id cid.ID, _, sig []byte, st *session.Container) error {
var prm cntClient.DeletePrm
prm.SetCID(id[:])
prm.SetSignature(sig)
prm.RequireAlphabetSignature()
if st != nil {
prm.SetToken(st.Marshal())
}
return x.contractClient.Delete(prm)
}

func (m morphContainerWriter) PutEACL(eaclInfo containerCore.EACL) error {
err := cntClient.PutEACL(m.neoClient, eaclInfo)
if err != nil {
func (x *containersInChain) PutEACL(eACL eacl.Table, pub, sig []byte, st *session.Container) error {
var prm cntClient.PutEACLPrm
prm.SetTable(eACL.Marshal())
prm.SetKey(pub)
prm.SetSignature(sig)
prm.RequireAlphabetSignature()
if st != nil {
prm.SetToken(st.Marshal())
}
if err := x.contractClient.PutEACL(prm); err != nil {
return err
}

if m.cacheEnabled {
id := eaclInfo.Value.GetCID()
m.eacls.InvalidateEACL(id)
if x.cacheEnabled {
x.eacls.InvalidateEACL(eACL.GetCID())
}

return nil
Expand Down
Loading

0 comments on commit 1d699a9

Please sign in to comment.