Skip to content

Commit

Permalink
Store private nodes events only for present and enabled specific node…
Browse files Browse the repository at this point in the history
…s. (#295)

* Store private nodes events only for present and enabled specific nodes.

* Init unreachable state only for enabled specific nodes.

* Write initial state for new specific node.

* Add fast path if no nodes for analyzyzer in notification.
  • Loading branch information
nickeskov authored Oct 7, 2024
1 parent 8f7c828 commit 414b76f
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 36 deletions.
15 changes: 8 additions & 7 deletions cmd/nodemon/nodemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (c *nodemonConfig) runAnalyzers(
es *events.Storage,
ns nodes.Storage,
logger *zap.Logger,
pew specific.PrivateNodesEventsWriter,
notifications <-chan entities.NodesGatheringNotification,
) {
alerts := runAnalyzer(cfg, es, logger, notifications)
Expand All @@ -230,7 +231,7 @@ func (c *nodemonConfig) runAnalyzers(
mergedAlerts := merge(alerts, alertL2)
alerts = mergedAlerts
}
runMessagingServices(ctx, cfg, alerts, logger, ns, es)
runMessagingServices(ctx, cfg, alerts, logger, ns, es, pew)
}

func run() error {
Expand Down Expand Up @@ -290,10 +291,9 @@ func run() error {

notifications := scraper.Start(ctx)
notifications = privateNodesHandler.Run(notifications) // wraps scrapper's notification with private nodes handler
pew := privateNodesHandler.PrivateNodesEventsWriter()

a, err := api.NewAPI(cfg.bindAddress, ns, es, cfg.apiReadTimeout, logger,
privateNodesHandler.PrivateNodesEventsWriter(), atom, cfg.development,
)
a, err := api.NewAPI(cfg.bindAddress, ns, es, cfg.apiReadTimeout, logger, pew, atom, cfg.development)
if err != nil {
logger.Error("failed to initialize API", zap.Error(err))
return err
Expand All @@ -303,7 +303,7 @@ func run() error {
return apiErr
}

cfg.runAnalyzers(ctx, cfg, es, ns, logger, notifications)
cfg.runAnalyzers(ctx, cfg, es, ns, logger, pew, notifications)

<-ctx.Done()
a.Shutdown()
Expand Down Expand Up @@ -361,6 +361,7 @@ func runMessagingServices(
logger *zap.Logger,
ns nodes.Storage,
es *events.Storage,
pew specific.PrivateNodesEventsWriter,
) {
go func() {
pubSubErr := pubsub.StartPubMessagingServer(ctx, cfg.nanomsgPubSubURL, alerts, logger)
Expand All @@ -371,7 +372,7 @@ func runMessagingServices(

if cfg.runTelegramPairServer() {
go func() {
pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairTelegramURL, ns, es, logger)
pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairTelegramURL, ns, es, pew, logger)
if pairErr != nil {
logger.Fatal("failed to start pair messaging server", zap.Error(pairErr))
}
Expand All @@ -380,7 +381,7 @@ func runMessagingServices(

if cfg.runDiscordPairServer() {
go func() {
pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairDiscordURL, ns, es, logger)
pairErr := pair.StartPairMessagingServer(ctx, cfg.nanomsgPairDiscordURL, ns, es, pew, logger)
if pairErr != nil {
logger.Fatal("failed to start pair messaging server", zap.Error(pairErr))
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/analysis/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ func (a *Analyzer) processNotification(alerts chan<- entities.Alert, n entities.
if err := n.Error(); err != nil {
alerts <- entities.NewInternalErrorAlert(n.Timestamp(), err)
}
a.zap.Sugar().Infof("Statements gathering completed with %d nodes", n.NodesCount())
nodesCount := n.NodesCount()
if nodesCount == 0 { // nothing to analyze
return nil
}
a.zap.Sugar().Infof("Statements gathering completed with %d nodes", nodesCount)
cnt, err := a.es.StatementsCount()
if err != nil {
return errors.Wrap(err, "failed to get statements count")
Expand Down
35 changes: 27 additions & 8 deletions pkg/messaging/pair/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"encoding/json"
"strings"
"time"

"nodemon/pkg/entities"
"nodemon/pkg/storing/events"
"nodemon/pkg/storing/nodes"
"nodemon/pkg/storing/specific"

"github.com/pkg/errors"
"go.nanomsg.org/mangos/v3/protocol"
Expand All @@ -20,6 +22,7 @@ func StartPairMessagingServer(
nanomsgURL string,
ns nodes.Storage,
es *events.Storage,
pew specific.PrivateNodesEventsWriter,
logger *zap.Logger,
) error {
if len(nanomsgURL) == 0 || len(strings.Fields(nanomsgURL)) > 1 {
Expand All @@ -39,7 +42,7 @@ func StartPairMessagingServer(
return err
}

loopErr := enterLoop(ctx, socket, logger, ns, es)
loopErr := enterLoop(ctx, socket, logger, ns, es, pew)
if loopErr != nil && !errors.Is(loopErr, context.Canceled) {
return loopErr
}
Expand All @@ -52,6 +55,7 @@ func enterLoop(
logger *zap.Logger,
ns nodes.Storage,
es *events.Storage,
pew specific.PrivateNodesEventsWriter,
) error {
for {
select {
Expand All @@ -63,7 +67,7 @@ func enterLoop(
logger.Error("Failed to receive a message from pair socket", zap.Error(recvErr))
return recvErr
}
err := handleMessage(rawMsg, ns, logger, socket, es)
err := handleMessage(rawMsg, ns, logger, socket, es, pew)
if err != nil {
return err
}
Expand All @@ -77,6 +81,7 @@ func handleMessage(
logger *zap.Logger,
socket protocol.Socket,
es *events.Storage,
pew specific.PrivateNodesEventsWriter,
) error {
if len(rawMsg) == 0 {
logger.Warn("empty raw message received from pair socket")
Expand All @@ -96,9 +101,9 @@ func handleMessage(
return err
}
case RequestInsertNewNodeType:
insertNodeIfNew(msg, ns, false, logger)
insertRegularNodeIfNew(msg, ns, logger)
case RequestInsertSpecificNewNodeType:
insertNodeIfNew(msg, ns, true, logger)
insertSpecificNodeIfNew(msg, ns, pew, logger)
case RequestUpdateNodeType:
handleUpdateNodeRequest(msg, logger, ns)
case RequestDeleteNodeType:
Expand All @@ -111,14 +116,28 @@ func handleMessage(
return nil
}

func insertNodeIfNew(msg []byte, ns nodes.Storage, specific bool, logger *zap.Logger) {
url := msg
err := ns.InsertIfNew(string(url), specific)
func insertNodeIfNew(url string, ns nodes.Storage, specific bool, logger *zap.Logger) bool {
appended, err := ns.InsertIfNew(url, specific)
if err != nil {
logger.Error("Failed to insert a new node to storage",
zap.Error(err), zap.Bool("specific", specific),
zap.Error(err), zap.String("node", url), zap.Bool("specific", specific),
)
}
return appended
}

func insertRegularNodeIfNew(msg []byte, ns nodes.Storage, logger *zap.Logger) {
url := string(msg)
_ = insertNodeIfNew(url, ns, false, logger)
}

func insertSpecificNodeIfNew(msg []byte, ns nodes.Storage, pew specific.PrivateNodesEventsWriter, logger *zap.Logger) {
url := string(msg)
appended := insertNodeIfNew(url, ns, true, logger)
if appended { // its new specific node
ts := time.Now().Unix()
pew.WriteInitialStateForSpecificNode(url, ts) // write unreachable event for the initial specific node state
}
}

func handleDeleteNodeRequest(msg []byte, ns nodes.Storage, logger *zap.Logger) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storing/nodes/nodes_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *JSONStorage) Update(node entities.Node) error {
return nil
}

func (s *JSONStorage) InsertIfNew(url string, specific bool) error {
func (s *JSONStorage) InsertIfNew(url string, specific bool) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -245,14 +245,14 @@ func (s *JSONStorage) InsertIfNew(url string, specific bool) error {
s.db.CommonNodes, appended = appendIfNew(s.db.CommonNodes, url)
}
if !appended {
return nil
return appended, nil
}

if err := s.syncDB(); err != nil {
return errors.Wrapf(err, "failed to insert new node '%s' (specific=%t)", url, specific)
return false, errors.Wrapf(err, "failed to insert new node '%s' (specific=%t)", url, specific)
}
s.zap.Sugar().Infof("New node '%s' (specific=%t) was stored", url, specific)
return nil
return appended, nil
}

func (s *JSONStorage) Delete(url string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storing/nodes/nodes_json_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestJSONStorage_InsertIfNew(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
storage, dbFilePath := newTestJSONStorageWithDB(t, &test.db)
err := storage.InsertIfNew(test.nodeURL, test.specific)
_, err := storage.InsertIfNew(test.nodeURL, test.specific)
require.NoError(t, err)
assert.Equal(t, &test.updatedDB, storage.db)
checkFileIsUpdated(t, dbFilePath, &test.updatedDB)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storing/nodes/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type Storage interface {
EnabledNodes() ([]entities.Node, error)
EnabledSpecificNodes() ([]entities.Node, error)
Update(nodeToUpdate entities.Node) error
InsertIfNew(url string, specific bool) error
InsertIfNew(url string, specific bool) (bool, error)
Delete(url string) error
FindAlias(url string) (string, error)
}
68 changes: 54 additions & 14 deletions pkg/storing/specific/private_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package specific

import (
stderrs "errors"
"maps"
"sync"
"time"

Expand All @@ -15,18 +16,23 @@ import (

type PrivateNodesEventsWriter interface {
Write(event entities.EventProducerWithTimestamp)
WriteInitialStateForSpecificNode(node string, ts int64)
}

type privateNodesEvents struct {
mu *sync.RWMutex
data map[string]entities.EventProducerWithTimestamp // map[node]NodeStatement
}

func newPrivateNodesEvents() *privateNodesEvents {
return &privateNodesEvents{
func newPrivateNodesEvents(initial ...entities.EventProducerWithTimestamp) *privateNodesEvents {
pe := &privateNodesEvents{
mu: new(sync.RWMutex),
data: make(map[string]entities.EventProducerWithTimestamp),
data: make(map[string]entities.EventProducerWithTimestamp, len(initial)),
}
for _, producer := range initial {
pe.unsafeWrite(producer)
}
return pe
}

func (p *privateNodesEvents) Write(producer entities.EventProducerWithTimestamp) {
Expand All @@ -39,8 +45,40 @@ func (p *privateNodesEvents) unsafeWrite(producer entities.EventProducerWithTime
p.data[producer.Node()] = producer
}

func (p *privateNodesEvents) WriteInitialStateForSpecificNode(node string, ts int64) {
p.mu.Lock()
defer p.mu.Unlock()
p.unsafeWriteInitialStateForSpecificNodes(node, ts)
}

func (p *privateNodesEvents) unsafeWriteInitialStateForSpecificNodes(node string, ts int64) {
if _, ok := p.data[node]; ok { // already exists
return
}
p.unsafeWrite(entities.NewUnreachableEvent(node, ts))
}

func (p *privateNodesEvents) filterPrivateNodes(ns nodes.Storage) error {
specificNodesList, err := ns.EnabledSpecificNodes()
if err != nil {
return errors.Wrap(err, "privateNodesEvents: failed to get specific nodes")
}
nodesURLs := make(map[string]struct{}, len(specificNodesList))
for _, node := range specificNodesList {
nodesURLs[node.URL] = struct{}{}
}
p.mu.Lock()
defer p.mu.Unlock()
maps.DeleteFunc(p.data, func(node string, _ entities.EventProducerWithTimestamp) bool {
_, ok := nodesURLs[node]
return !ok
})
return nil
}

type PrivateNodesHandler struct {
es *events.Storage
ns nodes.Storage
zap *zap.Logger
privateEvents *privateNodesEvents
}
Expand All @@ -50,31 +88,29 @@ func NewPrivateNodesHandlerWithUnreachableInitialState(
ns nodes.Storage,
zap *zap.Logger,
) (*PrivateNodesHandler, error) {
privateNodes, err := ns.Nodes(true) // get private nodes aka specific nodes
privateNodes, err := ns.EnabledSpecificNodes() // get private nodes aka specific nodes
if err != nil {
return nil, errors.Wrap(err, "failed to get specific nodes")
}
initialTS := time.Now().Unix()
initialPrivateNodesEvents := make([]entities.EventProducerWithTimestamp, len(privateNodes))
for i, node := range privateNodes {
initialPrivateNodesEvents[i] = entities.NewUnreachableEvent(node.URL, initialTS)
ts := time.Now().Unix()
h := NewPrivateNodesHandler(es, ns, zap)
for _, node := range privateNodes {
h.privateEvents.unsafeWriteInitialStateForSpecificNodes(node.URL, ts)
}
return NewPrivateNodesHandler(es, zap, initialPrivateNodesEvents...), nil
return h, nil
}

func NewPrivateNodesHandler(
es *events.Storage,
ns nodes.Storage,
zap *zap.Logger,
initial ...entities.EventProducerWithTimestamp,
) *PrivateNodesHandler {
pe := newPrivateNodesEvents()
for _, producer := range initial {
pe.unsafeWrite(producer)
}
return &PrivateNodesHandler{
es: es,
ns: ns,
zap: zap,
privateEvents: pe,
privateEvents: newPrivateNodesEvents(initial...),
}
}

Expand Down Expand Up @@ -141,6 +177,10 @@ func (h *PrivateNodesHandler) handlePrivateEvents(
continue
}
ts := notification.Timestamp()
if err := h.privateEvents.filterPrivateNodes(h.ns); err != nil {
h.zap.Error("Failed to filter private nodes", zap.Error(err))
output <- entities.NewNodesGatheringError(err, ts) // pass through error, but continue processing
}
storedPrivateNodes, err := h.putPrivateNodesEvents(ts)
h.zap.Sugar().Infof("Total count of stored private nodes statements is %d at timestamp %d",
len(storedPrivateNodes), ts,
Expand Down

0 comments on commit 414b76f

Please sign in to comment.