-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract DHT message sender from the DHT #659
Conversation
24302d9
to
755d6df
Compare
dht_net.go
Outdated
@@ -208,12 +210,38 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { | |||
} | |||
} | |||
|
|||
type messageManager struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better name welcome 😄
755d6df
to
b918f42
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely makes the DHT API easier to understand.
dht_net.go
Outdated
@@ -208,12 +210,38 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { | |||
} | |||
} | |||
|
|||
type messageManager struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aschmahmann I am not entirely sure we need this indirection. Why not just stick all this functionality on the ProtocolMessenger
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly because I'm not really a fan of adding in all the complexity of the stream management into the a simple wrapper for sending protocol messages, but it was the easiest way to do it for now.
It'd be nice to make that all optional in the future, but the stream management and stream parsing are bit mixed together at the moment in the form of the messageManager
struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make this more clear I've now decoupled the messageManager
from the ProtocolMessenger
via an interface. This should give consumers of ProtocolMessenger
more flexibility to do things like immediately close connections after sending messages if they're not interested in long-term connections.
subscriber_notifee.go
Outdated
defer ms.lk.Unlock() | ||
ms.invalidate() | ||
}() | ||
dht.protoMessenger.m.streamDisconnect(dht.Context(), p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is one of the reasons I don't like the messageManager
indirection. Let's just keep all that on the ProtocolMessenger
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, the idea that this stream management optimization is tied into the protocol messenger is not optimal, it's just something to live with for now. Once we can dissociate the stream management from stream processing then it will not have as much indirection.
wg := sync.WaitGroup{} | ||
for p := range peers { | ||
wg.Add(1) | ||
go func(p peer.ID) { | ||
defer wg.Done() | ||
logger.Debugf("putProvider(%s, %s)", keyMH, p) | ||
err := dht.sendMessage(ctx, p, mes) | ||
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.host) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aschmahmann Instead of passing in the host and peerId to this function, why not change it to PutProviderRecord(ctx, peer.AddrInfo)
and pass them from here ?
I'd like to avoid passing in the host if we can.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can if you think so. However, I put in the host
instead of just peer.AddrInfo
to account for:
- You cannot currently create a valid provider record for any peer other than
host
- When we have signed provider records we'll be able to get them from the
host
, but not frompeer.AddrInfo
However, given that this is a "use me if you know what you're doing" function 1 isn't a huge concern. Additionally, we likely will need to add/change the function signature anyway once we have signed provider records to accommodate rebroadcasting other people's records.
LMK what you think
1cbdbdf
to
773aa01
Compare
b0dc23d
to
197ecae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did some more refactoring of this PR and broke things into commits in case we want to revert any one of them and to make it easier to review. Recommend reviewing by commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
eac5d7d
to
fcf7104
Compare
…ct. create a new struct that manages sending DHT messages that can be used independently from the DHT.
…ng them into a new package
fcf7104
to
138cb80
Compare
return nil, err | ||
} | ||
|
||
// Double check the key. Can't hurt. | ||
if rec != nil && string(rec.GetKey()) != key { | ||
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", loggableRecordKeyString(key), "got", rec.GetKey()) | ||
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", internal.LoggableRecordKeyString(key), "got", rec.GetKey()) | ||
return nil, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.. seems like it's an error not a nil
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably... I mean realistically it wouldn't even be crazy to panic here since if this occurs then there was a programming error somewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this shouldn't happen, returning nil, nil
when there is a validation error does fulfill the function contract just like it does in dht.getRecordFromDatastore
. I've copied the comment explaining that from dht.getRecordFromDatastore
to here.
|
||
import "errors" | ||
|
||
var ErrInvalidRecord = errors.New("received invalid record") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this error never going to be returned to an external consumer? re-export it or have it defined somewhere that can be referenced externally preferably
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't currently. We were using and checking against it internally and I thought it'd be more recognizable when reading code to have it as a member of the internal package then as an unexported variable somewhere
}() | ||
} | ||
|
||
// SendRequest sends out a request, but also makes sure to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a somewhat weird place to be doing this. It
- isn't DHT specific - why aren't we doing this at the host connection level?
- feels potentially ugly with a general connection manager rather than factored out into a peer prioritization logic place.
I think the Ask is to think about what the ideal place for speed measurement logic to live, and file an issue for that (probably not in this repo.) I'm okay having it here for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just a move from dht_net.go
where it was previously. I tend to agree that I'm not sure why we're recording latency with m.host.Peerstore().RecordLatency(p, time.Since(start))
when it includes processing time on the server too.
The other metrics emitted here are DHT specific though.
// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks | ||
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for | ||
// backwards compatibility reasons). | ||
nstr, err := ms.m.host.NewStream(ctx, ms.p, ms.m.protocols...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the only place that the message sender uses it's reference to the manager, and also the only place the host is really made use of.
Can we provide messageSender's with a closure for making new streams, rather than a full reference to the manager? making it clear that responsibility is manger->sender(s)
will be clearer code than having back references.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can, but we also need the host for the latency measurements above. I'm not sure who is using those metrics or why (perhaps @Stebalien has some ideas), so I left things the same (previously there was a reference to the IpfsDHT
here).
@@ -329,7 +329,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st | |||
return nil, err | |||
default: | |||
return nil, err | |||
case nil, errInvalidRecord: | |||
case nil, internal.ErrInvalidRecord: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returning this where the consumer can't match on it seems bad
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is returned from an internal function and then swallowed, it shouldn't escape to the user.
…s to clarify what peerMessageSender and messageManager do
This can be used to make life easier for crawlers