Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract DHT message sender from the DHT #659

Merged
merged 15 commits into from
Jan 4, 2021

Conversation

aschmahmann
Copy link
Contributor

This can be used to make life easier for crawlers

@aschmahmann aschmahmann force-pushed the refactor/extract-messages branch from 24302d9 to 755d6df Compare June 1, 2020 19:04
@aarshkshah1992 aarshkshah1992 self-requested a review June 1, 2020 19:20
dht_net.go Outdated
@@ -208,12 +210,38 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}
}

type messageManager struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better name welcome 😄

records.go Show resolved Hide resolved
routing.go Outdated Show resolved Hide resolved
@aschmahmann aschmahmann force-pushed the refactor/extract-messages branch from 755d6df to b918f42 Compare June 1, 2020 20:14
go.sum Outdated Show resolved Hide resolved
Copy link
Contributor

@aarshkshah1992 aarshkshah1992 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely makes the DHT API easier to understand.

dht_net.go Outdated Show resolved Hide resolved
dht_net.go Outdated
@@ -208,12 +210,38 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}
}

type messageManager struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aschmahmann I am not entirely sure we need this indirection. Why not just stick all this functionality on the ProtocolMessenger ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly because I'm not really a fan of adding in all the complexity of the stream management into the a simple wrapper for sending protocol messages, but it was the easiest way to do it for now.

It'd be nice to make that all optional in the future, but the stream management and stream parsing are bit mixed together at the moment in the form of the messageManager struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this more clear I've now decoupled the messageManager from the ProtocolMessenger via an interface. This should give consumers of ProtocolMessenger more flexibility to do things like immediately close connections after sending messages if they're not interested in long-term connections.

defer ms.lk.Unlock()
ms.invalidate()
}()
dht.protoMessenger.m.streamDisconnect(dht.Context(), p)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the reasons I don't like the messageManager indirection. Let's just keep all that on the ProtocolMessenger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, the idea that this stream management optimization is tied into the protocol messenger is not optimal, it's just something to live with for now. Once we can dissociate the stream management from stream processing then it will not have as much indirection.

routing.go Outdated Show resolved Hide resolved
wg := sync.WaitGroup{}
for p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
logger.Debugf("putProvider(%s, %s)", keyMH, p)
err := dht.sendMessage(ctx, p, mes)
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.host)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aschmahmann Instead of passing in the host and peerId to this function, why not change it to PutProviderRecord(ctx, peer.AddrInfo) and pass them from here ?

I'd like to avoid passing in the host if we can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can if you think so. However, I put in the host instead of just peer.AddrInfo to account for:

  1. You cannot currently create a valid provider record for any peer other than host
  2. When we have signed provider records we'll be able to get them from the host, but not from peer.AddrInfo

However, given that this is a "use me if you know what you're doing" function 1 isn't a huge concern. Additionally, we likely will need to add/change the function signature anyway once we have signed provider records to accommodate rebroadcasting other people's records.

LMK what you think

messages.go Outdated Show resolved Hide resolved
messages.go Outdated Show resolved Hide resolved
messages.go Outdated Show resolved Hide resolved
messages.go Outdated Show resolved Hide resolved
messages.go Outdated Show resolved Hide resolved
@aschmahmann aschmahmann force-pushed the refactor/extract-messages branch 2 times, most recently from 1cbdbdf to 773aa01 Compare October 7, 2020 03:14
wire/messages.go Outdated Show resolved Hide resolved
wire/messages.go Outdated Show resolved Hide resolved
Copy link
Contributor Author

@aschmahmann aschmahmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some more refactoring of this PR and broke things into commits in case we want to revert any one of them and to make it easier to review. Recommend reviewing by commit.

dht_net.go Outdated Show resolved Hide resolved
dht_net.go Outdated Show resolved Hide resolved
wire/messages.go Outdated Show resolved Hide resolved
wire/messages.go Outdated Show resolved Hide resolved
wire/messages.go Outdated Show resolved Hide resolved
wire/messages.go Outdated Show resolved Hide resolved
Copy link
Contributor

@willscott willscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

@aschmahmann aschmahmann force-pushed the refactor/extract-messages branch from eac5d7d to fcf7104 Compare October 12, 2020 18:33
@aschmahmann aschmahmann force-pushed the refactor/extract-messages branch from fcf7104 to 138cb80 Compare January 4, 2021 06:11
@aschmahmann aschmahmann mentioned this pull request Jan 4, 2021
return nil, err
}

// Double check the key. Can't hurt.
if rec != nil && string(rec.GetKey()) != key {
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", loggableRecordKeyString(key), "got", rec.GetKey())
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", internal.LoggableRecordKeyString(key), "got", rec.GetKey())
return nil, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.. seems like it's an error not a nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably... I mean realistically it wouldn't even be crazy to panic here since if this occurs then there was a programming error somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this shouldn't happen, returning nil, nil when there is a validation error does fulfill the function contract just like it does in dht.getRecordFromDatastore. I've copied the comment explaining that from dht.getRecordFromDatastore to here.


import "errors"

var ErrInvalidRecord = errors.New("received invalid record")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this error never going to be returned to an external consumer? re-export it or have it defined somewhere that can be referenced externally preferably

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't currently. We were using and checking against it internally and I thought it'd be more recognizable when reading code to have it as a member of the internal package then as an unexported variable somewhere

message_manager.go Outdated Show resolved Hide resolved
}()
}

// SendRequest sends out a request, but also makes sure to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a somewhat weird place to be doing this. It

  • isn't DHT specific - why aren't we doing this at the host connection level?
  • feels potentially ugly with a general connection manager rather than factored out into a peer prioritization logic place.

I think the Ask is to think about what the ideal place for speed measurement logic to live, and file an issue for that (probably not in this repo.) I'm okay having it here for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just a move from dht_net.go where it was previously. I tend to agree that I'm not sure why we're recording latency with m.host.Peerstore().RecordLatency(p, time.Since(start)) when it includes processing time on the server too.

The other metrics emitted here are DHT specific though.

// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
// backwards compatibility reasons).
nstr, err := ms.m.host.NewStream(ctx, ms.p, ms.m.protocols...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only place that the message sender uses it's reference to the manager, and also the only place the host is really made use of.

Can we provide messageSender's with a closure for making new streams, rather than a full reference to the manager? making it clear that responsibility is manger->sender(s) will be clearer code than having back references.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but we also need the host for the latency measurements above. I'm not sure who is using those metrics or why (perhaps @Stebalien has some ideas), so I left things the same (previously there was a reference to the IpfsDHT here).

pb/protocol_messenger.go Show resolved Hide resolved
pb/protocol_messenger.go Outdated Show resolved Hide resolved
@@ -329,7 +329,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
return nil, err
default:
return nil, err
case nil, errInvalidRecord:
case nil, internal.ErrInvalidRecord:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning this where the consumer can't match on it seems bad

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is returned from an internal function and then swallowed, it shouldn't escape to the user.

@aschmahmann aschmahmann merged commit 03d4b62 into master Jan 4, 2021
@aschmahmann aschmahmann mentioned this pull request May 14, 2021
71 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants